Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-06-26 Thread Adam Bellemare
Hi John

Good thinking with regard to the upgrade path between versions for the
over-the-wire instructions in SubscriptionWrapper. At this point in time I
can't think of any new wire message instructions, but I would appreciate as
many eyes on it as possible. I have just included the LEFT join in the last
commit (about 10 min ago) along with INNER join. I do not think that RIGHT
and OUTER joins are possible given that there is no LHS key available, so
LHSTable.outerJoinOnForeignKey(RHSTable) wouldn't even make sense. This is
in contrast to the current LHSTable.outerJoin(RHSTable), as they are both
keyed on the same key. I have buffed up the Integration tests and have
tried to make them more readable to ensure that we're covering all the
scenarios. I think that if we can get more eyes on the workflow showing the
various LHS and RHS events and outputs then that may help us validate that
we have all the scenarios covered.

With regards to the 2.3->2.4 scenario you described, I'm not entirely sure
I follow. If they want to add a FK-join, they will need to rework their
code in the KStreams app and make a new release, since the underlying
topology would be different and new internal topics would need to be
created. In other words, I don't think a rolling upgrade where the user
introduces a FK join would be possible since their topology would
necessitate a full KStreams reset. Is this what you meant?
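
For readers following along, here is a rough sketch of the API shapes under
discussion. This is purely illustrative pseudo-Java: joinOnForeignKey and
leftJoinOnForeignKey stand in for the proposed KIP-213 methods, and the key,
value, and result types are hypothetical.

KTable<LhsKey, LhsValue> lhs = builder.table("lhs-topic");
KTable<RhsKey, RhsValue> rhs = builder.table("rhs-topic");

// INNER: a result is emitted only when the extracted FK matches an RHS record.
KTable<LhsKey, Result> inner =
    lhs.joinOnForeignKey(rhs, LhsValue::getForeignKey, Result::new);

// LEFT: (k, leftval, null) is emitted when no RHS match exists for the FK.
KTable<LhsKey, Result> left =
    lhs.leftJoinOnForeignKey(rhs, LhsValue::getForeignKey, Result::new);

// RIGHT/OUTER cannot exist: an RHS-only record has no LHS key to key the
// result by, unlike today's lhs.outerJoin(rhs) where both sides share a key.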



On Wed, Jun 26, 2019 at 4:10 PM John Roesler  wrote:

> Thanks, Adam!
>
> One unrelated thought that has just now occurred to me is that (unlike
> the equi-joins we currently have), this join logic is potentially
> spread over multiple Streams instances, which in general means that
> the instances may be running different versions of Kafka Streams.
>
> This means that if we discover a bug that requires us to again change
> the wire message (as you did in this proposal update), we need to
> consider what should happen if the PK instance is newer than the FK
> instance, or vice-versa, during a rolling upgrade. We should think
> ahead to this condition and make sure the logic is forward compatible.
>
> Related: what about the initial case, when we release this feature
> (let's say in 2.4)? What will happen if I decide to adopt 2.4 and add
> a FK join together in one upgrade? Then the 2.4 member of the cluster
> is producing the SubscriptionWrapper messages, and some 2.3 members
> get the subscription topic assigned to them, but they have no idea
> what to do with it? I'm not sure this is a problem; hopefully they
> just do nothing. If it is a problem, it would be fine to say you have
> to upgrade completely to 2.4 before deploying a FK join.
>
> Just want to make sure we anticipate these issues in case it affects
> the design at all.
>
> Thanks,
> -John
>
> On Wed, Jun 26, 2019 at 2:38 PM Adam Bellemare 
> wrote:
> >
> > Sigh... Forgot the link:
> >
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836&selectedPageVersions=78&selectedPageVersions2=74
> >
> > I'll update it when I validate that there are no issues with removing the
> > SubscriptionResponseWrapper boolean.
> >
> > On Wed, Jun 26, 2019 at 3:37 PM Adam Bellemare  >
> > wrote:
> >
> > > >Maybe just call it as (k, leftval, null) or (k, null, rightval)?
> > > Done.
> > >
> > > > if you update the KIP, you might want to send a new "diff link" to
> this
> > > thread
> > > Here it is:
> > >
> > > > Looking closely at the proposal, can you explain more about the
> > > propagateIfNull field in SubscriptionResponseWrapper? It sort of looks
> like
> > > it's always going to be equal to (RHS-result != null).
> > > I believe you are correct, and I missed the forest for the trees. They
> are
> > > effectively the same thing, and I can simply remove the flag. I will
> code
> > > it up and try it out locally just to be sure.
> > >
> > > Thanks again for your help, it is greatly appreciated!
> > >
> > > On Wed, Jun 26, 2019 at 2:54 PM John Roesler 
> wrote:
> > >
> > >> I think the "scenario trace" is very nice, but has one point that I
> > >> found confusing:
> > >>
> > >> You indicate a retraction in the join output as (k,null) and a join
> > >> result as (k, leftval, rightval), but confusingly, you also write a
> > >> join result as (k, JoinResult) when one side is null. Maybe just call
> > >> it as (k, leftval, null) or (k, null, rightval)? That way the readers
> > >> can more easily determine if the results meet their expectations for
> > >> each join type.
> > >>
> > >> (procedural note: if you update the KIP, you might want to send a new
> > >> "diff link" to this thread, since the one I posted at the beginning
> > >> would not automatically show your latest changes)
> > >>
> > >> I was initially concerned that the proposed algorithm would wind up
> > >> propagating something that looks like a left join (k, leftval, null)
> > >> under the case that Joe pointed out, but after reviewing your
> > >> scenario, I see that it will emit a tombstone (k, null) instead. This
> > >> 

[jira] [Created] (KAFKA-8605) Warn users when they have same connector in their plugin-path more than once

2019-06-26 Thread Cyrus Vafadari (JIRA)
Cyrus Vafadari created KAFKA-8605:
-

 Summary: Warn users when they have same connector in their 
plugin-path more than once
 Key: KAFKA-8605
 URL: https://issues.apache.org/jira/browse/KAFKA-8605
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Cyrus Vafadari


Right now it is very easy to have multiple copies of the same connector in the 
plugin-path and not realize it.

This can be problematic if a user is adding dependencies into the plugin, or 
accidentally using the wrong version of the connector.

An unintrusive improvement would be to log a warning if the same connector 
appears in the plugin-path more than once.
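
A rough sketch of what such a check could look like (illustrative only, not
the actual Connect plugin-loading code):

import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DuplicatePluginChecker {
    private static final Logger log = LoggerFactory.getLogger(DuplicatePluginChecker.class);

    // classToLocations maps each discovered connector class name to every
    // plugin-path location in which it was found.
    static void warnOnDuplicates(Map<String, List<String>> classToLocations) {
        classToLocations.forEach((className, locations) -> {
            if (locations.size() > 1) {
                log.warn("Connector {} appears {} times in the plugin path: {}",
                        className, locations.size(), locations);
            }
        });
    }
}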



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Jenkins build is back to normal : kafka-2.2-jdk8 #143

2019-06-26 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-480 : Sticky Partitioner

2019-06-26 Thread Justine Olshan
Stanislav,
Thank you for looking at my KIP!

I did discuss with Colin whether to use null vs. Optional types, and we
did not come to a strong conclusion either way.
I'd be happy to change it if it makes the logic more clear.

Thanks,
Justine

On Wed, Jun 26, 2019 at 2:46 PM Stanislav Kozlovski 
wrote:

> Hey Justine,
>
> Thanks for the KIP! I am impressed by the performance results linked in the
> KIP and I like the data-driven approach. This looks like a great
> improvement.
>
> I had one minor question regarding the public interface
> `repartitionOnNewBatch` where we return null in the case of no change
> needed. Have we considered using Java's Optional type to avoid null values?
>
> Best,
> Stanislav
>
> On Tue, Jun 25, 2019 at 11:29 PM Colin McCabe  wrote:
>
> > No worries.  Thanks for fixing it!
> > C.
> >
> > On Tue, Jun 25, 2019, at 13:47, Justine Olshan wrote:
> > > Also apologies on the late link to the jira, but apparently https links
> > do
> > > not work and it kept defaulting to an image on my desktop even when it
> > > looked like I put the correct link in. Weird...
> > >
> > > On Tue, Jun 25, 2019 at 1:41 PM Justine Olshan 
> > wrote:
> > >
> > > > I came up with a good solution for this and will push the commit
> soon.
> > The
> > > > repartition will be called only when a partition is not manually
> sent.
> > > >
> > > > On Tue, Jun 25, 2019 at 1:39 PM Colin McCabe 
> > wrote:
> > > >
> > > >> Well, this is a generic partitioner method, so it shouldn't dictate
> > any
> > > >> particular behavior.
> > > >>
> > > >> Colin
> > > >>
> > > >>
> > > >> On Tue, Jun 25, 2019, at 12:04, Justine Olshan wrote:
> > > >> > I also just noticed that if we want to use this method on the
> keyed
> > > >> record
> > > >> > case, I will need to move the method outside of the sticky (no
> key,
> > no
> > > >> set
> > > >> > partition) check. Not a big problem, but something to keep in
> mind.
> > > >> > Perhaps, we should encapsulate the sticky vs. not behavior inside
> > the
> > > >> > method? More things to think about.
> > > >> >
> > > >> > On Tue, Jun 25, 2019 at 11:55 AM Colin McCabe  >
> > > >> wrote:
> > > >> >
> > > >> > > Hi Justine,
> > > >> > >
> > > >> > > The KIP discusses adding a new method to the partitioner
> > interface.
> > > >> > >
> > > >> > > > default public Integer onNewBatch(String topic, Cluster
> > cluster) {
> > > >> ... }
> > > >> > >
> > > >> > > However, this new method doesn't give the partitioner access to
> > the
> > > >> key
> > > >> > > and value of the message.  While this works for the case
> described
> > > >> here (no
> > > >> > > key), in general we might need this information when
> re-assigning
> > a
> > > >> > > partition based on the batch completing.  So I think we should
> > add
> > > >> these
> > > >> > > methods to onNewBatch.
> > > >> > >
> > > >> > > Also, it would be nice to call this something like
> > > >> "repartitionOnNewBatch"
> > > >> > > or something, to make it clearer what is going on.
> > > >> > >
> > > >> > > best,
> > > >> > > Colin
> > > >> > >
> > > >> > > On Mon, Jun 24, 2019, at 18:32, Boyang Chen wrote:
> > > >> > > > Thank you Justine for the KIP! Do you mind creating a
> > corresponding
> > > >> JIRA
> > > >> > > > ticket too?
> > > >> > > >
> > > >> > > > On Mon, Jun 24, 2019 at 4:51 PM Colin McCabe <
> > cmcc...@apache.org>
> > > >> wrote:
> > > >> > > >
> > > >> > > > > Hi Justine,
> > > >> > > > >
> > > >> > > > > Thanks for the KIP.  This looks great!
> > > >> > > > >
> > > >> > > > > In one place in the KIP, you write: "Remove
> > > >> > > > > testRoundRobinWithUnavailablePartitions() and
> testRoundRobin()
> > > >> since
> > > >> > > the
> > > >> > > > > round robin functionality of the partitioner has been
> > removed."
> > > >> You
> > > >> > > can
> > > >> > > > > skip this and similar lines.  We don't need to describe
> > changes to
> > > >> > > internal
> > > >> > > > > test classes in the KIP since they're not visible to users
> or
> > > >> external
> > > >> > > > > developers.
> > > >> > > > >
> > > >> > > > > It seems like maybe the performance tests should get their
> own
> > > >> section.
> > > >> > > > > Right now, the way the layout is makes it look like they are
> > part
> > > >> of
> > > >> > > the
> > > >> > > > > "Compatibility, Deprecation, and Migration Plan"
> > > >> > > > >
> > > >> > > > > best,
> > > >> > > > > Colin
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > On Mon, Jun 24, 2019, at 14:04, Justine Olshan wrote:
> > > >> > > > > > Hello,
> > > >> > > > > > This is the discussion thread for KIP-480: Sticky
> > Partitioner.
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > >
> > > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner
> > > >> > > > > >
> > > >> > > > > > Thank you,
> > > >> > > > > > Justine Olshan
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > >
> >
>
>
> --
> Best,
> Stanislav
>


Re: [DISCUSS] KIP-480 : Sticky Partitioner

2019-06-26 Thread Stanislav Kozlovski
Hey Justine,

Thanks for the KIP! I am impressed by the performance results linked in the
KIP and I like the data-driven approach. This looks like a great
improvement.

I had one minor question regarding the public interface
`repartitionOnNewBatch` where we return null in the case of no change
needed. Have we considered using Java's Optional type to avoid null values?
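
To make the two alternatives concrete, here are hypothetical shapes of the
method with each return style (names are illustrative; the KIP defines the
real signature):

import java.util.Optional;

interface NullStylePartitioner {
    // Returns null when no repartitioning is needed.
    Integer repartitionOnNewBatch(String topic, Object key, Object value);
}

interface OptionalStylePartitioner {
    // Optional.empty() makes the "no change" case explicit in the type,
    // at the cost of a small allocation on the produce path.
    Optional<Integer> repartitionOnNewBatch(String topic, Object key, Object value);
}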

Best,
Stanislav

On Tue, Jun 25, 2019 at 11:29 PM Colin McCabe  wrote:

> No worries.  Thanks for fixing it!
> C.
>
> On Tue, Jun 25, 2019, at 13:47, Justine Olshan wrote:
> > Also apologies on the late link to the jira, but apparently https links
> do
> > not work and it kept defaulting to an image on my desktop even when it
> > looked like I put the correct link in. Weird...
> >
> > On Tue, Jun 25, 2019 at 1:41 PM Justine Olshan 
> wrote:
> >
> > > I came up with a good solution for this and will push the commit soon.
> The
> > > repartition will be called only when a partition is not manually sent.
> > >
> > > On Tue, Jun 25, 2019 at 1:39 PM Colin McCabe 
> wrote:
> > >
> > >> Well, this is a generic partitioner method, so it shouldn't dictate
> any
> > >> particular behavior.
> > >>
> > >> Colin
> > >>
> > >>
> > >> On Tue, Jun 25, 2019, at 12:04, Justine Olshan wrote:
> > >> > I also just noticed that if we want to use this method on the keyed
> > >> record
> > >> > case, I will need to move the method outside of the sticky (no key,
> no
> > >> set
> > >> > partition) check. Not a big problem, but something to keep in mind.
> > >> > Perhaps, we should encapsulate the sticky vs. not behavior inside
> the
> > >> > method? More things to think about.
> > >> >
> > >> > On Tue, Jun 25, 2019 at 11:55 AM Colin McCabe 
> > >> wrote:
> > >> >
> > >> > > Hi Justine,
> > >> > >
> > >> > > The KIP discusses adding a new method to the partitioner
> interface.
> > >> > >
> > >> > > > default public Integer onNewBatch(String topic, Cluster
> cluster) {
> > >> ... }
> > >> > >
> > >> > > However, this new method doesn't give the partitioner access to
> the
> > >> key
> > >> > > and value of the message.  While this works for the case described
> > >> here (no
> > >> > > key), in general we might need this information when re-assigning
> a
> > >> > > partition based on the batch completing.  So I think we should
> add
> > >> these
> > >> > > methods to onNewBatch.
> > >> > >
> > >> > > Also, it would be nice to call this something like
> > >> "repartitionOnNewBatch"
> > >> > > or something, to make it clearer what is going on.
> > >> > >
> > >> > > best,
> > >> > > Colin
> > >> > >
> > >> > > On Mon, Jun 24, 2019, at 18:32, Boyang Chen wrote:
> > >> > > > Thank you Justine for the KIP! Do you mind creating a
> corresponding
> > >> JIRA
> > >> > > > ticket too?
> > >> > > >
> > >> > > > On Mon, Jun 24, 2019 at 4:51 PM Colin McCabe <
> cmcc...@apache.org>
> > >> wrote:
> > >> > > >
> > >> > > > > Hi Justine,
> > >> > > > >
> > >> > > > > Thanks for the KIP.  This looks great!
> > >> > > > >
> > >> > > > > In one place in the KIP, you write: "Remove
> > >> > > > > testRoundRobinWithUnavailablePartitions() and testRoundRobin()
> > >> since
> > >> > > the
> > >> > > > > round robin functionality of the partitioner has been
> removed."
> > >> You
> > >> > > can
> > >> > > > > skip this and similar lines.  We don't need to describe
> changes to
> > >> > > internal
> > >> > > > > test classes in the KIP since they're not visible to users or
> > >> external
> > >> > > > > developers.
> > >> > > > >
> > >> > > > > It seems like maybe the performance tests should get their own
> > >> section.
> > >> > > > > Right now, the way the layout is makes it look like they are
> part
> > >> of
> > >> > > the
> > >> > > > > "Compatibility, Deprecation, and Migration Plan"
> > >> > > > >
> > >> > > > > best,
> > >> > > > > Colin
> > >> > > > >
> > >> > > > >
> > >> > > > > On Mon, Jun 24, 2019, at 14:04, Justine Olshan wrote:
> > >> > > > > > Hello,
> > >> > > > > > This is the discussion thread for KIP-480: Sticky
> Partitioner.
> > >> > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > >
> > >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner
> > >> > > > > >
> > >> > > > > > Thank you,
> > >> > > > > > Justine Olshan
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> >
>


-- 
Best,
Stanislav


Re: [DISCUSS] KIP-435: Incremental Partition Reassignment

2019-06-26 Thread Colin McCabe
Hi Viktor,

Good point.  Sorry, I should have read the KIP more closely.

It would be good to change the title of the mail thread to reflect the new 
title of the KIP, "Internal Partition Reassignment Batching."

I do think there will be some interaction with KIP-455 here.  One example is 
that we'll want a way of knowing what target replicas are currently being 
worked on.  So maybe we'll have to add a field to the structures returned by 
listPartitionReassignments.

best,
Colin


On Wed, Jun 26, 2019, at 06:20, Viktor Somogyi-Vass wrote:
> Hey Colin,
> 
> I think there's some confusion here so I might change the name of this. So
> KIP-435 is about the internal batching of reassignments (so purely a
> controller change) and not about client side APIs. As per this moment these
> kind of improvements are listed on KIP-455's future work section so in my
> understanding KIP-455 won't touch that :).
> Let me know if I'm missing any points here.
> 
> Viktor
> 
> On Tue, Jun 25, 2019 at 9:02 PM Colin McCabe  wrote:
> 
> > Hi Viktor,
> >
> > Now that the 2.3 release is over, we're going to be turning our attention
> > back to working on KIP-455, which provides an API for partition
> > reassignment, and also solves the incremental reassignment problem.  Sorry
> > about the pause, but I had to focus on the stuff that was going into 2.3.
> >
> > I think last time we talked about this, the consensus was that KIP-455
> > supersedes KIP-435, since KIP-455 supports incremental reassignment.  We
> > also don't want to add more technical debt in the form of a new
> > ZooKeeper-based API that we'll have to support for a while.  So let's focus
> > on KIP-455 here.  We have more resources now so I think we'll be able to
> > get it done soonish.
> >
> > best,
> > Colin
> >
> >
> > On Tue, Jun 25, 2019, at 08:09, Viktor Somogyi-Vass wrote:
> > > Hi All,
> > >
> > > I have added another improvement to this, which is to limit the parallel
> > > leader movements. I think I'll soon (maybe late this week or early next)
> > > start a vote on this too if there are no additional feedback.
> > >
> > > Thanks,
> > > Viktor
> > >
> > > On Mon, Apr 29, 2019 at 1:26 PM Viktor Somogyi-Vass <
> > viktorsomo...@gmail.com>
> > > wrote:
> > >
> > > > Hi Folks,
> > > >
> > > > I've updated the KIP with the batching which would work on both replica
> > > > and partition level. To explain it briefly: for instance if the replica
> > > > level is set to 2 and partition level is set to 3, then 2x3=6 replica
> > > > reassignments would be in progress at the same time. In case of
> > reassignment
> > > > for a single partition from (0, 1, 2, 3, 4) to (5, 6, 7, 8, 9) we would
> > > > form the batches (0, 1) → (5, 6); (2, 3) → (7, 8) and 4 → 9 and would
> > > > execute the reassignment in this order.
> > > >
> > > > Let me know what you think.
> > > >
> > > > Best,
> > > > Viktor
> > > >
> > > > On Mon, Apr 15, 2019 at 7:01 PM Viktor Somogyi-Vass <
> > > > viktorsomo...@gmail.com> wrote:
> > > >
> > > >> A follow up on the batching topic to clarify my points above.
> > > >>
> > > >> Generally I think that batching should be a core feature as Colin said
> > > >> the controller should possess all information that is related.
> > > >> Also Cruise Control (or really any 3rd party admin system) might build
> > > >> upon this to give more holistic approach to balance brokers. We may
> > cater
> > > >> them with APIs that act like building blocks to make their life
> > easier like
> > > >> incrementalization, batching, cancellation and rollback but I think
> > the
> > > >> more advanced we go, the more advanced a control surface we'll need, and
> > Kafka's
> > > >> basic tooling might not be suitable for that.
> > > >>
> > > >> Best,
> > > >> Viktor
> > > >>
> > > >>
> > > >> On Mon, 15 Apr 2019, 18:22 Viktor Somogyi-Vass, <
> > viktorsomo...@gmail.com>
> > > >> wrote:
> > > >>
> > > >>> Hey Guys,
> > > >>>
> > > >>> I'll reply to you all in this email:
> > > >>>
> > > >>> @Jun:
> > > >>> 1. yes, it'd be a good idea to add this feature, I'll write this into
> > > >>> the KIP. I was actually thinking about introducing a dynamic config
> > called
> > > >>> reassignment.parallel.partition.count and
> > > >>> reassignment.parallel.replica.count. The first property would
> > control how
> > > >>> many partition reassignments we can do concurrently. The second would
> > go one
> > > >>> level in granularity and would control how many replicas we want
> > to move
> > > >>> for a given partition. Also one more thing that'd be useful to fix
> > is that
> > > >>> a given list of partition -> replica list would be executed in the
> > same
> > > >>> order (from first to last) so it's overall predictable and the user
> > would
> > > >>> have some control over the order in which reassignments are
> > executed, as
> > > >>> the JSON is still assembled by the user.
> > > >>> 2. the /kafka/brokers/topics/{topic} znode to be specific. I'll
> > update
> > > >>> the KIP to contain 
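
(A minimal sketch of the replica-level batching described above, assuming the
current and target replica lists have equal length; this is illustrative, not
the controller implementation:)

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReassignmentBatcher {
    // Splits one partition's reassignment into ordered sub-reassignments of at
    // most batchSize replicas each, e.g. (0,1,2,3,4) -> (5,6,7,8,9) with
    // batchSize 2 yields (0,1)->(5,6), (2,3)->(7,8), (4)->(9).
    static List<int[][]> batches(int[] current, int[] target, int batchSize) {
        List<int[][]> result = new ArrayList<>();
        for (int i = 0; i < current.length; i += batchSize) {
            int end = Math.min(i + batchSize, current.length);
            result.add(new int[][]{
                    Arrays.copyOfRange(current, i, end),
                    Arrays.copyOfRange(target, i, end)});
        }
        return result;
    }

    public static void main(String[] args) {
        for (int[][] b : batches(new int[]{0, 1, 2, 3, 4}, new int[]{5, 6, 7, 8, 9}, 2)) {
            System.out.println(Arrays.toString(b[0]) + " -> " + Arrays.toString(b[1]));
        }
    }
}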

Re: [DISCUSS] KIP-455: Create an Administrative API for Replica Reassignment

2019-06-26 Thread Colin McCabe
On Wed, Jun 26, 2019, at 12:02, Jason Gustafson wrote:
> Hi Colin,
> 
> Responses below and another question:
> 
> > I guess the thought process here is that most reassignment tools want to
> > know about all the reassignments that are going on.  If you don't know all
> > the pending reassignments, then it's hard to say whether adding a new one
> > is a good idea, or cancelling an existing one.  So I guess I can't think of
> > a case where a reassignment tool would want a partial set rather than the
> > full one.
>
> UIs often have "drill into" options. If there is a large ongoing
> reassignment, I can see wanting to limit the scope to a specific topic. Any
> downside that I'm missing?
>

We could add a mode that only lists a given set of partitions.  To be 
consistent with how we handle topics, that could be a separate "describe" 
method.  I don't think there's any downside, except some extra code to write.
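
Such a split might look roughly like the following (hypothetical method
shapes, not the final AdminClient API):

import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

interface ReassignmentAdmin {
    // Placeholder for whatever per-partition state the API returns,
    // e.g. replicas and target replicas.
    interface PartitionReassignment {}

    // Full listing, for tools that need the global view.
    Map<TopicPartition, PartitionReassignment> listPartitionReassignments();

    // Scoped "describe", for UIs drilling into specific partitions.
    Map<TopicPartition, PartitionReassignment> describePartitionReassignments(
            Set<TopicPartition> partitions);
}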

> > Good question.  It will be the current behavior.  Basically, we immediately
> > try to replicate to all the targetReplicas.  As they enter the ISR, they
> > leave "targetReplicas" and enter "replicas."  Making this more incremental
> > would be a good follow-on improvement.
> 
> The current behavior is to wait until all target replicas are in the ISR
> before making a change. Are you saying that we will keep this behavior?

We'd remove nodes from targetReplicas just as soon as they entered the ISR.  
They would become regular replicas at that point.

> 
> > When the /admin/reassign_partitions znode is empty, we'll listen for
> > updates.  When an update is made, we'll treat it like a call to
> > AlterPartitionReassignments.  Whenever we have zero pending reassignments,
> > we'll delete the /admin/reassign_partitions znode.  If the znode already
> > exists, we don't listen on it (same as now).
> 
> 
> So just to be a bit more explicit, what you are saying is that we will keep
> the reassignment state under /admin/reassign_partitions as we do currently,
> but we will update the target_replicas field in /partition/state following
> this new logic. Then as soon as the current replica set matches the target
> assignment, we will remove the /admin/reassign_partitions znode. Right?

One clarification: I wasn't proposing that the controller should write to 
/admin/reassign_partitions.  We will just remove the znode when we transition 
to having no ongoing reassignments.  There's no guarantee that what is in the 
znode reflects the current reassignments that are going on.  The only thing you 
can know is that if the znode exists, there is at least one reassignment going 
on.  But if someone makes a new reassignment with the 
AlterPartitionReassignments API, it won't appear in the znode.

Another thing to note is that if the znode exists and you overwrite it, your 
updates will be ignored.  This matches the current behavior of this znode.  
Apparently some applications don't know about this behavior and try to update 
the znode while a reassignment is going on, but it has no effect-- other than 
making what is in ZK misleading if someone checks.  This is, again, existing 
behavior :(

It's not a good API by any means.  For example, what if someone else overwrites 
your znode update before the controller has a chance to read it?  Unfortunate, 
but there's no really good way to fix this without transitioning away from 
direct ZooKeeper access.  We'll transition the command line tools immediately, 
but there will be some external management tools that will lag a bit.

> 
> Actually I'm still a bit confused about one aspect of this proposal. You
> are suggesting that we should leave the reassignment out of the Metadata.
> That is fine, but what does that mean as far as the consistency of the
> metadata we expose to clients while a reassignment is active? Currently the
> Metadata includes the following:
> 
> 1. Current leader
> 2. Current ISR
> 3. Current assigned replicas
> 
> Can you explain how the reassignment will affect this state?  As the target
> replicas are coming into sync, they will be added to the ISR. We wouldn't
> want it to be the case that the ISR includes replicas which are not
> currently assigned. So is the replica set the union of the current and
> target replicas?

I think we should leave the targetReplicas out of the normal replica set in the 
client metadata.  The client code that is checking the replica set shouldn't be 
treating target replicas as if they were regular replicas.   For example, you 
wouldn't look at a partition with 3 replicas and 2 target replicas, and an ISR 
of size 3, and think that it was under-replicated.

I agree that there is kind of an awkward edge case right when a targetReplica 
enters the ISR.  Since the replica set and the ISR reside in two separate 
znodes, they can't be updated atomically (well, maybe they could, but I don't 
think we will in practice).  When the controller becomes aware that the ISR has 
changed for the reassigning partition, it will 

[DISCUSS] KIP-482: The Kafka Protocol should Support Optional Fields

2019-06-26 Thread Colin McCabe
Hi all,

I would like to start a discussion for KIP-482:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Fields

cheers,
Colin


Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-06-26 Thread John Roesler
Thanks, Adam!

One unrelated thought that has just now occurred to me is that (unlike
the equi-joins we currently have), this join logic is potentially
spread over multiple Streams instances, which in general means that
the instances may be running different versions of Kafka Streams.

This means that if we discover a bug that requires us to again change
the wire message (as you did in this proposal update), we need to
consider what should happen if the PK instance is newer than the FK
instance, or vice-versa, during a rolling upgrade. We should think
ahead to this condition and make sure the logic is forward compatible.

Related: what about the initial case, when we release this feature
(let's say in 2.4)? What will happen if I decide to adopt 2.4 and add
a FK join together in one upgrade? Then the 2.4 member of the cluster
is producing the SubscriptionWrapper messages, and some 2.3 members
get the subscription topic assigned to them, but they have no idea
what to do with it? I'm not sure this is a problem; hopefully they
just do nothing. If it is a problem, it would be fine to say you have
to upgrade completely to 2.4 before deploying a FK join.

Just want to make sure we anticipate these issues in case it affects
the design at all.

Thanks,
-John

On Wed, Jun 26, 2019 at 2:38 PM Adam Bellemare  wrote:
>
> Sigh... Forgot the link:
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836&selectedPageVersions=78&selectedPageVersions2=74
>
> I'll update it when I validate that there are no issues with removing the
> SubscriptionResponseWrapper boolean.
>
> On Wed, Jun 26, 2019 at 3:37 PM Adam Bellemare 
> wrote:
>
> > >Maybe just call it as (k, leftval, null) or (k, null, rightval)?
> > Done.
> >
> > > if you update the KIP, you might want to send a new "diff link" to this
> > thread
> > Here it is:
> >
> > > Looking closely at the proposal, can you explain more about the
> > propagateIfNull field in SubscriptionResponseWrapper? It sort of looks like
> > it's always going to be equal to (RHS-result != null).
> > I believe you are correct, and I missed the forest for the trees. They are
> > effectively the same thing, and I can simply remove the flag. I will code
> > it up and try it out locally just to be sure.
> >
> > Thanks again for your help, it is greatly appreciated!
> >
> > On Wed, Jun 26, 2019 at 2:54 PM John Roesler  wrote:
> >
> >> I think the "scenario trace" is very nice, but has one point that I
> >> found confusing:
> >>
> >> You indicate a retraction in the join output as (k,null) and a join
> >> result as (k, leftval, rightval), but confusingly, you also write a
> >> join result as (k, JoinResult) when one side is null. Maybe just call
> >> it as (k, leftval, null) or (k, null, rightval)? That way the readers
> >> can more easily determine if the results meet their expectations for
> >> each join type.
> >>
> >> (procedural note: if you update the KIP, you might want to send a new
> >> "diff link" to this thread, since the one I posted at the beginning
> >> would not automatically show your latest changes)
> >>
> >> I was initially concerned that the proposed algorithm would wind up
> >> propagating something that looks like a left join (k, leftval, null)
> >> under the case that Joe pointed out, but after reviewing your
> >> scenario, I see that it will emit a tombstone (k, null) instead. This
> >> is appropriate, and unavoidable, since we have to retract the join
> >> result from the logical view (the join result is a logical Table).
> >>
> >> Looking closely at the proposal, can you explain more about the
> >> propagateIfNull field in SubscriptionResponseWrapper?
> >> It sort of looks like it's always going to be equal to (RHS-result !=
> >> null).
> >>
> >> In other words, can we drop that field and just send back RHS-result
> >> or null, and then handle it on the left-hand side like:
> >> if (rhsOriginalValueHash doesn't match) {
> >>     emit nothing, just drop the update
> >> } else if (joinType==inner && rhsValue == null) {
> >>     emit tombstone
> >> } else {
> >>     emit joiner(lhsValue, rhsValue)
> >> }
> >>
> >> To your concern about emitting extra tombstones, personally, I think
> >> it's fine. Clearly, we should try to avoid unnecessary tombstones, but
> >> all things considered, it's not harmful to emit some unnecessary
> >> tombstones: their payload is small, and they are trivial to handle
> >> downstream. If users want to, they can materialize the join result to
> >> suppress any extra tombstones, so there's a way out.
> >>
> >> Thanks for the awesome idea. It's better than what I was thinking.
> >> -john
> >>
> >> On Wed, Jun 26, 2019 at 11:37 AM Adam Bellemare
> >>  wrote:
> >> >
> >> > Thanks John.
> >> >
> >> > I'm looking forward to any feedback on this. In the meantime I will
> >> work on
> >> > the unit tests to ensure that we have well-defined and readable
> >> coverage.
> >> >
> >> > At the moment I cannot see a way around emitting (k,null) whenever we
> >> emit
> >> > an 

Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-06-26 Thread Adam Bellemare
Sigh... Forgot the link:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836&selectedPageVersions=78&selectedPageVersions2=74

I'll update it when I validate that there are no issues with removing the
SubscriptionResponseWrapper boolean.

On Wed, Jun 26, 2019 at 3:37 PM Adam Bellemare 
wrote:

> >Maybe just call it as (k, leftval, null) or (k, null, rightval)?
> Done.
>
> > if you update the KIP, you might want to send a new "diff link" to this
> thread
> Here it is:
>
> > Looking closely at the proposal, can you explain more about the
> propagateIfNull field in SubscriptionResponseWrapper? It sort of looks like
> it's always going to be equal to (RHS-result != null).
> I believe you are correct, and I missed the forest for the trees. They are
> effectively the same thing, and I can simply remove the flag. I will code
> it up and try it out locally just to be sure.
>
> Thanks again for your help, it is greatly appreciated!
>
> On Wed, Jun 26, 2019 at 2:54 PM John Roesler  wrote:
>
>> I think the "scenario trace" is very nice, but has one point that I
>> found confusing:
>>
>> You indicate a retraction in the join output as (k,null) and a join
>> result as (k, leftval, rightval), but confusingly, you also write a
>> join result as (k, JoinResult) when one side is null. Maybe just call
>> it as (k, leftval, null) or (k, null, rightval)? That way the readers
>> can more easily determine if the results meet their expectations for
>> each join type.
>>
>> (procedural note: if you update the KIP, you might want to send a new
>> "diff link" to this thread, since the one I posted at the beginning
>> would not automatically show your latest changes)
>>
>> I was initially concerned that the proposed algorithm would wind up
>> propagating something that looks like a left join (k, leftval, null)
>> under the case that Joe pointed out, but after reviewing your
>> scenario, I see that it will emit a tombstone (k, null) instead. This
>> is appropriate, and unavoidable, since we have to retract the join
>> result from the logical view (the join result is a logical Table).
>>
>> Looking closely at the proposal, can you explain more about the
>> propagateIfNull field in SubscriptionResponseWrapper?
>> It sort of looks like it's always going to be equal to (RHS-result !=
>> null).
>>
>> In other words, can we drop that field and just send back RHS-result
>> or null, and then handle it on the left-hand side like:
>> if (rhsOriginalValueHash doesn't match) {
>>     emit nothing, just drop the update
>> } else if (joinType==inner && rhsValue == null) {
>>     emit tombstone
>> } else {
>>     emit joiner(lhsValue, rhsValue)
>> }
>>
>> To your concern about emitting extra tombstones, personally, I think
>> it's fine. Clearly, we should try to avoid unnecessary tombstones, but
>> all things considered, it's not harmful to emit some unnecessary
>> tombstones: their payload is small, and they are trivial to handle
>> downstream. If users want to, they can materialize the join result to
>> suppress any extra tombstones, so there's a way out.
>>
>> Thanks for the awesome idea. It's better than what I was thinking.
>> -john
>>
>> On Wed, Jun 26, 2019 at 11:37 AM Adam Bellemare
>>  wrote:
>> >
>> > Thanks John.
>> >
>> > I'm looking forward to any feedback on this. In the meantime I will
>> work on
>> > the unit tests to ensure that we have well-defined and readable
>> coverage.
>> >
>> > At the moment I cannot see a way around emitting (k,null) whenever we
>> emit
>> > an event that lacks a matching foreign key on the RHS, except in the
>> > (k,null) -> (k,fk) case.
>> > If this LHS oldValue=null, we know we would have emitted a deletion and
>> so
>> > (k,null) would be emitted out of the join. In this case we don't need to
>> > send another null.
>> >
>> > Adam
>> >
>> > On Wed, Jun 26, 2019 at 11:53 AM John Roesler 
>> wrote:
>> >
>> > > Hi Adam,
>> > >
>> > > Thanks for the proposed revision to your KIP
>> > > (
>> > >
>> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836&selectedPageVersions=77&selectedPageVersions2=74
>> > > )
>> > >
>> > > in response to the concern pointed out during code review
>> > > (https://github.com/apache/kafka/pull/5527#issuecomment-505137962)
>> > >
>> > > We should have a brief discussion thread (here) in the mailing list to
>> > > make sure everyone who wants to gets a chance to consider the
>> > > modification to the design.
>> > >
>> > > Thanks,
>> > > -John
>> > >
>>
>


Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-06-26 Thread Adam Bellemare
>Maybe just call it as (k, leftval, null) or (k, null, rightval)?
Done.

> if you update the KIP, you might want to send a new "diff link" to this
thread
Here it is:

> Looking closely at the proposal, can you explain more about the
propagateIfNull field in SubscriptionResponseWrapper? It sort of looks like
it's always going to be equal to (RHS-result != null).
I believe you are correct, and I missed the forest for the trees. They are
effectively the same thing, and I can simply remove the flag. I will code
it up and try it out locally just to be sure.

Thanks again for your help, it is greatly appreciated!

On Wed, Jun 26, 2019 at 2:54 PM John Roesler  wrote:

> I think the "scenario trace" is very nice, but has one point that I
> found confusing:
>
> You indicate a retraction in the join output as (k,null) and a join
> result as (k, leftval, rightval), but confusingly, you also write a
> join result as (k, JoinResult) when one side is null. Maybe just call
> it as (k, leftval, null) or (k, null, rightval)? That way the readers
> can more easily determine if the results meet their expectations for
> each join type.
>
> (procedural note: if you update the KIP, you might want to send a new
> "diff link" to this thread, since the one I posted at the beginning
> would not automatically show your latest changes)
>
> I was initially concerned that the proposed algorithm would wind up
> propagating something that looks like a left join (k, leftval, null)
> under the case that Joe pointed out, but after reviewing your
> scenario, I see that it will emit a tombstone (k, null) instead. This
> is appropriate, and unavoidable, since we have to retract the join
> result from the logical view (the join result is a logical Table).
>
> Looking closely at the proposal, can you explain more about the
> propagateIfNull field in SubscriptionResponseWrapper?
> It sort of looks like it's always going to be equal to (RHS-result !=
> null).
>
> In other words, can we drop that field and just send back RHS-result
> or null, and then handle it on the left-hand side like:
> if (rhsOriginalValueHash doesn't match) {
>     emit nothing, just drop the update
> } else if (joinType==inner && rhsValue == null) {
>     emit tombstone
> } else {
>     emit joiner(lhsValue, rhsValue)
> }
>
> To your concern about emitting extra tombstones, personally, I think
> it's fine. Clearly, we should try to avoid unnecessary tombstones, but
> all things considered, it's not harmful to emit some unnecessary
> tombstones: their payload is small, and they are trivial to handle
> downstream. If users want to, they can materialize the join result to
> suppress any extra tombstones, so there's a way out.
>
> Thanks for the awesome idea. It's better than what I was thinking.
> -john
>
> On Wed, Jun 26, 2019 at 11:37 AM Adam Bellemare
>  wrote:
> >
> > Thanks John.
> >
> > I'm looking forward to any feedback on this. In the meantime I will work
> on
> > the unit tests to ensure that we have well-defined and readable coverage.
> >
> > At the moment I cannot see a way around emitting (k,null) whenever we
> emit
> > an event that lacks a matching foreign key on the RHS, except in the
> > (k,null) -> (k,fk) case.
> > If this LHS oldValue=null, we know we would have emitted a deletion and
> so
> > (k,null) would be emitted out of the join. In this case we don't need to
> > send another null.
> >
> > Adam
> >
> > On Wed, Jun 26, 2019 at 11:53 AM John Roesler  wrote:
> >
> > > Hi Adam,
> > >
> > > Thanks for the proposed revision to your KIP
> > > (
> > >
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836&selectedPageVersions=77&selectedPageVersions2=74
> > > )
> > >
> > > in response to the concern pointed out during code review
> > > (https://github.com/apache/kafka/pull/5527#issuecomment-505137962)
> > >
> > > We should have a brief discussion thread (here) in the mailing list to
> > > make sure everyone who wants to gets a chance to consider the
> > > modification to the design.
> > >
> > > Thanks,
> > > -John
> > >
>


Re: [DISCUSS] KIP-455: Create an Administrative API for Replica Reassignment

2019-06-26 Thread Jason Gustafson
Hi Colin,

Responses below and another question:

I guess the thought process here is that most reassignment tools want to
> know about all the reassignments that are going on.  If you don't know all
> the pending reassignments, then it's hard to say whether adding a new one
> is a good idea, or cancelling an existing one.  So I guess I can't think of
> a case where a reassignment tool would want a partial set rather than the
> full one.


UIs often have "drill into" options. If there is a large ongoing
reassignment, I can see wanting to limit the scope to a specific topic. Any
downside that I'm missing?

Good question.  It will be the current behavior.  Basically, we immediately
> try to replicate to all the targetReplicas.  As they enter the ISR, they
> leave "targetReplicas" and enter "replicas."  Making this more incremental
> would be a good follow-on improvement.


The current behavior is to wait until all target replicas are in the ISR
before making a change. Are you saying that we will keep this behavior?

When the /admin/reassign_partitions znode is empty, we'll listen for
> updates.  When an update is made, we'll treat it like a call to
> AlterPartitionReassignments.  Whenever we have zero pending reassignments,
> we'll delete the /admin/reassign_partitions znode.  If the znode already
> exists, we don't listen on it (same as now).


So just to be a bit more explicit, what you are saying is that we will keep
the reassignment state under /admin/reassign_partitions as we do currently,
but we will update the target_replicas field in /partition/state following
this new logic. Then as soon as the current replica set matches the target
assignment, we will remove the /admin/reassign_partitions znode. Right?


Actually I'm still a bit confused about one aspect of this proposal. You
are suggesting that we should leave the reassignment out of the Metadata.
That is fine, but what does that mean as far as the consistency of the
metadata we expose to clients while a reassignment is active? Currently the
Metadata includes the following:

1. Current leader
2. Current ISR
3. Current assigned replicas

Can you explain how the reassignment will affect this state?  As the target
replicas are coming into sync, they will be added to the ISR. We wouldn't
want it to be the case that the ISR includes replicas which are not
currently assigned. So is the replica set the union of the current and
target replicas?

Thanks,
Jason




On Wed, Jun 26, 2019 at 10:42 AM Colin McCabe  wrote:

> On Tue, Jun 25, 2019, at 18:37, Jason Gustafson wrote:
> > Hi Colin,
> >
> > Took another pass on the KIP. Looks good overall. A few questions below:
> >
> > 1. I wasn't clear why `currentReplicas` is an optional field. Wouldn't we
> > always have a current set of replicas?
>
> Good point.  When I wrote that I was trying to use the same structure for
> both requesting a new reassignment, and describing an existing one.  I just
> posted a new version which uses separate structures for these.
> (CurrentReplicas is only relevant when you are listing an existing
> assignment.)
>
> > 2. Seems the only option is to list all active partition reassignments? I
> > think we have tended to regret these APIs. At least should we be able to
> > specify a subset of topics or partitions perhaps?
>
> I guess the thought process here is that most reassignment tools want to
> know about all the reassignments that are going on.  If you don't know all
> the pending reassignments, then it's hard to say whether adding a new one
> is a good idea, or cancelling an existing one.  So I guess I can't think of
> a case where a reassignment tool would want a partial set rather than the
> full one.
>
> It is kind of unfortunate to be passing all this information to some
> external process, but that's kind of inherent in the idea of reassignment
> as separate from the controller.  There's normally only one or zero
> processes doing and monitoring reassignments, so it's not as bad as
> thousands of clients sending full metadata requests.  It's probably OK?
>
> >
> > 3. Can you elaborate a bit on the handling of /admin/reassign_partitions?
> > Does this alter the target_replicas of the leader and ISR znode?
>
> When the /admin/reassign_partitions znode is empty, we'll listen for
> updates.  When an update is made, we'll treat it like a call to
> AlterPartitionReassignments.  Whenever we have zero pending reassignments,
> we'll delete the /admin/reassign_partitions znode.  If the znode already
> exists, we don't listen on it (same as now).
>
> One thing to note here is that if you upgrade from a pre-KIP-455 version
> to a KIP-455 version, and you invoke AlterPartitionReassignments before the
> cluster roll is finished, the new controller won't write the reassignment
> to /admin/reassign_partitions.  So if the controller node bounces to an
> un-upgraded broker during the roll, the reassignment will halt for that
> time period.  This is OK because a cluster roll should be a 

Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-06-26 Thread John Roesler
I think the "scenario trace" is very nice, but has one point that I
found confusing:

You indicate a retraction in the join output as (k,null) and a join
result as (k, leftval, rightval), but confusingly, you also write a
join result as (k, JoinResult) when one side is null. Maybe just call
it as (k, leftval, null) or (k, null, rightval)? That way the readers
can more easily determine if the results meet their expectations for
each join type.

(procedural note: if you update the KIP, you might want to send a new
"diff link" to this thread, since the one I posted at the beginning
would not automatically show your latest changes)

I was initially concerned that the proposed algorithm would wind up
propagating something that looks like a left join (k, leftval, null)
under the case that Joe pointed out, but after reviewing your
scenario, I see that it will emit a tombstone (k, null) instead. This
is appropriate, and unavoidable, since we have to retract the join
result from the logical view (the join result is a logical Table).

Looking closely at the proposal, can you explain more about the
propagateIfNull field in SubscriptionResponseWrapper?
It sort of looks like it's always going to be equal to (RHS-result != null).

In other words, can we drop that field and just send back RHS-result
or null, and then handle it on the left-hand side like:
if (rhsOriginalValueHash doesn't match) {
    emit nothing, just drop the update
} else if (joinType==inner && rhsValue == null) {
    emit tombstone
} else {
    emit joiner(lhsValue, rhsValue)
}
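
(For concreteness, a compilable rendering of that sketch; the type and
parameter names here are assumptions for illustration, not the actual
KIP-213 classes:)

import java.util.function.BiConsumer;
import java.util.function.BiFunction;

class ResponseHandler<K, VL, VR, VOut> {
    // rhsValue is null when there is no RHS match; the hash comparison guards
    // against applying a response to an LHS value that has since changed.
    void handle(K key, VL lhsValue, VR rhsValue,
                long rhsOriginalValueHash, long currentLhsValueHash,
                boolean innerJoin,
                BiFunction<VL, VR, VOut> joiner,
                BiConsumer<K, VOut> emit) {
        if (rhsOriginalValueHash != currentLhsValueHash) {
            return;                                  // stale: drop the update
        } else if (innerJoin && rhsValue == null) {
            emit.accept(key, null);                  // tombstone
        } else {
            emit.accept(key, joiner.apply(lhsValue, rhsValue));
        }
    }
}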

To your concern about emitting extra tombstones, personally, I think
it's fine. Clearly, we should try to avoid unnecessary tombstones, but
all things considered, it's not harmful to emit some unnecessary
tombstones: their payload is small, and they are trivial to handle
downstream. If users want to, they can materialize the join result to
suppress any extra tombstones, so there's a way out.

Thanks for the awesome idea. It's better than what I was thinking.
-john

On Wed, Jun 26, 2019 at 11:37 AM Adam Bellemare
 wrote:
>
> Thanks John.
>
> I'm looking forward to any feedback on this. In the meantime I will work on
> the unit tests to ensure that we have well-defined and readable coverage.
>
> At the moment I cannot see a way around emitting (k,null) whenever we emit
> an event that lacks a matching foreign key on the RHS, except in the
> (k,null) -> (k,fk) case.
> If this LHS oldValue=null, we know we would have emitted a deletion and so
> (k,null) would be emitted out of the join. In this case we don't need to
> send another null.
>
> Adam
>
> On Wed, Jun 26, 2019 at 11:53 AM John Roesler  wrote:
>
> > Hi Adam,
> >
> > Thanks for the proposed revision to your KIP
> > (
> > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836&selectedPageVersions=77&selectedPageVersions2=74
> > )
> >
> > in response to the concern pointed out during code review
> > (https://github.com/apache/kafka/pull/5527#issuecomment-505137962)
> >
> > We should have a brief discussion thread (here) in the mailing list to
> > make sure everyone who wants to gets a chance to consider the
> > modification to the design.
> >
> > Thanks,
> > -John
> >


Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-26 Thread Guozhang Wang
2. The reason we did not expose generation.id from KafkaConsumer public
APIs directly is to abstract this notion from users (since it is an
implementation detail of the rebalance protocol itself, e.g. if user calls
consumer.assign() they do not need to invoke ConsumerCoordinator and no
need to be aware of generation.id at all).

On the other hand, with the current proposal the txn.coordinator does not
know about the latest generation from the source-of-truth
group.coordinator; instead, it only bumps up the generation from the
producer's InitProducerIdRequest.

The key here is that GroupCoordinator, when handling `InitProducerIdRequest

3. I agree that if we rely on the group coordinator to block on returning
offset-fetch-response if read-committed is enabled, then we do not need to
store partition assignment on txn coordinator and therefore it's better to
still decouple them. For that case we still need to update the KIP wiki
page that includes:

3.a. Augment OffsetFetchRequest with the ISOLATION_LEVEL as well.
3.b. Add new error code in OffsetFetchResponse to let client backoff and
retry if there are pending txns including the interested partitions.
3.c. Also in the worst case we would let the client be blocked for the
txn.timeout period, and for that reason we may need to consider reducing
our default txn.timeout value as well.

4. According to Colin it seems we do not need to create another KIP and we
can just complete it as part of KIP-117 / KAFKA-5214; and we need to do
some cleanup to have BrokerApiVersion exposed from AdminClient (@Colin
please let us know if you have any concerns exposing it).


Guozhang


On Tue, Jun 25, 2019 at 6:43 PM Jason Gustafson  wrote:

> For reference, we have BrokerApiVersionCommand already as a public
> interface. We have a bit of tech debt at the moment because it uses a
> custom AdminClient. It would be nice to clean that up. In general, I think
> it is reasonable to expose from AdminClient. It can be used by management
> tools to inspect running Kafka versions for example.
>
> -Jason
>
> On Tue, Jun 25, 2019 at 4:37 PM Boyang Chen 
> wrote:
>
> > Thank you for the context Colin. The groupId was indeed a copy-paste
> error.
> > Our use case here for 447 is (Quoted from Guozhang):
> > '''
> > I think if we can do something else to
> > avoid this config though, for example we can use the embedded AdminClient
> > to send the APIVersion request upon starting up, and based on the
> returned
> > value decides whether to go to the old code path or the new behavior.
> > '''
> > The benefit we get is to avoid adding a new configuration to make a
> > decision simply base on broker version. If you have concerns with
> exposing
> > ApiVersion for client, we could
> > try to think of alternative solutions too.
> >
> > Boyang
> >
> >
> >
> > On Tue, Jun 25, 2019 at 4:20 PM Colin McCabe  wrote:
> >
> > > kafka.api.ApiVersion is an internal class, not suitable to exposing
> > > through AdminClient.  That class is not even accessible without having
> > the
> > > broker jars on your CLASSPATH.
> > >
> > > Another question is, what is the groupId parameter doing in the call?
> > The
> > > API versions are the same no matter what consumer group we use, right?
> > > Perhaps this was a copy and paste error?
> > >
> > > This is not the first time we have discussed having a method in
> > > AdminClient to retrieve API version information.  In fact, the original
> > KIP
> > > which created KafkaAdminClient specified an API for fetching version
> > > information.  It was called apiVersions and it is still there on the
> > wiki.
> > > See
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations
> > >
> > > However, this API wasn't ready in time for 0.11.0 so we shipped without
> > > it.  There was a JIRA to implement it for later versions,
> > > https://issues.apache.org/jira/browse/KAFKA-5214 , as well as a PR,
> > > https://github.com/apache/kafka/pull/3012 .  However, we started to
> > > rethink whether this AdminClient function was even necessary.  Most of
> > the
> > > use-cases we could think of seemed like horrible hacks.  So it has
> never
> > > really been implemented (yet?).
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Tue, Jun 25, 2019, at 15:46, Boyang Chen wrote:
> > > > Actually, after a second thought, I think it actually makes sense to
> > > > support auto upgrade through admin client to help us get the api version
> > > > from
> > > > broker.
> > > > A draft KIP is here:
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client
> > > >
> > > > Boyang
> > > >
> > > > On Tue, Jun 25, 2019 at 2:57 PM Boyang Chen <
> > reluctanthero...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thank you Guozhang, some of my understandings are inline below.
> > > > >
> > > > > On Tue, Jun 25, 2019 at 11:05 AM Jason Gustafson <
> ja...@confluent.io
> 

Re: [DISCUSS] KIP-455: Create an Administrative API for Replica Reassignment

2019-06-26 Thread Colin McCabe
On Tue, Jun 25, 2019, at 18:37, Jason Gustafson wrote:
> Hi Colin,
> 
> Took another pass on the KIP. Looks good overall. A few questions below:
> 
> 1. I wasn't clear why `currentReplicas` is an optional field. Wouldn't we
> always have a current set of replicas?

Good point.  When I wrote that I was trying to use the same structure for both 
requesting a new reassignment, and describing an existing one.  I just posted a 
new version which uses separate structures for these.  (CurrentReplicas is only 
relevant when you are listing an existing assignment.)

> 2. Seems the only option is to list all active partition reassignments? I
> think we have tended to regret these APIs. At least should we be able to
> specify a subset of topics or partitions perhaps?

I guess the thought process here is that most reassignment tools want to know 
about all the reassignments that are going on.  If you don't know all the 
pending reassignments, then it's hard to say whether adding a new one is a good 
idea, or cancelling an existing one.  So I guess I can't think of a case where 
a reassignment tool would want a partial set rather than the full one.

It is kind of unfortunate to be passing all this information to some external 
process, but that's kind of inherent in the idea of reassignment as separate 
from the controller.  There's normally only one or zero processes doing and 
monitoring reassignments, so it's not as bad as thousands of clients sending 
full metadata requests.  It's probably OK?

> 
> 3. Can you elaborate a bit on the handling of /admin/reassign_partitions?
> Does this alter the target_replicas of the leader and ISR znode?

When the /admin/reassign_partitions znode is empty, we'll listen for updates.  
When an update is made, we'll treat it like a call to 
AlterPartitionReassignments.  Whenever we have zero pending reassignments, 
we'll delete the /admin/reassign_partitions znode.  If the znode already 
exists, we don't listen on it (same as now).

One thing to note here is that if you upgrade from a pre-KIP-455 version to a 
KIP-455 version, and you invoke AlterPartitionReassignments before the cluster 
roll is finished, the new controller won't write the reassignment to 
/admin/reassign_partitions.  So if the controller node bounces to an 
un-upgraded broker during the roll, the reassignment will halt for that time 
period.  This is OK because a cluster roll should be a relatively short amount 
of time-- and also, probably not a time you want to be doing reassignments 
anyway because of the extra load on the cluster.

> 
> 4. I think it would be helpful to provide an example of the rebalance
> process for a given partition. Specifically I am wondering whether the
> replica set is updated incrementally or if we follow the current behavior.
> Possibly some decisions can be deferred to implementation, but it would be
> helpful to work through a case of changing the replication factor just to
> make sure there are reasonable options.

Good question.  It will be the current behavior.  Basically, we immediately try 
to replicate to all the targetReplicas.  As they enter the ISR, they leave 
"targetReplicas" and enter "replicas."  Making this more incremental would be a 
good follow-on improvement.
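
An illustrative model of that transition (not the controller code; the real
state lives in ZooKeeper and the controller context):

import java.util.ArrayList;
import java.util.List;

class PartitionReassignmentState {
    final List<Integer> replicas = new ArrayList<>();
    final List<Integer> targetReplicas = new ArrayList<>();

    // As each target replica enters the ISR, it leaves targetReplicas and
    // becomes a regular replica.
    void onReplicaEnteredIsr(int brokerId) {
        if (targetReplicas.remove(Integer.valueOf(brokerId))) {
            replicas.add(brokerId);
        }
    }

    // The reassignment is complete once no target replicas remain.
    boolean reassignmentComplete() {
        return targetReplicas.isEmpty();
    }
}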

> 
> 5. Are we changing the semantics of the URP and UnderMinIsr metrics in this
> KIP or in a follow-up?

In a follow-up.

> 
> 6. We have both "TargetBrokers" and "PendingReplicas" as names in the
> proposal. Perhaps we should try to be consistent?

Fixed.  I set it to TargetReplicas for now.

> 
> 7. I am not sure specifying `targetReplicas` as empty is the clearest way
> to cancel a reassignment. Whether we implement it this way or not in the
> protocol is a separate issue, but perhaps we should have an explicit
> `cancelReassignment` method in AdminClient?

I changed the API slightly to take an Optional.  Does 
that seem cleaner?
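(For example, usage could look like this, with an empty Optional meaning "cancel"; the method and type names are whatever the KIP finally settles on:)

{code:java}
// Hypothetical usage sketch, not the final AdminClient API.
// (Assumes java.util.* and an Admin handle "admin" are in scope.)
TopicPartition tp = new TopicPartition("my-topic", 0);

// Start (or replace) a reassignment:
admin.alterPartitionReassignments(
    Collections.singletonMap(tp, Optional.of(Arrays.asList(5, 6, 7))));

// Cancel it:
admin.alterPartitionReassignments(
    Collections.singletonMap(tp, Optional.empty()));
{code}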

best,
Colin

> 
> Thanks,
> Jason
> 
> 
> 
> 
> On Wed, Jun 19, 2019 at 3:36 AM Stanislav Kozlovski 
> wrote:
> 
> > Hey there Colin,
> >
> > Thanks for the work on this KIP. It is a much-needed improvement and I'm
> > excited to see it. Sorry for coming in so late to the discussion, I have
> > one question to better understand the change and a small suggestion.
> >
> > I see we allow reassignment cancellation at the partition level - what is
> > the motivation behind that? I think that having the back-end structures
> > support it is a good choice since it allows us more flexibility in the
> > future but what are the reasons for allowing a user to cancel at a
> > partition level? I think allowing it might let users shoot themselves in
> > the foot easier and make tools harder to implement (needing to guard
> > against it).
> >
> > In any case, what do you think about an ease-of-use improvement where we
> > allow a user to cancel all reassignments for a topic without specifying its
> > partitions? Essentially, we could cancel all reassignments for a topic if
> > the 

Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-06-26 Thread Adam Bellemare
Thanks, John.

I'm looking forward to any feedback on this. In the meantime I will work on
the unit tests to ensure that we have well-defined and readable coverage.

At the moment I cannot see a way around emitting (k,null) whenever we emit
an event that lacks a matching foreign key on the RHS, except in the
(k,null) -> (k,fk) case.
If the LHS oldValue=null, we know we would have emitted a deletion and so
(k,null) would be emitted out of the join. In this case we don't need to
send another null.
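(As pseudo-code, the emission rule reads something like this; illustrative only, not the actual KIP-213 implementation:)

{code:java}
// Sketch of the rule described above.
void onLhsEvent(K key, V oldValue, V newValue, R rhsMatch) {
    if (rhsMatch == null) {           // no matching foreign key on the RHS
        if (oldValue != null) {
            emit(key, null);          // retract the previous join result
        }
        // oldValue == null: the prior deletion already produced (k, null)
        // out of the join, so we suppress the duplicate null.
    }
}
{code}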

Adam

On Wed, Jun 26, 2019 at 11:53 AM John Roesler  wrote:

> Hi Adam,
>
> Thanks for the proposed revision to your KIP
> (
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836=77=74
> )
>
> in response to the concern pointed out during code review
> (https://github.com/apache/kafka/pull/5527#issuecomment-505137962)
>
> We should have a brief discussion thread (here) in the mailing list to
> make sure everyone who wants to gets a chance to consider the
> modification to the design.
>
> Thanks,
> -John
>


[DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-06-26 Thread John Roesler
Hi Adam,

Thanks for the proposed revision to your KIP
(https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836=77=74)

in response to the concern pointed out during code review
(https://github.com/apache/kafka/pull/5527#issuecomment-505137962)

We should have a brief discussion thread (here) in the mailing list to
make sure everyone who wants to gets a chance to consider the
modification to the design.

Thanks,
-John


Re: [DISCUSS] KIP-466: Add support for List serialization and deserialization

2019-06-26 Thread John Roesler
Thanks for the update, Daniyar!

In addition to specifying the config interface, can you also specify
the Java interface? Namely, if I need to pass an instance of this
serde in to the DSL directly, as in Produced, Materialized, etc., what
constructor(s) would I have available? Likewise with the Serializer
and Deserializer. I don't think you need to specify the implementation
logic, since we've already discussed it here.
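(For instance, something along these lines would make the question concrete; this is a guess at the shape, not the proposed API:)

{code:java}
// Hypothetical constructor/factory shape, for illustration only.
Serde<List<Integer>> listSerde =
    Serdes.ListSerde(ArrayList.class, Serdes.Integer());

// ...so that it can be handed to the DSL directly:
stream.to("output-topic", Produced.with(Serdes.String(), listSerde));
{code}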

If you also want to specify the serialized format of the data records
in the KIP, it could be useful documentation, as well as letting us
verify the schema for forward/backward compatibility concerns, etc.
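(E.g., the serialized format could be documented along these lines; again a guess at the shape, not the KIP's final layout:)

{code:java}
// Hypothetical wire layout for a serialized List<T>, for illustration only:
//
//   [int32 size]                 number of entries in the list
//   then, per entry:
//     [int32 entrySize][bytes]   if the inner serializer is variable-length
//     [bytes]                    fixed-length primitives skip the size prefix
{code}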

Thanks,
John

On Wed, Jun 26, 2019 at 10:33 AM Development  wrote:
>
> Hey,
>
> Finally made updates to the KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization
>  
> 
> Sorry for the delay :)
>
> Thank You!
>
> Best,
> Daniyar Yeralin
>
> > On Jun 22, 2019, at 12:49 AM, Matthias J. Sax  wrote:
> >
> > Yes, something like this. I did not think about good configuration
> > parameter names yet. I am also not sure if I understand all proposed
> > configs atm. But all configs should be listed and explained in the KIP
> > anyway, and we can discuss further after you have updated the KIP (I can
> > ask more detailed questions if I have any).
> >
> >
> > -Matthias
> >
> > On 6/21/19 2:05 PM, Development wrote:
> >> Yes, you are right. ByteSerializer is not what I need to have in a list
> >> of primitives.
> >>
> >> As for the default constructor and configurability, just want to make
> >> sure. Is this what you have on your mind?
> >>
> >> Best,
> >> Daniyar Yeralin
> >>
> >>
> >>
> >>> On Jun 21, 2019, at 2:51 PM, Matthias J. Sax wrote:
> >>>
> >>> Thanks for the update!
> >>>
> >>> I think that `ListDeserializer`, `ListSerializer`, and `ListSerde`
> >>> should have a default constructor and it should be possible to pass in
> >>> the `Class listClass` information via a configuration. Otherwise,
> >>> KafkaStreams cannot use it as default serde.
> >>>
> >>>
> >>> For the primitive serializers: `BytesSerializer` is not primitive IMHO,
> >>> as it is for `byte[]` with variable length -- it's for arrays, not for
> >>> single `byte` (note, that `Bytes` is a Kafka class wrapping `byte[]`).
> >>>
> >>>
> >>> For tests, we can comment on the PR. No need to do this in the KIP
> >>> discussion.
> >>>
> >>>
> >>> Can you also update the KIP?
> >>>
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> On 6/21/19 11:29 AM, Development wrote:
>  I made and pushed necessary commits, so we could review the final
>  version under PR https://github.com/apache/kafka/pull/6592
> 
>  I also need some advice on writing tests for this new serde. So far I
>  only have two test cases (roundtrip and empty payload), I’m not sure
>  if it is enough.
> 
>  Thank y’all for your help in this KIP :)
> 
>  Best,
>  Daniyar Yeralin
> 
> 
> > On Jun 21, 2019, at 1:44 PM, John Roesler wrote:
> >
> > Hey Daniyar,
> >
> > Looks good to me! Thanks for considering it.
> >
> > Thanks,
> > -John
> >
> > On Fri, Jun 21, 2019 at 9:04 AM Development wrote:
> > Hey John and Matthias,
> >
> > Yes, now I see it all. I’m storing lots of redundant information.
> > Here is my final idea. Yes, now a user should pass a list type. I
> > realized that the type is not really needed in ListSerializer, but
> > only in ListDeserializer:
> >
> >
> > In ListSerializer we will start storing sizes only if serializer is
> > not a primitive serializer:
> >
> >
> > Then, in deserializer, we persist passed list type, so that during
> > deserialization we could create an instance of it with predefined
> > listSize for better performance.
> > We also try to locate a primitiveSize based on passed deserializer.
> > If it is not there, then primitiveSize will be null. Which means
> > that each entry’s size was encoded individually.
> >
> >
> > This looks much cleaner and more concise.
> >
> > What do you think?
> >
> > Best,
> > Daniyar Yeralin
> >
> >> On Jun 20, 2019, at 5:45 PM, Matthias J. Sax wrote:
> >>
> >> For encoding the list-type: I see John's point about re-encoding the
> >> list-type redundantly. However, I also don't like the idea that the
> >> Deserializer returns a fixed type...
> >>
> >> Maybe it's best to allow users to specify the target list type on
> >> deserialization via config?
> >>
> >> Similar for 

Re: [DISCUSS] KIP-466: Add support for List serialization and deserialization

2019-06-26 Thread Development
Hey, 

Finally made updates to the KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization
 

Sorry for the delay :)

Thank You!

Best,
Daniyar Yeralin

> On Jun 22, 2019, at 12:49 AM, Matthias J. Sax  wrote:
> 
> Yes, something like this. I did not think about good configuration
> parameter names yet. I am also not sure if I understand all proposed
> configs atm. But all configs should be listed and explained in the KIP
> anyway, and we can discuss further after you have updated the KIP (I can
> ask more detailed questions if I have any).
> 
> 
> -Matthias
> 
> On 6/21/19 2:05 PM, Development wrote:
>> Yes, you are right. ByteSerializer is not what I need to have in a list
>> of primitives.
>> 
>> As for the default constructor and configurability, just want to make
>> sure. Is this what you have on your mind?
>> 
>> Best,
>> Daniyar Yeralin
>> 
>> 
>> 
>>> On Jun 21, 2019, at 2:51 PM, Matthias J. Sax wrote:
>>> 
>>> Thanks for the update!
>>> 
>>> I think that `ListDeserializer`, `ListSerializer`, and `ListSerde`
>>> should have a default constructor and it should be possible to pass in
>>> the `Class listClass` information via a configuration. Otherwise,
>>> KafkaStreams cannot use it as default serde.
>>> 
>>> 
>>> For the primitive serializers: `BytesSerializer` is not primitive IMHO,
>>> as it is for `byte[]` with variable length -- it's for arrays, not for
>>> single `byte` (note, that `Bytes` is a Kafka class wrapping `byte[]`).
>>> 
>>> 
>>> For tests, we can comment on the PR. No need to do this in the KIP
>>> discussion.
>>> 
>>> 
>>> Can you also update the KIP?
>>> 
>>> 
>>> 
>>> -Matthias
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On 6/21/19 11:29 AM, Development wrote:
 I made and pushed necessary commits, so we could review the final
 version under PR https://github.com/apache/kafka/pull/6592
 
 I also need some advice on writing tests for this new serde. So far I
 only have two test cases (roundtrip and empty payload), I’m not sure
 if it is enough.
 
 Thank y’all for your help in this KIP :)
 
 Best,
 Daniyar Yeralin
 
 
> On Jun 21, 2019, at 1:44 PM, John Roesler wrote:
> 
> Hey Daniyar,
> 
> Looks good to me! Thanks for considering it.
> 
> Thanks,
> -John
> 
> On Fri, Jun 21, 2019 at 9:04 AM Development wrote:
> Hey John and Matthias,
> 
> Yes, now I see it all. I’m storing lots of redundant information.
> Here is my final idea. Yes, now a user should pass a list type. I
> realized that the type is not really needed in ListSerializer, but
> only in ListDeserializer:
> 
> 
> In ListSerializer we will start storing sizes only if serializer is
> not a primitive serializer:
> 
> 
> Then, in deserializer, we persist passed list type, so that during
> deserialization we could create an instance of it with predefined
> listSize for better performance.
> We also try to locate a primitiveSize based on passed deserializer.
> If it is not there, then primitiveSize will be null. Which means
> that each entry’s size was encoded individually.
> 
> 
> This looks much cleaner and more concise.
> 
> What do you think?
> 
> Best,
> Daniyar Yeralin
> 
>> On Jun 20, 2019, at 5:45 PM, Matthias J. Sax wrote:
>> 
>> For encoding the list-type: I see John's point about re-encoding the
>> list-type redundantly. However, I also don't like the idea that the
>> Deserializer returns a fixed type...
>> 
>> Maybe it's best to allow users to specify the target list type on
>> deserialization via config?
>> 
>> Similar for the primitive types: I don't think we need to encode the
>> type size, but users could specify the type on the deserializer (via a
>> config again)?
>> 
>> 
>> About generics: nesting could be arbitrarily deep. Hence, I doubt
>> we can
>> support this and a cast will be necessary at some point in the user
>> code.
>> 
>> 
>> 
>> -Matthias
>> 
>> 
>> 
>> On 6/20/19 1:21 PM, John Roesler wrote:
>>> Hey Daniyar,
>>> 
>>> Thanks for looking at it!
>>> 
>>> Something like your screenshot is more along the lines of what I was
>>> thinking. Sorry, but I didn't follow what you mean: how would that not
>>> be "vanilla java"?
>>> 
>>> Unfortunately the deserializer needs more information, though. For
>>> example, what if the inner type is a Map? The serde
>>> could
>>> only be used to produce a 

Re: [DISCUSS] KIP-477: Add PATCH method for connector config in Connect REST API

2019-06-26 Thread Ryanne Dolan
Ivan, I looked at adding PATCH a while ago as well. I decided not to pursue
the idea for a few reasons:

1) PATCH is still racy. For example, if you want to add a topic to the
"topics" property, you still need to read, modify, and write the existing
value (see the sketch at the end of this message). To handle this, you'd
need to support atomic sub-document operations, which I don't see happening.

2) A common pattern is to store your configurations in git or something,
and then apply them via PUT. Throw in some triggers or jenkins etc, and you
have a more robust solution than PATCH provides.

3) For properties that change a lot, it's possible to use an out-of-band
data source, e.g. Kafka or Zookeeper, and then have your Connector
subscribe to changes. I've done something like this to enable dynamic
reconfiguration of Connectors from command-line tools and dashboards
without involving the Connect REST API at all. Moreover, I've done so in an
atomic, non-racy way.

So I don't think PATCH is strictly necessary, nor is it sufficient for atomic
partial updates. That said, it doesn't hurt, and I'm happy to support the
KIP.
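(To make point 1 concrete, here is a schematic sketch of the read-modify-write cycle; the connector name is hypothetical, and the JSON handling is deliberately simplified:)

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TopicsRace {
    public static void main(String[] args) throws Exception {
        HttpClient http = HttpClient.newHttpClient();
        String url = "http://localhost:8083/connectors/my-sink/config";

        // 1. Read the current connector config.
        String config = http.send(
                HttpRequest.newBuilder(URI.create(url)).GET().build(),
                HttpResponse.BodyHandlers.ofString()).body();

        // 2. Append a topic to the "topics" property locally (schematic edit).
        String updated = config.replace("\"topics\":\"a,b\"", "\"topics\":\"a,b,c\"");

        // 3. Write it back. Any concurrent change to "topics" made between
        //    steps 1 and 3 is silently lost -- PATCH at the property level
        //    still replaces the whole value.
        http.send(HttpRequest.newBuilder(URI.create(url))
                        .header("Content-Type", "application/json")
                        .method("PATCH", HttpRequest.BodyPublishers.ofString(updated))
                        .build(),
                HttpResponse.BodyHandlers.ofString());
    }
}
{code}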

Ryanne

On Tue, Jun 25, 2019 at 12:15 PM Ivan Yurchenko 
wrote:

> Hi,
>
> Since Kafka 2.3 has just been release and more people may have time to look
> at this now, I'd like to bump this discussion.
> Thanks.
>
> Ivan
>
>
> On Thu, 13 Jun 2019 at 17:20, Ivan Yurchenko 
> wrote:
>
> > Hello,
> >
> > I'd like to start the discussion of KIP-477: Add PATCH method for
> > connector config in Connect REST API.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-477%3A+Add+PATCH+method+for+connector+config+in+Connect+REST+API
> >
> > There is also a draft PR: https://github.com/apache/kafka/pull/6934.
> >
> > Thank you.
> >
> > Ivan
> >
>


Re: [jira] [Created] (KAFKA-8604) kafka log dir was marked as offline because of deleting segments of __consumer_offsets failed

2019-06-26 Thread M. Manna
This is a known issue for Windows. See KAFKA-6188.

Thanks,

On Wed, 26 Jun 2019 at 14:46, songyingshuan (JIRA)  wrote:

> songyingshuan created KAFKA-8604:
> 
>
>  Summary: kafka log dir was marked as offline because of
> deleting segments of __consumer_offsets failed
>  Key: KAFKA-8604
>  URL: https://issues.apache.org/jira/browse/KAFKA-8604
>  Project: Kafka
>   Issue Type: Bug
>   Components: log cleaner
> Affects Versions: 1.0.1
> Reporter: songyingshuan
>
>
> We encountered a problem in our production env without any warning. When
> the kafka broker was trying to clean __consumer_offsets-38 (and it only
> happens to this partition), the log shows
> that it failed, marking the whole disk/log dir offline, and this leads to a
> negative impact on some normal partitions (because the ISR lists of those
> partitions shrink).
> We had to restart the broker server to reuse the disk/dir which was marked
> as offline. BUT!! this problem recurs periodically for the same reason, so
> we have to restart the broker periodically.
>
> We read some source code of kafka-1.0.1, but cannot figure out why this
> happens. The cluster status had been good until this problem suddenly
> attacked us.
>
> the error log is something like this :
>
>
> {code:java}
> 2019-06-25 00:11:26,241 INFO kafka.log.TimeIndex: Deleting index
> /data6/kafka/data/__consumer_offsets-38/012855596978.timeindex.deleted
> 2019-06-25 00:11:26,258 ERROR kafka.server.LogDirFailureChannel: Error
> while deleting segments for __consumer_offsets-38 in dir /data6/kafka/data
> java.io.IOException: Delete of log .log.deleted failed.
> at kafka.log.LogSegment.delete(LogSegment.scala:496)
> at
> kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply$mcV$sp(Log.scala:1596)
> at
> kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply(Log.scala:1596)
> at
> kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply(Log.scala:1596)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
> at kafka.log.Log.kafka$log$Log$$deleteSeg$1(Log.scala:1595)
> at
> kafka.log.Log$$anonfun$kafka$log$Log$$asyncDeleteSegment$1.apply$mcV$sp(Log.scala:1599)
> at
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
> at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> 2019-06-25 00:11:26,265 ERROR kafka.utils.KafkaScheduler: Uncaught
> exception in scheduled task 'delete-file'
> org.apache.kafka.common.errors.KafkaStorageException: Error while deleting
> segments for __consumer_offsets-38 in dir /data6/kafka/data
> Caused by: java.io.IOException: Delete of log
> .log.deleted failed.
> at kafka.log.LogSegment.delete(LogSegment.scala:496)
> at
> kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply$mcV$sp(Log.scala:1596)
> at
> kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply(Log.scala:1596)
> at
> kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply(Log.scala:1596)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
> at kafka.log.Log.kafka$log$Log$$deleteSeg$1(Log.scala:1595)
> at
> kafka.log.Log$$anonfun$kafka$log$Log$$asyncDeleteSegment$1.apply$mcV$sp(Log.scala:1599)
> at
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
> at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> 2019-06-25 00:11:26,268 INFO kafka.server.ReplicaManager: [ReplicaManager
> broker=1001] Stopping serving replicas in dir /data6/kafka/data
> {code}
>  and we also find that the cleaning of __consumer_offsets-38 is so
> frequently that almost 85% of output log is related. something like this :
>
>
> {code:java}
> 2019-06-25 20:29:03,222 INFO kafka.log.OffsetIndex: Deleting 

Re: [VOTE] KIP-476: Add Java AdminClient interface

2019-06-26 Thread Ryanne Dolan
+1 (non-binding)

Thanks.
Ryanne

On Tue, Jun 25, 2019 at 10:21 PM Satish Duggana 
wrote:

> +1 (non-binding)
>
> On Wed, Jun 26, 2019 at 8:37 AM Satish Duggana 
> wrote:
> >
> > +1 Matthias/Andy.
> > IMHO, an interface is about the contract; it should not have/expose any
> > implementation. I am fine with either way, as it is more a matter of taste or
> > preference.
> >
> > Agree with Ismael/Colin/Ryanne on not deprecating for good reasons.
> >
> >
> > On Mon, Jun 24, 2019 at 8:33 PM Andy Coates  wrote:
> > >
> > > I agree Matthias.
> > >
> > > (In Scala, such factory methods are on a companion object. As Java
> doesn't
> > > have the concept of a companion object, an equivalent would be a
> utility
> > > class with a similar name...)
> > >
> > > However, I'll update the KIP to include the factory method on the
> > > interface.
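(For illustration, the factory-on-interface shape being discussed might look roughly like this; a sketch only, with the delegation target an assumption, not the final KIP-476 API:)

{code:java}
// Rough sketch of the idea -- not the final KIP-476 API.
public interface Admin extends AutoCloseable {
    static Admin create(java.util.Properties props) {
        // Illustrative delegation; the concrete factory target is an assumption.
        return KafkaAdminClient.createInternal(props);
    }
    // ...the admin operations...
}
{code}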
> > >
> > > On Fri, 21 Jun 2019 at 23:40, Matthias J. Sax 
> wrote:
> > >
> > > > I still think, that an interface does not need to know anything about
> > > > its implementation. But I am also fine if we add a factory method to
> the
> > > > new interface if that is preferred by most people.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 6/21/19 7:10 AM, Ismael Juma wrote:
> > > > > This is even more reason not to deprecate immediately, there is
> very
> > > > little
> > > > > maintenance cost for us. We should be mindful that many of our
> users (eg
> > > > > Spark, Flink, etc.) typically allow users to specify the kafka
> clients
> > > > > version and hence avoid using new classes/interfaces for some
> time. They
> > > > > would get a bunch of warnings they cannot do anything about apart
> from
> > > > > suppressing.
> > > > >
> > > > > Ismael
> > > > >
> > > > > On Fri, Jun 21, 2019 at 4:00 AM Andy Coates 
> wrote:
> > > > >
> > > > >> Hi Ismael,
> > > > >>
> > > > >> I’m happy enough to not deprecate the existing `AdminClient`
> class as
> > > > part
> > > > >> of this change.
> > > > >>
> > > > >> However, note that, the class will likely be empty, i.e. all
> methods and
> > > > >> implementations will be inherited from the interface:
> > > > >>
> > > > >> public abstract class AdminClient implements Admin {
> > > > >> }
> > > > >>
> > > > >> Not marking it as deprecated has the benefit that users won’t see
> any
> > > > >> deprecation warnings on the next release. Conversely, deprecating
> it
> > > > will
> > > > >> mean we can choose to remove this, now pointless class, in the
> future
> > > > if we
> > > > >> choose.
> > > > >>
> > > > >> That’s my thinking for deprecation, but as I’ve said I’m happy
> either
> > > > way.
> > > > >>
> > > > >> Andy
> > > > >>
> > > > >>> On 18 Jun 2019, at 16:09, Ismael Juma  wrote:
> > > > >>>
> > > > >>> I agree with Ryanne, I think we should avoid deprecating
> AdminClient
> > > > and
> > > > >>> causing so much churn for users who don't actually care about
> this
> > > > niche
> > > > >>> use case.
> > > > >>>
> > > > >>> Ismael
> > > > >>>
> > > > >>> On Tue, Jun 18, 2019 at 6:43 AM Andy Coates 
> wrote:
> > > > >>>
> > > >  Hi Ryanne,
> > > > 
> > > >  If we don't change the client code, then everywhere will still
> expect
> > > >  subclasses of `AdminClient`, so the interface will be of no
> use, i.e.
> > > > I
> > > >  can't write a class that implements the new interface and pass
> it to
> > > > the
> > > >  client code.
> > > > 
> > > >  Thanks,
> > > > 
> > > >  Andy
> > > > 
> > > >  On Mon, 17 Jun 2019 at 19:01, Ryanne Dolan <
> ryannedo...@gmail.com>
> > > > >> wrote:
> > > > 
> > > > > Andy, while I agree that the new interface is useful, I'm not
> > > > convinced
> > > > > adding an interface requires deprecating AdminClient and
> changing so
> > > > >> much
> > > > > client code. Why not just add the Admin interface, have
> AdminClient
> > > > > implement it, and have done?
> > > > >
> > > > > Ryanne
> > > > >
> > > > > On Mon, Jun 17, 2019 at 12:09 PM Andy Coates <
> a...@confluent.io>
> > > > >> wrote:
> > > > >
> > > > >> Hi all,
> > > > >>
> > > > >> I think I've addressed all concerns. Let me know if I've
> not.  Can I
> > > >  call
> > > > >> another round of votes please?
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Andy
> > > > >>
> > > > >> On Fri, 14 Jun 2019 at 04:55, Satish Duggana <
> > > > >> satish.dugg...@gmail.com
> > > > >
> > > > >> wrote:
> > > > >>
> > > > >>> Hi Andy,
> > > > >>> Thanks for the KIP. This is a good change and it gives the
> user a
> > > > > better
> > > > >>> handle on Admin client usage. I agree with the proposal
> except the
> > > >  new
> > > > >>> `Admin` interface having all the methods from `AdminClient`
> > > > abstract
> > > > >> class.
> > > > >>> It should be kept clean having only the admin operations as
> methods
> > > > > from
> > > > >>> KafkaClient abstract class but not the factory methods 

[jira] [Created] (KAFKA-8604) kafka log dir was marked as offline because of deleting segments of __consumer_offsets failed

2019-06-26 Thread songyingshuan (JIRA)
songyingshuan created KAFKA-8604:


 Summary: kafka log dir was marked as offline because of deleting 
segments of __consumer_offsets failed
 Key: KAFKA-8604
 URL: https://issues.apache.org/jira/browse/KAFKA-8604
 Project: Kafka
  Issue Type: Bug
  Components: log cleaner
Affects Versions: 1.0.1
Reporter: songyingshuan


We encountered a problem in our production env without any warning. When the kafka 
broker was trying to clean __consumer_offsets-38 (and it only happens to this 
partition), the log shows
that it failed, marking the whole disk/log dir offline, and this leads to a 
negative impact on some normal partitions (because the ISR lists of those 
partitions shrink).
We had to restart the broker server to reuse the disk/dir which was marked as 
offline. BUT!! this problem recurs periodically for the same reason, so we have 
to restart the broker periodically.

We read some source code of kafka-1.0.1, but cannot figure out why this 
happens. The cluster status had been good until this problem suddenly 
attacked us.

the error log is something like this :

 
{code:java}
2019-06-25 00:11:26,241 INFO kafka.log.TimeIndex: Deleting index 
/data6/kafka/data/__consumer_offsets-38/012855596978.timeindex.deleted
2019-06-25 00:11:26,258 ERROR kafka.server.LogDirFailureChannel: Error while 
deleting segments for __consumer_offsets-38 in dir /data6/kafka/data
java.io.IOException: Delete of log .log.deleted failed.
at kafka.log.LogSegment.delete(LogSegment.scala:496)
at 
kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply$mcV$sp(Log.scala:1596)
at kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply(Log.scala:1596)
at kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply(Log.scala:1596)
at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
at kafka.log.Log.kafka$log$Log$$deleteSeg$1(Log.scala:1595)
at 
kafka.log.Log$$anonfun$kafka$log$Log$$asyncDeleteSegment$1.apply$mcV$sp(Log.scala:1599)
at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-06-25 00:11:26,265 ERROR kafka.utils.KafkaScheduler: Uncaught exception in 
scheduled task 'delete-file'
org.apache.kafka.common.errors.KafkaStorageException: Error while deleting 
segments for __consumer_offsets-38 in dir /data6/kafka/data
Caused by: java.io.IOException: Delete of log .log.deleted 
failed.
at kafka.log.LogSegment.delete(LogSegment.scala:496)
at 
kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply$mcV$sp(Log.scala:1596)
at kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply(Log.scala:1596)
at kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply(Log.scala:1596)
at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
at kafka.log.Log.kafka$log$Log$$deleteSeg$1(Log.scala:1595)
at 
kafka.log.Log$$anonfun$kafka$log$Log$$asyncDeleteSegment$1.apply$mcV$sp(Log.scala:1599)
at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-06-25 00:11:26,268 INFO kafka.server.ReplicaManager: [ReplicaManager 
broker=1001] Stopping serving replicas in dir /data6/kafka/data
{code}
 and we also find that the cleaning of __consumer_offsets-38 is so frequently 
that almost 85% of output log is related. something like this :

 
{code:java}
2019-06-25 20:29:03,222 INFO kafka.log.OffsetIndex: Deleting index 
/data6/kafka/data/__consumer_offsets-38/008457474982.index.deleted 
2019-06-25 20:29:03,222 INFO kafka.log.TimeIndex: Deleting index 
/data6/kafka/data/__consumer_offsets-38/008457474982.timeindex.deleted 
2019-06-25 20:29:03,295 INFO kafka.log.Log: Deleting 

Re: [DISCUSS] KIP-435: Incremental Partition Reassignment

2019-06-26 Thread Viktor Somogyi-Vass
Hey Colin,

I think there's some confusion here, so I might change the name of this KIP.
KIP-435 is about the internal batching of reassignments (so purely a
controller change) and not about client side APIs. As of this moment, these
kinds of improvements are listed in KIP-455's future work section, so in my
understanding KIP-455 won't touch that :).
Let me know if I'm missing any points here.

Viktor
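(For illustration, the replica-level batching described further down this thread -- e.g. reassigning one partition from (0, 1, 2, 3, 4) to (5, 6, 7, 8, 9) with a replica batch size of 2 -- could be computed roughly like this; a sketch under those assumptions, not the KIP's implementation:)

{code:java}
import java.util.ArrayList;
import java.util.List;

public class ReplicaBatcher {
    // Pairs up old and new replicas positionally and splits them into batches:
    // (0,1)->(5,6); (2,3)->(7,8); (4)->(9), executed in order.
    static List<List<int[]>> batches(List<Integer> from, List<Integer> to, int batchSize) {
        List<List<int[]>> result = new ArrayList<>();
        for (int i = 0; i < from.size(); i += batchSize) {
            List<int[]> batch = new ArrayList<>();
            for (int j = i; j < Math.min(i + batchSize, from.size()); j++) {
                batch.add(new int[]{from.get(j), to.get(j)});  // old -> new
            }
            result.add(batch);
        }
        return result;
    }
}
{code}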

On Tue, Jun 25, 2019 at 9:02 PM Colin McCabe  wrote:

> Hi Viktor,
>
> Now that the 2.3 release is over, we're going to be turning our attention
> back to working on KIP-455, which provides an API for partition
> reassignment, and also solves the incremental reassignment problem.  Sorry
> about the pause, but I had to focus on the stuff that was going into 2.3.
>
> I think last time we talked about this, the consensus was that KIP-455
> supersedes KIP-435, since KIP-455 supports incremental reassignment.  We
> also don't want to add more technical debt in the form of a new
> ZooKeeper-based API that we'll have to support for a while.  So let's focus
> on KIP-455 here.  We have more resources now so I think we'll be able to
> get it done soonish.
>
> best,
> Colin
>
>
> On Tue, Jun 25, 2019, at 08:09, Viktor Somogyi-Vass wrote:
> > Hi All,
> >
> > I have added another improvement to this, which is to limit the parallel
> > leader movements. I think I'll soon (maybe late this week or early next)
> > start a vote on this too if there are no additional feedback.
> >
> > Thanks,
> > Viktor
> >
> > On Mon, Apr 29, 2019 at 1:26 PM Viktor Somogyi-Vass <
> viktorsomo...@gmail.com>
> > wrote:
> >
> > > Hi Folks,
> > >
> > > I've updated the KIP with the batching which would work on both replica
> > > and partition level. To explain it briefly: for instance if the replica
> > > level is set to 2 and partition level is set to 3, then 2x3=6 replica
> > > reassignments would be in progress at the same time. In case of a
> > > reassignment
> > > for a single partition from (0, 1, 2, 3, 4) to (5, 6, 7, 8, 9) we would
> > > form the batches (0, 1) → (5, 6); (2, 3) → (7, 8) and 4 → 9 and would
> > > execute the reassignment in this order.
> > >
> > > Let me know what you think.
> > >
> > > Best,
> > > Viktor
> > >
> > > On Mon, Apr 15, 2019 at 7:01 PM Viktor Somogyi-Vass <
> > > viktorsomo...@gmail.com> wrote:
> > >
> > >> A follow up on the batching topic to clarify my points above.
> > >>
> > >> Generally I think that batching should be a core feature since, as
> > >> Colin said, the controller should possess all the related information.
> > >> Also, Cruise Control (or really any 3rd-party admin system) might build
> > >> upon this to give a more holistic approach to balancing brokers. We may
> > >> cater to them with APIs that act like building blocks to make their
> > >> life easier, like incrementalization, batching, cancellation and
> > >> rollback, but I think the more advanced we go, the more advanced a
> > >> control surface we'll need, and Kafka's basic tooling might not be
> > >> suitable for that.
> > >>
> > >> Best,
> > >> Viktor
> > >>
> > >>
> > >> On Mon, 15 Apr 2019, 18:22 Viktor Somogyi-Vass, <
> viktorsomo...@gmail.com>
> > >> wrote:
> > >>
> > >>> Hey Guys,
> > >>>
> > >>> I'll reply to you all in this email:
> > >>>
> > >>> @Jun:
> > >>> 1. yes, it'd be a good idea to add this feature, I'll write this into
> > >>> the KIP. I was actually thinking about introducing a dynamic config
> > >>> called reassignment.parallel.partition.count and
> > >>> reassignment.parallel.replica.count. The first property would control
> > >>> how many partition reassignments we can do concurrently. The second
> > >>> would go one level down in granularity and would control how many
> > >>> replicas we want to move for a given partition. Also, one more thing
> > >>> that'd be useful to fix is that a given list of partition -> replica
> > >>> assignments would be executed in the same order (from first to last),
> > >>> so it's overall predictable and the user would have some control over
> > >>> the order of reassignments, as the JSON is still assembled by the user.
> > >>> 2. the /kafka/brokers/topics/{topic} znode to be specific. I'll
> update
> > >>> the KIP to contain this.
> > >>>
> > >>> @Jason:
> > >>> I think building this functionality into Kafka would definitely
> > >>> benefit all the users, and CC as well, since it'd simplify their
> > >>> software as you said. As I understand it, the main advantage of CC and
> > >>> other similar software is to give high-level features for automatic
> > >>> load balancing. Reliability, stability and predictability of the
> > >>> reassignment should be a core feature of Kafka. I think the
> > >>> incrementalization feature would make it more stable. I would consider
> > >>> cancellation a core feature too, and we can leave the gate open for
> > >>> external tools to feed in their reassignment json as they want. I was
> > >>> also thinking about what are the set of features we can
> 

Build failed in Jenkins: kafka-trunk-jdk8 #3753

2019-06-26 Thread Apache Jenkins Server
See 


Changes:

[bill] MINOR: Fix for typos in processor-api.html (#6961)

[bbejeck] Minor: code enhancment (#6999)

--
[...truncated 5.09 MB...]

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestampWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
PASSED


[jira] [Resolved] (KAFKA-8596) Kafka topic pre-creation error message needs to be passed to application as an exception

2019-06-26 Thread Gurudatt Kulkarni (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gurudatt Kulkarni resolved KAFKA-8596.
--
Resolution: Duplicate

> Kafka topic pre-creation error message needs to be passed to application as 
> an exception
> 
>
> Key: KAFKA-8596
> URL: https://issues.apache.org/jira/browse/KAFKA-8596
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Ashish Vyas
>Priority: Minor
>
> If I don't have a topic pre-created, I get an error log that reads "is 
> unknown yet during rebalance, please make sure they have been pre-created 
> before starting the Streams application." Ideally I expect an exception here 
> being thrown that I can catch in my application and decide what I want to do. 
>  
> Without this, my app keeps running and the actual functionality doesn't work, 
> making it time-consuming to debug. I want to stop the application right at 
> this point.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)