[jira] [Created] (KAFKA-8634) Update ZooKeeper to 3.5.5

2019-07-08 Thread Ismael Juma (JIRA)
Ismael Juma created KAFKA-8634:
--

 Summary: Update ZooKeeper to 3.5.5
 Key: KAFKA-8634
 URL: https://issues.apache.org/jira/browse/KAFKA-8634
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma
Assignee: Ismael Juma


ZooKeeper 3.5.5 is the first stable release in the 3.5.x series. The key new 
feature in ZK 3.5.x is TLS support.





[jira] [Resolved] (KAFKA-7178) Is kafka compatible with zookeeper 3.5.x ?

2019-07-08 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-7178.

Resolution: Duplicate

KAFKA-8634 tracks updating to ZK 3.5.x.

> Is kafka compatible with zookeeper 3.5.x ?
> --
>
> Key: KAFKA-7178
> URL: https://issues.apache.org/jira/browse/KAFKA-7178
> Project: Kafka
>  Issue Type: Improvement
>  Components: zkclient
>Reporter: hackerwin7
>Priority: Major
>
> Hi, all
> I want to know: are the Kafka versions (0.8.x, 0.9.x, 0.10.x, 0.11.x, 1.x) compatible 
> with ZooKeeper 3.5.x and its dynamic reconfiguration feature?
> some refs on here: 
> https://serverfault.com/questions/854650/kafka-compatible-with-zookeeper-3-5-feature-rebalancing-client-connections





[jira] [Created] (KAFKA-8635) Unnecessary wait when looking up coordinator before transactional request

2019-07-08 Thread Denis Washington (JIRA)
Denis Washington created KAFKA-8635:
---

 Summary: Unnecessary wait when looking up coordinator before 
transactional request
 Key: KAFKA-8635
 URL: https://issues.apache.org/jira/browse/KAFKA-8635
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.2.1, 2.3.0
Reporter: Denis Washington


In our Kafka Streams applications (with EOS enabled), we were seeing mysterious 
long delays between records being produced by a stream task and the same 
records being consumed by the next task. These delays turned out to always be 
around {{retry.backoff.ms}} long; reducing that value reduced the delays by 
about the same amount.

After digging further, I pinned down the problem to the following lines in 
{{org.apache.kafka.clients.producer.internals.Sender#runOnce}}:

    } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest()) {
        // as long as there are outstanding transactional requests, we simply wait for them to return
        client.poll(retryBackoffMs, time.milliseconds());
        return;
    }

This code seems to assume that, if {{maybeSendTransactionalRequest}} returns 
true, a transactional request has been sent out that should be waited for. 
However, this is not true if the request requires a coordinator lookup:

    if (nextRequestHandler.needsCoordinator()) {
        targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());
        if (targetNode == null) {
            transactionManager.lookupCoordinator(nextRequestHandler);
            break;
        }
        ...

{{lookupCoordinator()}} does not actually send anything, but just enqueues a 
coordinator lookup request for the {{Sender}}'s next run loop iteration. 
{{maybeSendTransactionalRequest}} still returns true, though (the {{break}} 
jumps to a {{return true}} at the end of the method), leading the {{Sender}} to 
needlessly wait via {{client.poll()}} although there is actually no request 
in-flight.

I _think_ the fix is to let {{maybeSendTransactionalRequest}} return false if 
it merely enqueues the coordinator lookup instead of actually sending anything. 
But I'm not sure, hence the bug report instead of a pull request.
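
A minimal sketch of that idea, against the excerpt above (not a tested patch):

```
// Inside maybeSendTransactionalRequest(): when the coordinator is unknown, we
// only enqueue a lookup for the next run loop iteration -- nothing is actually
// in flight yet -- so returning false here would let the Sender skip the
// retry.backoff.ms-long client.poll() call.
if (nextRequestHandler.needsCoordinator()) {
    targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());
    if (targetNode == null) {
        transactionManager.lookupCoordinator(nextRequestHandler);
        return false; // instead of break, which falls through to `return true`
    }
}
```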





Re: [DISCUSS] KIP-455 Create an Admin API for Replica Reassignments

2019-07-08 Thread Stanislav Kozlovski
Hey Colin,

I've got a couple more minor questions:

AlterPartitionAssignmentsRequest: Should we make the `Topics` and
`Partitions` fields nullable as well, so we can appropriately cancel
pending reassignments at all levels of granularity? I think this would
be consistent with the rest of the request and should not be too hard
to implement.

From the API spec, I understand we never want to fail the whole
`AlterPartitionAssignmentsRequest` if one partition is given an
invalid assignment. This is consistent with the current behavior of
the znode.
Do we want to add more detail to the `AlterPartitionReassignmentsResponse`?
Currently, we have:
```
 { "name": "Responses", "type": "[]ReassignablePartitionResponse",
"versions": "0+",
  "about": "The responses to partitions to reassign.", "fields": [
  { "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code." },
  { "name": "ErrorString", "type": "string", "versions": "0+",
"nullableVersions": "0+",
"about": "The error string, or null if there was no error." }
  ]
}
```
I was thinking we should add fields to denote topic/partitions:
```
{ "name": "Responses", "type": "[]ReassignableTopicResponse", "versions": "0+",
  "about": "The responses to topics to reassign.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+",
"entityType": "topicName",
  "about": "The topic name" },
{ "name": "Partitions", "type":
"[]ReassignablePartitionResponse", "versions": "0+",
  "about": "The responses to partitions to reassign.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
  "about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
  "about": "The error code." },
{ "name": "ErrorString", "type": "string", "versions":
"0+", "nullableVersions": "0+",
  "about": "The error string, or null if there was no error." }
  ]
}
  ]
}
```
to better structure the reassigned partitions and their particular failures.


For the `ListPartitionReassignmentsRequest`, we only have a top-level
error, meaning we would fail the whole request if even one partition
index or topic name is invalid. Is it worth adding a more fine-grained
error level? I assume it's not worth it, since the response would be
bigger and it should be pretty rare to provide an invalid topic name
or partition index.


To continue the discussion:

> I thought about this a bit more, and I think the easiest way forward is just 
> to make the shell script incremental.

+1. This was going to be my suggestion as well. I believe this is more
intuitive to the user and easier on our side.

> I guess we could have such a flag, but it can be implemented on the 
> command-line side, not the broker side.

Yes, I propose a "--cancel-all" top-level flag for the tool.

> It seems like a fairly unlikely case.  I don't think I feel that strongly 
> about what we should do in this scenario.

It truly is a very unlikely case, as it means somebody is using both
the old and new APIs, which should be rare in practice. We could keep
the `reassign_partitions` znode up to date with any new-API replica
reassignments which were present in the znode to begin with, but that
might incur a performance hit which may not be worth the tradeoff,
given that the scenario it guards against is very unlikely.
I would suggest we consciously make the design decision to accept
the possible inconsistency in this case in favor of better performance
and simpler code. What do people think about this?

> The more likely case is that a znode-initiated reassignment for a partition 
> is partly done and the controller failover happens.  That case will be 
> handled, though, by the same logic which makes a new reassignment to the 
> current active assignment a no-op.

Could you expand on this? By partly done, are you referring to the
process of updating the `/partitions/${part}/state` znodes in
accordance with the `reassign_partitions` znode?
If so, yes, that will be handled, but a new reassignment to the current
active assignment won't be a no-op, I think. My understanding was that
when the controller starts up, it will read the `reassign_partitions`
znode and apply it to the `/state` znodes. Therefore, a new
reassignment to the current active assignment will work (it won't be a
no-op) if the controller gets restarted. Is this correct?

> I agree that there is kind of an awkward edge case right when a targetReplica 
> enters the ISR.  Since the replica set and the ISR reside in two separate 
> znodes, they can't be updated atomically (well, maybe they could, but I don't 
> think we will in practice).  When the controller becomes aware that the ISR 
> has changed for the reassigning partition, it will remove the relevant 
> partition from targetReplicas and add it to replicas.
> This is another ZK write, of course.  I don't 

Re: [DISCUSS] KIP-435: Internal Partition Reassignment Batching

2019-07-08 Thread Viktor Somogyi-Vass
Hey Stanislav,

Thanks for the thorough look at the KIP! :)

> Let me first explicitly confirm my understanding of the configs and the
> algorithm:
> * reassignment.parallel.replica.count - the maximum total number of
> replicas that we can move at once, *per partition*
> * reassignment.parallel.partition.count - the maximum number of partitions
> we can move at once
> * reassignment.parallel.leader.movements - the maximum number of leader
> movements we can have at once

Yes.

> As far as I currently understand it, your proposed algorithm will naturally
> prioritize leader movement first. e.g. if
> reassignment.parallel.replica.count=1 and
> reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
> we will always move one, the first possible, replica in the replica set
> (which will be the leader if part of the excess replica set (ER)).
> Am I correct in saying that?

Yes.

> 1. Does it make sense to add `max` somewhere in the configs' names?

If you mean that it's not always an exact number (because the last batch
could be smaller), then I think it's a good idea. I don't mind this naming,
or having "max" in it either.

> 2. How does this KIP play along with KIP-455's notion of multiple
> rebalances - do the configs apply on a
> single AlterPartitionAssignmentsRequest or are they global?

I think it is intuitive if they are on a global level and the user has
the option to set KIP-435's settings to Int.MAX and thus eliminate any
batching and submit their own batches through
AlterPartitionAssignmentsRequest. If we applied them on every batch then we
couldn't really guarantee any limits, as the user would be able to get
around them by submitting lots of reassignments. By default, though, we set
the batching limits to Int.MAX, so effectively they're disabled.
Also, if I understand correctly, AlterPartitionAssignmentsRequest would be
partition-level batching, wouldn't it? So if you submit 10 partitions at once
then they'll all be started by the controller immediately, as per my
understanding.
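
For illustration, the global broker-side defaults described above (Int.MAX,
i.e. batching effectively disabled) would look like this:

```
# Int.MAX disables broker-side batching; users can then do their own
# batching via AlterPartitionAssignmentsRequest.
reassignment.parallel.replica.count=2147483647
reassignment.parallel.partition.count=2147483647
reassignment.parallel.leader.movements=2147483647
```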

> 3. Unless I've missed it, the algorithm does not take into account
> `reassignment.parallel.leader.movements`

Yeah, the reassignment step calculation doesn't contain that because it
describes only one partition's reassignment step calculation. We simply
fill our reassignment batch with as many leader movements as we can and
then fill the rest with reassignments which don't require leader movement,
if we have space. I'll create a pseudo-code block in the KIP for this.

> 4. The KIP says that the order of the input has some control over how the
> batches are created - i.e. it's deterministic. What would the batches of the
> following reassignment look like:
> reassignment.parallel.replica.count=1
> reassignment.parallel.partition.count=MAX_INT
> reassignment.parallel.leader.movements=1
> partitionA - (0,1,2) -> (3,4,5)
> partitionB - (0,1,2) -> (3,4,5)
> partitionC - (0,1,2) -> (3,4,5)
> From my understanding, we would start with A(0->3), B(1->4) and C(1->4). Is
> that correct? Would the second step then continue with B(0->3)?
>
> If the configurations are global, I can imagine we will have a bit more
> trouble in preserving the expected ordering, especially across controller
> failovers -- but I'll avoid speculating until you confirm the scope of the
> config.

I've raised the ordering problem in the KIP-455 discussion in a slightly
different form, and as I remember the verdict there was that we shouldn't
expose ordering as an API. It might not be easy, as you say, and there might
be much better strategies to follow (like disk or network utilization
goals). Therefore I'll remove this section from the KIP. (Otherwise yes, I
would have expected the same behavior you described.)
Since #3 was also about the pseudo-code, it would be something like this:

Config:
  reassignment.parallel.replica.count=R
  reassignment.parallel.partition.count=P
  reassignment.parallel.leader.movements=L

Code:
  val batchSize = max(L, P)
  // split the individual partition reassignments by whether they involve leader movement or not
  val partitionMovements =
    calculateReassignmentStepsFor(partitionsToReassign).partition(partitionReassignment.involvesLeaderReassignment)
  // fill the batch with as many leader movements as possible and take the rest from other reassignments
  val currentBatch = if (partitionMovements.leaderMovements.size < batchSize)
    partitionMovements.leaderMovements ++
      partitionsToReassign.otherPartitionMovements.take(batchSize - partitionMovements.leaderMovements.size)
  else
    partitionMovements.leaderMovements.take(batchSize)
  executeReassignmentOnBatch(currentBatch)

> 5. Regarding the new behavior of electing the new preferred leader in the
> "first step" of the reassignment - does this obey the
> `auto.leader.rebalance.enable` config?
> If not, I have concerns regarding how backwards compatible this might be -
> e.g imagine a user does a huge reassignment (as they have always done) and
> suddenly a huge leader shift happe

[VOTE] KIP-484: Expose metrics for group and transaction metadata loading duration

2019-07-08 Thread Anastasia Vela
Hi,

I'd like to start a vote thread for KIP-484. This addition will provide
visibility into how long partitions take to load, in order to help explain
some of the inactivity seen in consumer groups.

More info at
https://cwiki.apache.org/confluence/display/KAFKA/KIP-484%3A+Expose+metrics+for+group+and+transaction+metadata+loading+duration

Best,
Anastasia


[jira] [Created] (KAFKA-8636) Document behavior change for static members with `max.poll.interval.ms`

2019-07-08 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8636:
--

 Summary: Document behavior change for static members with 
`max.poll.interval.ms`
 Key: KAFKA-8636
 URL: https://issues.apache.org/jira/browse/KAFKA-8636
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen


The static member's behavior with max.poll.interval.ms is potentially different 
from what the current config documentation describes. For example, if the session 
timeout >> max poll interval, static members will not leave the group until the 
session timeout expires.
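
For illustration, a hypothetical client configuration that triggers this: with
static membership (group.instance.id set), a member that stops polling may only
be evicted after the 30-minute session timeout, not the 5-minute poll interval.

```
# hypothetical values
group.instance.id=worker-1
max.poll.interval.ms=300000
session.timeout.ms=1800000
```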





Broker Startup Time (Prolonged URPs)

2019-07-08 Thread Jamie
Hi All,
I'm currently investigating why our Kafka deployment has a prolonged period of 
URPs (30-45 minutes) after a restart. At the moment we use a replication factor 
of 2 due to the cost associated with having another copy of each partition; 
however, we do plan to move to a replication factor of 3 in the future. We are 
using version 1.1.0. On the producer side we use 'acks=all' and 
'min.insync.replicas=1', which I believe means that while URPs exist we will 
only have one copy of the data (a single point of failure) until the replicas 
can start fetching from the leader again and expand the ISR list.

I've done some digging into the logs and the Kafka source code and have some 
questions about the behavior of the replica threads after startup. I should 
state that this prolonged period of URPs only occurs when we have >3000-4000 
partitions per broker. At the moment we perform a rolling restart each weekend 
because we need the brokers to get new Kerberos tickets. During the shutdown of 
a broker, leadership of its partitions is moved to the remaining replicas, 
which means that when the broker starts again, all of its partitions are 
replicas. What I noticed was that, after the server had completed startup, 
there were around 30-45 minutes of messages similar to the below in the logs:

    INFO [Log partition=topic-100-6, dir=/d/d4/kafka-data/kafka-data] Truncating to offset 170 (kafka.log.Log)
    INFO [Log partition=topic-100-6, dir=/d/d4/kafka-data/kafka-data] Loading producer state from offset 170 with message format version 2 (kafka.log.Log)
My understanding is that the truncation occurs because, in order to have 
consistency, the new replica has to truncate to the highest offset which exists 
in both replicas (the high watermark?) so that there aren't messages in one 
replica that are missing from another - is this correct? I'm also not exactly 
sure why the producer state needs to be loaded; what information does it 
contain? The latest offset for which each producer has received an 
acknowledgement? I assume this is used if 'enable.idempotence' is set to true?

It seems as if these two steps need to be performed for each replica partition 
before the replica threads can start fetching records from the leaders (and 
bring the replicas back in sync). Is this correct, and if so, is there any way 
to reduce the time taken to perform these steps?
I did increase the number of replica fetcher threads in my testing environment 
and it did seem to shorten the time taken. I assume the replicas on the broker 
are assigned in a round-robin way to the replica fetcher threads, so if you 
have more replica fetcher threads then each thread is responsible for setting 
up fewer partitions?
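
For reference, the broker setting involved here is num.replica.fetchers; a
sketch (the value is illustrative):

```
# server.properties: follower partitions are distributed across the fetcher
# threads for each source broker, so more threads means fewer partitions to
# set up and catch up per thread.
num.replica.fetchers=4
```
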
Any help with the above would be greatly appreciated. 
Many Thanks, 
Jamie 

[jira] [Created] (KAFKA-8637) WriteBatch objects leak off-heap memory

2019-07-08 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8637:
--

 Summary: WriteBatch objects leak off-heap memory
 Key: KAFKA-8637
 URL: https://issues.apache.org/jira/browse/KAFKA-8637
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.1.0
Reporter: Sophie Blee-Goldman


In 2.1 we did some refactoring that led to the WriteBatch objects in 
#restoreAllInternal being created in a separate method rather than in a 
try-with-resources statement. This causes a memory leak, as the WriteBatches 
are no longer closed automatically.
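
For context, a minimal sketch of the try-with-resources pattern the ticket
refers to (not the actual Streams code); RocksDB's WriteBatch wraps native
memory and must be closed explicitly:

```
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

// Both the WriteBatch and WriteOptions hold off-heap (native) memory; the
// try-with-resources block frees them deterministically when it exits.
static void writeAtomically(RocksDB db, byte[] key, byte[] value) throws RocksDBException {
    try (WriteOptions options = new WriteOptions();
         WriteBatch batch = new WriteBatch()) {
        batch.put(key, value);    // stage the write in the batch
        db.write(options, batch); // apply the batch atomically
    }
}
```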





Re: Possible implementation for KAFKA-560

2019-07-08 Thread Harsha
Hi Carlos,
   This is a really useful feature and we would like to have it as 
well. I think high_watermark == log_start_offset is a good starting point to 
consider, but we may also have a case where the topic is empty and the clients 
producing to it are offline, so we might end up garbage-collecting a topic 
which is still active. Having a configurable time period before an empty topic 
can be deleted will help in this case. Also, we should check whether there are 
any consumers still reading from the topics, etc.
  It will be good to have a KIP around this and add some edge-case handling.
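
For reference, the high_watermark == log_start_offset check can already be
approximated client-side with the consumer's offset APIs; a minimal sketch
(hypothetical helper, not the protocol change discussed in this thread):

```
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// A partition is "empty" when its earliest offset (log start) equals its
// latest offset (high watermark), i.e. every record has expired or been deleted.
static List<TopicPartition> findEmptyPartitions(KafkaConsumer<?, ?> consumer,
                                                List<TopicPartition> partitions) {
    Map<TopicPartition, Long> begin = consumer.beginningOffsets(partitions);
    Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
    List<TopicPartition> empty = new ArrayList<>();
    for (TopicPartition tp : partitions) {
        if (begin.get(tp).equals(end.get(tp))) {
            empty.add(tp);
        }
    }
    return empty;
}
```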

Thanks,
Harsha


On Sun, Jun 23, 2019, at 9:40 PM, Carlos Manuel Duclos-Vergara wrote:
> Hi,
> Thanks for the answer. Looking at high water mark, then the logic would be
> to flag the partitions that have
> 
> high_watermark == log_start_offset
> 
> In addition, I'm thinking that having the leader fulfill that criteria is
> enough to flag a partition, maybe check the replicas only if requested by
> the user.
> 
> 
> fre. 21. jun. 2019, 23:35 skrev Colin McCabe :
> 
> > I don't think this requires a change in the protocol.  It seems like you
> > should be able to use the high water mark to figure something out here?
> >
> > best,
> > Colin
> >
> >
> > On Fri, Jun 21, 2019, at 04:56, Carlos Manuel Duclos-Vergara wrote:
> > > Hi,
> > >
> > > This is an ancient task, but I feel it is still current today (specially
> > > since as somebody that deals with a Kafka cluster I know that this
> > happens
> > > more often than not).
> > >
> > > The task is about garbage collection of topics in a sort of automated
> > way.
> > > After some consideration I started a prototype implementation based on a
> > > manual process:
> > >
> > > 1. Using the cli, I can use the --describe-topic to get a list of topics
> > > that have size 0
> > > 2. Massage that list into something that can be then fed into the cli and
> > > remove the topics that have size 0.
> > >
> > > The guiding principle here is the assumption that abandoned topics will
> > > eventually have size 0, because all records will expire. This is not true
> > > for all topics, but it covers a large portion of them and having
> > something
> > > like this would help admins to find "suspicious" topics at least.
> > >
> > > I started implementing this change and I realized that it would require a
> > > change in the protocol, because the sizes are never sent over the wire.
> > > Funny enough we collect the sizes of the log files, but we do not send
> > them.
> > >
> > > I think this kind of changes will require a KIP, but I wanted to ask what
> > > others think about this.
> > >
> > > The in-progress implementation of this can be found here:
> > >
> > https://github.com/carlosduclos/kafka/commit/0dffe5e131c3bd32b77f56b9be8eded89a96df54
> > >
> > > Comments?
> > >
> > > --
> > > Carlos Manuel Duclos Vergara
> > > Backend Software Developer
> > >
> >
>


Re: Possible implementation for KAFKA-560

2019-07-08 Thread Gwen Shapira
I love it and also have few use-cases.

Agree with Harsha that we need a KIP. To cover edge-cases and also
clearly define the expected behavior, whether it will be implemented
in the admin client or in a tool, etc.

Gwen




-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


Kafka enable.auto.commit to false for executor even when its set to true

2019-07-08 Thread Sarkar, Masud
Hi Team,

Even when I set enable.auto.commit to true explicitly, it gets set to false at 
runtime. It shows the logs below:

19/07/08 10:57:16 WARN KafkaUtils: overriding enable.auto.commit to false for 
executor
19/07/08 10:57:16 WARN KafkaUtils: overriding auto.offset.reset to none for 
executor
19/07/08 10:57:16 WARN KafkaUtils: overriding executor group.id to 
spark-executor-spark-executor-SERP_hpit-ifsl_group
19/07/08 10:57:16 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 see 
KAFKA-3135

Could you please help me debug this issue?

Thanks,
Masud Sarkar



Re: PR review

2019-07-08 Thread M. Manna
Hello,

A few requests have been sent already. Could this please be reviewed? Our
business implementation is on hold due to this change.



On Thu, 4 Jul 2019 at 13:33, M. Manna  wrote:

> https://github.com/apache/kafka/pull/6771
>
> Could this be reviewed please ?
>
> On Wed, 3 Jul 2019 at 11:35, M. Manna  wrote:
>
>> https://github.com/apache/kafka/pull/6771
>>
>> Bouncing both users and dev to get some activity going. We are waiting
>> for a while to get this KIP pr merged.
>>
>> Could someone please review?
>>
>> Thanks,
>>
>> On Sun, 30 Jun 2019 at 08:59, M. Manna  wrote:
>>
>>> https://github.com/apache/kafka/pull/6771
>>>
>>> Hello,
>>>
>>> Could the above PR can be reviewed? This has been waiting for a long
>>> time.
>>>
>>> Just to mention, the package name should have "internal". Round-robin
>>> partitioning should have been supported with/without a key from the
>>> beginning. It provides users a guaranteed round-robin partitioning without
>>> having to regard key values (e.g. null/not null). From our business
>>> side, this is Kafka-internal logic. Hence, the placement inside the
>>> "internal" package.
>>>
>>> Thanks,
>>>
>>


[jira] [Resolved] (KAFKA-6771) Make specifying partitions in RoundTripWorkload, ProduceBench more flexible

2019-07-08 Thread Colin P. McCabe (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin P. McCabe resolved KAFKA-6771.

Resolution: Fixed

> Make specifying partitions in RoundTripWorkload, ProduceBench more flexible
> ---
>
> Key: KAFKA-6771
> URL: https://issues.apache.org/jira/browse/KAFKA-6771
> Project: Kafka
>  Issue Type: Improvement
>  Components: system tests
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Major
>
> Make specifying partitions in RoundTripWorkload, ProduceBench more flexible





[jira] [Created] (KAFKA-8638) Preferred Leader Blacklist (deprioritized list)

2019-07-08 Thread GEORGE LI (JIRA)
GEORGE LI created KAFKA-8638:


 Summary: Preferred Leader Blacklist (deprioritized list)
 Key: KAFKA-8638
 URL: https://issues.apache.org/jira/browse/KAFKA-8638
 Project: Kafka
  Issue Type: Improvement
  Components: config, controller, core
Affects Versions: 2.2.1, 2.3.0, 1.1.1
Reporter: GEORGE LI
Assignee: GEORGE LI


Currently, Kafka preferred leader election will pick the broker_id in the 
topic/partition replica assignment in priority order, as long as the broker is 
in ISR. The preferred leader is the broker id in the first position of the 
replica list. There are use cases where, even though the first broker in the 
replica assignment is in ISR, there is a need for it to be moved to the end of 
the ordering (lowest priority) when deciding leadership during preferred leader 
election.

Let’s use topic/partition replica set (1,2,3) as an example: 1 is the preferred 
leader. When preferred leader election is run, it will pick 1 as the leader if 
it's in ISR; if 1 is not online and in ISR, it picks 2; if 2 is not in ISR, it 
picks 3 as the leader. There are use cases where, even when 1 is in ISR, we 
would like it to be moved to the end of the ordering (lowest priority) when 
deciding leadership during preferred leader election. Below is a list of use 
cases:

# If broker_id 1 is a swapped failed host that was brought up with only the 
last segments or the latest offset, without historical data (there is another 
effort on this), it's better for it not to serve leadership until it has 
caught up.

# The cross-data-center cluster has AWS instances which have less computing 
power than the on-prem bare-metal machines. We could put the AWS broker_ids in 
the Preferred Leader Blacklist, so on-prem brokers can be elected leaders, 
without changing the reassignment ordering of the replicas.

# If broker_id 1 is constantly losing leadership after some time ("flapping"), 
we would want to exclude 1 from being a leader unless all other brokers of this 
topic/partition are offline. The "flapping" effect was seen in the past when 2 
or more brokers were bad: when they lost leadership constantly/quickly, the 
sets of partition replicas they belong to saw leadership constantly changing. 
The ultimate solution is to swap these bad hosts, but for quick mitigation we 
can also put the bad hosts in the Preferred Leader Blacklist to move their 
priority for being elected leader to the lowest.

# If the controller is busy serving an extra load of metadata requests and 
other tasks, we would like to move the controller's leaderships to other 
brokers to lower its CPU load. Currently, bouncing the broker to shed 
leadership does not work for the controller, because after the bounce the 
controller fails over to another broker.

# Avoiding bouncing a broker in order to shed its leadership: it would be good 
if we had a way to specify which broker should be excluded from serving 
traffic/leadership (without changing the replica assignment ordering via 
reassignments, even though that's quick) and then run preferred leader 
election. A bouncing broker will cause temporary URPs and sometimes other 
issues. Also, a bounce of a broker (e.g. broker_id 1) can temporarily remove 
all its leadership, but if another broker (e.g. broker_id 2) fails or gets 
bounced, some of its leaderships will likely fail over to broker_id 1 on 
replica sets with 3 brokers. If broker_id 1 is in the blacklist, then in such a 
scenario, even with broker_id 2 offline, the 3rd broker can take leadership.


The current work-around for the above is to change the topic/partition's 
replica assignment to move broker_id 1 from the first position to the last and 
run preferred leader election, e.g. (1, 2, 3) => (2, 3, 1). This changes the 
replica assignment, and we need to keep track of the original one and restore 
it if things change (e.g. the controller fails over to another broker, or the 
swapped empty broker catches up). That’s a rather tedious task.
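
For illustration, the work-around today is a reassignment JSON like the
following (topic name hypothetical), submitted via
kafka-reassign-partitions.sh --execute and followed by a preferred leader
election:

```
{"version": 1, "partitions": [
  {"topic": "mytopic", "partition": 0, "replicas": [2, 3, 1]}
]}
```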
 





Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-07-08 Thread Boyang Chen
Hey Guozhang,

I will correct my statement from my last email. I don't think the
read_committed flag (3.a) needs to be added to the OffsetFetch request:
if we are using an EOS application, the underlying consumers within the
group should always back off when there are pending offsets.

Let me know if you think this is correct.

On Tue, Jul 2, 2019 at 3:21 PM Boyang Chen 
wrote:

> Thank you Guozhang for the questions, inline answers are below.
>
> On Tue, Jul 2, 2019 at 3:14 PM Boyang Chen 
> wrote:
>
>> Hey all,
>>
>> I have done a fundamental polish of KIP-447 and written a design doc
>> depicting internal changes. We stripped off many implementation details
>> from the KIP, and simplified the public changes by a lot. For reviewers,
>> it is highly recommended to fully understand the EOS design in KIP-98 and
>> read its corresponding design doc if you haven't done so already.
>>
>> Let me know if you found anything confusing around the KIP or the design.
>> Would be happy to discuss in depth.
>>
>> Best,
>> Boyang
>>
>>
>>
>>
>>
>> On Wed, Jun 26, 2019 at 11:00 AM Guozhang Wang 
>> wrote:
>>
>>> 2. The reason we did not expose generation.id from KafkaConsumer public
>>> APIs directly is to abstract this notion from users (since it is an
>>> implementation detail of the rebalance protocol itself, e.g. if user
>>> calls
>>> consumer.assign() they do not need to invoke ConsumerCoordinator and no
>>> need to be aware of generation.id at all).
>>>
>>> On the other hand, with the current proposal the txn.coordiantor did not
>>> know about the latest generation from the source-of-truth
>>> group.coordinator; instead, it will only bump up the generation from the
>>> producer's InitProducerIdRequest only.
>>>
>>> The key here is that GroupCoordinator, when handling
>>> `InitProducerIdRequest
>>>
> In the new design, we just pass the entire consumer instance into the
> producer through
> #initTransaction, so no public API will be created.
>
>> 3. I agree that if we rely on the group coordinator to block on returning
>>> offset-fetch-response if read-committed is enabled, then we do not need
>>> to
>>> store partition assignment on txn coordinator and therefore it's better
>>> to
>>> still decouple them. For that case we still need to update the KIP wiki
>>> page that includes:
>>>
>>> 3.a. Augment OffsetFetchRequest with the ISOLATION_LEVEL as well.
>>> 3.b. Add new error code in OffsetFetchResponse to let client backoff and
>>> retry if there are pending txns including the interested partitions.
>>> 3.c. Also in the worst case we would let the client be blocked for the
>>> txn.timeout period, and for that rationale we may need to consider
>>> reducing
>>> our default txn.timeout value as well.
>>>
> Addressed 3.b and 3.c, will do 3.a.
>
>> 4. According to Colin it seems we do not need to create another KIP and we
>>> can just complete it as part of KIP-117 / KAFKA-5214; and we need to do
>>> some cleanup to have BrokerApiVersion exposed from AdminClient (@Colin
>>> please let use know if you have any concerns exposing it).
>>>
> I think we no longer need to rely on api version for initialization,
> since we will be using the upgrade.from config anyway.
>
>>
>>> Guozhang
>>>
>>>
>>> On Tue, Jun 25, 2019 at 6:43 PM Jason Gustafson 
>>> wrote:
>>>
>>> > For reference, we have BrokerApiVersionCommand already as a public
>>> > interface. We have a bit of tech debt at the moment because it uses a
>>> > custom AdminClient. It would be nice to clean that up. In general, I
>>> think
>>> > it is reasonable to expose from AdminClient. It can be used by
>>> management
>>> > tools to inspect running Kafka versions for example.
>>> >
>>> > -Jason
>>> >
>>> > On Tue, Jun 25, 2019 at 4:37 PM Boyang Chen <
>>> reluctanthero...@gmail.com>
>>> > wrote:
>>> >
>>> > > Thank you for the context Colin. The groupId was indeed a copy-paste
>>> > error.
>>> > > Our use case here for 447 is (Quoted from Guozhang):
>>> > > '''
>>> > > I think if we can do something else to
>>> > > avoid this config though, for example we can use the embedded
>>> AdminClient
>>> > > to send the APIVersion request upon starting up, and based on the
>>> > returned
>>> > > value decides whether to go to the old code path or the new behavior.
>>> > > '''
>>> > > The benefit we get is to avoid adding a new configuration to make a
>>> > > decision simply base on broker version. If you have concerns with
>>> > exposing
>>> > > ApiVersion for client, we could
>>> > > try to think of alternative solutions too.
>>> > >
>>> > > Boyang
>>> > >
>>> > >
>>> > >
>>> > > On Tue, Jun 25, 2019 at 4:20 PM Colin McCabe 
>>> wrote:
>>> > >
>>> > > > kafka.api.ApiVersion is 

Re: PR review

2019-07-08 Thread Colin McCabe
Hi M. Manna,

I left a review.  Take a look.

Sorry for the delays.

best,
Colin




Re: [jira] [Created] (KAFKA-8638) Preferred Leader Blacklist (deprioritized list)

2019-07-08 Thread George Li
Hi,

I have created this ticket and would like to bounce this idea of a "Preferred 
Leader Blacklist" off this mailing list. I have listed a couple of use cases in 
my current Kafka environment that could benefit from this feature. If the 
community has high interest, I will start a KIP for this.

Note: the word "blacklist" might be a bit misleading. It essentially just puts 
the blacklisted broker at the lowest priority when considering leadership, 
regardless of its position in the partition assignment. If none of the other 
brokers is in ISR except the blacklisted broker, it will still be elected as 
leader to avoid an offline partition.

Thanks,
George


[jira] [Created] (KAFKA-8639) Replace AddPartitionsToTxn request/response with automated protocol

2019-07-08 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8639:
--

 Summary: Replace AddPartitionsToTxn request/response with 
automated protocol
 Key: KAFKA-8639
 URL: https://issues.apache.org/jira/browse/KAFKA-8639
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen








Re: [DISCUSS] KIP-455 Create an Admin API for Replica Reassignments

2019-07-08 Thread George Li
> Now that we support multiple reassignment requests, users may execute
> them incrementally. Suppose something goes horribly wrong and they want to
> revert as quickly as possible - they would need to run the tool with
> multiple rollback JSONs.  I think that it would be useful to have an easy
> way to stop all ongoing reassignments for emergency situations.

KIP-236 (Interruptible Partition Reassignment) is exactly about cancelling the 
pending reassignments cleanly/safely in a timely fashion. It's possible to 
cancel/roll back reassignments that are not yet completed if the original 
replicas from before the reassignment are saved somewhere, e.g. the 
/admin/reassign_partitions znode or the Controller's ReassignmentContext memory 
struct.

I think a command-line option like "kafka-reassign-partitions.sh --cancel" 
would make it easier for the user to cancel whatever pending reassignments are 
going on right now; there would be no need to find the rollback JSON files and 
re-submit them as reassignments.

Regardless of KIP-236 or KIP-455, I would like to stress the importance of 
keeping the original replica info from before the reassignments are kicked off. 
This original replica info allows us to distinguish which replicas are 
currently being reassigned, so we can roll back to the original state. It also 
opens up the possibility of separating the ReplicaFetcher traffic of normal 
followers from reassignment traffic, as well as the metrics reporting URP, 
MaxLag, TotalLag, etc. Right now, reassignment traffic and normal follower 
traffic share the same ReplicaFetcher thread pool.

Thanks,
George

On Tuesday, July 2, 2019, 10:47:55 AM PDT, Stanislav Kozlovski 
 wrote:  
 
 Hey there, I need to start a new thread on KIP-455. I think there might be
an issue with the mailing server. For some reason, my replies to the
previous discussion thread could not be seen by others. After numerous
attempts, Colin suggested I start a new thread.

Original Discussion Thread:
https://sematext.com/opensee/m/Kafka/uyzND1Yl7Er128CQu1?subj=+DISCUSS+KIP+455+Create+an+Administrative+API+for+Replica+Reassignment
KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-455%3A+Create+an+Administrative+API+for+Replica+Reassignment
Last Reply of Previous Thread:
http://mail-archives.apache.org/mod_mbox/kafka-dev/201906.mbox/%3C679a4c5b-3da6-4556-bb89-e680d8cbb705%40www.fastmail.com%3E

The following is my reply:

Hi again,

This has been a great discussion on a tricky KIP. I appreciate everybody's
involvement in improving this crucial API.
That being said, I wanted to apologize for my first comment, it was a bit
rushed and not thought out.

I've got a few questions now that I dove into this better:

1. Does it make sense to have an easy way to cancel all ongoing
reassignments? To cancel all ongoing reassignments, users had the crude
option of deleting the znode, bouncing the controller and running the
rollback JSON assignment that kafka-reassign-partitions.sh gave them
(KAFKA-6304).
Now that we support multiple reassignment requests, users may execute
them incrementally. Suppose something goes horribly wrong and they want to
revert as quickly as possible - they would need to run the tool with
multiple rollback JSONs.  I think that it would be useful to have an easy
way to stop all ongoing reassignments for emergency situations.

-

2. Our kafka-reassign-partitions.sh tool doesn't seem to currently let you
figure out the ongoing assignments - I guess we expect people to use
kafka-topics.sh for that. I am not sure how well that would continue to
work now that we update the replica set only after the new replica joins
the ISR.
Do you think it makes sense to add an option for listing the current
reassignments to the reassign tool as part of this KIP?

We might want to think whether we want to show the TargetReplicas
information in the kafka-topics command for completeness as well. That
might involve the need to update the DescribeTopicsResponse. Personally I
can't see a downside but I haven't given it too much thought. I fully agree
that we don't want to add the target replicas to the full replica set and
nothing useful comes out of telling users they have a replica that might
not have copied a single byte. Yet, telling them that we have the intention
of copying bytes sounds useful so maybe having a separate column in
kafka-topics.sh would provide better clarity?

-

3. What happens if we do another reassignment to a partition while one is
in progress? Do we overwrite the TargetReplicas?
In the example sequence you gave:
R: [1, 2, 3, 4, 5, 6], I: [1, 2, 3, 4, 5, 6], T: [4, 5, 6]
What would the behavior be if a new reassign request came with
TargetReplicas of [7, 8, 9] for that partition?

To avoid complexity and potential race conditions, would it make sense to
reject a reassign request once one is in progress for the specific
partition, essentially forcing the user to cancel it first?
Forcing the user to cancel has the bene

Build failed in Jenkins: kafka-2.0-jdk8 #282

2019-07-08 Thread Apache Jenkins Server
See 


Changes:

[cmccabe] MINOR: Split at first occurrence of '=' in kafka.py props parsing

--
[...truncated 441.62 KB...]
kafka.zk.ReassignPartitionsZNodeTest > testDecodeValidJson STARTED

kafka.zk.ReassignPartitionsZNodeTest > testDecodeValidJson PASSED

kafka.zk.KafkaZkClientTest > testZNodeChangeHandlerForDataChange STARTED

kafka.zk.KafkaZkClientTest > testZNodeChangeHandlerForDataChange PASSED

kafka.zk.KafkaZkClientTest > testCreateAndGetTopicPartitionStatesRaw STARTED

kafka.zk.KafkaZkClientTest > testCreateAndGetTopicPartitionStatesRaw PASSED

kafka.zk.KafkaZkClientTest > testLogDirGetters STARTED

kafka.zk.KafkaZkClientTest > testLogDirGetters PASSED

kafka.zk.KafkaZkClientTest > testSetGetAndDeletePartitionReassignment STARTED

kafka.zk.KafkaZkClientTest > testSetGetAndDeletePartitionReassignment PASSED

kafka.zk.KafkaZkClientTest > testIsrChangeNotificationsDeletion STARTED

kafka.zk.KafkaZkClientTest > testIsrChangeNotificationsDeletion PASSED

kafka.zk.KafkaZkClientTest > testGetDataAndVersion STARTED

kafka.zk.KafkaZkClientTest > testGetDataAndVersion PASSED

kafka.zk.KafkaZkClientTest > testGetChildren STARTED

kafka.zk.KafkaZkClientTest > testGetChildren PASSED

kafka.zk.KafkaZkClientTest > testSetAndGetConsumerOffset STARTED

kafka.zk.KafkaZkClientTest > testSetAndGetConsumerOffset PASSED

kafka.zk.KafkaZkClientTest > testClusterIdMethods STARTED

kafka.zk.KafkaZkClientTest > testClusterIdMethods PASSED

kafka.zk.KafkaZkClientTest > testEntityConfigManagementMethods STARTED

kafka.zk.KafkaZkClientTest > testEntityConfigManagementMethods PASSED

kafka.zk.KafkaZkClientTest > testUpdateLeaderAndIsr STARTED

kafka.zk.KafkaZkClientTest > testUpdateLeaderAndIsr PASSED

kafka.zk.KafkaZkClientTest > testUpdateBrokerInfo STARTED

kafka.zk.KafkaZkClientTest > testUpdateBrokerInfo PASSED

kafka.zk.KafkaZkClientTest > testCreateRecursive STARTED

kafka.zk.KafkaZkClientTest > testCreateRecursive PASSED

kafka.zk.KafkaZkClientTest > testGetConsumerOffsetNoData STARTED

kafka.zk.KafkaZkClientTest > testGetConsumerOffsetNoData PASSED

kafka.zk.KafkaZkClientTest > testDeleteTopicPathMethods STARTED

kafka.zk.KafkaZkClientTest > testDeleteTopicPathMethods PASSED

kafka.zk.KafkaZkClientTest > testSetTopicPartitionStatesRaw STARTED

kafka.zk.KafkaZkClientTest > testSetTopicPartitionStatesRaw PASSED

kafka.zk.KafkaZkClientTest > testAclManagementMethods STARTED

kafka.zk.KafkaZkClientTest > testAclManagementMethods PASSED

kafka.zk.KafkaZkClientTest > testPreferredReplicaElectionMethods STARTED

kafka.zk.KafkaZkClientTest > testPreferredReplicaElectionMethods PASSED

kafka.zk.KafkaZkClientTest > testPropagateLogDir STARTED

kafka.zk.KafkaZkClientTest > testPropagateLogDir PASSED

kafka.zk.KafkaZkClientTest > testGetDataAndStat STARTED

kafka.zk.KafkaZkClientTest > testGetDataAndStat PASSED

kafka.zk.KafkaZkClientTest > testReassignPartitionsInProgress STARTED

kafka.zk.KafkaZkClientTest > testReassignPartitionsInProgress PASSED

kafka.zk.KafkaZkClientTest > testCreateTopLevelPaths STARTED

kafka.zk.KafkaZkClientTest > testCreateTopLevelPaths PASSED

kafka.zk.KafkaZkClientTest > testIsrChangeNotificationGetters STARTED

kafka.zk.KafkaZkClientTest > testIsrChangeNotificationGetters PASSED

kafka.zk.KafkaZkClientTest > testLogDirEventNotificationsDeletion STARTED

kafka.zk.KafkaZkClientTest > testLogDirEventNotificationsDeletion PASSED

kafka.zk.KafkaZkClientTest > testGetLogConfigs STARTED

kafka.zk.KafkaZkClientTest > testGetLogConfigs PASSED

kafka.zk.KafkaZkClientTest > testBrokerSequenceIdMethods STARTED

kafka.zk.KafkaZkClientTest > testBrokerSequenceIdMethods PASSED

kafka.zk.KafkaZkClientTest > testCreateSequentialPersistentPath STARTED

kafka.zk.KafkaZkClientTest > testCreateSequentialPersistentPath PASSED

kafka.zk.KafkaZkClientTest > testConditionalUpdatePath STARTED

kafka.zk.KafkaZkClientTest > testConditionalUpdatePath PASSED

kafka.zk.KafkaZkClientTest > testDeleteTopicZNode STARTED

kafka.zk.KafkaZkClientTest > testDeleteTopicZNode PASSED

kafka.zk.KafkaZkClientTest > testDeletePath STARTED

kafka.zk.KafkaZkClientTest > testDeletePath PASSED

kafka.zk.KafkaZkClientTest > testGetBrokerMethods STARTED

kafka.zk.KafkaZkClientTest > testGetBrokerMethods PASSED

kafka.zk.KafkaZkClientTest > testCreateTokenChangeNotification STARTED

kafka.zk.KafkaZkClientTest > testCreateTokenChangeNotification PASSED

kafka.zk.KafkaZkClientTest > testGetTopicsAndPartitions STARTED

kafka.zk.KafkaZkClientTest > testGetTopicsAndPartitions PASSED

kafka.zk.KafkaZkClientTest > testRegisterBrokerInfo STARTED

kafka.zk.KafkaZkClientTest > testRegisterBrokerInfo PASSED

kafka.zk.KafkaZkClientTest > testConsumerOffsetPath STARTED

kafka.zk.KafkaZkClientTest > testConsumerOffsetPath PASSED

kafka.zk.KafkaZkClientTest > testControllerManagementMethods STARTED

kafka.zk.KafkaZkClientTest > testControllerMan

Re: [DISCUSS] KIP-480 : Sticky Partitioner

2019-07-08 Thread Justine Olshan
Hello all,

If there are no more comments or concerns, I would like to start the vote
on this tomorrow afternoon.

However, if there are still topics to discuss, feel free to bring them up
now.

Thank you,
Justine

On Tue, Jul 2, 2019 at 4:25 PM Justine Olshan  wrote:

> Hello again,
>
> Another update to the interface has been made to the KIP.
> Please let me know if you have any feedback!
>
> Thank you,
> Justine
>
> On Fri, Jun 28, 2019 at 2:52 PM Justine Olshan 
> wrote:
>
>> Hi all,
>> I made some changes to the KIP.
>> The idea is to clean up the code, make behavior more explicit, provide
>> more flexibility, and to keep default behavior the same.
>>
>> Now we will change the partition in onNewBatch, and specify the
>> conditions for this function call (non-keyed values, no explicit
>> partitions) in willCallOnNewBatch.
>> This clears up some of the issues with the implementation. I'm happy to
>> hear further opinions and discuss this change!
>>
>> Thank you,
>> Justine
>>
>> On Thu, Jun 27, 2019 at 2:53 PM Colin McCabe  wrote:
>>
>>> On Thu, Jun 27, 2019, at 01:31, Ismael Juma wrote:
>>> > Thanks for the KIP Justine. It looks pretty good. A few comments:
>>> >
>>> > 1. Should we favor partitions that are not under replicated? This is
>>> > something that Netflix did too.
>>>
>>> This seems like it could lead to cascading failures, right?  If a
>>> partition becomes under-replicated because there is too much traffic, the
>>> producer stops sending to it, which puts even more load on the remaining
>>> partitions, which are even more likely to fail then, etc.  It also will
>>> create unbalanced load patterns on the consumers.
>>>
>>> >
>>> > 2. If there's no measurable performance difference, I agree with
>>> Stanislav
>>> > that Optional would be better than Integer.
>>> >
>>> > 3. We should include the javadoc for the newly introduced method that
>>> > specifies it and its parameters. In particular, it would good to
>>> specify if
>>> > it gets called when an explicit partition id has been provided.
>>>
>>> Agreed.
>>>
>>> best,
>>> Colin
>>>
>>> >
>>> > Ismael
>>> >
>>> > On Mon, Jun 24, 2019, 2:04 PM Justine Olshan 
>>> wrote:
>>> >
>>> > > Hello,
>>> > > This is the discussion thread for KIP-480: Sticky Partitioner.
>>> > >
>>> > >
>>> > >
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner
>>> > >
>>> > > Thank you,
>>> > > Justine Olshan
>>> > >
>>> >
>>>
>>
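
For context, a rough sketch of the hook being discussed in this thread (method
names taken from the emails above; the final interface in the KIP may differ):

```
import java.io.Closeable;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;

// Sketch only: the producer tells the partitioner when a new batch is created,
// so a sticky implementation can switch to a new partition at batch boundaries
// for records with no key and no explicit partition.
public interface Partitioner extends Configurable, Closeable {

    int partition(String topic, Object key, byte[] keyBytes,
                  Object value, byte[] valueBytes, Cluster cluster);

    // prevPartition is the sticky partition the just-completed batch used.
    default void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
}
```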


Monitoring streams

2019-07-08 Thread Brian Putt
Hello,

We have multiple stream services that we'd like to monitor for disconnection
from the broker, so that we can restart the services.

I've looked at https://issues.apache.org/jira/browse/KAFKA-6520 and am
wondering if anyone has suggestions on what we can do today to help ensure
our services don't go idle.

As an example, we'll have our streaming services running and we'll
stop/start the Kafka brokers. The services will remain running, but they're
not actually pulling any data.

We could look at time since last record received, but that's not a
guarantee as there's always a possibility that data was legitimately turned
off upstream.

Thanks,

Brian
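
For reference, one partial mitigation that works today: register a state
listener on the KafkaStreams client and alert (or restart) when it enters a
terminal state. A minimal sketch, assuming topology and props are built
elsewhere; note that, as KAFKA-6520 describes, the client can stay in RUNNING
while brokers are unreachable, so this catches hard failures but not the idle
case above.

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

public class StreamsHealthHook {
    public static KafkaStreams startWithMonitoring(Topology topology,
                                                   Properties props) {
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.setStateListener((newState, oldState) -> {
            // ERROR and NOT_RUNNING mean the client won't recover on its own.
            if (newState == KafkaStreams.State.ERROR
                    || newState == KafkaStreams.State.NOT_RUNNING) {
                // Placeholder: page someone or trigger an external restart.
                System.err.println("Streams client entered " + newState);
            }
        });
        streams.start();
        return streams;
    }
}

Polling streams.state() from a liveness probe is another common variant;
pairing either with a per-topic last-record-received timestamp narrows the
gap that a legitimately quiet upstream leaves open.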


Kafka - CDC SQL Server

2019-07-08 Thread Raja Bagus S
Hi All,

Could you please share the step-by-step process to configure a Kafka connector 
for SQL Server CDC?
I have Kafka with the Debezium connector for SQL Server, but I don't know 
whether it's compatible if I want to connect it to CDC. I've tried to follow 
some use cases on the internet, but it still isn't working.

My plan is to replicate a first database to a second database:
CDC on 1st DB -> Kafka (topics, maybe) -> 2nd DB (I don't know whether it needs 
CDC on that side too; correct me if I'm wrong).

I hope you guys can help me.

Thanks,
Rjbgss

Sent from my iPhone
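
For what it's worth, a minimal Debezium SQL Server source registration looks
roughly like the snippet below (hostnames, credentials, database and table
names are placeholders). CDC must first be enabled on the source database and
tables on the SQL Server side (sys.sp_cdc_enable_db / sys.sp_cdc_enable_table).

{
  "name": "sqlserver-source",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "sqlserver-host",
    "database.port": "1433",
    "database.user": "debezium_user",
    "database.password": "********",
    "database.dbname": "FirstDB",
    "database.server.name": "server1",
    "table.whitelist": "dbo.my_table",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.server1"
  }
}

The second database would normally be populated by a sink connector (for
example a JDBC sink) reading from the change topics; it does not need CDC
enabled itself.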

[jira] [Created] (KAFKA-8640) Replace OffsetFetch request/response with automated protocol

2019-07-08 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8640:
--

 Summary: Replace OffsetFetch request/response with automated 
protocol
 Key: KAFKA-8640
 URL: https://issues.apache.org/jira/browse/KAFKA-8640
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-trunk-jdk8 #3772

2019-07-08 Thread Apache Jenkins Server
See 


Changes:

[bbejeck] MINOR: improve RocksDBConfigSetter docs (#7009)

[jason] MINOR: A few cleanups and compiler warning fixes (#6986)

--
[...truncated 2.81 MB...]
org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueWithProducerRecord STARTED

Build failed in Jenkins: kafka-trunk-jdk11 #679

2019-07-08 Thread Apache Jenkins Server
See 


Changes:

[bbejeck] MINOR: improve RocksDBConfigSetter docs (#7009)

[jason] MINOR: A few cleanups and compiler warning fixes (#6986)

--
[...truncated 2.56 MB...]
org.apache.kafka.trogdor.coordinator.CoordinatorTest > testCreateTask STARTED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testCreateTask PASSED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testTasksRequest STARTED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testTasksRequest PASSED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testCoordinatorStatus 
STARTED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testCoordinatorStatus 
PASSED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testCoordinatorUptime 
STARTED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testCoordinatorUptime 
PASSED

org.apache.kafka.trogdor.task.TaskSpecTest > testTaskSpecSerialization STARTED

org.apache.kafka.trogdor.task.TaskSpecTest > testTaskSpecSerialization PASSED

org.apache.kafka.trogdor.workload.TimeIntervalTransactionsGeneratorTest > 
testCommitsTransactionAfterIntervalPasses STARTED

org.apache.kafka.trogdor.workload.TimeIntervalTransactionsGeneratorTest > 
testCommitsTransactionAfterIntervalPasses PASSED

org.apache.kafka.trogdor.workload.ExternalCommandWorkerTest > 
testProcessWithFailedExit STARTED

org.apache.kafka.trogdor.workload.ExternalCommandWorkerTest > 
testProcessWithFailedExit PASSED

org.apache.kafka.trogdor.workload.ExternalCommandWorkerTest > 
testProcessNotFound STARTED

org.apache.kafka.trogdor.workload.ExternalCommandWorkerTest > 
testProcessNotFound PASSED

org.apache.kafka.trogdor.workload.ExternalCommandWorkerTest > 
testProcessForceKillTimeout STARTED

org.apache.kafka.trogdor.workload.ExternalCommandWorkerTest > 
testProcessForceKillTimeout PASSED

org.apache.kafka.trogdor.workload.ExternalCommandWorkerTest > testProcessStop 
STARTED

org.apache.kafka.trogdor.workload.ExternalCommandWorkerTest > testProcessStop 
PASSED

org.apache.kafka.trogdor.workload.ExternalCommandWorkerTest > 
testProcessWithNormalExit STARTED

org.apache.kafka.trogdor.workload.ExternalCommandWorkerTest > 
testProcessWithNormalExit PASSED

org.apache.kafka.trogdor.workload.PayloadGeneratorTest > 
testConstantPayloadGenerator STARTED

org.apache.kafka.trogdor.workload.PayloadGeneratorTest > 
testConstantPayloadGenerator PASSED

org.apache.kafka.trogdor.workload.PayloadGeneratorTest > 
testSequentialPayloadGenerator STARTED

org.apache.kafka.trogdor.workload.PayloadGeneratorTest > 
testSequentialPayloadGenerator PASSED

org.apache.kafka.trogdor.workload.PayloadGeneratorTest > 
testNullPayloadGenerator STARTED

org.apache.kafka.trogdor.workload.PayloadGeneratorTest > 
testNullPayloadGenerator PASSED

org.apache.kafka.trogdor.workload.PayloadGeneratorTest > 
testUniformRandomPayloadGenerator STARTED

org.apache.kafka.trogdor.workload.PayloadGeneratorTest > 
testUniformRandomPayloadGenerator PASSED

org.apache.kafka.trogdor.workload.PayloadGeneratorTest > testPayloadIterator 
STARTED

org.apache.kafka.trogdor.workload.PayloadGeneratorTest > testPayloadIterator 
PASSED

org.apache.kafka.trogdor.workload.PayloadGeneratorTest > 
testUniformRandomPayloadGeneratorPaddingBytes STARTED

org.apache.kafka.trogdor.workload.PayloadGeneratorTest > 
testUniformRandomPayloadGeneratorPaddingBytes PASSED

org.apache.kafka.trogdor.workload.ThrottleTest > testThrottle STARTED

org.apache.kafka.trogdor.workload.ThrottleTest > testThrottle PASSED

org.apache.kafka.trogdor.workload.ConsumeBenchSpecTest > 
testMaterializeTopicsWithSomePartitions STARTED

org.apache.kafka.trogdor.workload.ConsumeBenchSpecTest > 
testMaterializeTopicsWithSomePartitions PASSED

org.apache.kafka.trogdor.workload.ConsumeBenchSpecTest > 
testMaterializeTopicsWithNoPartitions STARTED

org.apache.kafka.trogdor.workload.ConsumeBenchSpecTest > 
testMaterializeTopicsWithNoPartitions PASSED

org.apache.kafka.trogdor.workload.ConsumeBenchSpecTest > 
testInvalidTopicNameRaisesExceptionInMaterialize STARTED

org.apache.kafka.trogdor.workload.ConsumeBenchSpecTest > 
testInvalidTopicNameRaisesExceptionInMaterialize PASSED

org.apache.kafka.trogdor.workload.HistogramTest > testHistogramPercentiles 
STARTED

org.apache.kafka.trogdor.workload.HistogramTest > testHistogramPercentiles 
PASSED

org.apache.kafka.trogdor.workload.HistogramTest > testHistogramSamples STARTED

org.apache.kafka.trogdor.workload.HistogramTest > testHistogramSamples PASSED

org.apache.kafka.trogdor.workload.HistogramTest > testHistogramAverage STARTED

org.apache.kafka.trogdor.workload.HistogramTest > testHistogramAverage PASSED

org.apache.kafka.trogdor.workload.TopicsSpecTest > testPartitionNumbers STARTED

org.apache.kafka.trogdor.workload.TopicsSpecTest > testPartitionNumbers PASSED

org.apache.kafka.trogdor.workload.TopicsSpecTest > testMaterialize STARTED

[jira] [Resolved] (KAFKA-8591) NPE when reloading connector configuration using WorkerConfigTransformer

2019-07-08 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-8591.

   Resolution: Fixed
Fix Version/s: 2.3.1
   2.2.2

> NPE when reloading connector configuration using WorkerConfigTransformer
> 
>
> Key: KAFKA-8591
> URL: https://issues.apache.org/jira/browse/KAFKA-8591
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.2.1
>Reporter: Nacho Munoz
>Assignee: Robert Yokota
>Priority: Major
> Fix For: 2.2.2, 2.3.1
>
>
> When a connector uses a ConfigProvider and sets a given TTL in the returned 
> ConfigData, it is expected that WorkerConfigTransformer will periodically 
> reload the connector configuration. The problem is that when the TTL expires,
> an NPE is raised.
> [2019-06-17 14:34:12,320] INFO Scheduling a restart of connector 
> workshop-incremental in 6 ms 
> (org.apache.kafka.connect.runtime.WorkerConfigTransformer:88)
>  [2019-06-17 14:34:12,321] ERROR Uncaught exception in herder work thread, 
> exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:227)
>  java.lang.NullPointerException
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$19.onCompletion(DistributedHerder.java:1187)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$19.onCompletion(DistributedHerder.java:1183)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:273)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:219)
>  at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:834)
> The reason is that WorkerConfigTransformer is passing a null callback to the 
> herder's restartConnector method.
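>
> For context, a minimal sketch of a ConfigProvider that sets a TTL (the
> secret lookup below is a placeholder): returning a non-null TTL in
> ConfigData is what schedules the reload/restart that hits this NPE.
>
> import java.util.Collections;
> import java.util.Map;
> import java.util.Set;
> import org.apache.kafka.common.config.ConfigData;
> import org.apache.kafka.common.config.provider.ConfigProvider;
>
> public class TtlConfigProvider implements ConfigProvider {
>     @Override
>     public void configure(Map<String, ?> configs) {}
>
>     @Override
>     public ConfigData get(String path) {
>         return get(path, Collections.emptySet());
>     }
>
>     @Override
>     public ConfigData get(String path, Set<String> keys) {
>         // Placeholder for a real secret-store lookup.
>         Map<String, String> data =
>                 Collections.singletonMap("password", "secret-from-" + path);
>         // A 60s TTL asks the Connect runtime to re-resolve the config,
>         // which triggers the scheduled restart seen in the log above.
>         return new ConfigData(data, 60_000L);
>     }
>
>     @Override
>     public void close() {}
> }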



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)