Re: [VOTE] 3.0.1 RC0

2022-03-07 Thread Luke Chen
Hi Mickael,

Thanks for running the release!

I did the following:
   1. Validated the scala 2.13 checksums
   2. Spot checked the java docs
   3. Ran the quick start with scala 2.13 (found a minor bug, KAFKA-13718,
      which won't block the release)
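
Step 1 (checksum validation) amounts to recomputing the SHA-512 digest of each downloaded artifact and comparing it to the published `.sha512` value. A minimal self-contained sketch of that comparison; the input here is the standard "abc" test vector rather than a real release artifact:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch of the checksum validation step: compute a SHA-512 digest of an
// artifact's bytes and compare it to the published value. The expected
// digest below is the well-known SHA-512 test vector for the string "abc".
public class ChecksumCheck {
    static String sha512Hex(byte[] data) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-512");
            StringBuilder sb = new StringBuilder();
            for (byte b : md.digest(data)) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-512 is always available
        }
    }

    public static void main(String[] args) {
        String expected = "ddaf35a193617abacc417349ae20413112e6fa4e89a97ea2"
                        + "0a9eeee64b55d35a2192992a274fc1a836ba3c23a3feebbd"
                        + "454d4423643ce80e2a9ac94fa54ca49f";
        String actual = sha512Hex("abc".getBytes(StandardCharsets.UTF_8));
        System.out.println(actual.equals(expected) ? "OK" : "MISMATCH"); // OK
    }
}
```

In practice the same comparison is done against the `.sha512` files hosted alongside the release artifacts.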

+1 (non-binding).

Thank you.

On Tue, Mar 8, 2022 at 1:33 AM Mickael Maison 
wrote:

> Here is a successful Jenkins build for the 3.0 branch:
> https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.0/183/
>
> On Mon, Mar 7, 2022 at 12:27 AM Jakub Scholz  wrote:
> >
> > +1 (non-binding). I used the staged Scala 2.13 binaries and the staging
> > Maven repository to run my tests. All seems to work fine, no issues
> found.
> >
> > Thanks
> > Jakub
> >
> > On Thu, Mar 3, 2022 at 7:05 PM Mickael Maison 
> wrote:
> >
> > > Hello Kafka users, developers and client-developers,
> > >
> > > This is the first candidate for release of Apache Kafka 3.0.1.
> > >
> > > Apache Kafka 3.0.1 is a bugfix release and 29 issues have been fixed
> > > since 3.0.0.
> > >
> > > Release notes for the 3.0.1 release:
> > > https://home.apache.org/~mimaison/kafka-3.0.1-rc0/RELEASE_NOTES.html
> > >
> > > *** Please download, test and vote by Thursday, March 10, 6pm GMT ***
> > >
> > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > https://kafka.apache.org/KEYS
> > >
> > > * Release artifacts to be voted upon (source and binary):
> > > https://home.apache.org/~mimaison/kafka-3.0.1-rc0/
> > >
> > > * Maven artifacts to be voted upon:
> > > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> > >
> > > * Javadoc:
> > > https://home.apache.org/~mimaison/kafka-3.0.1-rc0/javadoc/
> > >
> > > * Tag to be voted upon (off 3.0 branch) is the 3.0.1 tag:
> > > https://github.com/apache/kafka/releases/tag/3.0.1-rc0
> > >
> > > * Documentation:
> > > https://kafka.apache.org/30/documentation.html
> > >
> > > * Protocol:
> > > https://kafka.apache.org/30/protocol.html
> > >
> > > * Successful Jenkins builds for the 3.0 branch:
> > > I'll share a link once the build completes
> > >
> > > /**
> > >
> > > Thanks,
> > > Mickael
> > >
>


[jira] [Created] (KAFKA-13718) kafka-topics create with default config will show `segment.bytes` overridden config

2022-03-07 Thread Luke Chen (Jira)
Luke Chen created KAFKA-13718:
-

 Summary: kafka-topics create with default config will show 
`segment.bytes` overridden config 
 Key: KAFKA-13718
 URL: https://issues.apache.org/jira/browse/KAFKA-13718
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 3.0.0, 2.8.1, 3.1.0
Reporter: Luke Chen


Following the quickstart guide [1], when describing the topic just created with 
the default config, I found an overridden config shown:

> bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events    TopicId: 06zRrzDCRceR9zWAf_BUWQ    PartitionCount: 1    ReplicationFactor: 1    Configs: segment.bytes=1073741824
    Topic: quickstart-events    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

 

Compared with the expected result on the Kafka quick start page, this Configs 
field should be empty. Although the config value is what we expect (the default 
1GB value), displaying it as an override can still confuse users.

[1]: https://kafka.apache.org/quickstart
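
A plausible direction for a fix (an assumption, not the actual patch) is for the tool to display only configs that are genuine topic-level overrides; the Admin API distinguishes these from broker defaults via a config entry's source. A sketch of that filtering with simplified stand-in types (`Entry` and `Source` here are illustrative, not the real `ConfigEntry` classes):

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch: keep only configs whose source is a dynamic topic override, so
// broker defaults such as segment.bytes are not reported as overridden.
public class OverrideFilter {
    enum Source { DEFAULT_CONFIG, DYNAMIC_TOPIC_CONFIG }

    record Entry(String name, String value, Source source) {}

    static List<Entry> overridesOnly(List<Entry> configs) {
        return configs.stream()
                .filter(e -> e.source() == Source.DYNAMIC_TOPIC_CONFIG)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Entry> configs = List.of(
                new Entry("segment.bytes", "1073741824", Source.DEFAULT_CONFIG),
                new Entry("retention.ms", "60000", Source.DYNAMIC_TOPIC_CONFIG));
        // Only the explicit override survives; the default segment.bytes is hidden.
        System.out.println(overridesOnly(configs));
    }
}
```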



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] KIP-714: Client metrics and observability

2022-03-07 Thread Ashenafi Marcos
Hi,
Can you please remove my email ID so that I will no longer receive
any mail from this list?
Thank you

On Tue, Oct 19, 2021 at 1:30 PM Mickael Maison 
wrote:

> Hi Magnus,
>
> Thanks for the proposal.
>
> 1. Looking at the protocol section, isn't "ClientInstanceId" expected
> to be a field in GetTelemetrySubscriptionsResponseV0? Otherwise, how
> does a client retrieve this value?
>
> 2. In the client API section, you mention a new method
> "clientInstanceId()". Can you clarify which interfaces are affected?
> Is it only Consumer and Producer?
>
> 3. I'm a bit concerned this is enabled by default. Even if the data
> collected is supposed to be not sensitive, I think this can be
> problematic in some environments. Also users don't seem to have the
> choice to only expose some metrics. Knowing how much data transit
> through some applications can be considered critical.
>
> 4. As a user, how do you know if your application is actively sending
> metrics? Are there new metrics exposing what's going on, like how much
> data is being sent?
>
> 5. If all metrics are enabled on a regular Consumer or Producer, do
> you have an idea how much throughput this would use?
>
> Thanks
>
> On Tue, Oct 19, 2021 at 5:06 PM Magnus Edenhill 
> wrote:
> >
> > Den tis 19 okt. 2021 kl 13:22 skrev Tom Bentley :
> >
> > > Hi Magnus,
> > >
> > > I reviewed the KIP since you called the vote (sorry for not reviewing
> when
> > > you announced your intention to call the vote). I have a few questions
> on
> > > some of the details.
> > >
> > > 1. There's no Javadoc on ClientTelemetryPayload.data(), so I don't know
> > > whether the payload is exposed through this method as compressed or
> not.
> > > Later on you say "Decompression of the payloads will be handled by the
> > > broker metrics plugin, the broker should expose a suitable
> decompression
> > > API to the metrics plugin for this purpose.", which suggests it's the
> > > compressed data in the buffer, but then we don't know which codec was
> used,
> > > nor the API via which the plugin should decompress it if required for
> > > forwarding to the ultimate metrics store. Should the
> ClientTelemetryPayload
> > > expose a method to get the compression and a decompressor?
> > >
> >
> > Good point, updated.
> >
> >
> >
> > > 2. The client-side API is expressed as StringOrError
> > > ClientInstance::ClientInstanceId(int timeout_ms). I understand that
> you're
> > > thinking about the librdkafka implementation, but it would be good to
> show
> > > the API as it would appear on the Apache Kafka clients.
> > >
> >
> > This was meant as pseudo-code, but I changed it to Java.
> >
> >
> > > 3. "PushTelemetryRequest|Response - protocol request used by the
> client to
> > > send metrics to any broker it is connected to." To be clear, this means
> > > that the client can choose any of the connected brokers and push to
> just
> > > one of them? What should a supporting client do if it gets an error
> when
> > > pushing metrics to a broker, retry sending to the same broker or try
> > > pushing to another broker, or drop the metrics? Should supporting
> clients
> > > send successive requests to a single broker, or round robin, or is
> that up
> > > to the client author? I'm guessing the behaviour should be sticky to
> > > support the rate limiting features, but I think it would be good for
> client
> > > authors if this section were explicit on the recommended behaviour.
> > >
> >
> > You are right, I've updated the KIP to make this clearer.
> >
> >
> > > 4. "Mapping the client instance id to an actual application instance
> > > running on a (virtual) machine can be done by inspecting the metrics
> > > resource labels, such as the client source address and source port, or
> > > security principal, all of which are added by the receiving broker.
> This
> > > will allow the operator together with the user to identify the actual
> > > application instance." Is this really always true? The source IP and
> port
> > > might be a loadbalancer/proxy in some setups. The principal, as already
> > > mentioned in the KIP, might be shared between multiple applications.
> So at
> > > worst the organization running the clients might have to consult the
> logs
> > > of a set of client applications, right?
> > >
> >
> > Yes, that's correct. There's no guaranteed mapping from
> client_instance_id
> > to
> > an actual instance, that's why the KIP recommends client implementations
> to
> > log the client instance id
> > upon retrieval, and also provide an API for the application to retrieve
> the
> > instance id programmatically
> > if it has a better way of exposing it.
> >
> >
> > 5. "Tests indicate that a compression ratio up to 10x is possible for the
> > > standard metrics." Client authors might appreciate your mentioning
> which
> > > compression codec got these results.
> > >
> >
> > Good point. Updated.
> >
> >
> > > 6. "Should the client send a push request prior to expiry of the
> previously
> > > 

Re: [DISCUSS] KIP-824 Allowing dumping segmentlogs limiting the batches in the output

2022-03-07 Thread Sergio Daniel Troiano
Hi Luke,

Makes sense, done!

Thank you.
Sergio Troiano

On Tue, 8 Mar 2022 at 03:02, Luke Chen  wrote:

> Hi Sergio,
>
> > I don't want this to minimize the main feature I want to deploy, as I
> > think the message size limit is not as important as limiting the number
> > of batches.
>
> Agree! Let's focus on the feature of limiting the batch amounts.
>
> One more comment to the KIP:
> 1. Could you put the new parameter description into the proposed changes
> section of the KIP? That would make it much clearer.
>
>
> Thank you.
> Luke
>
> On Mon, Mar 7, 2022 at 8:44 PM Sergio Daniel Troiano
>  wrote:
>
> > hey Luke,
> >
> > I am interested in expanding the KIP scope but I am a bit concerned this
> > could create a lot of noise and confusion as they look like very similar
> > parameters, I agree this is a small change, so I think if I do it
> properly
> > it should not be a problem at all; I just need a couple more days
> > as I want to create the proper tests as well.
> >
> > I have a doubt about editing the KIP: should I add this as a new
> > feature as well, or describe it as a side-effect finding? I don't
> > want this to minimize the main feature I want to deploy, as I think the
> > message size limit is not as important as limiting the number of
> > batches.
> >
> > It is up to you, if you guys consider we must add this in this KIP then I
> > will be happy to do it. 
> >
> > Best regards.
> > Sergio Troiano
> >
> > On Mon, 7 Mar 2022 at 02:01, Luke Chen  wrote:
> >
> > > Hi Sergio,
> > >
> > > Thanks for your explanation.
> > > Make sense to me.
> > >
> > > The only interesting thing I have just found is that *max-message-size*
> > > is not used when dumping logs; instead it is used by dumpIndex
> > >
> > > Are you interested in expanding the scope of this KIP to include the
> > > *max-message-size* in dumping logs?
> > > I think it's good because it will also be a small change, and no need
> to
> > go
> > > through another KIP discussing/voting process. But I'm fine if you want
> > to
> > > keep this KIP as is, and create another JIRA ticket for future work.
> > >
> > > Thank you.
> > > Luke
> > >
> > > On Mon, Mar 7, 2022 at 6:02 AM Sergio Daniel Troiano
> > >  wrote:
> > >
> > > > hey Luke,
> > > >
> > > > Let me answer them:
> > > > 1. If the *max-batches-size* is too small that results in no records
> > > > output, will we output any information to the user?
> > > >
> > > > If *max-batches-size* is even smaller than the first batch, then
> > > > there won't be any output; this is handled by the FileRecords class.
> > > > I think this is correct, as it is the expected behaviour.
> > > >
> > > > 2. After your explanation, I guess the use of *max-batches-size*
> won't
> > > > conflict with *max-message-size*, right?
> > > >
> > > > The only interesting thing I have just found is that *max-message-size*
> > > > is not used when dumping logs; instead it is used by dumpIndex:
> > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/DumpLogSegments.scala#L150
> > > > So this feature is not working for dumping logs. I also checked
> > > > whether there is a unit test for this, and there is none. Maybe we
> > > > could create a ticket for this?
> > > >
> > > > Regards.
> > > >
> > > >
> > > > On Sat, 5 Mar 2022 at 10:36, Luke Chen  wrote:
> > > >
> > > > > Hi Sergio,
> > > > >
> > > > > Thanks for the explanation! Very clear!
> > > > > I think we should put this example and explanation into KIP.
> > > > >
> > > > > Other comments:
> > > > > 1. If the *max-batches-size* is too small that results in no
> records
> > > > > output, will we output any information to the user?
> > > > > 2. After your explanation, I guess the use of *max-batches-size*
> > won't
> > > > > conflict with *max-message-size*, right?
> > > > > That is, user can set the 2 arguments at the same time. Is that
> > > correct?
> > > > >
> > > > > Thank you.
> > > > > Luke
> > > > >
> > > > > Thank you.
> > > > > Luke
> > > > >
> > > > > On Sat, Mar 5, 2022 at 4:47 PM Sergio Daniel Troiano
> > > > >  wrote:
> > > > >
> > > > > > hey Luke,
> > > > > >
> > > > > > thanks for the interest, it is a good question, please let me
> > explain
> > > > > you:
> > > > > >
> > > > > > *max-message-size* is a filter on the size of each batch. For
> > > > > > example, if I set --max-message-size to 1000 bytes and my segment
> > > > > > log has 300 batches, 150 of them 500 bytes and the other 150
> > > > > > 2000 bytes, then the script will skip the last 150 as each of
> > > > > > those batches is larger than the limit.
> > > > > >
> > > > > > On the other hand, following the same example with
> > > > > > *max-batches-size* set to 1000 bytes, it will only print out the
> > > > > > first 2 batches (500 bytes each)
> > > > 

[jira] [Created] (KAFKA-13717) KafkaConsumer.close throws authorization exception even when commit offsets is empty

2022-03-07 Thread Vincent Jiang (Jira)
Vincent Jiang created KAFKA-13717:
-

 Summary: KafkaConsumer.close throws authorization exception even 
when commit offsets is empty
 Key: KAFKA-13717
 URL: https://issues.apache.org/jira/browse/KAFKA-13717
 Project: Kafka
  Issue Type: Bug
  Components: unit tests
Reporter: Vincent Jiang


When offsets are empty and the coordinator is unknown, KafkaConsumer.close didn't 
throw an exception before this commit: 
https://github.com/apache/kafka/commit/4b468a9d81f7380f7197a2a6b859c1b4dca84bd9
After this commit, KafkaConsumer.close may throw an authorization exception.

 

The root cause is that the commit changed the logic to call 
lookupCoordinator even if offsets are empty. 

 

Even if a consumer doesn't have access to a group or a topic, it might be 
better not to throw an authorization exception in this case, because the close() 
call doesn't actually access any resource.
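
The fix this ticket suggests would presumably guard the commit path so that an empty offset map never triggers a coordinator lookup (and thus never triggers an authorization check). A minimal sketch of that guard with hypothetical, simplified types; this is not the actual consumer code:

```java
import java.util.Map;

// Sketch of the guard KAFKA-13717 implies: when there is nothing to commit
// on close(), skip the coordinator lookup entirely, so no authorization
// check can fail. Coordinator here is a stand-in for the real coordinator.
public class CloseGuard {
    interface Coordinator { void commit(Map<String, Long> offsets); }

    static boolean commitOnClose(Map<String, Long> offsets, Coordinator coordinator) {
        if (offsets.isEmpty()) {
            return false; // nothing to commit: don't touch the coordinator at all
        }
        coordinator.commit(offsets);
        return true;
    }

    public static void main(String[] args) {
        // An "unauthorized" coordinator that would throw if contacted.
        boolean committed = commitOnClose(Map.of(), offsets -> {
            throw new IllegalStateException("authorization check should not run");
        });
        System.out.println(committed); // false: coordinator was never contacted
    }
}
```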





[jira] [Created] (KAFKA-13716) add tests for `DeleteRecordsCommand`

2022-03-07 Thread Luke Chen (Jira)
Luke Chen created KAFKA-13716:
-

 Summary: add tests for `DeleteRecordsCommand`
 Key: KAFKA-13716
 URL: https://issues.apache.org/jira/browse/KAFKA-13716
 Project: Kafka
  Issue Type: Test
  Components: tools
Reporter: Luke Chen


Found there are no tests for the `DeleteRecordsCommand` class, which is used by 
`kafka-delete-records.sh`. We should add some.





[jira] [Created] (KAFKA-13715) Add "generation" field into consumer protocol

2022-03-07 Thread Luke Chen (Jira)
Luke Chen created KAFKA-13715:
-

 Summary: Add "generation" field into consumer protocol
 Key: KAFKA-13715
 URL: https://issues.apache.org/jira/browse/KAFKA-13715
 Project: Kafka
  Issue Type: New Feature
  Components: clients
Reporter: Luke Chen
Assignee: Luke Chen


Implementation of KIP-792.

https://cwiki.apache.org/confluence/pages/resumedraft.action?draftId=191336615&draftShareId=e541012b-4f5f-463a-a6f8-a6f13e9ddbeb





Re: [DISCUSS] KIP-824 Allowing dumping segmentlogs limiting the batches in the output

2022-03-07 Thread Luke Chen
Hi Sergio,

> I don't want this to minimize the main feature I want to deploy, as I
> think the message size limit is not as important as limiting the number
> of batches.

Agree! Let's focus on the feature of limiting the batch amounts.

One more comment to the KIP:
1. Could you put the new parameter description into the proposed changes
section of the KIP? That would make it much clearer.


Thank you.
Luke

On Mon, Mar 7, 2022 at 8:44 PM Sergio Daniel Troiano
 wrote:

> hey Luke,
>
> I am interested in expanding the KIP scope but I am a bit concerned this
> could create a lot of noise and confusion as they look like very similar
> parameters, I agree this is a small change, so I think if I do it properly
> it should not be a problem at all; I just need a couple more days
> as I want to create the proper tests as well.
>
> I have a doubt about editing the KIP: should I add this as a new
> feature as well, or describe it as a side-effect finding? I don't
> want this to minimize the main feature I want to deploy, as I think the
> message size limit is not as important as limiting the number of
> batches.
>
> It is up to you, if you guys consider we must add this in this KIP then I
> will be happy to do it. 
>
> Best regards.
> Sergio Troiano
>
> On Mon, 7 Mar 2022 at 02:01, Luke Chen  wrote:
>
> > Hi Sergio,
> >
> > Thanks for your explanation.
> > Make sense to me.
> >
> > > The only interesting thing I have just found is that *max-message-size*
> > > is not used when dumping logs; instead it is used by dumpIndex
> >
> > Are you interested in expanding the scope of this KIP to include the
> > *max-message-size* in dumping logs?
> > I think it's good because it will also be a small change, and no need to
> go
> > through another KIP discussing/voting process. But I'm fine if you want
> to
> > keep this KIP as is, and create another JIRA ticket for future work.
> >
> > Thank you.
> > Luke
> >
> > On Mon, Mar 7, 2022 at 6:02 AM Sergio Daniel Troiano
> >  wrote:
> >
> > > hey Luke,
> > >
> > > Let me answer them:
> > > 1. If the *max-batches-size* is too small that results in no records
> > > output, will we output any information to the user?
> > >
> > > If *max-batches-size* is even smaller than the first batch, then
> > > there won't be any output; this is handled by the FileRecords class.
> > > I think this is correct, as it is the expected behaviour.
> > >
> > > 2. After your explanation, I guess the use of *max-batches-size* won't
> > > conflict with *max-message-size*, right?
> > >
> > > The only interesting thing I have just found is that *max-message-size*
> > > is not used when dumping logs; instead it is used by dumpIndex:
> > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/DumpLogSegments.scala#L150
> > > So this feature is not working for dumping logs. I also checked
> > > whether there is a unit test for this, and there is none. Maybe we
> > > could create a ticket for this?
> > >
> > > Regards.
> > >
> > >
> > > On Sat, 5 Mar 2022 at 10:36, Luke Chen  wrote:
> > >
> > > > Hi Sergio,
> > > >
> > > > Thanks for the explanation! Very clear!
> > > > I think we should put this example and explanation into KIP.
> > > >
> > > > Other comments:
> > > > 1. If the *max-batches-size* is too small that results in no records
> > > > output, will we output any information to the user?
> > > > 2. After your explanation, I guess the use of *max-batches-size*
> won't
> > > > conflict with *max-message-size*, right?
> > > > That is, user can set the 2 arguments at the same time. Is that
> > correct?
> > > >
> > > > Thank you.
> > > > Luke
> > > >
> > > > Thank you.
> > > > Luke
> > > >
> > > > On Sat, Mar 5, 2022 at 4:47 PM Sergio Daniel Troiano
> > > >  wrote:
> > > >
> > > > > hey Luke,
> > > > >
> > > > > thanks for the interest, it is a good question, please let me
> explain
> > > > you:
> > > > >
> > > > > *max-message-size* is a filter on the size of each batch. For
> > > > > example, if I set --max-message-size to 1000 bytes and my segment
> > > > > log has 300 batches, 150 of them 500 bytes and the other 150
> > > > > 2000 bytes, then the script will skip the last 150 as each of
> > > > > those batches is larger than the limit.
> > > > >
> > > > > On the other hand, following the same example with
> > > > > *max-batches-size* set to 1000 bytes, it will only print out the
> > > > > first 2 batches (500 bytes each) and stop. This avoids reading the
> > > > > whole file.
> > > > >
> > > > > Also, if all of the batches are smaller than 1000 bytes, it will
> > > > > end up printing out all of them.
> > > > > The idea of my change is to limit the *number* of batches, no
> > > > > matter their size.
> > > > >
> > > > > I hope this reply helps.
> > > > > Best regards.
> > > > >
> > > > > On Sat, 5 Mar 2022 at 
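
The two limits Sergio contrasts above can be sketched as follows. This is inferred from his example, not the actual DumpLogSegments code: `max-message-size` skips any individual batch larger than the limit, while the proposed `max-batches-size` stops reading once the cumulative bytes would exceed the limit:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the two dump limits discussed in the KIP-824 thread, over a
// list of batch sizes in bytes (hypothetical simplification of real batches).
public class DumpLimits {
    // max-message-size: a per-batch filter; oversized batches are skipped.
    static List<Integer> filterBySize(List<Integer> batchSizes, int maxMessageSize) {
        List<Integer> out = new ArrayList<>();
        for (int size : batchSizes) {
            if (size <= maxMessageSize) out.add(size);
        }
        return out;
    }

    // max-batches-size: stop once the cumulative bytes would exceed the
    // limit, which avoids reading the rest of the segment file at all.
    static List<Integer> limitCumulative(List<Integer> batchSizes, int maxBatchesSize) {
        List<Integer> out = new ArrayList<>();
        int total = 0;
        for (int size : batchSizes) {
            if (total + size > maxBatchesSize) break;
            out.add(size);
            total += size;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> batches = List.of(500, 500, 2000, 500);
        System.out.println(filterBySize(batches, 1000));    // [500, 500, 500]
        System.out.println(limitCumulative(batches, 1000)); // [500, 500]
    }
}
```

As in Sergio's example, the cumulative limit prints only the first two 500-byte batches and then stops.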

Re: [DISCUSS] KIP-820: Extend KStream process with new Processor API

2022-03-07 Thread Matthias J. Sax
I think it's ok that we cannot prevent users from mutating a given 
read-only object. We have similar issues "all over the place" in the 
API, because it's just how Java works unfortunately (eg, 
`ValueMapperWithKey` and similar interfaces).


The point is that the API clearly expresses that the key should 
not be changed, as `FixedKeyRecord` has no `withKey()` method, which is 
much better than having `Record.withKey()` and thus incorrectly 
indicating to users that it would be ok to set a new key.


I think it's worth adding the new interfaces.


-Matthias


On 3/7/22 11:46 AM, Guozhang Wang wrote:

Thanks John! I feel a bit ashamed of just thinking out loud here without trying
out prototypes myself :P

I think the FixedKeyProcessor/Record looks very good -- like you said,
since we are making a new set of APIs, why don't we reconsider more
boldly -- but at the same time I'd also like to make sure we agree on how
much "safety" we can achieve at runtime: even with the proposed APIs, we
cannot prevent users from doing something like:

---
process(FixedKeyRecord inputRecord) {
    // this is not preventable with runtime key validation either, since we
    // only check that the key object itself is not replaced
    inputRecord.key().modifyField(...);
    context.forward(inputRecord);
}

---

I.e., with either type safety or runtime validation, we cannot be 100% sure
that users would not do anything wrong. This drives me to think about how much
we'd like to pay to "remind" (rather than "enforce", since we cannot
really do that) users of the semantics of "processValues". Personally I felt that
adding the new set of APIs for that purpose only is a bit overkill, and
hence was leaning towards just the runtime validation. But I admit this is
purely subjective, so I'm willing to yield to the group if others feel it's
worth doing.


Guozhang



On Mon, Mar 7, 2022 at 10:32 AM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:


Thanks, John!
This looks very promising.

I will familiarize myself with this approach and update the KIP accordingly. From
what I can see so far, this should cover most of the open issues in this
proposal.

PS.


Just as a reminder, the current approach with transformers
is NOT enforced at compile time. Transformers have access to
a "forwarding disabled" processor context, which still has
the forward methods that throw a runtime exception when
invoked.


Agree. I was referring to the value transformers where `readOnlyKey` is
passed but not forwarded internally. As for the "forwarding disabled"
approach, you're totally right that it is a runtime validation.
Regardless, the approach proposed here will be a much better one.


On Sun, 6 Mar 2022 at 18:59, John Roesler  wrote:


Hello all,

It seems like we're making good progress on this discussion.
If I'm keeping track correctly, if we can resolve this
question about how to handle processValues(), then we should
be able to finalize the vote, right?

I share Matthias's preference for having a type-safe API.

Just as a reminder, the current approach with transformers
is NOT enforced at compile time. Transformers have access to
a "forwarding disabled" processor context, which still has
the forward methods that throw a runtime exception when
invoked.

However, the spirit of the "new processor api" line of work
is to clean up a lot of the cruft around the original
processor API, so this is a good opportunity to introduce a
type-safe version if we can.

Based on my experience adding the new processor API, I felt
like it should be possible to do what he suggests, but it
would be more involved than what he said. The biggest thing
I learned from that effort, though, is that you really have
to just try it to see what all the complications are.

With that in mind, I went ahead and implemented the
suggestion: https://github.com/apache/kafka/pull/11854

This is a functional prototype. It only adds processValues,
which takes a supplier of a new type, FixedKeyProcessor.
That processor only processes FixedKeyRecords, which have a
key that cannot be changed. FixedKeyProcessors have a
special context, a FixedKeyProcessorContext, which can only
forward FixedKeyRecords.

FixedKeyRecords have "fixed keys" because their key can only
be set in the constructor, and the constructor is package-private.

As you can see, this new record/processor/context ecosystem
is an independent peer of the general one. This is necessary
to ensure the desired compiler check. For example, if
FixedKeyRecord were merely an interface implemented by
Record, then users could create a new Record with a new key
and forward it as a FixedKeyRecord, violating the
constraint.

As I said, with this proposal, the devil is in the details,
so if anyone thinks the API can be simplified, I suggest you
check out the branch and try out your proposal. I'd be very
happy to have a simpler solution, but I'm also pretty sure
this complexity is necessary.
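
A condensed illustration of the idea described above; the real classes in the PR differ in detail, and the visibility and generics here are simplified for a runnable sketch:

```java
// Sketch of the fixed-key design: the key is set only in the constructor
// (package-private in the actual API, so only the library can create one),
// and there is no withKey(), so holders of a FixedKeyRecord can replace the
// value but never the key.
public class FixedKeyDemo {
    static final class FixedKeyRecord<K, V> {
        private final K key;
        private final V value;

        // In the real API this constructor is package-private.
        FixedKeyRecord(K key, V value) {
            this.key = key;
            this.value = value;
        }

        public K key() { return key; }
        public V value() { return value; }

        // The value can change, but the new record keeps the same key.
        public <NewV> FixedKeyRecord<K, NewV> withValue(NewV newValue) {
            return new FixedKeyRecord<>(key, newValue);
        }
    }

    public static void main(String[] args) {
        FixedKeyRecord<String, Integer> r = new FixedKeyRecord<>("k", 1);
        FixedKeyRecord<String, String> mapped = r.withValue("one");
        System.out.println(mapped.key() + "=" + mapped.value()); // k=one
    }
}
```

This also shows why FixedKeyRecord cannot simply be an interface on Record: if it were, a caller could build a new Record with a different key and forward it as a FixedKeyRecord.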

Taking a step back, I do think this approach 

Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #743

2022-03-07 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest

2022-03-07 Thread John Roesler (Jira)
John Roesler created KAFKA-13714:


 Summary: Flaky test IQv2StoreIntegrationTest
 Key: KAFKA-13714
 URL: https://issues.apache.org/jira/browse/KAFKA-13714
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.2.0
Reporter: John Roesler


I have observed multiple consistency violations in the 
IQv2StoreIntegrationTest. Since this is the first release of IQv2, and it's 
apparently a major flaw in the feature, we should not release with this bug 
outstanding. Depending on the time-table, we may want to block the release or 
pull the feature until the next release.

 

The first observation I have is from 23 Feb 2022. So far all observations point 
to the range query in particular, and all observations have been for RocksDB 
stores, including RocksDBStore, TimestampedRocksDBStore, and the windowed store 
built on RocksDB segments.

For reference, range queries were implemented on 16 Feb 2022: 
[https://github.com/apache/kafka/commit/b38f6ba5cc989702180f5d5f8e55ba20444ea884]

The window-specific range query test has also failed once that I have seen. 
That feature was implemented on 2 Jan 2022: 
[https://github.com/apache/kafka/commit/b8f1cf14c396ab04b8968a8fa04d8cf67dd3254c]

 

Here are some stack traces I have seen:
{code:java}
verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]

java.lang.AssertionError: 
Expected: is <[1, 2, 3]>
 but: was <[1, 2]>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1125)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:803)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776)
 {code}
{code:java}
verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI]

java.lang.AssertionError: 
Expected: is <[1, 2, 3]>
 but: was <[1, 3]>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1131)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:809)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:778)
 {code}
{code:java}
verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]

java.lang.AssertionError: 
Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@35025a0a,
 executionInfo=[], position=Position{position={input-topic={0=1, 
1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@38732364,
 executionInfo=[], position=Position{position={input-topic={1=1}, 
globalResult=null}
Expected: is <[1, 2, 3]>
 but: was <[1, 2]>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:780)
 {code}
{code:java}
verifyStore[cache=true, log=false, supplier=ROCKS_WINDOW, kind=DSL] 

    java.lang.AssertionError: 
Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@2a32fb6,
 executionInfo=[], position=Position{position={input-topic={0=1, 
1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@6107165,
 executionInfo=[], position=Position{position={input-topic={1=1}, 
globalResult=null}
    Expected: is <[0, 1, 2, 3]> 
         but: was <[0, 2, 3]>
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleWindowRangeQuery(IQv2StoreIntegrationTest.java:1234)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleWindowRangeQueries(IQv2StoreIntegrationTest.java:880)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:793)
 {code}
 

Some observations:
 * After I added the 

Re: [DISCUSS] KIP-714: Client metrics and observability

2022-03-07 Thread Magnus Edenhill
Hi Jun,

thanks for your initiated questions, see my answers below.
There's been a number of clarifications to the KIP.



Den tors 27 jan. 2022 kl 20:08 skrev Jun Rao :

> Hi, Magnus,
>
> Thanks for updating the KIP. The overall approach makes sense to me. A few
> more detailed comments below.
>
> 20. ClientTelemetry: Should it be extending configurable and closable?
>

I'll pass this question to Sarat and/or Xavier.



> 21. Compression of the metrics on the client: what's the default?
>

How about we specify a prioritized list: zstd, lz4, snappy, gzip?
But ultimately it is up to what the client supports.


23. A client instance is considered a metric resource and the
> resource-level (thus client instance level) labels could include:
> client_software_name=confluent-kafka-python
> client_software_version=v2.1.3
> client_instance_id=B64CD139-3975-440A-91D4
> transactional_id=someTxnApp
> Are those labels added in PushTelemetryRequest? If so, are they per metric
> or per request?
>


client_software* and client_instance_id are not added by the client, but are
available to the broker-side metrics plugin, which can add them as it sees
fit; I have removed them from the KIP.

As for transactional_id, group_id, etc., which I believe will be useful in
troubleshooting, they are included only once per push, as resource-level
attributes (the client instance is a singular resource).


>
> 24.  "the broker will only send
> GetTelemetrySubscriptionsResponse.DeltaTemporality=True" :
> 24.1 If it's always true, does it need to be part of the protocol?
>

We're anticipating that it will take a lot longer to upgrade the majority
of clients than the
broker/plugin side, which is why we want the client to support both
temporalities out-of-the-box
so that cumulative reporting can be turned on seamlessly in the future.



> 24.2 Does delta only apply to Counter type?
>


And Histograms. More details in Xavier's OTLP link.



> 24.3 In the delta representation, the first request needs to send the full
> value, how does the broker plugin know whether a value is full or delta?
>

The client may (should) send the start time for each metric sample,
indicating when
the metric began to be collected.
We've discussed whether this should be the client instance start time or
the time when a matching
metric subscription for that metric is received.
For completeness we recommend using the former, the client instance start
time.
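The delta temporality and start-time discussion above can be sketched in a few lines. This is a toy model, not the client implementation; the Sample shape and all names are invented for illustration:

```java
public class DeltaReporter {
    // Hypothetical sample shape: the value accumulated since the previous push,
    // plus the time collection started (the client instance start time, as
    // recommended above).
    record Sample(String name, long startTimeMs, long delta) {}

    private final long instanceStartMs;
    private long cumulative = 0;   // running total since instance start
    private long lastReported = 0; // total at the previous push

    public DeltaReporter(long instanceStartMs) { this.instanceStartMs = instanceStartMs; }

    public void increment(long n) { cumulative += n; }

    // On each push interval, report only the change since the last push.
    // The first push therefore carries the full value; the broker plugin can
    // recognise it because no earlier sample exists for this start time.
    public Sample push(String metricName) {
        long delta = cumulative - lastReported;
        lastReported = cumulative;
        return new Sample(metricName, instanceStartMs, delta);
    }

    public static void main(String[] args) {
        DeltaReporter reporter = new DeltaReporter(System.currentTimeMillis());
        reporter.increment(5);
        System.out.println(reporter.push("records.sent").delta()); // 5: first push = full value
        reporter.increment(3);
        System.out.println(reporter.push("records.sent").delta()); // 3: delta only
    }
}
```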



> 25. quota:
> 25.1 Since we are fitting PushTelemetryRequest into the existing request
> quota, it would be useful to document the impact, i.e. client metric
> throttling causes the data from the same client to be delayed.
> 25.2 Is PushTelemetryRequest subject to the write bandwidth quota like the
> producer?
>


Yes, it should be, so as to protect the cluster from rogue clients.
But in practice the size of metrics will be quite low (e.g., 1-10 KB per
60s interval), so I don't think this will pose a problem.
The KIP has been updated with more details on quota/throttling behaviour,
see the
"Throttling and rate-limiting" section.


25.3 THROTTLING_QUOTA_EXCEEDED: Currently, we don't send this error when
> the request/bandwidth quota is exceeded since those requests are not
> rejected. We only set this error when the request is rejected (e.g., topic
> creation). It would be useful to clarify when this error is used.
>

Right, I was trying to reuse an existing error code. We can introduce
a new one for the case where a client pushes metrics at a higher frequency
than the configured push interval (e.g., out-of-profile sends).
This causes the broker to drop those metrics and send this error code back
to the client. There will be no connection throttling / channel-muting in
this case (unless the standard quotas are exceeded).
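The out-of-profile handling described here amounts to a per-client rate check on the broker. A hedged sketch follows; the names are invented, and a real implementation would presumably also tolerate some jitter around the interval:

```java
public class PushRateCheck {
    enum Result { ACCEPTED, RATE_LIMITED }

    private final long pushIntervalMs;
    private boolean seenPush = false;
    private long lastAcceptedMs;

    public PushRateCheck(long pushIntervalMs) { this.pushIntervalMs = pushIntervalMs; }

    // Accept at most one push per configured interval; out-of-profile pushes
    // are dropped and answered with an error code, but the connection is not
    // throttled or muted.
    public Result onPush(long nowMs) {
        if (seenPush && nowMs - lastAcceptedMs < pushIntervalMs) {
            return Result.RATE_LIMITED;
        }
        seenPush = true;
        lastAcceptedMs = nowMs;
        return Result.ACCEPTED;
    }

    public static void main(String[] args) {
        PushRateCheck check = new PushRateCheck(60_000L);
        System.out.println(check.onPush(0));      // ACCEPTED
        System.out.println(check.onPush(1_000));  // RATE_LIMITED: faster than the interval
        System.out.println(check.onPush(60_000)); // ACCEPTED
    }
}
```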


> 27. kafka-client-metrics.sh: Could we add an example on how to disable a
> bad client?
>

There's now a --block option to kafka-client-metrics.sh which overrides all
subscriptions
for the matched client(s). This allows silencing metrics for one or more
clients without having
to remove existing subscriptions. From the client's perspective it will
look like it no longer has
any subscriptions.

# Block metrics collection for a specific client instance.
# A descriptive --name makes it easier to clean up old subscriptions;
# --match pins the subscription to this specific client instance.
$ kafka-client-metrics.sh --bootstrap-server $BROKERS \
   --add \
   --name 'Disable_b69cc35a' \
   --match client_instance_id=b69cc35a-7a54-4790-aa69-cc2bd4ee4538 \
   --block




> 28. New broker side metrics: Could we spell out the details of the metrics
> (e.g., group, tags, etc)?
>

KIP has been updated accordingly (thanks Sarat).



>
> 29. Client instance-level metrics: client.io.wait.time is a gauge not a
> histogram.
>

I believe a population/distribution should preferably be represented as a
histogram, space permitting,
and only secondarily as a Gauge average.
While we might not want to maintain a bunch of histograms for each

Re: [DISCUSS] KIP-820: Extend KStream process with new Processor API

2022-03-07 Thread Guozhang Wang
Thanks John! I feel a bit ashamed of just thinking out loud here without trying
out prototypes myself :P

I think the FixedKeyProcessor/Record looks very good -- like you said,
since we are making a new set of APIs, why don't we reconsider more
boldly -- but at the same time I'd also like to make sure we agree on how
much "safety" we can achieve at runtime: even with the proposed APIs, we
cannot prevent users from doing something like:

---
process(FixedKeyRecord inputRecord) {
    inputRecord.key().modifyField(...); // not preventable with runtime key validation
                                        // either, since we only check that the key
                                        // object itself is not replaced
    context.forward(inputRecord);
}

---
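This loophole is easy to demonstrate in isolation; UserKey below stands in for any user-defined mutable key class, and the whole example is a self-contained illustration rather than Streams code:

```java
public class KeyMutationDemo {
    // A hypothetical mutable key type, standing in for any user key class.
    static class UserKey {
        String field;
        UserKey(String field) { this.field = field; }
    }

    public static void main(String[] args) {
        UserKey key = new UserKey("original");
        UserKey before = key;   // what a runtime identity check would remember

        key.field = "mutated";  // in-place mutation, as in the snippet above

        // A reference-identity check still passes, so runtime validation
        // cannot detect that the key's contents changed.
        System.out.println(before == key);  // true
        System.out.println(before.field);   // mutated
    }
}
```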

I.e., with either type safety or runtime validation we cannot be 100% sure
that users will not do anything wrong. This drives me to think about how much
we'd like to pay to "remind" (rather than "enforce", since we cannot really
do it) users of the semantics of "processValue". Personally I felt that
adding the new set of APIs for that purpose only is a bit overkill, and
hence was leaning towards just the runtime validation. But I admit this is
purely subjective, so I'm willing to yield to the group if others feel it's
worth doing.


Guozhang



On Mon, Mar 7, 2022 at 10:32 AM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Thanks, John!
> This looks very promising.
>
> I will familiarize myself with this approach and update the KIP accordingly. From what
> I can see so far, this should cover most of the open issues in this
> proposal.
>
> PS.
>
> > Just as a reminder, the current approach with transformers
> > is NOT enforced at compile time. Transformers have access to
> > a "forwarding disabled" processor context, which still has
> > the forward methods that throw a runtime exception when
> > invoked.
>
> Agree. I was referring to the value transformers where `readOnlyKey` is
> passed but not forwarded internally. Though regarding the "forwarding disabled"
> approach, you're totally right that it is a runtime validation.
> Regardless, the approach proposed here will be a much better one.
>
>
> On Sun, 6 Mar 2022 at 18:59, John Roesler  wrote:
>
> > Hello all,
> >
> > It seems like we're making good progress on this discussion.
> > If I'm keeping track correctly, if we can resolve this
> > question about how to handle processValues(), then we should
> > be able to finalize the vote, right?
> >
> > I share Matthias's preference for having a type-safe API.
> >
> > Just as a reminder, the current approach with transformers
> > is NOT enforced at compile time. Transformers have access to
> > a "forwarding disabled" processor context, which still has
> > the forward methods that throw a runtime exception when
> > invoked.
> >
> > However, the spirit of the "new processor api" line of work
> > is to clean up a lot of the cruft around the original
> > processor API, so this is a good opportunity to introduce a
> > type-safe version if we can.
> >
> > Based on my experience adding the new processor API, I felt
> > like it should be possible to do what he suggests, but it
> > would be more involved than what he said. The biggest thing
> > I learned from that effort, though, is that you really have
> > to just try it to see what all the complications are.
> >
> > With that in mind, I went ahead and implemented the
> > suggestion: https://github.com/apache/kafka/pull/11854
> >
> > This is a functional prototype. It only adds processValues,
> > which takes a supplier of a new type, FixedKeyProcessor.
> > That processor only processes FixedKeyRecords, which have a
> > key that cannot be changed. FixedKeyProcessors have a
> > special context, a FixedKeyProcessorContext, which can only
> > forward FixedKeyRecords.
> >
> > FixedKeyRecords have "fixed keys" because its key can only
> > be set in the constructor, and its constructor is package-
> > private.
> >
> > As you can see, this new record/processor/context ecosystem
> > is an independent peer of the general one. This is necessary
> > to ensure the desired compiler check. For example, if
> > FixedKeyRecord were merely an interface implemented by
> > Record, then users could create a new Record with a new key
> > and forward it as a FixedKeyRecord, violating the
> > constraint.
> >
> > As I said, with this proposal, the devil is in the details,
> > so if anyone thinks the API can be simplified, I suggest you
> > check out the branch and try out your proposal. I'd be very
> > happy to have a simpler solution, but I'm also pretty sure
> > this complexity is necessary.
> >
> > Taking a step back, I do think this approach results in a
> > better API, even though the change is a little complicated.
> >
> > Thanks,
> > -John
> >
> > On Sun, 2022-03-06 at 10:51 +, Jorge Esteban Quilcate
> > Otoya wrote:
> > > Matthias, thanks for your feedback.
> > >
> > > I can see the following alternatives to deal with `processValues()`:
> > >
> > > 1. Runtime key validation (current 

Re: [DISCUSS] KIP-820: Extend KStream process with new Processor API

2022-03-07 Thread Matthias J. Sax

Thanks John! This is great!

I guess I was aware that pulling the `ValueRecord` string might move more
parts than I mentioned. Happy to see you being supportive of the idea.



-Matthias

On 3/7/22 10:31 AM, Jorge Esteban Quilcate Otoya wrote:

Thanks, John!
This looks very promising.

I will familiarize myself with this approach and update the KIP accordingly. From what
I can see so far, this should cover most of the open issues in this
proposal.

PS.


Just as a reminder, the current approach with transformers
is NOT enforced at compile time. Transformers have access to
a "forwarding disabled" processor context, which still has
the forward methods that throw a runtime exception when
invoked.


Agree. I was referring to the value transformers where `readOnlyKey` is
passed but not forwarded internally. Though regarding the "forwarding disabled"
approach, you're totally right that it is a runtime validation.
Regardless, the approach proposed here will be a much better one.


On Sun, 6 Mar 2022 at 18:59, John Roesler  wrote:


Hello all,

It seems like we're making good progress on this discussion.
If I'm keeping track correctly, if we can resolve this
question about how to handle processValues(), then we should
be able to finalize the vote, right?

I share Matthias's preference for having a type-safe API.

Just as a reminder, the current approach with transformers
is NOT enforced at compile time. Transformers have access to
a "forwarding disabled" processor context, which still has
the forward methods that throw a runtime exception when
invoked.

However, the spirit of the "new processor api" line of work
is to clean up a lot of the cruft around the original
processor API, so this is a good opportunity to introduce a
type-safe version if we can.

Based on my experience adding the new processor API, I felt
like it should be possible to do what he suggests, but it
would be more involved than what he said. The biggest thing
I learned from that effort, though, is that you really have
to just try it to see what all the complications are.

With that in mind, I went ahead and implemented the
suggestion: https://github.com/apache/kafka/pull/11854

This is a functional prototype. It only adds processValues,
which takes a supplier of a new type, FixedKeyProcessor.
That processor only processes FixedKeyRecords, which have a
key that cannot be changed. FixedKeyProcessors have a
special context, a FixedKeyProcessorContext, which can only
forward FixedKeyRecords.

FixedKeyRecords have "fixed keys" because its key can only
be set in the constructor, and its constructor is package-
private.
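A condensed, self-contained sketch of that pattern (the real classes in the linked PR carry timestamp, headers, and more, and rely on package visibility to keep the constructor out of user reach):

```java
// Sketch only: a record type whose key is fixed at construction time.
public final class FixedKeyRecordSketch<K, V> {
    private final K key;
    private final V value;

    // Package-private: user code in other packages cannot construct one with a
    // key of its own choosing; only the framework can.
    FixedKeyRecordSketch(K key, V value) {
        this.key = key;
        this.value = value;
    }

    public K key() { return key; }
    public V value() { return value; }

    // The only "mutator" returns a copy with the SAME key; there is no withKey().
    public FixedKeyRecordSketch<K, V> withValue(V newValue) {
        return new FixedKeyRecordSketch<>(key, newValue);
    }

    public static void main(String[] args) {
        FixedKeyRecordSketch<String, Integer> r = new FixedKeyRecordSketch<>("k", 1);
        FixedKeyRecordSketch<String, Integer> r2 = r.withValue(2);
        System.out.println(r2.key());   // k  (same key, guaranteed by the type)
        System.out.println(r2.value()); // 2
    }
}
```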

As you can see, this new record/processor/context ecosystem
is an independent peer of the general one. This is necessary
to ensure the desired compiler check. For example, if
FixedKeyRecord were merely an interface implemented by
Record, then users could create a new Record with a new key
and forward it as a FixedKeyRecord, violating the
constraint.

As I said, with this proposal, the devil is in the details,
so if anyone thinks the API can be simplified, I suggest you
check out the branch and try out your proposal. I'd be very
happy to have a simpler solution, but I'm also pretty sure
this complexity is necessary.

Taking a step back, I do think this approach results in a
better API, even though the change is a little complicated.

Thanks,
-John

On Sun, 2022-03-06 at 10:51 +, Jorge Esteban Quilcate
Otoya wrote:

Matthias, thanks for your feedback.

I can see the following alternatives to deal with `processValues()`:

1. Runtime key validation (current proposal)
2. Using Void type. Guozhang already points out some important
considerations about allocating `Record` twice.
3. Adding a new ValueRecord, proposed by Matthias. This one would carry
some of the problems of the second alternative as ValueRecord will have to
be created from a Record. Also, either by having a public constructor or
creation from a Record, the key _can_ be changed without being captured by
the Topology.
4. Reducing the KIP scope to `process` only, and removing/postponing
`processValues` for a later DSL redesign.

A couple of additional comments:

About the Record API:

IIUC, the issue with allocating new objects is coming from the current
design of the Record API.
If a user does record.withKey(...).withValue(...), that already leads to a
couple of instantiations.
My impression is that if the cost/value of immutability has been weighed
already, then maybe the considerations for alternative 2 can be disregarded?

Either way, if the cost of recreation of objects is something we want to
minimize, then maybe adding a Builder to the record should help to reduce
the allocations.
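For illustration, a hypothetical builder along those lines. This is not an API proposal, just a sketch of how the intermediate withX() allocations would collapse into a single build(); both Rec and Builder are invented names:

```java
public class RecordBuilderSketch {
    static final class Rec<K, V> {
        final K key;
        final V value;
        final long timestamp;
        Rec(K key, V value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    static final class Builder<K, V> {
        private K key;
        private V value;
        private long timestamp;
        Builder<K, V> key(K k) { this.key = k; return this; }         // no allocation
        Builder<K, V> value(V v) { this.value = v; return this; }     // no allocation
        Builder<K, V> timestamp(long t) { this.timestamp = t; return this; }
        Rec<K, V> build() { return new Rec<>(key, value, timestamp); } // one allocation
    }

    public static void main(String[] args) {
        Rec<String, Long> r = new Builder<String, Long>()
            .key("k").value(42L).timestamp(100L).build();
        System.out.println(r.key + "=" + r.value); // k=42
    }
}
```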

About the key validation:

So far, the only way I can see to _really_ validate a key doesn't change at
compile-time is by not exposing it at all — as we are doing it today with
Transform.
Otherwise, deal with it at runtime — as we have been dealing with Transform
without the ability to forward.

Re: [DISCUSS] KIP-813 Shared State Stores

2022-03-07 Thread Matthias J. Sax

Thanks for updating the KIP. LGTM.

I think we can start a vote.



 I think this might cause issues if your processor is doing a projection of 
the data.


This is correct. It's a known issue: 
https://issues.apache.org/jira/browse/KAFKA-7663


Global-stores/KTables are designed to put the data into the store 
_unmodified_.



-Matthias

On 2/28/22 5:05 AM, Daan Gertis wrote:

Updated the KIP to be more aligned with global state store function names.

If I remember correctly, during restore the processor will not be used, right? I 
think this might cause issues if your processor is doing a projection of the 
data. Either way, I would not add that to this KIP since it is a specific 
use-case pattern.

Unless there is anything more to add or change, I would propose moving to a 
vote?

Cheers!
D.

From: Matthias J. Sax 
Date: Friday, 18 February 2022 at 03:29
To: dev@kafka.apache.org 
Subject: Re: [DISCUSS] KIP-813 Shared State Stores
Thanks for updating the KIP!

I am wondering if we would need two overloads of `addReadOnlyStateStore`
one w/ and one w/o `TimestampExtractor` argument to effectively make it
an "optional" parameter?

Also wondering if we need to pass in a `String sourceName` and `String
processorName` parameters (similar to `addGlobalStore()`?) instead if
re-using the store name as currently proposed? -- In general I don't
have a strong opinion either way, but it seems to introduce some API
inconsistency if we don't follow the `addGlobalStore()` pattern?
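To make the overload question concrete, here is one possible shape for the two signatures, loosely modeled on the existing Topology#addGlobalStore. Every type below is a local stub and the parameter names are guesses, not the KIP's final API:

```java
import java.util.ArrayList;
import java.util.List;

// Stub types so the sketch compiles stand-alone; in Kafka these would be the
// org.apache.kafka.streams.* classes.
interface StoreBuilder {}
interface Deserializer<T> {}
interface TimestampExtractor {}
interface ProcessorSupplier<K, V> {}

public class ReadOnlyStoreSketch {
    final List<String> added = new ArrayList<>();

    // Overload without a TimestampExtractor: default timestamp extraction applies.
    <K, V> void addReadOnlyStateStore(StoreBuilder store, String sourceName, String processorName,
                                      Deserializer<K> keyDe, Deserializer<V> valDe,
                                      String topic, ProcessorSupplier<K, V> supplier) {
        addReadOnlyStateStore(store, sourceName, processorName, null, keyDe, valDe, topic, supplier);
    }

    // Overload with an explicit TimestampExtractor, making it effectively optional.
    <K, V> void addReadOnlyStateStore(StoreBuilder store, String sourceName, String processorName,
                                      TimestampExtractor extractor,
                                      Deserializer<K> keyDe, Deserializer<V> valDe,
                                      String topic, ProcessorSupplier<K, V> supplier) {
        added.add(processorName + " <- " + topic + (extractor == null ? " (default ts)" : " (custom ts)"));
    }

    public static void main(String[] args) {
        ReadOnlyStoreSketch topology = new ReadOnlyStoreSketch();
        topology.addReadOnlyStateStore(null, "store-source", "store-processor",
                                       null, null, "shared-topic", null);
        System.out.println(topology.added); // [store-processor <- shared-topic (default ts)]
    }
}
```

Passing explicit sourceName and processorName keeps the API consistent with addGlobalStore, per the question above.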



Another thing we were confronted with was the restoring of state when the 
actual local storage is gone. For example, we host on K8s with ephemeral pods, 
so there is no persisted storage between pod restarts. However, the consumer 
group will already be at the latest offset, preventing previous data 
from being restored within the new pod’s statestore.


We have already code in-place in the runtime to do the right thing for
this case (ie, via DSL source-table changelog optimization). We can
re-use this part. It's nothing we need to discuss on the KIP, but we can
discuss on the PR later.


-Matthias


On 2/17/22 10:09 AM, Guozhang Wang wrote:

Hi Daan,

I think for the read-only state stores you'd need to slightly augment the
checkpointing logic so that it would still write the checkpointed offsets
while restoring from the changelogs.


Guozhang

On Thu, Feb 17, 2022 at 7:02 AM Daan Gertis 
wrote:


Could you add more details about the signature of
`addReadOnlyStateStore()` -- What parameters does it take? Are there any
overloads taking different parameters? The KIP only contains some verbal
description on the "Implementation Plan" section, that is hard to find
and hard to read.

The KIP mentions a `ProcessorProvider` -- do you mean

`ProcessorSupplier`?


About timestamp synchronization: why do you propose to disable timestamp
synchronization (similar to global state stores)? It seems to be an
unnecessary limitation? -- Given that we could re-use the new method for
source `KTables` (i.e., `StreamsBuilder#table()` implementation), having
timestamp synchronization enabled seems to be important?


Yup, will do these updates. I’ll overload addReadOnlyStateStore to
allow for timestamp synchronization.

Another thing we were confronted with was the restoring of state when the
actual local storage is gone. For example, we host on K8s with ephemeral
pods, so there is no persisted storage between pod restarts. However, the
consumer group will already be at the latest offset, preventing
previous data from being restored within the new pod’s statestore.

If I remember correctly, there was some checkpoint logic available when
restoring, but we are bypassing that since logging is disabled on the
statestore, no?

As always, thanks for your insights.

Cheers,
D.


From: Matthias J. Sax 
Date: Wednesday, 16 February 2022 at 02:09
To: dev@kafka.apache.org 
Subject: Re: [DISCUSS] KIP-813 Shared State Stores
Thanks for updating the KIP.

Could you add more details about the signature of
`addReadOnlyStateStore()` -- What parameters does it take? Are there any
overloads taking different parameters? The KIP only contains some verbal
description on the "Implementation Plan" section, that is hard to find
and hard to read.

The KIP mentions a `ProcessorProvider` -- do you mean `ProcessorSupplier`?

About timestamp synchronization: why do you propose to disable timestamp
synchronization (similar to global state stores)? It seems to be an
unnecessary limitation? -- Given that we could re-use the new method for
source `KTables` (i.e., `StreamsBuilder#table()` implementation), having
timestamp synchronization enabled seems to be important?


-Matthias


On 2/8/22 11:01 PM, Guozhang Wang wrote:

Daan,

Thanks for the replies, those make sense to me.

On Tue, Feb 8, 2022 at 7:24 AM Daan Gertis 

wrote:



I just updated the KIP to reflect the things discussed in this thread.

As for your questions Guozhang:


1) How do we handle if the num.partitions 

Re: [DISCUSS] KIP-820: Extend KStream process with new Processor API

2022-03-07 Thread Jorge Esteban Quilcate Otoya
Thanks, John!
This looks very promising.

I will familiarize myself with this approach and update the KIP accordingly. From what
I can see so far, this should cover most of the open issues in this
proposal.

PS.

> Just as a reminder, the current approach with transformers
> is NOT enforced at compile time. Transformers have access to
> a "forwarding disabled" processor context, which still has
> the forward methods that throw a runtime exception when
> invoked.

Agree. I was referring to the value transformers where `readOnlyKey` is
passed but not forwarded internally. Though regarding the "forwarding disabled"
approach, you're totally right that it is a runtime validation.
Regardless, the approach proposed here will be a much better one.


On Sun, 6 Mar 2022 at 18:59, John Roesler  wrote:

> Hello all,
>
> It seems like we're making good progress on this discussion.
> If I'm keeping track correctly, if we can resolve this
> question about how to handle processValues(), then we should
> be able to finalize the vote, right?
>
> I share Matthias's preference for having a type-safe API.
>
> Just as a reminder, the current approach with transformers
> is NOT enforced at compile time. Transformers have access to
> a "forwarding disabled" processor context, which still has
> the forward methods that throw a runtime exception when
> invoked.
>
> However, the spirit of the "new processor api" line of work
> is to clean up a lot of the cruft around the original
> processor API, so this is a good opportunity to introduce a
> type-safe version if we can.
>
> Based on my experience adding the new processor API, I felt
> like it should be possible to do what he suggests, but it
> would be more involved than what he said. The biggest thing
> I learned from that effort, though, is that you really have
> to just try it to see what all the complications are.
>
> With that in mind, I went ahead and implemented the
> suggestion: https://github.com/apache/kafka/pull/11854
>
> This is a functional prototype. It only adds processValues,
> which takes a supplier of a new type, FixedKeyProcessor.
> That processor only processes FixedKeyRecords, which have a
> key that cannot be changed. FixedKeyProcessors have a
> special context, a FixedKeyProcessorContext, which can only
> forward FixedKeyRecords.
>
> FixedKeyRecords have "fixed keys" because its key can only
> be set in the constructor, and its constructor is package-
> private.
>
> As you can see, this new record/processor/context ecosystem
> is an independent peer of the general one. This is necessary
> to ensure the desired compiler check. For example, if
> FixedKeyRecord were merely an interface implemented by
> Record, then users could create a new Record with a new key
> and forward it as a FixedKeyRecord, violating the
> constraint.
>
> As I said, with this proposal, the devil is in the details,
> so if anyone thinks the API can be simplified, I suggest you
> check out the branch and try out your proposal. I'd be very
> happy to have a simpler solution, but I'm also pretty sure
> this complexity is necessary.
>
> Taking a step back, I do think this approach results in a
> better API, even though the change is a little complicated.
>
> Thanks,
> -John
>
> On Sun, 2022-03-06 at 10:51 +, Jorge Esteban Quilcate
> Otoya wrote:
> > Matthias, thanks for your feedback.
> >
> > I can see the following alternatives to deal with `processValues()`:
> >
> > 1. Runtime key validation (current proposal)
> > 2. Using Void type. Guozhang already points out some important
> > considerations about allocating `Record` twice.
> > 3. Adding a new ValueRecord, proposed by Matthias. This one would carry
> > some of the problems of the second alternative as ValueRecord will have
> to
> > be created from a Record. Also, either by having a public constructor or
> > creation from a Record, the key _can_ be changed without being captured
> by
> > the Topology.
> > 4. Reducing the KIP scope to `process` only, and removing/postponing
> > `processValues` for a later DSL redesign.
> >
> > A couple of additional comments:
> >
> > About the Record API:
> >
> > IIUC, the issue with allocating new objects is coming from the current
> > design of the Record API.
> > If a user does record.withKey(...).withValue(...), that already leads to a
> > couple of instantiations.
> > My impression is that if the cost/value of immutability has been weighed
> > already, then maybe the considerations for alternative 2 can be
> disregarded?
> > Either way, if the cost of recreation of objects is something we want to
> > minimize, then maybe adding a Builder to the record should help to reduce
> > the allocations.
> >
> > About the key validation:
> >
> > So far, the only way I can see to _really_ validate a key doesn't change
> at
> > compile-time is by not exposing it at all — as we are doing it today with
> > Transform.
> > Otherwise, deal with it at runtime — as we have been dealing with
> Transform
> > without the ability to 

Re: [VOTE] 3.0.1 RC0

2022-03-07 Thread Mickael Maison
Here is a successful Jenkins build for the 3.0 branch:
https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.0/183/

On Mon, Mar 7, 2022 at 12:27 AM Jakub Scholz  wrote:
>
> +1 (non-binding). I used the staged Scala 2.13 binaries and the staging
> Maven repository to run my tests. All seems to work fine, no issues found.
>
> Thanks
> Jakub
>
> On Thu, Mar 3, 2022 at 7:05 PM Mickael Maison  wrote:
>
> > Hello Kafka users, developers and client-developers,
> >
> > This is the first candidate for release of Apache Kafka 3.0.1.
> >
> > Apache Kafka 3.0.1 is a bugfix release and 29 issues have been fixed
> > since 3.0.0.
> >
> > Release notes for the 3.0.1 release:
> > https://home.apache.org/~mimaison/kafka-3.0.1-rc0/RELEASE_NOTES.html
> >
> > *** Please download, test and vote by Thursday, March 10, 6pm GMT ***
> >
> > Kafka's KEYS file containing PGP keys we use to sign the release:
> > https://kafka.apache.org/KEYS
> >
> > * Release artifacts to be voted upon (source and binary):
> > https://home.apache.org/~mimaison/kafka-3.0.1-rc0/
> >
> > * Maven artifacts to be voted upon:
> > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> >
> > * Javadoc:
> > https://home.apache.org/~mimaison/kafka-3.0.1-rc0/javadoc/
> >
> > * Tag to be voted upon (off 3.0 branch) is the 3.0.1 tag:
> > https://github.com/apache/kafka/releases/tag/3.0.1-rc0
> >
> > * Documentation:
> > https://kafka.apache.org/30/documentation.html
> >
> > * Protocol:
> > https://kafka.apache.org/30/protocol.html
> >
> > * Successful Jenkins builds for the 3.0 branch:
> > I'll share a link once the build completes
> >
> > /**
> >
> > Thanks,
> > Mickael
> >


Jenkins build is back to stable : Kafka » Kafka Branch Builder » 3.0 #183

2022-03-07 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: Kafka » Kafka Branch Builder » 3.0 #182

2022-03-07 Thread Apache Jenkins Server
See 




Re: [DISCUSS] Apache Kafka 3.2.0 release

2022-03-07 Thread Bruno Cadonna

Hi Kafkateers,

Last week we reached KIP freeze for the next major release 3.2.0 of 
Apache Kafka.


I have updated the release plan for AK 3.2.0 with all the KIPs that 
passed the vote last week.


Please, verify the plan and let me know if any KIP should be added
to or removed from the release plan.

For the KIPs which are still in progress, please work closely with your
reviewers to make sure that they land on time for the feature freeze.

The next milestone for the AK 3.2.0 release is feature freeze on March 
16th 2022.


Best,
Bruno

On 01.03.22 17:41, Bruno Cadonna wrote:

Hi all,

A quick reminder that KIP freeze for the Apache 3.2.0 is tomorrow. 
Please make sure to close your votes if you want to add a KIP to the 
release plan.


Best,
Bruno

On 15.02.22 12:37, Bruno Cadonna wrote:

Hi all,

I published a release plan for the Apache Kafka 3.2.0 release here:
https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+3.2.0

KIP Freeze: 2 March 2022
Feature Freeze: 16 March 2022
Code Freeze:    30 March 2022

At least two weeks of stabilization will follow Code Freeze.

Please let me know if should add or remove KIPs from the plan or if 
you have any other objections.


Best,
Bruno


On 04.02.22 16:03, Bruno Cadonna wrote:

Hi,

I'd like to volunteer to be the release manager for our next
feature release, 3.2.0. If there are no objections, I'll send
out the release plan soon.

Best,
Bruno


Re: [DISCUSS] KIP-824 Allowing dumping segmentlogs limiting the batches in the output

2022-03-07 Thread Sergio Daniel Troiano
hey Luke,

I am interested in expanding the KIP scope, but I am a bit concerned this
could create a lot of noise and confusion, as they look like very similar
parameters. I agree this is a small change, so I think if I do it properly
it should not be a problem at all; I will just need a couple more days,
as I want to create the proper tests as well.

I have a doubt about editing the KIP: should I add this as a new
feature as well? Should I describe it as a side-effect finding? I don't
want this to overshadow the main feature I want to deliver, as I think the
message-size limit is not as important as limiting the number of
batches.

It is up to you; if you consider we must add this to this KIP, then I
will be happy to do it.

Best regards.
Sergio Troiano

On Mon, 7 Mar 2022 at 02:01, Luke Chen  wrote:

> Hi Sergio,
>
> Thanks for your explanation.
> Make sense to me.
>
> > Only interesting thing that I have just found is *max-message-size *is
> not
> used while dump logs are requested, instead it is used by dumpIndex
>
> Are you interested in expanding the scope of this KIP to include the
> *max-message-size* in dumping logs?
> I think it's good because it will also be a small change, and no need to go
> through another KIP discussing/voting process. But I'm fine if you want to
> keep this KIP as is, and create another JIRA ticket for future work.
>
> Thank you.
> Luke
>
> On Mon, Mar 7, 2022 at 6:02 AM Sergio Daniel Troiano
>  wrote:
>
> > hey Luke,
> >
> > Let me answer them:
> > 1. If the *max-batches-size* is too small that results in no records
> > output, will we output any information to the user?
> >
> > If the *max-batches-size* is even smaller than the first batch, then there
> > won't be any output. This is handled by the FileRecords class; I think this
> > is correct, as this is the expected behaviour.
> >
> > 2. After your explanation, I guess the use of *max-batches-size* won't
> > conflict with *max-message-size*, right?
> >
> > The only interesting thing I have just found is that *max-message-size* is
> > not used when dumping logs; instead it is used by dumpIndex
> > <https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/DumpLogSegments.scala#L150>,
> > so this feature is not working for dumping logs. I also checked whether
> > there is a unit test for this, and there is not any. Maybe we could
> > create a ticket for this?
> >
> > Regards.
> >
> >
> > On Sat, 5 Mar 2022 at 10:36, Luke Chen  wrote:
> >
> > > Hi Sergio,
> > >
> > > Thanks for the explanation! Very clear!
> > > I think we should put this example and explanation into KIP.
> > >
> > > Other comments:
> > > 1. If the *max-batches-size* is too small that results in no records
> > > output, will we output any information to the user?
> > > 2. After your explanation, I guess the use of *max-batches-size* won't
> > > conflict with *max-message-size*, right?
> > > That is, user can set the 2 arguments at the same time. Is that
> correct?
> > >
> > > Thank you.
> > > Luke
> > >
> > > Thank you.
> > > Luke
> > >
> > > On Sat, Mar 5, 2022 at 4:47 PM Sergio Daniel Troiano
> > >  wrote:
> > >
> > > > hey Luke,
> > > >
> > > > thanks for the interest, it is a good question, please let me explain
> > > you:
> > > >
> > > > *max-message-size* is a filter on the size of each batch. For example, if
> > > > I set --max-message-size 1000 bytes and my segment log has 300 batches,
> > > > 150 of them with a size of 500 bytes and the other 150 with a size of
> > > > 2000 bytes, then the script will skip the last 150, as each of those
> > > > batches is heavier than the limit.
> > > >
> > > > On the other hand, following the same example above with *max-batches-size*
> > > > set to 1000 bytes, it will only print out the first 2 batches (500 bytes
> > > > each) and stop. This avoids reading the whole file.
> > > >
> > > >
> > > > Also, if all of them are smaller than 1000 bytes, it will end up printing
> > > > out all the batches.
> > > > The idea of my change is to limit the *number* of batches, no matter
> > > > their size.
> > > >
> > > > I hope this reply helps.
> > > > Best regards.
> > > >
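[Editor's note: the example above can be sketched as a minimal model. This is illustrative Python only — the real tool is the Scala DumpLogSegments, and both helper names below are hypothetical, chosen to mirror the semantics described in the thread.]

```python
# Simplified model of the two filters discussed in the thread.
# These function names are hypothetical; they are not the Kafka API.

def filter_by_max_message_size(batch_sizes, max_message_size):
    """max-message-size semantics: skip any batch larger than the limit,
    but keep scanning the whole segment file."""
    return [s for s in batch_sizes if s <= max_message_size]

def cap_by_max_batches_size(batch_sizes, max_batches_size):
    """Proposed max-batches-size semantics: stop as soon as the cumulative
    bytes printed would exceed the limit, avoiding reading the whole file."""
    printed, total = [], 0
    for size in batch_sizes:
        if total + size > max_batches_size:
            break
        printed.append(size)
        total += size
    return printed

# The example from the thread: 300 batches, 150 of 500 bytes, 150 of 2000 bytes.
batches = [500] * 150 + [2000] * 150

print(len(filter_by_max_message_size(batches, 1000)))  # 150 batches survive the per-batch filter
print(len(cap_by_max_batches_size(batches, 1000)))     # only the first 2 batches are printed
```

The key design difference this illustrates: one is a per-batch size filter applied over the whole file, the other is a cumulative cap that short-circuits the scan.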
> > > > On Sat, 5 Mar 2022 at 08:00, Luke Chen  wrote:
> > > >
> > > > > Hi Sergio,
> > > > >
> > > > > Thanks for the KIP!
> > > > >
> > > > > One question:
> > > > > I saw there's a `max-message-size` argument that seems to do the
> same
> > > > thing
> > > > > as you want.
> > > > > Could you help explain what's the difference between
> > `max-message-size`
> > > > and
> > > > > `max-batches-size`?
> > > > >
> > > > > Thank you.
> > > > > Luke
> > > > >
> > > > > On Sat, Mar 5, 2022 at 3:21 AM Kirk True 
> > > wrote:
> > > > >
> > > > > > Hi Sergio,
> > > > > >
> > > > > > Thanks for the KIP. I don't know anything about the log segment
> > > > > internals,
> > > > > > but the logic and implementation seem sound.
> > > > > >
> > > > > > Three questions:
> > 

[jira] [Resolved] (KAFKA-10759) ARM support for Kafka

2022-03-07 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-10759.

Fix Version/s: 3.0.0
   Resolution: Fixed

> ARM support for Kafka
> -
>
> Key: KAFKA-10759
> URL: https://issues.apache.org/jira/browse/KAFKA-10759
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: PengLei
>Priority: Major
> Fix For: 3.0.0
>
> Attachments: build_output.log, run_test_output.log
>
>
> ARM support for Kafka.
> I tried to deploy a Kafka cluster on an ARM server, but unfortunately I 
> did not find an official ARM release for Kafka. I think more and more 
> people will try the same thing as I do.
> Now the CI of Kafka (on GitHub) is handled by Jenkins CI. While the tests run 
> under the x86 arch, the ARM arch is missing. This leads to a problem: we have 
> no way to test whether a pull request will break the Kafka deployment on ARM. 
> Similarly, we cannot provide an ARM release package without ARM CI.
> If the Apache Kafka community is interested in it, I can help with the 
> integration.
> This is the umbrella issue to track the efforts to make Kafka run on ARM 
> processors.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13671) Power (ppc64le) support for kafka

2022-03-07 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-13671.

Fix Version/s: 3.2.0
   Resolution: Fixed

> Power (ppc64le) support for kafka
> -
>
> Key: KAFKA-13671
> URL: https://issues.apache.org/jira/browse/KAFKA-13671
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Abhijit
>Assignee: Mickael Maison
>Priority: Major
> Fix For: 3.2.0
>
> Attachments: kafka_ConsumerBounceTest_IT.txt, kafka_IT.txt, 
> kafka_UnitTest.txt
>
>
> Support for the Power architecture (ppc64le) in Apache Kafka.
> What is the IBM Power architecture?
> It is a RISC architecture, and IBM has recently made its ISA (Instruction Set 
> Architecture) open source; in doing so, they have significantly contributed 
> back to the open-source community at large. Many pioneers of the banking 
> and HPC industries today run on the ppc64le architecture.
> As an ongoing effort to enable open-source projects where the Power 
> architecture can add value, we are trying to enable Kafka on Power.
> IBM has already donated a ppc64le Ubuntu VM to the community to be added to 
> the Jenkins cluster for Apache Kafka builds. The VM spec has been reviewed in 
> the community and deemed suitable for Kafka builds.
> Jira: https://issues.apache.org/jira/browse/INFRA-22612
> I can help with the work needed to add a new Jenkins job for Power.


