[jira] [Commented] (KAFKA-4888) offset 449883 is invalid, cause: Record is corrupt (stored crc = 2171407101, computed crc = 1371274824)

2017-03-15 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927500#comment-15927500
 ] 

Jun Rao commented on KAFKA-4888:


[~HanlinLiu], thanks for the info. Were there any issues with the network 
(switches, routers)?

> offset 449883 is invalid, cause: Record is corrupt (stored crc = 2171407101, 
> computed crc = 1371274824)
> ---
>
> Key: KAFKA-4888
> URL: https://issues.apache.org/jira/browse/KAFKA-4888
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
> Environment: Release version:
> LSB Version:  
> :base-4.0-amd64:base-4.0-noarch:core-4.0-amd64:core-4.0-noarch:graphics-4.0-amd64:graphics-4.0-noarch:printing-4.0-amd64:printing-4.0-noarch
> Distributor ID:   RedHatEnterpriseServer
> Description:  Red Hat Enterprise Linux Server release 6.5 (Santiago)
> Release:  6.5
> Codename: Santiago
> Memory SO:
> Mem:15.577G total,   15.416G used,  164.508M free,   49.895M buffers
> Filesystem
> FilesystemSize  Used Avail Use% Mounted on
> /dev/mapper/appvg-lv_opt
>30G 1018M   28G   20% /opt/ngin
>Reporter: Eduardo da Silva Neto
> Attachments: CRC_dump.cap, CRC_log.txt, issue_kafka
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> We created five Kafka consumers that consume one message at a time 
> (max.poll.records = 1). After two days of intensive processing, with the Kafka 
> server file system about 50% used, the corrupted record error occurred.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-15 Thread Jun Rao
Hi, Dong,

Thanks for the reply.

10. Could you comment on that?

11.2 "I am concerned that the ChangeReplicaDirRequest would be lost if
broker
restarts after it sends ChangeReplicaDirResponse but before it receives
LeaderAndIsrRequest."

In that case, the reassignment tool could detect that through
DescribeDirsRequest
and issue ChangeReplicaDirRequest again, right? In the common case, this is
probably not needed and we only need to write each replica once.

My main concern with the approach in the current KIP is that once a new
replica is created in the wrong log dir, the cross log directory movement
may not catch up until the new replica is fully bootstrapped. So, we end up
writing the data for the same replica twice.

11.3 Are you saying the value in --throttle will be used to set both
intra.broker.throttled.rate and leader.follower.replication.throttled.replicas?

12.2 If the user only wants to check one topic, the tool could do the
filtering on the client side, right? My concern with having both log_dirs
and topics is the semantics. For example, if both are not empty, do we
return the intersection or the union?

12.3. Yes, firstOffset may not be useful.

14. Hmm, I would think moving data across log dirs will be IO bound. We
also have num.recovery.threads.per.data.dir, which defaults to 1. So,
having num.replica.move.threads default to # log dirs or half of that (to
account for IOs on both source and target) seems reasonable. Is a magical
value of 3 going to be better? Does that work with only 2 log dirs? There
will always be cases when the user needs to customize the value. We just
need a reasonable default to cover the common case.
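
As a rough illustration of that kind of default (a hypothetical sketch, not
something specified in the KIP):

  // Hypothetical default: half the number of log directories, but at least one
  // thread, to account for IO on both the source and the target directory.
  def defaultReplicaMoveThreads(numLogDirs: Int): Int =
    math.max(1, numLogDirs / 2)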

20. Should we support canceling the movement across log dirs? I was
thinking this can be achieved with a ChangeReplicaDirRequest with dir = any.

Jun


On Mon, Mar 13, 2017 at 3:32 PM, Dong Lin  wrote:

> Hey Jun,
>
> Thanks much for your detailed comments. Please see my reply below.
>
> On Mon, Mar 13, 2017 at 9:09 AM, Jun Rao  wrote:
>
> > Hi, Dong,
> >
> > Thanks for the updated KIP. Some more comments below.
> >
> > 10. For the .move log, do we perform any segment deletion (based on
> > retention) or log cleaning (if a compacted topic)? Or do we only enable
> > that after the swap?
> >
> > 11. kafka-reassign-partitions.sh
> > 11.1 If all reassigned replicas are in the current broker and only the
> log
> > directories have changed, we can probably optimize the tool to not
> trigger
> > partition reassignment through the controller and only
> > send ChangeReplicaDirRequest.
> >
>
> Yes, the reassignment script should not create the reassignment znode if no
> replicas need to be moved between brokers. This falls under the "How to move
> replica between log directories on the same broker" part of the Proposed
> Changes section.
>
>
> > 11.2 If ChangeReplicaDirRequest specifies a replica that's not created
> yet,
> > could the broker just remember that in memory and create the replica when
> > the creation is requested? This way, when doing cluster expansion, we can
> > make sure that the new replicas on the new brokers are created in the
> right
> > log directory in the first place. We can also avoid the tool having to
> keep
> > issuing ChangeReplicaDirRequest in response to
> > ReplicaNotAvailableException.
> >
>
> I am concerned that the ChangeReplicaDirRequest would be lost if broker
> restarts after it sends ChangeReplicaDirResponse but before it receives
> LeaderAndIsrRequest. In this case, the user will receive success when they
> initiate replica reassignment, but replica reassignment will never complete
> when they verify the reassignment later. This would be confusing to the user.
>
> There are three different approaches to this problem if the broker has not
> created the replica yet after it receives ChangeReplicaDirRequest:
>
> 1) The broker immediately replies to the user with ReplicaNotAvailableException
> and the user can decide to retry later. The advantage of this solution is
> that the broker logic is very simple and the reassignment script logic also
> seems straightforward. The disadvantage is that the user script has to retry.
> But it seems fine - we can set the interval between retries to 0.5 sec so
> that the broker won't be bombarded by those requests. This is the solution
> chosen in the current KIP.
>
> 2) The broker can put the ChangeReplicaDirRequest in a purgatory with a timeout
> and reply to the user after the replica has been created. I didn't choose this
> in the interest of keeping the broker logic simpler.
>
> 3) The broker can remember the request by making a mark on disk, e.g. creating a
> topicPartition.tomove directory in the destination log directory. This mark
> will be persisted across broker restarts. This is the first idea I had, but I
> replaced it with solution 1) in the interest of keeping the broker simple.
>
> It seems that solution 1) is the simplest one that works. But I am OK to
> switch to the other two solutions if we don't want the retry 

[jira] [Commented] (KAFKA-4907) compacted topic shouldn't reject messages with old timestamp

2017-03-15 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927460#comment-15927460
 ] 

Jiangjie Qin commented on KAFKA-4907:
-

I see. Yeah, the default behavior probably should just be to accept all the 
messages if the user did not specify message.timestamp.difference.max.ms. I'll 
fix that to maintain the previous behavior.

> compacted topic shouldn't reject messages with old timestamp
> 
>
> Key: KAFKA-4907
> URL: https://issues.apache.org/jira/browse/KAFKA-4907
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Jun Rao
>
> In LogValidator.validateTimestamp(), we check the validity of the timestamp 
> in the message without checking whether the topic is compacted or not. This 
> can cause messages to a compacted topic to be rejected when it shouldn't.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4907) compacted topic shouldn't reject messages with old timestamp

2017-03-15 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned KAFKA-4907:
---

Assignee: Jiangjie Qin

> compacted topic shouldn't reject messages with old timestamp
> 
>
> Key: KAFKA-4907
> URL: https://issues.apache.org/jira/browse/KAFKA-4907
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Jun Rao
>Assignee: Jiangjie Qin
>
> In LogValidator.validateTimestamp(), we check the validity of the timestamp 
> in the message without checking whether the topic is compacted or not. This 
> can cause messages to a compacted topic to be rejected when it shouldn't.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4906) Support 0.9 brokers with a newer Producer or Consumer version

2017-03-15 Thread Grant Henke (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927449#comment-15927449
 ] 

Grant Henke commented on KAFKA-4906:


[~ijuma] Following up with a summary of some of our out of band chats here. 

In smoke testing of a WIP patch it appeared I was able to send messages from a 
trunk client to a 0.9 broker and receive them from a trunk consumer. We were a 
bit confused by this since the message format had changed and should not be 
parsable. I think since I was using uncompressed messages and a regular topic, 
the message could pass through without the format really being parsed or 
validated. 

However, that is likely not the case for a compacted topic or a compressed 
message set. More testing would be needed to be sure. 

Regardless, the safest approach would likely be to ensure the message format 
matches the produce request version (Produce v1 = Message Format 0, and 
Produce v2 = Message Format 1). I will investigate further and see how large a 
change is required to do that before posting anything more.
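
A minimal sketch of that mapping (hypothetical helper, not actual client code):

  // Produce v0/v1 carry message format 0 (no timestamps); Produce v2 carries format 1.
  def messageFormatForProduceVersion(produceVersion: Int): Int = produceVersion match {
    case 0 | 1 => 0
    case 2     => 1
    case v     => throw new IllegalArgumentException(s"Unsupported produce version: $v")
  }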

> Support 0.9 brokers with a newer Producer or Consumer version
> -
>
> Key: KAFKA-4906
> URL: https://issues.apache.org/jira/browse/KAFKA-4906
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.10.2.0
>Reporter: Grant Henke
>Assignee: Grant Henke
>
> KAFKA-4507 added the ability for newer Kafka clients to talk to older Kafka 
> brokers if a new feature supported by a newer wire protocol was not 
> used/required. 
> We currently support brokers as old as 0.10.0.0 because that's when the 
> ApiVersionsRequest/Response was added to the broker (KAFKA-3307).
> However, there are relatively few changes between 0.9.0.0 and 0.10.0.0 on the 
> wire, making it possible to support another major broker version set by 
> assuming that any disconnect resulting from an ApiVersionsRequest is from a 
> 0.9 broker and defaulting to legacy protocol versions. 
> Supporting 0.9 with newer clients can drastically simplify upgrades, allow 
> for libraries and frameworks to easily support a wider set of environments, 
> and let developers take advantage of client side improvements without 
> requiring cluster upgrades first. 
> Below is a list of the wire protocol versions by release for reference: 
> {noformat}
> 0.10.x
>   Produce(0): 0 to 2
>   Fetch(1): 0 to 2 
>   Offsets(2): 0
>   Metadata(3): 0 to 1
>   OffsetCommit(8): 0 to 2
>   OffsetFetch(9): 0 to 1
>   GroupCoordinator(10): 0
>   JoinGroup(11): 0
>   Heartbeat(12): 0
>   LeaveGroup(13): 0
>   SyncGroup(14): 0
>   DescribeGroups(15): 0
>   ListGroups(16): 0
>   SaslHandshake(17): 0
>   ApiVersions(18): 0
> 0.9.x:
>   Produce(0): 0 to 1 (no response timestamp from v2)
>   Fetch(1): 0 to 1 (no response timestamp from v2)
>   Offsets(2): 0
>   Metadata(3): 0 (no cluster id or rack info from v1)
>   OffsetCommit(8): 0 to 2
>   OffsetFetch(9): 0 to 1
>   GroupCoordinator(10): 0
>   JoinGroup(11): 0
>   Heartbeat(12): 0
>   LeaveGroup(13): 0
>   SyncGroup(14): 0
>   DescribeGroups(15): 0
>   ListGroups(16): 0
>   SaslHandshake(17): UNSUPPORTED
>   ApiVersions(18): UNSUPPORTED
> 0.8.2.x:
>   Produce(0): 0 (no quotas from v1)
>   Fetch(1): 0 (no quotas from v1)
>   Offsets(2): 0
>   Metadata(3): 0
>   OffsetCommit(8): 0 to 1 (no global retention time from v2)
>   OffsetFetch(9): 0 to 1
>   GroupCoordinator(10): 0
>   JoinGroup(11): UNSUPPORTED
>   Heartbeat(12): UNSUPPORTED
>   LeaveGroup(13): UNSUPPORTED
>   SyncGroup(14): UNSUPPORTED
>   DescribeGroups(15): UNSUPPORTED
>   ListGroups(16): UNSUPPORTED
>   SaslHandshake(17): UNSUPPORTED
>   ApiVersions(18): UNSUPPORTED
> {noformat}
> Note: Due to KAFKA-3088 it may take up to request.timeout.ms to fail an 
> ApiVersionsRequest and fail over to legacy protocol versions unless we handle 
> that scenario specifically in this patch. The workaround would be to reduce 
> request.timeout.ms if needed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4907) compacted topic shouldn't reject messages with old timestamp

2017-03-15 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927444#comment-15927444
 ] 

Ismael Juma commented on KAFKA-4907:


[~becket_qin], yes, so we should not reject messages based on their timestamp 
in that case. But the current code does.

> compacted topic shouldn't reject messages with old timestamp
> 
>
> Key: KAFKA-4907
> URL: https://issues.apache.org/jira/browse/KAFKA-4907
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Jun Rao
>
> In LogValidator.validateTimestamp(), we check the validity of the timestamp 
> in the message without checking whether the topic is compacted or not. This 
> can cause messages to a compacted topic to be rejected when it shouldn't.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4907) compacted topic shouldn't reject messages with old timestamp

2017-03-15 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927438#comment-15927438
 ] 

Jiangjie Qin commented on KAFKA-4907:
-

[~ijuma] Hmm, if {{cleanup.policy}} is NOT set to {{delete}}, the 
{{retention.ms}} value does not really matter, right?

> compacted topic shouldn't reject messages with old timestamp
> 
>
> Key: KAFKA-4907
> URL: https://issues.apache.org/jira/browse/KAFKA-4907
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Jun Rao
>
> In LogValidator.validateTimestamp(), we check the validity of the timestamp 
> in the message without checking whether the topic is compacted or not. This 
> can cause messages to a compacted topic to be rejected when it shouldn't.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4906) Support 0.9 brokers with a newer Producer or Consumer version

2017-03-15 Thread Grant Henke (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke updated KAFKA-4906:
---
Fix Version/s: (was: 0.10.2.1)

> Support 0.9 brokers with a newer Producer or Consumer version
> -
>
> Key: KAFKA-4906
> URL: https://issues.apache.org/jira/browse/KAFKA-4906
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.10.2.0
>Reporter: Grant Henke
>Assignee: Grant Henke
>
> KAFKA-4507 added the ability for newer Kafka clients to talk to older Kafka 
> brokers if a new feature supported by a newer wire protocol was not 
> used/required. 
> We currently support brokers as old as 0.10.0.0 because that's when the 
> ApiVersionsRequest/Response was added to the broker (KAFKA-3307).
> However, there are relatively few changes between 0.9.0.0 and 0.10.0.0 on the 
> wire, making it possible to support another major broker version set by 
> assuming that any disconnect resulting from an ApiVersionsRequest is from a 
> 0.9 broker and defaulting to legacy protocol versions. 
> Supporting 0.9 with newer clients can drastically simplify upgrades, allow 
> for libraries and frameworks to easily support a wider set of environments, 
> and let developers take advantage of client side improvements without 
> requiring cluster upgrades first. 
> Below is a list of the wire protocol versions by release for reference: 
> {noformat}
> 0.10.x
>   Produce(0): 0 to 2
>   Fetch(1): 0 to 2 
>   Offsets(2): 0
>   Metadata(3): 0 to 1
>   OffsetCommit(8): 0 to 2
>   OffsetFetch(9): 0 to 1
>   GroupCoordinator(10): 0
>   JoinGroup(11): 0
>   Heartbeat(12): 0
>   LeaveGroup(13): 0
>   SyncGroup(14): 0
>   DescribeGroups(15): 0
>   ListGroups(16): 0
>   SaslHandshake(17): 0
>   ApiVersions(18): 0
> 0.9.x:
>   Produce(0): 0 to 1 (no response timestamp from v2)
>   Fetch(1): 0 to 1 (no response timestamp from v2)
>   Offsets(2): 0
>   Metadata(3): 0 (no cluster id or rack info from v1)
>   OffsetCommit(8): 0 to 2
>   OffsetFetch(9): 0 to 1
>   GroupCoordinator(10): 0
>   JoinGroup(11): 0
>   Heartbeat(12): 0
>   LeaveGroup(13): 0
>   SyncGroup(14): 0
>   DescribeGroups(15): 0
>   ListGroups(16): 0
>   SaslHandshake(17): UNSUPPORTED
>   ApiVersions(18): UNSUPPORTED
> 0.8.2.x:
>   Produce(0): 0 (no quotas from v1)
>   Fetch(1): 0 (no quotas from v1)
>   Offsets(2): 0
>   Metadata(3): 0
>   OffsetCommit(8): 0 to 1 (no global retention time from v2)
>   OffsetFetch(9): 0 to 1
>   GroupCoordinator(10): 0
>   JoinGroup(11): UNSUPPORTED
>   Heartbeat(12): UNSUPPORTED
>   LeaveGroup(13): UNSUPPORTED
>   SyncGroup(14): UNSUPPORTED
>   DescribeGroups(15): UNSUPPORTED
>   ListGroups(16): UNSUPPORTED
>   SaslHandshake(17): UNSUPPORTED
>   ApiVersions(18): UNSUPPORTED
> {noformat}
> Note: Due to KAFKA-3088 it may take up to request.timeout.ms to fail an 
> ApiVersionsRequest and fail over to legacy protocol versions unless we handle 
> that scenario specifically in this patch. The workaround would be to reduce 
> request.timeout.ms if needed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-111 Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-03-15 Thread Mayuresh Gharat
Hi Jun,

Sorry for the delayed reply.
I agree that the easiest thing will be to add an additional field in the
Session class and we should be OK.
But having a KafkaPrincipal and a java Principal within the same class looks
a little weird.

So we can do this and slowly deprecate the usage of KafkaPrincipal in
public APIs.

We add new APIs and make changes to the existing APIs as follows:


   - Changes to Session class :

@Deprecated
case class Session(principal: KafkaPrincipal, clientAddress: InetAddress) {
  val sanitizedUser = QuotaId.sanitize(principal.getName)
}

@Deprecated (NEW)
case class Session(principal: KafkaPrincipal, clientAddress: InetAddress,
                   channelPrincipal: java.security.Principal) {
  val sanitizedUser = QuotaId.sanitize(principal.getName)
}

(NEW)
case class Session(principal: java.security.Principal, clientAddress: InetAddress) {
  val sanitizedUser = QuotaId.sanitize(principal.getName)
}


   - Changes to Authorizer Interface :

@Deprecated
def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]]

(NEW)
def getAcls(principal: java.security.Principal): Map[Resource, Set[Acl]]


   - Changes to Acl class :

@Deprecated
case class Acl(principal: KafkaPrincipal, permissionType: PermissionType,
               host: String, operation: Operation)

(NEW)
case class Acl(principal: java.security.Principal, permissionType: PermissionType,
               host: String, operation: Operation)
The ones marked (NEW) are the new APIs. We will remove the deprecated ones
eventually, probably in the next major release.
We don't want to get rid of the KafkaPrincipal class; it will be used in the
same way as it is right now for the out-of-the-box authorizer and the
command-line tool. We would only be removing its direct usage from public APIs.
Doing the above deprecation will help us support other implementations of
java.security.Principal as well, which seems necessary especially since
Kafka provides a pluggable Authorizer and PrincipalBuilder.
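
To illustrate the benefit for pluggable implementations, a custom authorizer could
then match on any Principal implementation directly (a minimal hypothetical sketch,
with a made-up trust rule):

// With java.security.Principal exposed in Session, a custom authorizer is no longer
// limited to KafkaPrincipal and can inspect e.g. an SSL X.500 principal directly.
def isTrusted(principal: java.security.Principal): Boolean =
  principal.getName.startsWith("CN=trusted-")  // hypothetical rule for illustration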

Let me know your thoughts on this.

Thanks,

Mayuresh

On Tue, Feb 28, 2017 at 2:33 PM, Mayuresh Gharat  wrote:

> Hi Jun,
>
> Sure.
> I had an offline discussion with Joel on how we can deprecate the
> KafkaPrincipal from  Session and Authorizer.
> I will update the KIP to see if we can address all the concerns here. If
> not we can keep the KafkaPrincipal.
>
> Thanks,
>
> Mayuresh
>
> On Tue, Feb 28, 2017 at 1:53 PM, Jun Rao  wrote:
>
>> Hi, Joel,
>>
>> Good point on the getAcls() method. KafkaPrincipal is also tied to ACL,
>> which is used in pretty much every method in Authorizer. Now, I am not
>> sure
>> if it's easy to deprecate KafkaPrincipal.
>>
>> Hi, Mayuresh,
>>
>> Given the above, it seems that the easiest thing is to add a new Principal
>> field in Session. We want to make it clear that it's ignored in the
>> default
>> implementation, but a custom authorizer could take advantage of that.
>>
>> Thanks,
>>
>> Jun
>>
>> On Tue, Feb 28, 2017 at 10:52 AM, Joel Koshy  wrote:
>>
>> > If we deprecate KafkaPrincipal, then the Authorizer interface will also
>> > need to change - i.e., deprecate the getAcls(KafkaPrincipal) method.
>> >
>> > On Tue, Feb 28, 2017 at 10:11 AM, Mayuresh Gharat <
>> > gharatmayures...@gmail.com> wrote:
>> >
>> > > Hi Jun/Ismael,
>> > >
>> > > Thanks for the comments.
>> > >
>> > > I agree.
>> > > What I was thinking was, we get the KIP passed now and wait till major
>> > > kafka version release. We can then make this change, but for now we
>> can
>> > > wait. Does that work?
>> > >
>> > > If there are concerns, we can make the addition of extra field of type
>> > > Principal to Session and then deprecate the KafkaPrincipal later.
>> > >
>> > > I am fine either ways. What do you think?
>> > >
>> > > Thanks,
>> > >
>> > > Mayuresh
>> > >
>> > > On Tue, Feb 28, 2017 at 9:53 AM, Jun Rao  wrote:
>> > >
>> > > > Hi, Ismael,
>> > > >
>> > > > Good point on compatibility.
>> > > >
>> > > > Hi, Mayuresh,
>> > > >
>> > > > Given that, it seems that it's better to just add the raw principal
>> as
>> > a
>> > > > new field in Session for now and deprecate the KafkaPrincipal field
>> in
>> > > the
>> > > > future if needed?
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Jun
>> > > >
>> > > > On Mon, Feb 27, 2017 at 5:05 PM, Ismael Juma 
>> > wrote:
>> > > >
>> > > > > Breaking clients without a deprecation period is something we
>> only do
>> > > as
>> > > > a
>> > > > > last resort. Is there strong justification for doing it here?
>> > > > >
>> > > > > Ismael
>> > > > >
>> > > > > On Mon, Feb 27, 2017 at 11:28 PM, Mayuresh Gharat <
>> > > > > gharatmayures...@gmail.com> wrote:
>> > > > >
>> > > > > > Hi Ismael,
>> > > > > >
>> > > > > > Yeah. I agree that it might break the clients if the user is
>> using
>> > > the
>> > > > > > kafkaPrincipal directly. But since KafkaPrincipal is also a Java
>> > > > > Principal
>> > > > > > and I think, it would be a 

[jira] [Updated] (KAFKA-4888) offset 449883 is invalid, cause: Record is corrupt (stored crc = 2171407101, computed crc = 1371274824)

2017-03-15 Thread Hanlin Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hanlin Liu updated KAFKA-4888:
--
Attachment: CRC_log.txt
CRC_dump.cap

CRC_log.txt contains the logs from our consumer and from Kafka.

CRC_dump.cap is the TCP dump file we took when the CRC error happened.

> offset 449883 is invalid, cause: Record is corrupt (stored crc = 2171407101, 
> computed crc = 1371274824)
> ---
>
> Key: KAFKA-4888
> URL: https://issues.apache.org/jira/browse/KAFKA-4888
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
> Environment: Release version:
> LSB Version:  
> :base-4.0-amd64:base-4.0-noarch:core-4.0-amd64:core-4.0-noarch:graphics-4.0-amd64:graphics-4.0-noarch:printing-4.0-amd64:printing-4.0-noarch
> Distributor ID:   RedHatEnterpriseServer
> Description:  Red Hat Enterprise Linux Server release 6.5 (Santiago)
> Release:  6.5
> Codename: Santiago
> Memory SO:
> Mem:15.577G total,   15.416G used,  164.508M free,   49.895M buffers
> Filesystem
> FilesystemSize  Used Avail Use% Mounted on
> /dev/mapper/appvg-lv_opt
>30G 1018M   28G   20% /opt/ngin
>Reporter: Eduardo da Silva Neto
> Attachments: CRC_dump.cap, CRC_log.txt, issue_kafka
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> We created five Kafka consumers that consume one message at a time 
> (max.poll.records = 1). After two days of intensive processing, with the Kafka 
> server file system about 50% used, the corrupted record error occurred.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4888) offset 449883 is invalid, cause: Record is corrupt (stored crc = 2171407101, computed crc = 1371274824)

2017-03-15 Thread Hanlin Liu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927415#comment-15927415
 ] 

Hanlin Liu commented on KAFKA-4888:
---

[~junrao][~ijuma] Hi, I am a co-worker of [~chengc].
The Kafka version is 0.10.1.0-cp2.
We are running Confluent Kafka and Kafka Connect on public cloud machines.
The issue happened quite often but not in a consistent pattern. The CRC error 
does not come from the same offset each time.
Actually, we tried consuming messages on localhost and it works perfectly fine. We 
believe the messages are not corrupted on disk.



> offset 449883 is invalid, cause: Record is corrupt (stored crc = 2171407101, 
> computed crc = 1371274824)
> ---
>
> Key: KAFKA-4888
> URL: https://issues.apache.org/jira/browse/KAFKA-4888
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
> Environment: Release version:
> LSB Version:  
> :base-4.0-amd64:base-4.0-noarch:core-4.0-amd64:core-4.0-noarch:graphics-4.0-amd64:graphics-4.0-noarch:printing-4.0-amd64:printing-4.0-noarch
> Distributor ID:   RedHatEnterpriseServer
> Description:  Red Hat Enterprise Linux Server release 6.5 (Santiago)
> Release:  6.5
> Codename: Santiago
> Memory SO:
> Mem:15.577G total,   15.416G used,  164.508M free,   49.895M buffers
> Filesystem
> FilesystemSize  Used Avail Use% Mounted on
> /dev/mapper/appvg-lv_opt
>30G 1018M   28G   20% /opt/ngin
>Reporter: Eduardo da Silva Neto
> Attachments: CRC_dump.cap, CRC_log.txt, issue_kafka
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> We created five Kafka consumers that consume one message at a time 
> (max.poll.records = 1). After two days of intensive processing, with the Kafka 
> server file system about 50% used, the corrupted record error occurred.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-4907) compacted topic shouldn't reject messages with old timestamp

2017-03-15 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927399#comment-15927399
 ] 

Ismael Juma edited comment on KAFKA-4907 at 3/16/17 3:11 AM:
-

I think Jun is suggesting that defaulting to `retention.ms` only makes sense if 
retention by time is enabled. For compacted topics without `cleanup.policy` of 
`delete`, retention by time is basically disabled. So, in that case, the 
default should be infinite as it was before. Not sure what is the best way to 
achieve this though.


was (Author: ijuma):
I think Jun is suggesting that defaulting to `retention.ms` only makes sense if 
retention by time is enabled. For compacted topics without `cleanup.policy` of 
`delete`, retention is basically disabled. So, in that case, the default should 
be infinite as it was before. Not sure what is the best way to achieve this 
though.

> compacted topic shouldn't reject messages with old timestamp
> 
>
> Key: KAFKA-4907
> URL: https://issues.apache.org/jira/browse/KAFKA-4907
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Jun Rao
>
> In LogValidator.validateTimestamp(), we check the validity of the timestamp 
> in the message without checking whether the topic is compacted or not. This 
> can cause messages to a compacted topic to be rejected when it shouldn't.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4907) compacted topic shouldn't reject messages with old timestamp

2017-03-15 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927399#comment-15927399
 ] 

Ismael Juma commented on KAFKA-4907:


I think Jun is suggesting that defaulting to `retention.ms` only makes sense if 
retention by time is enabled. For compacted topics without `cleanup.policy` of 
`delete`, retention is basically disabled. So, in that case, the default should 
be infinite as it was before. Not sure what is the best way to achieve this 
though.

> compacted topic shouldn't reject messages with old timestamp
> 
>
> Key: KAFKA-4907
> URL: https://issues.apache.org/jira/browse/KAFKA-4907
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Jun Rao
>
> In LogValidator.validateTimestamp(), we check the validity of the timestamp 
> in the message without checking whether the topic is compacted or not. This 
> can cause messages to a compacted topic to be rejected when it shouldn't.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4907) compacted topic shouldn't reject messages with old timestamp

2017-03-15 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927392#comment-15927392
 ] 

Jiangjie Qin commented on KAFKA-4907:
-

[~junrao] Thanks for the explanation. Yes, that does sound like an issue. But it 
also doesn't seem ideal to simply accept any timestamp for a log compacted 
topic. It looks like there are two scenarios:
1. Users are mirroring a log compacted topic to a new cluster. In this case the 
broker should just accept any timestamp.
2. Users are producing real-time messages into a log compacted topic. In this 
case the broker should reject a timestamp that falls outside of 
message.timestamp.difference.max.ms.

I am not sure what is the best way to address both cases. Because the broker 
cannot distinguish between the two scenarios, it seems that manual 
configuration is necessary. i.e. in case 1 the users will have to manually 
change the message.timestamp.difference.max.ms to Long.MAX_VALUE and delete the 
configuration after the mirror has caught up. 

There might be a way for the broker to automatically change the configuration 
by guessing what the user is doing. For example, let the broker accept any 
timestamp until the broker sees a timestamp that falls into the acceptable 
range (assuming it has caught up). But this seems unintuitive and is not 
guaranteed to work given that timestamps can actually be out of order.
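
As an example of the manual override in case 1 (topic name and values are only
illustrative; the override should be removed once the mirror has caught up):

# set the per-topic override to Long.MAX_VALUE while mirroring
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-compacted-topic \
  --add-config message.timestamp.difference.max.ms=9223372036854775807

# remove the override after the mirror has caught up
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-compacted-topic \
  --delete-config message.timestamp.difference.max.ms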



> compacted topic shouldn't reject messages with old timestamp
> 
>
> Key: KAFKA-4907
> URL: https://issues.apache.org/jira/browse/KAFKA-4907
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Jun Rao
>
> In LogValidator.validateTimestamp(), we check the validity of the timestamp 
> in the message without checking whether the topic is compacted or not. This 
> can cause messages to a compacted topic to be rejected when it shouldn't.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4893) async topic deletion conflicts with max topic length

2017-03-15 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927303#comment-15927303
 ] 

Vahid Hashemian commented on KAFKA-4893:


[~onurkaraman] Sure, I agree that your suggestion is the long term fix for the 
issue. I just wanted to mention and document the alternative here :)

> async topic deletion conflicts with max topic length
> 
>
> Key: KAFKA-4893
> URL: https://issues.apache.org/jira/browse/KAFKA-4893
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Vahid Hashemian
>Priority: Minor
>
> As per the 
> [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
> topics can be only 249 characters long to line up with typical filesystem 
> limitations:
> {quote}
> Each sharded partition log is placed into its own folder under the Kafka log 
> directory. The name of such folders consists of the topic name, appended by a 
> dash (\-) and the partition id. Since a typical folder name can not be over 
> 255 characters long, there will be a limitation on the length of topic names. 
> We assume the number of partitions will not ever be above 100,000. Therefore, 
> topic names cannot be longer than 249 characters. This leaves just enough 
> room in the folder name for a dash and a potentially 5 digit long partition 
> id.
> {quote}
> {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
> validation.
> This limit ends up not being quite right since topic deletion ends up 
> renaming the directory to the form {{topic-partition.uniqueId-delete}} as can 
> be seen in {{LogManager.asyncDelete}}:
> {code}
> val dirName = new StringBuilder(removedLog.name)
>   .append(".")
>   
> .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
>   .append(Log.DeleteDirSuffix)
>   .toString()
> {code}
> So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following:
> {code}
> kafka.common.KafkaStorageException: Failed to rename log directory from 
> /tmp/kafka-logs0/0-0
>  to 
> /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
>   at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
>   at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
>   at kafka.cluster.Partition.delete(Partition.scala:137)
>   at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259)
>   at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic after this point still exists but has Leader set to -1 and the 
> controller recognizes the topic completion as incomplete (the topic znode is 
> still in /admin/delete_topics).
> I don't believe linkedin has any topic name this long but I'm making the 
> ticket in case anyone runs into this problem.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4893) async topic deletion conflicts with max topic length

2017-03-15 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927298#comment-15927298
 ] 

Onur Karaman commented on KAFKA-4893:
-

Also [~vahid] while I agree reducing the limit is by far the easiest, it 
definitely feels like we're moving the goalpost. We should instead try to 
maintain our guarantees as long as it's not exceedingly difficult. In this 
scenario, it doesn't feel exceedingly difficult.

> async topic deletion conflicts with max topic length
> 
>
> Key: KAFKA-4893
> URL: https://issues.apache.org/jira/browse/KAFKA-4893
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Vahid Hashemian
>Priority: Minor
>
> As per the 
> [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
> topics can be only 249 characters long to line up with typical filesystem 
> limitations:
> {quote}
> Each sharded partition log is placed into its own folder under the Kafka log 
> directory. The name of such folders consists of the topic name, appended by a 
> dash (\-) and the partition id. Since a typical folder name can not be over 
> 255 characters long, there will be a limitation on the length of topic names. 
> We assume the number of partitions will not ever be above 100,000. Therefore, 
> topic names cannot be longer than 249 characters. This leaves just enough 
> room in the folder name for a dash and a potentially 5 digit long partition 
> id.
> {quote}
> {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
> validation.
> This limit ends up not being quite right since topic deletion ends up 
> renaming the directory to the form {{topic-partition.uniqueId-delete}} as can 
> be seen in {{LogManager.asyncDelete}}:
> {code}
> val dirName = new StringBuilder(removedLog.name)
>   .append(".")
>   
> .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
>   .append(Log.DeleteDirSuffix)
>   .toString()
> {code}
> So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following:
> {code}
> kafka.common.KafkaStorageException: Failed to rename log directory from 
> /tmp/kafka-logs0/0-0
>  to 
> /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
>   at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
>   at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
>   at kafka.cluster.Partition.delete(Partition.scala:137)
>   at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259)
>   at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic after this point still exists but has Leader set to -1 and the 
> controller recognizes the topic completion as incomplete (the topic znode is 
> still in /admin/delete_topics).
> I don't believe linkedin has any topic name this long but I'm making the 
> ticket in case anyone runs into this problem.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4844) kafka is holding open file descriptors

2017-03-15 Thread chao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927273#comment-15927273
 ] 

chao commented on KAFKA-4844:
-

We want to delete index and log files under TOPIC_PARTITION_X/... and 
__consumer_offsets-/... that are more than 7 days old.
How can we configure that? What exactly do log.cleaner.enable and 
offsets.retention.minutes mean?
For example:
offsets.retention.minutes=10080
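
A hedged example of the relevant broker settings (standard config names; the
values just illustrate a 7-day policy):

# topic data (TOPIC_PARTITION_X): time-based deletion of log/index segments
# older than 7 days
log.cleanup.policy=delete
log.retention.hours=168
# __consumer_offsets is compacted, not deleted; committed offsets older than
# 7 days are expired from the offsets cache
offsets.retention.minutes=10080
# compaction of __consumer_offsets requires the log cleaner to be enabled
log.cleaner.enable=true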

> kafka is holding open file descriptors
> --
>
> Key: KAFKA-4844
> URL: https://issues.apache.org/jira/browse/KAFKA-4844
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: chao
>Priority: Critical
>
> We found a strange issue on Kafka 0.9.0.1: Kafka is holding open file 
> descriptors and not allowing disk space to be reclaimed.
> My questions:
> 1. What does the (nfsX) file mean?
> 2. Why is Kafka holding the file?
> $ sudo lsof /nas/kafka_logs/kafka/Order-6/.nfs04550ffcbd61
> COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
> java 97465 kafka mem REG 0,25 10485760 72683516 
> /nas/kafka_logs/kafka/Order-6/.nfs04550ffcbd61



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4907) compacted topic shouldn't reject messages with old timestamp

2017-03-15 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927265#comment-15927265
 ] 

Jun Rao commented on KAFKA-4907:


[~becket_qin], I was referring to the following code in KafkaConfig. If 
LogMessageTimestampDifferenceMaxMsProp is not set, we default to the retention 
time even for a compacted topic. Since the retention in a compacted topic is 
infinite, it seems that the default logMessageTimestampDifferenceMaxMs should 
be infinite?

  private def getMessageTimestampDifferenceMaxMs: Long = {
    Option(getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)) match {
      case Some(value) => value
      case None => getLogRetentionTimeMillis
    }
  }
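
A minimal sketch of the direction being suggested (hypothetical, not the committed
fix): fall back to the retention time only when time-based retention actually applies.

{code}
// Hypothetical helper for KAFKA-4907: default the max timestamp difference to the
// retention time only if cleanup.policy includes "delete"; for compact-only topics
// the default is effectively infinite, so old timestamps are accepted.
def defaultTimestampDiffMaxMs(configured: Option[Long],
                              retentionMs: Long,
                              cleanupPolicy: Set[String]): Long =
  configured.getOrElse {
    if (cleanupPolicy.contains("delete")) retentionMs else Long.MaxValue
  }
{code}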


> compacted topic shouldn't reject messages with old timestamp
> 
>
> Key: KAFKA-4907
> URL: https://issues.apache.org/jira/browse/KAFKA-4907
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Jun Rao
>
> In LogValidator.validateTimestamp(), we check the validity of the timestamp 
> in the message without checking whether the topic is compacted or not. This 
> can cause messages to a compacted topic to be rejected when it shouldn't.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-107: Add purgeDataBefore() API in AdminClient

2017-03-15 Thread Dong Lin
When changing the code, I realized that it feels weird to have
DeleteRequest and DeleteTopicsRequest. Thus I would follow the suggestion
and change it to DeleteRecordsRequest in this KIP, unless we decide to use
PurgeRequest.

On Wed, Mar 15, 2017 at 12:04 PM, Dong Lin  wrote:

> Hey Jason, Ismael, Jeff,
>
> Regarding Purge vs PurgeRecords, would it be OK for me to make a followup
> patch to rename PurgeRequest to PurgeRecordsRequest (similarly for
> ProduceRequest and FetchRequest)? This is because I favor PurgeRequest over
> PurgeRecordsRequest before we rename ProduceRequest and FetchRequest.
> Also, since the patch is ready for merge other than the naming issue we
> are discussing here, I would like to make less cosmetic code change and
> have it merged soon. I can submit the patch to rename the requests right
> after the pull request is committed.
>
> Hey Jun,
>
> You mentioned that the purpose of having "purge" is to distinguish between
> removing the whole log vs removing a portion of the log. The PurgeRequest
> proposed in this KIP will remove a portion of the log since it works at the
> granularity of records. This will be more explicit after we rename it to
> PurgeRecordsRequest. If we want to have a request in the future to remove the
> entire log, we can name it PurgeLogRequest. Thus I think it is OK to use
> "delete" instead of "purge" in the name and still be able to distinguish
> between removing the whole log vs removing a portion of the log.
>
> I have updated the KIP to replace "purge" with "delete" in the names of
> the Java API and requests. Are you OK with the change?
>
> Thanks,
> Dong
>
>
> On Wed, Mar 15, 2017 at 9:59 AM, Jason Gustafson 
> wrote:
>
>> Hey Dong,
>>
>> Sorry for the late reply. Yes, I prefer PurgeRecordsRequest instead of
>> PurgeRequest. DeleteRecords seems even better. As mentioned, I also think
>> it would be a good idea to rename FetchRequest and ProduceRequest
>> accordingly, but we need not consider that here. We could potentially
>> rename Purge to PurgeRecords if and when we rename Fetch and Produce, but
>> if that's the plan, we may as well do it from the start. Anyway, it's just
>> my preference, so don't block on my opinion if the consensus is unclear.
>>
>> -Jason
>>
>>
>>
>> On Wed, Mar 15, 2017 at 8:45 AM, Ismael Juma  wrote:
>>
>> > Hi Dong,
>> >
>> > I think your suggestion of including `Records` in the name of the new
>> > request and renaming `Fetch` and `Produce` to be `FetchRecords` and
>> > `ProduceRecords` is a good one. We can do the renames separately.
>> It's
>> > a compatible change since the name of the API is never exchanged with
>> > clients and the request/response classes are internal (we have done such
>> > renames before as Jason pointed out offline). The number of APIs will
>> > continue to grow and it will be much clearer if we avoid implicit
>> > assumptions about the target of an API request/response.
>> >
>> > Given that, I also think that DeleteRecords makes sense since we also
>> have
>> > DeleteTopics. Both are batch APIs that delete multiple items (the space
>> is
>> > only freed later). If we use Oracle's definition of "purge", it seems
>> to be
>> > what happens to cause the space to be freed (and happens in the
>> background
>> > in Kafka):
>> >
>> > "Purging is the process of freeing up space in the database or of
>> deleting
>> > obsolete data that is not required by the system. The purge process can
>> be
>> > based on the age of the data or the type of data"
>> > https://docs.oracle.com/cd/E12057_01/doc.1014/e12050/archpur
>> g.htm#BABHDECI
>> >
>> > Regarding the AdminClient, I thought the KIP was proposing adding this
>> > method to the Java AdminClient (KIP-117). If it's the Scala one, then it
>> > doesn't even need to be in the KIP as the Scala AdminClient is internal
>> and
>> > no compatibility guarantees are offered (the methods that exist there
>> never
>> > went through a KIP for example). So, I'm OK with keeping the method
>> > signature as it is.
>> >
>> > Ismael
>> >
>> > On Fri, Mar 10, 2017 at 6:06 PM, Dong Lin  wrote:
>> >
>> > > Hey Jason,
>> > >
>> > > Just to clarify, are you, together with Ismael and Jeff, suggesting
>> that
>> > > the new request should be named PurgeRecordsRequest instead of
>> > > PurgeRequest? The advantage of PurgeRecordsRequest is the name itself
>> is
>> > > more explicit about what it does. The disadvantage of
>> PurgeRecordsRequest
>> > > is that it is a bit inconsistent with ProduceRequest and FetchRequest,
>> > which
>> > > already assumes that if the target is not explicitly specified then
>> the
>> > > target is "Records".
>> > >
>> > > I would be in favor of PurgeRecordsRequest if we plan to change
>> > > FetchRequest
>> > > to FetchRecordsRequest and ProduceRequest to ProduceRecordsRequest.
>> > > Otherwise, I would prefer PurgeRequest since it is more consistent
>> with
>> > > existing style. Would 

Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-03-15 Thread Ismael Juma
Thanks for the updates Dong, they look good to me.

Ismael

On Wed, Mar 15, 2017 at 5:50 PM, Dong Lin  wrote:

> Hey Ismael,
>
> Sure, I have updated "Changes in Operational Procedures" section in KIP-113
> to specify the problem and solution with known disk failure. And I updated
> the "Test Plan" section to note that we have test in KIP-113 to verify that
> replicas already created on the good log directories will not be affected
> by failure of other log directories.
>
> Please let me know if there is any other improvement I can make. Thanks for
> your comment.
>
> Dong
>
>
> On Wed, Mar 15, 2017 at 3:18 AM, Ismael Juma  wrote:
>
> > Hi Dong,
> >
> > Yes, that sounds good to me. I'd list option 2 first since that is safe
> > and, as you said, no worse than what happens today. The file approach is
> a
> > bit hacky as you said, so it may be a bit fragile. Not sure if we really
> > want to mention that. :)
> >
> > About the note in KIP-112 versus adding the test in KIP-113, I think it
> > would make sense to add a short sentence stating that this scenario is
> > covered in KIP-113. People won't necessarily read both KIPs at the same
> > time and it's helpful to cross-reference when it makes sense.
> >
> > Thanks for your work on this.
> >
> > Ismael
> >
> > On Tue, Mar 14, 2017 at 11:00 PM, Dong Lin  wrote:
> >
> > > Hey Ismael,
> > >
> > > I get your concern that it is more likely for a disk to be slow, or
> > exhibit
> > > other forms of non-fatal symptoms, after some known fatal error. Then it
> > is
> > > weird for the user to start the broker with the likely-problematic disk in the
> > > broker config. In that case, I think there are two things the user can do:
> > >
> > > 1) Intentionally change the log directory in the config to point to a
> > file.
> > > This is a bit hacky but it works well before we make more-appropriate
> > > long-term change in Kafka to handle this case.
> > > 2) Just don't start broker with bad log directories. Always fix disk
> > before
> > > restarting the broker. This is a safe approach that is no worse than
> > > current practice.
> > >
> > > Would this address your concern if I specify the problem and the two
> > > solutions in the KIP?
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Tue, Mar 14, 2017 at 3:29 PM, Dong Lin  wrote:
> > >
> > > > Hey Ismael,
> > > >
> > > > Thanks for the comment. Please see my reply below.
> > > >
> > > > On Tue, Mar 14, 2017 at 10:31 AM, Ismael Juma 
> > wrote:
> > > >
> > > >> Thanks Dong. Comments inline.
> > > >>
> > > >> On Fri, Mar 10, 2017 at 6:25 PM, Dong Lin 
> > wrote:
> > > >> >
> > > >> > I get your point. But I am not sure we should recommend user to
> > simply
> > > >> > remove disk from the broker config. If user simply does this
> without
> > > >> > checking the utilization of good disks, replica on the bad disk
> will
> > > be
> > > >> > re-created on the good disk and may overload the good disks,
> causing
> > > >> > cascading failure.
> > > >> >
> > > >>
> > > >> Good point.
> > > >>
> > > >>
> > > >> >
> > > >> > I agree with you and Colin that slow disk may cause problem.
> > However,
> > > >> > performance degradation due to slow disk this is an existing
> problem
> > > >> that
> > > >> > is not detected or handled by Kafka or KIP-112.
> > > >>
> > > >>
> > > >> I think an important difference is that a number of disk errors are
> > > >> currently fatal and won't be after KIP-112. So it introduces new
> > > scenarios
> > > >> (for example, bouncing a broker that is working fine although some
> > disks
> > > >> have been marked bad).
> > > >>
> > > >
> > > > Hmm.. I am still trying to understand why KIP-112 creates new
> > scenarios.
> > > > Slow disk is not considered fatal error and won't be caught by either
> > > > existing Kafka design or this KIP. If any disk is marked bad, it
> means
> > > > broker encounters IOException when accessing disk, most likely the
> > broker
> > > > will encounter IOException again when accessing this disk and mark
> this
> > > > disk as bad after bounce. I guess you are talking about the case
> that a
> > > > disk is marked bad, broker is bounced, then the disk provides
> degraded
> > > > performance without being marked bad, right? But this seems to be an
> > > > existing problem we already have today with slow disk.
> > > >
> > > > Here are the possible scenarios with bad disk after broker bounce:
> > > >
> > > > 1) bad disk -> broker bounce -> good disk. This would be great.
> > > > 2) bad disk -> broker bounce -> slow disk. Slow disk is an existing
> > > > problem that is not addressed by Kafka today.
> > > > 3) bad disk -> broker bounce -> bad disk. This is handled by this KIP
> > > such
> > > > that only replicas on the bad disk become offline.
> > > >
> > > >
> > > >>
> > > >> > Detection and handling of
> > > >> > slow disk is a separate problem that needs to be addressed 

[jira] [Commented] (KAFKA-4907) compacted topic shouldn't reject messages with old timestamp

2017-03-15 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927231#comment-15927231
 ] 

Jiangjie Qin commented on KAFKA-4907:
-

[~junrao] Are you referring to the log cleaner? The log cleaner seems to use a 
different method and appends the messages directly to the log without going 
through timestamp validation. So it seems not to be an issue?

> compacted topic shouldn't reject messages with old timestamp
> 
>
> Key: KAFKA-4907
> URL: https://issues.apache.org/jira/browse/KAFKA-4907
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Jun Rao
>
> In LogValidator.validateTimestamp(), we check the validity of the timestamp 
> in the message without checking whether the topic is compacted or not. This 
> can cause messages to a compacted topic to be rejected when it shouldn't.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-4893) async topic deletion conflicts with max topic length

2017-03-15 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927033#comment-15927033
 ] 

Vahid Hashemian edited comment on KAFKA-4893 at 3/15/17 11:55 PM:
--

[~onurkaraman] What you suggested should work. Of course, an easier solution 
would be to decrease the max topic length (again) to 249 - 1 - 32 - 7 = 209.
{code}
.append(".") // len = 1
.append(java.util.UUID.randomUUID.toString.replaceAll("-","")) // len = 32
.append(Log.DeleteDirSuffix) // len = 7
{code}


was (Author: vahid):
[~onurkaraman] What you suggested should work. Of course, an easier solution 
would be to decrease the max topic length (again) to 249 - 1 - 32 - 7 = 219.
{code}
.append(".") // len = 1
.append(java.util.UUID.randomUUID.toString.replaceAll("-","")) // len = 32
.append(Log.DeleteDirSuffix) // len = 7
{code}

> async topic deletion conflicts with max topic length
> 
>
> Key: KAFKA-4893
> URL: https://issues.apache.org/jira/browse/KAFKA-4893
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Vahid Hashemian
>Priority: Minor
>
> As per the 
> [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
> topics can be only 249 characters long to line up with typical filesystem 
> limitations:
> {quote}
> Each sharded partition log is placed into its own folder under the Kafka log 
> directory. The name of such folders consists of the topic name, appended by a 
> dash (\-) and the partition id. Since a typical folder name can not be over 
> 255 characters long, there will be a limitation on the length of topic names. 
> We assume the number of partitions will not ever be above 100,000. Therefore, 
> topic names cannot be longer than 249 characters. This leaves just enough 
> room in the folder name for a dash and a potentially 5 digit long partition 
> id.
> {quote}
> {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
> validation.
> This limit ends up not being quite right since topic deletion ends up 
> renaming the directory to the form {{topic-partition.uniqueId-delete}} as can 
> be seen in {{LogManager.asyncDelete}}:
> {code}
> val dirName = new StringBuilder(removedLog.name)
>   .append(".")
>   
> .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
>   .append(Log.DeleteDirSuffix)
>   .toString()
> {code}
> So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following:
> {code}
> kafka.common.KafkaStorageException: Failed to rename log directory from 
> /tmp/kafka-logs0/0-0
>  to 
> /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
>   at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
>   at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
>   at kafka.cluster.Partition.delete(Partition.scala:137)
>   at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259)
>   at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic after this point still exists but has Leader set to -1 and the 
> controller recognizes the topic completion as incomplete (the topic znode is 
> still in /admin/delete_topics).

[jira] [Commented] (KAFKA-4900) Brokers stuck in controller re-election loop after failing to register metrics

2017-03-15 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927200#comment-15927200
 ] 

Onur Karaman commented on KAFKA-4900:
-

I hadn't yet investigated why some of the fetchers hadn't started up, but it 
makes me wonder if we should change the controller-broker interaction such that 
either:
# the expected leadership/followership is periodically heartbeated from 
controller to broker
# the current leadership/followership is periodically heartbeated from broker 
to controller

> Brokers stuck in controller re-election loop after failing to register metrics
> --
>
> Key: KAFKA-4900
> URL: https://issues.apache.org/jira/browse/KAFKA-4900
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core
>Affects Versions: 0.10.1.1
>Reporter: Nick Travers
>
> We hit this today in one of out three node staging clusters. The exception 
> continues to occur on all three nodes.
> {code}
> 2017-03-15 02:17:30,677 ERROR 
> [ZkClient-EventThread-35-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> server.ZookeeperLeaderElector - Error while electing or becoming leader on 
> broker 9
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=connection-close-rate, 
> group=controller-channel-metrics,description=Connections closed per second in 
> the window., tags={broker-id=10}]' already exists, can't register another one.
> at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:380)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:179)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:164)
> at 
> org.apache.kafka.common.network.Selector$SelectorMetrics.(Selector.java:617)
> at org.apache.kafka.common.network.Selector.(Selector.java:138)
> at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:101)
> at 
> kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45)
> at 
> kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45)
> at scala.collection.immutable.Set$Set3.foreach(Set.scala:163)
> at 
> kafka.controller.ControllerChannelManager.(ControllerChannelManager.scala:45)
> at 
> kafka.controller.KafkaController.startChannelManager(KafkaController.scala:814)
> at 
> kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:742)
> at 
> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:334)
> at 
> kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:167)
> at 
> kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
> at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:824)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) 
> {code}
> We observe a tight loop of controller (re-)election, i.e. one node hits this 
> exception, and leadership transitions to the next, which then hits the 
> exception, ad infinitum.
> Producers and consumers appear to be connecting ok, and are able to produce 
> and consume messages.
> Relevant data points:
>  - prior to this cluster restart, a partition reassignment was attempted for 
> a number of topics, which appeared to get stuck in the "in progress" state 
> (on the order of days)
>  - these topics were subsequently deleted
>  - a rolling restart of the cluster was performed was to turn on 
> broker-to-broker SSL communication
>  - the SSL change has subsequently been _rolled back_ after we observed these 
> exceptions
>  - the entire cluster was shut down, and nodes brought back one at a time in 
> an attempt to clear the exception. We were able to restart the cluster, but 
> we continue to see the exceptions
> We also observed, during the same time as the exception above, the following 
> exception on all hosts:
> {code}
> 2017-03-15 01:44:04,572 ERROR 
> [ZkClient-EventThread-36-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> 

[jira] [Comment Edited] (KAFKA-4893) async topic deletion conflicts with max topic length

2017-03-15 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927155#comment-15927155
 ] 

Onur Karaman edited comment on KAFKA-4893 at 3/15/17 11:23 PM:
---

[~ijuma] I had thought about the additional fsync on the directory as well 
after posting the earlier comment.

Would the following structure help?
{code}
kafkaLogDir
| -- cleaner-offset-checkpoint
| -- delete
|    | -- 8fc621a18fe746d1b8eb4a7fa55a04bc
|    |    | -- .index
|    |    | -- .log
|    |    | -- .timeindex
|    |    | -- t-0
| -- meta.properties
| -- recovery-point-offset-checkpoint
| -- replication-offset-checkpoint
{code}

We'd put all the partitions staged for async deletion under kafkaLogDir/delete. 
Each directory corresponding to a partition staged for async deletion would 
have a new empty file with filename topic-partition so you can figure out later 
on what partition the data belonged to.

The kafkaLogDir/delete directory would just be a trash bin directory that 
always exists, and we'd just delete all of the data inside the trash bin.
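
A rough illustration of that layout (hypothetical code, not LogManager; the 
paths and names are made up):
{code}
// Hypothetical sketch (not the actual LogManager.asyncDelete) of staging a
// partition directory for async deletion under a fixed kafkaLogDir/delete/
// trash bin, so the topic name no longer has to fit into the renamed dir name.
import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import java.util.UUID

object StageForAsyncDelete {
  def stage(logDir: Path, topic: String, partition: Int): Path = {
    val partitionDir = logDir.resolve(s"$topic-$partition")
    val trashBin = logDir.resolve("delete")
    Files.createDirectories(trashBin)

    // rename to a short, unique name that is independent of the topic name length
    val staged = trashBin.resolve(UUID.randomUUID.toString.replaceAll("-", ""))
    Files.move(partitionDir, staged, StandardCopyOption.ATOMIC_MOVE)

    // leave an empty marker file so the original topic-partition is recoverable
    Files.createFile(staged.resolve(s"$topic-$partition"))
    staged
  }

  // usage (assumes the partition directory already exists):
  def main(args: Array[String]): Unit =
    println(stage(Paths.get("/tmp/kafka-logs0"), "t", 0))
}
{code}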


was (Author: onurkaraman):
[~ijuma] I had thought about the additional fsync on the directory as well 
after posting the earlier comment.

Would the following structure help?
{code}
kafkaLogDir
| -- cleaner-offset-checkpoint
| -- delete
|    | -- 8fc621a18fe746d1b8eb4a7fa55a04bc
|    |    | -- .index
|    |    | -- .log
|    |    | -- .timeindex
|    |    | -- t-0
| -- meta.properties
| -- recovery-point-offset-checkpoint
| -- replication-offset-checkpoint
{code}

The kafkaLogDir/delete directory would always be there. We'd put all the 
partitions staged for async deletion under kafkaLogDir/delete. Each directory 
corresponding to a partition staged for async deletion would have a new empty 
file with filename topic-partition so you can figure out later on what 
partition the data belonged to.

> async topic deletion conflicts with max topic length
> 
>
> Key: KAFKA-4893
> URL: https://issues.apache.org/jira/browse/KAFKA-4893
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Vahid Hashemian
>Priority: Minor
>
> As per the 
> [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
> topics can be only 249 characters long to line up with typical filesystem 
> limitations:
> {quote}
> Each sharded partition log is placed into its own folder under the Kafka log 
> directory. The name of such folders consists of the topic name, appended by a 
> dash (\-) and the partition id. Since a typical folder name can not be over 
> 255 characters long, there will be a limitation on the length of topic names. 
> We assume the number of partitions will not ever be above 100,000. Therefore, 
> topic names cannot be longer than 249 characters. This leaves just enough 
> room in the folder name for a dash and a potentially 5 digit long partition 
> id.
> {quote}
> {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
> validation.
> This limit ends up not being quite right since topic deletion ends up 
> renaming the directory to the form {{topic-partition.uniqueId-delete}} as can 
> be seen in {{LogManager.asyncDelete}}:
> {code}
> val dirName = new StringBuilder(removedLog.name)
>   .append(".")
>   
> .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
>   .append(Log.DeleteDirSuffix)
>   .toString()
> {code}
> So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following:
> {code}
> kafka.common.KafkaStorageException: Failed to rename log directory from 
> /tmp/kafka-logs0/0-0
>  to 
> /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
>   at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
>   at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at 

[jira] [Commented] (KAFKA-4893) async topic deletion conflicts with max topic length

2017-03-15 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927155#comment-15927155
 ] 

Onur Karaman commented on KAFKA-4893:
-

[~ijuma] I had thought about the additional fsync on the directory as well 
after posting the earlier comment.

Would the following structure help?
{code}
kafkaLogDir
| -- cleaner-offset-checkpoint
| -- delete
|    | -- 8fc621a18fe746d1b8eb4a7fa55a04bc
|    |    | -- .index
|    |    | -- .log
|    |    | -- .timeindex
|    |    | -- t-0
| -- meta.properties
| -- recovery-point-offset-checkpoint
| -- replication-offset-checkpoint
{code}

The kafkaLogDir/delete directory would always be there. We'd put all the 
partitions staged for async deletion under kafkaLogDir/delete. Each directory 
corresponding to a partition staged for async deletion would have a new empty 
file with filename topic-partition so you can figure out later on what 
partition the data belonged to.

> async topic deletion conflicts with max topic length
> 
>
> Key: KAFKA-4893
> URL: https://issues.apache.org/jira/browse/KAFKA-4893
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Vahid Hashemian
>Priority: Minor
>
> As per the 
> [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
> topics can be only 249 characters long to line up with typical filesystem 
> limitations:
> {quote}
> Each sharded partition log is placed into its own folder under the Kafka log 
> directory. The name of such folders consists of the topic name, appended by a 
> dash (\-) and the partition id. Since a typical folder name can not be over 
> 255 characters long, there will be a limitation on the length of topic names. 
> We assume the number of partitions will not ever be above 100,000. Therefore, 
> topic names cannot be longer than 249 characters. This leaves just enough 
> room in the folder name for a dash and a potentially 5 digit long partition 
> id.
> {quote}
> {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
> validation.
> This limit ends up not being quite right since topic deletion ends up 
> renaming the directory to the form {{topic-partition.uniqueId-delete}} as can 
> be seen in {{LogManager.asyncDelete}}:
> {code}
> val dirName = new StringBuilder(removedLog.name)
>   .append(".")
>   
> .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
>   .append(Log.DeleteDirSuffix)
>   .toString()
> {code}
> So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following:
> {code}
> kafka.common.KafkaStorageException: Failed to rename log directory from 
> /tmp/kafka-logs0/0-0
>  to 
> /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
>   at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
>   at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
>   at kafka.cluster.Partition.delete(Partition.scala:137)
>   at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259)
>   at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic after this point still 

[jira] [Commented] (KAFKA-4905) StreamPartitionAssignor doesn't respect subscriptions to assign partitions.

2017-03-15 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927115#comment-15927115
 ] 

Matthias J. Sax commented on KAFKA-4905:


I see. If you use the same app-id, all instances must be exactly the same. 
Thus, either you let all your instances subscribe to all topics 
({{KStreamBuilder.stream(...)}} allows specifying multiple input topics at 
once), or you need to use different app-ids. May I ask why you started the 
same app for two different topics the way you described?

I think we can close this as "not a problem".
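
For completeness, a single application consuming both topics under one app-id 
could look roughly like this (a minimal sketch with made-up topic and server 
names, using String serdes as the defaults):
{code}
// Minimal sketch (made-up topic and server names): one app-id, one application,
// subscribing to both input topics through a single KStreamBuilder.
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.kstream.{ForeachAction, KStreamBuilder}

object SingleAppTwoTopics {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app") // one app-id for all instances
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)

    val builder = new KStreamBuilder()
    // stream(...) accepts multiple input topics at once
    builder.stream[String, String]("foo", "bar").foreach(new ForeachAction[String, String] {
      override def apply(key: String, value: String): Unit = println(s"$key -> $value")
    })

    new KafkaStreams(builder, props).start()
  }
}
{code}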

> StreamPartitionAssignor doesn't respect subscriptions to assign partitions.
> ---
>
> Key: KAFKA-4905
> URL: https://issues.apache.org/jira/browse/KAFKA-4905
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>
> Both RangeAssignor and RoundRobinAssignor use the subscriptions to assign 
> partitions to each consumer. This allows two consumers belonging to the same 
> group to subscribe to two different topics.
> This doesn't seem to be the case for the StreamPartitionAssignor, resulting in 
> an IllegalArgumentException thrown during rebalance. 
> java.lang.IllegalArgumentException: Assigned partition foo-2 for 
> non-subscribed topic regex pattern; subscription pattern is bar
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:190)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:216)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> This is because the consumer group leader attempts to assign partitions to a 
> consumer that didn't subscribe to the associated topic.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4885) processstreamwithcachedstatestore and other streams benchmarks fail occasionally

2017-03-15 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-4885:


Assignee: Guozhang Wang

> processstreamwithcachedstatestore and other streams benchmarks fail 
> occasionally
> -
>
> Key: KAFKA-4885
> URL: https://issues.apache.org/jira/browse/KAFKA-4885
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
>Assignee: Guozhang Wang
> Fix For: 0.11.0.0
>
>
> test_id:
> kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=processstreamwithcachedstatestore.scale=2
> status: FAIL
> run time:   14 minutes 58.069 seconds
> Streams Test process on ubuntu@worker5 took too long to exit
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
> data = self.run_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
> return self.test_context.function(self.test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py",
>  line 86, in test_simple_benchmark
> self.driver[num].wait()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py",
>  line 102, in wait
> self.wait_node(node, timeout_sec)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py",
>  line 106, in wait_node
> wait_until(lambda: not node.account.alive(pid), timeout_sec=timeout_sec, 
> err_msg="Streams Test process on " + str(node.account) + " took too long to 
> exit")
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py",
>  line 36, in wait_until
> raise TimeoutError(err_msg)
> TimeoutError: Streams Test process on ubuntu@worker5 took too long to exit
> The log contains several lines like:
> [2017-03-11 04:52:59,080] DEBUG Attempt to heartbeat failed for group 
> simple-benchmark-streams-with-storetrue since it is rebalancing. 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2017-03-11 04:53:01,987] DEBUG Sending Heartbeat request for group 
> simple-benchmark-streams-with-storetrue to coordinator worker10:9092 (id: 
> 2147483646 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2017-03-11 04:53:02,088] DEBUG Attempt to heartbeat failed for group 
> simple-benchmark-streams-with-storetrue since it is rebalancing. 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2017-03-11 04:53:04,995] DEBUG Sending Heartbeat request for group 
> simple-benchmark-streams-with-storetrue to coordinator worker10:9092 (id: 
> 2147483646 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> Other tests that fail the same way include:
> test_id:
> kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=count.scale=2
> test_id:
> kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=processstreamwithsink.scale=1
> test_id:
> kafkatest.tests.streams.streams_bounce_test.StreamsBounceTest.test_bounce



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4905) StreamPartitionAssignor doesn't respect subscriptions to assign partitions.

2017-03-15 Thread Florian Hussonnois (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927100#comment-15927100
 ] 

Florian Hussonnois commented on KAFKA-4905:
---

Hi [~mjsax], I'm sorry if my explanation was not really clear. 
On a project we inadvertently started two stream applications (which use 
KStreamBuilder) with the same group but consuming from two different topics. We 
ended up with that exception, and the behavior surprised us.

> StreamPartitionAssignor doesn't respect subscriptions to assign partitions.
> ---
>
> Key: KAFKA-4905
> URL: https://issues.apache.org/jira/browse/KAFKA-4905
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>
> Both RangeAssignor and RoundRobinAssignor use the subscriptions to assign 
> partitions to each consumer. This allows two consumers belonging to the same 
> group to subscribe to two different topics.
> This doesn't seem to be the case for the StreamPartitionAssignor, resulting in 
> an IllegalArgumentException thrown during rebalance. 
> java.lang.IllegalArgumentException: Assigned partition foo-2 for 
> non-subscribed topic regex pattern; subscription pattern is bar
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:190)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:216)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> This is because the consumer group leader attempts to assign partitions to a 
> consumer that didn't subscribe to the associated topic.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2565: MINOR: Use scale of 3 in streams benchmark by defa...

2017-03-15 Thread guozhangwang
Github user guozhangwang closed the pull request at:

https://github.com/apache/kafka/pull/2565


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP 130: Expose states of active tasks to KafkaStreams public API

2017-03-15 Thread Florian Hussonnois
Thanks Guozhang for pointing me to KIP-120.

I've made some modifications to the KIP. I also proposed a new PR (there are
still some tests to write).
https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API

Exposing consumed offsets through JMX is sufficient for debugging purposes.
But I think this could be part of another JIRA, as there is no impact on the
public API.
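
For what it's worth, piping polled metric values into a dedicated topic (as
Guozhang suggests below) could look roughly like the sketch under this
paragraph. The class, config, and topic names are assumptions on my side, and
this is not the krux reporter nor anything shipped with Kafka:

// Sketch only: a MetricsReporter that polls registered metrics on a schedule
// and publishes their values to an assumed "streams-metrics" topic.
import java.util
import java.util.Properties
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}

class TopicMetricsReporter extends MetricsReporter {
  private val registry = new ConcurrentHashMap[MetricName, KafkaMetric]()
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  private var producer: KafkaProducer[String, String] = _
  private val topic = "streams-metrics" // assumed topic name

  override def configure(configs: util.Map[String, _]): Unit = {
    // assumes bootstrap.servers is present in the client configs
    val props = new Properties()
    props.put("bootstrap.servers", String.valueOf(configs.get("bootstrap.servers")))
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producer = new KafkaProducer[String, String](props)
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = publishAll()
    }, 30, 30, TimeUnit.SECONDS)
  }

  // init/metricChange/metricRemoval only track which metrics exist; values are polled above
  override def init(initial: util.List[KafkaMetric]): Unit =
    initial.asScala.foreach(m => registry.put(m.metricName(), m))
  override def metricChange(metric: KafkaMetric): Unit = registry.put(metric.metricName(), metric)
  override def metricRemoval(metric: KafkaMetric): Unit = registry.remove(metric.metricName())

  private def publishAll(): Unit =
    registry.asScala.foreach { case (name, metric) =>
      val key = s"${name.group()}.${name.name()}${name.tags()}"
      producer.send(new ProducerRecord(topic, key, String.valueOf(metric.value())))
    }

  override def close(): Unit = {
    scheduler.shutdown()
    if (producer != null) producer.close()
  }
}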

Thanks

2017-03-10 22:35 GMT+01:00 Guozhang Wang :

> Hello Florian,
>
> As for programmatically discover monitoring data by piping metrics into a
> dedicated topic. I think you can actually use a KafkaMetricsReporter which
> pipes the polled metric values into a pre-defined topic (note that in Kafka
> the MetricsReporter is simply an interface and users can build their own
> impl in addition to the JMXReporter), for example :
>
> https://github.com/krux/kafka-metrics-reporter
>
> As for the "static task-level assignment", what I meant is that the mapping
> from source-topic-partitions -> tasks are static, via the
> "PartitionGrouper", and a task won't switch from an active task to a
> standby task, it is actually that an active task could be migrated, as a
> whole along with all its assigned partitions, to another thread / process
> and a new standby task will be created on the host that this active task is
> migrating from. So for the SAME task, its taskMetadata.
> assignedPartitions()
> will always return you the same partitions.
>
> As for the `toString` function that what we have today, I feel it has some
> correlations with KIP-120 so I'm trying to coordinate some discussions here
> (cc'ing Matthias as the owner of KIP-120). My understand is that:
>
> 1. In KIP-120, the `toString` function of `KafkaStreams` will be removed
> and instead the `Topology#describe` function will be introduced for users
> to debug the topology BEFORE start running their instance with the
> topology. And hence the description won't contain any task information as
> they are not formed yet.
> 2. In KIP-130, we want to add the task-level information for monitoring
> purposes, which is not static and can only be captured AFTER the instance
> has started running. Again I'm wondering for KIP-130 alone if adding those
> metrics mentioned in my previous email would suffice even for the use case
> that you have mentioned.
>
>
> Guozhang
>
> On Wed, Mar 8, 2017 at 3:18 PM, Florian Hussonnois 
> wrote:
>
> > Hi Guozhang
> >
> > Thank you for your feedback. I've started to look more deeply into the
> > code. As you mention, it would be more clever to use the current
> > StreamMetadata API to expose these information.
> >
> > I think exposing metrics through JMX is great for building monitoring
> > dashboards using some tools like jmxtrans and grafana.
> > But for our use case we would like to expose the states directely from
> the
> > application embedding the kstreams topologies.
> > So we expect to be able to retrieve states in a programmatic way.
> >
> > For instance, we could imagin to produce those states into a dedicated
> > topic. In that way a third application could automatically discover all
> > kafka-streams applications which could be monitored.
> > In production environment, that can be clearly a solution to have a
> > complete overview of a microservices architecture based on Kafka Streams.
> >
> > The toString() method give a lots of information it can only be used for
> > debugging purpose but not to build a topologies visualization tool. We
> > could actually expose same details about the stream topology from the
> > StreamMetadata API ? So the TaskMetadata class you have suggested could
> > contains similar information that ones return by the toString method from
> > AbstractTask class ?
> >
> > I can update the KIP in that way.
> >
> > Finally,  I'm not sure to understand your last point :* "Note that the
> > task-level assignment information is static, i.e. it will not change
> during
> > the runtime" *
> >
> > Does that mean when a rebalance occurs new tasks are created for the new
> > assignments and old ones just switch to a standby state ?
> >
> > Thanks,
> >
> > 2017-03-05 7:04 GMT+01:00 Guozhang Wang :
> >
> > > Hello Florian,
> > >
> > > Thanks for the KIP and your detailed explanation of your use case. I
> > think
> > > there are two dimensions to discuss on how to improve Streams'
> > > debuggability (or more specifically state exposure for visualization).
> > >
> > > First question is "what information should we expose to the user". From
> > > your KIP I saw generally three categories:
> > >
> > > 1. The state of the thread within a process, as you mentioned currently
> > we
> > > only expose the state of the process but not the finer grained
> per-thread
> > > state.
> > > 2. The state of the task. Currently the most close API to this is
> > > StreamsMetadata,
> > > however it aggregates the tasks across all threads 

[jira] [Commented] (KAFKA-4885) processstreamwithcachedstatestore and other streams benchmarks fail occasionally

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927080#comment-15927080
 ] 

ASF GitHub Bot commented on KAFKA-4885:
---

GitHub user guozhangwang opened a pull request:

https://github.com/apache/kafka/pull/2693

KAFKA-4885: Add client.close as exception handler in streams system tests



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka 
K4885-system-test-unexpected-exception-handler

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2693.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2693


commit 756b26836d40a8836ed2e04f5860ac6e90e4c9e7
Author: Guozhang Wang 
Date:   2017-03-15T22:09:01Z

add close exception handler




> processstreamwithcachedstatestore and other streams benchmarks fail 
> occasionally
> -
>
> Key: KAFKA-4885
> URL: https://issues.apache.org/jira/browse/KAFKA-4885
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
> Fix For: 0.11.0.0
>
>
> test_id:
> kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=processstreamwithcachedstatestore.scale=2
> status: FAIL
> run time:   14 minutes 58.069 seconds
> Streams Test process on ubuntu@worker5 took too long to exit
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
> data = self.run_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
> return self.test_context.function(self.test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py",
>  line 86, in test_simple_benchmark
> self.driver[num].wait()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py",
>  line 102, in wait
> self.wait_node(node, timeout_sec)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py",
>  line 106, in wait_node
> wait_until(lambda: not node.account.alive(pid), timeout_sec=timeout_sec, 
> err_msg="Streams Test process on " + str(node.account) + " took too long to 
> exit")
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py",
>  line 36, in wait_until
> raise TimeoutError(err_msg)
> TimeoutError: Streams Test process on ubuntu@worker5 took too long to exit
> The log contains several lines like:
> [2017-03-11 04:52:59,080] DEBUG Attempt to heartbeat failed for group 
> simple-benchmark-streams-with-storetrue since it is rebalancing. 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2017-03-11 04:53:01,987] DEBUG Sending Heartbeat request for group 
> simple-benchmark-streams-with-storetrue to coordinator worker10:9092 (id: 
> 2147483646 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2017-03-11 04:53:02,088] DEBUG Attempt to heartbeat failed for group 
> simple-benchmark-streams-with-storetrue since it is rebalancing. 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2017-03-11 04:53:04,995] DEBUG Sending Heartbeat request for group 
> simple-benchmark-streams-with-storetrue to coordinator worker10:9092 (id: 
> 2147483646 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> Other tests that fail the same way include:
> test_id:
> kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=count.scale=2
> test_id:
> kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=processstreamwithsink.scale=1
> test_id:
> kafkatest.tests.streams.streams_bounce_test.StreamsBounceTest.test_bounce



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2693: KAFKA-4885: Add client.close as exception handler ...

2017-03-15 Thread guozhangwang
GitHub user guozhangwang opened a pull request:

https://github.com/apache/kafka/pull/2693

KAFKA-4885: Add client.close as exception handler in streams system tests



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka 
K4885-system-test-unexpected-exception-handler

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2693.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2693


commit 756b26836d40a8836ed2e04f5860ac6e90e4c9e7
Author: Guozhang Wang 
Date:   2017-03-15T22:09:01Z

add close exception handler




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

2017-03-15 Thread Matthias J. Sax
Just a quick follow up:

Our overall proposal is to implement KIP-129 as is, as a “Stream EoS
1.0” version. The raised concerns are all valid, but hard to quantify at
the moment. Implementing KIP-129, which provides a clean design, allows
us to gain more insight into the performance implications. This enables
us to make an educated decision on whether the “producer per task” model
performs well or not, and whether a switch to a “producer per thread”
model is mandatory.

We also want to point out that we can move incrementally from the "producer
per task" to the "producer per thread" design, or apply some incremental
improvements to "producer per task" (as discussed in the doc). Thus,
there is no issue with regard to upgrading.


-Matthias
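
PS: the PartitionGrouper escape hatch mentioned in the quoted thread below
could look roughly like the following. This is only a sketch with an arbitrary
bucket size, not the DefaultPartitionGrouper:

// Rough sketch (not the DefaultPartitionGrouper): bucket several consecutive
// partition ids into one task, so a "producer per task" design creates fewer
// producers while co-partitioned topics still land in the same task.
import java.util
import scala.collection.JavaConverters._
import org.apache.kafka.common.{Cluster, TopicPartition}
import org.apache.kafka.streams.processor.{PartitionGrouper, TaskId}

class BucketedPartitionGrouper(partitionsPerTask: Int) extends PartitionGrouper {

  override def partitionGroups(topicGroups: util.Map[Integer, util.Set[String]],
                               metadata: Cluster): util.Map[TaskId, util.Set[TopicPartition]] = {
    val groups = new util.HashMap[TaskId, util.Set[TopicPartition]]()
    for ((topicGroupId, topics) <- topicGroups.asScala) {
      for (topic <- topics.asScala;
           info  <- Option(metadata.partitionsForTopic(topic)).map(_.asScala).getOrElse(Seq.empty)) {
        // partition p of every topic in the group maps to task p / partitionsPerTask
        val taskId = new TaskId(topicGroupId.intValue(), info.partition() / partitionsPerTask)
        var assigned = groups.get(taskId)
        if (assigned == null) {
          assigned = new util.HashSet[TopicPartition]()
          groups.put(taskId, assigned)
        }
        assigned.add(new TopicPartition(info.topic(), info.partition()))
      }
    }
    groups
  }
}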



On 3/15/17 2:36 PM, Matthias J. Sax wrote:
> Hi,
> 
> I want to pick up this thread again. As there are some concerns about
> the "producer per task" design, we did write up an alternative "producer
> per thread" design and discuss pros/cons of both approaches:
> 
> https://docs.google.com/document/d/1CfOJaa6mdg5o7pLf_zXISV4oE0ZeMZwT_sG1QWgL4EE
> 
> 
> Looking forward to your feedback.
> 
> 
> -Matthias
> 
> 
> On 3/10/17 3:24 AM, Damian Guy wrote:
>> Hi Matthias,
>>
>> Thanks for the response. I agree with you regarding the use of
>> PartitionGrouper to reduce the number of tasks. It would be good to have an
>> idea of any additional load on the brokers as we increase the number of
>> tasks and therefore producers.
>>
>> Thanks,
>> Damian
>>
>> On Wed, 8 Mar 2017 at 01:45 Matthias J. Sax  wrote:
>>
>>> Damian, Jun,
>>>
>>> Thanks for your input.
>>>
>>>
>>> About Performance test:
>>>
>>> I can follow up with more performance tests using more partitions and
>>> also collecting broker metrics.
>>>
>>> However, I want to highlight again, that even if 1000+ partitions would
>>> be problematic, one can simply implement PartitionGrouper interface and
>>> reduce the number of tasks to 250 or 100... So I am not sure, if we
>>> should block this KIP, even if there might be some performance penalty
>>> for currently single partitioned tasks.
>>>
>>> About memory usage. JXM max-heap and max-off-heap did report 256MB and
>>> 133MB for all experiments (thus I did not put it in the spreadsheet).
>>> Thus, using 100 producers (each using a max of 32MB of memory) was not
>>> an issue with regard to memory consumption. I did not track "current
>>> head/off-heap" memory as this would require a more advance test setup to
>>> monitor it over time. If you think this would be required, we can do
>>> some tests though.
>>>
>>> However, as 256 MB was enough memory, and there are other components
>>> next to the producers using memory, I don't expect a severely increased
>>> memory usage. Producer allocate memory on-demand, and if load is shared
>>> over multiple producers, overall memory usage should stay the same as a
>>> single producer should allocate less memory.
>>>
>>>
>>> About Batching:
>>>
>>> As you can see from the benchmarks (in the detailed view -- I also added
>>> some graphs to the summary now) the average batch size gets slightly
>>> decrease with an increased number of partitions. However, there is no
>>> big difference between "producer per thread" and "producer per task"
>>> scenario.
>>>
>>>
>>> About acks:
>>>
>>> This is covered by KIP-98 already. If idempotent producer is use, it's
>>> required to set max.in.flight.requests.per.connection=1 and retries > 0
>>> -- otherwise a config exception will be thrown. For transactions, it's
>>> further required that acks=-1 to avoid a config exception.
>>>
>>> Other bits, like min.isr, replication.factor, etc. (ie, all broker/topic
>>> configs) are out of scope, and it's user responsibility to set those
>>> values correctly to ensure transactionality and idempotency.
>>>
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 3/7/17 9:32 AM, Jun Rao wrote:
 Hi, Guozhang,

 Thanks for the KIP. A couple of comments.

 1. About the impact on producer batching. My understanding is that
 typically different sub-topologies in the same task are publishing to
 different topics. Since the producer batching happens at the
 topic/partition level, using a producer per task may not impact batching
 much.

 2. When processing.guarantee is set to exactly_once, do we want to
>>> enforce
 acks to all in the producer? The default acks is 1 and may cause acked
>>> data
 to be lost later when the leader changes.

 Thanks,

 Jun

 On Tue, Mar 7, 2017 at 3:42 AM, Damian Guy  wrote:

> Hi Matthias,
>
> Thanks. The perf test is a good start but I don't think it goes far
>>> enough.
> 100 partitions is not a lot. What happens when there are thousands of
> partitions? What is the load on the brokers? How much more memory is
>>> used
> by the Streams App etc?
>
> Thanks,
> Damian
>
> On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax 

Re: [DISCUSS] KIP-126 - Allow KafkaProducer to batch based on uncompressed size

2017-03-15 Thread Becket Qin
I see, then we are thinking about the same thing :)
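
(For illustration, the estimation-adjustment scheme discussed further down this
thread -- bump the estimate on underestimation, decay it slowly on
overestimation -- would look roughly like the sketch below. The names and the
clamping bounds are made up; this is not the actual producer code.)

// Made-up sketch of the adaptive compression ratio estimate discussed in KIP-126:
// increase the estimate by 0.05 whenever a batch compressed worse than estimated
// (underestimation) and decrease it by 0.005 when it compressed better (overestimation).
class CompressionRatioEstimator(underStep: Float = 0.05f, overStep: Float = 0.005f) {
  @volatile private var ratio: Float = 1.0f   // estimated compressed/uncompressed size

  def estimate: Float = ratio

  /** Feed back the observed compression ratio of a completed batch. */
  def observe(observed: Float): Unit = {
    ratio =
      if (observed > ratio) math.min(1.0f, ratio + underStep)   // we underestimated
      else math.max(overStep, ratio - overStep)                 // we overestimated
  }

  /** Estimated on-the-wire size for a batch of uncompressedBytes. */
  def estimatedCompressedSize(uncompressedBytes: Int): Int =
    math.ceil(uncompressedBytes * ratio).toInt
}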

On Wed, Mar 15, 2017 at 2:26 PM, Ismael Juma  wrote:

> I meant finishing what's described in the following section and then
> starting a discussion followed by a vote:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 4+-+Command+line+and+centralized+administrative+operations#KIP-4-
> Commandlineandcentralizedadministrativeoperations-DescribeConfigsRequest
>
> We have only voted on KIP-4 Metadata, KIP-4 Create Topics, KIP-4 Delete
> Topics so far.
>
> Ismael
>
> On Wed, Mar 15, 2017 at 8:58 PM, Becket Qin  wrote:
>
> > Hi Ismael,
> >
> > KIP-4 is also the one that I was thinking about. We have introduced a
> > DescribeConfigRequest there so the producer can easily get the
> > configurations. By "another KIP" do you mean a new (or maybe extended)
> > protocol or using that protocol in clients?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Wed, Mar 15, 2017 at 1:21 PM, Ismael Juma  wrote:
> >
> > > Hi Becket,
> > >
> > > How were you thinking of retrieving the configuration items you
> > mentioned?
> > > I am asking because I was planning to post a KIP for Describe Configs
> > (one
> > > of the protocols in KIP-4), which would expose such information. But
> > maybe
> > > you are thinking of extending Metadata request?
> > >
> > > Ismael
> > >
> > > On Wed, Mar 15, 2017 at 7:33 PM, Becket Qin 
> > wrote:
> > >
> > > > Hi Jason,
> > > >
> > > > Good point. I was thinking about that, too. I was not sure if that is
> > the
> > > > right thing to do by default.
> > > >
> > > > If we assume people always set the batch size to max message size,
> > > > splitting the oversized batch makes a lot of sense. But it seems
> > possible
> > > > that users want to control the memory footprint so they would set the
> > > batch
> > > > size to smaller than the max message size so the producer can have
> hold
> > > > batches for more partitions. In this case, splitting the batch might
> > not
> > > be
> > > > the desired behavior.
> > > >
> > > > I think the most intuitive approach to this is allow the producer to
> > get
> > > > the max message size configuration (as well as some other
> > configurations
> > > > such as timestamp type)  from the broker side and use that to decide
> > > > whether a batch should be split or not. I probably should add this to
> > the
> > > > KIP wiki.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Wed, Mar 15, 2017 at 9:47 AM, Jason Gustafson  >
> > > > wrote:
> > > >
> > > > > Hey Becket,
> > > > >
> > > > > Thanks for the KIP! The approach seems reasonable. One
> clarification:
> > > is
> > > > > the intent to do the splitting after the broker rejects the request
> > > with
> > > > > MESSAGE_TOO_LARGE, or prior to sending if the configured batch size
> > is
> > > > > exceeded?
> > > > >
> > > > > -Jason
> > > > >
> > > > > On Mon, Mar 13, 2017 at 8:10 PM, Becket Qin 
> > > > wrote:
> > > > >
> > > > > > Bump up the thread for further comments. If there is no more
> > comments
> > > > on
> > > > > > the KIP I will start the voting thread on Wed.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > > >
> > > > > > On Tue, Mar 7, 2017 at 9:48 AM, Becket Qin  >
> > > > wrote:
> > > > > >
> > > > > > > Hi Dong,
> > > > > > >
> > > > > > > Thanks for the comments.
> > > > > > >
> > > > > > > The patch is mostly for proof of concept in case there is any
> > > concern
> > > > > > > about the implementation which is indeed a little tricky.
> > > > > > >
> > > > > > > The new metric has already been mentioned in the Public
> Interface
> > > > > Change
> > > > > > > section.
> > > > > > >
> > > > > > > I added the reasoning about how the compression ratio
> > > > > > > improving/deteriorate steps are determined in the wiki.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jiangjie (Becket) Qin
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Mar 6, 2017 at 4:42 PM, Dong Lin 
> > > > wrote:
> > > > > > >
> > > > > > >> Hey Becket,
> > > > > > >>
> > > > > > >> I am wondering if we should first vote for the KIP before
> > > reviewing
> > > > > the
> > > > > > >> patch. I have two comments below:
> > > > > > >>
> > > > > > >> - Should we specify the new sensors as part of interface
> change
> > in
> > > > the
> > > > > > >> KIP?
> > > > > > >> - The KIP proposes to increase estimated compression ratio by
> > 0.05
> > > > for
> > > > > > >> each
> > > > > > >> underestimation and decrement the estimation by 0.005 for each
> > > > > > >> overestimation. Why are these two values chosen? I think there
> > is
> > > > some
> > > > > > >> tradeoff in selecting the value. Can the KIP be more explicit
> > > about
> > > > > the
> > > > > > >> tradeoff and explain how these two values would 

[jira] [Commented] (KAFKA-4907) compacted topic shouldn't reject messages with old timestamp

2017-03-15 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927054#comment-15927054
 ] 

Jun Rao commented on KAFKA-4907:


[~becket_qin], could you confirm if this is a real issue? If so, would you have 
time to fix this? Thanks.

> compacted topic shouldn't reject messages with old timestamp
> 
>
> Key: KAFKA-4907
> URL: https://issues.apache.org/jira/browse/KAFKA-4907
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Jun Rao
>
> In LogValidator.validateTimestamp(), we check the validity of the timestamp 
> in the message without checking whether the topic is compacted or not. This 
> can cause messages to a compacted topic to be rejected when it shouldn't.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4907) compacted topic shouldn't reject messages with old timestamp

2017-03-15 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-4907:
--

 Summary: compacted topic shouldn't reject messages with old 
timestamp
 Key: KAFKA-4907
 URL: https://issues.apache.org/jira/browse/KAFKA-4907
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.1.0
Reporter: Jun Rao


In LogValidator.validateTimestamp(), we check the validity of the timestamp in 
the message without checking whether the topic is compacted or not. This can 
cause messages to a compacted topic to be rejected when it shouldn't.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4893) async topic deletion conflicts with max topic length

2017-03-15 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927038#comment-15927038
 ] 

Ismael Juma commented on KAFKA-4893:


Adding directories introduces more complexity due to fsync semantics. See 
KAFKA-3968.

> async topic deletion conflicts with max topic length
> 
>
> Key: KAFKA-4893
> URL: https://issues.apache.org/jira/browse/KAFKA-4893
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Vahid Hashemian
>Priority: Minor
>
> As per the 
> [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
> topics can be only 249 characters long to line up with typical filesystem 
> limitations:
> {quote}
> Each sharded partition log is placed into its own folder under the Kafka log 
> directory. The name of such folders consists of the topic name, appended by a 
> dash (\-) and the partition id. Since a typical folder name can not be over 
> 255 characters long, there will be a limitation on the length of topic names. 
> We assume the number of partitions will not ever be above 100,000. Therefore, 
> topic names cannot be longer than 249 characters. This leaves just enough 
> room in the folder name for a dash and a potentially 5 digit long partition 
> id.
> {quote}
> {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
> validation.
> This limit ends up not being quite right since topic deletion ends up 
> renaming the directory to the form {{topic-partition.uniqueId-delete}} as can 
> be seen in {{LogManager.asyncDelete}}:
> {code}
> val dirName = new StringBuilder(removedLog.name)
>   .append(".")
>   
> .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
>   .append(Log.DeleteDirSuffix)
>   .toString()
> {code}
> So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following:
> {code}
> kafka.common.KafkaStorageException: Failed to rename log directory from 
> /tmp/kafka-logs0/0-0
>  to 
> /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
>   at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
>   at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
>   at kafka.cluster.Partition.delete(Partition.scala:137)
>   at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259)
>   at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic after this point still exists but has Leader set to -1 and the 
> controller recognizes the topic completion as incomplete (the topic znode is 
> still in /admin/delete_topics).
> I don't believe linkedin has any topic name this long but I'm making the 
> ticket in case anyone runs into this problem.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4893) async topic deletion conflicts with max topic length

2017-03-15 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927033#comment-15927033
 ] 

Vahid Hashemian commented on KAFKA-4893:


[~onurkaraman] What you suggested should work. Of course, an easier solution 
would be to decrease the max topic length (again) to 249 - 1 - 32 - 7 = 219.
{code}
.append(".") // len = 1
.append(java.util.UUID.randomUUID.toString.replaceAll("-","")) // len = 32
.append(Log.DeleteDirSuffix) // len = 7
{code}

> async topic deletion conflicts with max topic length
> 
>
> Key: KAFKA-4893
> URL: https://issues.apache.org/jira/browse/KAFKA-4893
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Vahid Hashemian
>Priority: Minor
>
> As per the 
> [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
> topics can be only 249 characters long to line up with typical filesystem 
> limitations:
> {quote}
> Each sharded partition log is placed into its own folder under the Kafka log 
> directory. The name of such folders consists of the topic name, appended by a 
> dash (\-) and the partition id. Since a typical folder name can not be over 
> 255 characters long, there will be a limitation on the length of topic names. 
> We assume the number of partitions will not ever be above 100,000. Therefore, 
> topic names cannot be longer than 249 characters. This leaves just enough 
> room in the folder name for a dash and a potentially 5 digit long partition 
> id.
> {quote}
> {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
> validation.
> This limit ends up not being quite right since topic deletion ends up 
> renaming the directory to the form {{topic-partition.uniqueId-delete}} as can 
> be seen in {{LogManager.asyncDelete}}:
> {code}
> val dirName = new StringBuilder(removedLog.name)
>   .append(".")
>   
> .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
>   .append(Log.DeleteDirSuffix)
>   .toString()
> {code}
> So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following:
> {code}
> kafka.common.KafkaStorageException: Failed to rename log directory from 
> /tmp/kafka-logs0/0-0
>  to 
> /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
>   at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
>   at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
>   at kafka.cluster.Partition.delete(Partition.scala:137)
>   at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259)
>   at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic after this point still exists but has Leader set to -1 and the 
> controller recognizes the topic completion as incomplete (the topic znode is 
> still in /admin/delete_topics).
> I don't believe linkedin has any topic name this long but I'm making the 
> ticket in case anyone runs into this problem.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

2017-03-15 Thread Matthias J. Sax
Hi,

I want to pick up this thread again. As there are some concerns about
the "producer per task" design, we did write up an alternative "producer
per thread" design and discuss pros/cons of both approaches:

https://docs.google.com/document/d/1CfOJaa6mdg5o7pLf_zXISV4oE0ZeMZwT_sG1QWgL4EE


Looking forward to your feedback.


-Matthias


On 3/10/17 3:24 AM, Damian Guy wrote:
> Hi Matthias,
> 
> Thanks for the response. I agree with you regarding the use of
> PartitionGrouper to reduce the number of tasks. It would be good to have an
> idea of any additional load on the brokers as we increase the number of
> tasks and therefore producers.
> 
> Thanks,
> Damian
> 
> On Wed, 8 Mar 2017 at 01:45 Matthias J. Sax  wrote:
> 
>> Damian, Jun,
>>
>> Thanks for your input.
>>
>>
>> About Performance test:
>>
>> I can follow up with more performance tests using more partitions and
>> also collecting broker metrics.
>>
>> However, I want to highlight again, that even if 1000+ partitions would
>> be problematic, one can simply implement PartitionGrouper interface and
>> reduce the number of tasks to 250 or 100... So I am not sure, if we
>> should block this KIP, even if there might be some performance penalty
>> for currently single partitioned tasks.
>>
>> About memory usage. JXM max-heap and max-off-heap did report 256MB and
>> 133MB for all experiments (thus I did not put it in the spreadsheet).
>> Thus, using 100 producers (each using a max of 32MB of memory) was not
>> an issue with regard to memory consumption. I did not track "current
>> head/off-heap" memory as this would require a more advance test setup to
>> monitor it over time. If you think this would be required, we can do
>> some tests though.
>>
>> However, as 256 MB was enough memory, and there are other components
>> next to the producers using memory, I don't expect a severely increased
>> memory usage. Producer allocate memory on-demand, and if load is shared
>> over multiple producers, overall memory usage should stay the same as a
>> single producer should allocate less memory.
>>
>>
>> About Batching:
>>
>> As you can see from the benchmarks (in the detailed view -- I also added
>> some graphs to the summary now) the average batch size gets slightly
>> decrease with an increased number of partitions. However, there is no
>> big difference between "producer per thread" and "producer per task"
>> scenario.
>>
>>
>> About acks:
>>
>> This is covered by KIP-98 already. If idempotent producer is use, it's
>> required to set max.in.flight.requests.per.connection=1 and retries > 0
>> -- otherwise a config exception will be thrown. For transactions, it's
>> further required that acks=-1 to avoid a config exception.
>>
>> Other bits, like min.isr, replication.factor, etc. (ie, all broker/topic
>> configs) are out of scope, and it's user responsibility to set those
>> values correctly to ensure transactionality and idempotency.
>>
>>
>>
>> -Matthias
>>
>>
>> On 3/7/17 9:32 AM, Jun Rao wrote:
>>> Hi, Guozhang,
>>>
>>> Thanks for the KIP. A couple of comments.
>>>
>>> 1. About the impact on producer batching. My understanding is that
>>> typically different sub-topologies in the same task are publishing to
>>> different topics. Since the producer batching happens at the
>>> topic/partition level, using a producer per task may not impact batching
>>> much.
>>>
>>> 2. When processing.guarantee is set to exactly_once, do we want to
>> enforce
>>> acks to all in the producer? The default acks is 1 and may cause acked
>> data
>>> to be lost later when the leader changes.
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Tue, Mar 7, 2017 at 3:42 AM, Damian Guy  wrote:
>>>
 Hi Matthias,

 Thanks. The perf test is a good start but I don't think it goes far
>> enough.
 100 partitions is not a lot. What happens when there are thousands of
 partitions? What is the load on the brokers? How much more memory is
>> used
 by the Streams App etc?

 Thanks,
 Damian

 On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax 
>> wrote:

> Hi,
>
> I want to give a first respond:
>
>
>
> 1. Producer per task:
>
> First, we did some performance tests, indicating that the performance
> penalty is small. Please have a look here:
>
> https://docs.google.com/spreadsheets/d/18aGOB13-
 ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
>
> For the test, we ran with a trunk version and a modified version that
> uses a producer per task (of course, no transactions, but at-least-once
> semantics). The scaling factor indicates the number of brokers and
> (single threaded) Streams instances. We used SimpleBenchmark that is
> part of AK code base.
>
>
> Second, as the design is "producer per task" (and not "producer per
> partition") it is possible to specify a custom PartitionGrouper that
> assigns 

[jira] [Commented] (KAFKA-4906) Support 0.9 brokers with a newer Producer or Consumer version

2017-03-15 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927023#comment-15927023
 ] 

Ismael Juma commented on KAFKA-4906:


You also have to handle the fact that 0.9 brokers only support an older message 
format version, which is something that the producer can't handle. If we do 
this, and I am not sure if it's worth it, it will have to be in 0.11.0.0. 
Changing the producer to support an older message format version is too big a 
change for a bug fix release (I am mentioning this because the current fix 
version is 0.10.2.1).

> Support 0.9 brokers with a newer Producer or Consumer version
> -
>
> Key: KAFKA-4906
> URL: https://issues.apache.org/jira/browse/KAFKA-4906
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.10.2.0
>Reporter: Grant Henke
>Assignee: Grant Henke
> Fix For: 0.10.2.1
>
>
> KAFKA-4507 added the ability for newer Kafka clients to talk to older Kafka 
> brokers if a new feature supported by a newer wire protocol was not 
> used/required. 
> We currently support brokers as old as 0.10.0.0 because thats when the 
> ApiVersionsRequest/Response was added to the broker (KAFKA-3307).
> However, there are relatively few changes between 0.9.0.0 and 0.10.0.0 on the 
> wire, making it possible to support another major broker version set by 
> assuming that any disconnect resulting from an ApiVersionsRequest is from a 
> 0.9 broker and defaulting to legacy protocol versions. 
> Supporting 0.9 with newer clients can drastically simplify upgrades, allow 
> for libraries and frameworks to easily support a wider set of environments, 
> and let developers take advantage of client side improvements without 
> requiring cluster upgrades first. 
> Below is a list of the wire protocol versions by release for reference: 
> {noformat}
> 0.10.x
>   Produce(0): 0 to 2
>   Fetch(1): 0 to 2 
>   Offsets(2): 0
>   Metadata(3): 0 to 1
>   OffsetCommit(8): 0 to 2
>   OffsetFetch(9): 0 to 1
>   GroupCoordinator(10): 0
>   JoinGroup(11): 0
>   Heartbeat(12): 0
>   LeaveGroup(13): 0
>   SyncGroup(14): 0
>   DescribeGroups(15): 0
>   ListGroups(16): 0
>   SaslHandshake(17): 0
>   ApiVersions(18): 0
> 0.9.x:
>   Produce(0): 0 to 1 (no response timestamp from v2)
>   Fetch(1): 0 to 1 (no response timestamp from v2)
>   Offsets(2): 0
>   Metadata(3): 0 (no cluster id or rack info from v1)
>   OffsetCommit(8): 0 to 2
>   OffsetFetch(9): 0 to 1
>   GroupCoordinator(10): 0
>   JoinGroup(11): 0
>   Heartbeat(12): 0
>   LeaveGroup(13): 0
>   SyncGroup(14): 0
>   DescribeGroups(15): 0
>   ListGroups(16): 0
>   SaslHandshake(17): UNSUPPORTED
>   ApiVersions(18): UNSUPPORTED
> 0.8.2.x:
>   Produce(0): 0 (no quotas from v1)
>   Fetch(1): 0 (no quotas from v1)
>   Offsets(2): 0
>   Metadata(3): 0
>   OffsetCommit(8): 0 to 1 (no global retention time from v2)
>   OffsetFetch(9): 0 to 1
>   GroupCoordinator(10): 0
>   JoinGroup(11): UNSUPPORTED
>   Heartbeat(12): UNSUPPORTED
>   LeaveGroup(13): UNSUPPORTED
>   SyncGroup(14): UNSUPPORTED
>   DescribeGroups(15): UNSUPPORTED
>   ListGroups(16): UNSUPPORTED
>   SaslHandshake(17): UNSUPPORTED
>   ApiVersions(18): UNSUPPORTED
> {noformat}
> Note: Due to KAFKA-3088 it may take up to request.timeout.time to fail an 
> ApiVersionsRequest and failover to legacy protocol versions unless we handle 
> that scenario specifically in this patch. The workaround would be to reduce 
> request.timeout.time if needed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4906) Support 0.9 brokers with a newer Producer or Consumer version

2017-03-15 Thread Grant Henke (JIRA)
Grant Henke created KAFKA-4906:
--

 Summary: Support 0.9 brokers with a newer Producer or Consumer 
version
 Key: KAFKA-4906
 URL: https://issues.apache.org/jira/browse/KAFKA-4906
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 0.10.2.0
Reporter: Grant Henke
Assignee: Grant Henke
 Fix For: 0.10.2.1


KAFKA-4507 added the ability for newer Kafka clients to talk to older Kafka 
brokers if a new feature supported by a newer wire protocol was not 
used/required. 

We currently support brokers as old as 0.10.0.0 because that's when the 
ApiVersionsRequest/Response was added to the broker (KAFKA-3307).

However, there are relatively few changes between 0.9.0.0 and 0.10.0.0 on the 
wire, making it possible to support another major broker version set by 
assuming that any disconnect resulting from an ApiVersionsRequest is from a 0.9 
broker and defaulting to legacy protocol versions. 

Supporting 0.9 with newer clients can drastically simplify upgrades, allow for 
libraries and frameworks to easily support a wider set of environments, and let 
developers take advantage of client side improvements without requiring cluster 
upgrades first. 

Below is a list of the wire protocol versions by release for reference: 
{noformat}
0.10.x
Produce(0): 0 to 2
Fetch(1): 0 to 2 
Offsets(2): 0
Metadata(3): 0 to 1
OffsetCommit(8): 0 to 2
OffsetFetch(9): 0 to 1
GroupCoordinator(10): 0
JoinGroup(11): 0
Heartbeat(12): 0
LeaveGroup(13): 0
SyncGroup(14): 0
DescribeGroups(15): 0
ListGroups(16): 0
SaslHandshake(17): 0
ApiVersions(18): 0

0.9.x:
Produce(0): 0 to 1 (no response timestamp from v2)
Fetch(1): 0 to 1 (no response timestamp from v2)
Offsets(2): 0
Metadata(3): 0 (no cluster id or rack info from v1)
OffsetCommit(8): 0 to 2
OffsetFetch(9): 0 to 1
GroupCoordinator(10): 0
JoinGroup(11): 0
Heartbeat(12): 0
LeaveGroup(13): 0
SyncGroup(14): 0
DescribeGroups(15): 0
ListGroups(16): 0
SaslHandshake(17): UNSUPPORTED
ApiVersions(18): UNSUPPORTED

0.8.2.x:
Produce(0): 0 (no quotas from v1)
Fetch(1): 0 (no quotas from v1)
Offsets(2): 0
Metadata(3): 0
OffsetCommit(8): 0 to 1 (no global retention time from v2)
OffsetFetch(9): 0 to 1
GroupCoordinator(10): 0
JoinGroup(11): UNSUPPORTED
Heartbeat(12): UNSUPPORTED
LeaveGroup(13): UNSUPPORTED
SyncGroup(14): UNSUPPORTED
DescribeGroups(15): UNSUPPORTED
ListGroups(16): UNSUPPORTED
SaslHandshake(17): UNSUPPORTED
ApiVersions(18): UNSUPPORTED
{noformat}

Note: Due to KAFKA-3088 it may take up to request.timeout.time to fail an 
ApiVersionsRequest and failover to legacy protocol versions unless we handle 
that scenario specifically in this patch. The workaround would be to reduce 
request.timeout.time if needed.
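
To make the proposed fallback concrete, a rough sketch (the probe helper is hypothetical, 
not an existing client method):
{code}
// Hypothetical sketch of the fallback described above: probe with ApiVersionsRequest
// and, on a disconnect, assume a 0.9-era broker and pin the api version to the 0.9 maximum.
short produceVersionFor(Node broker) {
    try {
        return maxSupportedVersion(broker, ApiKeys.PRODUCE); // hypothetical probe via ApiVersionsRequest
    } catch (DisconnectException probablyAPre0100Broker) {
        return 1; // 0.9.x brokers top out at Produce v1 (see the table above)
    }
}
{code}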



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-126 - Allow KafkaProducer to batch based on uncompressed size

2017-03-15 Thread Ismael Juma
I meant finishing what's described in the following section and then
starting a discussion followed by a vote:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-DescribeConfigsRequest

We have only voted on KIP-4 Metadata, KIP-4 Create Topics, KIP-4 Delete
Topics so far.

Ismael

On Wed, Mar 15, 2017 at 8:58 PM, Becket Qin  wrote:

> Hi Ismael,
>
> KIP-4 is also the one that I was thinking about. We have introduced a
> DescribeConfigRequest there so the producer can easily get the
> configurations. By "another KIP" do you mean a new (or maybe extended)
> protocol or using that protocol in clients?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Mar 15, 2017 at 1:21 PM, Ismael Juma  wrote:
>
> > Hi Becket,
> >
> > How were you thinking of retrieving the configuration items you
> mentioned?
> > I am asking because I was planning to post a KIP for Describe Configs
> (one
> > of the protocols in KIP-4), which would expose such information. But
> maybe
> > you are thinking of extending Metadata request?
> >
> > Ismael
> >
> > On Wed, Mar 15, 2017 at 7:33 PM, Becket Qin 
> wrote:
> >
> > > Hi Jason,
> > >
> > > Good point. I was thinking about that, too. I was not sure if that is
> the
> > > right thing to do by default.
> > >
> > > If we assume people always set the batch size to max message size,
> > > splitting the oversized batch makes a lot of sense. But it seems
> possible
> > > that users want to control the memory footprint so they would set the
> > batch
> > > size to smaller than the max message size so the producer can have hold
> > > batches for more partitions. In this case, splitting the batch might
> not
> > be
> > > the desired behavior.
> > >
> > > I think the most intuitive approach to this is allow the producer to
> get
> > > the max message size configuration (as well as some other
> configurations
> > > such as timestamp type)  from the broker side and use that to decide
> > > whether a batch should be split or not. I probably should add this to
> the
> > > KIP wiki.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Wed, Mar 15, 2017 at 9:47 AM, Jason Gustafson 
> > > wrote:
> > >
> > > > Hey Becket,
> > > >
> > > > Thanks for the KIP! The approach seems reasonable. One clarification:
> > is
> > > > the intent to do the splitting after the broker rejects the request
> > with
> > > > MESSAGE_TOO_LARGE, or prior to sending if the configured batch size
> is
> > > > exceeded?
> > > >
> > > > -Jason
> > > >
> > > > On Mon, Mar 13, 2017 at 8:10 PM, Becket Qin 
> > > wrote:
> > > >
> > > > > Bump up the thread for further comments. If there is no more
> comments
> > > on
> > > > > the KIP I will start the voting thread on Wed.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > > On Tue, Mar 7, 2017 at 9:48 AM, Becket Qin 
> > > wrote:
> > > > >
> > > > > > Hi Dong,
> > > > > >
> > > > > > Thanks for the comments.
> > > > > >
> > > > > > The patch is mostly for proof of concept in case there is any
> > concern
> > > > > > about the implementation which is indeed a little tricky.
> > > > > >
> > > > > > The new metric has already been mentioned in the Public Interface
> > > > Change
> > > > > > section.
> > > > > >
> > > > > > I added the reasoning about how the compression ratio
> > > > > > improving/deteriorate steps are determined in the wiki.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > > >
> > > > > >
> > > > > > On Mon, Mar 6, 2017 at 4:42 PM, Dong Lin 
> > > wrote:
> > > > > >
> > > > > >> Hey Becket,
> > > > > >>
> > > > > >> I am wondering if we should first vote for the KIP before
> > reviewing
> > > > the
> > > > > >> patch. I have two comments below:
> > > > > >>
> > > > > >> - Should we specify the new sensors as part of interface change
> in
> > > the
> > > > > >> KIP?
> > > > > >> - The KIP proposes to increase estimated compression ratio by
> 0.05
> > > for
> > > > > >> each
> > > > > >> underestimation and decrement the estimation by 0.005 for each
> > > > > >> overestimation. Why are these two values chosen? I think there
> is
> > > some
> > > > > >> tradeoff in selecting the value. Can the KIP be more explicit
> > about
> > > > the
> > > > > >> tradeoff and explain how these two values would impact
> producer's
> > > > > >> performance?
> > > > > >>
> > > > > >> Thanks,
> > > > > >> Dong
> > > > > >>
> > > > > >>
> > > > > >> On Sat, Mar 4, 2017 at 11:42 AM, Becket Qin <
> becket@gmail.com
> > >
> > > > > wrote:
> > > > > >>
> > > > > >> > I have updated the KIP based on the latest discussion. Please
> > > check
> > > > > and
> > > > > >> let
> > > > > >> > me know if there is any further concern.
> > > > > >> >
> > 

[jira] [Commented] (KAFKA-4905) StreamPartitionAssignor doesn't respect subscriptions to assign partitions.

2017-03-15 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927000#comment-15927000
 ] 

Matthias J. Sax commented on KAFKA-4905:


I am not sure I can follow: what I don't understand is how you could end up 
with two (Streams) consumers that subscribe to different topics. You specify 
your topology via {{TopologyBuilder}} or {{KStreamBuilder}}, and this ensures 
that all consumers used by the Streams app subscribe to the same topics. Or do 
you try to use {{StreamPartitionAssignor}} "outside" of Streams (i.e., for 
consumers you manually instantiate)? In that case, it is a "user error", as 
{{StreamPartitionAssignor}} was never designed to be used "outside" of Streams 
(note, it belongs to the {{internals}} package and thus is not part of the public API).

> StreamPartitionAssignor doesn't respect subscriptions to assign partitions.
> ---
>
> Key: KAFKA-4905
> URL: https://issues.apache.org/jira/browse/KAFKA-4905
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>
> Both RangeAssignor and RoundRobinAssignor use the subscriptions to assign 
> partitions to each consumer. This allows two consumers belonging to the same 
> group to subscribe to two different topics.
> This doesn't seem to be the case for the StreamPartitionAssignor, resulting in 
> an IllegalArgumentException being thrown during rebalance. 
> java.lang.IllegalArgumentException: Assigned partition foo-2 for 
> non-subscribed topic regex pattern; subscription pattern is bar
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:190)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:216)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> This is because the consumer group leader attempts to assign partitions to a 
> consumer that didn't subscribe to the associated topic.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4905) StreamPartitionAssignor doesn't respect subscriptions to assign partitions.

2017-03-15 Thread Florian Hussonnois (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Florian Hussonnois updated KAFKA-4905:
--
Summary: StreamPartitionAssignor doesn't respect subscriptions to assign 
partitions.  (was: StreamPartitionAssignor doesn't respect subscriptions to 
asisgn partitions.)

> StreamPartitionAssignor doesn't respect subscriptions to assign partitions.
> ---
>
> Key: KAFKA-4905
> URL: https://issues.apache.org/jira/browse/KAFKA-4905
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>
> Both RangeAssignor and RoundRobinAssignor use the subscriptions to assign 
> partitions to each consumer. This allows two consumers belonging to the same 
> group to subscribe to two different topics.
> This doesn't seem to be the case for the StreamPartitionAssignor, resulting in 
> an IllegalArgumentException being thrown during rebalance. 
> java.lang.IllegalArgumentException: Assigned partition foo-2 for 
> non-subscribed topic regex pattern; subscription pattern is bar
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:190)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:216)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> This is because the consumer group leader attempts to assign partitions to a 
> consumer that didn't subscribe to the associated topic.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4905) StreamPartitionAssignor doesn't respect subscriptions to asisgn partitions.

2017-03-15 Thread Florian Hussonnois (JIRA)
Florian Hussonnois created KAFKA-4905:
-

 Summary: StreamPartitionAssignor doesn't respect subscriptions to 
asisgn partitions.
 Key: KAFKA-4905
 URL: https://issues.apache.org/jira/browse/KAFKA-4905
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Florian Hussonnois


Both RangeAssignor and RoundRobinAssignor use the subscriptions to assign 
partitions to each consumer. This allows two consumers belonging to the same 
group to subscribe to two different topics.

This doesn't seem to be the case for the StreamPartitionAssignor, resulting in an 
IllegalArgumentException being thrown during rebalance. 

java.lang.IllegalArgumentException: Assigned partition foo-2 for non-subscribed 
topic regex pattern; subscription pattern is bar
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:190)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:216)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)

This is because the consumer group leader attempts to assign partitions to a 
consumer that didn't subscribe to the associated topic.
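
For comparison, a condensed sketch of the subscription-aware behaviour of the built-in 
assignors (plain Java with java.util and org.apache.kafka.common.TopicPartition, not the 
actual assignor code): a partition of topic T is only handed to members whose 
subscription contains T.
{code}
Map<String, List<TopicPartition>> assign(final Map<String, Integer> partitionsPerTopic,
                                         final Map<String, Set<String>> subscriptions) {
    final Map<String, List<TopicPartition>> assignment = new HashMap<>();
    for (final String member : subscriptions.keySet()) {
        assignment.put(member, new ArrayList<>());
    }
    for (final Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
        // only members that actually subscribed to this topic are candidates
        final List<String> subscribers = new ArrayList<>();
        for (final Map.Entry<String, Set<String>> subscription : subscriptions.entrySet()) {
            if (subscription.getValue().contains(topic.getKey())) {
                subscribers.add(subscription.getKey());
            }
        }
        Collections.sort(subscribers);
        for (int p = 0; p < topic.getValue() && !subscribers.isEmpty(); p++) {
            assignment.get(subscribers.get(p % subscribers.size()))
                      .add(new TopicPartition(topic.getKey(), p));
        }
    }
    return assignment;
}
{code}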



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-126 - Allow KafkaProducer to batch based on uncompressed size

2017-03-15 Thread Becket Qin
Hi Ismael,

KIP-4 is also the one that I was thinking about. We have introduced a
DescribeConfigRequest there so the producer can easily get the
configurations. By "another KIP" do you mean a new (or maybe extended)
protocol or using that protocol in clients?

Thanks,

Jiangjie (Becket) Qin

On Wed, Mar 15, 2017 at 1:21 PM, Ismael Juma  wrote:

> Hi Becket,
>
> How were you thinking of retrieving the configuration items you mentioned?
> I am asking because I was planning to post a KIP for Describe Configs (one
> of the protocols in KIP-4), which would expose such information. But maybe
> you are thinking of extending Metadata request?
>
> Ismael
>
> On Wed, Mar 15, 2017 at 7:33 PM, Becket Qin  wrote:
>
> > Hi Jason,
> >
> > Good point. I was thinking about that, too. I was not sure if that is the
> > right thing to do by default.
> >
> > If we assume people always set the batch size to max message size,
> > splitting the oversized batch makes a lot of sense. But it seems possible
> > that users want to control the memory footprint so they would set the
> batch
> > size to smaller than the max message size so the producer can have hold
> > batches for more partitions. In this case, splitting the batch might not
> be
> > the desired behavior.
> >
> > I think the most intuitive approach to this is allow the producer to get
> > the max message size configuration (as well as some other configurations
> > such as timestamp type)  from the broker side and use that to decide
> > whether a batch should be split or not. I probably should add this to the
> > KIP wiki.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Wed, Mar 15, 2017 at 9:47 AM, Jason Gustafson 
> > wrote:
> >
> > > Hey Becket,
> > >
> > > Thanks for the KIP! The approach seems reasonable. One clarification:
> is
> > > the intent to do the splitting after the broker rejects the request
> with
> > > MESSAGE_TOO_LARGE, or prior to sending if the configured batch size is
> > > exceeded?
> > >
> > > -Jason
> > >
> > > On Mon, Mar 13, 2017 at 8:10 PM, Becket Qin 
> > wrote:
> > >
> > > > Bump up the thread for further comments. If there is no more comments
> > on
> > > > the KIP I will start the voting thread on Wed.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Tue, Mar 7, 2017 at 9:48 AM, Becket Qin 
> > wrote:
> > > >
> > > > > Hi Dong,
> > > > >
> > > > > Thanks for the comments.
> > > > >
> > > > > The patch is mostly for proof of concept in case there is any
> concern
> > > > > about the implementation which is indeed a little tricky.
> > > > >
> > > > > The new metric has already been mentioned in the Public Interface
> > > Change
> > > > > section.
> > > > >
> > > > > I added the reasoning about how the compression ratio
> > > > > improving/deteriorate steps are determined in the wiki.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > >
> > > > > On Mon, Mar 6, 2017 at 4:42 PM, Dong Lin 
> > wrote:
> > > > >
> > > > >> Hey Becket,
> > > > >>
> > > > >> I am wondering if we should first vote for the KIP before
> reviewing
> > > the
> > > > >> patch. I have two comments below:
> > > > >>
> > > > >> - Should we specify the new sensors as part of interface change in
> > the
> > > > >> KIP?
> > > > >> - The KIP proposes to increase estimated compression ratio by 0.05
> > for
> > > > >> each
> > > > >> underestimation and decrement the estimation by 0.005 for each
> > > > >> overestimation. Why are these two values chosen? I think there is
> > some
> > > > >> tradeoff in selecting the value. Can the KIP be more explicit
> about
> > > the
> > > > >> tradeoff and explain how these two values would impact producer's
> > > > >> performance?
> > > > >>
> > > > >> Thanks,
> > > > >> Dong
> > > > >>
> > > > >>
> > > > >> On Sat, Mar 4, 2017 at 11:42 AM, Becket Qin  >
> > > > wrote:
> > > > >>
> > > > >> > I have updated the KIP based on the latest discussion. Please
> > check
> > > > and
> > > > >> let
> > > > >> > me know if there is any further concern.
> > > > >> >
> > > > >> > Thanks,
> > > > >> >
> > > > >> > Jiangjie (Becket) Qin
> > > > >> >
> > > > >> > On Sat, Mar 4, 2017 at 10:56 AM, Becket Qin <
> becket@gmail.com
> > >
> > > > >> wrote:
> > > > >> >
> > > > >> > > Actually second thought on this, rate might be better for two
> > > > reasons:
> > > > >> > > 1. Most of the metrics in the producer we already have are
> using
> > > > rate
> > > > >> > > instead of count.
> > > > >> > > 2. If a service is bounced, the count will be reset to 0, but
> it
> > > > does
> > > > >> not
> > > > >> > > affect rate.
> > > > >> > >
> > > > >> > > I'll make the change.
> > > > >> > >
> > > > >> > > Thanks,
> > > > >> > >
> > > > >> > > Jiangjie (Becket) Qin
> > > > >> > >
> > > > >> > > On Sat, Mar 4, 2017 at 10:27 AM, Becket Qin <
> 

Re: [DISCUSS] KIP-126 - Allow KafkaProducer to batch based on uncompressed size

2017-03-15 Thread Ismael Juma
Hi Becket,

How were you thinking of retrieving the configuration items you mentioned?
I am asking because I was planning to post a KIP for Describe Configs (one
of the protocols in KIP-4), which would expose such information. But maybe
you are thinking of extending Metadata request?

Ismael

On Wed, Mar 15, 2017 at 7:33 PM, Becket Qin  wrote:

> Hi Jason,
>
> Good point. I was thinking about that, too. I was not sure if that is the
> right thing to do by default.
>
> If we assume people always set the batch size to max message size,
> splitting the oversized batch makes a lot of sense. But it seems possible
> that users want to control the memory footprint so they would set the batch
> size to smaller than the max message size so the producer can have hold
> batches for more partitions. In this case, splitting the batch might not be
> the desired behavior.
>
> I think the most intuitive approach to this is allow the producer to get
> the max message size configuration (as well as some other configurations
> such as timestamp type)  from the broker side and use that to decide
> whether a batch should be split or not. I probably should add this to the
> KIP wiki.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Mar 15, 2017 at 9:47 AM, Jason Gustafson 
> wrote:
>
> > Hey Becket,
> >
> > Thanks for the KIP! The approach seems reasonable. One clarification: is
> > the intent to do the splitting after the broker rejects the request with
> > MESSAGE_TOO_LARGE, or prior to sending if the configured batch size is
> > exceeded?
> >
> > -Jason
> >
> > On Mon, Mar 13, 2017 at 8:10 PM, Becket Qin 
> wrote:
> >
> > > Bump up the thread for further comments. If there is no more comments
> on
> > > the KIP I will start the voting thread on Wed.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Tue, Mar 7, 2017 at 9:48 AM, Becket Qin 
> wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Thanks for the comments.
> > > >
> > > > The patch is mostly for proof of concept in case there is any concern
> > > > about the implementation which is indeed a little tricky.
> > > >
> > > > The new metric has already been mentioned in the Public Interface
> > Change
> > > > section.
> > > >
> > > > I added the reasoning about how the compression ratio
> > > > improving/deteriorate steps are determined in the wiki.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > >
> > > > On Mon, Mar 6, 2017 at 4:42 PM, Dong Lin 
> wrote:
> > > >
> > > >> Hey Becket,
> > > >>
> > > >> I am wondering if we should first vote for the KIP before reviewing
> > the
> > > >> patch. I have two comments below:
> > > >>
> > > >> - Should we specify the new sensors as part of interface change in
> the
> > > >> KIP?
> > > >> - The KIP proposes to increase estimated compression ratio by 0.05
> for
> > > >> each
> > > >> underestimation and decrement the estimation by 0.005 for each
> > > >> overestimation. Why are these two values chosen? I think there is
> some
> > > >> tradeoff in selecting the value. Can the KIP be more explicit about
> > the
> > > >> tradeoff and explain how these two values would impact producer's
> > > >> performance?
> > > >>
> > > >> Thanks,
> > > >> Dong
> > > >>
> > > >>
> > > >> On Sat, Mar 4, 2017 at 11:42 AM, Becket Qin 
> > > wrote:
> > > >>
> > > >> > I have updated the KIP based on the latest discussion. Please
> check
> > > and
> > > >> let
> > > >> > me know if there is any further concern.
> > > >> >
> > > >> > Thanks,
> > > >> >
> > > >> > Jiangjie (Becket) Qin
> > > >> >
> > > >> > On Sat, Mar 4, 2017 at 10:56 AM, Becket Qin  >
> > > >> wrote:
> > > >> >
> > > >> > > Actually second thought on this, rate might be better for two
> > > reasons:
> > > >> > > 1. Most of the metrics in the producer we already have are using
> > > rate
> > > >> > > instead of count.
> > > >> > > 2. If a service is bounced, the count will be reset to 0, but it
> > > does
> > > >> not
> > > >> > > affect rate.
> > > >> > >
> > > >> > > I'll make the change.
> > > >> > >
> > > >> > > Thanks,
> > > >> > >
> > > >> > > Jiangjie (Becket) Qin
> > > >> > >
> > > >> > > On Sat, Mar 4, 2017 at 10:27 AM, Becket Qin <
> becket@gmail.com
> > >
> > > >> > wrote:
> > > >> > >
> > > >> > >> Hi Dong,
> > > >> > >>
> > > >> > >> Yes, there is a sensor in the patch about the split occurrence.
> > > >> > >>
> > > >> > >> Currently it is a count instead of rate. In practice, it seems
> > > count
> > > >> is
> > > >> > >> easier to use in this case. But I am open to change.
> > > >> > >>
> > > >> > >> Thanks,
> > > >> > >>
> > > >> > >> Jiangjie (Becket) Qin
> > > >> > >>
> > > >> > >> On Fri, Mar 3, 2017 at 7:43 PM, Dong Lin 
> > > >> wrote:
> > > >> > >>
> > > >> > >>> Hey Becket,
> > > >> > >>>
> > > >> > >>> I haven't looked at the patch yet. But since 

[jira] [Created] (KAFKA-4904) Performance of RocksDb with state record cache

2017-03-15 Thread Eno Thereska (JIRA)
Eno Thereska created KAFKA-4904:
---

 Summary: Performance of RocksDb with state record cache
 Key: KAFKA-4904
 URL: https://issues.apache.org/jira/browse/KAFKA-4904
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Eno Thereska
 Fix For: 0.11.0.0


The performance of RocksDb without any record cache is slightly better than 
with the default 10MB record cache. This needs investigating. It is likely not 
an entirely apples-to-apples comparison, since the record cache holds records 
for up to 'commit.time.ms', which by default is 30 seconds. So the record cache 
is adding quite a bit of latency, and we know that; however, the tradeoff still 
needs to be documented and any other bugs need to be investigated.
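
For reference, a sketch of the two configurations being compared (constant names from 
StreamsConfig; the application id and bootstrap servers are placeholders):
{code}
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cache-benchmark");       // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000);           // the 30 second commit interval above
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);            // record cache disabled
// versus the default-sized record cache:
// props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
{code}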



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-132: Augment KStream.print to allow extra parameters in the printed string

2017-03-15 Thread Matthias J. Sax
Thanks for updating the KIP.

It's in very good shape IMHO and I support this idea!



-Matthias


On 3/15/17 3:05 AM, Marc Juchli wrote:
> Dear Matthias,
> 
> The KIP is updated. I think it now contains all the information on that
> page.
> 
> Marc
> 
> On Mon, Mar 13, 2017 at 9:37 PM Matthias J. Sax 
> wrote:
> 
>> Marc,
>>
>> Thanks for the KIP.
>>
>> Can you please update the KIP in a way such that it is self-contained.
>> Right now, you link to all kinds of other places, making it hard to read
>> the KIP.
>>
>> The KIP should be the "center of truth" -- if there is important
>> information elsewhere, please copy it into the KIP.
>>
>>
>> Thanks a lot!
>>
>>
>> -Matthias
>>
>>
>>
>> On 3/13/17 1:30 PM, Matthias J. Sax wrote:
>>> Can you please add the KIP to this table:
>>>
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals#KafkaImprovementProposals-KIPsunderdiscussion
>>>
>>> Thanks,
>>>
>>>  Matthias
>>>
>>>
>>> On 3/13/17 8:08 AM, Marc Juchli wrote:
 Dear all,

 The following describes KIP-132, which I just created. See:

>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-132+-+Augment+KStream.print+to+allow+extra+parameters+in+the+printed+string

 Motivation

 As for now, KStream#print leads to a predefined output where key and value are
 printed with comma separation.
 KAFKA-4830 suggests extending print in a way that it takes a KeyValueMapper as a
 parameter. This will allow a user to change the output according to their needs.
 Public Interfaces

 The affected interface is KStream, which needs to be extended with
>> another
 overloaded version of print:

 void print(final Serde<K> keySerde,
            final Serde<V> valSerde,
            final String streamName,
            final KeyValueMapper<? super K, ? super V, String> mapper);

 Proposed Changes

 See pull request GH-2669. This PR contains a discussion regarding KAFKA-4830
 as well as KAFKA-4772.

 Compatibility, Deprecation, and Migration Plan

 The extension of print will not introduce compatibility issues – we can
 maintain the current output by keeping the current output format as a
 default (if mapper was not set):

 if(mapper == null) {
 printStream.println("[" + streamName + "]: " + keyToPrint + " , "
 + valueToPrint);
 } else {
 printStream.println("[" + streamName + "]: " +
 mapper.apply(keyToPrint, valueToPrint));
 }
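
 A possible call to the proposed overload would then look like this (a sketch; it
 assumes a KStream<String, Long> named wordCounts and the generics shown above):

 wordCounts.print(Serdes.String(), Serdes.Long(), "word-counts",
         (key, value) -> key + " appeared " + value + " times");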



 Kind regards,
 Marc

>>>
>>
>>
> 



signature.asc
Description: OpenPGP digital signature


[jira] [Commented] (KAFKA-4770) KStreamAggregationDedupIntegrationTest fails occasionally

2017-03-15 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926878#comment-15926878
 ] 

Eno Thereska commented on KAFKA-4770:
-

Haven't seen this anymore.

> KStreamAggregationDedupIntegrationTest fails occasionally
> -
>
> Key: KAFKA-4770
> URL: https://issues.apache.org/jira/browse/KAFKA-4770
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
> Fix For: 0.11.0.0
>
>
> org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
> shouldGroupByKey FAILED
> java.lang.AssertionError: 
> Expected: is <[KeyValue(1@1487244179000, 2), KeyValue(2@1487244179000, 
> 2), KeyValue(3@1487244179000, 2), KeyValue(4@1487244179000, 2), 
> KeyValue(5@1487244179000, 2)]>
>  but: was <[KeyValue(1@1487244179000, 2), KeyValue(2@1487244179000, 
> 2), KeyValue(3@1487244179000, 2), KeyValue(4@1487244179000, 1), 
> KeyValue(5@1487244179000, 1)]>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
> at 
> org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest.shouldGroupByKey(KStreamAggregationDedupIntegrationTest.java:240)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (KAFKA-4770) KStreamAggregationDedupIntegrationTest fails occasionally

2017-03-15 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska resolved KAFKA-4770.
-
Resolution: Cannot Reproduce

> KStreamAggregationDedupIntegrationTest fails occasionally
> -
>
> Key: KAFKA-4770
> URL: https://issues.apache.org/jira/browse/KAFKA-4770
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
> Fix For: 0.11.0.0
>
>
> org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
> shouldGroupByKey FAILED
> java.lang.AssertionError: 
> Expected: is <[KeyValue(1@1487244179000, 2), KeyValue(2@1487244179000, 
> 2), KeyValue(3@1487244179000, 2), KeyValue(4@1487244179000, 2), 
> KeyValue(5@1487244179000, 2)]>
>  but: was <[KeyValue(1@1487244179000, 2), KeyValue(2@1487244179000, 
> 2), KeyValue(3@1487244179000, 2), KeyValue(4@1487244179000, 1), 
> KeyValue(5@1487244179000, 1)]>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
> at 
> org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest.shouldGroupByKey(KStreamAggregationDedupIntegrationTest.java:240)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Build failed in Jenkins: kafka-trunk-jdk8 #1353

2017-03-15 Thread Apache Jenkins Server
See 


Changes:

[harsha] MINOR: Fix a documentation typo

--
[...truncated 157.19 KB...]

kafka.security.auth.SimpleAclAuthorizerTest > testLoadCache STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testLoadCache PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.PrimitiveApiTest > testMultiProduce STARTED

kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize 
STARTED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests STARTED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch STARTED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression STARTED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic STARTED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest STARTED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.MetricsDuringTopicCreationDeletionTest > 

[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-15 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926839#comment-15926839
 ] 

Matthias J. Sax commented on KAFKA-3514:


Thanks for your input. It's an interesting approach. We plan to do a KIP for 
this change, as it will be tricky to get right. It would be great if you would 
participate in the discussion on the dev list -- I'm not sure at the moment when 
the KIP will be pushed. (see 
https://github.com/apache/kafka/pull/1689#issuecomment-286523692)

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late arrived record would keep that stream's timestamp low for 
> a period of time, and hence keep being process until that late record. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.
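
A condensed sketch of the selection rule described above (illustrative, not the actual 
Streams code): the partition whose buffered records contain the smallest timestamp is 
picked next, which is why the late record keeps stream A selected until it is dequeued.
{code}
String pickNext(final Map<String, List<Long>> bufferedTimestamps) {
    String next = null;
    long smallest = Long.MAX_VALUE;
    for (final Map.Entry<String, List<Long>> partition : bufferedTimestamps.entrySet()) {
        if (partition.getValue().isEmpty()) {
            continue;
        }
        final long partitionTime = Collections.min(partition.getValue()); // smallest buffered timestamp
        if (partitionTime < smallest) {
            smallest = partitionTime;
            next = partition.getKey();
        }
    }
    return next;
}
{code}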



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-126 - Allow KafkaProducer to batch based on uncompressed size

2017-03-15 Thread Becket Qin
Hi Jason,

Good point. I was thinking about that, too. I was not sure if that is the
right thing to do by default.

If we assume people always set the batch size to the max message size,
splitting the oversized batch makes a lot of sense. But it seems possible
that users want to control the memory footprint, so they would set the batch
size smaller than the max message size so the producer can hold
batches for more partitions. In this case, splitting the batch might not be
the desired behavior.

I think the most intuitive approach to this is to allow the producer to get
the max message size configuration (as well as some other configurations
such as the timestamp type) from the broker side and use that to decide
whether a batch should be split or not. I probably should add this to the
KIP wiki.

Thanks,

Jiangjie (Becket) Qin
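
For reference, a sketch of the per-topic estimate adjustment debated in this thread
(the 0.05 / 0.005 step sizes are the ones from the KIP discussion; the field and method
names are illustrative, and a java.util.concurrent.ConcurrentHashMap is assumed):

private final Map<String, Float> compressionRatioEstimate = new ConcurrentHashMap<>();

void observeBatch(final String topic, final float observedRatio) {
    compressionRatioEstimate.compute(topic, (t, current) -> {
        final float estimate = current == null ? 1.0f : current;
        return observedRatio > estimate
            ? Math.min(1.0f, estimate + 0.05f)   // underestimated: step up quickly to avoid oversized batches
            : Math.max(0.0f, estimate - 0.005f); // overestimated: step down slowly
    });
}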

On Wed, Mar 15, 2017 at 9:47 AM, Jason Gustafson  wrote:

> Hey Becket,
>
> Thanks for the KIP! The approach seems reasonable. One clarification: is
> the intent to do the splitting after the broker rejects the request with
> MESSAGE_TOO_LARGE, or prior to sending if the configured batch size is
> exceeded?
>
> -Jason
>
> On Mon, Mar 13, 2017 at 8:10 PM, Becket Qin  wrote:
>
> > Bump up the thread for further comments. If there is no more comments on
> > the KIP I will start the voting thread on Wed.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Mar 7, 2017 at 9:48 AM, Becket Qin  wrote:
> >
> > > Hi Dong,
> > >
> > > Thanks for the comments.
> > >
> > > The patch is mostly for proof of concept in case there is any concern
> > > about the implementation which is indeed a little tricky.
> > >
> > > The new metric has already been mentioned in the Public Interface
> Change
> > > section.
> > >
> > > I added the reasoning about how the compression ratio
> > > improving/deteriorate steps are determined in the wiki.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > >
> > > On Mon, Mar 6, 2017 at 4:42 PM, Dong Lin  wrote:
> > >
> > >> Hey Becket,
> > >>
> > >> I am wondering if we should first vote for the KIP before reviewing
> the
> > >> patch. I have two comments below:
> > >>
> > >> - Should we specify the new sensors as part of interface change in the
> > >> KIP?
> > >> - The KIP proposes to increase estimated compression ratio by 0.05 for
> > >> each
> > >> underestimation and decrement the estimation by 0.005 for each
> > >> overestimation. Why are these two values chosen? I think there is some
> > >> tradeoff in selecting the value. Can the KIP be more explicit about
> the
> > >> tradeoff and explain how these two values would impact producer's
> > >> performance?
> > >>
> > >> Thanks,
> > >> Dong
> > >>
> > >>
> > >> On Sat, Mar 4, 2017 at 11:42 AM, Becket Qin 
> > wrote:
> > >>
> > >> > I have updated the KIP based on the latest discussion. Please check
> > and
> > >> let
> > >> > me know if there is any further concern.
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Jiangjie (Becket) Qin
> > >> >
> > >> > On Sat, Mar 4, 2017 at 10:56 AM, Becket Qin 
> > >> wrote:
> > >> >
> > >> > > Actually second thought on this, rate might be better for two
> > reasons:
> > >> > > 1. Most of the metrics in the producer we already have are using
> > rate
> > >> > > instead of count.
> > >> > > 2. If a service is bounced, the count will be reset to 0, but it
> > does
> > >> not
> > >> > > affect rate.
> > >> > >
> > >> > > I'll make the change.
> > >> > >
> > >> > > Thanks,
> > >> > >
> > >> > > Jiangjie (Becket) Qin
> > >> > >
> > >> > > On Sat, Mar 4, 2017 at 10:27 AM, Becket Qin  >
> > >> > wrote:
> > >> > >
> > >> > >> Hi Dong,
> > >> > >>
> > >> > >> Yes, there is a sensor in the patch about the split occurrence.
> > >> > >>
> > >> > >> Currently it is a count instead of rate. In practice, it seems
> > count
> > >> is
> > >> > >> easier to use in this case. But I am open to change.
> > >> > >>
> > >> > >> Thanks,
> > >> > >>
> > >> > >> Jiangjie (Becket) Qin
> > >> > >>
> > >> > >> On Fri, Mar 3, 2017 at 7:43 PM, Dong Lin 
> > >> wrote:
> > >> > >>
> > >> > >>> Hey Becket,
> > >> > >>>
> > >> > >>> I haven't looked at the patch yet. But since we are going to try
> > the
> > >> > >>> split-on-oversize solution, should the KIP also add a sensor
> that
> > >> shows
> > >> > >>> the
> > >> > >>> rate of split per second and the probability of split?
> > >> > >>>
> > >> > >>> Thanks,
> > >> > >>> Dong
> > >> > >>>
> > >> > >>>
> > >> > >>> On Fri, Mar 3, 2017 at 6:39 PM, Becket Qin <
> becket@gmail.com>
> > >> > wrote:
> > >> > >>>
> > >> > >>> > Just to clarify, the implementation is basically what I
> > mentioned
> > >> > above
> > >> > >>> > (split/resend + adjusted estimation evolving algorithm) and
> > >> changing
> > >> > >>> the
> > >> > >>> > compression ratio estimation to be per topic.

RE: documentation feedback

2017-03-15 Thread Brian Cornally
Hi Colin, 

Sorry, just reading this now. Looks like I don't have permission to push a new 
branch to make a pull request. I haven't made a pull request before, so I may be 
doing it incorrectly ;)

git push origin doc-security-console-example
remote: Permission to apache/kafka.git denied to briancornally.
fatal: unable to access 'https://github.com/apache/kafka.git/': The requested 
URL returned error: 403

$ git diff origin
diff --git a/docs/security.html b/docs/security.html
index 2e5d492..5118b57 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -213,8 +213,12 @@
 
 Examples using console-producer and console-consumer:
 
-kafka-console-producer.sh --broker-list localhost:9093 --topic 
test --producer.config client-ssl.properties
-kafka-console-consumer.sh --bootstrap-server localhost:9093 
--topic test --consumer.config client-ssl.properties
+echo ssl | kafka-console-producer.sh --broker-list localhost:9093 
--topic test --producer.config client-ssl.properties
+kafka-console-consumer.sh --bootstrap-server localhost:9093 
--topic test --consumer.config client-ssl.properties
+
+notes:
+- older version of kafka-console-producer.sh requires additional 
option --security-protocol SSL
+- older version of kafka-console-consumer.sh requires additional 
options --security-protocol SSL --new-consumer
 
 
 7.3 Authentication using 
SASL

Best, 
 
Brian
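
For completeness, a minimal client-ssl.properties along the lines the commands above 
assume (the truststore path and password are placeholders):

security.protocol=SSL
ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=changeit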

-Original Message-
From: Colin McCabe [mailto:cmcc...@apache.org] 
Sent: Friday, February 17, 2017 11:55 AM
To: dev@kafka.apache.org
Subject: Re: documentation feedback

Hi Brian,

Have you created a pull request for this?

best,
Colin


On Thu, Feb 9, 2017, at 15:21, Brian Cornally wrote:
> @apachekafka suggested addition to
> http://kafka.apache.org/documentation/#security - section 5 - Examples 
> using console-producer and console-consumer:
> 
> echo "ssl" |  
> /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh
> --broker-list $KAFKABROKERSSSL --topic test2  --producer.config 
> client-ssl.properties --security-protocol SSL
> note: addition of --security-protocol SSL
> 
> /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh 
> --zookeeper $KAFKAZKHOSTS --bootstrap-server $KAFKABROKERSSSL --topic 
> test2 --from-beginning --consumer.config client-ssl.properties 
> --security-protocol SSL --new-consumer
> note: addition of --security-protocol SSL --new-consumer Thanks
> 


Re: [VOTE] KIP-107: Add purgeDataBefore() API in AdminClient

2017-03-15 Thread Dong Lin
Hey Jason, Ismael, Jeff,

Regarding Purge vs PurgeRecords, would it be OK for me to make a follow-up
patch to rename PurgeRequest to PurgeRecordsRequest (similarly for
ProduceRequest and FetchRequest)? This is because I favor PurgeRequest over
PurgeRecordsRequest before we rename ProduceRequest and FetchRequest. Also,
since the patch is ready for merge other than the naming issue we are
discussing here, I would like to keep cosmetic code changes to a minimum and
have it merged soon. I can submit the patch to rename the requests right after
the pull request is committed.

Hey Jun,

You mentioned that the purpose of having purge is to distinguish between
removing the whole log vs. removing a portion of the log. The PurgeRequest
proposed in this KIP will remove a portion of the log since it works at the
granularity of records. This will be more explicit after we rename it to
PurgeRecordsRequest. If we want to have a request in the future that removes the
entire log, we can name it PurgeLogRequest. Thus I think it is OK to use
"delete" instead of "purge" in the name and still be able to distinguish
between removing the whole log vs. removing a portion of the log.

I have updated the KIP to replace "purge" with "delete" in the names of the
Java API and requests. Are you OK with the change?

Thanks,
Dong
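
For illustration, the renamed call would look roughly like this (the method name is
hypothetical, following the rename above; it is not an existing client API):

Map<TopicPartition, Long> deleteBefore =
    Collections.singletonMap(new TopicPartition("my-topic", 0), 42L); // delete records with offsets below 42
adminClient.deleteRecordsBefore(deleteBefore);                        // hypothetical, per the rename above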


On Wed, Mar 15, 2017 at 9:59 AM, Jason Gustafson  wrote:

> Hey Dong,
>
> Sorry for the late reply. Yes, I prefer PurgeRecordsRequest instead of
> PurgeRequest. DeleteRecords seems even better. As mentioned, I also think
> it would be a good idea to rename FetchRequest and ProduceRequest
> accordingly, but we need not consider that here. We could potentially
> rename Purge to PurgeRecords if and when we rename Fetch and Produce, but
> if that's the plan, we may as well do it from the start. Anyway, it's just
> my preference, so don't block on my opinion if the consensus is unclear.
>
> -Jason
>
>
>
> On Wed, Mar 15, 2017 at 8:45 AM, Ismael Juma  wrote:
>
> > Hi Dong,
> >
> > I think your suggestion of including `Records` in the name of the new
> > request and renaming `Fetch` and `Produce` to be `FetchRecords` and
> > `ProduceRecords` is a good one. We can do the the renames separately.
> It's
> > a compatible change since the name of the API is never exchanged with
> > clients and the request/response classes are internal (we have done such
> > renames before as Jason pointed out offline). The number of APIs will
> > continue to grow and it will be much clearer if we avoid implicit
> > assumptions about the target of an API request/response.
> >
> > Given that, I also think that DeleteRecords makes sense since we also
> have
> > DeleteTopics. Both are batch APIs that delete multiple items (the space
> is
> > only freed later). If we use Oracle's definition of "purge", it seems to
> be
> > what happens to cause the space to be freed (and happens in the
> background
> > in Kafka):
> >
> > "Purging is the process of freeing up space in the database or of
> deleting
> > obsolete data that is not required by the system. The purge process can
> be
> > based on the age of the data or the type of data"
> > https://docs.oracle.com/cd/E12057_01/doc.1014/e12050/archpur
> g.htm#BABHDECI
> >
> > Regarding the AdminClient, I thought the KIP was proposing adding this
> > method to the Java AdminClient (KIP-117). If it's the Scala one, then it
> > doesn't even need to be in the KIP as the Scala AdminClient is internal
> and
> > no compatibility guarantees are offered (the methods that exist there
> never
> > went through a KIP for example). So, I'm OK with keeping the method
> > signature as it is.
> >
> > Ismael
> >
> > On Fri, Mar 10, 2017 at 6:06 PM, Dong Lin  wrote:
> >
> > > Hey Jason,
> > >
> > > Just to clarify, are you, together with Ismael and Jeff, suggesting
> that
> > > the new request should be named PurgeRecordsRequest instead of
> > > PurgeRequest? The advantage of PurgeRecordsRequest is the name itself
> is
> > > more explicit about what it does. The disadvantage of
> PurgeRecordsRequest
> > > is that it is a bit consistent with ProduceRequest and FetchRequest,
> > which
> > > already assumes that if the target is not explicitly specified then the
> > > target is "Records".
> > >
> > > I would be in favor of PurgeRecordsRequest if we plan to change
> > > FetchRequest
> > > to FetchRecordsRequest and ProduceRequest to ProduceRequestsRequest.
> > > Otherwise, I would prefer PurgeRequest since it is more consistent with
> > > existing style. Would PurgeRequest look more reasonable if we simply
> > assume
> > > that the operation target is "Records" if it is not explicitly
> specified
> > in
> > > the name?
> > >
> > > Becket is also in favor of PurgeRequest for the same reason. Maybe we
> can
> > > start a vote for this if people can not reach consensus on this name? I
> > > won't fight for PurgeRequest if most people like PurgeRecordsRequest.
> > >
> > > Thanks,
> > > Dong
> > >
> 

Build failed in Jenkins: kafka-trunk-jdk8 #1352

2017-03-15 Thread Apache Jenkins Server
See 


Changes:

[jason] MINOR: Improve log4j on stream thread and stream process

--
[...truncated 157.19 KB...]

kafka.security.auth.SimpleAclAuthorizerTest > testLoadCache STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testLoadCache PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.PrimitiveApiTest > testMultiProduce STARTED

kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize 
STARTED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests STARTED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch STARTED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression STARTED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic STARTED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest STARTED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED


[GitHub] kafka pull request #2674: MINOR: Fix a documentation typo

2017-03-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2674


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Jenkins build is back to normal : kafka-trunk-jdk8 #1351

2017-03-15 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-03-15 Thread Dong Lin
Hey Ismael,

Sure, I have updated the "Changes in Operational Procedures" section in KIP-113
to specify the problem and solution for known disk failures. And I updated
the "Test Plan" section to note that we have a test in KIP-113 to verify that
replicas already created on the good log directories will not be affected
by failure of other log directories.

Please let me know if there is any other improvement I can make. Thanks for
your comment.

Dong


On Wed, Mar 15, 2017 at 3:18 AM, Ismael Juma  wrote:

> Hi Dong,
>
> Yes, that sounds good to me. I'd list option 2 first since that is safe
> and, as you said, no worse than what happens today. The file approach is a
> bit hacky as you said, so it may be a bit fragile. Not sure if we really
> want to mention that. :)
>
> About the note in KIP-112 versus adding the test in KIP-113, I think it
> would make sense to add a short sentence stating that this scenario is
> covered in KIP-113. People won't necessarily read both KIPs at the same
> time and it's helpful to cross-reference when it makes sense.
>
> Thanks for your work on this.
>
> Ismael
>
> On Tue, Mar 14, 2017 at 11:00 PM, Dong Lin  wrote:
>
> > Hey Ismael,
> >
> > I get your concern that it is more likely for a disk to be slow, or
> exhibit
> > other forms of non-fatal symptom, after some known fatal error. Then it
> is
> > weird for user to start broker with the likely-problematic disk in the
> > broker config. In that case, I think there are two things user can do:
> >
> > 1) Intentionally change the log directory in the config to point to a
> file.
> > This is a bit hacky but it works well before we make more-appropriate
> > long-term change in Kafka to handle this case.
> > 2) Just don't start broker with bad log directories. Always fix disk
> before
> > restarting the broker. This is a safe approach that is no worse than
> > current practice.
> >
> > Would this address your concern if I specify the problem and the two
> > solutions in the KIP?
> >
> > Thanks,
> > Dong
> >
> > On Tue, Mar 14, 2017 at 3:29 PM, Dong Lin  wrote:
> >
> > > Hey Ismael,
> > >
> > > Thanks for the comment. Please see my reply below.
> > >
> > > On Tue, Mar 14, 2017 at 10:31 AM, Ismael Juma 
> wrote:
> > >
> > >> Thanks Dong. Comments inline.
> > >>
> > >> On Fri, Mar 10, 2017 at 6:25 PM, Dong Lin 
> wrote:
> > >> >
> > >> > I get your point. But I am not sure we should recommend user to
> simply
> > >> > remove disk from the broker config. If user simply does this without
> > >> > checking the utilization of good disks, replica on the bad disk will
> > be
> > >> > re-created on the good disk and may overload the good disks, causing
> > >> > cascading failure.
> > >> >
> > >>
> > >> Good point.
> > >>
> > >>
> > >> >
> > >> > I agree with you and Colin that slow disk may cause problem.
> However,
> > >> > performance degradation due to slow disk this is an existing problem
> > >> that
> > >> > is not detected or handled by Kafka or KIP-112.
> > >>
> > >>
> > >> I think an important difference is that a number of disk errors are
> > >> currently fatal and won't be after KIP-112. So it introduces new
> > scenarios
> > >> (for example, bouncing a broker that is working fine although some
> disks
> > >> have been marked bad).
> > >>
> > >
> > > Hmm.. I am still trying to understand why KIP-112 creates new
> scenarios.
> > > Slow disk is not considered fatal error and won't be caught by either
> > > existing Kafka design or this KIP. If any disk is marked bad, it means
> > > broker encounters IOException when accessing disk, most likely the
> broker
> > > will encounter IOException again when accessing this disk and mark this
> > > disk as bad after bounce. I guess you are talking about the case that a
> > > disk is marked bad, broker is bounced, then the disk provides degraded
> > > performance without being marked bad, right? But this seems to be an
> > > existing problem we already have today with slow disk.
> > >
> > > Here are the possible scenarios with bad disk after broker bounce:
> > >
> > > 1) bad disk -> broker bounce -> good disk. This would be great.
> > > 2) bad disk -> broker bounce -> slow disk. Slow disk is an existing
> > > problem that is not addressed by Kafka today.
> > > 3) bad disk -> broker bounce -> bad disk. This is handled by this KIP
> > such
> > > that only replicas on the bad disk become offline.
> > >
> > >
> > >>
> > >> > Detection and handling of
> > >> > slow disk is a separate problem that needs to be addressed in a
> future
> > >> KIP.
> > >> > It is currently listed in the future work. Does this sound OK?
> > >> >
> > >>
> > >> I'm OK with it being handled in the future. In the meantime, I was
> just
> > >> hoping that we can make it clear to users about the potential issue
> of a
> > >> disk marked as bad becoming good again after a bounce (which can be
> > >> dangerous).
> > >>
> > >> The main 

[GitHub] kafka pull request #2685: MINOR: Improve log4j on stream thread and stream p...

2017-03-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2685


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Jenkins build is back to normal : kafka-trunk-jdk7 #2015

2017-03-15 Thread Apache Jenkins Server
See 




[jira] [Commented] (KAFKA-4885) processstreamwithcachedstatestore and other streams benchmarks fail occasionally

2017-03-15 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926571#comment-15926571
 ] 

Guozhang Wang commented on KAFKA-4885:
--

[~damianguy] Well, as for the general solution in terms of error handling: I 
think there is a difference between "the streams app has an issue, e.g. divide by 
zero, serde exception" and "the broker cluster has an issue, and no data can be 
produced / fetched". In the former case I agree that we should usually fail 
fast; whereas in the latter case I'm not sure we want the Streams app to stop 
and die whenever there is a (temporary?) broker unavailability. Imagine 
cross-DC replication / MirrorMaker cases: we usually want such services to stay 
alive and idle when brokers are unavailable rather than log a fatal error 
and shut themselves down.

As for the system test itself, I agree that setting an exception handler in 
streams would be better than setting num.retries to infinity.
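
For illustration, a minimal sketch of that alternative, assuming a 0.10.x-style
topology (the application id, bootstrap server and topic names are made up):
register an uncaught exception handler on the KafkaStreams instance so a fatal
stream-thread error shuts the instance down, and the test harness sees the
process exit, instead of relying on endless retries.

{code}
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class StreamsExceptionHandlerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "benchmark-sketch");   // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // hypothetical

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("source-topic").to("sink-topic");                      // hypothetical topics

        final KafkaStreams streams = new KafkaStreams(builder, props);
        // Surface any unrecoverable stream-thread error and shut down, rather than
        // letting a dead thread linger behind retries.
        streams.setUncaughtExceptionHandler((thread, throwable) -> {
            System.err.println("Stream thread " + thread.getName() + " died: " + throwable);
            // Close from a separate thread so we do not block the dying stream thread.
            new Thread(streams::close).start();
        });
        streams.start();
    }
}
{code}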

> processstreamwithcachedstatestore and other streams benchmarks fail 
> occasionally
> -
>
> Key: KAFKA-4885
> URL: https://issues.apache.org/jira/browse/KAFKA-4885
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
> Fix For: 0.11.0.0
>
>
> test_id:
> kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=processstreamwithcachedstatestore.scale=2
> status: FAIL
> run time:   14 minutes 58.069 seconds
> Streams Test process on ubuntu@worker5 took too long to exit
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
> data = self.run_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
> return self.test_context.function(self.test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py",
>  line 86, in test_simple_benchmark
> self.driver[num].wait()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py",
>  line 102, in wait
> self.wait_node(node, timeout_sec)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py",
>  line 106, in wait_node
> wait_until(lambda: not node.account.alive(pid), timeout_sec=timeout_sec, 
> err_msg="Streams Test process on " + str(node.account) + " took too long to 
> exit")
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py",
>  line 36, in wait_until
> raise TimeoutError(err_msg)
> TimeoutError: Streams Test process on ubuntu@worker5 took too long to exit
> The log contains several lines like:
> [2017-03-11 04:52:59,080] DEBUG Attempt to heartbeat failed for group 
> simple-benchmark-streams-with-storetrue since it is rebalancing. 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2017-03-11 04:53:01,987] DEBUG Sending Heartbeat request for group 
> simple-benchmark-streams-with-storetrue to coordinator worker10:9092 (id: 
> 2147483646 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2017-03-11 04:53:02,088] DEBUG Attempt to heartbeat failed for group 
> simple-benchmark-streams-with-storetrue since it is rebalancing. 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2017-03-11 04:53:04,995] DEBUG Sending Heartbeat request for group 
> simple-benchmark-streams-with-storetrue to coordinator worker10:9092 (id: 
> 2147483646 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> Other tests that fail the same way include:
> test_id:
> kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=count.scale=2
> test_id:
> kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=processstreamwithsink.scale=1
> test_id:
> kafkatest.tests.streams.streams_bounce_test.StreamsBounceTest.test_bounce



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-107: Add purgeDataBefore() API in AdminClient

2017-03-15 Thread Jason Gustafson
Hey Dong,

Sorry for the late reply. Yes, I prefer PurgeRecordsRequest instead of
PurgeRequest. DeleteRecords seems even better. As mentioned, I also think
it would be a good idea to rename FetchRequest and ProduceRequest
accordingly, but we need not consider that here. We could potentially
rename Purge to PurgeRecords if and when we rename Fetch and Produce, but
if that's the plan, we may as well do it from the start. Anyway, it's just
my preference, so don't block on my opinion if the consensus is unclear.

-Jason



On Wed, Mar 15, 2017 at 8:45 AM, Ismael Juma  wrote:

> Hi Dong,
>
> I think your suggestion of including `Records` in the name of the new
> request and renaming `Fetch` and `Produce` to be `FetchRecords` and
> `ProduceRecords` is a good one. We can do the renames separately. It's
> a compatible change since the name of the API is never exchanged with
> clients and the request/response classes are internal (we have done such
> renames before as Jason pointed out offline). The number of APIs will
> continue to grow and it will be much clearer if we avoid implicit
> assumptions about the target of an API request/response.
>
> Given that, I also think that DeleteRecords makes sense since we also have
> DeleteTopics. Both are batch APIs that delete multiple items (the space is
> only freed later). If we use Oracle's definition of "purge", it seems to be
> what happens to cause the space to be freed (and happens in the background
> in Kafka):
>
> "Purging is the process of freeing up space in the database or of deleting
> obsolete data that is not required by the system. The purge process can be
> based on the age of the data or the type of data"
> https://docs.oracle.com/cd/E12057_01/doc.1014/e12050/archpurg.htm#BABHDECI
>
> Regarding the AdminClient, I thought the KIP was proposing adding this
> method to the Java AdminClient (KIP-117). If it's the Scala one, then it
> doesn't even need to be in the KIP as the Scala AdminClient is internal and
> no compatibility guarantees are offered (the methods that exist there never
> went through a KIP for example). So, I'm OK with keeping the method
> signature as it is.
>
> Ismael
>
> On Fri, Mar 10, 2017 at 6:06 PM, Dong Lin  wrote:
>
> > Hey Jason,
> >
> > Just to clarify, are you, together with Ismael and Jeff, suggesting that
> > the new request should be named PurgeRecordsRequest instead of
> > PurgeRequest? The advantage of PurgeRecordsRequest is the name itself is
> > more explicit about what it does. The disadvantage of PurgeRecordsRequest
> > is that it is a bit inconsistent with ProduceRequest and FetchRequest,
> which
> > already assumes that if the target is not explicitly specified then the
> > target is "Records".
> >
> > I would be in favor of PurgeRecordsRequest if we plan to change
> > FetchRequest
> > to FetchRecordsRequest and ProduceRequest to ProduceRecordsRequest.
> > Otherwise, I would prefer PurgeRequest since it is more consistent with
> > existing style. Would PurgeRequest look more reasonable if we simply
> assume
> > that the operation target is "Records" if it is not explicitly specified
> in
> > the name?
> >
> > Becket is also in favor of PurgeRequest for the same reason. Maybe we can
> > start a vote for this if people can not reach consensus on this name? I
> > won't fight for PurgeRequest if most people like PurgeRecordsRequest.
> >
> > Thanks,
> > Dong
> >
> >
> >
> >
> > On Thu, Mar 9, 2017 at 5:39 PM, Jason Gustafson 
> > wrote:
> >
> > > Re; Purge vs PurgeRecords: I think I'm with Ismael and Jeff that the
> > > increasing surface area of the request APIs calls for more explicit
> > naming.
> > > PurgeRecords sounds reasonable to me. Using simple verbs like "fetch"
> and
> > > "produce" made sense when there were 6 or 7 APIs, but we'll soon be up
> to
> > > 30. I could also imagine having other Purge* APIs in the future (e.g.
> > > PurgeCommittedOffsets?), so it would be nice to avoid the need to
> rename
> > in
> > > the future, though it's probably not too big of a problem if we have
> to.
> > > (FWIW, I'd also be in favor of change FetchRequest to
> FetchRecordsRequest
> > > and ProduceRequest to ProduceRecordsRequest.)
> > >
> > > -Jason
> > >
> > > On Tue, Mar 7, 2017 at 10:11 AM, Dong Lin  wrote:
> > >
> > > > Hi Jun, Ismael,
> > > >
> > > > I think making the API similar to a future KIP is desirable but not
> > > > required. Implementation is easy but discussion of the API may take a
> > lot
> > > > of time given that we haven't yet reached agreement on KIP-117. Thus
> I
> > > > prefer to just mark the API in Scala as unstable.
> > > >
> > > > I am OK with either delete or purge in the name.
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > >
> > > > On Tue, Mar 7, 2017 at 9:59 AM, Jun Rao  wrote:
> > > >
> > > > > Hi, Dong, Ismael,
> > > > >
> > > > > 1. I just meant that it would be useful to distinguish 

Re: [DISCUSS] KIP-126 - Allow KafkaProducer to batch based on uncompressed size

2017-03-15 Thread Jason Gustafson
Hey Becket,

Thanks for the KIP! The approach seems reasonable. One clarification: is
the intent to do the splitting after the broker rejects the request with
MESSAGE_TOO_LARGE, or prior to sending if the configured batch size is
exceeded?

-Jason

On Mon, Mar 13, 2017 at 8:10 PM, Becket Qin  wrote:

> Bump up the thread for further comments. If there is no more comments on
> the KIP I will start the voting thread on Wed.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Mar 7, 2017 at 9:48 AM, Becket Qin  wrote:
>
> > Hi Dong,
> >
> > Thanks for the comments.
> >
> > The patch is mostly for proof of concept in case there is any concern
> > about the implementation which is indeed a little tricky.
> >
> > The new metric has already been mentioned in the Public Interface Change
> > section.
> >
> > I added to the wiki the reasoning about how the compression ratio
> > improving/deteriorating steps are determined.
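
For readers skimming the thread, here is a rough, hypothetical sketch of the
asymmetric adjustment idea being discussed. The 0.05 / 0.005 step values are the
ones mentioned in the KIP; the class and method names are made up, and this is
not the actual patch.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration only; not the actual producer code.
public class CompressionRatioEstimatorSketch {
    private static final float UNDERESTIMATE_STEP = 0.05f;  // step discussed in the KIP
    private static final float OVERESTIMATE_STEP = 0.005f;  // step discussed in the KIP
    private static final float MAX_RATIO = 1.0f;
    private static final float MIN_RATIO = 0.05f;

    private final Map<String, Float> perTopicEstimate = new ConcurrentHashMap<>();

    public float estimation(String topic) {
        return perTopicEstimate.getOrDefault(topic, MAX_RATIO);
    }

    // Called with the observed ratio (compressed size / uncompressed size) of a finished batch.
    public void update(String topic, float observedRatio) {
        float current = estimation(topic);
        float updated;
        if (observedRatio > current) {
            // Underestimated: grow quickly so the next batches are less likely to be oversized.
            updated = Math.min(current + UNDERESTIMATE_STEP, MAX_RATIO);
        } else {
            // Overestimated: shrink slowly to avoid oscillating on a single well-compressed batch.
            updated = Math.max(current - OVERESTIMATE_STEP, MIN_RATIO);
        }
        perTopicEstimate.put(topic, updated);
    }
}

The asymmetry biases the estimate toward larger ratios, trading a little batching
efficiency for fewer oversized-batch splits.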
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Mon, Mar 6, 2017 at 4:42 PM, Dong Lin  wrote:
> >
> >> Hey Becket,
> >>
> >> I am wondering if we should first vote for the KIP before reviewing the
> >> patch. I have two comments below:
> >>
> >> - Should we specify the new sensors as part of interface change in the
> >> KIP?
> >> - The KIP proposes to increase estimated compression ratio by 0.05 for
> >> each
> >> underestimation and decrement the estimation by 0.005 for each
> >> overestimation. Why are these two values chosen? I think there is some
> >> tradeoff in selecting the value. Can the KIP be more explicit about the
> >> tradeoff and explain how these two values would impact producer's
> >> performance?
> >>
> >> Thanks,
> >> Dong
> >>
> >>
> >> On Sat, Mar 4, 2017 at 11:42 AM, Becket Qin 
> wrote:
> >>
> >> > I have updated the KIP based on the latest discussion. Please check
> and
> >> let
> >> > me know if there is any further concern.
> >> >
> >> > Thanks,
> >> >
> >> > Jiangjie (Becket) Qin
> >> >
> >> > On Sat, Mar 4, 2017 at 10:56 AM, Becket Qin 
> >> wrote:
> >> >
> >> > > Actually second thought on this, rate might be better for two
> reasons:
> >> > > 1. Most of the metrics in the producer we already have are using
> rate
> >> > > instead of count.
> >> > > 2. If a service is bounced, the count will be reset to 0, but it
> does
> >> not
> >> > > affect rate.
> >> > >
> >> > > I'll make the change.
> >> > >
> >> > > Thanks,
> >> > >
> >> > > Jiangjie (Becket) Qin
> >> > >
> >> > > On Sat, Mar 4, 2017 at 10:27 AM, Becket Qin 
> >> > wrote:
> >> > >
> >> > >> Hi Dong,
> >> > >>
> >> > >> Yes, there is a sensor in the patch about the split occurrence.
> >> > >>
> >> > >> Currently it is a count instead of rate. In practice, it seems
> count
> >> is
> >> > >> easier to use in this case. But I am open to change.
> >> > >>
> >> > >> Thanks,
> >> > >>
> >> > >> Jiangjie (Becket) Qin
> >> > >>
> >> > >> On Fri, Mar 3, 2017 at 7:43 PM, Dong Lin 
> >> wrote:
> >> > >>
> >> > >>> Hey Becket,
> >> > >>>
> >> > >>> I haven't looked at the patch yet. But since we are going to try
> the
> >> > >>> split-on-oversize solution, should the KIP also add a sensor that
> >> shows
> >> > >>> the
> >> > >>> rate of split per second and the probability of split?
> >> > >>>
> >> > >>> Thanks,
> >> > >>> Dong
> >> > >>>
> >> > >>>
> >> > >>> On Fri, Mar 3, 2017 at 6:39 PM, Becket Qin 
> >> > wrote:
> >> > >>>
> >> > >>> > Just to clarify, the implementation is basically what I
> mentioned
> >> > above
> >> > >>> > (split/resend + adjusted estimation evolving algorithm) and
> >> changing
> >> > >>> the
> >> > >>> > compression ratio estimation to be per topic.
> >> > >>> >
> >> > >>> > Thanks,
> >> > >>> >
> >> > >>> > Jiangjie (Becket) Qin
> >> > >>> >
> >> > >>> > On Fri, Mar 3, 2017 at 6:36 PM, Becket Qin <
> becket@gmail.com>
> >> > >>> wrote:
> >> > >>> >
> >> > >>> > > I went ahead and have a patch submitted here:
> >> > >>> > > https://github.com/apache/kafka/pull/2638
> >> > >>> > >
> >> > >>> > > Per Joel's suggestion, I changed the compression ratio to be
> per
> >> > >>> topic as
> >> > >>> > > well. It seems working well. Since there is an important
> >> behavior
> >> > >>> change
> >> > >>> > > and a new sensor is added, I'll keep the KIP and update it
> >> > according.
> >> > >>> > >
> >> > >>> > > Thanks,
> >> > >>> > >
> >> > >>> > > Jiangjie (Becket) Qin
> >> > >>> > >
> >> > >>> > > On Mon, Feb 27, 2017 at 3:50 PM, Joel Koshy <
> >> jjkosh...@gmail.com>
> >> > >>> wrote:
> >> > >>> > >
> >> > >>> > >> >
> >> > >>> > >> > Let's say we sent the batch over the wire and received a
> >> > >>> > >> > RecordTooLargeException, how do we split it as once we add the
> >> > >>> > >> > message to the batch we lose the message level granularity. We
> >> > >>> > >> > will have to
> >> > >>> > 

[jira] [Commented] (KAFKA-4903) Shell#runCommand does not clear the input buffer

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926406#comment-15926406
 ] 

ASF GitHub Bot commented on KAFKA-4903:
---

GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/2692

KAFKA-4903: Shell#runCommand does not clear the input buffer



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-4903

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2692.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2692






> Shell#runCommand does not clear the input buffer
> 
>
> Key: KAFKA-4903
> URL: https://issues.apache.org/jira/browse/KAFKA-4903
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>
> Shell#runCommand does not clear the input buffer where it claims to do so.
> {code}
> // clear the input stream buffer
> String line = null;
> while (line != null) {
> line = inReader.readLine();
> }
>  {code}
> The 'while' loop never runs.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2692: KAFKA-4903: Shell#runCommand does not clear the in...

2017-03-15 Thread cmccabe
GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/2692

KAFKA-4903: Shell#runCommand does not clear the input buffer



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-4903

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2692.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2692






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [VOTE] KIP-107: Add purgeDataBefore() API in AdminClient

2017-03-15 Thread Ismael Juma
Hi Dong,

I think your suggestion of including `Records` in the name of the new
request and renaming `Fetch` and `Produce` to be `FetchRecords` and
`ProduceRecords` is a good one. We can do the renames separately. It's
a compatible change since the name of the API is never exchanged with
clients and the request/response classes are internal (we have done such
renames before as Jason pointed out offline). The number of APIs will
continue to grow and it will be much clearer if we avoid implicit
assumptions about the target of an API request/response.

Given that, I also think that DeleteRecords makes sense since we also have
DeleteTopics. Both are batch APIs that delete multiple items (the space is
only freed later). If we use Oracle's definition of "purge", it seems to be
what happens to cause the space to be freed (and happens in the background
in Kafka):

"Purging is the process of freeing up space in the database or of deleting
obsolete data that is not required by the system. The purge process can be
based on the age of the data or the type of data"
https://docs.oracle.com/cd/E12057_01/doc.1014/e12050/archpurg.htm#BABHDECI

Regarding the AdminClient, I thought the KIP was proposing adding this
method to the Java AdminClient (KIP-117). If it's the Scala one, then it
doesn't even need to be in the KIP as the Scala AdminClient is internal and
no compatibility guarantees are offered (the methods that exist there never
went through a KIP for example). So, I'm OK with keeping the method
signature as it is.

Ismael

On Fri, Mar 10, 2017 at 6:06 PM, Dong Lin  wrote:

> Hey Jason,
>
> Just to clarify, are you, together with Ismael and Jeff, suggesting that
> the new request should be named PurgeRecordsRequest instead of
> PurgeRequest? The advantage of PurgeRecordsRequest is the name itself is
> more explicit about what it does. The disadvantage of PurgeRecordsRequest
> is that it is a bit inconsistent with ProduceRequest and FetchRequest, which
> already assumes that if the target is not explicitly specified then the
> target is "Records".
>
> I would be in favor of PurgeRecordsRequest if we plan to change
> FetchRequest
> to FetchRecordsRequest and ProduceRequest to ProduceRecordsRequest.
> Otherwise, I would prefer PurgeRequest since it is more consistent with
> existing style. Would PurgeRequest look more reasonable if we simply assume
> that the operation target is "Records" if it is not explicitly specified in
> the name?
>
> Becket is also in favor of PurgeRequest for the same reason. Maybe we can
> start a vote for this if people can not reach consensus on this name? I
> won't fight for PurgeRequest if most people like PurgeRecordsRequest.
>
> Thanks,
> Dong
>
>
>
>
> On Thu, Mar 9, 2017 at 5:39 PM, Jason Gustafson 
> wrote:
>
> > Re; Purge vs PurgeRecords: I think I'm with Ismael and Jeff that the
> > increasing surface area of the request APIs calls for more explicit
> naming.
> > PurgeRecords sounds reasonable to me. Using simple verbs like "fetch" and
> > "produce" made sense when there were 6 or 7 APIs, but we'll soon be up to
> > 30. I could also imagine having other Purge* APIs in the future (e.g.
> > PurgeCommittedOffsets?), so it would be nice to avoid the need to rename
> in
> > the future, though it's probably not too big of a problem if we have to.
> > (FWIW, I'd also be in favor of change FetchRequest to FetchRecordsRequest
> > and ProduceRequest to ProduceRecordsRequest.)
> >
> > -Jason
> >
> > On Tue, Mar 7, 2017 at 10:11 AM, Dong Lin  wrote:
> >
> > > Hi Jun, Ismael,
> > >
> > > I think making the API similar to a future KIP is desirable but not
> > > required. Implementation is easy but discussion of the API may take a
> lot
> > > of time given that we haven't yet reached agreement on KIP-117. Thus I
> > > prefer to just mark the API in Scala as unstable.
> > >
> > > I am OK with either delete or purge in the name.
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > > On Tue, Mar 7, 2017 at 9:59 AM, Jun Rao  wrote:
> > >
> > > > Hi, Dong, Ismael,
> > > >
> > > > 1. I just meant that it would be useful to distinguish between
> removing
> > > the
> > > > whole log vs removing a portion of the log. The exact naming is less
> > > > important.
> > > >
> > > > 4. When we move the purgeBefore() api to the Java AdminClient, it
> would
> > > be
> > > > great if the api looks comparable to what's in KIP-117. For now,
> > perhaps
> > > we
> > > > can mark the api in Scala as unstable so that people are aware that
> > it's
> > > > subject to change?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Fri, Mar 3, 2017 at 11:25 AM, Dong Lin 
> wrote:
> > > >
> > > > > Hey Ismael,
> > > > >
> > > > > Thank for the detailed explanation. Here is my thought:
> > > > >
> > > > > 1. purge vs. delete
> > > > >
> > > > > We have originally considered purge, delete, truncate and remove. I
> > > 

[jira] [Created] (KAFKA-4903) Shell#runCommand does not clear the input buffer

2017-03-15 Thread Colin P. McCabe (JIRA)
Colin P. McCabe created KAFKA-4903:
--

 Summary: Shell#runCommand does not clear the input buffer
 Key: KAFKA-4903
 URL: https://issues.apache.org/jira/browse/KAFKA-4903
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: Colin P. McCabe
Assignee: Colin P. McCabe


Shell#runCommand does not clear the input buffer where it claims to do so.
{code}
// clear the input stream buffer
String line = null;
while (line != null) {
    line = inReader.readLine();
}
{code}

The 'while' loop never runs.
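
For reference, one way the drain loop could be written so it actually consumes
the buffered output (a sketch only, not the committed fix; the reader variable
name follows the snippet above):

{code}
import java.io.BufferedReader;
import java.io.IOException;

final class DrainSketch {
    // Unlike the snippet above, `line` is assigned before the loop condition is
    // evaluated, so the loop runs until the stream is exhausted.
    static void drain(BufferedReader inReader) throws IOException {
        String line;
        while ((line = inReader.readLine()) != null) {
            // intentionally discard the remaining buffered output
        }
    }
}
{code}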



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4902) Utils#delete should correctly handle I/O errors and symlinks

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926382#comment-15926382
 ] 

ASF GitHub Bot commented on KAFKA-4902:
---

GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/2691

KAFKA-4902: Utils#delete should correctly handle I/O errors and symlinks



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-4902

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2691.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2691


commit 8670331826aeb054940467966f6f10265b47c95b
Author: Colin P. Mccabe 
Date:   2017-03-15T15:36:31Z

KAFKA-4902: Utils#delete should correctly handle I/O errors and symlinks




> Utils#delete should correctly handle I/O errors and symlinks
> 
>
> Key: KAFKA-4902
> URL: https://issues.apache.org/jira/browse/KAFKA-4902
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>
> Currently, Utils#delete silently ignores I/O errors.  It also will not 
> properly handle symlinks.  It could get into an infinite loop when symlinks 
> are present.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2691: KAFKA-4902: Utils#delete should correctly handle I...

2017-03-15 Thread cmccabe
GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/2691

KAFKA-4902: Utils#delete should correctly handle I/O errors and symlinks



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-4902

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2691.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2691


commit 8670331826aeb054940467966f6f10265b47c95b
Author: Colin P. Mccabe 
Date:   2017-03-15T15:36:31Z

KAFKA-4902: Utils#delete should correctly handle I/O errors and symlinks




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-4902) Utils#delete should correctly handle I/O errors and symlinks

2017-03-15 Thread Colin P. McCabe (JIRA)
Colin P. McCabe created KAFKA-4902:
--

 Summary: Utils#delete should correctly handle I/O errors and 
symlinks
 Key: KAFKA-4902
 URL: https://issues.apache.org/jira/browse/KAFKA-4902
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: Colin P. McCabe
Assignee: Colin P. McCabe


Currently, Utils#delete silently ignores I/O errors.  It also will not properly 
handle symlinks.  It could get into an infinite loop when symlinks are present.
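
For illustration, here is a sketch (not the actual patch) of a recursive delete
built on Files.walkFileTree, which does not follow symbolic links by default and
propagates I/O errors instead of swallowing them:

{code}
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

final class RecursiveDeleteSketch {
    static void delete(Path root) throws IOException {
        if (root == null || !Files.exists(root, LinkOption.NOFOLLOW_LINKS))
            return;
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);  // a symlink is deleted itself; its target is untouched
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                if (exc != null)
                    throw exc;       // surface the error instead of silently ignoring it
                Files.delete(dir);
                return FileVisitResult.CONTINUE;
            }
        });
    }
}
{code}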



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4888) offset 449883 is invalid, cause: Record is corrupt (stored crc = 2171407101, computed crc = 1371274824)

2017-03-15 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926322#comment-15926322
 ] 

Jun Rao commented on KAFKA-4888:


[~eduardo.s.neto] and [~chengc], is the issue persistent, i.e., if you consume 
from the same offset again, do you see the same CRC error? Also, it would be 
useful to run bin/kafka-run-class.sh kafka.tools.DumpLogSegments on the segment 
of the affected offset and see if the message is actually corrupted on disk (in 
all replicas).

> offset 449883 is invalid, cause: Record is corrupt (stored crc = 2171407101, 
> computed crc = 1371274824)
> ---
>
> Key: KAFKA-4888
> URL: https://issues.apache.org/jira/browse/KAFKA-4888
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
> Environment: Release version:
> LSB Version:  
> :base-4.0-amd64:base-4.0-noarch:core-4.0-amd64:core-4.0-noarch:graphics-4.0-amd64:graphics-4.0-noarch:printing-4.0-amd64:printing-4.0-noarch
> Distributor ID:   RedHatEnterpriseServer
> Description:  Red Hat Enterprise Linux Server release 6.5 (Santiago)
> Release:  6.5
> Codename: Santiago
> Memory SO:
> Mem:15.577G total,   15.416G used,  164.508M free,   49.895M buffers
> Filesystem
> FilesystemSize  Used Avail Use% Mounted on
> /dev/mapper/appvg-lv_opt
>30G 1018M   28G   20% /opt/ngin
>Reporter: Eduardo da Silva Neto
> Attachments: issue_kafka
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> We created five kafka consumers that consume with one message at a time 
> max.poll.records = 1. After two days of intensive processing with the kafka 
> server file system with about 50% used the corrupted registry error occurred.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2690: HOTFIX: Fix header in ByteArrayConverter

2017-03-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2690


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4643) Improve test coverage of StreamsKafkaClient

2017-03-15 Thread Damian Guy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926216#comment-15926216
 ] 

Damian Guy commented on KAFKA-4643:
---

[~adyachkov] true. I missed that! Though the unit tests for this class are 
extremely light. It probably can't be properly unit-tested as is. Feel free to 
take a look at it and let us know if you need any help.

> Improve test coverage of StreamsKafkaClient
> ---
>
> Key: KAFKA-4643
> URL: https://issues.apache.org/jira/browse/KAFKA-4643
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
> Fix For: 0.11.0.0
>
>
> Exception paths not tested.
> {{getTopicMetadata}} not tested



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Reopened] (KAFKA-4643) Improve test coverage of StreamsKafkaClient

2017-03-15 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy reopened KAFKA-4643:
---

> Improve test coverage of StreamsKafkaClient
> ---
>
> Key: KAFKA-4643
> URL: https://issues.apache.org/jira/browse/KAFKA-4643
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
> Fix For: 0.11.0.0
>
>
> Exception paths not tested.
> {{getTopicMetadata}} not tested



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4901) Make ProduceRequest thread-safe

2017-03-15 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4901:
---
Assignee: Ismael Juma
  Status: Patch Available  (was: Open)

> Make ProduceRequest thread-safe
> ---
>
> Key: KAFKA-4901
> URL: https://issues.apache.org/jira/browse/KAFKA-4901
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Ismael Juma
> Fix For: 0.11.0.0, 0.10.2.1
>
>
> If request logging is enabled, ProduceRequest can be accessed
> and mutated concurrently from a network thread (which calls
> toString) and the request handler thread (which calls
> clearPartitionRecords()).
> That can lead to a ConcurrentModificationException when iterating
> the partitionRecords map.
> The underlying thread-safety issue has existed since the server
> started using the Java implementation of ProduceRequest in 0.10.0.
> However, we were incorrectly not clearing the underlying struct until
> 0.10.2, so toString itself was thread-safe until that change. In 0.10.2,
> toString is no longer thread-safe and we could potentially see a
> NullPointerException given the right set of interleavings between
> toString and clearPartitionRecords although we haven't seen that
> happen yet.
> In trunk, we changed the requests to have a toStruct method
> instead of creating a struct in the constructor and toString was
> no longer printing the contents of the Struct. This accidentally
> fixed the race condition, but it meant that request logging was less
> useful.
> A couple of days ago, AbstractRequest.toString was changed to
> print the contents of the request by calling toStruct().toString()
> and reintroduced the race condition. The impact is more visible
> because we iterate over a HashMap, which proactively
> checks for concurrent modification (unlike arrays).
> We will need a separate PR for 0.10.2.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4901) Make ProduceRequest thread-safe

2017-03-15 Thread Ismael Juma (JIRA)
Ismael Juma created KAFKA-4901:
--

 Summary: Make ProduceRequest thread-safe
 Key: KAFKA-4901
 URL: https://issues.apache.org/jira/browse/KAFKA-4901
 Project: Kafka
  Issue Type: Bug
Reporter: Ismael Juma
 Fix For: 0.11.0.0, 0.10.2.1


If request logging is enabled, ProduceRequest can be accessed
and mutated concurrently from a network thread (which calls
toString) and the request handler thread (which calls
clearPartitionRecords()).

That can lead to a ConcurrentModificationException when iterating
the partitionRecords map.

The underlying thread-safety issue has existed since the server
started using the Java implementation of ProduceRequest in 0.10.0.
However, we were incorrectly not clearing the underlying struct until
0.10.2, so toString itself was thread-safe until that change. In 0.10.2,
toString is no longer thread-safe and we could potentially see a
NullPointerException given the right set of interleavings between
toString and clearPartitionRecords although we haven't seen that
happen yet.

In trunk, we changed the requests to have a toStruct method
instead of creating a struct in the constructor and toString was
no longer printing the contents of the Struct. This accidentally
fixed the race condition, but it meant that request logging was less
useful.

A couple of days ago, AbstractRequest.toString was changed to
print the contents of the request by calling toStruct().toString()
and reintroduced the race condition. The impact is more visible
because we iterate over a HashMap, which proactively
checks for concurrent modification (unlike arrays).

We will need a separate PR for 0.10.2.
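
For illustration, a generic sketch of a "clear after use" pattern that avoids
this kind of race (the actual ProduceRequest fix may differ in detail): publish
an immutable map through a volatile field and clear it by swapping the
reference, so a concurrent toString sees either the full map or null, never a
map being mutated under its iterator.

{code}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class ClearableStateSketch {
    // volatile: the logging (network) thread always reads a consistent reference.
    private volatile Map<String, byte[]> partitionRecords;

    ClearableStateSketch(Map<String, byte[]> records) {
        this.partitionRecords = Collections.unmodifiableMap(new HashMap<>(records));
    }

    // Request handler thread: drop the reference instead of mutating the map.
    void clearPartitionRecords() {
        partitionRecords = null;
    }

    @Override
    public String toString() {
        Map<String, byte[]> snapshot = partitionRecords;  // read the volatile once
        return snapshot == null ? "ProduceRequest(cleared)" : snapshot.keySet().toString();
    }
}
{code}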



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2690: HOTFIX: Fix header in ByteArrayConverter

2017-03-15 Thread ijuma
GitHub user ijuma opened a pull request:

https://github.com/apache/kafka/pull/2690

HOTFIX: Fix header in ByteArrayConverter



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijuma/kafka fix-header-in-byte-array-converter

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2690.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2690






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4643) Improve test coverage of StreamsKafkaClient

2017-03-15 Thread Andrey Dyachkov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926189#comment-15926189
 ] 

Andrey Dyachkov commented on KAFKA-4643:


[~damianguy] exception paths are not tested anyway.

> Improve test coverage of StreamsKafkaClient
> ---
>
> Key: KAFKA-4643
> URL: https://issues.apache.org/jira/browse/KAFKA-4643
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
> Fix For: 0.11.0.0
>
>
> Exception paths not tested.
> {{getTopicMetadata}} not tested



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2689: MINOR: Make ProduceRequest thread-safe

2017-03-15 Thread ijuma
GitHub user ijuma opened a pull request:

https://github.com/apache/kafka/pull/2689

MINOR: Make ProduceRequest thread-safe



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijuma/kafka produce-request-thread-safety

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2689.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2689


commit 8cb363b98b0a01952ec7bfe8c5a387796994eb6b
Author: Ismael Juma 
Date:   2017-03-15T13:23:28Z

MINOR: Make ProduceRequest thread-safe

If request logging is enabled, it can be accessed concurrently
from a network thread (which calls `toString`) and the
request handler thread (which calls `clearPartitionRecords()`).

commit 2c5243a37166b5a2d8996e5554862e97ce70a6ee
Author: Ismael Juma 
Date:   2017-03-15T13:23:58Z

Tweak formatting of `KafkaApis.handleProduceRequest`

More consistent line length usage.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4863) Querying window store may return unwanted keys

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926122#comment-15926122
 ] 

ASF GitHub Bot commented on KAFKA-4863:
---

GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2688

KAFKA-4863: Querying window store may return unwanted keys. Backport to 
0.10.2



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka kafka-4863-10.2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2688.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2688


commit 5c6900f7e35ef3dc33be8fd750d6f0a0d700f458
Author: Damian Guy 
Date:   2017-03-15T12:05:07Z

backport to 0.10.2




> Querying window store may return unwanted keys
> --
>
> Key: KAFKA-4863
> URL: https://issues.apache.org/jira/browse/KAFKA-4863
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Xavier Léauté
>Assignee: Damian Guy
>Priority: Critical
>
> Using variable length keys in a window store may cause unwanted results to be 
> returned when querying certain ranges.
> Below is a test case for {{RocksDBWindowStoreTest}} that shows the problem. 
> It fails, returning {{\[0001, 0003, 0002, 0004, 0005\]}} instead of {{\[0001, 
> 0003, 0005\]}}.
> {code:java}
> @Test
> public void testPutAndFetchSanity() throws IOException {
> final RocksDBWindowStoreSupplier<String, String> supplier =
> new RocksDBWindowStoreSupplier<>(
> "window", 60 * 1000L * 2, 3,
> true, Serdes.String(), Serdes.String(),
> windowSize, true, Collections.emptyMap(), 
> false
> );
> final WindowStore<String, String> store = supplier.get();
> store.init(context, store);
> try {
> store.put("a", "0001", 0);
> store.put("aa", "0002", 0);
> store.put("a", "0003", 1);
> store.put("aa", "0004", 1);
> store.put("a", "0005", 6);
> assertEquals(Utils.mkList("0001", "0003", "0005"), 
> toList(store.fetch("a", 0, Long.MAX_VALUE)));
> } finally {
> store.close();
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2688: KAFKA-4863: Querying window store may return unwan...

2017-03-15 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2688

KAFKA-4863: Querying window store may return unwanted keys. Backport to 
0.10.2



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka kafka-4863-10.2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2688.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2688


commit 5c6900f7e35ef3dc33be8fd750d6f0a0d700f458
Author: Damian Guy 
Date:   2017-03-15T12:05:07Z

backport to 0.10.2




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-15 Thread Arun Mathew (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926036#comment-15926036
 ] 

Arun Mathew edited comment on KAFKA-3514 at 3/15/17 12:15 PM:
--

Hi [~mjsax], [~mihbor] . We were building an audit trail for kafka based on 
kafka stream and encountered similar issue.

Our work around was to use a hybrid of event time and system time. During 
regular operation we use event time. But when we create the punctuation 
schedules (object representing the next punctuation time) we also record the 
system time at which the punctuation schedule was created. The punctuate code 
was modified to punctuate anyway, in case the interval specified in the 
punctuation schedule has elapsed in terms of system time (current time - time 
of punctuation schedule creation), a new punctuation schedule corresponding to 
the next expected punctuation time (current punctuation time + punctuation 
interval) is also created. 

In an earlier version of kafka the above logic sufficed as the mayBePunctuate 
was called as part of the polling for events (in the absence of events). But 
current version doesn't seem to call it so we had to patch that portion a bit 
too.

Please let me know your thoughts.


was (Author: arunmathew88):
Hi [~mjsax] [~mihbor] . We were building an audit trail for kafka based on 
kafka stream and encountered similar issue.

Our work around was to use a hybrid of event time and system time. During 
regular operation we use event time. But when we create the punctuation 
schedules (object representing the next punctuation time) we also record the 
system time at which the punctuation schedule was created. The punctuate code 
was modified to punctuate anyway, in case the interval specified in the 
punctuation schedule has elapsed in terms of system time (current time - time 
of punctuation schedule creation), a new punctuation schedule corresponding to 
the next expected punctuation time (current punctuation time + punctuation 
interval) is also created. 

In an earlier version of kafka the above logic sufficed as the mayBePunctuate 
was called as part of the polling for events (in the absence of events). But 
current version doesn't seem to call it so we had to patch that portion a bit 
too.

Please let me know your thoughts.

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late arrived record would keep that stream's timestamp low for 
> a period of time, and hence keep being process until that late record. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-15 Thread Arun Mathew (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926036#comment-15926036
 ] 

Arun Mathew commented on KAFKA-3514:


Hi [~mjsax] [~mihbor] . We were building an audit trail for kafka based on 
kafka stream and encountered similar issue.

Our work around was to use a hybrid of event time and system time. During 
regular operation we use event time. But when we create the punctuation 
schedules (object representing the next punctuation time) we also record the 
system time at which the punctuation schedule was created. The punctuate code 
was modified to punctuate anyway, in case the interval specified in the 
punctuation schedule has elapsed in terms of system time (current time - time 
of punctuation schedule creation), a new punctuation schedule corresponding to 
the next expected punctuation time (current punctuation time + punctuation 
interval) is also created. 

In an earlier version of kafka the above logic sufficed as the mayBePunctuate 
was called as part of the polling for events (in the absence of events). But 
current version doesn't seem to call it so we had to patch that portion a bit 
too.

Please let me know your thoughts.
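
To make the workaround concrete, here is a rough sketch (the names are made up
and this is not the actual patch) of a punctuation schedule that fires on stream
(event) time as usual, but also falls back to wall-clock time when the interval
has elapsed since the schedule was created:

{code}
final class HybridPunctuationScheduleSketch {
    final long punctuateAtStreamTime;  // next event-time deadline
    final long intervalMs;             // punctuation interval
    final long createdAtSystemTime;    // wall-clock time when this schedule was created

    HybridPunctuationScheduleSketch(long punctuateAtStreamTime, long intervalMs, long createdAtSystemTime) {
        this.punctuateAtStreamTime = punctuateAtStreamTime;
        this.intervalMs = intervalMs;
        this.createdAtSystemTime = createdAtSystemTime;
    }

    boolean shouldPunctuate(long streamTime, long systemTime) {
        boolean eventTimeDue = streamTime >= punctuateAtStreamTime;
        boolean wallClockDue = systemTime - createdAtSystemTime >= intervalMs;
        return eventTimeDue || wallClockDue;  // punctuate anyway if wall-clock time has elapsed
    }

    HybridPunctuationScheduleSketch next(long systemTime) {
        // Advance the event-time deadline and remember when the new schedule was created.
        return new HybridPunctuationScheduleSketch(punctuateAtStreamTime + intervalMs, intervalMs, systemTime);
    }
}
{code}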

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late arrived record would keep that stream's timestamp low for 
> a period of time, and hence keep being process until that late record. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-132: Augment KStream.print to allow extra parameters in the printed string

2017-03-15 Thread Marc Juchli
Dear Matthias,

The KIP is updated. I think it now contains all the information on that
page.

Marc

On Mon, Mar 13, 2017 at 9:37 PM Matthias J. Sax 
wrote:

> Marc,
>
> Thanks for the KIP.
>
> Can you please update the KIP in a way such that it is self-contained?
> Right now, you link to all kinds of other places, making it hard to read
> the KIP.
>
> The KIP should be the "center of truth" -- if there is important
> information elsewhere, please copy it into the KIP.
>
>
> Thanks a lot!
>
>
> -Matthias
>
>
>
> On 3/13/17 1:30 PM, Matthias J. Sax wrote:
> > Can you please add the KIP to this table:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals#KafkaImprovementProposals-KIPsunderdiscussion
> >
> > Thanks,
> >
> >  Matthias
> >
> >
> > On 3/13/17 8:08 AM, Marc Juchli wrote:
> >> Dear all,
> >>
> >> The following describes KIP-132, which I just created. See:
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-132+-+Augment+KStream.print+to+allow+extra+parameters+in+the+printed+string
> >>
> >> Motivation
> >>
> >> As for now, KStream#print leads to a predefined output where key and value
> >> are printed with comma separation.
> >> KAFKA-4830  suggests to
> >> extend print in a way that it takes a KeyValueMapper as a parameter.
> >> This will allow a user to change the output according to the user's demands.
> >> Public Interfaces
> >>
> >> The affected interface is KStream, which needs to be extended with another
> >> overloaded version of print:
> >>
> >> void print(final Serde<K> keySerde,
> >>            final Serde<V> valSerde,
> >>            final String streamName,
> >>            final KeyValueMapper<? super K, ? super V, String> mapper);
> >>
> >> Proposed Changes
> >>
> >> See pull request GH-2669.
> >> This PR contains a discussion regarding KAFKA-4830 as well as KAFKA-4772.
> >>
> >> Compatibility, Deprecation, and Migration Plan
> >>
> >> The extension of print will not introduce compatibility issues – we can
> >> maintain the current output by keeping the current output format as a
> >> default (if mapper was not set):
> >>
> >> if(mapper == null) {
> >> printStream.println("[" + streamName + "]: " + keyToPrint + " , "
> >> + valueToPrint);
> >> } else {
> >> printStream.println("[" + streamName + "]: " +
> >> mapper.apply(keyToPrint, valueToPrint));
> >> }
> >>
> >>
> >>
> >> Kind regards,
> >> Marc
> >>
> >
>
>
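
A quick usage sketch of the print overload proposed above (hedged: the exact
generic signature may differ in the final PR, and the topic and variable names
below are made up for illustration):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, Long> wordCounts =
            builder.stream(Serdes.String(), Serdes.Long(), "word-counts");
    // With a mapper supplied, the caller controls the printed format instead of
    // the default "key , value" output.
    wordCounts.print(Serdes.String(), Serdes.Long(), "word-counts",
            (key, value) -> key + " -> " + value);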


Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-03-15 Thread Ismael Juma
Hi Dong,

Yes, that sounds good to me. I'd list option 2 first since that is safe
and, as you said, no worse than what happens today. The file approach is a
bit hacky as you said, so it may be a bit fragile. Not sure if we really
want to mention that. :)

About the note in KIP-112 versus adding the test in KIP-113, I think it
would make sense to add a short sentence stating that this scenario is
covered in KIP-113. People won't necessarily read both KIPs at the same
time and it's helpful to cross-reference when it makes sense.

Thanks for your work on this.

Ismael

On Tue, Mar 14, 2017 at 11:00 PM, Dong Lin  wrote:

> Hey Ismael,
>
> I get your concern that it is more likely for a disk to be slow, or to exhibit
> other forms of non-fatal symptoms, after some known fatal error. Then it is
> weird for the user to start the broker with the likely-problematic disk in the
> broker config. In that case, I think there are two things the user can do:
>
> 1) Intentionally change the log directory in the config to point to a file.
> This is a bit hacky, but it works until we make a more appropriate
> long-term change in Kafka to handle this case.
> 2) Just don't start the broker with bad log directories. Always fix the disk
> before restarting the broker. This is a safe approach that is no worse than
> current practice.
>
> Would this address your concern if I specify the problem and the two
> solutions in the KIP?
>
> Thanks,
> Dong
>
> On Tue, Mar 14, 2017 at 3:29 PM, Dong Lin  wrote:
>
> > Hey Ismael,
> >
> > Thanks for the comment. Please see my reply below.
> >
> > On Tue, Mar 14, 2017 at 10:31 AM, Ismael Juma  wrote:
> >
> >> Thanks Dong. Comments inline.
> >>
> >> On Fri, Mar 10, 2017 at 6:25 PM, Dong Lin  wrote:
> >> >
> >> > I get your point. But I am not sure we should recommend that users simply
> >> > remove the disk from the broker config. If a user simply does this without
> >> > checking the utilization of the good disks, replicas on the bad disk will be
> >> > re-created on the good disks and may overload them, causing a
> >> > cascading failure.
> >> >
> >>
> >> Good point.
> >>
> >>
> >> >
> >> > I agree with you and Colin that slow disk may cause problem. However,
> >> > performance degradation due to slow disk this is an existing problem
> >> that
> >> > is not detected or handled by Kafka or KIP-112.
> >>
> >>
> >> I think an important difference is that a number of disk errors are
> >> currently fatal and won't be after KIP-112. So it introduces new
> scenarios
> >> (for example, bouncing a broker that is working fine although some disks
> >> have been marked bad).
> >>
> >
> > Hmm.. I am still trying to understand why KIP-112 creates new scenarios.
> > A slow disk is not considered a fatal error and won't be caught by either
> > the existing Kafka design or this KIP. If any disk is marked bad, it means
> > the broker encountered an IOException when accessing the disk; most likely the
> > broker will encounter an IOException again when accessing this disk and mark
> > it as bad after the bounce. I guess you are talking about the case where a
> > disk is marked bad, the broker is bounced, and then the disk provides degraded
> > performance without being marked bad, right? But this seems to be an
> > existing problem we already have today with slow disks.
> >
> > Here are the possible scenarios with bad disk after broker bounce:
> >
> > 1) bad disk -> broker bounce -> good disk. This would be great.
> > 2) bad disk -> broker bounce -> slow disk. Slow disk is an existing
> > problem that is not addressed by Kafka today.
> > 3) bad disk -> broker bounce -> bad disk. This is handled by this KIP
> such
> > that only replicas on the bad disk become offline.
> >
> >
> >>
> >> > Detection and handling of
> >> > slow disk is a separate problem that needs to be addressed in a future
> >> KIP.
> >> > It is currently listed in the future work. Does this sound OK?
> >> >
> >>
> >> I'm OK with it being handled in the future. In the meantime, I was just
> >> hoping that we can make it clear to users about the potential issue of a
> >> disk marked as bad becoming good again after a bounce (which can be
> >> dangerous).
> >>
> >> > The main benefit of creating the second topic after log directory goes
> >> > offline is that we can make sure the second topic is created on the good
> >> > log directory. I am not sure we can simply assume that the first topic will
> >> > always be created on the first log directory in the broker config and the
> >> > second topic will be created on the second log directory in the broker
> >> > config.
> >>
> >>
> >>
> >> > However, I can add this test in KIP-113, which allows the user to
> >> > re-assign a replica to a specific log directory of a broker. Is this OK?
> >> >
> >>
> >> OK. Please add a note to KIP-112 about this as well (so that it's clear
> >> why
> >> we only do it in KIP-113).
> >>
> >
> > Sure. Instead of adding note to KIP-112, I have 

[jira] [Commented] (KAFKA-4888) offset 449883 is invalid, cause: Record is corrupt (stored crc = 2171407101, computed crc = 1371274824)

2017-03-15 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925851#comment-15925851
 ] 

Ismael Juma commented on KAFKA-4888:


[~chengc], are the Kafka version and other details the same as those reported 
by Eduardo?

> offset 449883 is invalid, cause: Record is corrupt (stored crc = 2171407101, 
> computed crc = 1371274824)
> ---
>
> Key: KAFKA-4888
> URL: https://issues.apache.org/jira/browse/KAFKA-4888
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
> Environment: Release version:
> LSB Version:  
> :base-4.0-amd64:base-4.0-noarch:core-4.0-amd64:core-4.0-noarch:graphics-4.0-amd64:graphics-4.0-noarch:printing-4.0-amd64:printing-4.0-noarch
> Distributor ID:   RedHatEnterpriseServer
> Description:  Red Hat Enterprise Linux Server release 6.5 (Santiago)
> Release:  6.5
> Codename: Santiago
> Memory SO:
> Mem:15.577G total,   15.416G used,  164.508M free,   49.895M buffers
> Filesystem
> FilesystemSize  Used Avail Use% Mounted on
> /dev/mapper/appvg-lv_opt
>30G 1018M   28G   20% /opt/ngin
>Reporter: Eduardo da Silva Neto
> Attachments: issue_kafka
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> We created five kafka consumers that consume with one message at a time 
> max.poll.records = 1. After two days of intensive processing with the kafka 
> server file system with about 50% used the corrupted registry error occurred.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4888) offset 449883 is invalid, cause: Record is corrupt (stored crc = 2171407101, computed crc = 1371274824)

2017-03-15 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4888:
---
Fix Version/s: (was: 0.10.1.1)

> offset 449883 is invalid, cause: Record is corrupt (stored crc = 2171407101, 
> computed crc = 1371274824)
> ---
>
> Key: KAFKA-4888
> URL: https://issues.apache.org/jira/browse/KAFKA-4888
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
> Environment: Release version:
> LSB Version:  
> :base-4.0-amd64:base-4.0-noarch:core-4.0-amd64:core-4.0-noarch:graphics-4.0-amd64:graphics-4.0-noarch:printing-4.0-amd64:printing-4.0-noarch
> Distributor ID:   RedHatEnterpriseServer
> Description:  Red Hat Enterprise Linux Server release 6.5 (Santiago)
> Release:  6.5
> Codename: Santiago
> Memory SO:
> Mem:15.577G total,   15.416G used,  164.508M free,   49.895M buffers
> Filesystem
> FilesystemSize  Used Avail Use% Mounted on
> /dev/mapper/appvg-lv_opt
>30G 1018M   28G   20% /opt/ngin
>Reporter: Eduardo da Silva Neto
> Attachments: issue_kafka
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> We created five kafka consumers that consume with one message at a time 
> max.poll.records = 1. After two days of intensive processing with the kafka 
> server file system with about 50% used the corrupted registry error occurred.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4487) Tests should be run in Jenkins with INFO or DEBUG level

2017-03-15 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925832#comment-15925832
 ] 

Ismael Juma commented on KAFKA-4487:


[~enothereska], yeah, we'd have to check how much additional space is taken. 
One option would be to use an appender that sends anything below INFO level to 
/dev/null.

> Tests should be run in Jenkins with INFO or DEBUG level
> ---
>
> Key: KAFKA-4487
> URL: https://issues.apache.org/jira/browse/KAFKA-4487
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
> Fix For: 0.11.0.0
>
>
> KAFKA-4483 is an example of what can be missed by running them at ERROR 
> level. Worse than that would be subtle issues that would escape detection 
> altogether.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4643) Improve test coverage of StreamsKafkaClient

2017-03-15 Thread Damian Guy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925830#comment-15925830
 ] 

Damian Guy commented on KAFKA-4643:
---

[~adyachkov] It looks like the method has been removed since this was raised. 
I'll close the JIRA.

> Improve test coverage of StreamsKafkaClient
> ---
>
> Key: KAFKA-4643
> URL: https://issues.apache.org/jira/browse/KAFKA-4643
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
> Fix For: 0.11.0.0
>
>
> Exception paths not tested.
> {{getTopicMetadata}} not tested



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (KAFKA-4643) Improve test coverage of StreamsKafkaClient

2017-03-15 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-4643.
---
Resolution: Won't Fix

The method that wasn't covered has subsequently been removed.

> Improve test coverage of StreamsKafkaClient
> ---
>
> Key: KAFKA-4643
> URL: https://issues.apache.org/jira/browse/KAFKA-4643
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
> Fix For: 0.11.0.0
>
>
> Exception paths not tested.
> {{getTopicMetadata}} not tested



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4885) processstreamwithcachedstatestore and other streams benchmarks fail occasionally

2017-03-15 Thread Damian Guy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925825#comment-15925825
 ] 

Damian Guy commented on KAFKA-4885:
---

[~guozhang]

1. We give users a chance to shut down the whole instance via the 
UncaughtExceptionHandler, right? 
2. Perhaps this should be configurable? Personally, in apps I write I'd prefer 
to fail fast, as failing will usually raise alerts that help get to the 
root cause of the problem earlier. Retrying forever may mean that such issues 
go unnoticed for long periods of time.
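
For reference, a minimal sketch of what that looks like from the application 
side (assuming the 0.10.x-era API, where the handler is a plain 
Thread.UncaughtExceptionHandler; the fail-fast policy shown is just one choice, 
and the topic names are placeholders):

{code}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class FailFastExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fail-fast-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic").to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.setUncaughtExceptionHandler((thread, throwable) -> {
            // Fail fast: let process supervision and alerting surface the problem,
            // rather than retrying silently inside the instance.
            System.err.println("Stream thread " + thread.getName() + " died: " + throwable);
            System.exit(1);
        });
        streams.start();
    }
}
{code}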

> processstreamwithcachedstatestore and other streams benchmarks fail 
> occasionally
> -
>
> Key: KAFKA-4885
> URL: https://issues.apache.org/jira/browse/KAFKA-4885
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
> Fix For: 0.11.0.0
>
>
> test_id:
> kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=processstreamwithcachedstatestore.scale=2
> status: FAIL
> run time:   14 minutes 58.069 seconds
> Streams Test process on ubuntu@worker5 took too long to exit
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
> data = self.run_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
> return self.test_context.function(self.test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py",
>  line 86, in test_simple_benchmark
> self.driver[num].wait()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py",
>  line 102, in wait
> self.wait_node(node, timeout_sec)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py",
>  line 106, in wait_node
> wait_until(lambda: not node.account.alive(pid), timeout_sec=timeout_sec, 
> err_msg="Streams Test process on " + str(node.account) + " took too long to 
> exit")
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py",
>  line 36, in wait_until
> raise TimeoutError(err_msg)
> TimeoutError: Streams Test process on ubuntu@worker5 took too long to exit
> The log contains several lines like:
> [2017-03-11 04:52:59,080] DEBUG Attempt to heartbeat failed for group 
> simple-benchmark-streams-with-storetrue since it is rebalancing. 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2017-03-11 04:53:01,987] DEBUG Sending Heartbeat request for group 
> simple-benchmark-streams-with-storetrue to coordinator worker10:9092 (id: 
> 2147483646 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2017-03-11 04:53:02,088] DEBUG Attempt to heartbeat failed for group 
> simple-benchmark-streams-with-storetrue since it is rebalancing. 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2017-03-11 04:53:04,995] DEBUG Sending Heartbeat request for group 
> simple-benchmark-streams-with-storetrue to coordinator worker10:9092 (id: 
> 2147483646 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> Other tests that fail the same way include:
> test_id:
> kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=count.scale=2
> test_id:
> kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=processstreamwithsink.scale=1
> test_id:
> kafkatest.tests.streams.streams_bounce_test.StreamsBounceTest.test_bounce



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4487) Tests should be run in Jenkins with INFO or DEBUG level

2017-03-15 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925757#comment-15925757
 ] 

Eno Thereska commented on KAFKA-4487:
-

There are cases when the disk gets full when running with DEBUG level. Just FYI.

> Tests should be run in Jenkins with INFO or DEBUG level
> ---
>
> Key: KAFKA-4487
> URL: https://issues.apache.org/jira/browse/KAFKA-4487
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
> Fix For: 0.11.0.0
>
>
> KAFKA-4483 is an example of what can be missed by running them at ERROR 
> level. Worse than that would be subtle issues that would escape detection 
> altogether.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4888) offset 449883 is invalid, cause: Record is corrupt (stored crc = 2171407101, computed crc = 1371274824)

2017-03-15 Thread Cheng Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925720#comment-15925720
 ] 

Cheng Chen commented on KAFKA-4888:
---

[~eduardo.s.neto] We saw the same problem. Did you deploy your cluster on a 
public cloud or on your own machines?

> offset 449883 is invalid, cause: Record is corrupt (stored crc = 2171407101, 
> computed crc = 1371274824)
> ---
>
> Key: KAFKA-4888
> URL: https://issues.apache.org/jira/browse/KAFKA-4888
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
> Environment: Release version:
> LSB Version:  
> :base-4.0-amd64:base-4.0-noarch:core-4.0-amd64:core-4.0-noarch:graphics-4.0-amd64:graphics-4.0-noarch:printing-4.0-amd64:printing-4.0-noarch
> Distributor ID:   RedHatEnterpriseServer
> Description:  Red Hat Enterprise Linux Server release 6.5 (Santiago)
> Release:  6.5
> Codename: Santiago
> Memory SO:
> Mem:15.577G total,   15.416G used,  164.508M free,   49.895M buffers
> Filesystem
> FilesystemSize  Used Avail Use% Mounted on
> /dev/mapper/appvg-lv_opt
>30G 1018M   28G   20% /opt/ngin
>Reporter: Eduardo da Silva Neto
> Fix For: 0.10.1.1
>
> Attachments: issue_kafka
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> We created five kafka consumers that consume with one message at a time 
> max.poll.records = 1. After two days of intensive processing with the kafka 
> server file system with about 50% used the corrupted registry error occurred.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-4900) Brokers stuck in controller re-election loop after failing to register metrics

2017-03-15 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925611#comment-15925611
 ] 

Onur Karaman edited comment on KAFKA-4900 at 3/15/17 6:44 AM:
--

For reference, I think the controller was changing every 3 seconds or so during 
the re-election loop.

We were able to stabilize the cluster after about two hours but haven't yet 
figured out the root cause.

We didn't know the cause, so we definitely didn't take the optimal sequence of 
steps to stabilize the cluster. Anyways, here's what we did:
# We figured deleting the /controller znode wouldn't make a difference since it 
was anyways changing every 3 seconds.
# We initially tried bouncing what we had incorrectly thought was the sole 
broker corresponding to the metric collision, but this didn't help. The 
re-election loop continued, and we later found out in the logs that there had 
been "connection-close-rate" metrics collisions tagged with other brokers.
# We then shifted traffic out of the bad cluster, killed all of the brokers, 
and restarted them all. Getting the cluster back up and recovering its 
unflushed segments took around 25 minutes.
# While the re-election loop had stopped, we still saw under-replicated 
partitions after the cluster was back up and later found that some of the 
fetchers hadn't been started. I think at this point we just deleted the 
/controller znode to elect a new controller with the hopes of it broadcasting 
leadership/followership of partitions to the cluster so that the broker's 
fetchers can start up correctly. After this, the under-replicated partitions 
came back down to approximately zero. We think the few under-replicated 
partitions here and there are from uneven partition leadership distribution but 
this wasn't a big deal so we haven't attempted to fix the few remaining 
under-replicated partitions yet.


was (Author: onurkaraman):
For reference, I think the controller was changing every 3 seconds or so during 
the re-election loop.

We were able to stabilize the cluster after about two hours but haven't yet 
figured out the root cause.

We didn't know the cause, so we definitely didn't take the optimal sequence of 
steps to stabilize the cluster. Anyways, here's what we did:
# We figured deleting the /controller znode wouldn't make a difference since it 
was anyways changing every 3 seconds.
# We initially tried bouncing what we had incorrectly thought was the sole 
broker corresponding to the metric collision, but this didn't help. The 
re-election loop continued, and we later found out in the logs that there had 
been "connection-close-rate" metrics collisions tagged with other brokers.
# We then shifted traffic out of the bad cluster, killed all of the brokers, 
and restarted them all. Getting the cluster back up and recovering its 
unflushed segments took around 25 minutes.
# We still saw under-replicated partitions after the cluster was back up and 
later found that some of the fetchers haven't been started. I think at this 
point we just deleted the /controller znode to elect a new controller with the 
hopes of it broadcasting leadership/followership of partitions to the cluster 
so that the broker's fetchers can start up correctly. After this, the 
under-replicated partitions came back down to approximately zero. We think the 
few under-replicated partitions here and there are from uneven partition 
leadership distribution but this wasn't a big deal so we haven't attempted to 
fix the few remaining under-replicated partitions yet.

> Brokers stuck in controller re-election loop after failing to register metrics
> --
>
> Key: KAFKA-4900
> URL: https://issues.apache.org/jira/browse/KAFKA-4900
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core
>Affects Versions: 0.10.1.1
>Reporter: Nick Travers
>
> We hit this today in one of our three-node staging clusters. The exception 
> continues to occur on all three nodes.
> {code}
> 2017-03-15 02:17:30,677 ERROR 
> [ZkClient-EventThread-35-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> server.ZookeeperLeaderElector - Error while electing or becoming leader on 
> broker 9
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=connection-close-rate, 
> group=controller-channel-metrics,description=Connections closed per second in 
> the window., tags={broker-id=10}]' already exists, can't register another one.
> at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:380)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:179)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:164)
> at 
> org.apache.kafka.common.network.Selector$SelectorMetrics.(Selector.java:617)
> at 

[jira] [Commented] (KAFKA-4900) Brokers stuck in controller re-election loop after failing to register metrics

2017-03-15 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925611#comment-15925611
 ] 

Onur Karaman commented on KAFKA-4900:
-

For reference, I think the controller was changing every 3 seconds or so during 
the re-election loop.

We were able to stabilize the cluster after about two hours but haven't yet 
figured out the root cause.

We didn't know the cause, so we definitely didn't take the optimal sequence of 
steps to stabilize the cluster. Anyways, here's what we did:
# We figured deleting the /controller znode wouldn't make a difference since it 
was anyways changing every 3 seconds.
# We initially tried bouncing what we had incorrectly thought was the sole 
broker corresponding to the metric collision, but this didn't help. The 
re-election loop continued, and we later found out in the logs that there had 
been "connection-close-rate" metrics collisions tagged with other brokers.
# We then shifted traffic out of the bad cluster, killed all of the brokers, 
and restarted them all. Getting the cluster back up and recovering its 
unflushed segments took around 25 minutes.
# We still saw under-replicated partitions after the cluster was back up and 
later found that some of the fetchers hadn't been started. I think at this 
point we just deleted the /controller znode to elect a new controller with the 
hopes of it broadcasting leadership/followership of partitions to the cluster 
so that the broker's fetchers can start up correctly. After this, the 
under-replicated partitions came back down to approximately zero. We think the 
few under-replicated partitions here and there are from uneven partition 
leadership distribution but this wasn't a big deal so we haven't attempted to 
fix the few remaining under-replicated partitions yet.

> Brokers stuck in controller re-election loop after failing to register metrics
> --
>
> Key: KAFKA-4900
> URL: https://issues.apache.org/jira/browse/KAFKA-4900
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core
>Affects Versions: 0.10.1.1
>Reporter: Nick Travers
>
> We hit this today in one of our three-node staging clusters. The exception 
> continues to occur on all three nodes.
> {code}
> 2017-03-15 02:17:30,677 ERROR 
> [ZkClient-EventThread-35-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> server.ZookeeperLeaderElector - Error while electing or becoming leader on 
> broker 9
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=connection-close-rate, 
> group=controller-channel-metrics,description=Connections closed per second in 
> the window., tags={broker-id=10}]' already exists, can't register another one.
> at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:380)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:179)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:164)
> at 
> org.apache.kafka.common.network.Selector$SelectorMetrics.(Selector.java:617)
> at org.apache.kafka.common.network.Selector.(Selector.java:138)
> at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:101)
> at 
> kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45)
> at 
> kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45)
> at scala.collection.immutable.Set$Set3.foreach(Set.scala:163)
> at 
> kafka.controller.ControllerChannelManager.(ControllerChannelManager.scala:45)
> at 
> kafka.controller.KafkaController.startChannelManager(KafkaController.scala:814)
> at 
> kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:742)
> at 
> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:334)
> at 
> kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:167)
> at 
> kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
> at