Re: [VOTE] KIP-872: Add Serializer#serializeToByteBuffer() to reduce memory copying

2023-06-20 Thread ShunKang Lin
Hi all,

Thanks all for voting!

Currently we have +3 binding votes (from Divij Vaidya, Luke Chen and ziming deng)
and +2 non-binding (from Kirk True and Kamal Chandraprakash).

This vote thread has been open for over 72 hours and has sufficient votes, so
I’ll close the voting at this time.

KIP-872 has PASSED.
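
For anyone catching up on the thread: the KIP adds a ByteBuffer-returning
variant of Serializer#serialize(), so serializers whose output already lives
in a ByteBuffer can hand it back directly instead of copying it into a fresh
byte[]. A rough sketch of the shape of the change (illustrative only; the
exact signatures, overloads and default behaviour are specified on the KIP
page linked in the quoted message below):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.header.Headers;

    // Illustrative sketch, not the exact interface from the KIP.
    public interface Serializer<T> {

        byte[] serialize(String topic, T data);

        default byte[] serialize(String topic, Headers headers, T data) {
            return serialize(topic, data);
        }

        // New ByteBuffer-returning variant: the default simply wraps the byte[]
        // result, while ByteBuffer-backed serializers can return their buffer
        // directly and skip the extra copy.
        default ByteBuffer serializeToByteBuffer(String topic, Headers headers, T data) {
            byte[] bytes = serialize(topic, headers, data);
            return bytes == null ? null : ByteBuffer.wrap(bytes);
        }

        // configure()/close() omitted for brevity.
    }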

Best,
ShunKang


On Wednesday, June 21, 2023, Divij Vaidya wrote:

> +1 (again) - binding :)
>
> Please update the PR ShunKang and tag me for review when you are ready.
>
> --
> Divij Vaidya
>
>
>
> On Tue, Jun 20, 2023 at 4:11 PM John Roesler  wrote:
>
> > Hi Divij and ShunKang,
> >
> > I pulled open this thread to see if you needed my vote, but FYI, Divij is
> > a committer now, so he can re-cast his vote as binding.
> >
> > Thanks,
> > -John
> >
> > On 2023/06/20 13:37:04 ShunKang Lin wrote:
> > > Hi all,
> > >
> > > Bump this thread again and see if we could get a few more votes.
> > > Currently we have +3 non-binding (from Divij Vaidya, Kirk True and
> Kamal
> > > Chandraprakash)  and +2 binding (from Luke Chen and ziming deng).
> > > Hoping we can get this approved, reviewed, and merged in time for
> 3.6.0.
> > >
> > > Best,
> > > ShunKang
> > >
> > > ShunKang Lin wrote on Sun, May 7, 2023 at 15:24:
> > >
> > > > Hi everyone,
> > > >
> > > > I'd like to open the vote for KIP-872, which proposes to add
> > > > Serializer#serializeToByteBuffer() to reduce memory copying.
> > > >
> > > > The proposal is here:
> > > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828
> > > >
> > > > The pull request is here:
> > > > https://github.com/apache/kafka/pull/12685
> > > >
> > > > Thanks to all who reviewed the proposal, and thanks in advance for
> > taking
> > > > the time to vote!
> > > >
> > > > Best,
> > > > ShunKang
> > > >
> > >
> >
>


Re: [VOTE] KIP-872: Add Serializer#serializeToByteBuffer() to reduce memory copying

2023-06-20 Thread ShunKang Lin
Hi John,

Thank you for your reminder!

Best,
ShunKang

John Roesler wrote on Tue, Jun 20, 2023 at 22:11:

> Hi Divij and ShunKang,
>
> I pulled open this thread to see if you needed my vote, but FYI, Divij is
> a committer now, so he can re-cast his vote as binding.
>
> Thanks,
> -John
>
> On 2023/06/20 13:37:04 ShunKang Lin wrote:
> > Hi all,
> >
> > Bump this thread again and see if we could get a few more votes.
> > Currently we have +3 non-binding (from Divij Vaidya, Kirk True and Kamal
> > Chandraprakash)  and +2 binding (from Luke Chen and ziming deng).
> > Hoping we can get this approved, reviewed, and merged in time for 3.6.0.
> >
> > Best,
> > ShunKang
> >
> > ShunKang Lin wrote on Sun, May 7, 2023 at 15:24:
> >
> > > Hi everyone,
> > >
> > > I'd like to open the vote for KIP-872, which proposes to add
> > > Serializer#serializeToByteBuffer() to reduce memory copying.
> > >
> > > The proposal is here:
> > >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828
> > >
> > > The pull request is here:
> > > https://github.com/apache/kafka/pull/12685
> > >
> > > Thanks to all who reviewed the proposal, and thanks in advance for
> taking
> > > the time to vote!
> > >
> > > Best,
> > > ShunKang
> > >
> >
>


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1940

2023-06-20 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: Kafka » Kafka Branch Builder » 3.5 #24

2023-06-20 Thread Apache Jenkins Server
See 




[GitHub] [kafka-site] mjsax opened a new pull request, #528: MINOR: Add statement about ZK deprecation to 3.5 release blog post

2023-06-20 Thread via GitHub


mjsax opened a new pull request, #528:
URL: https://github.com/apache/kafka-site/pull/528

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Build failed in Jenkins: Kafka » Kafka Branch Builder » 3.4 #145

2023-06-20 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 525857 lines...]
[2023-06-20T23:26:33.505Z] 
/home/jenkins/workspace/Kafka_kafka_3.4@2/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:890:
 warning - Tag @link: reference not found: DefaultPartitioner
[2023-06-20T23:26:33.505Z] 
/home/jenkins/workspace/Kafka_kafka_3.4@2/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:919:
 warning - Tag @link: reference not found: DefaultPartitioner
[2023-06-20T23:26:33.505Z] 
/home/jenkins/workspace/Kafka_kafka_3.4@2/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:939:
 warning - Tag @link: reference not found: DefaultPartitioner
[2023-06-20T23:26:33.505Z] 25 warnings
[2023-06-20T23:26:34.439Z] 
[2023-06-20T23:26:34.439Z] > Task :streams:javadocJar
[2023-06-20T23:26:36.188Z] 
[2023-06-20T23:26:36.188Z] > Task :clients:javadoc
[2023-06-20T23:26:36.188Z] 
/home/jenkins/workspace/Kafka_kafka_3.4@2/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/package-info.java:21:
 warning - Tag @link: reference not found: 
org.apache.kafka.common.security.oauthbearer
[2023-06-20T23:26:37.122Z] 
/home/jenkins/workspace/Kafka_kafka_3.4@2/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/package-info.java:21:
 warning - Tag @link: reference not found: 
org.apache.kafka.common.security.oauthbearer
[2023-06-20T23:26:37.122Z] 3 warnings
[2023-06-20T23:26:38.056Z] 
[2023-06-20T23:26:38.056Z] > Task :clients:javadocJar
[2023-06-20T23:26:38.990Z] > Task :clients:testJar
[2023-06-20T23:26:38.990Z] > Task :clients:testSrcJar
[2023-06-20T23:26:38.990Z] > Task 
:clients:publishMavenJavaPublicationToMavenLocal
[2023-06-20T23:26:38.990Z] > Task :clients:publishToMavenLocal
[2023-06-20T23:26:55.358Z] > Task :core:compileScala
[2023-06-20T23:28:44.741Z] > Task :core:classes
[2023-06-20T23:28:44.741Z] > Task :core:compileTestJava NO-SOURCE
[2023-06-20T23:29:10.545Z] > Task :core:compileTestScala
[2023-06-20T23:30:43.872Z] > Task :core:testClasses
[2023-06-20T23:30:43.872Z] > Task :streams:compileTestJava UP-TO-DATE
[2023-06-20T23:30:43.872Z] > Task :streams:testClasses UP-TO-DATE
[2023-06-20T23:30:43.872Z] > Task :streams:testJar
[2023-06-20T23:30:43.872Z] > Task :streams:testSrcJar
[2023-06-20T23:30:43.872Z] > Task 
:streams:publishMavenJavaPublicationToMavenLocal
[2023-06-20T23:30:43.872Z] > Task :streams:publishToMavenLocal
[2023-06-20T23:30:43.872Z] 
[2023-06-20T23:30:43.872Z] Deprecated Gradle features were used in this build, 
making it incompatible with Gradle 8.0.
[2023-06-20T23:30:43.872Z] 
[2023-06-20T23:30:43.872Z] You can use '--warning-mode all' to show the 
individual deprecation warnings and determine if they come from your own 
scripts or plugins.
[2023-06-20T23:30:43.872Z] 
[2023-06-20T23:30:43.872Z] See 
https://docs.gradle.org/7.6/userguide/command_line_interface.html#sec:command_line_warnings
[2023-06-20T23:30:43.872Z] 
[2023-06-20T23:30:43.872Z] Execution optimizations have been disabled for 2 
invalid unit(s) of work during this build to ensure correctness.
[2023-06-20T23:30:43.872Z] Please consult deprecation warnings for more details.
[2023-06-20T23:30:43.872Z] 
[2023-06-20T23:30:43.872Z] BUILD SUCCESSFUL in 4m 45s
[2023-06-20T23:30:43.872Z] 81 actionable tasks: 35 executed, 46 up-to-date
[Pipeline] sh
[2023-06-20T23:30:46.674Z] + grep ^version= gradle.properties
[2023-06-20T23:30:46.674Z] + cut -d= -f 2
[Pipeline] dir
[2023-06-20T23:30:47.356Z] Running in 
/home/jenkins/workspace/Kafka_kafka_3.4@2/streams/quickstart
[Pipeline] {
[Pipeline] sh
[2023-06-20T23:30:49.481Z] + mvn clean install -Dgpg.skip
[2023-06-20T23:30:52.258Z] [INFO] Scanning for projects...
[2023-06-20T23:30:52.258Z] [INFO] 

[2023-06-20T23:30:52.258Z] [INFO] Reactor Build Order:
[2023-06-20T23:30:52.258Z] [INFO] 
[2023-06-20T23:30:52.258Z] [INFO] Kafka Streams :: Quickstart   
 [pom]
[2023-06-20T23:30:52.258Z] [INFO] streams-quickstart-java   
 [maven-archetype]
[2023-06-20T23:30:52.258Z] [INFO] 
[2023-06-20T23:30:52.258Z] [INFO] < 
org.apache.kafka:streams-quickstart >-
[2023-06-20T23:30:52.258Z] [INFO] Building Kafka Streams :: Quickstart 
3.4.1-SNAPSHOT[1/2]
[2023-06-20T23:30:52.258Z] [INFO]   from pom.xml
[2023-06-20T23:30:52.258Z] [INFO] [ pom 
]-
[2023-06-20T23:30:53.231Z] [INFO] 
[2023-06-20T23:30:53.231Z] [INFO] --- clean:3.0.0:clean (default-clean) @ 
streams-quickstart ---
[2023-06-20T23:30:53.231Z] [INFO] 
[2023-06-20T23:30:53.231Z] [INFO] --- remote-resources:1.5:process 
(process-resource-bundles) @ streams-quickstart ---
[2023-06-20T23:30:54.161Z] [INFO] 
[2023-06-20T23:30:54.161Z] [INFO] --- 

Build failed in Jenkins: Kafka » Kafka Branch Builder » 3.3 #178

2023-06-20 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 256696 lines...]
[2023-06-20T23:01:13.548Z] 
[2023-06-20T23:01:13.548Z] 
org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest > 
shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables STARTED
[2023-06-20T23:01:15.311Z] 
[2023-06-20T23:01:15.311Z] 
org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest > 
shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables PASSED
[2023-06-20T23:01:16.381Z] 
[2023-06-20T23:01:16.381Z] 
org.apache.kafka.streams.integration.StateDirectoryIntegrationTest > 
testNotCleanUpStateDirIfNotEmpty STARTED
[2023-06-20T23:01:19.040Z] 
[2023-06-20T23:01:19.040Z] 
org.apache.kafka.streams.integration.StateDirectoryIntegrationTest > 
testNotCleanUpStateDirIfNotEmpty PASSED
[2023-06-20T23:01:19.040Z] 
[2023-06-20T23:01:19.040Z] 
org.apache.kafka.streams.integration.StateDirectoryIntegrationTest > 
testCleanUpStateDirIfEmpty STARTED
[2023-06-20T23:01:22.074Z] 
[2023-06-20T23:01:22.074Z] 
org.apache.kafka.streams.integration.StateDirectoryIntegrationTest > 
testCleanUpStateDirIfEmpty PASSED
[2023-06-20T23:01:28.687Z] 
[2023-06-20T23:01:28.687Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQuerySpecificActivePartitionStores STARTED
[2023-06-20T23:01:33.724Z] 
[2023-06-20T23:01:33.724Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQuerySpecificActivePartitionStores PASSED
[2023-06-20T23:01:33.724Z] 
[2023-06-20T23:01:33.724Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQueryAllStalePartitionStores STARTED
[2023-06-20T23:01:38.431Z] 
[2023-06-20T23:01:38.431Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQueryAllStalePartitionStores PASSED
[2023-06-20T23:01:38.431Z] 
[2023-06-20T23:01:38.431Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQuerySpecificStalePartitionStoresMultiStreamThreads STARTED
[2023-06-20T23:01:45.746Z] 
[2023-06-20T23:01:45.746Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQuerySpecificStalePartitionStoresMultiStreamThreads PASSED
[2023-06-20T23:01:45.746Z] 
[2023-06-20T23:01:45.746Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQuerySpecificStalePartitionStores STARTED
[2023-06-20T23:01:51.584Z] 
[2023-06-20T23:01:51.584Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQuerySpecificStalePartitionStores PASSED
[2023-06-20T23:01:51.584Z] 
[2023-06-20T23:01:51.584Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQueryOnlyActivePartitionStoresByDefault STARTED
[2023-06-20T23:01:58.541Z] 
[2023-06-20T23:01:58.541Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQueryOnlyActivePartitionStoresByDefault PASSED
[2023-06-20T23:01:58.541Z] 
[2023-06-20T23:01:58.541Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQueryStoresAfterAddingAndRemovingStreamThread STARTED
[2023-06-20T23:02:10.487Z] 
[2023-06-20T23:02:10.487Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQueryStoresAfterAddingAndRemovingStreamThread PASSED
[2023-06-20T23:02:10.487Z] 
[2023-06-20T23:02:10.487Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQuerySpecificStalePartitionStoresMultiStreamThreadsNamedTopology STARTED
[2023-06-20T23:02:14.145Z] 
[2023-06-20T23:02:14.145Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQuerySpecificStalePartitionStoresMultiStreamThreadsNamedTopology PASSED
[2023-06-20T23:02:14.145Z] 
[2023-06-20T23:02:14.145Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testInnerRepartitioned[caching enabled = true] STARTED
[2023-06-20T23:02:26.526Z] 
[2023-06-20T23:02:26.526Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testInnerRepartitioned[caching enabled = true] PASSED
[2023-06-20T23:02:26.526Z] 
[2023-06-20T23:02:26.526Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testOuterRepartitioned[caching enabled = true] STARTED
[2023-06-20T23:02:36.805Z] 
[2023-06-20T23:02:36.805Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testOuterRepartitioned[caching enabled = true] PASSED
[2023-06-20T23:02:36.805Z] 
[2023-06-20T23:02:36.805Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testInner[caching enabled = true] STARTED
[2023-06-20T23:02:43.152Z] 
[2023-06-20T23:02:43.152Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testInner[caching enabled = true] PASSED
[2023-06-20T23:02:43.152Z] 
[2023-06-20T23:02:43.152Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testOuter[caching enabled = true] STARTED
[2023-06-20T23:02:48.174Z] 

Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1939

2023-06-20 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-20 Thread Nick Telford
Here's what I'm thinking: based on Bruno's earlier feedback, I'm going to
try to simplify my original design down such that it needs no/minimal
changes to the public interface.

If that succeeds, then it should also be possible to transparently
implement the "no memtables" solution as a performance optimization when
the record cache is enabled. I consider this approach only an optimisation,
because of the need to still support stores with the cache disabled.

For that reason, I think the "no memtables" approach would probably best be
suited as a follow-up KIP, but that we keep it in mind during the design of
this one.

What do you think?

Regards,
Nick


On Tue, 20 Jun 2023, 22:26 John Roesler,  wrote:

> Oh, that's a good point.
>
> On the topic of a behavioral switch for disabled caches, the typical use
> case for disabling the cache is to cause each individual update to
> propagate down the topology, so another thought might be to just go
> ahead and add the memory we would have used for the memtables to the
> cache size, but if people did disable the cache entirely, then we could
> still go ahead and forward the records on each write?
>
> I know that Guozhang was also proposing for a while to actually decouple
> caching and forwarding, which might provide a way to side-step this
> dilemma (i.e., we just always forward and only apply the cache to state
> and changelog writes).
>
> By the way, I'm basing my statement on why you'd disable caches on
> memory, but also on the guidance here:
>
> https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html
> . That doc also contains a section on how to bound the total memory
> usage across RocksDB memtables, which points to another benefit of
> disabling memtables and managing the write buffer ourselves (simplified
> memory configuration).
>
> Thanks,
> -John
>
> On 6/20/23 16:05, Nick Telford wrote:
> > Potentially we could just go the memtables-with-RocksDB-WriteBatches route
> > if the cache is disabled?
> >
> > On Tue, 20 Jun 2023, 22:00 John Roesler,  wrote:
> >
> >> Touché!
> >>
> >> Ok, I agree that figuring out the case of a disabled cache would be
> >> non-trivial. Ingesting single-record SST files will probably not be
> >> performant, but benchmarking may prove different. Or maybe we can have
> >> some reserved cache space on top of the user-configured cache, which we
> >> would have reclaimed from the memtable space. Or some other, more
> >> creative solution.
> >>
> >> Thanks,
> >> -John
> >>
> >> On 6/20/23 15:30, Nick Telford wrote:
>  Note that users can disable the cache, which would still be
> >>> ok, I think. We wouldn't ingest the SST files on every record, but just
> >>> append to them and only ingest them on commit, when we're already
> >>> waiting for acks and a RocksDB commit.
> >>>
> >>> In this case, how would uncommitted records be read by joins?
> >>>
> >>> On Tue, 20 Jun 2023, 20:51 John Roesler,  wrote:
> >>>
>  Ah, sorry Nick,
> 
>  I just meant the regular heap based cache that we maintain in
> Streams. I
>  see that it's not called "RecordCache" (my mistake).
> 
>  The actual cache is ThreadCache:
> 
> 
> >>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
> 
>  Here's the example of how we use the cache in KeyValueStore:
> 
> 
> >>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
> 
>  It's basically just an on-heap Map of records that have not yet been
>  written to the changelog or flushed into the underlying store. It gets
>  flushed when the total cache size exceeds `cache.max.bytes.buffering`
> or
>  the `commit.interval.ms` elapses.
> 
>  Speaking of those configs, another benefit to this idea is that we
> would
>  no longer need to trigger extra commits based on the size of the
> ongoing
>  transaction. Instead, we'd just preserve the existing cache-flush
>  behavior. Note that users can disable the cache, which would still be
>  ok, I think. We wouldn't ingest the SST files on every record, but
> just
>  append to them and only ingest them on commit, when we're already
>  waiting for acks and a RocksDB commit.
> 
>  Thanks,
>  -John
> 
>  On 6/20/23 14:09, Nick Telford wrote:
> > Hi John,
> >
> > By "RecordCache", do you mean the RocksDB "WriteBatch"? I can't find
> >> any
> > class called "RecordCache"...
> >
> > Cheers,
> >
> > Nick
> >
> > On Tue, 20 Jun 2023 at 19:42, John Roesler 
> >> wrote:
> >
> >> Hi Nick,
> >>
> >> Thanks for picking this up again!
> >>
> >> I did have one new thought over the intervening months, which I'd
> like
> >> your take on.
> >>
> >> What if, instead of using the RocksDB atomic write primitive at all,
> >> 

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-20 Thread John Roesler

Oh, that's a good point.

On the topic of a behavioral switch for disabled caches: the typical use
case for disabling the cache is to cause each individual update to
propagate down the topology. So another thought might be to just add the
memory we would have used for the memtables to the cache size, but if
people did disable the cache entirely, we could still go ahead and
forward the records on each write?


I know that Guozhang was also proposing for a while to actually decouple 
caching and forwarding, which might provide a way to side-step this 
dilemma (i.e., we just always forward and only apply the cache to state 
and changelog writes).


By the way, I'm basing my statement about why you'd disable caches partly on
memory concerns, but also on the guidance here:
https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html 
. That doc also contains a section on how to bound the total memory 
usage across RocksDB memtables, which points to another benefit of 
disabling memtables and managing the write buffer ourselves (simplified 
memory configuration).
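
For reference, the approach that doc describes for bounding memtable memory
comes down to sharing one Cache and WriteBufferManager across all stores via
the rocksdb.config.setter hook. A minimal sketch, assuming RocksJava's
WriteBufferManager and the standard RocksDBConfigSetter interface (the class
name and sizes below are only illustrative):

    import java.util.Map;
    import org.apache.kafka.streams.state.RocksDBConfigSetter;
    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.Cache;
    import org.rocksdb.LRUCache;
    import org.rocksdb.Options;
    import org.rocksdb.WriteBufferManager;

    public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
        // Shared by every store instance in the JVM; sizes are only examples.
        private static final Cache CACHE = new LRUCache(256 * 1024 * 1024L);
        private static final WriteBufferManager WRITE_BUFFER_MANAGER =
            new WriteBufferManager(64 * 1024 * 1024L, CACHE);

        @Override
        public void setConfig(final String storeName, final Options options,
                              final Map<String, Object> configs) {
            // Charge memtable allocations against the shared block cache so that
            // block cache + memtables stay within one global bound.
            options.setWriteBufferManager(WRITE_BUFFER_MANAGER);
            final BlockBasedTableConfig tableConfig =
                (BlockBasedTableConfig) options.tableFormatConfig();
            tableConfig.setBlockCache(CACHE);
            tableConfig.setCacheIndexAndFilterBlocks(true);
            options.setTableFormatConfig(tableConfig);
        }

        @Override
        public void close(final String storeName, final Options options) {
            // The shared Cache/WriteBufferManager are deliberately not closed per store.
        }
    }

The setter would then be registered via StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG.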


Thanks,
-John

On 6/20/23 16:05, Nick Telford wrote:

Potentially we could just go the memtables-with-RocksDB-WriteBatches route if
the cache is disabled?

On Tue, 20 Jun 2023, 22:00 John Roesler,  wrote:


Touché!

Ok, I agree that figuring out the case of a disabled cache would be
non-trivial. Ingesting single-record SST files will probably not be
performant, but benchmarking may prove different. Or maybe we can have
some reserved cache space on top of the user-configured cache, which we
would have reclaimed from the memtable space. Or some other, more
creative solution.

Thanks,
-John

On 6/20/23 15:30, Nick Telford wrote:

Note that users can disable the cache, which would still be

ok, I think. We wouldn't ingest the SST files on every record, but just
append to them and only ingest them on commit, when we're already
waiting for acks and a RocksDB commit.

In this case, how would uncommitted records be read by joins?

On Tue, 20 Jun 2023, 20:51 John Roesler,  wrote:


Ah, sorry Nick,

I just meant the regular heap based cache that we maintain in Streams. I
see that it's not called "RecordCache" (my mistake).

The actual cache is ThreadCache:



https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java


Here's the example of how we use the cache in KeyValueStore:



https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java


It's basically just an on-heap Map of records that have not yet been
written to the changelog or flushed into the underlying store. It gets
flushed when the total cache size exceeds `cache.max.bytes.buffering` or
the `commit.interval.ms` elapses.

Speaking of those configs, another benefit to this idea is that we would
no longer need to trigger extra commits based on the size of the ongoing
transaction. Instead, we'd just preserve the existing cache-flush
behavior. Note that users can disable the cache, which would still be
ok, I think. We wouldn't ingest the SST files on every record, but just
append to them and only ingest them on commit, when we're already
waiting for acks and a RocksDB commit.

Thanks,
-John

On 6/20/23 14:09, Nick Telford wrote:

Hi John,

By "RecordCache", do you mean the RocksDB "WriteBatch"? I can't find

any

class called "RecordCache"...

Cheers,

Nick

On Tue, 20 Jun 2023 at 19:42, John Roesler 

wrote:



Hi Nick,

Thanks for picking this up again!

I did have one new thought over the intervening months, which I'd like
your take on.

What if, instead of using the RocksDB atomic write primitive at all,

we

instead just:
1. disable memtables entirely
2. directly write the RecordCache into SST files when we flush
3. atomically ingest the SST file(s) into RocksDB when we get the ACK
from the changelog (see





https://github.com/EighteenZi/rocksdb_wiki/blob/master/Creating-and-Ingesting-SST-files.md

and





https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/IngestExternalFileOptions.java

and





https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L1413-L1429

)
4. track the changelog offsets either in another CF or the same CF

with

a reserved key, either of which will make the changelog offset update
atomic with the file ingestions

I suspect this'll have a number of benefits:
* writes to RocksDB will always be atomic
* we don't fragment memory between the RecordCache and the memtables
* RecordCache gives far higher performance than memtable for reads and
writes
* we don't need any new "transaction" concepts or memory bound configs

What do you think?

Thanks,
-John

On 6/20/23 10:51, Nick Telford wrote:

Hi Bruno,

Thanks for reviewing the KIP. It's been a long road, I started

working

on

this more than a year ago, and most of the time in the last 6 months

has

been spent on the 

Re: [VOTE] KIP-872: Add Serializer#serializeToByteBuffer() to reduce memory copying

2023-06-20 Thread Divij Vaidya
+1 (again) - binding :)

Please update the PR ShunKang and tag me for review when you are ready.

--
Divij Vaidya



On Tue, Jun 20, 2023 at 4:11 PM John Roesler  wrote:

> Hi Divij and ShunKang,
>
> I pulled open this thread to see if you needed my vote, but FYI, Divij is
> a committer now, so he can re-cast his vote as binding.
>
> Thanks,
> -John
>
> On 2023/06/20 13:37:04 ShunKang Lin wrote:
> > Hi all,
> >
> > Bump this thread again and see if we could get a few more votes.
> > Currently we have +3 non-binding (from Divij Vaidya, Kirk True and Kamal
> > Chandraprakash)  and +2 binding (from Luke Chen and ziming deng).
> > Hoping we can get this approved, reviewed, and merged in time for 3.6.0.
> >
> > Best,
> > ShunKang
> >
> > ShunKang Lin wrote on Sun, May 7, 2023 at 15:24:
> >
> > > Hi everyone,
> > >
> > > I'd like to open the vote for KIP-872, which proposes to add
> > > Serializer#serializeToByteBuffer() to reduce memory copying.
> > >
> > > The proposal is here:
> > >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828
> > >
> > > The pull request is here:
> > > https://github.com/apache/kafka/pull/12685
> > >
> > > Thanks to all who reviewed the proposal, and thanks in advance for
> taking
> > > the time to vote!
> > >
> > > Best,
> > > ShunKang
> > >
> >
>


Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-20 Thread Nick Telford
Potentially we could just go the memtables-with-RocksDB-WriteBatches route if
the cache is disabled?

On Tue, 20 Jun 2023, 22:00 John Roesler,  wrote:

> Touché!
>
> Ok, I agree that figuring out the case of a disabled cache would be
> non-trivial. Ingesting single-record SST files will probably not be
> performant, but benchmarking may prove different. Or maybe we can have
> some reserved cache space on top of the user-configured cache, which we
> would have reclaimed from the memtable space. Or some other, more
> creative solution.
>
> Thanks,
> -John
>
> On 6/20/23 15:30, Nick Telford wrote:
> >> Note that users can disable the cache, which would still be
> > ok, I think. We wouldn't ingest the SST files on every record, but just
> > append to them and only ingest them on commit, when we're already
> > waiting for acks and a RocksDB commit.
> >
> > In this case, how would uncommitted records be read by joins?
> >
> > On Tue, 20 Jun 2023, 20:51 John Roesler,  wrote:
> >
> >> Ah, sorry Nick,
> >>
> >> I just meant the regular heap based cache that we maintain in Streams. I
> >> see that it's not called "RecordCache" (my mistake).
> >>
> >> The actual cache is ThreadCache:
> >>
> >>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
> >>
> >> Here's the example of how we use the cache in KeyValueStore:
> >>
> >>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
> >>
> >> It's basically just an on-heap Map of records that have not yet been
> >> written to the changelog or flushed into the underlying store. It gets
> >> flushed when the total cache size exceeds `cache.max.bytes.buffering` or
> >> the `commit.interval.ms` elapses.
> >>
> >> Speaking of those configs, another benefit to this idea is that we would
> >> no longer need to trigger extra commits based on the size of the ongoing
> >> transaction. Instead, we'd just preserve the existing cache-flush
> >> behavior. Note that users can disable the cache, which would still be
> >> ok, I think. We wouldn't ingest the SST files on every record, but just
> >> append to them and only ingest them on commit, when we're already
> >> waiting for acks and a RocksDB commit.
> >>
> >> Thanks,
> >> -John
> >>
> >> On 6/20/23 14:09, Nick Telford wrote:
> >>> Hi John,
> >>>
> >>> By "RecordCache", do you mean the RocksDB "WriteBatch"? I can't find
> any
> >>> class called "RecordCache"...
> >>>
> >>> Cheers,
> >>>
> >>> Nick
> >>>
> >>> On Tue, 20 Jun 2023 at 19:42, John Roesler 
> wrote:
> >>>
>  Hi Nick,
> 
>  Thanks for picking this up again!
> 
>  I did have one new thought over the intervening months, which I'd like
>  your take on.
> 
>  What if, instead of using the RocksDB atomic write primitive at all,
> we
>  instead just:
>  1. disable memtables entirely
>  2. directly write the RecordCache into SST files when we flush
>  3. atomically ingest the SST file(s) into RocksDB when we get the ACK
>  from the changelog (see
> 
> 
> >>
> https://github.com/EighteenZi/rocksdb_wiki/blob/master/Creating-and-Ingesting-SST-files.md
>  and
> 
> 
> >>
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/IngestExternalFileOptions.java
>  and
> 
> 
> >>
> https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L1413-L1429
>  )
>  4. track the changelog offsets either in another CF or the same CF
> with
>  a reserved key, either of which will make the changelog offset update
>  atomic with the file ingestions
> 
>  I suspect this'll have a number of benefits:
>  * writes to RocksDB will always be atomic
>  * we don't fragment memory between the RecordCache and the memtables
>  * RecordCache gives far higher performance than memtable for reads and
>  writes
>  * we don't need any new "transaction" concepts or memory bound configs
> 
>  What do you think?
> 
>  Thanks,
>  -John
> 
>  On 6/20/23 10:51, Nick Telford wrote:
> > Hi Bruno,
> >
> > Thanks for reviewing the KIP. It's been a long road, I started
> working
> >> on
> > this more than a year ago, and most of the time in the last 6 months
> >> has
> > been spent on the "Atomic Checkpointing" stuff that's been benched,
> so
>  some
> > of the reasoning behind some of my decisions have been lost, but I'll
> >> do
>  my
> > best to reconstruct them.
> >
> > 1.
> > IIRC, this was the initial approach I tried. I don't remember the
> exact
> > reasons I changed it to use a separate "view" of the StateStore that
> > encapsulates the transaction, but I believe it had something to do
> with
> > concurrent access to the StateStore from Interactive Query threads.
> >> Reads
> > from interactive queries need to be isolated 

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-20 Thread John Roesler

Touché!

Ok, I agree that figuring out the case of a disabled cache would be 
non-trivial. Ingesting single-record SST files will probably not be 
performant, but benchmarking may prove different. Or maybe we can have 
some reserved cache space on top of the user-configured cache, which we 
would have reclaimed from the memtable space. Or some other, more 
creative solution.


Thanks,
-John

On 6/20/23 15:30, Nick Telford wrote:

Note that users can disable the cache, which would still be

ok, I think. We wouldn't ingest the SST files on every record, but just
append to them and only ingest them on commit, when we're already
waiting for acks and a RocksDB commit.

In this case, how would uncommitted records be read by joins?

On Tue, 20 Jun 2023, 20:51 John Roesler,  wrote:


Ah, sorry Nick,

I just meant the regular heap based cache that we maintain in Streams. I
see that it's not called "RecordCache" (my mistake).

The actual cache is ThreadCache:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java

Here's the example of how we use the cache in KeyValueStore:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java

It's basically just an on-heap Map of records that have not yet been
written to the changelog or flushed into the underlying store. It gets
flushed when the total cache size exceeds `cache.max.bytes.buffering` or
the `commit.interval.ms` elapses.

Speaking of those configs, another benefit to this idea is that we would
no longer need to trigger extra commits based on the size of the ongoing
transaction. Instead, we'd just preserve the existing cache-flush
behavior. Note that users can disable the cache, which would still be
ok, I think. We wouldn't ingest the SST files on every record, but just
append to them and only ingest them on commit, when we're already
waiting for acks and a RocksDB commit.

Thanks,
-John

On 6/20/23 14:09, Nick Telford wrote:

Hi John,

By "RecordCache", do you mean the RocksDB "WriteBatch"? I can't find any
class called "RecordCache"...

Cheers,

Nick

On Tue, 20 Jun 2023 at 19:42, John Roesler  wrote:


Hi Nick,

Thanks for picking this up again!

I did have one new thought over the intervening months, which I'd like
your take on.

What if, instead of using the RocksDB atomic write primitive at all, we
instead just:
1. disable memtables entirely
2. directly write the RecordCache into SST files when we flush
3. atomically ingest the SST file(s) into RocksDB when we get the ACK
from the changelog (see



https://github.com/EighteenZi/rocksdb_wiki/blob/master/Creating-and-Ingesting-SST-files.md

and



https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/IngestExternalFileOptions.java

and



https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L1413-L1429

)
4. track the changelog offsets either in another CF or the same CF with
a reserved key, either of which will make the changelog offset update
atomic with the file ingestions

I suspect this'll have a number of benefits:
* writes to RocksDB will always be atomic
* we don't fragment memory between the RecordCache and the memtables
* RecordCache gives far higher performance than memtable for reads and
writes
* we don't need any new "transaction" concepts or memory bound configs

What do you think?

Thanks,
-John

On 6/20/23 10:51, Nick Telford wrote:

Hi Bruno,

Thanks for reviewing the KIP. It's been a long road, I started working

on

this more than a year ago, and most of the time in the last 6 months

has

been spent on the "Atomic Checkpointing" stuff that's been benched, so

some

of the reasoning behind some of my decisions have been lost, but I'll

do

my

best to reconstruct them.

1.
IIRC, this was the initial approach I tried. I don't remember the exact
reasons I changed it to use a separate "view" of the StateStore that
encapsulates the transaction, but I believe it had something to do with
concurrent access to the StateStore from Interactive Query threads.

Reads

from interactive queries need to be isolated from the currently ongoing
transaction, both for consistency (so interactive queries don't observe
changes that are subsequently rolled-back), but also to prevent

Iterators

opened by an interactive query from being closed and invalidated by the
StreamThread when it commits the transaction, which causes your

interactive

queries to crash.

Another reason I believe I implemented it this way was a separation of
concerns. Recall that newTransaction() originally created an object of

type

Transaction, not StateStore. My intent was to improve the type-safety

of

the API, in an effort to ensure Transactions weren't used incorrectly.
Unfortunately, this didn't pan out, but newTransaction() remained.

Finally, this had the added benefit that implementations could easily

add

support for transactions *without* 

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-20 Thread Nick Telford
> Note that users can disable the cache, which would still be
ok, I think. We wouldn't ingest the SST files on every record, but just
append to them and only ingest them on commit, when we're already
waiting for acks and a RocksDB commit.

In this case, how would uncommitted records be read by joins?

On Tue, 20 Jun 2023, 20:51 John Roesler,  wrote:

> Ah, sorry Nick,
>
> I just meant the regular heap based cache that we maintain in Streams. I
> see that it's not called "RecordCache" (my mistake).
>
> The actual cache is ThreadCache:
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
>
> Here's the example of how we use the cache in KeyValueStore:
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
>
> It's basically just an on-heap Map of records that have not yet been
> written to the changelog or flushed into the underlying store. It gets
> flushed when the total cache size exceeds `cache.max.bytes.buffering` or
> the `commit.interval.ms` elapses.
>
> Speaking of those configs, another benefit to this idea is that we would
> no longer need to trigger extra commits based on the size of the ongoing
> transaction. Instead, we'd just preserve the existing cache-flush
> behavior. Note that users can disable the cache, which would still be
> ok, I think. We wouldn't ingest the SST files on every record, but just
> append to them and only ingest them on commit, when we're already
> waiting for acks and a RocksDB commit.
>
> Thanks,
> -John
>
> On 6/20/23 14:09, Nick Telford wrote:
> > Hi John,
> >
> > By "RecordCache", do you mean the RocksDB "WriteBatch"? I can't find any
> > class called "RecordCache"...
> >
> > Cheers,
> >
> > Nick
> >
> > On Tue, 20 Jun 2023 at 19:42, John Roesler  wrote:
> >
> >> Hi Nick,
> >>
> >> Thanks for picking this up again!
> >>
> >> I did have one new thought over the intervening months, which I'd like
> >> your take on.
> >>
> >> What if, instead of using the RocksDB atomic write primitive at all, we
> >> instead just:
> >> 1. disable memtables entirely
> >> 2. directly write the RecordCache into SST files when we flush
> >> 3. atomically ingest the SST file(s) into RocksDB when we get the ACK
> >> from the changelog (see
> >>
> >>
> https://github.com/EighteenZi/rocksdb_wiki/blob/master/Creating-and-Ingesting-SST-files.md
> >> and
> >>
> >>
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/IngestExternalFileOptions.java
> >> and
> >>
> >>
> https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L1413-L1429
> >> )
> >> 4. track the changelog offsets either in another CF or the same CF with
> >> a reserved key, either of which will make the changelog offset update
> >> atomic with the file ingestions
> >>
> >> I suspect this'll have a number of benefits:
> >> * writes to RocksDB will always be atomic
> >> * we don't fragment memory between the RecordCache and the memtables
> >> * RecordCache gives far higher performance than memtable for reads and
> >> writes
> >> * we don't need any new "transaction" concepts or memory bound configs
> >>
> >> What do you think?
> >>
> >> Thanks,
> >> -John
> >>
> >> On 6/20/23 10:51, Nick Telford wrote:
> >>> Hi Bruno,
> >>>
> >>> Thanks for reviewing the KIP. It's been a long road, I started working
> on
> >>> this more than a year ago, and most of the time in the last 6 months
> has
> >>> been spent on the "Atomic Checkpointing" stuff that's been benched, so
> >> some
> >>> of the reasoning behind some of my decisions have been lost, but I'll
> do
> >> my
> >>> best to reconstruct them.
> >>>
> >>> 1.
> >>> IIRC, this was the initial approach I tried. I don't remember the exact
> >>> reasons I changed it to use a separate "view" of the StateStore that
> >>> encapsulates the transaction, but I believe it had something to do with
> >>> concurrent access to the StateStore from Interactive Query threads.
> Reads
> >>> from interactive queries need to be isolated from the currently ongoing
> >>> transaction, both for consistency (so interactive queries don't observe
> >>> changes that are subsequently rolled-back), but also to prevent
> Iterators
> >>> opened by an interactive query from being closed and invalidated by the
> >>> StreamThread when it commits the transaction, which causes your
> >> interactive
> >>> queries to crash.
> >>>
> >>> Another reason I believe I implemented it this way was a separation of
> >>> concerns. Recall that newTransaction() originally created an object of
> >> type
> >>> Transaction, not StateStore. My intent was to improve the type-safety
> of
> >>> the API, in an effort to ensure Transactions weren't used incorrectly.
> >>> Unfortunately, this didn't pan out, but newTransaction() remained.
> >>>
> >>> Finally, this had the added benefit that implementations could easily
> add
> >>> support for transactions 

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-20 Thread John Roesler

Ah, sorry Nick,

I just meant the regular heap based cache that we maintain in Streams. I 
see that it's not called "RecordCache" (my mistake).


The actual cache is ThreadCache: 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java


Here's the example of how we use the cache in KeyValueStore:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java

It's basically just an on-heap Map of records that have not yet been 
written to the changelog or flushed into the underlying store. It gets 
flushed when the total cache size exceeds `cache.max.bytes.buffering` or 
the `commit.interval.ms` elapses.
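
For concreteness, here is a minimal sketch of the two configs that drive that
flush behaviour (the application id and bootstrap servers are placeholders):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class CacheFlushConfigExample {
        public static Properties streamsProps() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Total on-heap record cache shared by all stream threads; 0 disables caching.
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
            // The cache is also flushed whenever the commit interval elapses.
            props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000L);
            return props;
        }
    }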


Speaking of those configs, another benefit to this idea is that we would 
no longer need to trigger extra commits based on the size of the ongoing 
transaction. Instead, we'd just preserve the existing cache-flush 
behavior. Note that users can disable the cache, which would still be 
ok, I think. We wouldn't ingest the SST files on every record, but just 
append to them and only ingest them on commit, when we're already 
waiting for acks and a RocksDB commit.


Thanks,
-John

On 6/20/23 14:09, Nick Telford wrote:

Hi John,

By "RecordCache", do you mean the RocksDB "WriteBatch"? I can't find any
class called "RecordCache"...

Cheers,

Nick

On Tue, 20 Jun 2023 at 19:42, John Roesler  wrote:


Hi Nick,

Thanks for picking this up again!

I did have one new thought over the intervening months, which I'd like
your take on.

What if, instead of using the RocksDB atomic write primitive at all, we
instead just:
1. disable memtables entirely
2. directly write the RecordCache into SST files when we flush
3. atomically ingest the SST file(s) into RocksDB when we get the ACK
from the changelog (see

https://github.com/EighteenZi/rocksdb_wiki/blob/master/Creating-and-Ingesting-SST-files.md
and

https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/IngestExternalFileOptions.java
and

https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L1413-L1429
)
4. track the changelog offsets either in another CF or the same CF with
a reserved key, either of which will make the changelog offset update
atomic with the file ingestions

I suspect this'll have a number of benefits:
* writes to RocksDB will always be atomic
* we don't fragment memory between the RecordCache and the memtables
* RecordCache gives far higher performance than memtable for reads and
writes
* we don't need any new "transaction" concepts or memory bound configs

What do you think?

Thanks,
-John

On 6/20/23 10:51, Nick Telford wrote:

Hi Bruno,

Thanks for reviewing the KIP. It's been a long road, I started working on
this more than a year ago, and most of the time in the last 6 months has
been spent on the "Atomic Checkpointing" stuff that's been benched, so

some

of the reasoning behind some of my decisions have been lost, but I'll do

my

best to reconstruct them.

1.
IIRC, this was the initial approach I tried. I don't remember the exact
reasons I changed it to use a separate "view" of the StateStore that
encapsulates the transaction, but I believe it had something to do with
concurrent access to the StateStore from Interactive Query threads. Reads
from interactive queries need to be isolated from the currently ongoing
transaction, both for consistency (so interactive queries don't observe
changes that are subsequently rolled-back), but also to prevent Iterators
opened by an interactive query from being closed and invalidated by the
StreamThread when it commits the transaction, which causes your

interactive

queries to crash.

Another reason I believe I implemented it this way was a separation of
concerns. Recall that newTransaction() originally created an object of

type

Transaction, not StateStore. My intent was to improve the type-safety of
the API, in an effort to ensure Transactions weren't used incorrectly.
Unfortunately, this didn't pan out, but newTransaction() remained.

Finally, this had the added benefit that implementations could easily add
support for transactions *without* re-writing their existing,
non-transactional implementation. I think this can be a benefit both for
implementers of custom StateStores, but also for anyone extending
RocksDbStore, as they can rely on the existing access methods working how
they expect them to.

I'm not too happy with the way the current design has panned out, so I'm
open to ideas on how to improve it. Key to this is finding some way to
ensure that reads from Interactive Query threads are properly isolated

from

the transaction, *without* the performance overhead of checking which
thread the method is being called from on every access.

As for replacing flush() with commit() - I saw no reason to add this
complexity to the KIP, unless there was a need to add arguments to the
flush/commit method. This need arises with Atomic 

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-20 Thread Nick Telford
Hi John,

I think you're referring to the "record cache" that's provided by the
ThreadCache class?

1-3.
I was hoping to (eventually) remove the "flush-on-commit" behaviour from
RocksDbStore, so that RocksDB can choose when to flush memtables, enabling
users to tailor RocksDB performance to their workload. Explicitly flushing
the Record Cache to files instead would entail either flushing on every
commit, or the current behaviour, of flushing on every commit provided at
least 10K records have been processed. Compared with RocksDB-managed
memtable flushing, this is very inflexible. If we pursue this design, I
highly recommend replacing the hard-coded 10K limit with something
configurable so that users can tune flush behaviour for their workloads.

4.
Tracking the changelog offsets in another CF and atomically updating it
with the main CFs is orthogonal, I think, as it can be done when using
memtables provided the "Atomic Flush" feature of RocksDB is enabled. This
is something I'd originally planned for this KIP, but we're trying to pull
out into a later KIP to make things more manageable.
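
For reference, atomic flush is a DB-level RocksDB option, so in principle it
could be switched on through the existing rocksdb.config.setter hook. A
minimal sketch, assuming RocksJava exposes setAtomicFlush (recent versions
do); note this only makes flushes of multiple column families atomic with
each other, it is not by itself the checkpointing mechanism discussed above:

    import java.util.Map;
    import org.apache.kafka.streams.state.RocksDBConfigSetter;
    import org.rocksdb.Options;

    public class AtomicFlushConfigSetter implements RocksDBConfigSetter {
        @Override
        public void setConfig(final String storeName, final Options options,
                              final Map<String, Object> configs) {
            // Flush all column families of this DB atomically, so their
            // on-disk contents remain consistent with one another.
            options.setAtomicFlush(true);
        }

        @Override
        public void close(final String storeName, final Options options) { }
    }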

> * we don't fragment memory between the RecordCache and the memtables
I think by memory fragmentation, you mean duplication, because we're
caching the records both in the (on-heap) Record Cache and the RocksDB
memtables? This is a good point that I hadn't considered before. Wouldn't a
simpler solution be to just disable the record cache for RocksDB stores (by
default), and let the memtables do the caching? Although I guess that would
reduce read performance, which could be especially important for joins.

> * RecordCache gives far higher performance than memtable for reads and
writes
I'll concede this point. The JNI boundary plus RocksDB record encoding will
likely make it impossible to ever match the Record Cache on throughput.

> * we don't need any new "transaction" concepts or memory bound configs
Maybe. Unless I'm mistaken, the Record Cache only retains the most recently
written value for a key, which would mean that Interactive Queries would
always observe new record values *before* they're committed to the
changelog. While this is the current behaviour, it's also a violation of
consistency, because successive IQ could observe a regression of a value,
due to an error writing to the changelog (e.g. a changelog transaction
rollback or a timeout). This is something that KIP-892 aims to improve on,
as the current design would ensure that records are only observed by IQ
*after* they have been committed to the Kafka changelog.

That said, it definitely sounds *feasible*.

Regards,

Nick


Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-20 Thread Nick Telford
Hi John,

By "RecordCache", do you mean the RocksDB "WriteBatch"? I can't find any
class called "RecordCache"...

Cheers,

Nick

On Tue, 20 Jun 2023 at 19:42, John Roesler  wrote:

> Hi Nick,
>
> Thanks for picking this up again!
>
> I did have one new thought over the intervening months, which I'd like
> your take on.
>
> What if, instead of using the RocksDB atomic write primitive at all, we
> instead just:
> 1. disable memtables entirely
> 2. directly write the RecordCache into SST files when we flush
> 3. atomically ingest the SST file(s) into RocksDB when we get the ACK
> from the changelog (see
>
> https://github.com/EighteenZi/rocksdb_wiki/blob/master/Creating-and-Ingesting-SST-files.md
> and
>
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/IngestExternalFileOptions.java
> and
>
> https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L1413-L1429
> )
> 4. track the changelog offsets either in another CF or the same CF with
> a reserved key, either of which will make the changelog offset update
> atomic with the file ingestions
>
> I suspect this'll have a number of benefits:
> * writes to RocksDB will always be atomic
> * we don't fragment memory between the RecordCache and the memtables
> * RecordCache gives far higher performance than memtable for reads and
> writes
> * we don't need any new "transaction" concepts or memory bound configs
>
> What do you think?
>
> Thanks,
> -John
>
> On 6/20/23 10:51, Nick Telford wrote:
> > Hi Bruno,
> >
> > Thanks for reviewing the KIP. It's been a long road, I started working on
> > this more than a year ago, and most of the time in the last 6 months has
> > been spent on the "Atomic Checkpointing" stuff that's been benched, so
> some
> > of the reasoning behind some of my decisions have been lost, but I'll do
> my
> > best to reconstruct them.
> >
> > 1.
> > IIRC, this was the initial approach I tried. I don't remember the exact
> > reasons I changed it to use a separate "view" of the StateStore that
> > encapsulates the transaction, but I believe it had something to do with
> > concurrent access to the StateStore from Interactive Query threads. Reads
> > from interactive queries need to be isolated from the currently ongoing
> > transaction, both for consistency (so interactive queries don't observe
> > changes that are subsequently rolled-back), but also to prevent Iterators
> > opened by an interactive query from being closed and invalidated by the
> > StreamThread when it commits the transaction, which causes your
> interactive
> > queries to crash.
> >
> > Another reason I believe I implemented it this way was a separation of
> > concerns. Recall that newTransaction() originally created an object of
> type
> > Transaction, not StateStore. My intent was to improve the type-safety of
> > the API, in an effort to ensure Transactions weren't used incorrectly.
> > Unfortunately, this didn't pan out, but newTransaction() remained.
> >
> > Finally, this had the added benefit that implementations could easily add
> > support for transactions *without* re-writing their existing,
> > non-transactional implementation. I think this can be a benefit both for
> > implementers of custom StateStores, but also for anyone extending
> > RocksDbStore, as they can rely on the existing access methods working how
> > they expect them to.
> >
> > I'm not too happy with the way the current design has panned out, so I'm
> > open to ideas on how to improve it. Key to this is finding some way to
> > ensure that reads from Interactive Query threads are properly isolated
> from
> > the transaction, *without* the performance overhead of checking which
> > thread the method is being called from on every access.
> >
> > As for replacing flush() with commit() - I saw no reason to add this
> > complexity to the KIP, unless there was a need to add arguments to the
> > flush/commit method. This need arises with Atomic Checkpointing, but that
> > will be implemented separately, in a future KIP. Do you see a need for
> some
> > arguments to the flush/commit method that I've missed? Or were you simply
> > suggesting a rename?
> >
> > 2.
> > This is simply due to the practical reason that isolationLevel() is
> really
> > a proxy for checking if the app is under EOS. The application
> configuration
> > is not provided to the constructor of StateStores, but it *is* provided
> to
> > init(), via StateStoreContext. For this reason, it seemed somewhat
> natural
> > to add it to StateStoreContext. I think this makes sense, since the
> > IsolationLevel of all StateStores in an application *must* be the same,
> and
> > since those stores are all initialized with the same StateStoreContext,
> it
> > seems natural for that context to carry the desired IsolationLevel to
> use.
> >
> > 3.
> > Using IsolationLevel instead of just passing `boolean eosEnabled`, like
> > much of the internals was an attempt to logically de-couple the
> StateStore
> > API from the 

Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #1938

2023-06-20 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-20 Thread John Roesler

Hi Nick,

Thanks for picking this up again!

I did have one new thought over the intervening months, which I'd like 
your take on.


What if, instead of using the RocksDB atomic write primitive at all, we 
instead just:

1. disable memtables entirely
2. directly write the RecordCache into SST files when we flush
3. atomically ingest the SST file(s) into RocksDB when we get the ACK 
from the changelog (see 
https://github.com/EighteenZi/rocksdb_wiki/blob/master/Creating-and-Ingesting-SST-files.md 
and 
https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/IngestExternalFileOptions.java 
and 
https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L1413-L1429)
4. track the changelog offsets either in another CF or the same CF with 
a reserved key, either of which will make the changelog offset update 
atomic with the file ingestions
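
For anyone unfamiliar with the RocksJava side of steps 2-3, a rough sketch of
what writing and then ingesting an SST file looks like (class and method names
here are placeholders; keys must already be sorted in the column family's
comparator order, which SstFileWriter requires):

    import java.util.List;
    import org.rocksdb.EnvOptions;
    import org.rocksdb.IngestExternalFileOptions;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.SstFileWriter;

    public class SstIngestSketch {

        // Step 2 (illustrative): dump a key-sorted batch of records into an SST file.
        static void writeSstFile(final String path,
                                 final List<byte[][]> sortedKeyValuePairs) throws RocksDBException {
            try (Options options = new Options();
                 EnvOptions envOptions = new EnvOptions();
                 SstFileWriter writer = new SstFileWriter(envOptions, options)) {
                writer.open(path);
                for (final byte[][] kv : sortedKeyValuePairs) {
                    writer.put(kv[0], kv[1]);   // kv[0] = key, kv[1] = value
                }
                writer.finish();
            }
        }

        // Step 3 (illustrative): atomically ingest the finished SST file(s) once
        // the corresponding changelog writes have been acked.
        static void ingest(final RocksDB db, final List<String> sstFiles) throws RocksDBException {
            try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
                ingestOptions.setMoveFiles(true); // move, rather than copy, the files into the DB
                db.ingestExternalFile(sstFiles, ingestOptions);
            }
        }
    }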


I suspect this'll have a number of benefits:
* writes to RocksDB will always be atomic
* we don't fragment memory between the RecordCache and the memtables
* RecordCache gives far higher performance than memtable for reads and 
writes

* we don't need any new "transaction" concepts or memory bound configs

What do you think?

Thanks,
-John

On 6/20/23 10:51, Nick Telford wrote:

Hi Bruno,

Thanks for reviewing the KIP. It's been a long road, I started working on
this more than a year ago, and most of the time in the last 6 months has
been spent on the "Atomic Checkpointing" stuff that's been benched, so some
of the reasoning behind some of my decisions have been lost, but I'll do my
best to reconstruct them.

1.
IIRC, this was the initial approach I tried. I don't remember the exact
reasons I changed it to use a separate "view" of the StateStore that
encapsulates the transaction, but I believe it had something to do with
concurrent access to the StateStore from Interactive Query threads. Reads
from interactive queries need to be isolated from the currently ongoing
transaction, both for consistency (so interactive queries don't observe
changes that are subsequently rolled-back), but also to prevent Iterators
opened by an interactive query from being closed and invalidated by the
StreamThread when it commits the transaction, which causes your interactive
queries to crash.

Another reason I believe I implemented it this way was a separation of
concerns. Recall that newTransaction() originally created an object of type
Transaction, not StateStore. My intent was to improve the type-safety of
the API, in an effort to ensure Transactions weren't used incorrectly.
Unfortunately, this didn't pan out, but newTransaction() remained.

Finally, this had the added benefit that implementations could easily add
support for transactions *without* re-writing their existing,
non-transactional implementation. I think this can be a benefit both for
implementers of custom StateStores, but also for anyone extending
RocksDbStore, as they can rely on the existing access methods working how
they expect them to.

I'm not too happy with the way the current design has panned out, so I'm
open to ideas on how to improve it. Key to this is finding some way to
ensure that reads from Interactive Query threads are properly isolated from
the transaction, *without* the performance overhead of checking which
thread the method is being called from on every access.

As for replacing flush() with commit() - I saw no reason to add this
complexity to the KIP, unless there was a need to add arguments to the
flush/commit method. This need arises with Atomic Checkpointing, but that
will be implemented separately, in a future KIP. Do you see a need for some
arguments to the flush/commit method that I've missed? Or were you simply
suggesting a rename?

2.
This is simply due to the practical reason that isolationLevel() is really
a proxy for checking if the app is under EOS. The application configuration
is not provided to the constructor of StateStores, but it *is* provided to
init(), via StateStoreContext. For this reason, it seemed somewhat natural
to add it to StateStoreContext. I think this makes sense, since the
IsolationLevel of all StateStores in an application *must* be the same, and
since those stores are all initialized with the same StateStoreContext, it
seems natural for that context to carry the desired IsolationLevel to use.

3.
Using IsolationLevel instead of just passing `boolean eosEnabled`, like
much of the internals does, was an attempt to logically de-couple the StateStore
API from the internals of Kafka Streams. Technically, StateStores don't
need to know/care what processing mode the KS app is using, all they need
to know is the isolation level expected of them.

Having formal definitions for the expectations of the two required
IsolationLevels allows implementers to implement transactional stores
without having to dig through the internals of Kafka Streams and understand
exactly how they are used. The tight coupling between state stores and
internal behaviour has 

[jira] [Created] (KAFKA-15109) ISR not expanding on ZK brokers during migration

2023-06-20 Thread David Arthur (Jira)
David Arthur created KAFKA-15109:


 Summary: ISR not expanding on ZK brokers during migration
 Key: KAFKA-15109
 URL: https://issues.apache.org/jira/browse/KAFKA-15109
 Project: Kafka
  Issue Type: Bug
  Components: kraft, replication
Affects Versions: 3.5.0
Reporter: David Arthur


KAFKA-15021 introduced a new controller behavior that avoids increasing the 
leader epoch during the controlled shutdown scenario. This prevents some 
unnecessary thrashing of metadata and threads on the brokers and clients. 

While a cluster is in a KIP-866 migration and has a KRaft controller with ZK 
brokers, we cannot employ this leader epoch bump avoidance. The ZK brokers must 
have the leader epoch bump in order for the ISR expansion to complete.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-20 Thread Nick Telford
Hi Bruno,

Thanks for reviewing the KIP. It's been a long road, I started working on
this more than a year ago, and most of the time in the last 6 months has
been spent on the "Atomic Checkpointing" stuff that's been benched, so some
of the reasoning behind some of my decisions has been lost, but I'll do my
best to reconstruct them.

1.
IIRC, this was the initial approach I tried. I don't remember the exact
reasons I changed it to use a separate "view" of the StateStore that
encapsulates the transaction, but I believe it had something to do with
concurrent access to the StateStore from Interactive Query threads. Reads
from interactive queries need to be isolated from the currently ongoing
transaction, both for consistency (so interactive queries don't observe
changes that are subsequently rolled-back), but also to prevent Iterators
opened by an interactive query from being closed and invalidated by the
StreamThread when it commits the transaction, which causes your interactive
queries to crash.
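
To make the isolation concern concrete, here is a minimal, self-contained sketch
using RocksDB's WriteBatchWithIndex directly (assuming a WriteBatchWithIndex-backed
transaction, as discussed later in the thread; the class name, path and keys are
placeholders, and this is a simplified model rather than the KIP's implementation):

import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatchWithIndex;

public class IsolationSketch {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/iq-isolation-demo");
             WriteBatchWithIndex txn = new WriteBatchWithIndex(true);
             ReadOptions readOptions = new ReadOptions()) {

            db.put("key".getBytes(), "committed".getBytes());
            txn.put("key".getBytes(), "uncommitted".getBytes());

            // The StreamThread reads through the batch, so it sees its own
            // uncommitted write.
            byte[] streamThreadView = txn.getFromBatchAndDB(db, readOptions, "key".getBytes());

            // An Interactive Query thread reads the DB directly, so it only sees
            // the committed value until the batch is written to the DB.
            byte[] iqThreadView = db.get(readOptions, "key".getBytes());

            System.out.printf("stream thread sees '%s', IQ sees '%s'%n",
                    new String(streamThreadView), new String(iqThreadView));
        }
    }
}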

Another reason I believe I implemented it this way was a separation of
concerns. Recall that newTransaction() originally created an object of type
Transaction, not StateStore. My intent was to improve the type-safety of
the API, in an effort to ensure Transactions weren't used incorrectly.
Unfortunately, this didn't pan out, but newTransaction() remained.

Finally, this had the added benefit that implementations could easily add
support for transactions *without* re-writing their existing,
non-transactional implementation. I think this can be a benefit both for
implementers of custom StateStores, but also for anyone extending
RocksDbStore, as they can rely on the existing access methods working how
they expect them to.

I'm not too happy with the way the current design has panned out, so I'm
open to ideas on how to improve it. Key to this is finding some way to
ensure that reads from Interactive Query threads are properly isolated from
the transaction, *without* the performance overhead of checking which
thread the method is being called from on every access.

As for replacing flush() with commit() - I saw no reason to add this
complexity to the KIP, unless there was a need to add arguments to the
flush/commit method. This need arises with Atomic Checkpointing, but that
will be implemented separately, in a future KIP. Do you see a need for some
arguments to the flush/commit method that I've missed? Or were you simply
suggesting a rename?

2.
This is simply due to the practical reason that isolationLevel() is really
a proxy for checking if the app is under EOS. The application configuration
is not provided to the constructor of StateStores, but it *is* provided to
init(), via StateStoreContext. For this reason, it seemed somewhat natural
to add it to StateStoreContext. I think this makes sense, since the
IsolationLevel of all StateStores in an application *must* be the same, and
since those stores are all initialized with the same StateStoreContext, it
seems natural for that context to carry the desired IsolationLevel to use.

3.
Using IsolationLevel instead of just passing `boolean eosEnabled`, like
much of the internals does, was an attempt to logically de-couple the StateStore
API from the internals of Kafka Streams. Technically, StateStores don't
need to know/care what processing mode the KS app is using, all they need
to know is the isolation level expected of them.

Having formal definitions for the expectations of the two required
IsolationLevels allows implementers to implement transactional stores
without having to dig through the internals of Kafka Streams and understand
exactly how they are used. The tight coupling between state stores and
internal behaviour has actually significantly hindered my progress on this
KIP, and encouraged me to avoid increasing this logical coupling as much as
possible.

This also frees implementations to satisfy those requirements in any way
they choose. Transactions might not be the only, or even an available, approach
for an implementation, which might have an alternative way to satisfy the
isolation requirements. I admit that this point is more about semantics,
but "transactional" would need to be formally defined in order for
implementers to provide a valid implementation, and these IsolationLevels
provide that formal definition.

4.
I can remove them. I added them only as I planned to include them in the
org.apache.kafka.streams.state package, as a recommended base
implementation for all StateStores, including those implemented by users. I
had assumed that anything in "public" packages, such as
org.apache.kafka.streams.state, should be included in a KIP. Is that wrong?

5.
RocksDB provides no way to measure the actual size of a
WriteBatch(WithIndex), so we're limited to tracking the sum total of the
size of keys + values that are written to the transaction. This obviously
under-estimates the actual memory usage, because WriteBatch no-doubt
includes some record overheads, and 

Re: First Contribution

2023-06-20 Thread Mickael Maison
Hi Steven,

Thanks for your contribution! This is definitely a process we need
to automate. I left a few comments in your PRs.

Thanks,
Mickael

On Tue, Jun 13, 2023 at 8:11 PM Lovish Madhu
 wrote:
>
> Looks good, but I am also a newbie.
>
> However, I feel like a variable such as n = 10 should have a meaningful
> name, as it is a global constant.
> Also you forgot to use n and instead used 10
> here: refreshed_collaborators[:10]
>
> On Tue, Jun 13, 2023 at 11:23 PM Steven Booke 
> wrote:
>
> > To whom this may concern,
> >
> > This is my first time contributing and I have just submitted my first PR. I
> > have made changes to both apache/kafka-site and apache/kafka repos. I
> > believe I have followed the detailed instructions for contributing a code
> > change and am looking for feedback.
> >
> > Here are the PRs for reference: https://github.com/apache/kafka/pull/13842
> > and https://github.com/apache/kafka-site/pull/521
> >
> > Here is the JIRA ticket for reference:
> > https://issues.apache.org/jira/browse/KAFKA-14995
> >
> > --
> > Regards
> >
> > Steven Booke
> >


Re: [DISCUSS] KIP-714: Client metrics and observability

2023-06-20 Thread Kirk True
Hi Andrew,



> On Jun 13, 2023, at 8:06 AM, Andrew Schofield 
>  wrote:
> 
> Hi,
> I would like to start a new discussion thread on KIP-714: Client metrics and 
> observability.
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability
> 
> I have edited the proposal significantly to reduce the scope. The overall 
> mechanism for client metric subscriptions is unchanged, but the
> KIP is now based on the existing client metrics, rather than introducing new 
> metrics. The purpose remains helping cluster operators
> investigate performance problems experienced by clients without requiring 
> changes to the client application code or configuration.
> 
> Thanks,
> Andrew

Thanks for the KIP updates. A few questions:

1. The concept of a client instance ID is somewhat similar to the unique 
producer ID that is created for transactional producers. Can we augment the 
name or somehow clarify that this client instance ID is only for use by 
telemetry? The pandora’s box alternative is to make the creation, management, 
etc. of a unique, per-client instance ID a first-class construct. I assume 
that’s something we don’t want to bite off in this KIP ;)

2. I’m having trouble understanding where this provision for the terminating 
flag would be useful:

> The Terminating flag may be reused upon the next expiry of PushIntervalMs.

In the happy path, the terminating flag is set once at time of application 
shutdown by the close() method of a client. A buggy/nefarious client may send 
multiple push telemetry requests with the terminating flag set to skirt 
throttling. What’s the use case where an application would want to send a 
second request with the terminating flag set after PushIntervalMs?

3. KIP-848 introduces a new flavor of regex for topic subscriptions. Is that 
what we plan to adopt for the regex used by the subscription match?

4. What’s the benefit of having the broker specify the delta temporality if 
it’s (for now) always delta, besides API protocol bumping?

5. What is gained by the existence of the ClientTelemetry interface? Why not 
let interested parties implement ClientTelemetryReceiver directly?

Thanks!

[GitHub] [kafka-site] mimaison commented on pull request #521: KAFKA-14995: Automate asf.yaml collaborators refresh

2023-06-20 Thread via GitHub


mimaison commented on PR #521:
URL: https://github.com/apache/kafka-site/pull/521#issuecomment-1599054544

   Let's sort out https://github.com/apache/kafka/pull/13842 first but this 
will need to be updated to include our new committer.





Re: [DISCUSS] KIP-660: Pluggable ReplicaPlacer

2023-06-20 Thread Viktor Somogyi-Vass
Hey all,

I'd like to revive this discussion. I've created
https://cwiki.apache.org/confluence/display/KAFKA/KIP-879%3A+Multi-level+Rack+Awareness
last November, and it seems that there is a nice overlap between the
two that would be good to merge. Should we revive KIP-660 and merge the two
KIPs?
If you don't have time for this currently, Mickael, I'm happy to take it
over from you and merge the two interfaces; they seem somewhat
similar (and also similar to the current internal interface).

Best,
Viktor

On Tue, May 31, 2022 at 3:57 PM Mickael Maison 
wrote:

> Hi Vikas,
>
> You make some very good points and most importantly I agree that being
> able to prevent putting new partitions on a broker should be part of
> Kafka itself and not require a plugin.
>
> This feature would addresses 2 out of the 3 scenarios mentioned in the
> motivation section. The last one "When adding brokers to a cluster,
> Kafka currently does not necessarily place new partitions on new
> brokers" is clearly less important.
>
> So I think I'll retire this KIP and I'll follow up with a new KIP to
> focus on that feature.
>
> Thanks,
> Mickael
>
>
> On Mon, May 9, 2022 at 8:11 PM Vikas Singh 
> wrote:
> >
> > Hi Mickael,
> >
> > It's a nice proposal. It's appealing to have a pluggable way to override
> > default kafka placement decisions, and the motivation section lists some
> of
> > them. Here are a few comments:
> >
> > * The motivation section has "When adding brokers to a cluster, Kafka
> > currently does not necessarily place new partitions on new brokers". I am
> > not sure how valuable doing this will be. A newly created Kafka topic takes
> > time to reach the same usage level as existing topics, say because the
> > topic is created by a new workload that is getting onboarded, or the
> > expansion was done to relieve disk pressure on existing nodes, etc. While
> > new topics catch up to the existing workload, the new brokers are not
> > sharing equal load in the cluster, which probably defeats the purpose of
> > adding new brokers. In addition, clustering new topics like this on new
> > brokers has implications from a fault-domain perspective. A reasonable way
> > to approach it
> > is to indeed use CruiseControl to move things around so that the newly
> > added nodes become immediately involved and share cluster load.
> > * Regarding "When administrators want to remove brokers from a cluster,
> > there is no way to prevent Kafka from placing partitions on them", this
> is
> > indeed an issue. I would argue that this is needed by everyone and should
> > be part of Kafka, instead of being implemented as part of a plugin
> > interface by multiple teams.
> > * For "When some brokers are near their storage/throughput limit, Kafka
> > could avoid putting new partitions on them", while this can help relieve
> > short-term overload, I think again the correct solution here is something
> > like CruiseControl where the system is monitored and things moved around
> to
> > maintain a balanced cluster. A new topic will not take any disk space, so
> > placing it anywhere normally isn't going to add to the storage
> > overload.
> > Similar to the previous case, maybe a mechanism in Kafka to put nodes in
> a
> > quarantine state is a better way to approach this.
> >
> > In terms of the proposed api, I have a couple of comments:
> >
> > * It is not clear if the proposal applies to partitions of new topics or
> > the addition of partitions to an existing topic. Explicitly stating that will
> > be helpful.
> > * Regarding part "To address the use cases identified in the motivation
> > section, some knowledge about the current state of the cluster is
> > necessary. Details whether a new broker has just been added or is being
> > decommissioned are not part of the cluster metadata. Therefore such
> > knowledge has to be provided via an external means to the ReplicaPlacer,
> > for example via the configuration". It's not clear how this will be done.
> > If I have to implement this interface, it will be helpful to have clear
> > guidance/examples here which hopefully ties to the use cases in the
> > motivation section. It also allows us to figure out if the proposed
> > interface is complete and helps future implementers of the interface.
> >
> > Couple of minor comments:
> > * The KIP is not listed in the main KIP page (
> >
> https://cwiki-test.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> ).
> > Can you please add it there.
> > * The page has "This is especially true for the 4 scenarios listed in the
> > Motivation section", but there are only 3 scenarios listed.
> >
> > Regards,
> > Vikas
> >
> >
> > On Tue, May 3, 2022 at 5:51 PM Colin McCabe  wrote:
> >
> > > Hi Mickael,
> > >
> > > We did discuss this earlier, and I remember not being too enthusiastic
> > > about a pluggable policy here :)
> > >
> > > There have been several changes to the placement code in the last few
> > > weeks. (These are examples of the kind 

Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #1937

2023-06-20 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 484806 lines...]
[2023-06-20T14:57:29.188Z] 230 actionable tasks: 124 executed, 106 up-to-date
[2023-06-20T14:57:29.188Z] 
[2023-06-20T14:57:29.188Z] See the profiling report at: 
file:///home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk/build/reports/profile/profile-2023-06-20-12-03-56.html
[2023-06-20T14:57:29.188Z] A fine-grained performance profile is available: use 
the --scan option.
[2023-06-20T14:57:29.657Z] > Task :core:testClasses
[2023-06-20T14:57:29.657Z] > Task :streams:compileTestJava UP-TO-DATE
[2023-06-20T14:57:29.657Z] > Task :streams:testClasses UP-TO-DATE
[2023-06-20T14:57:29.657Z] > Task :streams:testJar
[2023-06-20T14:57:29.657Z] > Task :streams:testSrcJar
[2023-06-20T14:57:29.657Z] > Task 
:streams:publishMavenJavaPublicationToMavenLocal
[2023-06-20T14:57:29.657Z] > Task :streams:publishToMavenLocal
[2023-06-20T14:57:29.657Z] 
[2023-06-20T14:57:29.657Z] Deprecated Gradle features were used in this build, 
making it incompatible with Gradle 9.0.
[2023-06-20T14:57:29.657Z] 
[2023-06-20T14:57:29.657Z] You can use '--warning-mode all' to show the 
individual deprecation warnings and determine if they come from your own 
scripts or plugins.
[2023-06-20T14:57:29.657Z] 
[2023-06-20T14:57:29.657Z] See 
https://docs.gradle.org/8.1.1/userguide/command_line_interface.html#sec:command_line_warnings
[2023-06-20T14:57:29.657Z] 
[2023-06-20T14:57:29.657Z] BUILD SUCCESSFUL in 5m 37s
[2023-06-20T14:57:29.657Z] 89 actionable tasks: 33 executed, 56 up-to-date
[Pipeline] sh
[Pipeline] junit
[2023-06-20T14:57:31.758Z] Recording test results
[2023-06-20T14:57:32.340Z] + grep ^version= gradle.properties
[2023-06-20T14:57:32.340Z] + cut -d= -f 2
[Pipeline] dir
[2023-06-20T14:57:33.029Z] Running in 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk/streams/quickstart
[Pipeline] {
[Pipeline] sh
[2023-06-20T14:57:35.696Z] + mvn clean install -Dgpg.skip
[2023-06-20T14:57:38.344Z] [INFO] Scanning for projects...
[2023-06-20T14:57:39.283Z] [INFO] 

[2023-06-20T14:57:39.283Z] [INFO] Reactor Build Order:
[2023-06-20T14:57:39.283Z] [INFO] 
[2023-06-20T14:57:39.283Z] [INFO] Kafka Streams :: Quickstart   
 [pom]
[2023-06-20T14:57:39.283Z] [INFO] streams-quickstart-java   
 [maven-archetype]
[2023-06-20T14:57:39.283Z] [INFO] 
[2023-06-20T14:57:39.283Z] [INFO] < 
org.apache.kafka:streams-quickstart >-
[2023-06-20T14:57:39.283Z] [INFO] Building Kafka Streams :: Quickstart 
3.6.0-SNAPSHOT[1/2]
[2023-06-20T14:57:39.283Z] [INFO]   from pom.xml
[2023-06-20T14:57:39.283Z] [INFO] [ pom 
]-
[2023-06-20T14:57:39.283Z] [INFO] 
[2023-06-20T14:57:39.283Z] [INFO] --- clean:3.0.0:clean (default-clean) @ 
streams-quickstart ---
[2023-06-20T14:57:39.283Z] [INFO] 
[2023-06-20T14:57:39.283Z] [INFO] --- remote-resources:1.5:process 
(process-resource-bundles) @ streams-quickstart ---
[2023-06-20T14:57:40.282Z] [INFO] 
[2023-06-20T14:57:40.282Z] [INFO] --- site:3.5.1:attach-descriptor 
(attach-descriptor) @ streams-quickstart ---
[2023-06-20T14:57:42.039Z] [INFO] 
[2023-06-20T14:57:42.039Z] [INFO] --- gpg:1.6:sign (sign-artifacts) @ 
streams-quickstart ---
[2023-06-20T14:57:42.039Z] [INFO] 
[2023-06-20T14:57:42.039Z] [INFO] --- install:2.5.2:install (default-install) @ 
streams-quickstart ---
[2023-06-20T14:57:42.039Z] [INFO] Installing 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk/streams/quickstart/pom.xml
 to 
/home/jenkins/.m2/repository/org/apache/kafka/streams-quickstart/3.6.0-SNAPSHOT/streams-quickstart-3.6.0-SNAPSHOT.pom
[2023-06-20T14:57:42.039Z] [INFO] 
[2023-06-20T14:57:42.039Z] [INFO] --< 
org.apache.kafka:streams-quickstart-java >--
[2023-06-20T14:57:42.039Z] [INFO] Building streams-quickstart-java 
3.6.0-SNAPSHOT[2/2]
[2023-06-20T14:57:42.039Z] [INFO]   from java/pom.xml
[2023-06-20T14:57:42.039Z] [INFO] --[ maven-archetype 
]---
[2023-06-20T14:57:42.039Z] [INFO] 
[2023-06-20T14:57:42.039Z] [INFO] --- clean:3.0.0:clean (default-clean) @ 
streams-quickstart-java ---
[2023-06-20T14:57:42.039Z] [INFO] 
[2023-06-20T14:57:42.039Z] [INFO] --- remote-resources:1.5:process 
(process-resource-bundles) @ streams-quickstart-java ---
[2023-06-20T14:57:42.039Z] [INFO] 
[2023-06-20T14:57:42.039Z] [INFO] --- resources:2.7:resources 
(default-resources) @ streams-quickstart-java ---
[2023-06-20T14:57:42.039Z] [INFO] Using 'UTF-8' encoding to copy filtered 
resources.
[2023-06-20T14:57:42.039Z] [INFO] Copying 6 resources
[2023-06-20T14:57:42.039Z] [INFO] Copying 3 resources
[2023-06-20T14:57:42.039Z] [INFO] 

[jira] [Resolved] (KAFKA-15087) Move InterBrokerSendThread to server-commons module

2023-06-20 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-15087.
-
Fix Version/s: 3.6.0
 Reviewer: David Jacot
   Resolution: Fixed

> Move InterBrokerSendThread to server-commons module
> ---
>
> Key: KAFKA-15087
> URL: https://issues.apache.org/jira/browse/KAFKA-15087
> Project: Kafka
>  Issue Type: Task
>Reporter: Dimitar Dimitrov
>Assignee: Dimitar Dimitrov
>Priority: Major
> Fix For: 3.6.0
>
>
> Similar to the move of {{ShutdownableThread}} done with KAFKA-14706.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Request for review for my PR

2023-06-20 Thread Owen Leung
Hi there,

Can I ask for a quick review of my PR below? I addressed the comments
a few days ago and I am eager to hear from you.

https://github.com/apache/kafka/pull/13773

Thanks a lot
Owen


[GitHub] [kafka-site] zzccctv opened a new pull request, #527: Update powered-by.html

2023-06-20 Thread via GitHub


zzccctv opened a new pull request, #527:
URL: https://github.com/apache/kafka-site/pull/527

   Added Kafkaide 





Re: [VOTE] KIP-872: Add Serializer#serializeToByteBuffer() to reduce memory copying

2023-06-20 Thread John Roesler
Hi Divij and ShunKang,

I pulled open this thread to see if you needed my vote, but FYI, Divij is a 
committer now, so he can re-cast his vote as binding.

Thanks,
-John

On 2023/06/20 13:37:04 ShunKang Lin wrote:
> Hi all,
> 
> Bump this thread again and see if we could get a few more votes.
> Currently we have +3 non-binding (from Divij Vaidya, Kirk True and Kamal
> Chandraprakash)  and +2 binding (from Luke Chen and ziming deng).
> Hoping we can get this approved, reviewed, and merged in time for 3.6.0.
> 
> Best,
> ShunKang
> 
> ShunKang Lin wrote on Sun, May 7, 2023 at 15:24:
> 
> > Hi everyone,
> >
> > I'd like to open the vote for KIP-872, which proposes to add
> > Serializer#serializeToByteBuffer() to reduce memory copying.
> >
> > The proposal is here:
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828
> >
> > The pull request is here:
> > https://github.com/apache/kafka/pull/12685
> >
> > Thanks to all who reviewed the proposal, and thanks in advance for taking
> > the time to vote!
> >
> > Best,
> > ShunKang
> >
> 


Re: [VOTE] KIP-872: Add Serializer#serializeToByteBuffer() to reduce memory copying

2023-06-20 Thread ShunKang Lin
Hi all,

Bump this thread again and see if we could get a few more votes.
Currently we have +3 non-binding (from Divij Vaidya, Kirk True and Kamal
Chandraprakash)  and +2 binding (from Luke Chen and ziming deng).
Hoping we can get this approved, reviewed, and merged in time for 3.6.0.

Best,
ShunKang

ShunKang Lin wrote on Sun, May 7, 2023 at 15:24:

> Hi everyone,
>
> I'd like to open the vote for KIP-872, which proposes to add
> Serializer#serializeToByteBuffer() to reduce memory copying.
>
> The proposal is here:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828
>
> The pull request is here:
> https://github.com/apache/kafka/pull/12685
>
> Thanks to all who reviewed the proposal, and thanks in advance for taking
> the time to vote!
>
> Best,
> ShunKang
>


Build streaming graph processing on kafka

2023-06-20 Thread pzwpzw

Kafka is a widely used streaming storage system, and many stream processing systems
have been built on Kafka, e.g. Flink and Spark Streaming. Recently we have built a
streaming graph processing engine on Kafka which can read data from Kafka, do
incremental graph computing, and in the end write the results back to Kafka. Here is
the GitHub address of the project:
https://github.com/TuGraph-family/tugraph-analytics
Looking forward to more discussion about streaming graph processing on Kafka from
the community. Sorry to bother you all. Thanks!

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-20 Thread Bruno Cadonna

Hi Nick,

Thanks for the updates!

I really appreciate that you simplified the KIP by removing some 
aspects. As I have already told you, I think the removed aspects are 
also good ideas and we can discuss them on follow-up KIPs.


Regarding the current KIP, I have the following feedback.

1.
Is there a good reason to add method newTransaction() to the StateStore 
interface? As far as I understand, the idea is that users of a state 
store (transactional or not) call this method at start-up and after each 
commit. Since the call to newTransaction() is done in any case, I 
think it would simplify the caller code if we just started a new 
transaction after a commit inside the implementation.
As far as I understand, you plan to commit the transaction in the 
flush() method. I find the idea to replace flush() with commit() 
presented in KIP-844 an elegant solution.


2.
Why is the method to query the isolation level added to the state store 
context?


3.
Do we need all the isolation level definitions? I think it is good to 
know the guarantees of the transactionality of the state store. 
However, currently, Streams guarantees that there will only be one 
transaction that writes to the state store. Only the stream thread that 
executes the active task that owns the state store will write to the 
state store. I think it should be enough to know if the state store is 
transactional or not. So my proposal would be to just add a method on 
the state store interface that returns whether a state store is transactional 
or not by returning a boolean or an enum.
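
As a purely illustrative sketch of that alternative (the method name and the
default are my assumptions, not a concrete proposal):

/**
 * Sketch only: one possible shape for the suggestion above; the method name
 * "transactional()" is illustrative and not part of the KIP.
 */
public interface StateStore {
    // ... existing StateStore methods ...

    /**
     * @return true if writes are buffered and only become visible to readers
     *         once the store commits them atomically; false for the current
     *         write-through behaviour.
     */
    default boolean transactional() {
        return false;
    }
}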


4.
I am wondering why AbstractTransaction and AbstractTransactionalStore 
are part of the KIP. They look like implementation details that should 
not be exposed in the public API.


5.
Why does StateStore#approximateNumUncommittedBytes() return an 
approximate number of bytes?


6.
RocksDB is just one implementation of the state stores in Streams. 
However, the issues regarding OOM errors might also apply to other 
custom implementations. So in the KIP I would extract that part from 
section "RocksDB Transaction". I would also move section "RocksDB 
Transaction" to the end of section "Proposed Changes" and handle it as 
an example implementation for a state store.


7.
Should statestore.uncommitted.max.bytes limit all uncommitted bytes, or only 
the uncommitted bytes that reside in memory? In future, other 
transactional state store implementations might implement a buffer for 
uncommitted records that is able to spill records to disk. I think 
statestore.uncommitted.max.bytes needs to limit the uncommitted bytes 
irrespective of whether they reside in memory or on disk. Since Streams will use 
this config to decide if it needs to trigger a commit, state store 
implementations that can spill to disk would otherwise never be able to spill to 
disk. You would only need to change the doc of the config, if you agree 
with me.


8.
Section "Transaction Management" about the wrappers is rather a 
implementation detail that should not be in the KIP.


9.
Could you add a section that describes how failover will work with the 
transactional state stores? I think section "Error handling" is already 
a good start.



Best,
Bruno




On 15.05.23 11:04, Nick Telford wrote:

Hi everyone,

Quick update: I've added a new section to the KIP: "Offsets for Consumer
Rebalances", that outlines my solution to the problem that
StreamsPartitionAssignor needs to read StateStore offsets even if they're
not currently open.

Regards,
Nick

On Wed, 3 May 2023 at 11:34, Nick Telford  wrote:


Hi Bruno,

Thanks for reviewing my proposal.

1.
The main reason I added it was because it was easy to do. If we see no
value in it, I can remove it.

2.
Global StateStores can have multiple partitions in their input topics
(which function as their changelogs), so they would have more than one
partition.

3.
That's a good point. At present, the only method it adds is
isolationLevel(), which is likely not necessary outside of StateStores.
It *does* provide slightly different guarantees in the documentation to
several of the methods (hence the overrides). I'm not sure if this is
enough to warrant a new interface though.
I think the question that remains is whether this interface makes it
easier to implement custom transactional StateStores than if we were to
remove it? Probably not.

4.
The main motivation for the Atomic Checkpointing is actually performance.
My team has been testing out an implementation of this KIP without it, and
we had problems with RocksDB doing *much* more compaction, due to the
significantly increased flush rate. It was enough of a problem that (for
the time being), we had to revert back to Kafka Streams proper.
I think the best way to solve this, as you say, is to keep the .checkpoint
files *in addition* to the offsets being stored within the store itself.
Essentially, when closing StateStores, we force a memtable flush, then
call getCommittedOffsets and write those out to the .checkpoint file.
That would 

Re: [DISCUSS] KIP-940: Broker extension point for validating record contents at produce time

2023-06-20 Thread Edoardo Comar
Thanks Николай,
We’d like to open a vote next week.
Hopefully we'll get some more feedback before then.

Edo


On Wed, 7 Jun 2023 at 19:15, Николай Ижиков  wrote:

> Hello.
>
> As the author of one of the related KIPs, I'm +1 for this change.
> Long-awaited feature.
>
> > On 7 June 2023, at 19:02, Edoardo Comar  wrote:
> >
> > Dear all,
> > Adrian and I would like to start a discussion thread on
> >
> > KIP-940: Broker extension point for validating record contents at
> produce time
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-940%3A+Broker+extension+point+for+validating+record+contents+at+produce+time
> >
> > This KIP proposes a new broker-side extension point (a “record
> validation policy”) that can be used to reject records published by a
> misconfigured client.
> > Though general, it is aimed at the common, best-practice use case of
> defining Kafka message formats with schemas maintained in a schema registry.
> >
> > Please post your feedback, thanks !
> >
> > Edoardo & Adrian
> >
> > Unless otherwise stated above:
> >
> > IBM United Kingdom Limited
> > Registered in England and Wales with number 741598
> > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>
>


Re: [DISCUSS] KIP-935: Extend AlterConfigPolicy with existing configurations

2023-06-20 Thread Jorge Esteban Quilcate Otoya
Thanks Colin! You're right. I started this KIP thinking only about the latest
incremental API, and hadn't thought much about the legacy one.

After taking another look at both APIs, I can see some inconsistencies in
how the policies are applied in each case. I have added a section "Current
workflow" [1] to the current proposal to summarize how alter config works
in both cases (legacy and incremental) and for both back-ends (ZK, KRaft).

In summary:
- On Legacy Alter Config, the set of changes proposed is the same as the
new config with the difference that null values are removed from the new
config.
- On Incremental Alter Config, the set of changes proposed is not the same
as the new config. It only contains explicit changes to the config
- Implicit deletes are a set of configs inferred on legacy alter config
when no value is provided for a config that exists in the current config
- Even though alter config policy receives the "requested" configurations,
these have 2 different meanings depending on the API used to update configs.
  - When validating policies on Legacy Alter Config, it means: requested
changes that is equal to new config state including explicit deletes
  - When validating policies on Incremental Alter Config, it means: only
requested changes including explicit deletes but without any other config
from current or new status
  - Plugin implementations *do not know which one they are actually dealing
with*, and as the incremental (new) API becomes broadly adopted, current-state
configs not included in the request are not considered.
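
To make the difference concrete, a hypothetical policy like the sketch below (not
part of the KIP; the class name and the retention rule are made up for
illustration) would see different contents in requestMetadata.configs() depending
on which API the client used:

import java.util.Map;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.server.policy.AlterConfigPolicy;

public class MinRetentionPolicy implements AlterConfigPolicy {

    @Override
    public void validate(RequestMetadata requestMetadata) {
        if (requestMetadata.resource().type() != ConfigResource.Type.TOPIC) {
            return;
        }
        // Legacy alterConfigs: this map reflects the full intended (new) config state.
        // incrementalAlterConfigs: it contains only the entries the caller explicitly
        // changed, so the rest of the topic's configuration is invisible to the policy.
        Map<String, String> configs = requestMetadata.configs();
        String retention = configs.get("retention.ms");
        if (retention != null && Long.parseLong(retention) < 60 * 60 * 1000L) {
            throw new PolicyViolationException("retention.ms must be at least one hour");
        }
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }
}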

The problem is a bit larger than the one framed in the motivation. It's not
only that we don't have the current configs to compare with, but that depending
on the API used to alter configs we may or may not have them.

Is this assessment correct?
If it is, then we may discuss approaching this issue as a bug instead. We
could consider aligning the semantics of the configs passed to the policy.
At the moment the "requested configs" are passed to policies when either
API is called, but both have _different meaning depending on which API is
used_. We could instead align the meaning, and pass the "new configs,
including explicit deletes" as we do on legacy when doing incremental
updates as well.

Looking forward to your feedback and many thanks again!
Jorge.

[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-935%3A+Extend+AlterConfigPolicy+with+existing+configurations#KIP935:ExtendAlterConfigPolicywithexistingconfigurations-Currentworkflow

On Thu, 15 Jun 2023 at 22:07, Colin McCabe  wrote:

> Hi Jorge,
>
> I appreciate you trying to solve the issue. However, from the perspective
> of someone using the plugin API, it's quite messy: what is the difference
> between "proposed" and "resulting"? They sound the same.
>
> I think there are two APIs that make sense:
>
> 1. A (prev, next) based one where you just get the previous set of
> configs, and the new one, and can draw your own conclusions
>
> 2. A (prev, changed, removed) one where you get the previous set of
> configs, plus the changes (additions or modifications), and deletions.
>
> 3. Same as 2 but you have a "changed" map whose values are Optionals, and
> express deletions as Optional.empty
>
> The old API should just stay the same, bugs and all, for compatibility
> reasons. But for the new API we should choose one of the above, I think.
> I'm not completely sure which...
>
> best,
> Colin
>
> On Mon, Jun 12, 2023, at 07:08, Jorge Esteban Quilcate Otoya wrote:
> > Thanks Colin! You're right. I have added some notes about this to the
> KIP,
> > and clarify how this KIP is related to legacy and incremental alter
> config
> > APIs.
> >
> > Let me know if there's any gaps on the current proposal.
> >
> > Many thanks,
> > Jorge.
> >
> > On Mon, 12 Jun 2023 at 11:04, Colin McCabe  wrote:
> >
> >> See KAFKA-14195. Some deletions are not handled correctly. And this
> cannot
> >> be fixed without a kip because of backwards compatibility.
> >>
> >> Colin
> >>
> >> On Wed, Jun 7, 2023, at 17:07, Jorge Esteban Quilcate Otoya wrote:
> >> > Thank Colin.
> >> >
> >> > I've took a closer look on how configs are passed to the policy when
> >> > delete
> >> > configs are requested, and either null (KRaft) or empty values
> >> > (ZkAdminManager) are passed:
> >> > - ZkAdminManager passes empty values:
> >> >   - Config Entries definition:
> >> >
> >>
> https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/core/src/main/scala/kafka/server/ZkAdminManager.scala#L503
> >> >   - and passed to policy without changes:
> >> >
> >>
> https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/core/src/main/scala/kafka/server/ZkAdminManager.scala#L495
> >> > - Similar with ConfigurationControlManager (KRaft) passes null values:
> >> >   - Config entries added regardless of value==null:
> >> >
> >>
> 

Re: [VOTE] KIP-937: Improve Message Timestamp Validation

2023-06-20 Thread Andrew Schofield
+1 (non-binding).

Thanks,
Andrew

> On 19 Jun 2023, at 11:42, Divij Vaidya  wrote:
>
> This KIP solves a real operational pain point for the administrator of
> Kafka cluster.
>
> +1 (binding)
>
> --
> Divij Vaidya
>
>
>
> On Sun, Jun 18, 2023 at 5:09 AM Kirk True  wrote:
>
>> +1 (non-binding)
>>
>> Thanks Mehari!
>>
>>> On Jun 16, 2023, at 6:29 PM, Luke Chen  wrote:
>>>
>>> +1 (binding) from me.
>>>
>>> Thanks.
>>> Luke
>>>
>>> On Fri, Jun 16, 2023 at 11:55 PM Beyene, Mehari
>> 
>>> wrote:
>>>
 Hello everyone,

 I am opening the vote on KIP-937 here. If we have more to discuss,
>> please
 continue the discussion on the existing thread at:
 https://lists.apache.org/thread/wdpw845q9f5rhf6tz9tdlx3kc1g5zczc

 Thank you,
 Mehari


>>
>>



Re: [DISCUSS] KIP-942: Add Power(ppc64le) support

2023-06-20 Thread Vaibhav Nazare

Thank you for the response, Divij.

1. We are going to use ASF Infra-provided nodes for better availability and 
stability, as there are 3 Power9 nodes managed officially by the ASF Infra team 
themselves.
Ref: https://issues.apache.org/jira/browse/INFRA-24663
https://jenkins-ccos.apache.org/view/Shared%20-%20ppc64le%20nodes/
Details of the Power node previously used for apache/kafka CI:
RAM- 16GB
VCPUs- 8 VCPU
Disk- 160GB
For the shared VMs we need to check with the ASF Infra team to provide details.

2. We can run nightly builds once or twice a day at a specific time, 
instead of on every commit.
3. apache/camel https://builds.apache.org/job/Camel/job/el/ has already enabled 
CI for the Power platform. They are using the same H/W resources:
RAM- 16GB
VCPUs- 8 VCPU
Disk- 160GB

-Original Message-
From: Divij Vaidya  
Sent: Monday, June 19, 2023 10:20 PM
To: dev@kafka.apache.org
Subject: [EXTERNAL] Re: [DISCUSS] KIP-942: Add Power(ppc64le) support

Thank you for the KIP Vaibhav.

1. Builds for power architecture were intentionally disabled in the past since 
the infrastructure was flaky [1]. Could you please add to the KIP on what has 
changed since then?
2. What do you think about an alternative solution where we run a nightly build 
for this platform instead of running the CI with every PR/commit?
3. To bolster the case for this KIP, could you please add information from 
other Apache projects who are already running CI for this platform? Is their CI 
stable on Apache Infra hosts?


[1] https://github.com/apache/kafka/pull/12380 

--
Divij Vaidya



On Mon, Jun 19, 2023 at 12:30 PM Vaibhav Nazare 
 wrote:

>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-942%3A+Add+Power%28ppc64le%29+support
>