Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-12 Thread Sophie Blee-Goldman
Regarding the naming, I personally think `clientInstanceId` makes sense for
the plain clients
 -- especially if we might later introduce the notion of an
`applicationInstanceId`.

I'm not a huge fan of `clientsInstanceIds` for the Kafka Streams API,
though; can we use `clientInstanceIds` instead? (The difference being the
placement of the plural 's'.)
I would similarly rename the class to just ClientInstanceIds

we can also not have a timeout-less overload, because `KafkaStreams` does
> not have a `default.api.timeout.ms` config either

With respect to the timeout for the Kafka Streams API, I'm a bit confused
by the double/triple negative of Matthias' comment here, but I was thinking
about this earlier and this was my take: with the current proposal, we would
allow users to pass in an absolute timeout as a parameter that would apply
to the method as a whole. Meanwhile, within the method we would issue
separate calls to each of the clients, using the default or user-configured
value of their `default.api.timeout.ms` as the timeout parameter.

So the API as proposed makes sense to me.
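
For illustration, a rough sketch of those semantics (hypothetical helper
shape, not from the KIP): one absolute deadline bounds the method as a
whole, while each per-client call inside the fanned-out threads still runs
with its own timeout; the outer waits only get the time that is left.

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.errors.TimeoutException;

final class InstanceIdGather {
    // One future per fanned-out client; the deadline is shared across all waits.
    static List<String> collect(final List<Future<String>> futures, final Duration timeout) {
        final long deadlineMs = System.currentTimeMillis() + timeout.toMillis();
        final List<String> ids = new ArrayList<>();
        for (final Future<String> future : futures) {
            final long remainingMs = deadlineMs - System.currentTimeMillis();
            if (remainingMs <= 0) {
                throw new TimeoutException("could not retrieve all client instance ids in time");
            }
            try {
                // Bound the outer wait by the remaining time; the client call
                // behind the future uses its own timeout semantics.
                ids.add(future.get(remainingMs, TimeUnit.MILLISECONDS));
            } catch (final Exception e) {
                throw new TimeoutException("could not retrieve all client instance ids in time");
            }
        }
        return ids;
    }
}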


On Wed, Oct 11, 2023 at 6:48 PM Matthias J. Sax  wrote:

> I can answer 130 and 131.
>
> 130) We cannot guarantee that all clients are already initialized due to
> race conditions. We plan to not allow calling
> `KafkaStreams#clientsInstanceIds()` when the state is not RUNNING (or
> REBALANCING) though -- I guess this was left out of the KIP and should be
> added? But because StreamThreads can be added dynamically (and producers
> might be created dynamically at runtime; cf. below), we still cannot
> guarantee that all clients are already initialized when the method is
> called. Of course, we assume that all clients are most likely initialized
> on the happy path, and blocking calls to `client.clientInstanceId()`
> should be rare.
>
> To address the worst case, we won't do a naive implementation and just
> loop over all clients, but fan-out the call to the different
> StreamThreads (and GlobalStreamThread if it exists), and use Futures to
> gather the results.
>
> Currently, each `StreamThread` has 3 clients (if ALOS or EOSv2 is used), so
> we might do 3 blocking calls in the worst case (for EOSv1 we get a
> producer per task, and we might end up doing more blocking calls if the
> producers are not initialized yet). Note that EOSv1 is already
> deprecated, and we are also working on thread refactoring that will
> reduce the number of clients on a StreamThread to 2 -- and we have more
> refactoring planned to reduce the number of clients even further.
>
> Inside `KafkaStreams#clientsInstanceIds()` we might only do a single
> blocking call for the admin client (i.e., `admin.clientInstanceId()`).
>
> I agree that we need to do some clever timeout management, but it seems
> to be more of an implementation detail?
>
> Do you have any particular concerns, or does the proposed implementation
> as sketched above address your question?
>
>
> 131) If the Topology does not have a global state store, there won't be
> a GlobalThread and thus no global consumer. Thus, we return an Optional.
>
>
>
> Now three related questions for Andrew.
>
> (1) Why is the method called `clientInstanceId()` and not just plain
> `instanceId()`?
>
> (2) Why do we return a `String` and not a UUID type? The added
> protocol request/response classes use UUIDs.
>
> (3) Would it make sense to have an overloaded `clientInstanceId()`
> method that does not take any parameter but uses the `default.api.timeout.ms`
> config (this config does not exist on the producer though, so we could
> only have it for consumer and admin at this point)? We could of course
> also add overloads like this later if users request them (and/or add
> `default.api.timeout.ms` to the producer, too).
>
> Btw: For KafkaStreams, I think `clientsInstanceIds` still makes sense as
> a method name though, as `KafkaStreams` itself does not have an
> `instanceId` -- we can also not have a timeout-less overload, because
> `KafkaStreams` does not have a `default.api.timeout.ms` config either
> (and I don't think it makes sense to add one).
>
>
>
> -Matthias
>
> On 10/11/23 5:07 PM, Jun Rao wrote:
> > Hi, Andrew,
> >
> > Thanks for the updated KIP. Just a few more minor comments.
> >
> > 130. KafkaStreams.clientsInstanceId(Duration timeout): Does it wait for
> all
> > consumer/producer/adminClient instances to be initialized? Are all those
> > instances created during KafkaStreams initialization?
> >
> > 131. Why does globalConsumerInstanceId() return Optional while
> > other consumer instances don't return Optional?
> >
> > 132. ClientMetricsSubscriptionRequestCount: Do we need this since we
> have a
> > set of generic metrics
> > (kafka.network:type=RequestMetrics,name=RequestsPerSec,request=*) that
> > report Request rate for every request type?
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Oct 11, 2023 at 1:47 PM Matthias J. Sax 
> wrote:
> >
> >> Thanks!
> >>
> >> On 10/10/23 11:31 PM, Andrew Schofield wrote:
> >>> Matthias,
> >

Build failed in Jenkins: Kafka » Kafka Branch Builder » 3.5 #78

2023-10-12 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 284693 lines...]
> Task :connect:json:testJar
> Task :raft:compileTestJava UP-TO-DATE
> Task :raft:testClasses UP-TO-DATE
> Task :group-coordinator:compileTestJava UP-TO-DATE
> Task :group-coordinator:testClasses UP-TO-DATE
> Task :connect:json:testSrcJar
> Task :metadata:compileTestJava UP-TO-DATE
> Task :metadata:testClasses UP-TO-DATE
> Task :clients:generateMetadataFileForMavenJavaPublication

> Task :connect:api:javadoc
/home/jenkins/workspace/Kafka_kafka_3.5/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java:44:
 warning - Tag @link: reference not found: org.apache.kafka.connect.data
1 warning

> Task :connect:api:copyDependantLibs UP-TO-DATE
> Task :connect:api:jar UP-TO-DATE
> Task :connect:api:generateMetadataFileForMavenJavaPublication
> Task :connect:json:copyDependantLibs UP-TO-DATE
> Task :connect:json:jar UP-TO-DATE
> Task :connect:json:generateMetadataFileForMavenJavaPublication
> Task :connect:api:javadocJar
> Task :connect:api:compileTestJava UP-TO-DATE
> Task :connect:api:testClasses UP-TO-DATE
> Task :connect:json:publishMavenJavaPublicationToMavenLocal
> Task :connect:json:publishToMavenLocal
> Task :connect:api:testJar
> Task :connect:api:testSrcJar
> Task :connect:api:publishMavenJavaPublicationToMavenLocal
> Task :connect:api:publishToMavenLocal
> Task :streams:javadoc
> Task :streams:javadocJar

> Task :clients:javadoc
/home/jenkins/workspace/Kafka_kafka_3.5/clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java:32:
 warning - Tag @see: missing final '>': "https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API";>KIP-554:
 Add Broker-side SCRAM Config API

 This code is duplicated in 
org.apache.kafka.common.security.scram.internals.ScramMechanism.
 The type field in both files must match and must not change. The type field
 is used both for passing ScramCredentialUpsertion and for the internal
 UserScramCredentialRecord. Do not change the type field."
/home/jenkins/workspace/Kafka_kafka_3.5/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/package-info.java:21:
 warning - Tag @link: reference not found: 
org.apache.kafka.common.security.oauthbearer
2 warnings

> Task :clients:javadocJar
> Task :clients:srcJar
> Task :clients:testJar
> Task :clients:testSrcJar
> Task :clients:publishMavenJavaPublicationToMavenLocal
> Task :clients:publishToMavenLocal
> Task :core:compileScala
> Task :core:classes
> Task :core:compileTestJava NO-SOURCE
> Task :core:compileTestScala
> Task :core:testClasses
> Task :streams:compileTestJava UP-TO-DATE
> Task :streams:testClasses UP-TO-DATE
> Task :streams:testJar
> Task :streams:testSrcJar
> Task :streams:publishMavenJavaPublicationToMavenLocal
> Task :streams:publishToMavenLocal

Deprecated Gradle features were used in this build, making it incompatible with 
Gradle 9.0.

You can use '--warning-mode all' to show the individual deprecation warnings 
and determine if they come from your own scripts or plugins.

See 
https://docs.gradle.org/8.0.2/userguide/command_line_interface.html#sec:command_line_warnings

BUILD SUCCESSFUL in 2m 57s
89 actionable tasks: 33 executed, 56 up-to-date
[Pipeline] sh
+ grep ^version= gradle.properties
+ cut -d= -f 2
[Pipeline] dir
Running in /home/jenkins/workspace/Kafka_kafka_3.5/streams/quickstart
[Pipeline] {
[Pipeline] sh
+ mvn clean install -Dgpg.skip
[INFO] Scanning for projects...
[INFO] 
[INFO] Reactor Build Order:
[INFO] 
[INFO] Kafka Streams :: Quickstart[pom]
[INFO] streams-quickstart-java[maven-archetype]
[INFO] 
[INFO] < org.apache.kafka:streams-quickstart >-
[INFO] Building Kafka Streams :: Quickstart 3.5.2-SNAPSHOT[1/2]
[INFO]   from pom.xml
[INFO] [ pom ]-
[INFO] 
[INFO] --- clean:3.0.0:clean (default-clean) @ streams-quickstart ---
[INFO] 
[INFO] --- remote-resources:1.5:process (process-resource-bundles) @ 
streams-quickstart ---
[INFO] 
[INFO] --- site:3.5.1:attach-descriptor (attach-descriptor) @ 
streams-quickstart ---
[INFO] 
[INFO] --- gpg:1.6:sign (sign-artifacts) @ streams-quickstart ---
[INFO] 
[INFO] --- install:2.5.2:install (default-install) @ streams-quickstart ---
[INFO] Installing 
/home/jenkins/workspace/Kafka_kafka_3.5/streams/quickstart/pom.xml to 
/home/jenkins/.m2/repository/org/apache/kafka/streams-quickstart/3.5.2-SNAPSHOT/streams-quickstart-3.5.2-SNAPSHOT.pom
[INFO] 
[INFO] --< org.apache.kafka:streams-quickstart-java >--
[INFO] Building streams-quickstart-java 3.5.2-SNAPSHOT[2/2]
[INFO]   from java/pom.xml
[INFO] ---

Re: [VOTE] KIP-960: Support single-key_single-timestamp interactive queries (IQv2) for versioned state stores

2023-10-12 Thread Lucas Brutschy
+1 (binding)

Thanks for the KIP!

Cheers,
Lucas

On Wed, Oct 11, 2023 at 7:55 PM Walker Carlson
 wrote:
>
> +1 (binding)
>
> Thanks for the KIP, Alieh!
>
> Walker
>
> On Wed, Oct 11, 2023 at 3:52 AM Bruno Cadonna  wrote:
>
> > Thanks for the KIP, Alieh!
> >
> > +1 (binding)
> >
> > Best,
> > Bruno
> >
> > On 10/10/23 1:14 AM, Matthias J. Sax wrote:
> > > One more nit: as discussed on the related KIP-698 thread, we should not
> > > use `get` as prefix for the getters.
> > >
> > > So it should be `K key()` and `Optional asOfTimestamp()`.
> > >
> > >
> > > Otherwise the KIP LGTM.
> > >
> > >
> > > +1 (binding)
> > >
> > >
> > > -Matthias
> > >
> > > On 10/6/23 2:50 AM, Alieh Saeedi wrote:
> > >> Hi everyone,
> > >>
> > >> Since KIP-960 is reduced to the simplest IQ type and all further
> > >> comments are related to the follow-up KIPs, I decided to finalize it
> > >> at this point.
> > >>
> > >>
> > >> A huge thank you to everyone who has reviewed this KIP (and also the
> > >> follow-up ones), and participated in the discussion thread!
> > >>
> > >> I'd also like to thank you in advance for taking the time to vote.
> > >>
> > >> Best,
> > >> Alieh
> > >>
> >


[DISCUSS] 3.5.2 Release

2023-10-12 Thread Levani Kokhreidze
Hello,

KAFKA-15571 [1] was merged and backported to the 3.5 and 3.6 branches. The
fix addresses a feature that was added in 3.5. Considering the feature
doesn't work as expected without the fix, I would like to know if it's
reasonable to start the 3.5.2 release. Of course, releasing a project as
massive as Kafka is not a trivial task, so I am looking for the community's
input on whether it's reasonable to start the 3.5.2 release process.

Best,
Levani

[1] - https://issues.apache.org/jira/browse/KAFKA-15571

Re: [DISCUSS] 3.5.2 Release

2023-10-12 Thread Divij Vaidya
Hello Levani

From a process perspective, there is no fixed schedule for bug fix
releases. If we have a volunteer for release manager (must be a committer),
they can start the bug fix release process (with the approval of the
PMC).

My personal opinion is that it's too early to start 3.6.1 and we should
wait at least 1 month to hear feedback on 3.6.0. We need to strike a careful
balance between getting the critical fixes into the hands of users as soon
as possible vs. spending community effort on releases (effort that could
otherwise be used to make Kafka better, feature-wise & operational
stability-wise).

For 3.5.2, I think there are sufficient pending fixes (including some CVE
fixes) to start a bug fix release. We just need a volunteer for the release
manager.

--
Divij Vaidya



On Thu, Oct 12, 2023 at 9:57 AM Levani Kokhreidze 
wrote:

> Hello,
>
> KAFKA-15571 [1] was merged and backported to the 3.5 and 3.6 branches. The
> fix addresses a feature that was added in 3.5. Considering the feature
> doesn't work as expected without the fix, I would like to know if it's
> reasonable to start the 3.5.2 release. Of course, releasing a project as
> massive as Kafka is not a trivial task, so I am looking for the community's
> input on whether it's reasonable to start the 3.5.2 release process.
>
> Best,
> Levani
>
> [1] - https://issues.apache.org/jira/browse/KAFKA-15571


Re: [DISCUSS] 3.5.2 Release

2023-10-12 Thread Luke Chen
Hi Levani and Divij,

I can work on the 3.5.2 release.
I'll start a new thread for volunteering it maybe next week.

Thanks.
Luke

On Thu, Oct 12, 2023 at 5:07 PM Divij Vaidya 
wrote:

> Hello Levani
>
> From a process perspective, there is no fixed schedule for bug fix
> releases. If we have a volunteer for release manager (must be a committer),
> they can start the bug fix release process (with the approval of the
> PMC).
>
> My personal opinion is that it's too early to start 3.6.1 and we should
> wait at least 1 month to hear feedback on 3.6.0. We need to strike a careful
> balance between getting the critical fixes into the hands of users as soon
> as possible vs. spending community effort on releases (effort that could
> otherwise be used to make Kafka better, feature-wise & operational
> stability-wise).
>
> For 3.5.2, I think there are sufficient pending fixes (including some CVE
> fixes) to start a bug fix release. We just need a volunteer for the release
> manager.
>
> --
> Divij Vaidya
>
>
>
> On Thu, Oct 12, 2023 at 9:57 AM Levani Kokhreidze 
> wrote:
>
> > Hello,
> >
> > KAFKA-15571 [1] was merged and backported to the 3.5 and 3.6 branches.
> > The fix addresses a feature that was added in 3.5. Considering the feature
> > doesn't work as expected without the fix, I would like to know if it's
> > reasonable to start the 3.5.2 release. Of course, releasing a project as
> > massive as Kafka is not a trivial task, so I am looking for the community's
> > input on whether it's reasonable to start the 3.5.2 release process.
> >
> > Best,
> > Levani
> >
> > [1] - https://issues.apache.org/jira/browse/KAFKA-15571
>


Re: [DISCUSS] 3.5.2 Release

2023-10-12 Thread Levani Kokhreidze
Hi Divij,

Thanks for the explanation, makes sense.

Hi Luke, thank you! It would be awesome to see 3.5.2 out.

Best,
Levani

> On 12. Oct 2023, at 12:39, Luke Chen  wrote:
> 
> Hi Levani and Divij,
> 
> I can work on the 3.5.2 release.
> I'll start a new thread for volunteering it maybe next week.
> 
> Thanks.
> Luke
> 
> On Thu, Oct 12, 2023 at 5:07 PM Divij Vaidya 
> wrote:
> 
>> Hello Levani
>> 
>> From a process perspective, there is no fixed schedule for bug fix
>> releases. If we have a volunteer for release manager (must be a committer),
>> they can start the bug fix release process (with the approval of the
>> PMC).
>> 
>> My personal opinion is that it's too early to start 3.6.1 and we should
>> wait at least 1 month to hear feedback on 3.6.0. We need to strike a careful
>> balance between getting the critical fixes into the hands of users as soon
>> as possible vs. spending community effort on releases (effort that could
>> otherwise be used to make Kafka better, feature-wise & operational
>> stability-wise).
>> 
>> For 3.5.2, I think there are sufficient pending fixes (including some CVE
>> fixes) to start a bug fix release. We just need a volunteer for the release
>> manager.
>> 
>> --
>> Divij Vaidya
>> 
>> 
>> 
>> On Thu, Oct 12, 2023 at 9:57 AM Levani Kokhreidze 
>> wrote:
>> 
>>> Hello,
>>> 
>>> KAFKA-15571 [1] was merged and backported to the 3.5 and 3.6 branches.
>>> The fix addresses a feature that was added in 3.5. Considering the feature
>>> doesn't work as expected without the fix, I would like to know if it's
>>> reasonable to start the 3.5.2 release. Of course, releasing a project as
>>> massive as Kafka is not a trivial task, so I am looking for the community's
>>> input on whether it's reasonable to start the 3.5.2 release process.
>>> 
>>> Best,
>>> Levani
>>> 
>>> [1] - https://issues.apache.org/jira/browse/KAFKA-15571
>> 



Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #2278

2023-10-12 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 316180 lines...]

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultStateUpdaterTest > 
shouldRestoreActiveStatefulTasksAndUpdateStandbyTasks() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultStateUpdaterTest > shouldPauseStandbyTask() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultStateUpdaterTest > shouldPauseStandbyTask() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultStateUpdaterTest > shouldThrowIfStatefulTaskNotInStateRestoring() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultStateUpdaterTest > shouldThrowIfStatefulTaskNotInStateRestoring() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldSetUncaughtStreamsException() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldSetUncaughtStreamsException() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldClearTaskTimeoutOnProcessed() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldClearTaskTimeoutOnProcessed() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldUnassignTaskWhenRequired() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldUnassignTaskWhenRequired() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldClearTaskReleaseFutureOnShutdown() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldClearTaskReleaseFutureOnShutdown() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldProcessTasks() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldProcessTasks() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldPunctuateStreamTime() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldPunctuateStreamTime() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldShutdownTaskExecutor() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldShutdownTaskExecutor() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldAwaitProcessableTasksIfNoneAssignable() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldAwaitProcessableTasksIfNoneAssignable() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > 
shouldRespectPunctuationDisabledByTaskExecutionMetadata() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > 
shouldRespectPunctuationDisabledByTaskExecutionMetadata() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldSetTaskTimeoutOnTimeoutException() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldSetTaskTimeoutOnTimeoutException() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldPunctuateSystemTime() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldPunctuateSystemTime() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldUnassignTaskWhenNotProgressing() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldUnassignTaskWhenNotProgressing() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldNotFlushOnException() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldNotFlushOnException() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > 
shouldRespectProcessingDisabledByTaskExecutionMetadata() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > 
shouldRespectProcessingDisabledByTaskExecutionMetadata() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldLockAnEmptySetOfTasks() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldLockAnEmptySetOfTasks() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldAssignTasksThatCanBeSystemTimePunctuated() 
STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldAssignTasksTha

[jira] [Created] (KAFKA-15595) Session window aggregate drops records headers

2023-10-12 Thread Abdullah alkhawatrah (Jira)
Abdullah alkhawatrah created KAFKA-15595:


 Summary: Session window aggregate drops records headers
 Key: KAFKA-15595
 URL: https://issues.apache.org/jira/browse/KAFKA-15595
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.5.1
Reporter: Abdullah alkhawatrah


Hey,

While upgrading to 3.5.1 from 3.2.x I noticed a change in SessionWindow
aggregate behaviour: it seems that custom headers added before the aggregate
are now dropped.

I could reproduce the behaviour with the following test topology:
{code:java}
// Note: the generic type parameters below were stripped in the original
// report; String/String is assumed here for illustration.
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic, Consumed.with(EARLIEST))
    .process(() -> new Processor<String, String, String, String>() {
        private ProcessorContext<String, String> context;

        @Override
        public void init(final ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(final Record<String, String> record) {
            // add a custom header before the windowed aggregate
            record.headers().add("key1", record.value().toString().getBytes());
            context.forward(record);
        }
    })
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofDays(1L), Duration.ofDays(1L)))
    .aggregate(() -> 1,
        (key, value, aggregate) -> aggregate,
        (aggKey, aggOne, aggTwo) -> aggTwo)
    .toStream()
    .map((key, value) -> new KeyValue<>(key.key(), value))
    .to(outputTopic); {code}
Checking events in the `outputTopic` shows that the headers are empty. With
3.2.* the same topology would have propagated the headers.
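
For reference, the behaviour can be checked with TopologyTestDriver along
these lines (a sketch only; serdes, types, and topic wiring are assumptions
on top of the topology above):
{code:java}
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "header-repro");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// note: the aggregate above may additionally need
// Materialized.with(Serdes.String(), Serdes.Integer())

try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
    final TestInputTopic<String, String> input =
        driver.createInputTopic(inputTopic, new StringSerializer(), new StringSerializer());
    final TestOutputTopic<String, Integer> output =
        driver.createOutputTopic(outputTopic, new StringDeserializer(), new IntegerDeserializer());

    input.pipeInput("k", "v", 0L);

    // On 3.2.x the custom "key1" header was present on the emitted record;
    // on 3.5.1 the headers come back empty.
    System.out.println(output.readRecord().headers());
} {code}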



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15596) Upgrade ZooKeeper to 3.8.3

2023-10-12 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-15596:
--

 Summary: Upgrade ZooKeeper to 3.8.3
 Key: KAFKA-15596
 URL: https://issues.apache.org/jira/browse/KAFKA-15596
 Project: Kafka
  Issue Type: Improvement
Reporter: Mickael Maison
Assignee: Mickael Maison


ZooKeeper 3.8.3 fixes 
[CVE-2023-44981|https://www.cve.org/CVERecord?id=CVE-2023-44981] as described 
in https://lists.apache.org/thread/7o6cch0gm7hzz0zcj2zs16hnl1dxm6oy



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15597) Allow Connect DropHeaders SMT remove headers on a wildcard-basis

2023-10-12 Thread Roman Schmitz (Jira)
Roman Schmitz created KAFKA-15597:
-

 Summary: Allow Connect DropHeaders SMT remove headers on a 
wildcard-basis
 Key: KAFKA-15597
 URL: https://issues.apache.org/jira/browse/KAFKA-15597
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Roman Schmitz


In many use cases you might not only want to drop a few specific Kafka headers 
but a set of headers whose names can also change dynamically (e.g. when used 
with some end-to-end-encryption libraries). To prevent those headers from being 
further forwarded/processed downstream, I suggest adding regexp matching to the 
*apply* method instead of the set-based {*}contains{*}. Link to the relevant code:
https://github.com/apache/kafka/blob/7b5d640cc656443a078bda096d01910b3edfdb37/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java#L54
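
For illustration, a sketch of the suggested matching logic (the field and
method names here are assumptions, not the actual DropHeaders code):
{code:java}
import java.util.List;
import java.util.regex.Pattern;
import org.apache.kafka.connect.header.Header;

// Patterns compiled once from the connector's configured header names:
private List<Pattern> headerPatterns;

// Regexp-based match instead of the current set-based contains():
private boolean toBeDropped(final Header header) {
    return headerPatterns.stream().anyMatch(p -> p.matcher(header.key()).matches());
} {code}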



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-967: Support custom SSL configuration for Kafka Connect RestServer

2023-10-12 Thread Taras Ledkov
Hi Ashwin,

> I was referring to (and did not understand) the removal of L141 in 
> clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
This line was moved to a "new" private method `instantiateSslEngineFactory0`.
Please take a look at `SslFactory:L132` in the patch.
Just a dummy refactoring.

> Yes, I think this class [SslEngineFactory] should be moved to something like 
> `server-common` module - but would like any of the committers to comment on 
> this.
Sorry, I don't quite catch the idea.
`SslEngineFactory` is a public interface located in the 'clients' project; I
don't know of a more common place.


[jira] [Resolved] (KAFKA-15492) Enable spotbugs when building with Java 21

2023-10-12 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya resolved KAFKA-15492.
--
Resolution: Fixed

> Enable spotbugs when building with Java 21
> --
>
> Key: KAFKA-15492
> URL: https://issues.apache.org/jira/browse/KAFKA-15492
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 3.7.0
>
>
> The latest version of spotbugs (4.7.3) doesn't support Java 21. In order not 
> to delay Java 21 support, we disabled spotbugs when building with Java 21. 
> This should be reverted once we upgrade to a version of spotbugs that 
> supports Java 21.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #2279

2023-10-12 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 211178 lines...]
Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskExecutorTest > shouldNotFlushOnException() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskExecutorTest > shouldNotFlushOnException() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskExecutorTest > 
shouldRespectProcessingDisabledByTaskExecutionMetadata() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskExecutorTest > 
shouldRespectProcessingDisabledByTaskExecutionMetadata() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldLockAnEmptySetOfTasks() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldLockAnEmptySetOfTasks() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldAssignTasksThatCanBeSystemTimePunctuated() 
STARTED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldAssignTasksThatCanBeSystemTimePunctuated() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldNotUnassignNotOwnedTask() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldNotUnassignNotOwnedTask() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldNotSetUncaughtExceptionsTwice() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldNotSetUncaughtExceptionsTwice() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldReturnFromAwaitOnAdding() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldReturnFromAwaitOnAdding() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldShutdownTaskExecutors() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldShutdownTaskExecutors() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > 
shouldNotAssignTasksForPunctuationIfPunctuationDisabled() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > 
shouldNotAssignTasksForPunctuationIfPunctuationDisabled() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldReturnFromAwaitOnSignalProcessableTasks() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldReturnFromAwaitOnSignalProcessableTasks() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldAddTask() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldAddTask() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldReturnFromAwaitOnUnassignment() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldReturnFromAwaitOnUnassignment() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldNotAssignAnyLockedTask() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldNotAssignAnyLockedTask() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldRemoveTask() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldRemoveTask() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldNotRemoveAssignedTask() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldNotRemoveAssignedTask() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldBlockOnAwait() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldBlockOnAwait() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldReturnFromAwaitOnInterruption() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldReturnFromAwaitOnInterruption() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldAssignTaskThatCanBeProcessed() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldAssignTaskThatCanBeProcessed() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldStartTaskExecutors() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 83 > 
DefaultTaskManagerTest > shouldStartTaskExe

Build failed in Jenkins: Kafka » Kafka Branch Builder » 3.5 #79

2023-10-12 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 287166 lines...]

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
TableTableJoinIntegrationTest > [caching enabled = true] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testInner[caching
 enabled = true] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
TableTableJoinIntegrationTest > [caching enabled = true] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testOuter[caching
 enabled = true] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
TableTableJoinIntegrationTest > [caching enabled = true] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testOuter[caching
 enabled = true] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
TableTableJoinIntegrationTest > [caching enabled = true] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testInnerWithVersionedStores[caching
 enabled = true] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
TableTableJoinIntegrationTest > [caching enabled = true] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testInnerWithVersionedStores[caching
 enabled = true] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
TableTableJoinIntegrationTest > [caching enabled = true] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeft[caching
 enabled = true] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
TableTableJoinIntegrationTest > [caching enabled = true] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeft[caching
 enabled = true] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
TableTableJoinIntegrationTest > [caching enabled = true] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testOuterWithVersionedStores[caching
 enabled = true] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
TableTableJoinIntegrationTest > [caching enabled = true] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testOuterWithVersionedStores[caching
 enabled = true] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
TableTableJoinIntegrationTest > [caching enabled = true] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testOuterWithRightVersionedOnly[caching
 enabled = true] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
TableTableJoinIntegrationTest > [caching enabled = true] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testOuterWithRightVersionedOnly[caching
 enabled = true] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
TableTableJoinIntegrationTest > [caching enabled = true] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeftWithVersionedStores[caching
 enabled = true] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
TableTableJoinIntegrationTest > [caching enabled = true] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeftWithVersionedStores[caching
 enabled = true] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
TableTableJoinIntegrationTest > [caching enabled = true] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testOuterWithLeftVersionedOnly[caching
 enabled = true] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
TableTableJoinIntegrationTest > [caching enabled = true] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testOuterWithLeftVersionedOnly[caching
 enabled = true] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
TableTableJoinIntegrationTest > [caching enabled = true] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeftWithRightVersionedOnly[caching
 enabled = true] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
TableTableJoinIntegrationTest > [caching enabled = true] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeftWithRightVersionedOnly[caching
 enabled = true] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
TableTableJoinIntegrationTest > [caching enabled = true] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testInnerInner[caching
 enabled = true] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
TableTableJoinIntegrationTest > [caching enabled = true] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testInnerInner[caching
 enabled = true] 

[jira] [Created] (KAFKA-15598) Add integration tests for DescribeGroups API, DeleteGroups API and OffsetDelete API

2023-10-12 Thread Dongnuo Lyu (Jira)
Dongnuo Lyu created KAFKA-15598:
---

 Summary: Add integration tests for DescribeGroups API, 
DeleteGroups API and OffsetDelete API
 Key: KAFKA-15598
 URL: https://issues.apache.org/jira/browse/KAFKA-15598
 Project: Kafka
  Issue Type: Sub-task
Reporter: Dongnuo Lyu
Assignee: Dongnuo Lyu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [ANNOUNCE] Apache Kafka 3.6.0

2023-10-12 Thread Satish Duggana
I'd like to clarify why 3.6.0 is called a minor release.

We use semantic versioning* (Major.Minor.Patch) in Apache Kafka
releases. Releases like 3.5.0 or 3.6.0 are called minor releases
according to semantic versioning. The next major release will be
4.0.0.

The 3.6.0 release is packed with big features and improvements, as
mentioned in the blog post[1] and release notes[2], even though it is
called a minor release :)

1. https://kafka.apache.org/blog
2. https://downloads.apache.org/kafka/3.6.0/RELEASE_NOTES.html
* https://semver.org/

Thanks,
Satish.


On Wed, 11 Oct 2023 at 12:00, Luke Chen  wrote:
>
> Thanks for running the release, Satish!
>
> BTW, 3.6.0 should be a major release, not a minor one. :)
>
> Luke
>
> On Wed, Oct 11, 2023 at 1:39 PM Satish Duggana  wrote:
>
> > The Apache Kafka community is pleased to announce the release for
> > Apache Kafka 3.6.0
> >
> > This is a minor release and it includes fixes and improvements from 238
> > JIRAs.
> >
> > All of the changes in this release can be found in the release notes:
> > https://www.apache.org/dist/kafka/3.6.0/RELEASE_NOTES.html
> >
> > An overview of the release can be found in our announcement blog post:
> > https://kafka.apache.org/blog
> >
> > You can download the source and binary release (Scala 2.12 and Scala 2.13)
> > from:
> > https://kafka.apache.org/downloads#3.6.0
> >
> >
> > ---
> >
> >
> > Apache Kafka is a distributed streaming platform with four core APIs:
> >
> >
> > ** The Producer API allows an application to publish a stream of records to
> > one or more Kafka topics.
> >
> > ** The Consumer API allows an application to subscribe to one or more
> > topics and process the stream of records produced to them.
> >
> > ** The Streams API allows an application to act as a stream processor,
> > consuming an input stream from one or more topics and producing an
> > output stream to one or more output topics, effectively transforming the
> > input streams to output streams.
> >
> > ** The Connector API allows building and running reusable producers or
> > consumers that connect Kafka topics to existing applications or data
> > systems. For example, a connector to a relational database might
> > capture every change to a table.
> >
> >
> > With these APIs, Kafka can be used for two broad classes of application:
> >
> > ** Building real-time streaming data pipelines that reliably get data
> > between systems or applications.
> >
> > ** Building real-time streaming applications that transform or react
> > to the streams of data.
> >
> >
> > Apache Kafka is in use at large and small companies worldwide, including
> > Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
> > Target, The New York Times, Uber, Yelp, and Zalando, among others.
> >
> > A big thank you for the following 139 contributors to this release!
> > (Please report an unintended omission)
> >
> > This was a community effort, so thank you to everyone who contributed
> > to this release, including all our users and our 139 contributors:
> > A. Sophie Blee-Goldman, Aaron Ai, Abhijeet Kumar, aindriu-aiven,
> > Akhilesh Chaganti, Alexandre Dupriez, Alexandre Garnier, Alok
> > Thatikunta, Alyssa Huang, Aman Singh, Andras Katona, Andrew Schofield,
> > Andrew Grant, Aneel Kumar, Anton Agestam, Artem Livshits, atu-sharm,
> > bachmanity1, Bill Bejeck, Bo Gao, Bruno Cadonna, Calvin Liu, Chaitanya
> > Mukka, Chase Thomas, Cheryl Simmons, Chia-Ping Tsai, Chris Egerton,
> > Christo Lolov, Clay Johnson, Colin P. McCabe, Colt McNealy, d00791190,
> > Damon Xie, Danica Fine, Daniel Scanteianu, Daniel Urban, David Arthur,
> > David Jacot, David Mao, dengziming, Deqi Hu, Dimitar Dimitrov, Divij
> > Vaidya, DL1231, Dániel Urbán, Erik van Oosten, ezio, Farooq Qaiser,
> > Federico Valeri, flashmouse, Florin Akermann, Gabriel Oliveira,
> > Gantigmaa Selenge, Gaurav Narula, GeunJae Jeon, Greg Harris, Guozhang
> > Wang, Hailey Ni, Hao Li, Hector Geraldino, hudeqi, hzh0425, Iblis Lin,
> > iit2009060, Ismael Juma, Ivan Yurchenko, James Shaw, Jason Gustafson,
> > Jeff Kim, Jim Galasyn, John Roesler, Joobi S B, Jorge Esteban Quilcate
> > Otoya, Josep Prat, Joseph (Ting-Chou) Lin, José Armando García Sancio,
> > Jun Rao, Justine Olshan, Kamal Chandraprakash, Keith Wall, Kirk True,
> > Lianet Magrans, LinShunKang, Liu Zeyu, lixy, Lucas Bradstreet, Lucas
> > Brutschy, Lucent-Wong, Lucia Cerchie, Luke Chen, Manikumar Reddy,
> > Manyanda Chitimbo, Maros Orsak, Matthew de Detrich, Matthias J. Sax,
> > maulin-vasavada, Max Riedel, Mehari Beyene, Michal Cabak (@miccab),
> > Mickael Maison, Milind Mantri, minjian.cai, mojh7, Nikolay, Okada
> > Haruki, Omnia G H Ibrahim, Owen Leung, Philip Nee, prasanthV, Proven
> > Provenzano, Purshotam Chauhan, Qichao Chu, Rajini Sivaram, Randall
> > Hauch, Renaldo Baur Filho, Ritika Reddy, Rittika Adhikari, Rohan, Ron
> > Dagostino, Sagar Rao, Said Boudjelda,

[jira] [Resolved] (KAFKA-14506) Implement DeleteGroups API and OffsetDelete API

2023-10-12 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-14506.
-
  Reviewer: David Jacot
Resolution: Fixed

> Implement DeleteGroups API and OffsetDelete API
> ---
>
> Key: KAFKA-14506
> URL: https://issues.apache.org/jira/browse/KAFKA-14506
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Dongnuo Lyu
>Priority: Major
>
> Implement OffsetDelete API in the new Group Coordinator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14504) Implement DescribeGroups API

2023-10-12 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-14504.
-
  Reviewer: David Jacot
Resolution: Fixed

> Implement DescribeGroups API
> 
>
> Key: KAFKA-14504
> URL: https://issues.apache.org/jira/browse/KAFKA-14504
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Dongnuo Lyu
>Priority: Major
>
> Implement DescribeGroups API in the Group Coordinator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [ANNOUNCE] Apache Kafka 3.6.0

2023-10-12 Thread Ismael Juma
Congratulations to the community on an exciting release! Special thanks to
Satish for driving the release and KIP-405. :)

Ismael

On Tue, Oct 10, 2023 at 10:39 PM Satish Duggana  wrote:

> The Apache Kafka community is pleased to announce the release for
> Apache Kafka 3.6.0
>
> This is a minor release and it includes fixes and improvements from 238
> JIRAs.
>
> All of the changes in this release can be found in the release notes:
> https://www.apache.org/dist/kafka/3.6.0/RELEASE_NOTES.html
>
> An overview of the release can be found in our announcement blog post:
> https://kafka.apache.org/blog
>
> You can download the source and binary release (Scala 2.12 and Scala 2.13)
> from:
> https://kafka.apache.org/downloads#3.6.0
>
>
> ---
>
>
> Apache Kafka is a distributed streaming platform with four core APIs:
>
>
> ** The Producer API allows an application to publish a stream of records to
> one or more Kafka topics.
>
> ** The Consumer API allows an application to subscribe to one or more
> topics and process the stream of records produced to them.
>
> ** The Streams API allows an application to act as a stream processor,
> consuming an input stream from one or more topics and producing an
> output stream to one or more output topics, effectively transforming the
> input streams to output streams.
>
> ** The Connector API allows building and running reusable producers or
> consumers that connect Kafka topics to existing applications or data
> systems. For example, a connector to a relational database might
> capture every change to a table.
>
>
> With these APIs, Kafka can be used for two broad classes of application:
>
> ** Building real-time streaming data pipelines that reliably get data
> between systems or applications.
>
> ** Building real-time streaming applications that transform or react
> to the streams of data.
>
>
> Apache Kafka is in use at large and small companies worldwide, including
> Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
> Target, The New York Times, Uber, Yelp, and Zalando, among others.
>
> A big thank you for the following 139 contributors to this release!
> (Please report an unintended omission)
>
> This was a community effort, so thank you to everyone who contributed
> to this release, including all our users and our 139 contributors:
> A. Sophie Blee-Goldman, Aaron Ai, Abhijeet Kumar, aindriu-aiven,
> Akhilesh Chaganti, Alexandre Dupriez, Alexandre Garnier, Alok
> Thatikunta, Alyssa Huang, Aman Singh, Andras Katona, Andrew Schofield,
> Andrew Grant, Aneel Kumar, Anton Agestam, Artem Livshits, atu-sharm,
> bachmanity1, Bill Bejeck, Bo Gao, Bruno Cadonna, Calvin Liu, Chaitanya
> Mukka, Chase Thomas, Cheryl Simmons, Chia-Ping Tsai, Chris Egerton,
> Christo Lolov, Clay Johnson, Colin P. McCabe, Colt McNealy, d00791190,
> Damon Xie, Danica Fine, Daniel Scanteianu, Daniel Urban, David Arthur,
> David Jacot, David Mao, dengziming, Deqi Hu, Dimitar Dimitrov, Divij
> Vaidya, DL1231, Dániel Urbán, Erik van Oosten, ezio, Farooq Qaiser,
> Federico Valeri, flashmouse, Florin Akermann, Gabriel Oliveira,
> Gantigmaa Selenge, Gaurav Narula, GeunJae Jeon, Greg Harris, Guozhang
> Wang, Hailey Ni, Hao Li, Hector Geraldino, hudeqi, hzh0425, Iblis Lin,
> iit2009060, Ismael Juma, Ivan Yurchenko, James Shaw, Jason Gustafson,
> Jeff Kim, Jim Galasyn, John Roesler, Joobi S B, Jorge Esteban Quilcate
> Otoya, Josep Prat, Joseph (Ting-Chou) Lin, José Armando García Sancio,
> Jun Rao, Justine Olshan, Kamal Chandraprakash, Keith Wall, Kirk True,
> Lianet Magrans, LinShunKang, Liu Zeyu, lixy, Lucas Bradstreet, Lucas
> Brutschy, Lucent-Wong, Lucia Cerchie, Luke Chen, Manikumar Reddy,
> Manyanda Chitimbo, Maros Orsak, Matthew de Detrich, Matthias J. Sax,
> maulin-vasavada, Max Riedel, Mehari Beyene, Michal Cabak (@miccab),
> Mickael Maison, Milind Mantri, minjian.cai, mojh7, Nikolay, Okada
> Haruki, Omnia G H Ibrahim, Owen Leung, Philip Nee, prasanthV, Proven
> Provenzano, Purshotam Chauhan, Qichao Chu, Rajini Sivaram, Randall
> Hauch, Renaldo Baur Filho, Ritika Reddy, Rittika Adhikari, Rohan, Ron
> Dagostino, Sagar Rao, Said Boudjelda, Sambhav Jain, Satish Duggana,
> sciclon2, Shekhar Rajak, Sungyun Hur, Sushant Mahajan, Tanay
> Karmarkar, tison, Tom Bentley, vamossagar12, Victoria Xia, Vincent
> Jiang, vveicc, Walker Carlson, Yash Mayya, Yi-Sheng Lien, Ziming Deng,
> 蓝士钦
>
> We welcome your help and feedback. For more information on how to
> report problems, and to get involved, visit the project website at
> https://kafka.apache.org/
>
> Thank you!
>
> Regards,
> Satish Duggana
>


Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #2280

2023-10-12 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 316828 lines...]
Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldSetTaskTimeoutOnTimeoutException() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldPunctuateSystemTime() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldPunctuateSystemTime() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldUnassignTaskWhenNotProgressing() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldUnassignTaskWhenNotProgressing() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldNotFlushOnException() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > shouldNotFlushOnException() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > 
shouldRespectProcessingDisabledByTaskExecutionMetadata() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskExecutorTest > 
shouldRespectProcessingDisabledByTaskExecutionMetadata() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldLockAnEmptySetOfTasks() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldLockAnEmptySetOfTasks() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldAssignTasksThatCanBeSystemTimePunctuated() 
STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldAssignTasksThatCanBeSystemTimePunctuated() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldNotUnassignNotOwnedTask() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldNotUnassignNotOwnedTask() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldNotSetUncaughtExceptionsTwice() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldNotSetUncaughtExceptionsTwice() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldReturnFromAwaitOnAdding() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldReturnFromAwaitOnAdding() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldShutdownTaskExecutors() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldShutdownTaskExecutors() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > 
shouldNotAssignTasksForPunctuationIfPunctuationDisabled() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > 
shouldNotAssignTasksForPunctuationIfPunctuationDisabled() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldReturnFromAwaitOnSignalProcessableTasks() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldReturnFromAwaitOnSignalProcessableTasks() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldAddTask() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldAddTask() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldReturnFromAwaitOnUnassignment() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldReturnFromAwaitOnUnassignment() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldNotAssignAnyLockedTask() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldNotAssignAnyLockedTask() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldRemoveTask() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldRemoveTask() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldNotRemoveAssignedTask() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldNotRemoveAssignedTask() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldBlockOnAwait() STARTED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldBlockOnAwait() PASSED

Gradle Test Run :streams:test > Gradle Test Executor 84 > 
DefaultTaskManagerTest > shouldReturnFrom

Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-12 Thread Jun Rao
Hi, Matthias,

Thanks for the reply.

130. What would be the semantics? If the timeout has expired and only some
of the client instances' ids have been retrieved, does the call return the
partial result or throw an exception?

131. Could we group all consumer instances in a single method since we are
returning the key for each instance already? This probably also avoids
exposing implementation details that could change over time.
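
(For illustration, one hypothetical shape such a grouping could take -- not
taken from the KIP:)

import java.util.Map;

public interface ClientInstanceIds {
    String adminInstanceId();
    Map<String, String> consumerInstanceIds();  // keyed by internal client name
    Map<String, String> producerInstanceIds();
}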

Thanks,

Jun

On Thu, Oct 12, 2023 at 12:00 AM Sophie Blee-Goldman 
wrote:

> Regarding the naming, I personally think `clientInstanceId` makes sense for
> the plain clients
>  -- especially if we might later introduce the notion of an
> `applicationInstanceId`.
>
> I'm not a huge fan of `clientsInstanceIds` for the Kafka Streams API,
> though; can we use `clientInstanceIds` instead? (The difference being the
> placement of the plural 's'.)
> I would similarly rename the class to just ClientInstanceIds
>
> we can also not have a timeout-less overload, because `KafkaStreams` does
> > not have a `default.api.timeout.ms` config either
>
> With respect to the timeout for the Kafka Streams API, I'm a bit confused
> by the double/triple negative of Matthias' comment here, but I was thinking
> about this earlier and this was my take: with the current proposal, we would
> allow users to pass in an absolute timeout as a parameter that would apply
> to the method as a whole. Meanwhile, within the method we would issue
> separate calls to each of the clients, using the default or user-configured
> value of their `default.api.timeout.ms` as the timeout parameter.
>
> So the API as proposed makes sense to me.
>
>
> On Wed, Oct 11, 2023 at 6:48 PM Matthias J. Sax  wrote:
>
> > I can answer 130 and 131.
> >
> > 130) We cannot guarantee that all clients are already initialized due to
> > race conditions. We plan to not allow calling
> > `KafkaStreams#clientsInstanceIds()` when the state is not RUNNING (or
> > REBALANCING) though -- I guess this was left out of the KIP and should be
> > added? But because StreamThreads can be added dynamically (and producers
> > might be created dynamically at runtime; cf. below), we still cannot
> > guarantee that all clients are already initialized when the method is
> > called. Of course, we assume that all clients are most likely initialized
> > on the happy path, and blocking calls to `client.clientInstanceId()`
> > should be rare.
> >
> > To address the worst case, we won't do a naive implementation and just
> > loop over all clients, but fan-out the call to the different
> > StreamThreads (and GlobalStreamThread if it exists), and use Futures to
> > gather the results.
> >
> > Currently, each `StreamThread` has 3 clients (if ALOS or EOSv2 is used), so
> > we might do 3 blocking calls in the worst case (for EOSv1 we get a
> > producer per task, and we might end up doing more blocking calls if the
> > producers are not initialized yet). Note that EOSv1 is already
> > deprecated, and we are also working on thread refactoring that will
> > reduce the number of clients on a StreamThread to 2 -- and we have more
> > refactoring planned to reduce the number of clients even further.
> >
> > Inside `KafkaStreams#clientsInstanceIds()` we might only do a single
> > blocking call for the admin client (i.e., `admin.clientInstanceId()`).
> >
> > I agree that we need to do some clever timeout management, but it seems
> > to be more of an implementation detail?
> >
> > Do you have any particular concerns, or does the proposed implementation
> > as sketched above address your question?
> >
> >
> > 131) If the Topology does not have a global state store, there won't be
> > a GlobalThread and thus no global consumer. Thus, we return an Optional.
> >
> >
> >
> > Three related questions for Andrew.
> >
> > (1) Why is the method called `clientInstanceId()` and not just plain
> > `instanceId()`?
> >
> > (2) Why do we return a `String` and not a UUID type? The added
> > protocol request/response classes use UUIDs.
> >
> > (3) Would it make sense to have an overloaded `clientInstanceId()`
> > method that does not take any parameter but uses `default.api.timeout.ms`
> > config (this config does not exist on the producer though, so we could
> > only have it for consumer and admin at this point). We could of course
> > also add overloads like this later if users request them (and/or add
> > `default.api.timeout.ms` to the producer, too).
> >
> > Btw: For KafkaStreams, I think `clientsInstanceIds` still makes sense as
> > a method name though, as `KafkaStreams` itself does not have an
> > `instanceId` -- we can also not have a timeout-less overload, because
> > `KafkaStreams` does not have a `default.api.timeout.ms` config either
> > (and I don't think it makes sense to add).
> >
> >
> >
> > -Matthias
> >
> > On 10/11/23 5:07 PM, Jun Rao wrote:
> > > Hi, Andrew,
> > >
> > > Thanks for the updated KIP. Just a few more minor comments.
> > >
> > > 130. KafkaStreams.clientsInstanceId(Duration t

[jira] [Resolved] (KAFKA-15596) Upgrade ZooKeeper to 3.8.3

2023-10-12 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-15596.

Fix Version/s: 3.7.0
   3.6.1
   Resolution: Fixed

> Upgrade ZooKeeper to 3.8.3
> --
>
> Key: KAFKA-15596
> URL: https://issues.apache.org/jira/browse/KAFKA-15596
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
> Fix For: 3.7.0, 3.6.1
>
>
> ZooKeeper 3.8.3 fixes 
> [CVE-2023-44981|https://www.cve.org/CVERecord?id=CVE-2023-44981] as described 
> in https://lists.apache.org/thread/7o6cch0gm7hzz0zcj2zs16hnl1dxm6oy



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-12 Thread Matthias J. Sax

Thanks Sophie and Jun.

`clientInstanceIds()` is fine with me -- was not sure about the double 
plural myself.


Sorry if my comment was confusing. I was trying to say that adding an 
overload to `KafkaStreams` that does not take a timeout parameter does 
not make sense, because there is no `default.api.timeout.ms` config for 
Kafka Streams, so users always need to pass in a timeout. (Same for 
producer.)


For the implementation, I think KS would always call 
`client.clientInstanceId(timeout)` and never rely on 
`default.api.timeout.ms` though, so we can stay in control -- if a 
timeout is passed by the user, it would always overwrite 
`default.api.timeout.ms` on the consumer/admin and thus we should follow 
the same semantics in Kafka Streams, and overwrite it explicitly when 
calling `client.clientInstanceId()`.


The proposed API also makes sense to me. I was just wondering if we want 
to extend it for client users -- for KS we won't need/use the 
timeout-less overloads.




130) My intent was to throw a TimeoutException if we cannot get all 
instanceIds, because it's the standard contract for timeouts. It would 
also be hard for a user to tell whether a full or partial result was 
returned (unless we add a method `boolean isPartialResult()` to make it 
easier for users).


If there are concerns/objections, I am also ok to return a partial result 
-- it would require a change to the newly added `ClientInstanceIds` 
return type -- for `adminInstanceId` we only return a `String` right now 
-- we might need to change this to `Optional<String>` so we are able to 
return a partial result?



131) Of course we could, but I am not sure what we would gain? In the 
end, implementation details would always leak because if we change the 
number of consumers we use, we would return different keys in the `Map`. 
Atm, the proposal implies that the same key might be used for the "main" 
and "restore" consumer of the same thread -- but we can make keys unique 
by adding a `-restore` suffix to the restore-consumer key if we merge 
both maps. -- Curious to hear what others think. I am very open to do it 
differently than currently proposed.



-Matthias


On 10/12/23 8:39 AM, Jun Rao wrote:

Hi, Matthias,

Thanks for the reply.

130. What would be the semantic? If the timeout has expired and only some
of the client instance ids have been retrieved, does the call return the
partial result or throw an exception?

131. Could we group all consumer instances in a single method since we are
returning the key for each instance already? This probably also avoids
exposing implementation details that could change over time.

Thanks,

Jun

On Thu, Oct 12, 2023 at 12:00 AM Sophie Blee-Goldman 
wrote:


Regarding the naming, I personally think `clientInstanceId` makes sense for
the plain clients
  -- especially if we might later introduce the notion of an
`applicationInstanceId`.

I'm not a huge fan of `clientsInstanceIds` for the Kafka Streams API,
though, can we use
`clientInstanceIds` instead? (The difference being the placement of the
plural 's')
I would similarly rename the class to just ClientInstanceIds

we can also not have a timeout-less overload,  because `KafkaStreams` does

not have a `default.api.timeout.ms` config either


With respect to the timeout for the Kafka Streams API, I'm a bit confused
by the
double/triple-negative of Matthias' comment here, but I was thinking about
this
earlier and this was my take: with the current proposal, we would allow
users to pass
in an absolute timeout as a parameter that would apply to the method as a
whole.
Meanwhile within the method we would issue separate calls to each of the
clients using
the default or user-configured value of their  `default.api.timeout.ms` as
the timeout
parameter.

So the API as proposed makes sense to me.


On Wed, Oct 11, 2023 at 6:48 PM Matthias J. Sax  wrote:


 I can answer 130 and 131.

130) We cannot guarantee that all clients are already initialized due to
race conditions. We plan to not allow calling
`KafkaStreams#clientsInstanceIds()` when the state is not RUNNING (or
REBALANCING) though -- guess this slipped on the KIP and should be
added? But because StreamThreads can be added dynamically (and producer
might be created dynamically at runtime; cf below), we still cannot
guarantee that all clients are already initialized when the method is
called. Of course, we assume that all clients are most likely initialized
on the happy path, and blocking calls to `client.clientInstanceId()`
should be rare.

To address the worst case, we won't do a naive implementation and just
loop over all clients, but fan-out the call to the different
StreamThreads (and GlobalStreamThread if it exists), and use Futures to
gather the results.

Currently, `StreamThreads` has 3 clients (if ALOS or EOSv2 is used), so
we might do 3 blocking calls in the worst case (for EOSv1 we get a
producer per task, and we might end up doing more blocking calls if the
producers are not initialized yet). Not

Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-12 Thread Andrew Schofield
Hi Matthias,
131) I also think that separating the main and restore consumers is excessively 
specific about the current implementation.
So, I think I’d prefer:

public class ClientInstanceIds {
  String adminInstanceId();

  Optional<String> globalConsumerInstanceId();

  Map<String, String> consumerInstanceIds();

  Map<String, String> producerInstanceIds();
}

I’m not sure whether it makes sense to combine the global consumer instance id 
too.

I’ve got rid of the double plural :)

My 2 cents.

Thanks,
Andrew

> On 12 Oct 2023, at 17:29, Matthias J. Sax  wrote:
>
> Thanks Sophie and Jun.
>
> `clientInstanceIds()` is fine with me -- was not sure about the double plural 
> myself.
>
> Sorry if my comment was confusing. I was trying to say that adding an 
> overload to `KafkaStreams` that does not take a timeout parameter does not 
> make sense, because there is no `default.api.timeout.ms` config for Kafka 
> Streams, so users always need to pass in a timeout. (Same for producer.)
>
> For the implementation, I think KS would always call 
> `client.clientInstanceId(timeout)` and never rely on `default.api.timeout.ms` 
> though, so we can stay in control -- if a timeout is passed by the user, it 
> would always overwrite `default.api.timeout.ms` on the consumer/admin and 
> thus we should follow the same semantics in Kafka Streams, and overwrite it 
> explicitly when calling `client.clientInstanceId()`.
>
> The proposed API also makes sense to me. I was just wondering if we want to 
> extend it for client users -- for KS we won't need/use the timeout-less 
> overloads.
>
>
>
> 130) My intent was to throw a TimeoutException if we cannot get all 
> instanceIds, because it's the standard contract for timeouts. It would also 
> be hard for a user to tell whether a full or partial result was returned (unless 
> we add a method `boolean isPartialResult()` to make it easier for users).
>
> If there are concerns/objections, I am also ok to return a partial result -- 
> it would require a change to the newly added `ClientInstanceIds` return type 
> -- for `adminInstanceId` we only return a `String` right now -- we might need 
> to change this to `Optional<String>` so we are able to return a partial 
> result?
>
>
> 131) Of course we could, but I am not sure what we would gain? In the end, 
> implementation details would always leak because if we change the number of 
> consumers we use, we would return different keys in the `Map`. -- 
> proposal implies that the same key might be used for the "main" and "restore" 
> consumer of the same thread -- but we can make keys unique by adding a 
> `-restore` suffix to the restore-consumer key if we merge both maps. -- 
> Curious to hear what others think. I am very open to do it differently than 
> currently proposed.
>
>
> -Matthias
>
>
> On 10/12/23 8:39 AM, Jun Rao wrote:
>> Hi, Matthias,
>> Thanks for the reply.
>> 130. What would be the semantic? If the timeout has expired and only some
>> of the client instance ids have been retrieved, does the call return the
>> partial result or throw an exception?
>> 131. Could we group all consumer instances in a single method since we are
>> returning the key for each instance already? This probably also avoids
>> exposing implementation details that could change over time.
>> Thanks,
>> Jun
>> On Thu, Oct 12, 2023 at 12:00 AM Sophie Blee-Goldman 
>> wrote:
>>> Regarding the naming, I personally think `clientInstanceId` makes sense for
>>> the plain clients
>>>  -- especially if we might later introduce the notion of an
>>> `applicationInstanceId`.
>>>
>>> I'm not a huge fan of `clientsInstanceIds` for the Kafka Streams API,
>>> though, can we use
>>> `clientInstanceIds` instead? (The difference being the placement of the
>>> plural 's')
>>> I would similarly rename the class to just ClientInstanceIds
>>>
>>> we can also not have a timeout-less overload,  because `KafkaStreams` does
 not have a `default.api.timeout.ms` config either
>>>
>>> With respect to the timeout for the Kafka Streams API, I'm a bit confused
>>> by the
>>> double/triple-negative of Matthias' comment here, but I was thinking about
>>> this
>>> earlier and this was my take: with the current proposal, we would allow
>>> users to pass
>>> in an absolute timeout as a parameter that would apply to the method as a
>>> whole.
>>> Meanwhile within the method we would issue separate calls to each of the
>>> clients using
>>> the default or user-configured value of their  `default.api.timeout.ms` as
>>> the timeout
>>> parameter.
>>>
>>> So the API as proposed makes sense to me.
>>>
>>>
>>> On Wed, Oct 11, 2023 at 6:48 PM Matthias J. Sax  wrote:
>>>
 I can answer 130 and 131.

 130) We cannot guarantee that all clients are already initialized due to
 race conditions. We plan to not allow calling
 `KafkaStreams#clientsInstanceIds()` when the state is not RUNNING (or
 REBALANCING) though -- guess this slipped on the KIP and should be
 added? But because StreamThreads can be added dynamicall

[DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException

2023-10-12 Thread Nick Telford
Hi everyone,

This is a Streams KIP to add a new DeserializationHandlerResponse,
"SUSPEND", that suspends the failing Task but continues to process other
Tasks normally.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-990%3A+Capability+to+SUSPEND+Tasks+on+DeserializationException

I'm not yet completely convinced that this is practical, as I suspect it
might be abusing the SUSPENDED Task.State for something it was not designed
for. The intent is to pause an active Task *without* re-assigning it to
another instance; it is that re-assignment which causes cascading failures
when the FAIL DeserializationHandlerResponse is used.

Let me know what you think!

Regards,
Nick


Re: [VOTE] KIP-960: Support single-key_single-timestamp interactive queries (IQv2) for versioned state stores

2023-10-12 Thread Alieh Saeedi
Thanks to Matthias, Bruno, Lucas, and Walker for voting. So I consider this
KIP accepted.

Cheers,
Alieh

On Thu, Oct 12, 2023 at 9:26 AM Lucas Brutschy
 wrote:

> +1 (binding)
>
> Thanks for the KIP!
>
> Cheers,
> Lucas
>
> On Wed, Oct 11, 2023 at 7:55 PM Walker Carlson
>  wrote:
> >
> > +1 (binding)
> >
> > Thanks for the kip Alieh!
> >
> > Walker
> >
> > On Wed, Oct 11, 2023 at 3:52 AM Bruno Cadonna 
> wrote:
> >
> > > Thanks for the KIP, Alieh!
> > >
> > > +1 (binding)
> > >
> > > Best,
> > > Bruno
> > >
> > > On 10/10/23 1:14 AM, Matthias J. Sax wrote:
> > > > One more nit: as discussed on the related KIP-968 thread, we should
> not
> > > > use `get` as prefix for the getters.
> > > >
> > > > So it should be `K key()` and `Optional<Instant> asOfTimestamp()`.
> > > >
> > > >
> > > > Otherwise the KIP LGTM.
> > > >
> > > >
> > > > +1 (binding)
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 10/6/23 2:50 AM, Alieh Saeedi wrote:
> > > >> Hi everyone,
> > > >>
> > > >> Since KIP-960 is reduced to the simplest IQ type and all further
> > > comments
> > > >> are related to the following-up KIPs, I decided to finalize it at
> this
> > > >> point.
> > > >>
> > > >>
> > > >> A huge thank you to everyone who has reviewed this KIP (and also the
> > > >> following-up ones), and
> > > >> participated in the discussion thread!
> > > >>
> > > >> I'd also like to thank you in advance for taking the time to vote.
> > > >>
> > > >> Best,
> > > >> Alieh
> > > >>
> > >
>


[jira] [Created] (KAFKA-15600) KIP-990: Capability to SUSPEND Tasks on DeserializationException

2023-10-12 Thread Nicholas Telford (Jira)
Nicholas Telford created KAFKA-15600:


 Summary: KIP-990: Capability to SUSPEND Tasks on 
DeserializationException
 Key: KAFKA-15600
 URL: https://issues.apache.org/jira/browse/KAFKA-15600
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Nicholas Telford


Presently, Kafka Streams provides users with two options for handling a 
{{DeserializationException}}  via the {{DeserializationExceptionHandler}}  
interface:
 # {{FAIL}} - throw an Exception that causes the stream thread to fail. This 
will either cause the whole application instance to exit, or the stream thread 
will be replaced and restarted. Either way, the failed {{Task}} will end up 
being resumed, either by the current instance or after being rebalanced to 
another, causing a cascading failure until a user intervenes to address the 
problem.
 # {{CONTINUE}} - discard the record and continue processing with the next 
record. This can cause data loss if the record triggering the 
{{DeserializationException}} should be considered a valid record. This can 
happen if an upstream producer changes the record schema in a way that is 
incompatible with the streams application, or if there is a bug in the 
{{Deserializer}}  (for example, failing to handle a valid edge-case).

The user can currently choose between data loss, or a cascading failure that 
usually causes all processing to slowly grind to a halt.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15599) Move KafkaMetadataLog and RaftManager to metadata module

2023-10-12 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-15599:
---

 Summary: Move KafkaMetadataLog and RaftManager to metadata module
 Key: KAFKA-15599
 URL: https://issues.apache.org/jira/browse/KAFKA-15599
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ismael Juma
Assignee: Ismael Juma






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-12 Thread Jun Rao
Hi, Matthias,

130. Yes, throwing an exception sounds reasonable. It would be useful to
document this.

131. I was thinking that we could just return all consumers (including the
global consumer) through Map<String, String> consumerInstanceIds() and use
keys to identify each consumer instance. The benefit is that the
implementation (whether to use a separate global consumer or not) could
change in the future, but the API can remain the same. Another slight
benefit is that there is no need for returning Optional<String>. If the
global consumer is not used, it just won't be included in the map.
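
For illustration, the merged shape might look like this (a rough sketch only --
key names are illustrative and this is not the final KIP-714 API):

import java.util.Map;

public interface ClientInstanceIds {
    String adminInstanceId();

    // all consumers in one map, including the global consumer if present,
    // e.g. "StreamThread-1-consumer", "StreamThread-1-restore-consumer",
    // "GlobalStreamThread-global-consumer" (names purely illustrative)
    Map<String, String> consumerInstanceIds();

    Map<String, String> producerInstanceIds();
}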

Thanks,

Jun


On Thu, Oct 12, 2023 at 9:30 AM Matthias J. Sax  wrote:

> Thanks Sophie and Jun.
>
> `clientInstanceIds()` is fine with me -- was not sure about the double
> plural myself.
>
> Sorry if my comment was confusing. I was trying to say that adding an
> overload to `KafkaStreams` that does not take a timeout parameter does
> not make sense, because there is no `default.api.timeout.ms` config for
> Kafka Streams, so users always need to pass in a timeout. (Same for
> producer.)
>
> For the implementation, I think KS would always call
> `client.clientInstanceId(timeout)` and never rely on
> `default.api.timeout.ms` though, so we can stay in control -- if a
> timeout is passed by the user, it would always overwrite
> `default.api.timeout.ms` on the consumer/admin and thus we should follow
> the same semantics in Kafka Streams, and overwrite it explicitly when
> calling `client.clientInstanceId()`.
>
> The proposed API also makes sense to me. I was just wondering if we want
> to extend it for client users -- for KS we won't need/use the
> timeout-less overloads.
>
>
>
> 130) My intent was to throw a TimeoutException if we cannot get all
> instanceIds, because it's the standard contract for timeouts. It would
> also be hard for a user to tell whether a full or partial result was
> returned (unless we add a method `boolean isPartialResult()` to make it
> easier for users).
>
> If there are concerns/objections, I am also ok to return a partial result
> -- it would require a change to the newly added `ClientInstanceIds`
> return type -- for `adminInstanceId` we only return a `String` right now
> -- we might need to change this to `Optional<String>` so we are able to
> return a partial result?
>
>
> 131) Of course we could, but I am not sure what we would gain? In the
> end, implementation details would always leak because if we change the
> number of consumers we use, we would return different keys in the `Map`.
> Atm, the proposal implies that the same key might be used for the "main"
> and "restore" consumer of the same thread -- but we can make keys unique
> by adding a `-restore` suffix to the restore-consumer key if we merge
> both maps. -- Curious to hear what others think. I am very open to do it
> differently than currently proposed.
>
>
> -Matthias
>
>
> On 10/12/23 8:39 AM, Jun Rao wrote:
> > Hi, Matthias,
> >
> > Thanks for the reply.
> >
> > 130. What would be the semantic? If the timeout has expired and only some
> > of the client instance ids have been retrieved, does the call return the
> > partial result or throw an exception?
> >
> > 131. Could we group all consumer instances in a single method since we
> are
> > returning the key for each instance already? This probably also avoids
> > exposing implementation details that could change over time.
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Oct 12, 2023 at 12:00 AM Sophie Blee-Goldman <
> sop...@responsive.dev>
> > wrote:
> >
> >> Regarding the naming, I personally think `clientInstanceId` makes sense
> for
> >> the plain clients
> >>   -- especially if we might later introduce the notion of an
> >> `applicationInstanceId`.
> >>
> >> I'm not a huge fan of `clientsInstanceIds` for the Kafka Streams API,
> >> though, can we use
> >> `clientInstanceIds` instead? (The difference being the placement of the
> >> plural 's')
> >> I would similarly rename the class to just ClientInstanceIds
> >>
> >> we can also not have a timeout-less overload,  because `KafkaStreams`
> does
> >>> not have a `default.api.timeout.ms` config either
> >>
> >> With respect to the timeout for the Kafka Streams API, I'm a bit
> confused
> >> by the
> >> double/triple-negative of Matthias' comment here, but I was thinking
> about
> >> this
> >> earlier and this was my take: with the current proposal, we would allow
> >> users to pass
> >> in an absolute timeout as a parameter that would apply to the method as
> a
> >> whole.
> >> Meanwhile within the method we would issue separate calls to each of the
> >> clients using
> >> the default or user-configured value of their  `default.api.timeout.ms`
> as
> >> the timeout
> >> parameter.
> >>
> >> So the API as proposed makes sense to me.
> >>
> >>
> >> On Wed, Oct 11, 2023 at 6:48 PM Matthias J. Sax 
> wrote:
> >>
> >>> I can answer 130 and 131.
> >>>
> >>> 130) We cannot guarantee that all clients are already initialized due
> to
> >>> race conditions. We plan to not all

Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-12 Thread Matthias J. Sax

Seems both Andrew and Jun prefer to merge the consumers. I am ok with this.

I'll leave it to Andrew to update the KIP accordingly, including adding 
`throws TimeoutException`.



-Matthias

On 10/12/23 10:07 AM, Jun Rao wrote:

Hi, Matthias,

130. Yes, throwing an exception sounds reasonable. It would be useful to
document this.

131. I was thinking that we could just return all consumers (including the
global consumer) through Map<String, String> consumerInstanceIds() and use
keys to identify each consumer instance. The benefit is that the
implementation (whether to use a separate global consumer or not) could
change in the future, but the API can remain the same. Another slight
benefit is that there is no need for returning Optional<String>. If the
global consumer is not used, it just won't be included in the map.

Thanks,

Jun


On Thu, Oct 12, 2023 at 9:30 AM Matthias J. Sax  wrote:


Thanks Sophie and Jun.

`clientInstanceIds()` is fine with me -- was not sure about the double
plural myself.

Sorry if my comment was confusing. I was trying to say that adding an
overload to `KafkaStreams` that does not take a timeout parameter does
not make sense, because there is no `default.api.timeout.ms` config for
Kafka Streams, so users always need to pass in a timeout. (Same for
producer.)

For the implementation, I think KS would always call
`client.clientInstanceId(timeout)` and never rely on
`default.api.timeout.ms` though, so we can stay in control -- if a
timeout is passed by the user, it would always overwrite
`default.api.timeout.ms` on the consumer/admin and thus we should follow
the same semantics in Kafka Streams, and overwrite it explicitly when
calling `client.clientInstanceId()`.

The proposed API also makes sense to me. I was just wondering if we want
to extend it for client users -- for KS we won't need/use the
timeout-less overloads.



130) My intent was to throw a TimeoutException if we cannot get all
instanceIds, because it's the standard contract for timeouts. It would
also be hard for a user to tell whether a full or partial result was
returned (unless we add a method `boolean isPartialResult()` to make it
easier for users).

If there are concerns/objections, I am also ok to return a partial result
-- it would require a change to the newly added `ClientInstanceIds`
return type -- for `adminInstanceId` we only return a `String` right now
-- we might need to change this to `Optional<String>` so we are able to
return a partial result?


131) Of course we could, but I am not sure what we would gain? In the
end, implementation details would always leak because if we change the
number of consumers we use, we would return different keys in the `Map`.
Atm, the proposal implies that the same key might be used for the "main"
and "restore" consumer of the same thread -- but we can make keys unique
by adding a `-restore` suffix to the restore-consumer key if we merge
both maps. -- Curious to hear what others think. I am very open to do it
differently than currently proposed.


-Matthias


On 10/12/23 8:39 AM, Jun Rao wrote:

Hi, Matthias,

Thanks for the reply.

130. What would be the semantic? If the timeout has expired and only some
of the client instance ids have been retrieved, does the call return the
partial result or throw an exception?

131. Could we group all consumer instances in a single method since we

are

returning the key for each instance already? This probably also avoids
exposing implementation details that could change over time.

Thanks,

Jun

On Thu, Oct 12, 2023 at 12:00 AM Sophie Blee-Goldman <

sop...@responsive.dev>

wrote:


Regarding the naming, I personally think `clientInstanceId` makes sense

for

the plain clients
   -- especially if we might later introduce the notion of an
`applicationInstanceId`.

I'm not a huge fan of `clientsInstanceIds` for the Kafka Streams API,
though, can we use
`clientInstanceIds` instead? (The difference being the placement of the
plural 's')
I would similarly rename the class to just ClientInstanceIds

we can also not have a timeout-less overload,  because `KafkaStreams`

does

not have a `default.api.timeout.ms` config either


With respect to the timeout for the Kafka Streams API, I'm a bit

confused

by the
double/triple-negative of Matthias' comment here, but I was thinking

about

this
earlier and this was my take: with the current proposal, we would allow
users to pass
in an absolute timeout as a parameter that would apply to the method as

a

whole.
Meanwhile within the method we would issue separate calls to each of the
clients using
the default or user-configured value of their  `default.api.timeout.ms`

as

the timeout
parameter.

So the API as proposed makes sense to me.


On Wed, Oct 11, 2023 at 6:48 PM Matthias J. Sax 

wrote:



I can answer 130 and 131.

130) We cannot guarantee that all clients are already initialized due

to

race conditions. We plan to not allow calling
`KafkaStreams#clientsInstanceIds()` when the state is not RUNNING (or
REBALANCING) though -- g

[jira] [Resolved] (KAFKA-15540) Handle heartbeat and revocation when consumer leaves group

2023-10-12 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans resolved KAFKA-15540.

Resolution: Duplicate

Duplicate of KAFKA-15548

> Handle heartbeat and revocation when consumer leaves group
> --
>
> Key: KAFKA-15540
> URL: https://issues.apache.org/jira/browse/KAFKA-15540
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> When a consumer intentionally leaves a group we should:
>  * release assignment (revoke partitions)
>  * send a last Heartbeat request with epoch -1 (or -2 if static member)
> Note that the revocation involves stop fetching, committing offsets if 
> auto-commit enabled and invoking the onPartitionsRevoked callback.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is unstable: Kafka » Kafka Branch Builder » 3.6 #91

2023-10-12 Thread Apache Jenkins Server
See 




Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #2281

2023-10-12 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 210646 lines...]
[Gradle :streams:test output elided: repeated DefaultStateUpdaterTest and DefaultTaskExecutorTest STARTED/PASSED lines, truncated by the digest.]

Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-12 Thread Andrew Schofield
Hi Jun,
Thanks for your comments.

130. As Matthias described, and I am adding to the KIP, the 
`KafkaStreams#clientInstanceIds` method
is only permitted when the state is RUNNING or REBALANCING. Also, clients can 
be added dynamically
so the maps might change over time. If it’s in a permitted state, the method is 
prepared to wait up to the
supplied timeout to get the client instance ids. It does not return a partial 
result - it returns a result or
fails.
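
For illustration, a caller following this contract might look like the sketch
below -- names match the discussion in this thread (the final KIP may differ),
and `streams` is assumed to be a RUNNING KafkaStreams instance:

import java.time.Duration;
import org.apache.kafka.common.errors.TimeoutException;

try {
    // Blocks up to the supplied timeout; only valid in RUNNING or REBALANCING.
    final ClientInstanceIds ids = streams.clientInstanceIds(Duration.ofSeconds(30));
    ids.consumerInstanceIds().forEach((name, id) ->
        System.out.println(name + " -> " + id));
} catch (final TimeoutException e) {
    // No partial result: the call either returns everything or fails.
}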

131. I’ve refactored the `ClientsInstanceIds` object and the global consumer is 
now part of the map
of consumers. There is no need for the Optional any longer. I’ve also renamed 
it `ClientInstanceIds`.

132. My reading of 
`(kafka.network:type=RequestMetrics,name=RequestsPerSec,request=*)` is that
it does not support every request type - it supports Produce, FetchConsumer and 
FetchFollower.
Consequently, I think the ClientMetricsSubscriptionRequestCount is not 
instantly obsolete.

If I’ve misunderstood, please let me know.

Thanks,
Andrew


> On 12 Oct 2023, at 01:07, Jun Rao  wrote:
>
> Hi, Andrew,
>
> Thanks for the updated KIP. Just a few more minor comments.
>
> 130. KafkaStreams.clientsInstanceId(Duration timeout): Does it wait for all
> consumer/producer/adminClient instances to be initialized? Are all those
> instances created during KafkaStreams initialization?
>
> 131. Why does globalConsumerInstanceId() return Optional<String> while
> other consumer instances don't return Optional?
>
> 132. ClientMetricsSubscriptionRequestCount: Do we need this since we have a
> set of generic metrics
> (kafka.network:type=RequestMetrics,name=RequestsPerSec,request=*) that
> report Request rate for every request type?
>
> Thanks,
>
> Jun
>
> On Wed, Oct 11, 2023 at 1:47 PM Matthias J. Sax  wrote:
>
>> Thanks!
>>
>> On 10/10/23 11:31 PM, Andrew Schofield wrote:
>>> Matthias,
>>> Yes, I think that’s a sensible way forward and the interface you propose
>> looks good. I’ll update the KIP accordingly.
>>>
>>> Thanks,
>>> Andrew
>>>
 On 10 Oct 2023, at 23:01, Matthias J. Sax  wrote:

 Andrew,

 yes I would like to get this change into KIP-714 right way. Seems to be
>> important, as we don't know if/when a follow-up KIP for Kafka Streams would
>> land.

 I was also thinking (and discussed with a few others) how to expose it,
>> and we would propose the following:

 We add a new method to `KafkaStreams` class:

public ClientsInstanceIds clientsInstanceIds(Duration timeout);

 The returned object is like below:

  public class ClientsInstanceIds {
    // we only have a single admin client per KS instance
    String adminInstanceId();

    // we only have a single global consumer per KS instance (if any)
    // Optional<> because we might not have global-thread
    Optional<String> globalConsumerInstanceId();

    // return a <threadKey, ClientInstanceId> mapping
    // for the underlying (restore-)consumers/producers
    Map<String, String> mainConsumerInstanceIds();
    Map<String, String> restoreConsumerInstanceIds();
    Map<String, String> producerInstanceIds();
  }

 For the `threadKey`, we would use some pattern like this:

  [Stream|StateUpdater]Thread-<threadId>


 Would this work from your POV?



 -Matthias


 On 10/9/23 2:15 AM, Andrew Schofield wrote:
> Hi Matthias,
> Good point. Makes sense to me.
> Is this something that can also be included in the proposed Kafka
>> Streams follow-on KIP, or would you prefer that I add it to KIP-714?
> I have a slight preference for the former to put all of the KS
>> enhancements into a separate KIP.
> Thanks,
> Andrew
>> On 7 Oct 2023, at 02:12, Matthias J. Sax  wrote:
>>
>> Thanks Andrew. SGTM.
>>
>> One point you did not address is the idea to add a method to
>> `KafkaStreams` similar to the proposed `clientInstanceId()` that will be
>> added to consumer/producer/admin clients.
>>
>> Without addressing this, Kafka Streams users won't have a way to get
>> the assigned `instanceId` of the internally created clients, and thus it
>> would be very difficult for them to know which metrics that the broker
>> receives belong to a Kafka Streams app. It seems they would only find the
>> `instanceIds` in the log4j output if they enable client logging?
>>
>> Of course, because there are multiple clients inside Kafka Streams,
>> the return type cannot be a single "String", but must be some complex
>> data structure -- we could either add a new class, or return a
>> Map<String, String> using a client key that maps to the `instanceId`.
>>
>> For example we could use the following key:
>>
>>   [Global]StreamThread[-<threadId>][-restore][consumer|producer]
>>
>> (Of course, only the valid combination.)
>>
>> Or maybe even better, we might want to return a `Future` because
>> collecting all the `instanceId`s might be a blocking call on each client? I
>> have already a few idea ho

Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-12 Thread Andrew Schofield
Hi Matthias,
I’ll answer (1) to (3).

(1) The KIP uses the phrase “client instance id” and the method name mirrors 
that. Personally, I’m
comfortable with the current name.

(2) That’s a good point. I’ll update it to use a Kafka Uuid instead.

(3) Although it’s a trivial thing to add an overload with no timeout parameter, 
the fact that it doesn’t really fit in the
Producer interface makes me prefer not to. I’d rather keep the timeout explicit 
on the method and keep the signature the
same across all three client interfaces that implement it.
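
For illustration, keeping the signature explicit and uniform (and using Uuid per
point (2)) would give all three clients the same method shape -- a sketch of
what's being discussed here, not final javadoc:

import java.time.Duration;
import org.apache.kafka.common.Uuid;

// The same method is added to Producer, Consumer, and Admin:
Uuid clientInstanceId(Duration timeout);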

I’ll update the KIP now.

Thanks,
Andrew

> On 12 Oct 2023, at 02:47, Matthias J. Sax  wrote:
>
> I can answer 130 and 131.
>
> 130) We cannot guarantee that all clients are already initialized due to race 
> conditions. We plan to not allow calling `KafkaStreams#clientsInstanceIds()` 
> when the state is not RUNNING (or REBALANCING) though -- guess this slipped 
> on the KIP and should be added? But because StreamThreads can be added 
> dynamically (and producer might be created dynamically at runtime; cf below), 
> we still cannot guarantee that all clients are already initialized when the 
> method is called. Of course, we assume that all clients are most likely 
> initialized on the happy path, and blocking calls to 
> `client.clientInstanceId()` should be rare.
>
> To address the worst case, we won't do a naive implementation and just loop 
> over all clients, but fan-out the call to the different StreamThreads (and 
> GlobalStreamThread if it exists), and use Futures to gather the results.
>
> Currently, `StreamThreads` has 3 clients (if ALOS or EOSv2 is used), so we 
> might do 3 blocking calls in the worst case (for EOSv1 we get a producer per 
> task, and we might end up doing more blocking calls if the producers are not 
> initialized yet). Note that EOSv1 is already deprecated, and we are also 
> working on thread refactoring that will reduce the number of clients on 
> StreamThread to 2 -- and we have more refactoring planned to reduce the 
> number of clients even further.
>
> Inside `KafkaStreams#clientsInstanceIds()` we might only do a single blocking 
> call for the admin client (i.e., `admin.clientInstanceId()`).
>
> I agree that we need to do some clever timeout management, but it seems to be 
> more of an implementation detail?
>
> Do you have any particular concerns, or does the proposed implementation as 
> sketched above address your question?
>
>
> 131) If the Topology does not have a global-state-store, there won't be a 
> GlobalThread and thus no global consumer. Thus, we return an Optional.
>
>
>
> Three related questions for Andrew.
>
> (1) Why is the method called `clientInstanceId()` and not just plain 
> `instanceId()`?
>
> (2) Why do we return a `String` and not a UUID type? The added protocol 
> request/response classes use UUIDs.
>
> (3) Would it make sense to have an overloaded `clientInstanceId()` method 
> that does not take any parameter but uses `default.api.timeout.ms` config (this 
> config does not exist on the producer though, so we could only have it for 
> consumer and admin at this point). We could of course also add overloads like 
> this later if users request them (and/or add `default.api.timeout.ms` to the 
> producer, too).
>
> Btw: For KafkaStreams, I think `clientsInstanceIds` still makes sense as a 
> method name though, as `KafkaStreams` itself does not have an `instanceId` -- 
> we can also not have a timeout-less overload, because `KafkaStreams` does not 
> have a `default.api.timeout.ms` config either (and I don't think it makes 
> sense to add).
>
>
>
> -Matthias
>
> On 10/11/23 5:07 PM, Jun Rao wrote:
>> Hi, Andrew,
>> Thanks for the updated KIP. Just a few more minor comments.
>> 130. KafkaStreams.clientsInstanceId(Duration timeout): Does it wait for all
>> consumer/producer/adminClient instances to be initialized? Are all those
>> instances created during KafkaStreams initialization?
>> 131. Why does globalConsumerInstanceId() return Optional<String> while
>> other consumer instances don't return Optional?
>> 132. ClientMetricsSubscriptionRequestCount: Do we need this since we have a
>> set of generic metrics
>> (kafka.network:type=RequestMetrics,name=RequestsPerSec,request=*) that
>> report Request rate for every request type?
>> Thanks,
>> Jun
>> On Wed, Oct 11, 2023 at 1:47 PM Matthias J. Sax  wrote:
>>> Thanks!
>>>
>>> On 10/10/23 11:31 PM, Andrew Schofield wrote:
 Matthias,
 Yes, I think that’s a sensible way forward and the interface you propose
>>> looks good. I’ll update the KIP accordingly.

 Thanks,
 Andrew

> On 10 Oct 2023, at 23:01, Matthias J. Sax  wrote:
>
> Andrew,
>
> yes I would like to get this change into KIP-714 right way. Seems to be
>>> important, as we don't know if/when a follow-up KIP for Kafka Streams would
>>> land.
>
> I was also thinking (and discussed with a few others) how to expose it,
>>> and we would propose the following:
>

Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-12 Thread Jun Rao
Hi, Andrew,

Thanks for the reply.

131. Could we also document how one could correlate each client instance in
KStreams with the labels for the metrics received by the brokers?

132. The documentation for RequestsPerSec is not complete. If you trace
through how
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala#L71
is implemented, it includes every API key tagged with the corresponding
listener.
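
For example, once the new RPCs exist they would automatically be counted under
the generic metric as well; an illustrative mbean name, following the tagging
described above:

  kafka.network:type=RequestMetrics,name=RequestsPerSec,request=GetTelemetrySubscriptions,listener=PLAINTEXT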

Jun

On Thu, Oct 12, 2023 at 11:42 AM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

> Hi Jun,
> Thanks for your comments.
>
> 130. As Matthias described, and I am adding to the KIP, the
> `KafkaStreams#clientInstanceIds` method
> is only permitted when the state is RUNNING or REBALANCING. Also, clients
> can be added dynamically
> so the maps might change over time. If it’s in a permitted state, the
> method is prepared to wait up to the
> supplied timeout to get the client instance ids. It does not return a
> partial result - it returns a result or
> fails.
>
> 131. I’ve refactored the `ClientsInstanceIds` object and the global
> consumer is now part of the map
> of consumers. There is no need for the Optional any longer. I’ve also
> renamed it `ClientInstanceIds`.
>
> 132. My reading of
> `(kafka.network:type=RequestMetrics,name=RequestsPerSec,request=*)` is that
> it does not support every request type - it supports Produce,
> FetchConsumer and FetchFollower.
> Consequently, I think the ClientMetricsSubscriptionRequestCount is not
> instantly obsolete.
>
> If I’ve misunderstood, please let me know.
>
> Thanks,
> Andrew
>
>
> > On 12 Oct 2023, at 01:07, Jun Rao  wrote:
> >
> > Hi, Andrew,
> >
> > Thanks for the updated KIP. Just a few more minor comments.
> >
> > 130. KafkaStreams.clientsInstanceId(Duration timeout): Does it wait for
> all
> > consumer/producer/adminClient instances to be initialized? Are all those
> > instances created during KafkaStreams initialization?
> >
> > 131. Why does globalConsumerInstanceId() return Optional<String> while
> > other consumer instances don't return Optional?
> >
> > 132. ClientMetricsSubscriptionRequestCount: Do we need this since we
> have a
> > set of generic metrics
> > (kafka.network:type=RequestMetrics,name=RequestsPerSec,request=*) that
> > report Request rate for every request type?
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Oct 11, 2023 at 1:47 PM Matthias J. Sax 
> wrote:
> >
> >> Thanks!
> >>
> >> On 10/10/23 11:31 PM, Andrew Schofield wrote:
> >>> Matthias,
> >>> Yes, I think that’s a sensible way forward and the interface you
> propose
> >> looks good. I’ll update the KIP accordingly.
> >>>
> >>> Thanks,
> >>> Andrew
> >>>
>  On 10 Oct 2023, at 23:01, Matthias J. Sax  wrote:
> 
>  Andrew,
> 
>  yes I would like to get this change into KIP-714 right way. Seems to
> be
> >> important, as we don't know if/when a follow-up KIP for Kafka Streams
> would
> >> land.
> 
>  I was also thinking (and discussed with a few others) how to expose
> it,
> >> and we would propose the following:
> 
>  We add a new method to `KafkaStreams` class:
> 
> public ClientsInstanceIds clientsInstanceIds(Duration timeout);
> 
>  The returned object is like below:
> 
>   public class ClientsInstanceIds {
>     // we only have a single admin client per KS instance
>     String adminInstanceId();
> 
>     // we only have a single global consumer per KS instance (if any)
>     // Optional<> because we might not have global-thread
>     Optional<String> globalConsumerInstanceId();
> 
>     // return a <threadKey, ClientInstanceId> mapping
>     // for the underlying (restore-)consumers/producers
>     Map<String, String> mainConsumerInstanceIds();
>     Map<String, String> restoreConsumerInstanceIds();
>     Map<String, String> producerInstanceIds();
>   }
> 
>  For the `threadKey`, we would use some pattern like this:
> 
>   [Stream|StateUpdater]Thread-<threadId>
> 
> 
>  Would this work from your POV?
> 
> 
> 
>  -Matthias
> 
> 
>  On 10/9/23 2:15 AM, Andrew Schofield wrote:
> > Hi Matthias,
> > Good point. Makes sense to me.
> > Is this something that can also be included in the proposed Kafka
> >> Streams follow-on KIP, or would you prefer that I add it to KIP-714?
> > I have a slight preference for the former to put all of the KS
> >> enhancements into a separate KIP.
> > Thanks,
> > Andrew
> >> On 7 Oct 2023, at 02:12, Matthias J. Sax  wrote:
> >>
> >> Thanks Andrew. SGTM.
> >>
> >> One point you did not address is the idea to add a method to
> >> `KafkaStreams` similar to the proposed `clientInstanceId()` that will be
> >> added to consumer/producer/admin clients.
> >>
> >> Without addressing this, Kafka Streams users won't have a way to get
> >> the assigned `instanceId` of the in

Re: [DISCUSS] KIP-985 Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-12 Thread Alieh Saeedi
Hi,
just pointing to javadocs for range() and reverseRange():

range(): *@return The iterator for this range, from smallest to largest
bytes.*
reverseRange(): * @return The reverse iterator for this range, from largest
to smallest key bytes.
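
A minimal usage sketch, assuming a `ReadOnlyKeyValueStore<String, Long> store`
obtained via interactive queries -- per the javadocs above, ordering follows
the serialized byte[] keys, not necessarily the logical key order:

import org.apache.kafka.streams.state.KeyValueIterator;

try (final KeyValueIterator<String, Long> ascending = store.range("a", "z");
     final KeyValueIterator<String, Long> descending = store.reverseRange("a", "z")) {
    // range(): smallest to largest key bytes
    ascending.forEachRemaining(kv -> System.out.println(kv.key + " -> " + kv.value));
    // reverseRange(): largest to smallest key bytes
    descending.forEachRemaining(kv -> System.out.println(kv.key + " -> " + kv.value));
}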

Cheers,
Alieh


On Thu, Oct 12, 2023 at 7:32 AM Matthias J. Sax  wrote:

> Quick addendum.
>
> Some DSL operators use `range` and seem to rely on ascending order,
> too... Of course, if we say `range()` has no ordering guarantee, we
> would add `forwardRange()` and let the DSL use `forwardRange`, too.
>
> The discussion of course also applies to `all()` and `reverseAll()`, and
> and I assume also `prefixScan()` (even if there is no "reverse" version
> for it).
>
>
> On 10/11/23 10:22 PM, Matthias J. Sax wrote:
> > Thanks for raising this question Hanyu. Great find!
> >
> > My interpretation is as follows (it's actually a warning signal that the
> > API contract is not well defined, and we should fix this by extending
> > JavaDocs and docs on the web page about it).
> >
> > We have existing `range()` and `reverseRange()` methods on
> > `ReadOnlyKeyValueStore` -- the interface itself is not typed (ie, just
> > generics), and we state that we don't guarantee "logical order" because
> > underlying stores are based on `byte[]` type. So far so... well.
> >
> > However, to make matters worse, we are also not explicit if the
> > underlying store implementation *must* return keys in
> > byte[]-lexicographical order or not...
> >
> > For `range()`, I would be kinda willing to accept that there is no
> > ordering guarantee at all -- for example, if the underlying byte[]-store
> > is hash-based and implements a full scan to answer a `range()` it might
> > not be efficient, but also not incorrect if keys are returned in some
> > "random" (byte[]-)order. In isolation, I don't see an API contract
> > violation.
> >
> > However, `reverseRange` implicitly states with its name, that some
> > "descending order" (base on keys) is expected. Given the JavaDoc comment
> > about "logical" vs "byte[]" order, the contract (at least to me) is
> > clear: returns records in descending byte[]-lexicographical order. --
> > Any other interpretation seems to be off? Curious to hear if you agree
> > or disagree to this interpretation?
> >
> >
> >
> > If this is correct, it means we are actually lacking a API contract for
> > ascending byte[]-lexicographical range scan. Furthermore, a hash-based
> > byte[]-store would need to actually explicitly sort its result for
> > `reverseRange` to not violate the contract.
> >
> > To me, this raises the question if `range()` actually has a
> > (non-explicit) contract about returning data in byte[]-lexicographical
> > order? It seems a lot of people rely on this, and our default stores
> > actually implement it this way. So if we don't look at `range()` in
> > isolation, but look at the `ReadOnlyKeyValueStore` interface
> > holistically, I would also buy the argument that `range()` implies
> > "ascending "byte[]-lexicographical order". Thoughts?
> >
> > To be frank: to me, it's pretty clear that the original idea to add
> > `range()` was to return data in ascending order.
> >
> >
> > Question 1:
> >   - Do we believe that the range() contract is ascending
> > byte[]-lexicographical order right now?
> >
> > If yes, I would propose to make it explicit in the JavaDocs.
> >
> > If no, I would also propose to make it explicit in the JavaDocs. In
> > addition, it raises the question if a method `forwardRange()` (for the
> > lack of a better idea about a name right now) is actually missing to
> > provide such a contract?
> >
> >
> > Of course, we always depend on the serialization format for order, and
> > if users need "logical order" they need to ensure to use a serialization
> > format that align byte[]-lexicographical order to logical order. But for
> > the scope of this work, I would not even try to open this can of worms...
> >
> >
> >
> >
> > Looking into `RangeQuery` the JavaDocs don't say anything about order.
> > Thus, `RangeQuery#range()` could actually also be implemented by calling
> > `reverseRange()` without violating the contract as it seems. A hash-based
> > store could also implement it, without the need to explicitly sort...
> >
> > What brings me back to my original thought about having three types of
> > results for `Range`
> >   - no ordering guarantee
> >   - ascending (we would only give byte[]-lexicographical order)
> >   - descending (we would only give byte[]-lexicographical order)
> >
> > Again, I actually believe that the original intent of RangeQuery was to
> > inherit the ascending order of `ReadOnlyKeyValueStore#range()`... Please
> > keep me honest about it. On the other hand, both APIs seem to be
> > independent enough to not couple them... -- this could actually be a
> > step into the right direction and would follow the underlying idea of
> > IQv2 to begin with: decouple semantics for the store interfaces from the
> > que

Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-12 Thread Matthias J. Sax
Thanks Andrew. Makes sense to me. Adding the parameter-less overload was 
just a random idea. No need to extend the KIP.



-Matthias

On 10/12/23 12:12 PM, Jun Rao wrote:

Hi, Andrew,

Thanks for the reply.

131. Could we also document how one could correlate each client instance in
KStreams with the labels for the metrics received by the brokers?

132. The documentation for RequestsPerSec is not complete. If you trace
through how
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala#L71
is implemented, it includes every API key tagged with the corresponding
listener.

Jun

On Thu, Oct 12, 2023 at 11:42 AM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:


Hi Jun,
Thanks for your comments.

130. As Matthias described, and I am adding to the KIP, the
`KafkaStreams#clientInstanceIds` method
is only permitted when the state is RUNNING or REBALANCING. Also, clients
can be added dynamically
so the maps might change over time. If it’s in a permitted state, the
method is prepared to wait up to the
supplied timeout to get the client instance ids. It does not return a
partial result - it returns a result or
fails.

131. I’ve refactored the `ClientsInstanceIds` object and the global
consumer is now part of the map
of consumers. There is no need for the Optional any longer. I’ve also
renamed it `ClientInstanceIds`.

132. My reading of
`(kafka.network:type=RequestMetrics,name=RequestsPerSec,request=*)` is that
it does not support every request type - it supports Produce,
FetchConsumer and FetchFollower.
Consequently, I think the ClientMetricsSubscriptionRequestCount is not
instantly obsolete.

If I’ve misunderstood, please let me know.

Thanks,
Andrew



On 12 Oct 2023, at 01:07, Jun Rao  wrote:

Hi, Andrew,

Thanks for the updated KIP. Just a few more minor comments.

130. KafkaStreams.clientsInstanceId(Duration timeout): Does it wait for

all

consumer/producer/adminClient instances to be initialized? Are all those
instances created during KafkaStreams initialization?

131. Why does globalConsumerInstanceId() return Optional while
other consumer instances don't return Optional?

132. ClientMetricsSubscriptionRequestCount: Do we need this since we

have a

set of generic metrics
(kafka.network:type=RequestMetrics,name=RequestsPerSec,request=*) that
report Request rate for every request type?

Thanks,

Jun

On Wed, Oct 11, 2023 at 1:47 PM Matthias J. Sax 

wrote:



Thanks!

On 10/10/23 11:31 PM, Andrew Schofield wrote:

Matthias,
Yes, I think that’s a sensible way forward and the interface you

propose

looks good. I’ll update the KIP accordingly.


Thanks,
Andrew


On 10 Oct 2023, at 23:01, Matthias J. Sax  wrote:

Andrew,

yes I would like to get this change into KIP-714 right way. Seems to

be

important, as we don't know if/when a follow-up KIP for Kafka Streams

would

land.


I was also thinking (and discussed with a few others) how to expose

it,

and we would propose the following:


We add a new method to `KafkaStreams` class:

public ClientsInstanceIds clientsInstanceIds(Duration timeout);

The returned object is like below:

  public class ClientsInstanceIds {
    // we only have a single admin client per KS instance
    String adminInstanceId();

    // we only have a single global consumer per KS instance (if any)
    // Optional<> because we might not have global-thread
    Optional<String> globalConsumerInstanceId();

    // return a <threadKey, ClientInstanceId> mapping
    // for the underlying (restore-)consumers/producers
    Map<String, String> mainConsumerInstanceIds();
    Map<String, String> restoreConsumerInstanceIds();
    Map<String, String> producerInstanceIds();
  }

For the `threadKey`, we would use some pattern like this:

  [Stream|StateUpdater]Thread-<threadId>


Would this work from your POV?



-Matthias


On 10/9/23 2:15 AM, Andrew Schofield wrote:

Hi Matthias,
Good point. Makes sense to me.
Is this something that can also be included in the proposed Kafka

Streams follow-on KIP, or would you prefer that I add it to KIP-714?

I have a slight preference for the former to put all of the KS

enhancements into a separate KIP.

Thanks,
Andrew

On 7 Oct 2023, at 02:12, Matthias J. Sax  wrote:

Thanks Andrew. SGTM.

One point you did not address is the idea to add a method to
`KafkaStreams` similar to the proposed `clientInstanceId()` that will be
added to consumer/producer/admin clients.


Without addressing this, Kafka Streams users won't have a way to get
the assigned `instanceId` of the internally created clients, and thus it
would be very difficult for them to know which of the metrics the broker
receives belong to a Kafka Streams app. It seems they would only find the
`instanceIds` in the log4j output if they enable client logging?


Of course, because there are multiple clients inside Kafka Streams,
the return type cannot be a single "String", but must be some more
complex data structure -- 

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-12 Thread Sophie Blee-Goldman
Hey Nick! First of all thanks for taking up this awesome feature, I'm sure
every single
Kafka Streams user and dev would agree that it is sorely needed.

I've just been catching up on the KIP and surrounding discussion, so please
forgive me
for any misunderstandings or misinterpretations of the current plan and
don't hesitate to
correct me.

Before I jump in, I just want to say that having seen this drag on for so
long, my singular
goal in responding is to help this KIP past a perceived impasse so we can
finally move on
to voting and implementing it. Long discussions are to be expected for
major features like
this but it's completely on us as the Streams devs to make sure there is an
end in sight
for any ongoing discussion.

With that said, it's my understanding that the KIP as currently proposed is
just not tenable
for Kafka Streams, and would prevent some EOS users from upgrading to the
version it
first appears in. Given that we can't predict or guarantee whether any of
the followup KIPs
would be completed in the same release cycle as this one, we need to make
sure that the
feature is either compatible with all current users or else feature-flagged
so that they may
opt in/out.

Therefore, IIUC we need to have either (or both) of these as
fully-implemented config options:
1. default.state.isolation.level
2. enable.transactional.state.stores
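
For concreteness, opting out might look something like this (a rough
sketch; these are the two config names proposed above, and their final
names and values are still up for discussion):

  final Properties props = new Properties();
  props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
      StreamsConfig.EXACTLY_ONCE_V2);
  // option 1: keep read_uncommitted semantics under EOS
  props.put("default.state.isolation.level", "READ_UNCOMMITTED");
  // option 2: keep the new transactional state stores disabled entirely
  props.put("enable.transactional.state.stores", "false");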

This way EOS users for whom read_committed semantics are not viable can
still upgrade,
and either use the isolation.level config to leverage the new txn state
stores without sacrificing
their application semantics, or else simply keep the transactional state
stores disabled until we
are able to fully implement the isolation level configuration at either an
application or query level.

Frankly you are the expert here and know much more about the tradeoffs in
both semantics and
effort level of implementing one of these configs vs the other. In my
opinion, either option would
be fine and I would leave the decision of which one to include in this KIP
completely up to you.
I just don't see a way for the KIP to proceed without some variation of the
above that would allow
EOS users to opt-out of read_committed.

(If it's all the same to you, I would recommend always including a feature
flag in large structural
changes like this. No matter how much I trust someone or myself to
implement a feature, you just
never know what kind of bugs might slip in, especially with the very first
iteration that gets released.
So personally, my choice would be to add the feature flag and leave it off
by default. If all goes well
you can do a quick KIP to enable it by default as soon as the
isolation.level config has been
completed. But feel free to just pick whichever option is easiest or
quickest for you to implement)

Hope this helps move the discussion forward,
Sophie

On Tue, Sep 19, 2023 at 1:57 AM Nick Telford  wrote:

> Hi Bruno,
>
> Agreed, I can live with that for now.
>
> In an effort to keep the scope of this KIP from expanding, I'm leaning
> towards just providing a configurable default.state.isolation.level and
> removing IsolationLevel from the StateStoreContext. This would be
> compatible with adding support for query-time IsolationLevels in the
> future, whilst providing a way for users to select an isolation level now.
>
> The big problem with this, however, is that if a user selects
> processing.mode
> = "exactly-once(-v2|-beta)", and default.state.isolation.level =
> "READ_UNCOMMITTED", we need to guarantee that the data isn't written to
> disk until commit() is called, but we also need to permit IQ threads to
> read from the ongoing transaction.
>
> A simple solution would be to (temporarily) forbid this combination of
> configuration, and have default.state.isolation.level automatically switch
> to READ_COMMITTED when processing.mode is anything other than
> at-least-once. Do you think this would be acceptable?
>
> In a later KIP, we can add support for query-time isolation levels and
> solve this particular problem there, which would relax this restriction.
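>
> For illustration, that switch might look something like this (a
> hypothetical sketch of the config validation, not a commitment to any
> particular implementation):
>
>   if (!"at-least-once".equals(processingMode)
>           && "READ_UNCOMMITTED".equals(isolationLevel)) {
>       log.warn("default.state.isolation.level=READ_UNCOMMITTED is not"
>           + " supported with processing.mode={}; using READ_COMMITTED",
>           processingMode);
>       isolationLevel = "READ_COMMITTED";
>   }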
>
> Regards,
> Nick
>
> On Tue, 19 Sept 2023 at 09:30, Bruno Cadonna  wrote:
>
> > Why do we need to add READ_COMMITTED to InMemoryKeyValueStore? I think
> > it is perfectly valid to say InMemoryKeyValueStore do not support
> > READ_COMMITTED for now, since READ_UNCOMMITTED is the de-facto default
> > at the moment.
> >
> > Best,
> > Bruno
> >
> > On 9/18/23 7:12 PM, Nick Telford wrote:
> > > Oh! One other concern I haven't mentioned: if we make IsolationLevel a
> > > query-time constraint, then we need to add support for READ_COMMITTED
> to
> > > InMemoryKeyValueStore too, which will require some changes to the
> > > implementation.
> > >
> > > On Mon, 18 Sept 2023 at 17:24, Nick Telford 
> > wrote:
> > >
> > >> Hi everyone,
> > >>
> > >> I agree that having IsolationLevel be determined at query-time is the
> > >> ideal design, but there are a few sticking points:
> > >>
> > >> 1.
> > >> There needs to be some way to 

Re: Requesting to be added to Kafka project

2023-10-12 Thread Alyssa Huang
Can I also have edit/write permission for Confluence to start a KIP? My
username is ahuang98, thanks a bunch

On Fri, Apr 30, 2021 at 3:31 PM Bill Bejeck  wrote:

> Hi Alyssa,
>
> I've added you to the contributors role in Jira so you should be able to
> self-assign tickets now.
>
> -Bill
>
> On Fri, Apr 30, 2021 at 5:26 PM Alyssa Huang 
> wrote:
>
> > Oops, yes I'd like to be added to JIRA, my username is alyssahuang.
> >
> > Thanks!
> >
> > On Fri, Apr 30, 2021 at 2:06 PM Justine Olshan
> > 
> > wrote:
> >
> > > Hi Alyssa,
> > > Are you asking to be added to JIRA? If so, can you provide your jira
> > > username?
> > >
> > > Thanks,
> > > Justine
> > >
> > > On Fri, Apr 30, 2021 at 9:48 AM Alyssa Huang
>  > >
> > > wrote:
> > >
> > > > Hello,
> > > >
> > > > I'm interested in contributing to Kafka! Can I be added to the
> project?
> > > >
> > > > Best,
> > > > Alyssa
> > > >
> > >
> >
>


[jira] [Created] (KAFKA-15601) Client metrics and observability

2023-10-12 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-15601:
-

 Summary: Client metrics and observability
 Key: KAFKA-15601
 URL: https://issues.apache.org/jira/browse/KAFKA-15601
 Project: Kafka
  Issue Type: Improvement
Reporter: Apoorv Mittal


This Jira tracks the development of KIP-714: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability]

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Requesting to be added to Kafka project

2023-10-12 Thread Greg Harris
Hey Alyssa,

Thanks for considering opening a KIP!
I've given you the necessary permissions to create a KIP, let me know
if you have any more issues.

Thanks!
Greg

On Thu, Oct 12, 2023 at 3:22 PM Alyssa Huang
 wrote:
>
> Can I also have edit/write permission for Confluence to start a KIP? My
> username is ahuang98, thanks a bunch
>
> On Fri, Apr 30, 2021 at 3:31 PM Bill Bejeck  wrote:
>
> > Hi Alyssa,
> >
> > I've added you to the contributors role in Jira so you should be able to
> > self-assign tickets now.
> >
> > -Bill
> >
> > On Fri, Apr 30, 2021 at 5:26 PM Alyssa Huang 
> > wrote:
> >
> > > Oops, yes I'd like to be added to JIRA, my username is alyssahuang.
> > >
> > > Thanks!
> > >
> > > On Fri, Apr 30, 2021 at 2:06 PM Justine Olshan
> > > 
> > > wrote:
> > >
> > > > Hi Alyssa,
> > > > Are you asking to be added to JIRA? If so, can you provide your jira
> > > > username?
> > > >
> > > > Thanks,
> > > > Justine
> > > >
> > > > On Fri, Apr 30, 2021 at 9:48 AM Alyssa Huang
> >  > > >
> > > > wrote:
> > > >
> > > > > Hello,
> > > > >
> > > > > I'm interested in contributing to Kafka! Can I be added to the
> > project?
> > > > >
> > > > > Best,
> > > > > Alyssa
> > > > >
> > > >
> > >
> >


[PR] MINOR: fix Kafka Streams metric names [kafka-site]

2023-10-12 Thread via GitHub


mjsax opened a new pull request, #558:
URL: https://github.com/apache/kafka-site/pull/558

   Follow up to https://github.com/apache/kafka/pull/14221


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [DISCUSS] 3.5.2 Release

2023-10-12 Thread Sophie Blee-Goldman
Thanks for volunteering Luke!

On Thu, Oct 12, 2023 at 2:55 AM Levani Kokhreidze 
wrote:

> Hi Divij,
>
> Thanks for the explanation, makes sense.
>
> Hi Luke, thank you! It would be awesome to see 3.5.2 out.
>
> Best,
> Levani
>
> > On 12. Oct 2023, at 12:39, Luke Chen  wrote:
> >
> > Hi Levani and Divij,
> >
> > I can work on the 3.5.2 release.
> > I'll start a new thread for volunteering it maybe next week.
> >
> > Thanks.
> > Luke
> >
> > On Thu, Oct 12, 2023 at 5:07 PM Divij Vaidya 
> > wrote:
> >
> >> Hello Levani
> >>
> >> From a process perspective, there is no fixed schedule for bug fix
> >> releases. If we have a volunteer for release manager (must be a
> >> committer), they can start the process of a bug fix release (with the
> >> approval of the PMC).
> >>
> >> My personal opinion is that it's too early to start 3.6.1 and we should
> >> wait at least 1 month to hear feedback on 3.6.0. We need to strike a
> >> careful balance between getting the critical fixes into the hands of
> >> users as soon as possible vs. spending community effort on releases
> >> (effort that could otherwise be used to make Kafka better, feature-wise
> >> & operational-stability-wise).
> >>
> >> For 3.5.2, I think there are sufficient pending fixes (including some
> >> CVE fixes) to start a bug fix release. We just need a volunteer for the
> >> release manager.
> >>
> >> --
> >> Divij Vaidya
> >>
> >>
> >>
> >> On Thu, Oct 12, 2023 at 9:57 AM Levani Kokhreidze <
> levani.co...@gmail.com>
> >> wrote:
> >>
> >>> Hello,
> >>>
> >>> KAFKA-15571 [1] was merged and backported to the 3.5 and 3.6 branches.
> >>> The fix repairs a feature that was added in 3.5. Considering the
> >>> feature doesn't work as expected without the fix, I would like to know
> >>> if it's reasonable to start the 3.5.2 release. Of course, releasing
> >>> such a massive project like Kafka is not a trivial task, so I am
> >>> looking for the community's input on whether it's reasonable to start
> >>> the 3.5.2 release process.
> >>>
> >>> Best,
> >>> Levani
> >>>
> >>> [1] - https://issues.apache.org/jira/browse/KAFKA-15571
> >>
>
>


[jira] [Created] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-12 Thread Luke Kirby (Jira)
Luke Kirby created KAFKA-15602:
--

 Summary: Breaking change in 3.4.0 ByteBufferSerializer
 Key: KAFKA-15602
 URL: https://issues.apache.org/jira/browse/KAFKA-15602
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 3.5.1, 3.6.0, 3.4.1, 3.5.0, 3.4.0
Reporter: Luke Kirby


[This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
solved the situation described by KAFKA-4852, namely, to have 
ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
offsets (or, put another way, to honor the buffer's position() as the start 
point to consume bytes from). Unfortunately, it failed to actually do this, and 
instead changed the expectations for how an input ByteBuffer's limit and 
position should be set before being provided to send() on a producer configured 
with ByteBufferSerializer. Code that worked with pre-3.4.0 releases now produces 
0-length messages instead of the intended messages, effectively introducing a 
breaking change for existing users of the serializer in the wild.

Here are a few different inputs and serialized outputs under pre-3.4.0 and 
3.4.0+ to summarize the breaking change:
||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
|ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
|ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
|ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
|ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
|ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|

Notably, plain wrappers of byte arrays continue to work under both versions due 
to the special case in the serializer for them. I suspect that this is the 
dominant use-case, which is why this has apparently gone un-reported to this 
point. The wrapped-with-offset case fails under both versions, for different 
reasons (the expected value would be "est"). As demonstrated here, you can 
ensure that a manually assembled ByteBuffer will work under both versions by 
ensuring that your buffers have position == limit == message-length (and an 
actual desired start position of 0). Clearly, though, behavior has changed 
dramatically for the second and third cases there, with the 3.3.2 behavior, in 
my experience, aligning better with naive expectations.
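
For example, this pattern (a sketch; the topic name and buffer size are 
illustrative) satisfies that constraint and serializes the same four bytes 
under both 3.3.x and 3.4.x+:
{code:java}
ByteBuffer bb = ByteBuffer.allocate(64);
bb.put("test".getBytes(StandardCharsets.UTF_8));
bb.limit(bb.position()); // now position == limit == 4, data starts at index 0
producer.send(new ProducerRecord<>("some-topic", bb)); // len=4 val=test either way
{code}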

[Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
 the serializer would just rewind() the buffer and respect the limit as the 
indicator as to how much data was in the buffer. So, essentially, the 
prevailing contract was that the data from position 0 (always!) up to the limit 
on the buffer would be serialized; so it was really just the limit that was 
honored. So if, per the original issue, you have a byte[] array wrapped with, 
say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() with 
position = 3 indicating the desired start point to read from, but effectively 
ignored by the serializer due to the rewind().
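
(For reference, the pre-3.4.0 implementation was essentially the following 
sketch; see the link above for the exact code:)
{code:java}
public byte[] serialize(String topic, ByteBuffer data) {
    if (data == null)
        return null;

    data.rewind(); // position is discarded: reading always starts at index 0

    if (data.hasArray()) {
        byte[] arr = data.array();
        if (data.arrayOffset() == 0 && arr.length == data.limit()) {
            return arr; // fast path: buffer exactly spans its backing array
        }
    }

    byte[] ret = new byte[data.limit()];
    data.get(ret, 0, data.limit());
    data.rewind();
    return ret;
}
{code}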

So while the serializer didn't work when presenting a ByteBuffer view onto a 
sub-range of a backing array, it did, however, follow expected behavior when 
employing standard patterns to populate ByteBuffers backed by 
larger-than-necessary arrays and using limit() to identify the end of actual 
data, consistent with conventional usage of flip() to switch from writing to a 
buffer to setting it up to be read from (e.g., to be passed into a 
producer.send() call). E.g.,
{code:java}
ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
... // some sequence of 
bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH 
... 
bb.flip(); /* logically, this says "I am done writing, let's set this up for 
reading"; pragmatically, it sets the limit to the current position so that 
whoever reads the buffer knows when to stop reading, and sets the position to 
zero so it knows where to start reading from */ 
producer.send(bb); {code}
Technically, you wouldn't even need to use flip() there, since position is 
ignored; it would be sufficient to just call {{bb.limit(bb.position())}}. 
Notably, a buffer constructed using any variant of ByteBuffer.wrap() is 
essentially immediately in read-mode with position indicating the start and 
limit the end.
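
For instance (a sketch of that state):
{code:java}
ByteBuffer bb = ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8), 1, 3);
// bb.position() == 1 (intended start of data), bb.limit() == 4 (end of data)
{code}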

With the change introduced in 3.4.0, however, the contract changes 
dramatically, and the code just presented produces a 0-byte message. As 
indicated above, it also continues to fail if you pass in an 
offset-specified ByteBuffer.wrap()ped message, i.e., the case described by 
KAFKA-4852:
{code:java}
@Test
public void testByteBufferSerializerOnOffsetWrappedBytes() {
    // hypothetical completion of the truncated test: the offset-wrapped
    // buffer should serialize as "est" (the KAFKA-4852 expectation)
    final ByteBuffer buff = ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3);
    assertArrayEquals("est".getBytes(UTF_8),
        new ByteBufferSerializer().serialize("topic", buff));
}
{code}