[jira] [Resolved] (KAFKA-13614) Leader replication quota is applied to consumer fetches

2022-01-26 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-13614.
-
Fix Version/s: 3.2.0
 Reviewer: David Jacot
   Resolution: Fixed

> Leader replication quota is applied to consumer fetches
> ---
>
> Key: KAFKA-13614
> URL: https://issues.apache.org/jira/browse/KAFKA-13614
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: David Mao
>Assignee: David Mao
>Priority: Major
> Fix For: 3.2.0
>
>
> In ReplicaManager.readFromLocalLog we check shouldLeaderThrottle regardless 
> of whether the read is coming from a consumer or a follower broker. As a 
> result, the replication quota is applied to consumer fetches.
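
The decision the ticket describes can be sketched as a pure function. This is a hypothetical illustration, not ReplicaManager code: the class and method names are invented. It relies on one protocol fact: ordinary consumers send a replica id of -1 in the Fetch request, while followers send their broker id. The "before" variant ignores the caller, which is how consumer fetches end up throttled.

```java
/**
 * Hypothetical sketch of the leader-throttle decision from KAFKA-13614.
 * Names are illustrative; this is not the actual ReplicaManager code.
 */
class LeaderThrottleSketch {
    /** Consumers send replicaId = -1; follower brokers send their own (non-negative) broker id. */
    static boolean isFromFollower(int replicaId) {
        return replicaId >= 0;
    }

    /** Behaviour described in the ticket: replicaId is ignored, so consumers are throttled too. */
    static boolean shouldThrottleBefore(int replicaId, boolean partitionThrottled, boolean quotaExceeded) {
        return partitionThrottled && quotaExceeded;
    }

    /** Intended behaviour: only follower fetches are subject to the leader replication quota. */
    static boolean shouldThrottleAfter(int replicaId, boolean partitionThrottled, boolean quotaExceeded) {
        return isFromFollower(replicaId) && partitionThrottled && quotaExceeded;
    }
}
```

For a consumer fetch (replicaId -1) on a throttled partition whose quota is exceeded, the first variant throttles and the second does not.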



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13622) Revisit the complexity of position tracking in state stores

2022-01-26 Thread John Roesler (Jira)
John Roesler created KAFKA-13622:


 Summary: Revisit the complexity of position tracking in state stores
 Key: KAFKA-13622
 URL: https://issues.apache.org/jira/browse/KAFKA-13622
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler


Currently, state store implementers bear a significant burden to track position
correctly. They have to:
 * update the position during all puts
 * implement the RecordBatchingStateRestoreCallback and use the
ChangelogRecordDeserializationHelper to update the position based on record headers
 * implement some mechanism to restore the position after a restart if the store
is persistent (such as supplying a CommitCallback to write the position to a local
file and then reading the file during init)
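
To make the first and third responsibilities concrete, here is a minimal sketch of the bookkeeping a store implementer carries today. It is hypothetical: the class, its methods, and the "topic-partition=offset" text format are invented for illustration and are not the Kafka Streams Position or StateStore API. The restore-callback duty (second bullet) is omitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

/**
 * Hypothetical sketch of the position bookkeeping listed in KAFKA-13622.
 * Not the Kafka Streams Position/StateStore API; all names are illustrative.
 */
class PositionTrackingStore {
    private final Map<String, String> data = new HashMap<>();
    // "topic-partition" -> highest input offset reflected in this store.
    // A TreeMap keeps the serialized form deterministic.
    private final Map<String, Long> position = new TreeMap<>();

    /** Bullet 1: every put must also advance the position. */
    void put(String key, String value, String topic, int partition, long offset) {
        data.put(key, value);
        position.merge(topic + "-" + partition, offset, Long::max);
    }

    long offsetFor(String topic, int partition) {
        return position.getOrDefault(topic + "-" + partition, -1L);
    }

    /** Bullet 3: a persistent store must save the position, e.g. from a commit callback... */
    String serializePosition() {
        StringJoiner joiner = new StringJoiner(";");
        for (Map.Entry<String, Long> e : position.entrySet()) {
            joiner.add(e.getKey() + "=" + e.getValue());
        }
        return joiner.toString();
    }

    /** ...and read it back during init after a restart. */
    void restorePosition(String serialized) {
        position.clear();
        if (serialized.isEmpty()) {
            return;
        }
        for (String entry : serialized.split(";")) {
            int eq = entry.lastIndexOf('=');
            position.put(entry.substring(0, eq), Long.parseLong(entry.substring(eq + 1)));
        }
    }
}
```

Every one of these steps is a place where an implementer can get it wrong, which is the concern raised in review.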

[~guozhang] pointed out during review that this is probably too
much responsibility (and certainly too much opportunity for error). We should
see what we can do to simplify these responsibilities, if not eliminate them
entirely from the store implementer's scope of concern.

 

See https://github.com/apache/kafka/pull/11676#discussion_r790358058





[jira] [Resolved] (KAFKA-13608) Implement Position restoration for all in-memory state stores

2022-01-26 Thread John Roesler (Jira)



John Roesler resolved KAFKA-13608.
--
Resolution: Duplicate

> Implement Position restoration for all in-memory state stores
> -
>
> Key: KAFKA-13608
> URL: https://issues.apache.org/jira/browse/KAFKA-13608
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Vicky Papavasileiou
>Priority: Major
>
> In-memory state stores restore their state from the changelog (as opposed to 
> RocksDB stores, which restore from disk). In-memory stores currently don't 
> handle restoring the Position.
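
The missing step can be sketched as follows: while replaying the changelog, an in-memory store rebuilds its Position alongside its data. This is a hypothetical model. It assumes each changelog record carries its source topic, partition, and offset (in Kafka Streams this information travels in record headers), and the ChangelogEntry and InMemoryRestoreSketch types are invented for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical changelog record: key/value plus the source position assumed to come from headers. */
final class ChangelogEntry {
    final String key;
    final String value; // null represents a delete (tombstone)
    final String sourceTopic;
    final int sourcePartition;
    final long sourceOffset;

    ChangelogEntry(String key, String value, String sourceTopic, int sourcePartition, long sourceOffset) {
        this.key = key;
        this.value = value;
        this.sourceTopic = sourceTopic;
        this.sourcePartition = sourcePartition;
        this.sourceOffset = sourceOffset;
    }
}

/** Hypothetical in-memory store that restores both its data and its Position from the changelog. */
class InMemoryRestoreSketch {
    final Map<String, String> data = new HashMap<>();
    final Map<String, Long> position = new HashMap<>();

    void restoreBatch(List<ChangelogEntry> batch) {
        for (ChangelogEntry r : batch) {
            if (r.value == null) {
                data.remove(r.key);
            } else {
                data.put(r.key, r.value);
            }
            // The step the ticket reports as missing: fold each record's source offset into the position.
            position.merge(r.sourceTopic + "-" + r.sourcePartition, r.sourceOffset, Long::max);
        }
    }
}
```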





[jira] [Resolved] (KAFKA-13524) IQv2: Implement KeyQuery from the RecordCache

2022-01-26 Thread John Roesler (Jira)



John Roesler resolved KAFKA-13524.
--
Resolution: Fixed

> IQv2: Implement KeyQuery from the RecordCache
> -
>
> Key: KAFKA-13524
> URL: https://issues.apache.org/jira/browse/KAFKA-13524
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: Vicky Papavasileiou
>Priority: Major
>
> The Record Cache in Kafka Streams is more properly termed a write buffer, 
> since it only caches writes, not reads, and its intent is to buffer the 
> writes before flushing them in bulk into lower store layers.
> Unlike scan-type queries, which require scanning both the record cache and 
> the underlying store and collating the results, the KeyQuery (and any other 
> point lookup) can straightforwardly be served from the record cache if it is 
> buffered or fall through to the underlying store if not.
> In contrast to scan-type operations, benchmarks reveal that key-based cache 
> reads are faster than always skipping the cache as well.
> Therefore, it makes sense to implement a handler in the CachingKeyValueStore 
> for the KeyQuery specifically in order to serve fresher key-based lookups. 
> Scan queries may also be useful, but their less flattering performance 
> profile makes it reasonable to leave them for follow-on work.
> We could add an option to disable cache reads on the KeyQuery, but since they 
> seem to be always better, I'm leaning toward just unilaterally serving cached 
> records if they exist.
>  
> I did a quick POC of this: 
> [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries]
>  
> The internal code of the caching stores should be refactored to share logic 
> with the regular store methods. Scan queries will be more complicated, since 
> they require merging the cache with the wrapped result.
> There is a bug related to that non-timestamped-store-serde hack (see the 
> failing test when you run IQv2StoreIntegrationTest). Even though the inner 
> store is not timestamped, the cache returns a timestamped value. We'll have 
> to discuss options to fix it.
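
The point-lookup rule in the ticket (serve a buffered entry if there is one, otherwise fall through to the underlying store) can be sketched as below. This is a hypothetical, simplified model rather than the CachingKeyValueStore code: both layers are plain HashMaps, and it assumes a buffered delete stays in the buffer as a tombstone that must hide the older flushed value instead of falling through. Scans, which would need to merge both layers, are not shown.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical sketch of serving a KeyQuery-style point lookup from a write buffer.
 * Not the Kafka Streams CachingKeyValueStore implementation.
 */
class CachingLookupSketch {
    // Buffered, not-yet-flushed writes; Optional.empty() marks a buffered delete (tombstone).
    private final Map<String, Optional<String>> writeBuffer = new HashMap<>();
    private final Map<String, String> underlying = new HashMap<>();

    void put(String key, String value) {
        writeBuffer.put(key, Optional.of(value));
    }

    void delete(String key) {
        writeBuffer.put(key, Optional.empty());
    }

    /** Flush buffered writes in bulk into the lower store layer. */
    void flush() {
        for (Map.Entry<String, Optional<String>> e : writeBuffer.entrySet()) {
            if (e.getValue().isPresent()) {
                underlying.put(e.getKey(), e.getValue().get());
            } else {
                underlying.remove(e.getKey());
            }
        }
        writeBuffer.clear();
    }

    /** Point lookup: a buffered entry (including a tombstone) wins; otherwise read the underlying store. */
    String get(String key) {
        Optional<String> buffered = writeBuffer.get(key);
        if (buffered != null) {
            return buffered.orElse(null);
        }
        return underlying.get(key);
    }
}
```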





Re: State store guarantees - KStreams 2.8.1

2022-01-26 Thread Matthias J. Sax

Glad you figured it out.

It's not easy to perform checks for this case. The problem is that the 
logic checking topics/partitions does not know anything about the 
operators that are used -- the DSL semantics are not available at runtime.


And the `merge()` operator itself does not have access to input topic 
partition information.



-Matthias
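
Why a partition count mismatch breaks this setup: records are routed to partitions by hashing the key modulo the partition count, so two topics with different counts can place the same key in different partitions. The sketch below is illustrative only; it uses String.hashCode() as a stand-in for the default producer partitioner (which actually applies murmur2 to the serialized key), but the effect holds for any hash-mod-N scheme.

```java
/**
 * Illustration of co-partitioning. String.hashCode() is a stand-in for the
 * default partitioner's murmur2 hash; the class is hypothetical.
 */
class CoPartitionSketch {
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** True if the key lands in the same partition index in both topics. */
    static boolean samePartition(String key, int partitionsA, int partitionsB) {
        return partitionFor(key, partitionsA) == partitionFor(key, partitionsB);
    }
}
```

For key "b" (hash code 98), partitionFor("b", 2) is 0 while partitionFor("b", 3) is 2. After merge(), those partitions are processed by different tasks, each with its own shard of the state store, which would plausibly explain two instances each returning a different non-null value for the same key.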



Re: permission to create KIP

2022-01-26 Thread Matthias J. Sax

Done.




Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #638

2022-01-26 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-704: Send a hint to broker if it is an unclean leader

2022-01-26 Thread Jason Gustafson
Hey Jose,

Thanks for the updates. I noticed that `LeaderRecoveryState` is marked as
ignorable in the `AlterPartition` request. It would be helpful to
understand the motivation for that.

Thanks,
Jason


permission to create KIP

2022-01-26 Thread Vikas Singh
Hi,

Sending this mail as per
https://cwiki.apache.org/confluence/display/kafka/kafka+improvement+proposals
to request permission to be able to create KIP and contribute to AK.

confluence wiki id: vikasconfluent
jira id: vikasconfluent

Thanks,
Vikas


Re: State store guarantees - KStreams 2.8.1

2022-01-26 Thread Jiří Syrový
It's actually my mistake. The whole problem was caused by a topic
partition count mismatch.
I would have expected at least the *merge* operator to let you know when
merging two inputs with mismatching numbers of partitions.

On Sat, 22 Jan 2022 at 03:46, Matthias J. Sax 
wrote:

> Well, it's unclear what the remote lookup does... As Kafka Streams does
> not implement this part, my best guess at the moment is to blame it on a
> bug in the remote request implementation.
>
> Are you using an off-the-shelf implementation for the remote lookup
> part, or are you using something built in-house?
>
>
> -Matthias
>
>
>
> On 1/21/22 13:13, Jiří Syrový wrote:
> > I agree it sounds a bit off, but it seems that even a host that is not
> > marked as active allows me to query its store and gives me a result that
> > is not null.
> >
> > This application has an API that queries either the local store or a remote
> > store (basically via the HTTP API of the active host), but the weird part is
> > that I get the local response from both instances instead of the expected one
> > remote (on the non-active and non-standby host) and one local.
> > In principle, the code to query stores looks like this:
> >
> >   streams
> >     .store(
> >       StoreQueryParameters
> >         .fromNameAndType(
> >           storeName,
> >           QueryableStoreTypes.keyValueStore[K, V]()
> >         )
> >     )
> >     .get(key)
> >
> >
> > And responses look like this:
> > $ curl //123 (response from instance A)
> > {"meta":"KeyQueryMetadata {activeHost=HostInfo{host='ip-1-2-3-4.eu-west-1.compute.internal', port=8080}, standbyHosts=[], partition=0}","value":{"timestamp":"2022-01-21T00:00:02.433Z","enabled":true},"hostname":"ip-1-2-3-4.eu-west-1.compute.internal"}
> > $ curl //123 (response from instance B)
> > {"meta":"KeyQueryMetadata {activeHost=HostInfo{host='ip-1-2-3-4.eu-west-1.compute.internal', port=8080}, standbyHosts=[], partition=0}","value":{"timestamp":"2022-01-21T15:55:27.807Z","enabled":false},"hostname":"ip-9-8-7-6.eu-west-1.compute.internal"}
> >
> > This behaviour is not random and is 100% reproducible. I can try to create
> > a minimal code example that will demonstrate it.
> >
> > On Fri, 21 Jan 2022 at 18:20, Matthias J. Sax  wrote:
> >
> >>> but instance A returns result X for a partition I and instance B returns
> >>> result Y for the same partition I.
> >>
> >> This sounds a little off. As you stated, if both instances agree on the
> >> active host, the active host must either be instance A or instance B,
> >> and thus you can query partition I only on instance A or instance B. The
> >> non-active instance should not return any data for a partition it does
> >> not host.
> >>
> >> Can you elaborate?
> >>
> >> -Matthias
> >>
> >> On 1/21/22 4:47 AM, Jiří Syrový wrote:
> >>> Hi everyone,
> >>>
> >>> I've been trying for a while to answer a question for myself: what are
> >>> the actual guarantees for state stores with regard to consistency when
> >>> connected to transformers?
> >>>
> >>> I have an application where a single (persistent, RocksDB-backed) state
> >>> store is connected to multiple transformers. Each transformer might both
> >>> read (get) and write (put) data into the state store. All transformers
> >>> receive data from multiple input topics in the same way (the same key,
> >>> same number of partitions); the topics are merged together before being
> >>> sent to the transformers.
> >>>
> >>> All transformers are located in the same sub-topology.
> >>>
> >>> What I observed is that even with 0 standby replicas I might get
> >>> inconsistent results when querying this state store connected to multiple
> >>> transformers. I have 2 instances and metadata on both instances agree on
> >>> the active host for this state store and partition, but instance A returns
> >>> result X for a partition I and instance B returns result Y for the same
> >>> partition I.
> >>>
> >>> Any suggestions? Is this a bug, or is my assumption incorrect that the
> >>> same state store should give the same result for the same key (same
> >>> partition) in 2 distinct transformers fed from the same input?
> >>>
> >>> Thanks,
> >>> Jiri
> >>>
> >>
> >
>


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #637

2022-01-26 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-13621) Resign leader on partition

2022-01-26 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13621:
--

 Summary: Resign leader on partition
 Key: KAFKA-13621
 URL: https://issues.apache.org/jira/browse/KAFKA-13621
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio


h1. Motivation

If the current leader A at epoch X gets partitioned from the rest of the quorum,
quorum voter A will stay leader at epoch X. This happens because voter A will
never receive a request from the rest of the voters increasing the epoch.
The requests that typically increase the epoch of past leaders are
BeginQuorumEpoch and Vote.

In addition, if voter A (leader at epoch X) doesn't get partitioned from the rest
of the brokers (observers in the KRaft protocol), the brokers will never learn
about the new quorum leader. This happens because 1. observers learn about the
leader from the Fetch response and 2. observers send a Fetch request to a random
voter if the Fetch request times out.

Neither of these two mechanisms will cause the broker to send a request to a
different voter: the leader at epoch X will never send a different leader in
the response, and the broker will never send a Fetch request to a different
voter because its Fetch requests will never time out.
h1. Proposed Changes

In this scenario A, the leader at epoch X, will stop receiving Fetch
requests from a majority of the voters. Voter A should resign as leader if the
most recent Fetch requests from a majority of the voters are old enough. A
reasonable value for "old enough" is the Fetch timeout value.
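
The proposed resignation rule can be sketched as a check over the last Fetch time the leader saw from each voter. This is a hypothetical illustration, not KafkaRaftClient code: the names are invented, and it assumes the leader counts itself towards the majority and uses the Fetch timeout as the "old enough" threshold, as the ticket suggests.

```java
import java.util.Map;

/**
 * Hypothetical sketch of the check proposed in KAFKA-13621.
 * Names are illustrative; this is not KafkaRaftClient code.
 */
class LeaderResignCheck {
    /**
     * @param leaderId        id of the current leader (counted as up to date)
     * @param lastFetchTimeMs voter id -> time the leader last received a Fetch from that voter
     * @param voterCount      total number of voters, including the leader
     * @param nowMs           current time
     * @param fetchTimeoutMs  the "old enough" threshold
     * @return true if fewer than a majority of voters have fetched recently
     */
    static boolean shouldResign(int leaderId, Map<Integer, Long> lastFetchTimeMs,
                                int voterCount, long nowMs, long fetchTimeoutMs) {
        int recent = 1; // the leader itself
        for (Map.Entry<Integer, Long> e : lastFetchTimeMs.entrySet()) {
            if (e.getKey() != leaderId && nowMs - e.getValue() < fetchTimeoutMs) {
                recent++;
            }
        }
        int majority = voterCount / 2 + 1;
        return recent < majority;
    }
}
```

With three voters, a leader that has heard from neither follower within the timeout sees only itself (1 of the 2 needed) and resigns; one recent follower Fetch is enough to keep leadership.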





[jira] [Created] (KAFKA-13620) The request handler metric name for ControllerApis should be different than KafkaApis

2022-01-26 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-13620:


 Summary: The request handler metric name for ControllerApis should be different than KafkaApis
 Key: KAFKA-13620
 URL: https://issues.apache.org/jira/browse/KAFKA-13620
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe








[VOTE] KIP-704: Send a hint to partition leader to recover the partition

2022-01-26 Thread José Armando García Sancio
Hi all,

I'd like to start a vote on KIP-704: Send a hint to the partition
leader to recover the partition.

KIP wiki: https://cwiki.apache.org/confluence/x/kAZRCg
Discussion thread:
https://lists.apache.org/thread/ld2t2xkby7rpgrggqo1h344goddfdnxb

Thanks,
-José


Re: [DISCUSS] KIP-704: Send a hint to broker if it is an unclean leader

2022-01-26 Thread Colin McCabe
On Wed, Jan 26, 2022, at 09:14, José Armando García Sancio wrote:
> Thanks for the feedback Colin.
>
> Colin wrote:
>> We already have many classes that are called "partition state." For example, 
>> PartitionStates.java on the client side, PartitionStateMachine.scala and 
>> TopicPartitionStateZNode in the old controller, 
>> RemotePartitionDeleteState.java under storage, and so forth. I don't object 
>> to adding another one, but let's make it very clear that it's 
>> LeaderRecoveryState not just a generic "partition state", to avoid 
>> confusion. Actually maybe we should call it LeaderRecoveryStateChange, since 
>> we'll need to have a "no change" entry in the enum.
>
> Sounds good. I am going to call it LeaderRecoveryState.
>
> Colin wrote:
>> I would argue that we should not add unused fields to RPCs and metadata 
>> records because we might want them in the future, or because it seems more 
>> "symmetrical," etc. We have a great mechanism for adding new stuff in the 
>> future: add a new field and specify the default as whatever the old behavior 
>> was.
>>
>> So I would argue that we should not add this state to AlterIsr, either the 
>> request or the response. We already know that sending AlterIsr clears the 
>> recovery state, and if it succeeded then the state was cleared. If this 
>> changes in the future, we can add a new field that defaults to whatever we
>> want.
>>
>> Adding an RPC field that will only ever have one value is bad form. And 99% 
>> of the time, when you do finally decide to have more than one possible 
>> value, you'll find that what you originally wrote isn't adequate and you 
>> need to change the RPC, or the code, anyway. At least, that's my experience.
>
> We discussed this offline. We agree that it is better to be explicit
> with regards to changes to the leader recovery state and not make the
> changes implicit. The controller is going to have checks that make
> sure that the transitions are valid. For example, the controller will
> not allow the topic partition leader to:
> 1. Change the ISR to a size greater than 1 if the leader is still recovering.
> 2. Change the leader recovery state from "recovered" to "recovering".
>
> We also agree that we will add the field in the response just to be
> consistent with the existing pattern. We should create another KIP to
> remove these fields as they are not strictly necessary.
>

+1. Thanks, José!

cheers,
Colin


Re: [VOTE] KIP-801: Implement an Authorizer that stores metadata in __cluster_metadata

2022-01-26 Thread Colin McCabe
On Wed, Jan 26, 2022, at 01:54, Rajini Sivaram wrote:
> Hi Colin,
>
> Thanks for the KIP. A couple of questions:
>
> 1) The KIP says:
> *However, when the controller is active, its Authorizer state may be
> slightly ahead of the broker's Authorizer state. This will happen in
> the time between when a new ACL is created (or deleted) and the time that
> this change is persisted durably by a majority of controller quorum peers.*
>
> Once the ACL takes effect on the controller, do we guarantee that it will
> be applied everywhere or can there be scenarios where the controller has
> applied the change, but others don't due to a subsequent failure in
> persisting the ACL changes?

Hi Rajini,

Thanks for taking a look.

Good question. In general, for any new metadata record (not just ACL records), 
there are two fates:
1. the record will be persisted to the raft quorum.
2. the raft leader will fail and a new leader will be elected that doesn't have 
the new record.

In the case of #2, the standby controllers and the brokers will never apply the 
record.

In general the active controller always rolls back its state to the last 
committed offset if there is a failure and it loses the leadership. So this 
isn't really unique to the KRaft Authorizer, it just seemed worth pointing out 
since there will be a brief period when the active controller is "ahead."

We do try pretty hard to apply KIP-801 ACL records in order so that the states 
you will see on brokers and standby controllers will always be valid states 
from some point in the past timeline. The consistency here should be at least 
as good as the current system.
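
The two fates described above, and the rollback on losing leadership, can be modelled with a small sketch. It is a hypothetical simplification (records are plain strings, offsets are list indices, and the names are invented), not the controller's implementation: the active controller applies a record before it is committed, and if it loses leadership it drops everything past the last committed offset.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of an active controller applying records ahead of commit
 * and rolling back to the last committed offset when it loses leadership.
 */
class ControllerStateSketch {
    private final List<String> applied = new ArrayList<>(); // record at index i has offset i
    private long committedOffset = -1L; // highest offset persisted by a quorum majority

    /** Apply a record immediately, before the quorum has persisted it; returns its offset. */
    long applyUncommitted(String record) {
        applied.add(record);
        return applied.size() - 1;
    }

    /** Fate 1: the record was persisted by the raft quorum. */
    void onCommitted(long offset) {
        committedOffset = Math.max(committedOffset, offset);
    }

    /** Fate 2: leadership is lost, so uncommitted records are discarded. */
    void onLeadershipLost() {
        while (applied.size() - 1 > committedOffset) {
            applied.remove(applied.size() - 1);
        }
    }

    List<String> state() {
        return List.copyOf(applied);
    }
}
```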

>
> 2) Have we considered using a different config with limited privileges for
> bootstrapping instead of the all-powerful *super.users*?
>

That's an interesting idea, but I'm not sure how much we could limit it. The 
brokers and controllers at least need CLUSTER_ACTION on CLUSTER in order to do 
most of what they do. This might be something we could explore in a future KIP 
since it's pretty cross-cutting (if we had such a limited bootstrapping user, 
all the authorizers could implement it, not just the KIP-801 one...)

best,
Colin

>
> Regards,
>
> Rajini
>
>
> On Wed, Jan 26, 2022 at 1:50 AM Colin McCabe  wrote:
>
>> How about this:
>>
>> We create a configuration key called early.start.listeners which contains
>> a list of listener names. If this is not specified, its value defaults to
>> just the controller listeners. Optionally, other listeners can be added too.
>>
>> If super.users contains any user names, early start listeners will start
>> immediately. In the beginning they only authorize users that are in
>> super.users. All other listeners receive a new error code,
>> AUTHORIZER_NOT_READY_ERROR. If super.users does not contain any user names,
>> then early start listeners will not be treated differently than other
>> listeners.
>>
>> This will allow the controller listeners to get started immediately if the
>> broker user is in super.users, which will speed up startup. It also will be
>> useful for breaking chicken/egg cycles like needing to pull the SCRAM
>> metadata to authorize pulling the SCRAM metadata.
>>
>> There are still a few use cases where super.users won't be required, but
>> it may be useful in many cases to have this early start functionality.
>>
>> Leaving aside the preceding discussion, do you agree with starting up all
>> endpoints (including non-early start ones) once we load a metadata
>> snapshot? How feasible would it be for us to get a callback from the Raft
>> layer the first time we caught up to the last stable offset? (we only want
>> the callback the first time, not any other time). (I think the metadata
>> shell also would want something like this, at least as an option).
>>
>> best,
>> Colin
>>
>>
>> On Tue, Jan 25, 2022, at 13:34, Jason Gustafson wrote:
>> > Hi Colin,
>> >
>> > Thanks for the writeup. I had one question about bootstrapping. For the
>> > brokers, I understand that listener startup is delayed until the Authorizer
>> > is ready. However, I was not very clear on how this would work for the
>> > controller listeners. We may need them to start up before the metadata log
>> > is ready so that a quorum can be established (as noted in the KIP). This
>> > works fine if we assume that the controller principals are among
>> > `super.users`. For requests forwarded from brokers, on the other hand, we
>> > need to ensure the ACLs have been loaded properly before we begin
>> > authorizing. The problem is that we currently use the same listener for
>> > quorum requests and for forwarded requests. So my question is how does the
>> > Authorizer communicate to the controller when it is safe to begin
>> > authorizing different request types?
>> >
>> > There are a couple ways I can see this working. First, we could allow the
>> > user to configure the listener used for forwarded requests separately. That
>> > would work with the existing `Authorizer.start

Re: [DISCUSS] Broker behaviour when returning records

2022-01-26 Thread Megan Niu (BLOOMBERG/ TORONTO)
Hi Luke,

Thanks for the answers and pointers to docs!

From: show...@gmail.com At: 01/22/22 02:37:59 UTC-5:00
To: Megan Niu (BLOOMBERG/ TORONTO), dev@kafka.apache.org
Subject: Re: [DISCUSS] Broker behaviour when returning records

Hi Megan,

Answering your questions below:

1. Why can't the broker remove messages from the first batch? So that
fetch.max.bytes is not violated.

The documentation has answered your question:

If the first record batch in the first non-empty partition of the fetch is
larger than this limit, the batch will still be returned to ensure that the
consumer can make progress.

ref:
https://kafka.apache.org/documentation/#consumerconfigs_max.partition.fetch.bytes


2. How does the broker choose which partitions (from those that the
consumer is assigned to) contribute messages to a response batch?

As far as I know: suppose consumer A is subscribed to partitions 1, 2, and 3. The
broker reads as much data as it can from the first partition it picks (let's say,
partition 1) without exceeding the max fetch limit. If the limit has not been
reached, it continues to partition 2, and so on.
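
The behaviour described here and in the documentation can be modelled roughly as follows. This is a simplified, hypothetical sketch, not broker code: batches are represented only by their sizes, the per-partition max.partition.fetch.bytes limit is ignored, and the names are invented. It assumes batches are never split, each partition's batches are taken in offset order (so the broker stops within a partition at the first batch that does not fit), and only the very first batch of the response may exceed fetch.max.bytes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of filling a fetch response under fetch.max.bytes. */
class FetchResponseSketch {
    /**
     * @param batchSizesPerPartition for each partition, in the order visited, its batch sizes in bytes
     * @param maxBytes               the fetch.max.bytes limit
     * @return sizes of the batches included in the response, in order
     */
    static List<Integer> fill(List<List<Integer>> batchSizesPerPartition, int maxBytes) {
        List<Integer> response = new ArrayList<>();
        long total = 0;
        for (List<Integer> partitionBatches : batchSizesPerPartition) {
            for (int size : partitionBatches) {
                boolean firstBatchOfResponse = response.isEmpty();
                if (!firstBatchOfResponse && total + size > maxBytes) {
                    break; // batches are never split; move on to the next partition
                }
                // The first batch is always included, even if it alone exceeds maxBytes,
                // so that the consumer can make progress.
                response.add(size);
                total += size;
            }
        }
        return response;
    }
}
```

For example, with a 50-byte limit, a first batch of 80 bytes is still returned on its own, while batches of 20, 20, 20 followed by a 5-byte batch in the next partition yield 20, 20, 5.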

3. Why does the broker send records in batches to the consumer?

For performance

Thank you.
Luke


On Sat, Jan 22, 2022 at 6:22 AM Megan Niu (BLOOMBERG/ TORONTO) <
mni...@bloomberg.net> wrote:

> Hi all,
> I have some questions about how brokers batch records to send to consumers.
>
> One of the configuration properties for a consumer is fetch.max.bytes.
> Here's what "Kafka: The Definitive Guide" (by Shapira et al.) says about
> fetch.max.bytes :
>
> "This property lets you specify the maximum bytes that Kafka will return
> whenever the consumer polls a broker (50 MB by default). [...] Note that
> records are sent to the client in batches, and if the first record-batch
> that the broker has to send exceeds this size, the batch will be sent and
> the limit will be ignored."
>
> My questions:
> 1. Why can't the broker remove messages from the first batch? So that
> fetch.max.bytes is not violated.
> 2. How does the broker choose which partitions (from those that the
> consumer is assigned to) contribute messages to a response batch?
> 3. Why does the broker send records in batches to the consumer?




Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #636

2022-01-26 Thread Apache Jenkins Server
See 




[jira] [Resolved] (KAFKA-13618) BatchAccumulator `Exptected` rename to `Expected`

2022-01-26 Thread Mickael Maison (Jira)



Mickael Maison resolved KAFKA-13618.

Fix Version/s: 3.2.0
   Resolution: Fixed

> BatchAccumulator `Exptected` rename to `Expected`
> -
>
> Key: KAFKA-13618
> URL: https://issues.apache.org/jira/browse/KAFKA-13618
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Kvicii.Yu
>Assignee: Kvicii.Yu
>Priority: Major
> Fix For: 3.2.0
>
>






Re: [DISCUSS] KIP-704: Send a hint to broker if it is an unclean leader

2022-01-26 Thread José Armando García Sancio
Thanks for the feedback Colin.

Colin wrote:
> We already have many classes that are called "partition state." For example, 
> PartitionStates.java on the client side, PartitionStateMachine.scala and 
> TopicPartitionStateZNode in the old controller, 
> RemotePartitionDeleteState.java under storage, and so forth. I don't object 
> to adding another one, but let's make it very clear that it's 
> LeaderRecoveryState not just a generic "partition state", to avoid confusion. 
> Actually maybe we should call it LeaderRecoveryStateChange, since we'll need 
> to have a "no change" entry in the enum.

Sounds good. I am going to call it LeaderRecoveryState.

Colin wrote:
> I would argue that we should not add unused fields to RPCs and metadata 
> records because we might want them in the future, or because it seems more 
> "symmetrical," etc. We have a great mechanism for adding new stuff in the 
> future: add a new field and specify the default as whatever the old behavior 
> was.
>
> So I would argue that we should not add this state to AlterIsr, either the 
> request or the response. We already know that sending AlterIsr clears the 
> recovery state, and if it succeeded then the state was cleared. If this 
> changes in the future, we can add a new field that defaults to whatever we
> want.
>
> Adding an RPC field that will only ever have one value is bad form. And 99% 
> of the time, when you do finally decide to have more than one possible value, 
> you'll find that what you originally wrote isn't adequate and you need to 
> change the RPC, or the code, anyway. At least, that's my experience.

We discussed this offline. We agree that it is better to be explicit
with regard to changes to the leader recovery state and not make the
changes implicit. The controller is going to have checks that make
sure that the transitions are valid. For example, the controller will
not allow the topic partition leader to:
1. Change the ISR to a size greater than 1 if the leader is still recovering.
2. Change the leader recovery state from "recovered" to "recovering".
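The two controller checks listed above can be sketched as a small validation function. This is a hypothetical Java illustration: the enum, class, and method names are invented for this example (they are not taken from the KIP text or from Kafka's controller), and it assumes that "still recovering" refers to the recovery state after the proposed change.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch: names are illustrative, not Kafka's controller code.
enum LeaderRecoveryState { RECOVERED, RECOVERING }

final class LeaderRecoveryChecks {
    private LeaderRecoveryChecks() {}

    /**
     * Validates a leader's proposed partition change.
     * Returns an error message if the change is rejected, or empty if it is allowed.
     */
    static Optional<String> validate(LeaderRecoveryState current,
                                     LeaderRecoveryState proposed,
                                     List<Integer> proposedIsr) {
        // Check 2: once recovered, a partition may not move back to "recovering".
        if (current == LeaderRecoveryState.RECOVERED
                && proposed == LeaderRecoveryState.RECOVERING) {
            return Optional.of("cannot change leader recovery state from RECOVERED to RECOVERING");
        }
        // Check 1: while the leader is still recovering (assumed to mean the state
        // after the change), the ISR may not grow beyond a single replica.
        if (proposed == LeaderRecoveryState.RECOVERING && proposedIsr.size() > 1) {
            return Optional.of("ISR size must not exceed 1 while the leader is recovering");
        }
        return Optional.empty();
    }
}
```

Under this reading, a leader that expands the ISR in the same request that marks the partition as recovered is accepted, while expanding the ISR without clearing the recovering state is rejected.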

We also agree that we will add the field in the response just to be
consistent with the existing pattern. We should create another KIP to
remove these fields as they are not strictly necessary.

Thanks
-José


Re: Proposal to add IBM Power (ppc64le) to apache kafka jenkins CI

2022-01-26 Thread Mickael Maison
Hi Abhijit,

Thanks for offering a ppc64le VM. Looking at the INFRA ticket, it
looks like it was added to Jenkins successfully.
The spec looks appropriate for Kafka builds.

Let's wait a few days to see if anybody has questions or concerns.

Thanks,
Mickael


On Mon, Jan 24, 2022 at 11:03 AM Abhijit Mane  wrote:
>
> Hello,
>
> We would like to work with the community to enable IBM Power (ppc64le)
> architecture support for Apache Jenkins CI so we can enable Apache
> Kafka on IBM Power systems. An IBM Power (ppc64le) VM is available to the
> community for integration into the Jenkins cluster. The VM has also been
> set up as advised by Infra using the script below and verified.
> https://github.com/apache/cassandra-builds/blob/trunk/ASF-jenkins-agents.md
>
> ---
> Ubuntu 20.04 VM
> Root vol: 100GB,  Data vol: 500GB, mounted at /home/jenkins
> 8 cores / 16GB RAM
> Infra discussion with community:
> https://issues.apache.org/jira/browse/INFRA-22612
> ---
>
> Kindly review the Jira ticket and let me know if you have any questions or
> concerns and whether it's okay to proceed further.
>
> Regards,
> Abhijit
>


Re: [VOTE] KIP-714: Client Metrics and Observability

2022-01-26 Thread rifer...@riferrei.com
+1

I think this KIP solves a problem that has been around for some time with Kafka
deployments: the difficulty of assessing the current state of a Kafka
architecture by looking at the whole picture. I also share other folks'
concerns regarding adding runtime dependencies to the clients; this may be 
problematic for large deployments. Still, I think it is worth refactoring.

IMHO, it is a fair trade-off.

— Ricardo

> On Jan 26, 2022, at 9:34 AM, Magnus Edenhill  wrote:
> 
> Hi all,
> 
> it's been a while and there's been some more discussions of the KIP which
> have been
> addressed on the KIP page.
> 
> I think it's a good time to revive this vote thread and get things moving.
> 
> We're currently at +3 (non-binding) and -1 (non-binding) votes.
> 
> Regards,
> Magnus
> 
> 
> On Mon, Nov 1, 2021 at 21:19, J Rivers wrote:
> 
>> +1
>> 
>> Thank you for the KIP!
>> 
>> Our organization runs Kafka at large scale in a multi-tenant configuration.
>> We actually have many other enterprises connecting to our system to
>> retrieve stream data. These feeds vary greatly in volume and velocity. The
>> peak rates are a multiple of the nominal rates. There is extreme
>> skew in our datasets in a number of ways.
>> 
>> We don't have time to work with every new internal/external client to tune
>> their feeds. They need to be able to take one of the many Kafka clients and
>> go off to the races.
>> 
>> Being able to retrieve client metrics would be invaluable here, as it's hard
>> and time-consuming to communicate outside the enterprise walls.
>> 
>> This KIP is important to us to expand the use of our datasets internally
>> and outside the borders of the enterprise. Our clients like the performance
>> and data safety guarantees of the Kafka connection. Observability has
>> been a problem...
>> 
>> Jonathan Rivers
>> jrivers...@gmail.com
>> 
>> 
>> 
>> 
>> On Mon, Oct 18, 2021 at 11:56 PM Ryanne Dolan 
>> wrote:
>> 
>>> -1
>>> 
>>> Ryanne
>>> 
>>> On Mon, Oct 18, 2021, 4:30 AM Magnus Edenhill wrote:
>>> 
 Hi all,
 
 I'd like to start a vote on KIP-714.
 https://cwiki.apache.org/confluence/x/2xRRCg
 
 Discussion thread:
 https://www.mail-archive.com/dev@kafka.apache.org/msg119000.html
 
 Thanks,
 Magnus
 
>>> 
>> 



Re: [VOTE] KIP-714: Client Metrics and Observability

2022-01-26 Thread Magnus Edenhill
Hi all,

It's been a while, and there have been some more discussions of the KIP,
which have been addressed on the KIP page.

I think it's a good time to revive this vote thread and get things moving.

We're currently at +3 (non-binding) and -1 (non-binding) votes.

Regards,
Magnus


On Mon, Nov 1, 2021 at 21:19, J Rivers wrote:

> +1
>
> Thank you for the KIP!
>
> Our organization runs Kafka at large scale in a multi-tenant configuration.
> We actually have many other enterprises connecting to our system to
> retrieve stream data. These feeds vary greatly in volume and velocity. The
> peak rates are a multiple of the nominal rates. There is extreme
> skew in our datasets in a number of ways.
>
> We don't have time to work with every new internal/external client to tune
> their feeds. They need to be able to take one of the many Kafka clients and
> go off to the races.
>
> Being able to retrieve client metrics would be invaluable here, as it's hard
> and time-consuming to communicate outside the enterprise walls.
>
> This KIP is important to us to expand the use of our datasets internally
> and outside the borders of the enterprise. Our clients like the performance
> and data safety guarantees of the Kafka connection. Observability has
> been a problem...
>
> Jonathan Rivers
> jrivers...@gmail.com
>
>
>
>
> On Mon, Oct 18, 2021 at 11:56 PM Ryanne Dolan 
> wrote:
>
> > -1
> >
> > Ryanne
> >
> > On Mon, Oct 18, 2021, 4:30 AM Magnus Edenhill wrote:
> >
> > > Hi all,
> > >
> > > I'd like to start a vote on KIP-714.
> > > https://cwiki.apache.org/confluence/x/2xRRCg
> > >
> > > Discussion thread:
> > > https://www.mail-archive.com/dev@kafka.apache.org/msg119000.html
> > >
> > > Thanks,
> > > Magnus
> > >
> >
>


Re: [DISCUSS] KIP-714: Client metrics and observability

2022-01-26 Thread Magnus Edenhill
Hi all,

I've updated the KIP with responses to the latest comments: Java client
dependencies (thanks, Kirk!), alternate designs (separate cluster, separate
producer, etc.), and more.

I will revive the vote thread.

Thanks,
Magnus


On Mon, Dec 13, 2021 at 22:32, Ryanne Dolan wrote:

> I think we should be very careful about introducing new runtime
> dependencies into the clients. Historically this has been rare and
> essentially necessary (e.g. compression libs).
>
> Ryanne
>
> On Mon, Dec 13, 2021, 1:06 PM Kirk True  wrote:
>
> > Hi Jun,
> >
> > On Thu, Dec 9, 2021, at 2:28 PM, Jun Rao wrote:
> > > 13. Using OpenTelemetry. Does that require a runtime dependency
> > > on the OpenTelemetry library? How good is the compatibility story
> > > of OpenTelemetry? This is important since an application could have
> > > OpenTelemetry dependencies other than the Kafka client.
> >
> > The current design is that the OpenTelemetry JARs would ship with the
> > client. Perhaps we can design the client such that the JARs aren't even
> > loaded if the user has opted out. The user could even exclude the JARs
> > from their dependencies if they so wished.
> >
> > I can't speak to the compatibility of the libraries. Is it possible that
> > we include a shaded version?
> >
> > Thanks,
> > Kirk
> >
> > >
> > > 14. The proposal listed idempotence=true. This is more of a configuration
> > > than a metric. Are we including that as a metric? What other configurations
> > > are we including? Should we separate the configurations from the metrics?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Mon, Nov 29, 2021 at 7:34 AM Magnus Edenhill wrote:
> > >
> > > > Hey Bob,
> > > >
> > > > That's a good point.
> > > >
> > > > Request type labels were considered, but since they're already tracked by
> > > > broker-side metrics they were left out to avoid metric duplication. However,
> > > > those metrics are not per connection, so they won't be that useful in
> > > > practice for troubleshooting specific client instances.
> > > >
> > > > I'll add the request_type label to the relevant metrics.
> > > >
> > > > Thanks,
> > > > Magnus
> > > >
> > > >
> > > > On Tue, Nov 23, 2021 at 19:20, Bob Barrett wrote:
> > > >
> > > > > Hi Magnus,
> > > > >
> > > > > Thanks for the thorough KIP, this seems very useful.
> > > > >
> > > > > Would it make sense to include the request type as a label for the
> > > > > `client.request.success`, `client.request.errors` and `client.request.rtt`
> > > > > metrics? I think it would be very useful to see which specific requests
> > > > > are succeeding and failing for a client. One specific case I can think
> > > > > of where this could be useful is producer batch timeouts. If a Java
> > > > > application does not enable producer client logs (unfortunately, in my
> > > > > experience this happens more often than it should), the application
> > > > > logs will only contain the expiration error message, but no information
> > > > > about what is causing the timeout. The requests might all be succeeding
> > > > > but taking too long to process batches, or metadata requests might be
> > > > > failing, or some or all produce requests might be failing (if the
> > > > > bootstrap servers are reachable from the client but one or more other
> > > > > brokers are not, for example). If the cluster operator is able to
> > > > > identify the specific requests that are slow or failing for a client,
> > > > > they will be better able to diagnose the issue causing batch timeouts.
> > > > >
> > > > > One drawback I can think of is that this will increase the cardinality
> > > > > of the request metrics. But any given client is only going to use a
> > > > > small subset of the request types, and since we already have partition
> > > > > labels for the topic-level metrics, I think request labels will still
> > > > > make up a relatively small percentage of the set of metrics.
> > > > >
> > > > > Thanks,
> > > > > Bob
> > > > >
> > > > > On Mon, Nov 22, 2021 at 2:08 AM Viktor Somogyi-Vass <
> > > > > viktorsomo...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Magnus,
> > > > > >
> > > > > > I think this is a very useful addition. We also have a similar (but
> > > > > > much more simplistic) implementation of this. Maybe I missed it in the
> > > > > > KIP, but what about adding metrics about the subscription cache itself?
> > > > > > I think that would improve its usability and debuggability, as we'd be
> > > > > > able to see its performance, hit/miss rates, eviction counts and others.
> > > > > >
> > > > > > Best,
> > > > > > Viktor
> > > > > >
> > > > > > On Thu, Nov 18, 2021 at 5:12 PM Magnus Edenhill <mag...@edenhill.se>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Mickael,
> > > > > > >
> > > > > > > see in
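To make the cardinality argument in Bob's message concrete, here is a hypothetical sketch of per-request-type counters for `client.request.success` and `client.request.errors`. The `{request_type=...}` key format and the class are invented for illustration and are not the KIP-714 design; the point is only that the number of labeled series grows with the request types a client actually sends, not with every request type in the protocol.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only: not the KIP-714 implementation or a Kafka metrics API.
final class RequestTypeMetrics {
    // One counter per (metric name, request_type label) pair.
    private final Map<String, Long> series = new TreeMap<>();

    private static String key(String metric, String requestType) {
        return metric + "{request_type=" + requestType + "}";
    }

    void recordSuccess(String requestType) {
        series.merge(key("client.request.success", requestType), 1L, Long::sum);
    }

    void recordError(String requestType) {
        series.merge(key("client.request.errors", requestType), 1L, Long::sum);
    }

    long count(String metric, String requestType) {
        return series.getOrDefault(key(metric, requestType), 0L);
    }

    /** Distinct labeled series: grows only with the request types actually recorded. */
    int seriesCount() {
        return series.size();
    }
}
```

In the batch-timeout scenario, a producer whose Produce requests fail while its Metadata requests succeed would show up as two distinct series instead of one aggregate error count.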

Re: [DISCUSS] KIP-814: Static membership protocol should let the leader skip assignment

2022-01-26 Thread David Jacot
Hi Hector,

Thanks for your comments.

The idea is to add a flag to the onJoinLeader method. We
must do this because the ConsumerCoordinator must be able
to reconstruct its state while skipping the assignment part. That
would definitely impact KIP-795. I need to look at your KIP.

That's right. We are thinking about a new consumer protocol but
we don't have anything concrete to share with the community yet.
We hope to have something in the near future.

Best,
David

On Wed, Jan 26, 2022 at 3:10 PM David Jacot  wrote:
>
> Hey Jason,
>
> I've updated the KIP based on your comments. Thanks for your
> feedback!
>
> Best,
> David
>
> On Wed, Jan 26, 2022 at 2:52 AM Jason Gustafson
>  wrote:
> >
> > Hey David,
> >
> > Yeah, there might not be a simple option to address the scenario I
> > described. Other than a broker-side solution, another way we could fix it
> > is by adding additional metadata to the assignment. I do agree that it
> > might not be worth the additional complexity.  At least we should probably
> > update the KIP to describe the limitation.
> >
> > @Hector In regard to the new consumer protocol, it's only just beyond
> > wishful thinking at this point. We are hoping to share some ideas with the
> > community in the near future though.
> >
> > Best,
> > Jason
> >
> > On Mon, Jan 24, 2022 at 3:06 PM Hector Geraldino (BLOOMBERG/ 919 3RD A) <
> > hgerald...@bloomberg.net> wrote:
> >
> > > Hi David,
> > >
> > > Is the idea here to skip calling performAssignment(...) on the
> > > AbstractCoordinator.onJoinLeader(...) method, or adding a new boolean
> > > parameter to the performAssignment(...) method? I ask because I raised
> > > KIP-795 a few weeks back, which aims to add a public API for
> > > AbstractCoordinator, which might change (or not) with this KIP.
> > >
> > > I see you also mentioned there are some discussions regarding a new consumer
> > > protocol. Is this being discussed somewhere else? I'm curious to know how
> > > it would work with other systems (like Kafka Connect or Schema Registry)
> > > that rely on the rebalance protocol to handle resource assignments.
> > >
> > > Apologies in advance if these questions are off-topic for the discussion
> > > at hand.
> > >
> > > Regards,
> > > Hector
> > >
> > > From: dev@kafka.apache.org At: 01/24/22 09:08:58 UTC-5:00 To:
> > > dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-814: Static membership protocol should let the
> > > leader skip assignment
> > >
> > > Hey Jason,
> > >
> > > Thanks for your comments.
> > >
> > > Regarding your first point: yes, you have it right. Let me expand
> > > the KIP to make this clearer.
> > >
> > > Regarding your second point: that is right. New partitions would not
> > > be detected while the leader is down. It is definitely not ideal, but that
> > > seems acceptable to me, at least as a first step. Adding partitions to
> > > a topic is an infrequent event, so the likelihood of it happening while the
> > > leader is down is rather low, but it could happen.
> > >
> > > The only way to avoid this would be to monitor the metadata
> > > changes on the broker side. This implies that we would parse both the
> > > subscriptions and the assignments in order to have the full list of topics.
> > > I am not sure that it is worth doing at the moment given that we are
> > > thinking about a new consumer protocol. What do you think?
> > >
> > > I suppose that we would need both in the long term, as the current protocol
> > > is a bit weird at the moment and we need to fix it anyway. We could
> > > use this KIP to fix the protocol and do a subsequent KIP in the future for
> > > the server-side monitoring if we need it.
> > >
> > > Best,
> > > David
> > >
> > > On Fri, Jan 21, 2022 at 7:51 PM Jason Gustafson
> > >  wrote:
> > > >
> > > > Hey David,
> > > >
> > > > Thanks for the proposal. This was a tricky bug and I think your approach
> > > > is probably the best way forward.
> > > >
> > > > It would be helpful to add a little more detail to the proposal. When the
> > > > coordinator detects that the static leader is returning, it will set
> > > > `skipAssignment` to true in the `JoinGroup` response. I believe the intent
> > > > is to return all member subscriptions in this response so that the leader
> > > > can monitor all topics subscribed in the group (which might be different
> > > > from the consumer's own subscription). The leader will then send an empty
> > > > `SyncGroup` request to collect its own assignment. Do I have that right?
> > > >
> > > > I think there might still be an edge case in this proposal (assuming I've
> > > > understood it correctly). In between the time that the leader shuts down
> > > > and is restarted, it is possible that new partitions are added to one of
> > > > the subscribed topics. The returning leader would not know about it
> > > > because it has no way to collect the full assignment. Do you think

Re: [DISCUSS] KIP-814: Static membership protocol should let the leader skip assignment

2022-01-26 Thread David Jacot
Hey Jason,

I've updated the KIP based on your comments. Thanks for your
feedback!

Best,
David

On Wed, Jan 26, 2022 at 2:52 AM Jason Gustafson
 wrote:
>
> Hey David,
>
> Yeah, there might not be a simple option to address the scenario I
> described. Other than a broker-side solution, another way we could fix it
> is by adding additional metadata to the assignment. I do agree that it
> might not be worth the additional complexity.  At least we should probably
> update the KIP to describe the limitation.
>
> @Hector In regard to the new consumer protocol, it's only just beyond
> wishful thinking at this point. We are hoping to share some ideas with the
> community in the near future though.
>
> Best,
> Jason
>
> On Mon, Jan 24, 2022 at 3:06 PM Hector Geraldino (BLOOMBERG/ 919 3RD A) <
> hgerald...@bloomberg.net> wrote:
>
> > Hi David,
> >
> > Is the idea here to skip calling performAssignment(...) on the
> > AbstractCoordinator.onJoinLeader(...) method, or adding a new boolean
> > parameter to the performAssignment(...) method? I ask because I raised
> > KIP-795 a few weeks back, which aims to add a public API for
> > AbstractCoordinator, which might change (or not) with this KIP.
> >
> > I see you also mentioned there are some discussions regarding a new consumer
> > protocol. Is this being discussed somewhere else? I'm curious to know how
> > it would work with other systems (like Kafka Connect or Schema Registry)
> > that rely on the rebalance protocol to handle resource assignments.
> >
> > Apologies in advance if these questions are off-topic for the discussion
> > at hand.
> >
> > Regards,
> > Hector
> >
> > From: dev@kafka.apache.org At: 01/24/22 09:08:58 UTC-5:00 To:
> > dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-814: Static membership protocol should let the
> > leader skip assignment
> >
> > Hey Jason,
> >
> > Thanks for your comments.
> >
> > Regarding your first point: yes, you have it right. Let me expand
> > the KIP to make this clearer.
> >
> > Regarding your second point: that is right. New partitions would not
> > be detected while the leader is down. It is definitely not ideal, but that
> > seems acceptable to me, at least as a first step. Adding partitions to
> > a topic is an infrequent event, so the likelihood of it happening while the
> > leader is down is rather low, but it could happen.
> >
> > The only way to avoid this would be to monitor the metadata
> > changes on the broker side. This implies that we would parse both the
> > subscriptions and the assignments in order to have the full list of topics.
> > I am not sure that it is worth doing at the moment given that we are
> > thinking about a new consumer protocol. What do you think?
> >
> > I suppose that we would need both in the long term as the current protocol
> > is a bit weird at the moment so we need to fix it anyway. We could
> > use this KIP to fix the protocol and do a subsequent KIP in the future for
> > the server side monitoring if we need it.
> >
> > Best,
> > David
> >
> > On Fri, Jan 21, 2022 at 7:51 PM Jason Gustafson
> >  wrote:
> > >
> > > Hey David,
> > >
> > > Thanks for the proposal. This was a tricky bug and I think your approach
> > > is probably the best way forward.
> > >
> > > It would be helpful to add a little more detail to the proposal. When the
> > > coordinator detects that the static leader is returning, it will set
> > > `skipAssignment` to true in the `JoinGroup` response. I believe the intent
> > > is to return all member subscriptions in this response so that the leader
> > > can monitor all topics subscribed in the group (which might be different
> > > from the consumer's own subscription). The leader will then send an empty
> > > `SyncGroup` request to collect its own assignment. Do I have that right?
> > >
> > > I think there might still be an edge case in this proposal (assuming I've
> > > understood it correctly). In between the time that the leader shuts down
> > > and is restarted, it is possible that new partitions are added to one of
> > > the subscribed topics. The returning leader would not know about it
> > > because it has no way to collect the full assignment. Do you think this
> > > is a problem?
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Wed, Jan 19, 2022 at 7:27 AM David Jacot  wrote:
> > >
> > > > Hi folks,
> > > >
> > > > I'd like to start a discussion for KIP-814: Static membership protocol
> > > > should let the leader skip assignment. This is a small extension to the
> > > > static membership protocol to address KAFKA-13435.
> > > >
> > > > The KIP is here: https://cwiki.apache.org/confluence/x/C5-kCw.
> > > >
> > > > Please let me know what you think.
> > > >
> > > > Best,
> > > > David
> > > >
> >
> >
> >
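The flow Jason describes (the coordinator sets `skipAssignment`, returns all member subscriptions, and the leader sends an empty `SyncGroup`) can be sketched from the returning leader's point of view. The classes below are hypothetical stand-ins, not Kafka's `JoinGroupResponse` or `SyncGroupRequest`, assignments are simplified to lists of strings, and the assignor is passed in as a plain function for illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical types loosely mirroring the JoinGroup/SyncGroup exchange; not Kafka classes.
final class JoinGroupResult {
    final boolean skipAssignment;
    final Map<String, List<String>> memberSubscriptions; // member id -> subscribed topics

    JoinGroupResult(boolean skipAssignment, Map<String, List<String>> memberSubscriptions) {
        this.skipAssignment = skipAssignment;
        this.memberSubscriptions = memberSubscriptions;
    }
}

final class SyncGroupPlan {
    // member id -> assigned partitions (simplified as strings); empty means "just fetch mine"
    final Map<String, List<String>> assignments;

    SyncGroupPlan(Map<String, List<String>> assignments) {
        this.assignments = assignments;
    }
}

final class StaticLeaderRejoin {
    private StaticLeaderRejoin() {}

    /** Topics the leader should monitor: the union of every member's subscription. */
    static Set<String> topicsToMonitor(JoinGroupResult join) {
        Set<String> topics = new HashSet<>();
        join.memberSubscriptions.values().forEach(topics::addAll);
        return topics;
    }

    /**
     * Decides what the leader sends in SyncGroup. With skipAssignment set, it sends
     * no assignments and receives the assignment the coordinator already holds.
     * Like the proposal discussed above, this cannot detect partitions that were
     * added while the leader was down.
     */
    static SyncGroupPlan planSync(JoinGroupResult join,
                                  Function<Map<String, List<String>>, Map<String, List<String>>> assignor) {
        if (join.skipAssignment) {
            return new SyncGroupPlan(Map.of());
        }
        return new SyncGroupPlan(assignor.apply(join.memberSubscriptions));
    }
}
```

Monitoring the union of subscriptions, rather than only the leader's own, reflects Jason's point that the group's topics may differ from the leader's own subscription.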


[jira] [Created] (KAFKA-13619) zookeeper.sync.time.ms is no longer used

2022-01-26 Thread Tomonari Yamashita (Jira)
Tomonari Yamashita created KAFKA-13619:
--

 Summary: zookeeper.sync.time.ms is no longer used
 Key: KAFKA-13619
 URL: https://issues.apache.org/jira/browse/KAFKA-13619
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.1.0, 2.0.0
Reporter: Tomonari Yamashita


- zookeeper.sync.time.ms is no longer used, but it is still present in the
documentation (1).
 -- The implementation and documentation of zookeeper.sync.time.ms should be
removed.
 -- FYI, as far as I can see, it was already unused as of v2.0.0.
 

(1) 
[https://kafka.apache.org/documentation/#brokerconfigs_zookeeper.sync.time.ms]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] KIP-787 - MM2 Interface to manage Kafka resources

2022-01-26 Thread Mickael Maison
Hi Omnia,

Thanks for the KIP, sorry for taking so long to comment. I've only had
time to take a quick look so far.

This seems to address a relatively advanced and specific use case. My
initial concern is that this may make it hard to evolve MirrorMaker, as
we'll likely need to update this new interface whenever new features are
added, for example if we wanted to sync group ACLs.
I'm wondering if it's something you've thought about. I'm not saying
it's a blocker but we have to weigh the pros and cons when introducing
new features.

Regarding the proposed API, I have a few suggestions:
- What about using configure() instead of the constructor to pass the
configuration, especially as it implements Configurable?
- It's not clear what all the arguments of createTopicPartitions()
are. What's the difference between partitionCounts and newPartitions?
Should we have separate methods for creating topics and partitions?
- Do we really need createCompactedTopic()?
- Instead of updateTopicConfigs() and updateAcls() should we use the
"alter" prefix to stay consistent with Admin?

Thanks,
Mickael
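A hypothetical shape for the interface that applies the review suggestions above (configuration via `configure()`, separate topic and partition creation, and the `alter` prefix) might look like the following. The interface name, method signatures, and the `cluster.alias` key are invented for this sketch and are not the API proposed in KIP-787; the in-memory implementation exists only to exercise it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical interface: not the API proposed in KIP-787.
interface MirrorResourceManager {
    /** Receives configuration after construction, in the style of Kafka's Configurable. */
    void configure(Map<String, ?> configs);

    void createTopic(String topic, int partitions, Map<String, String> topicConfigs);

    /** Grows an existing topic to totalPartitions partitions. */
    void createPartitions(String topic, int totalPartitions);

    /** "alter" prefix for consistency with the Admin client's naming. */
    void alterTopicConfigs(String topic, Map<String, String> configs);
}

/** Toy in-memory implementation used only to exercise the interface. */
final class InMemoryResourceManager implements MirrorResourceManager {
    final Map<String, Integer> partitionCounts = new HashMap<>();
    final Map<String, Map<String, String>> topicConfigs = new HashMap<>();
    String clusterAlias; // "cluster.alias" is a made-up key for this sketch

    @Override
    public void configure(Map<String, ?> configs) {
        clusterAlias = String.valueOf(configs.get("cluster.alias"));
    }

    @Override
    public void createTopic(String topic, int partitions, Map<String, String> configs) {
        if (partitionCounts.putIfAbsent(topic, partitions) != null) {
            throw new IllegalStateException("topic already exists: " + topic);
        }
        topicConfigs.put(topic, new HashMap<>(configs));
    }

    @Override
    public void createPartitions(String topic, int totalPartitions) {
        Integer current = partitionCounts.get(topic);
        if (current == null) {
            throw new IllegalArgumentException("unknown topic: " + topic);
        }
        if (totalPartitions <= current) {
            throw new IllegalArgumentException("partition count can only increase");
        }
        partitionCounts.put(topic, totalPartitions);
    }

    @Override
    public void alterTopicConfigs(String topic, Map<String, String> configs) {
        Map<String, String> existing = topicConfigs.get(topic);
        if (existing == null) {
            throw new IllegalArgumentException("unknown topic: " + topic);
        }
        existing.putAll(configs);
    }
}
```

With a plain `createTopic` that accepts topic configs, a compacted topic is just a call with `cleanup.policy=compact`, which is one way to see why a dedicated `createCompactedTopic()` may not be needed.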

On Wed, Jan 26, 2022 at 11:26 AM Omnia Ibrahim  wrote:
>
> Hi,
> If there are no more concerns regarding the proposal can I get some votes on 
> the KIP here https://lists.apache.org/thread/950lpxjb5kbjm8qdszlpxm9h4j4sfyjx
>
> Thanks
>
> On Wed, Oct 27, 2021 at 3:54 PM Ryanne Dolan  wrote:
>>
>> Well I'm convinced! Thanks for looking into it.
>>
>> Ryanne
>>
>> On Wed, Oct 27, 2021, 8:49 AM Omnia Ibrahim  wrote:
>>
>> > I checked the difference between the number of methods in the Admin
>> > interface and the number of methods MM2 invokes from Admin, and this diff
>> > is enormous (it's more than 70 methods).
>> > As far as I can see, MM2 depends on the following methods (in
>> > MirrorSourceConnector, MirrorMaker, MirrorCheckpointTask and
>> > MirrorCheckpointConnector), which leaves 73 methods on the Admin
>> > interface that customers would need to return dummy data for:
>> >
>> >1. create(conf)
>> >2. close
>> >3. listTopics()
>> >4. createTopics(newTopics, createTopicsOptions)
>> >5. createPartitions(newPartitions)
>> >6. alterConfigs(configs)  // this method is marked for deprecation in
>> >Admin, and the only ConfigResource type MM2 uses is TOPIC
>> >7. createAcls(aclBindings) // the list of bindings is always filtered by
>> >TOPIC
>> >8. describeAcls(aclBindingFilter) // filter is always ANY_TOPIC_ACL
>> >9. describeConfigs(configResources) // Always for TOPIC resources
>> >10. listConsumerGroupOffsets(groupId)
>> >11. listConsumerGroups()
>> >12. alterConsumerGroupOffsets(groupId, offsets)
>> >13. describeCluster() // this is invoked from
>> >ConnectUtils.lookupKafkaClusterId(conf), but MM2 isn't the one that
>> >initializes the AdminClient
>> >
>> > In practice, going with the Admin interface will make any custom Admin
>> > implementation humongous, even for a fringe use case, because of the
>> > number of methods that need to return dummy data.
>> >
>> > I am still leaning toward a new interface, as it abstracts all of MM2's
>> > interactions with Kafka resources in one place; this makes it easier to
>> > maintain and easier to support use cases where customers wish to
>> > provide a different way to handle resources.
>> >
>> > Omnia
>> >
>> > On Tue, Oct 26, 2021 at 4:10 PM Ryanne Dolan 
>> > wrote:
>> >
>> > > I like the idea of failing fast whenever a custom impl is provided, but I
>> > > suppose that could be done for Admin as well. I agree your proposal is
>> > > more ergonomic, but maybe it's okay to have a little friction in such
>> > > fringe use cases.
>> > >
>> > > Ryanne
>> > >
>> > >
>> > > On Tue, Oct 26, 2021, 6:23 AM Omnia Ibrahim 
>> > > wrote:
>> > >
>> > > > Hey Ryanne, thanks for the quick feedback.
>> > > > Using the Admin interface would make everything easier, as MM2 would
>> > > > only need to configure the classpath for the new implementation and use
>> > > > it instead of AdminClient.
>> > > > However, I have two concerns:
>> > > > 1. The Admin interface is enormous, and MM2 users will need to know the
>> > > > list of methods MM2 depends on and override only those instead of
>> > > > implementing the whole Admin interface.
>> > > > 2. MM2 users will need to keep an eye on any changes to the Admin
>> > > > interface that impact MM2, for example deprecated methods.
>> > > > I am not sure whether placing these burdens on users is acceptable.
>> > > > One solution to address these concerns could be adding some checks to
>> > > > make sure the methods MM2 uses from the Admin interface exist, so that
>> > > > we fail faster.
>> > > > What do you think?
>> > > >
>> > > > Omnia
>> > > >
>> > > >
>> > > > On Mon, Oct 25, 2021 at 11:24 PM Ryanne Dolan 
>> > > > wrote:
>> > > >
>> > > > > Thanks Omnia, neat idea. I wonder if we could use the existing Admin
>> > > > > interface instead of defining a new one?
>> > > > >
>> > > > > Ryanne
>
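The fail-fast idea raised in this thread, checking up front that a custom implementation covers the methods MM2 relies on, could be approximated with reflection. This is a hypothetical sketch: `LargeAdmin` stands in for a large Admin-like interface whose unimplemented operations default to throwing, only no-argument methods are handled, and it is not how MM2 or Kafka's `Admin` actually work.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a large Admin-like interface; not Kafka's Admin.
interface LargeAdmin {
    default void listTopics() { throw new UnsupportedOperationException("listTopics"); }
    default void createTopics() { throw new UnsupportedOperationException("createTopics"); }
    default void describeAcls() { throw new UnsupportedOperationException("describeAcls"); }
    default void deleteRecords() { throw new UnsupportedOperationException("deleteRecords"); }
}

/** Example custom implementation that covers only part of what a caller needs. */
final class TopicOnlyAdmin implements LargeAdmin {
    @Override public void listTopics() { /* custom logic */ }
    @Override public void createTopics() { /* custom logic */ }
}

final class FailFastCheck {
    private FailFastCheck() {}

    /**
     * Returns the required no-arg methods that `impl` does not override,
     * or that no longer exist on the API at all.
     */
    static List<String> missingOverrides(Class<?> impl, Class<?> api, List<String> required) {
        List<String> missing = new ArrayList<>();
        for (String name : required) {
            try {
                Method m = impl.getMethod(name);
                // Still declared by the interface: the throwing default body would be used.
                if (m.getDeclaringClass() == api) {
                    missing.add(name);
                }
            } catch (NoSuchMethodException e) {
                // The method was removed or renamed on the API.
                missing.add(name);
            }
        }
        return missing;
    }
}
```

If every method on the interface is abstract, the compiler already forces an implementation to provide all of them, so a check like this is mainly useful when unimplemented operations have throwing defaults, or to catch methods that disappeared from the API.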

[jira] [Created] (KAFKA-13618) BatchAccumulator `Exptected` rename to `Expected`

2022-01-26 Thread Kvicii.Yu (Jira)
Kvicii.Yu created KAFKA-13618:
-

 Summary: BatchAccumulator `Exptected` rename to `Expected`
 Key: KAFKA-13618
 URL: https://issues.apache.org/jira/browse/KAFKA-13618
 Project: Kafka
  Issue Type: Improvement
Reporter: Kvicii.Yu
Assignee: Kvicii.Yu






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #635

2022-01-26 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 509983 lines...]
[2022-01-26T13:27:00.120Z] 
org.apache.kafka.streams.integration.EosV2UpgradeIntegrationTest > 
shouldUpgradeFromEosAlphaToEosV2[true] PASSED
[2022-01-26T13:27:00.120Z] 
[2022-01-26T13:27:00.120Z] 
org.apache.kafka.streams.integration.GlobalKTableIntegrationTest > 
shouldGetToRunningWithOnlyGlobalTopology STARTED
[2022-01-26T13:27:00.120Z] 
[2022-01-26T13:27:00.120Z] 
org.apache.kafka.streams.integration.GlobalKTableIntegrationTest > 
shouldGetToRunningWithOnlyGlobalTopology PASSED
[2022-01-26T13:27:00.120Z] 
[2022-01-26T13:27:00.120Z] 
org.apache.kafka.streams.integration.GlobalKTableIntegrationTest > 
shouldKStreamGlobalKTableLeftJoin STARTED
[2022-01-26T13:27:01.148Z] 
[2022-01-26T13:27:01.148Z] 
org.apache.kafka.streams.integration.GlobalKTableIntegrationTest > 
shouldKStreamGlobalKTableLeftJoin PASSED
[2022-01-26T13:27:01.148Z] 
[2022-01-26T13:27:01.148Z] 
org.apache.kafka.streams.integration.GlobalKTableIntegrationTest > 
shouldRestoreGlobalInMemoryKTableOnRestart STARTED
[2022-01-26T13:27:01.661Z] 
[2022-01-26T13:27:01.661Z] 
org.apache.kafka.streams.integration.TaskMetadataIntegrationTest > 
shouldReportCorrectEndOffsetInformation PASSED
[2022-01-26T13:27:01.661Z] 
[2022-01-26T13:27:01.661Z] 
org.apache.kafka.streams.integration.TaskMetadataIntegrationTest > 
shouldReportCorrectCommittedOffsetInformation STARTED
[2022-01-26T13:27:02.688Z] 
[2022-01-26T13:27:02.688Z] 
org.apache.kafka.streams.integration.TaskMetadataIntegrationTest > 
shouldReportCorrectCommittedOffsetInformation PASSED
[2022-01-26T13:27:03.203Z] 
[2022-01-26T13:27:03.203Z] 
org.apache.kafka.streams.integration.GlobalKTableIntegrationTest > 
shouldRestoreGlobalInMemoryKTableOnRestart PASSED
[2022-01-26T13:27:03.203Z] 
[2022-01-26T13:27:03.203Z] 
org.apache.kafka.streams.integration.GlobalKTableIntegrationTest > 
shouldKStreamGlobalKTableJoin STARTED
[2022-01-26T13:27:03.718Z] 
[2022-01-26T13:27:03.718Z] 
org.apache.kafka.streams.integration.KStreamTransformIntegrationTest > 
shouldFlatTransformValuesWithKeyWithConnectedStoreProvider STARTED
[2022-01-26T13:27:04.744Z] 
[2022-01-26T13:27:04.744Z] 
org.apache.kafka.streams.integration.KStreamTransformIntegrationTest > 
shouldFlatTransformValuesWithKeyWithConnectedStoreProvider PASSED
[2022-01-26T13:27:04.744Z] 
[2022-01-26T13:27:04.744Z] 
org.apache.kafka.streams.integration.KStreamTransformIntegrationTest > 
shouldFlatTransform STARTED
[2022-01-26T13:27:05.257Z] 
[2022-01-26T13:27:05.257Z] 
org.apache.kafka.streams.integration.GlobalKTableIntegrationTest > 
shouldKStreamGlobalKTableJoin PASSED
[2022-01-26T13:27:06.798Z] 
[2022-01-26T13:27:06.798Z] 
org.apache.kafka.streams.integration.KStreamTransformIntegrationTest > 
shouldFlatTransform PASSED
[2022-01-26T13:27:06.798Z] 
[2022-01-26T13:27:06.798Z] 
org.apache.kafka.streams.integration.KStreamTransformIntegrationTest > 
shouldFlatTransformValuesWithValueTransformerWithoutKey STARTED
[2022-01-26T13:27:07.826Z] 
[2022-01-26T13:27:07.826Z] 
org.apache.kafka.streams.integration.KStreamTransformIntegrationTest > 
shouldFlatTransformValuesWithValueTransformerWithoutKey PASSED
[2022-01-26T13:27:07.826Z] 
[2022-01-26T13:27:07.826Z] 
org.apache.kafka.streams.integration.KStreamTransformIntegrationTest > 
shouldFlatTransformValuesWithKey STARTED
[2022-01-26T13:27:08.341Z] 
[2022-01-26T13:27:08.341Z] 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest
 > shouldInnerJoinMultiPartitionQueryable STARTED
[2022-01-26T13:27:09.880Z] 
[2022-01-26T13:27:09.880Z] 
org.apache.kafka.streams.integration.KStreamTransformIntegrationTest > 
shouldFlatTransformValuesWithKey PASSED
[2022-01-26T13:27:09.880Z] 
[2022-01-26T13:27:09.880Z] 
org.apache.kafka.streams.integration.KStreamTransformIntegrationTest > 
shouldTransform STARTED
[2022-01-26T13:27:11.953Z] 
[2022-01-26T13:27:11.953Z] 
org.apache.kafka.streams.integration.KStreamTransformIntegrationTest > 
shouldTransform PASSED
[2022-01-26T13:27:11.953Z] 
[2022-01-26T13:27:11.953Z] 
org.apache.kafka.streams.integration.KStreamTransformIntegrationTest > 
shouldTransformValuesWithValueTransformerWithKey STARTED
[2022-01-26T13:27:14.188Z] 
[2022-01-26T13:27:14.188Z] 
org.apache.kafka.streams.integration.KStreamTransformIntegrationTest > 
shouldTransformValuesWithValueTransformerWithKey PASSED
[2022-01-26T13:27:14.188Z] 
[2022-01-26T13:27:14.188Z] 
org.apache.kafka.streams.integration.KStreamTransformIntegrationTest > 
shouldTransformValuesWithValueTransformerWithKeyWithConnectedStoreProvider 
STARTED
[2022-01-26T13:27:14.188Z] 
[2022-01-26T13:27:14.188Z] 
org.apache.kafka.streams.integration.KStreamTransformIntegrationTest > 
shouldTransformValuesWithValueTransformerWithKeyWithConnectedStoreProvider 
PASSED
[2022-01-26T13:27:14.188Z] 
[2022-01-26T13:27:14.188Z] 
org.apache.kafka.streams.integra

Re: [VOTE] KIP-808: Add support for unix epoch precision in TimestampConverter SMT

2022-01-26 Thread Mickael Maison
+1 (binding)

Thanks Julien for the KIP!

On Wed, Jan 26, 2022 at 12:05 PM Tom Bentley  wrote:
>
> Hi Julien,
>
> Thanks again for this KIP. +1 (binding).
>
> Kind regards,
>
> Tom
>
> On Tue, 18 Jan 2022 at 08:15, Julien Chanaud 
> wrote:
>
> > Hi everyone,
> >
> > I'd like to start a vote for KIP-808: Add support for unix epoch precision
> > in TimestampConverter SMT
> >
> > https://cwiki.apache.org/confluence/x/GJuqCw
> >
> > Thanks for your help,
> >
> > Julien
> >
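KIP-808 is about `TimestampConverter` handling Unix epoch numbers whose precision is something other than milliseconds. The sketch below shows only the arithmetic involved. It is not the SMT's code, and `EpochPrecision` is a hypothetical helper. It converts an epoch value to milliseconds and back with `java.util.concurrent.TimeUnit`. The precision names are assumptions modeled on the KIP's proposal (as I recall it, a `unix.precision` setting). Converting to a coarser unit truncates toward zero.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating epoch-precision conversion; not the SMT's code.
final class EpochPrecision {
    private EpochPrecision() {
    }

    // Maps an assumed precision name onto a TimeUnit.
    static TimeUnit unitFor(String precision) {
        switch (precision) {
            case "seconds":
                return TimeUnit.SECONDS;
            case "milliseconds":
                return TimeUnit.MILLISECONDS;
            case "microseconds":
                return TimeUnit.MICROSECONDS;
            case "nanoseconds":
                return TimeUnit.NANOSECONDS;
            default:
                throw new IllegalArgumentException("Unknown precision: " + precision);
        }
    }

    // Raw epoch value in the given precision -> epoch milliseconds (truncates toward zero).
    static long toEpochMillis(long rawValue, String precision) {
        return unitFor(precision).toMillis(rawValue);
    }

    // Epoch milliseconds -> raw epoch value in the given precision.
    static long fromEpochMillis(long epochMillis, String precision) {
        return unitFor(precision).convert(epochMillis, TimeUnit.MILLISECONDS);
    }
}
```

For example, `toEpochMillis(1_643_200_000_123_456L, "microseconds")` yields `1_643_200_000_123L`, so the trailing sub-millisecond digits are dropped.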


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #634

2022-01-26 Thread Apache Jenkins Server
See 




Re: [VOTE] KIP-808: Add support for unix epoch precision in TimestampConverter SMT

2022-01-26 Thread Tom Bentley
Hi Julien,

Thanks again for this KIP. +1 (binding).

Kind regards,

Tom

On Tue, 18 Jan 2022 at 08:15, Julien Chanaud 
wrote:

> Hi everyone,
>
> I'd like to start a vote for KIP-808: Add support for unix epoch precision
> in TimestampConverter SMT
>
> https://cwiki.apache.org/confluence/x/GJuqCw
>
> Thanks for your help,
>
> Julien
>


[jira] [Resolved] (KAFKA-9279) Silent data loss in Kafka producer

2022-01-26 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-9279.
---
Fix Version/s: 3.2.0
   Resolution: Fixed

> Silent data loss in Kafka producer
> --
>
> Key: KAFKA-9279
> URL: https://issues.apache.org/jira/browse/KAFKA-9279
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.3.0
>Reporter: Andrew Klopper
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.2.0
>
>
> It appears that it is possible for a producer.commitTransaction() call to 
> succeed even if an individual producer.send() call has failed. The following 
> code demonstrates the issue:
> {code:java}
> package org.example.dataloss;
>
> import java.nio.charset.StandardCharsets;
> import java.util.Properties;
> import java.util.Random;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.ByteArraySerializer;
>
> public class Main {
>     public static void main(final String[] args) {
>         final Properties producerProps = new Properties();
>         if (args.length != 2) {
>             System.err.println("Invalid command-line arguments");
>             System.exit(1);
>         }
>         final String bootstrapServer = args[0];
>         final String topic = args[1];
>         producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
>         producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "50");
>         producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
>         // Deliberately tiny request limit so that the 200-byte payload below is rejected
>         producerProps.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "100");
>         producerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
>         producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "dataloss_01");
>         producerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dataloss_01");
>         try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(
>                 producerProps, new ByteArraySerializer(), new ByteArraySerializer())) {
>             producer.initTransactions();
>             producer.beginTransaction();
>             final Random random = new Random();
>             final byte[] largePayload = new byte[200];
>             random.nextBytes(largePayload);
>             producer.send(
>                 new ProducerRecord<>(
>                     topic,
>                     "large".getBytes(StandardCharsets.UTF_8),
>                     largePayload
>                 ),
>                 (metadata, e) -> {
>                     if (e == null) {
>                         System.out.println("INFO: Large payload succeeded");
>                     } else {
>                         System.err.printf("ERROR: Large payload failed: %s%n", e.getMessage());
>                     }
>                 }
>             );
>             // On affected versions, the failed send above does not stop this commit from succeeding
>             producer.commitTransaction();
>             System.out.println("Commit succeeded");
>         } catch (final Exception e) {
>             System.err.printf("FATAL ERROR: %s%n", e.getMessage());
>         }
>     }
> }
> {code}
> The code prints the following output:
> {code:java}
> ERROR: Large payload failed: The message is 293 bytes when serialized 
> which is larger than the maximum request size you have configured with the 
> max.request.size configuration.
> Commit succeeded{code}
>  
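In the reproducer, the send callback reports a failure, yet `commitTransaction()` still returns. On clients without the 3.2.0 fix, an application can protect itself by recording callback failures and aborting instead of committing. The sketch below shows that workaround under one assumption: the application controls the transaction loop. `SendFailureTracker` is hypothetical and not part of the Kafka client. Its `onCompletion` takes `Object` metadata so the sketch has no dependencies.

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical application-level helper (not a Kafka client class) that remembers
 * the first asynchronous send failure within a transaction.
 */
final class SendFailureTracker {
    private final AtomicReference<Exception> firstFailure = new AtomicReference<>();

    /** Intended to be used as the send callback; keeps only the first exception seen. */
    void onCompletion(Object metadata, Exception exception) {
        if (exception != null) {
            firstFailure.compareAndSet(null, exception);
        }
    }

    /** True if any send reported an error since the last reset. */
    boolean hasFailed() {
        return firstFailure.get() != null;
    }

    /** The first recorded failure, or null if none. */
    Exception firstFailure() {
        return firstFailure.get();
    }

    /** Clears state before the next transaction begins. */
    void reset() {
        firstFailure.set(null);
    }
}
```

With a real producer, `tracker::onCompletion` should type-check as the send `Callback`, because `RecordMetadata` widens to `Object`. Calling `producer.flush()` before checking `hasFailed()` gives outstanding callbacks time to run. The application can then call `abortTransaction()` or `commitTransaction()` as appropriate. This sketches a workaround only and does not describe how the fix itself works.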



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] KIP-787 - MM2 Interface to manage Kafka resources

2022-01-26 Thread Omnia Ibrahim
Hi,
If there are no more concerns regarding the proposal can I get some votes
on the KIP here
https://lists.apache.org/thread/950lpxjb5kbjm8qdszlpxm9h4j4sfyjx


Thanks

On Wed, Oct 27, 2021 at 3:54 PM Ryanne Dolan  wrote:

> Well I'm convinced! Thanks for looking into it.
>
> Ryanne
>
> On Wed, Oct 27, 2021, 8:49 AM Omnia Ibrahim 
> wrote:
>
> > I checked the difference between the number of methods in the Admin
> > interface and the number of methods MM2 invokes from Admin, and the gap
> > is enormous (more than 70 methods).
> > As far as I can see, MM2 depends on the following methods (in
> > MirrorSourceConnector, MirrorMaker, MirrorCheckpointTask and
> > MirrorCheckpointConnector), which leaves 73 methods on the Admin
> > interface that customers would need to return dummy data for:
> >
> >1. create(conf)
> >2. close
> >3. listTopics()
> >4. createTopics(newTopics, createTopicsOptions)
> >5. createPartitions(newPartitions)
> >6. alterConfigs(configs) // this method is marked for deprecation in
> >   Admin, and the only ConfigResource type MM2 uses is TOPIC
> >7. createAcls(aclBindings) // the list of bindings is always filtered by
> >   TOPIC
> >8. describeAcls(aclBindingFilter) // filter is always ANY_TOPIC_ACL
> >9. describeConfigs(configResources) // always for TOPIC resources
> >10. listConsumerGroupOffsets(groupId)
> >11. listConsumerGroups()
> >12. alterConsumerGroupOffsets(groupId, offsets)
> >13. describeCluster() // this is invoked from
> >   ConnectUtils.lookupKafkaClusterId(conf), but MM2 isn't the one that
> >   initializes the AdminClient
> >
> > In practice, going with the Admin interface would make any custom Admin
> > implementation humongous, even for a fringe use case, because of the number
> > of methods that need to return dummy data.
> >
> > I am still leaning toward a new interface, as it abstracts all of MM2's
> > interaction with Kafka resources in one place. This makes it easier to
> > maintain, and easier to support use cases where customers wish to
> > provide a different way to handle resources.
> >
> > Omnia
> >
> > On Tue, Oct 26, 2021 at 4:10 PM Ryanne Dolan 
> > wrote:
> >
> > > I like the idea of failing-fast whenever a custom impl is provided, but I
> > > suppose that that could be done for Admin as well. I agree your proposal is
> > > more ergonomic, but maybe it's okay to have a little friction in such
> > > fringe use-cases.
> > >
> > > Ryanne
> > >
> > >
> > > On Tue, Oct 26, 2021, 6:23 AM Omnia Ibrahim 
> > > wrote:
> > >
> > > > Hey Ryanne, thanks for the quick feedback.
> > > > Using the Admin interface would make everything easier, as MM2 would
> > > > only need to configure the classpath for the new implementation and use it
> > > > instead of AdminClient.
> > > > However, I have two concerns:
> > > > 1. The Admin interface is enormous, and MM2 users would need to know the
> > > > list of methods MM2 depends on and override only those instead of
> > > > implementing the whole Admin interface.
> > > > 2. MM2 users would need to keep an eye on any changes to the Admin
> > > > interface that impact MM2, for example deprecated methods.
> > > > I am not sure whether putting these concerns on users is acceptable.
> > > > One solution could be to add checks that the methods MM2 uses from the
> > > > Admin interface exist, so that it fails faster.
> > > > What do you think?
> > > >
> > > > Omnia
> > > >
> > > >
> > > > On Mon, Oct 25, 2021 at 11:24 PM Ryanne Dolan
> > > > wrote:
> > > >
> > > > > Thanks Omnia, neat idea. I wonder if we could use the existing Admin
> > > > > interface instead of defining a new one?
> > > > >
> > > > > Ryanne
> > > > >
> > > > > On Mon, Oct 25, 2021, 12:54 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hey everyone,
> > > > > > Please take a look at KIP-787
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-787%3A+MM2+Interface+to+manage+Kafka+resources
> > > > > >
> > > > > > Thanks for the feedback and support
> > > > > > Omnia
> > > > > >
> > > > >
> > > >
> > >
> >
>
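Omnia suggests adding checks so that a custom implementation missing a method MM2 relies on fails fast, at startup, and not partway through replication. The sketch below does this with reflection. Every name in it is hypothetical and does not come from the KIP. `MirrorResourceClient` is a stand-in narrow interface whose methods default to throwing `UnsupportedOperationException`, and `FailFastCheck` reports any required method that an implementation still inherits from that default.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for an admin-style interface; not a Kafka type. */
interface MirrorResourceClient {
    default Set<String> listTopics() { throw new UnsupportedOperationException("listTopics"); }
    default void createTopic(String name, int partitions) { throw new UnsupportedOperationException("createTopic"); }
    default void describeCluster() { throw new UnsupportedOperationException("describeCluster"); }
}

/** Example implementation that only supports listing topics. */
final class ListOnlyClient implements MirrorResourceClient {
    @Override
    public Set<String> listTopics() {
        return Collections.singleton("example-topic");
    }
}

final class FailFastCheck {
    private FailFastCheck() {
    }

    /** Names of required methods that still resolve to the interface's default body. */
    static List<String> missingOverrides(Class<? extends MirrorResourceClient> impl, List<String> required) {
        List<String> missing = new ArrayList<>();
        for (Method m : MirrorResourceClient.class.getMethods()) {
            if (!required.contains(m.getName())) {
                continue;
            }
            try {
                Method resolved = impl.getMethod(m.getName(), m.getParameterTypes());
                // Not overridden: lookup falls back to the interface's default method
                if (resolved.getDeclaringClass() == MirrorResourceClient.class) {
                    missing.add(m.getName());
                }
            } catch (NoSuchMethodException e) {
                missing.add(m.getName());
            }
        }
        return missing;
    }

    /** Throws at startup if any required method is not overridden. */
    static void requireOverrides(Class<? extends MirrorResourceClient> impl, String... required) {
        List<String> missing = missingOverrides(impl, Arrays.asList(required));
        if (!missing.isEmpty()) {
            throw new IllegalStateException(impl.getName() + " must override " + missing);
        }
    }
}
```

Because the interface uses default methods, an implementation overrides only what it supports. The check then makes the required subset explicit, which covers both of Omnia's concerns in one place.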


Re: [VOTE] KIP-801: Implement an Authorizer that stores metadata in __cluster_metadata

2022-01-26 Thread Rajini Sivaram
Hi Colin,

Thanks for the KIP. A couple of questions:

1) The KIP says:
*However, when the controller is active, its Authorizer state may be
slightly ahead of the broker's Authorizer state. This will happen in
the time between when a new ACL is created (or deleted) and the time that
this change is persisted durably by a majority of controller quorum peers.*

Once the ACL takes effect on the controller, do we guarantee that it will
be applied everywhere or can there be scenarios where the controller has
applied the change, but others don't due to a subsequent failure in
persisting the ACL changes?

2) Have we considered using a different config with limited privileges for
bootstrapping instead of the all-powerful *super.users*?

Regards,

Rajini


On Wed, Jan 26, 2022 at 1:50 AM Colin McCabe  wrote:

> How about this:
>
> We create a configuration key called early.start.listeners which contains
> a list of listener names. If this is not specified, its value defaults to
> just the controller listeners. Optionally, other listeners can be added too.
>
> If super.users contains any user names, early start listeners will start
> immediately. In the beginning they only authorize users that are in
> super.users. All other listeners receive a new error code,
> AUTHORIZER_NOT_READY_ERROR. If super.users does not contain any user names,
> then early start listeners will not be treated differently than other
> listeners.
>
> This will allow the controller listeners to get started immediately if the
> broker user is in super.users, which will speed up startup. It also will be
> useful for breaking chicken/egg cycles like needing to pull the SCRAM
> metadata to authorize pulling the SCRAM metadata.
>
> There are still a few use cases where super.users won't be required, but
> it may be useful in many cases to have this early start functionality.
>
> Leaving aside the preceding discussion, do you agree with starting up all
> endpoints (including non-early start ones) once we load a metadata
> snapshot? How feasible would it be for us to get a callback from the Raft
> layer the first time we caught up to the last stable offset? (we only want
> the callback the first time, not any other time). (I think the metadata
> shell also would want something like this, at least as an option).
>
> best,
> Colin
>
>
> On Tue, Jan 25, 2022, at 13:34, Jason Gustafson wrote:
> > Hi Colin,
> >
> > Thanks for the writeup. I had one question about bootstrapping. For the
> > brokers, I understand that listener startup is delayed until the Authorizer
> > is ready. However, it was not very clear to me how this would work for the
> > controller listeners. We may need them to start up before the metadata log
> > is ready so that a quorum can be established (as noted in the KIP). This
> > works fine if we assume that the controller principals are among
> > `super.users`. For requests forwarded from brokers, on the other hand, we
> > need to ensure the ACLs have been loaded properly before we begin
> > authorizing. The problem is that we currently use the same listener for
> > quorum requests and for forwarded requests. So my question is: how does the
> > Authorizer communicate to the controller when it is safe to begin
> > authorizing different request types?
> >
> > There are a couple of ways I can see this working. First, we could allow the
> > user to configure the listener used for forwarded requests separately. That
> > would work with the existing `Authorizer.start` API. Alternatively, perhaps
> > we could modify `Authorizer.start` to work with something more granular
> > than `EndPoint`. This would allow the controller to begin accepting
> > requests from the other quorum members before it is ready to authorize
> > forwarded requests from clients. Then we would need some way to let
> > brokers know when the controller is ready to accept these forwarded
> > requests (e.g. through an error code in the `Envelope` response).
> >
> > What do you think?
> >
> > Thanks,
> > Jason
> >
> > On Wed, Jan 12, 2022 at 12:57 PM David Arthur
> >  wrote:
> >
> >> +1 binding, thanks Colin!
> >>
> >> On Mon, Dec 13, 2021 at 7:47 PM Colin McCabe
> >> wrote:
> >>
> >> > Hi all,
> >> >
> >> > I'd like to start the vote on KIP-801: Implement an Authorizer that
> >> > stores metadata in __cluster_metadata
> >> >
> >> > The KIP is here: https://cwiki.apache.org/confluence/x/h5KqCw
> >> >
> >> > The original DISCUSS thread is here:
> >> >
> >> > https://lists.apache.org/thread/3d5o7h17ztjztjhblx4fln0wbbs1rmdq
> >> >
> >> > Please take a look and vote if you can.
> >> >
> >> > best,
> >> > Colin
> >> >
> >>
> >>
> >> --
> >> -David
> >>
>
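Colin's early-start proposal amounts to a small decision table for the bootstrap window. The sketch below models that table under several assumptions. The class and method names are hypothetical, and `aclAllows` stands in for real ACL evaluation. I also read "all other" as requests from principals that are not listed in `super.users`. The names `early.start.listeners` and `AUTHORIZER_NOT_READY_ERROR` are the ones used in Colin's message.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical model of the bootstrap rule discussed in the thread; not Kafka's Authorizer API. */
final class EarlyStartAuthorizerModel {
    enum Decision { ALLOWED, DENIED, AUTHORIZER_NOT_READY_ERROR }

    private final Set<String> superUsers;
    private volatile boolean aclsLoaded = false;

    EarlyStartAuthorizerModel(Set<String> superUsers) {
        this.superUsers = superUsers;
    }

    /** early.start.listeners defaults to the controller listeners when not specified (null). */
    static List<String> resolveEarlyStartListeners(List<String> configured, List<String> controllerListeners) {
        return configured != null ? configured : controllerListeners;
    }

    /** Early start is only special when super.users names at least one principal. */
    boolean earlyStartEnabled() {
        return !superUsers.isEmpty();
    }

    /** Whether a listener may start before the metadata snapshot has been loaded. */
    boolean startsBeforeLoad(String listenerName, List<String> earlyStartListeners) {
        return earlyStartEnabled() && earlyStartListeners.contains(listenerName);
    }

    /** Called once the ACLs have been loaded, e.g. after catching up to the last stable offset. */
    void markLoaded() {
        aclsLoaded = true;
    }

    Decision authorize(String principal, boolean aclAllows) {
        if (superUsers.contains(principal)) {
            return Decision.ALLOWED; // super users are authorized even before ACLs load
        }
        if (!aclsLoaded) {
            return Decision.AUTHORIZER_NOT_READY_ERROR;
        }
        return aclAllows ? Decision.ALLOWED : Decision.DENIED;
    }
}
```

Before `markLoaded()` runs, a non-super user receives the not-ready error no matter what the ACLs would say. Once loading completes, normal ACL evaluation takes over.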


Build failed in Jenkins: Kafka » Kafka Branch Builder » 3.1 #70

2022-01-26 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 502829 lines...]
[2022-01-26T09:45:20.058Z] > Task :raft:testClasses UP-TO-DATE
[2022-01-26T09:45:20.058Z] > Task :connect:json:testJar
[2022-01-26T09:45:20.058Z] > Task :connect:json:testSrcJar
[2022-01-26T09:45:20.058Z] > Task :metadata:compileTestJava UP-TO-DATE
[2022-01-26T09:45:20.058Z] > Task :metadata:testClasses UP-TO-DATE
[2022-01-26T09:45:20.058Z] > Task 
:clients:generateMetadataFileForMavenJavaPublication
[2022-01-26T09:45:20.058Z] > Task 
:clients:generatePomFileForMavenJavaPublication
[2022-01-26T09:45:20.058Z] 
[2022-01-26T09:45:20.058Z] > Task :streams:processMessages
[2022-01-26T09:45:20.058Z] Execution optimizations have been disabled for task 
':streams:processMessages' to ensure correctness due to the following reasons:
[2022-01-26T09:45:20.058Z]   - Gradle detected a problem with the following 
location: 
'/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.1/streams/src/generated/java/org/apache/kafka/streams/internals/generated'.
 Reason: Task ':streams:srcJar' uses this output of task 
':streams:processMessages' without declaring an explicit or implicit 
dependency. This can lead to incorrect results being produced, depending on 
what order the tasks are executed. Please refer to 
https://docs.gradle.org/7.2/userguide/validation_problems.html#implicit_dependency
 for more details about this problem.
[2022-01-26T09:45:20.058Z] MessageGenerator: processed 1 Kafka message JSON 
files(s).
[2022-01-26T09:45:20.058Z] 
[2022-01-26T09:45:20.058Z] > Task :streams:compileJava UP-TO-DATE
[2022-01-26T09:45:20.058Z] > Task :streams:classes UP-TO-DATE
[2022-01-26T09:45:20.058Z] > Task :streams:test-utils:compileJava UP-TO-DATE
[2022-01-26T09:45:20.987Z] > Task :streams:copyDependantLibs
[2022-01-26T09:45:20.987Z] > Task :streams:jar UP-TO-DATE
[2022-01-26T09:45:20.987Z] > Task 
:streams:generateMetadataFileForMavenJavaPublication
[2022-01-26T09:45:23.774Z] > Task :connect:api:javadoc
[2022-01-26T09:45:23.774Z] > Task :connect:api:copyDependantLibs UP-TO-DATE
[2022-01-26T09:45:23.774Z] > Task :connect:api:jar UP-TO-DATE
[2022-01-26T09:45:23.774Z] > Task 
:connect:api:generateMetadataFileForMavenJavaPublication
[2022-01-26T09:45:23.774Z] > Task :connect:json:copyDependantLibs UP-TO-DATE
[2022-01-26T09:45:23.774Z] > Task :connect:json:jar UP-TO-DATE
[2022-01-26T09:45:23.774Z] > Task 
:connect:json:generateMetadataFileForMavenJavaPublication
[2022-01-26T09:45:23.774Z] > Task :connect:api:javadocJar
[2022-01-26T09:45:23.774Z] > Task :connect:api:compileTestJava UP-TO-DATE
[2022-01-26T09:45:23.774Z] > Task :connect:api:testClasses UP-TO-DATE
[2022-01-26T09:45:23.774Z] > Task :connect:api:testJar
[2022-01-26T09:45:23.774Z] > Task :connect:api:testSrcJar
[2022-01-26T09:45:23.774Z] > Task 
:connect:api:publishMavenJavaPublicationToMavenLocal
[2022-01-26T09:45:23.774Z] > Task 
:connect:json:publishMavenJavaPublicationToMavenLocal
[2022-01-26T09:45:23.774Z] > Task :connect:api:publishToMavenLocal
[2022-01-26T09:45:23.774Z] > Task :connect:json:publishToMavenLocal
[2022-01-26T09:45:27.318Z] > Task :streams:javadoc
[2022-01-26T09:45:27.318Z] > Task :streams:javadocJar
[2022-01-26T09:45:28.238Z] > Task :clients:javadoc
[2022-01-26T09:45:29.162Z] > Task :clients:javadocJar
[2022-01-26T09:45:30.083Z] 
[2022-01-26T09:45:30.083Z] > Task :clients:srcJar
[2022-01-26T09:45:30.083Z] Execution optimizations have been disabled for task 
':clients:srcJar' to ensure correctness due to the following reasons:
[2022-01-26T09:45:30.083Z]   - Gradle detected a problem with the following 
location: 
'/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.1/clients/src/generated/java'.
 Reason: Task ':clients:srcJar' uses this output of task 
':clients:processMessages' without declaring an explicit or implicit 
dependency. This can lead to incorrect results being produced, depending on 
what order the tasks are executed. Please refer to 
https://docs.gradle.org/7.2/userguide/validation_problems.html#implicit_dependency
 for more details about this problem.
[2022-01-26T09:45:30.083Z] 
[2022-01-26T09:45:30.083Z] > Task :clients:testJar
[2022-01-26T09:45:31.003Z] > Task :clients:testSrcJar
[2022-01-26T09:45:31.003Z] > Task 
:clients:publishMavenJavaPublicationToMavenLocal
[2022-01-26T09:45:31.003Z] > Task :clients:publishToMavenLocal
[2022-01-26T09:45:49.993Z] > Task :core:compileScala
[2022-01-26T09:46:46.768Z] > Task :core:classes
[2022-01-26T09:46:46.768Z] > Task :core:compileTestJava NO-SOURCE
[2022-01-26T09:47:16.981Z] > Task :core:compileTestScala
[2022-01-26T09:47:58.174Z] > Task :core:testClasses
[2022-01-26T09:48:11.925Z] > Task :streams:compileTestJava
[2022-01-26T09:48:11.925Z] > Task :streams:testClasses
[2022-01-26T09:48:11.925Z] > Task :streams:testJar
[2022-01-26T09:48:12.851Z] > Task :streams:testSrcJar
[2022-01-26T09:48:12.851Z] > Task 
:streams:publishMavenJavaPublic

Re: [VOTE] KIP-769: Connect APIs to list all connector plugins and retrieve their configuration definitions

2022-01-26 Thread Viktor Somogyi-Vass
Hi Mickael,

+1 (non-binding) from me.

Viktor

On Thu, Jan 13, 2022 at 11:32 AM Mickael Maison 
wrote:

> Bumping this vote.
>
> We have 2 non-binding votes so far. Please take a look and let me know
> if you have any feedback.
>
> Thanks,
> Mickael
>
> On Mon, Dec 13, 2021 at 10:50 PM Ryanne Dolan 
> wrote:
> >
> > +1 (non-binding)
> >
> > Ryanne
> >
> > On Mon, Dec 13, 2021, 4:18 AM Mickael Maison 
> > wrote:
> >
> > > Hi all,
> > >
> > > I'd like to start a vote on KIP-769, which proposes adding new
> > > endpoints to the Connect REST API to list all connector plugins and
> > > retrieve their configuration definitions.
> > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-769%3A+Connect+APIs+to+list+all+connector+plugins+and+retrieve+their+configuration+definitions
> > >
> > > Please take a look and let me know if you have any feedback.
> > >
> > > Thanks,
> > > Mickael
> > >
>
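KIP-769 adds two REST calls: one lists every plugin type, and the other fetches a single plugin's configuration definition. The Java 11 client sketch below is hedged. The paths `GET /connector-plugins?connectorsOnly=false` and `GET /connector-plugins/{plugin}/config` reflect my understanding of the endpoints that came out of this KIP and are not stated in the thread. `http://localhost:8083` is an assumed worker address, and `ConnectPluginsClient` is a hypothetical class.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

/** Hypothetical client for the plugin endpoints; the paths are assumptions. */
final class ConnectPluginsClient {
    private final URI workerUri;

    ConnectPluginsClient(URI workerUri) {
        this.workerUri = workerUri;
    }

    /** Lists all plugins (connectors, converters, transformations, ...), not only connectors. */
    URI listAllPluginsUri() {
        return workerUri.resolve("/connector-plugins?connectorsOnly=false");
    }

    /** Configuration definition for one plugin, identified by class name or alias. */
    URI pluginConfigUri(String plugin) {
        String encoded = URLEncoder.encode(plugin, StandardCharsets.UTF_8);
        return workerUri.resolve("/connector-plugins/" + encoded + "/config");
    }

    /** Performs a GET and returns the JSON body, failing on any non-200 status. */
    String get(URI uri) throws IOException, InterruptedException {
        HttpClient http = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("GET " + uri + " returned HTTP " + response.statusCode());
        }
        return response.body();
    }
}
```

The plugin name is URL-encoded because class names can contain characters such as `$` (nested transformation classes).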


Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #633

2022-01-26 Thread Apache Jenkins Server
See