Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-07-28 Thread Almog Gavra
OK! I think I got everything, but I'll give the KIP another read with fresh
eyes later. Just a reminder that the voting is open, so go out and exercise
your civic duty! ;)

- Almog

On Fri, Jul 28, 2023 at 10:38 AM Almog Gavra  wrote:

> Thanks Guozhang & Sophie:
>
> A2. Will clarify in the KIP
> A3. Will change back to the deprecated version!
> A5. Seems like I'm outnumbered... DslStoreSuppliers it is.
>
> Will update the KIP today.
>
> - Almog
>
> On Thu, Jul 27, 2023 at 12:42 PM Guozhang Wang 
> wrote:
>
>> Yes, that sounds right to me. Thanks Sophie.
>>
>> On Thu, Jul 27, 2023 at 12:35 PM Sophie Blee-Goldman
>>  wrote:
>> >
>> > A2: Guozhang, just to close the book on the ListValue store thing, I
>> fully
>> > agree it seems like overreach
>> > to expose/force this on users, especially if it's fully internal today.
>> But
>> > just to make sure we're on the same
>> > page here, you're still ok with this KIP fixing the API gap that exists
>> > today, in which these stores cannot be
>> > customized by the user at all?
>> >
>> > In other words, after this KIP, the new behavior for the ListValue
>> store in
>> > a stream join will be:
>> >
>> > S1: First, check if the user passed in a `DSLStoreSuppliers` (or
>> whatever
>> > the name will be) to the
>> >StreamJoined config object, and use that to obtain the
>> > KVStoreSupplier for this ListValue store
>> >
>> > S2: If none was provided, check if the user has set a default
>> > DSLStoreSuppliers via the new config,
>> >and use that to get the KVStoreSupplier if so
>> >
>> > S3: If neither is set, fall back to the original logic as it is today,
>> > which is to pass in a KVStoreSupplier
>> >that is hard-coded to be either RocksDB or InMemory, based on
>> what
>> > is returned for the #persistent
>> >API by the StreamJoined's WindowStoreSupplier
>> >
>> > Does that sound right? We can clarify this further in the KIP if need be
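To make that S1-S3 ordering concrete, a minimal self-contained sketch of the
resolution logic is below. The DslStoreSuppliers interface and its single
method are hypothetical stand-ins for whatever name and API the KIP finalizes;
only the Stores factory calls are existing public API.

{code:java}
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;

// Hypothetical interface shape; the final KIP-954 name and API may differ.
interface DslStoreSuppliers {
    KeyValueBytesStoreSupplier keyValueStore(String name);
}

class ListValueStoreResolution {
    // S1 -> S2 -> S3, as described in the summary above.
    static KeyValueBytesStoreSupplier resolve(
            DslStoreSuppliers fromStreamJoined,  // S1: passed via the StreamJoined config object (may be null)
            DslStoreSuppliers fromConfig,        // S2: the proposed default config (may be null)
            boolean windowStoreIsPersistent,     // S3 input: StreamJoined's WindowStoreSupplier#persistent()
            String storeName) {
        if (fromStreamJoined != null) {
            return fromStreamJoined.keyValueStore(storeName);   // S1
        }
        if (fromConfig != null) {
            return fromConfig.keyValueStore(storeName);         // S2
        }
        // S3: today's hard-coded fallback, RocksDB vs in-memory
        return windowStoreIsPersistent
            ? Stores.persistentKeyValueStore(storeName)
            : Stores.inMemoryKeyValueStore(storeName);
    }
}
{code}

With this ordering, the new config behaves like an application-wide default
while StreamJoined remains the per-operator escape hatch.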
>> >
>> > On Thu, Jul 27, 2023 at 10:48 AM Guozhang Wang <
>> guozhang.wang...@gmail.com>
>> > wrote:
>> >
>> > > Hi all,
>> > >
>> > > Like Almog's secretary as well! Just following up on that index:
>> > >
>> > > A2: I'm also happy without introducing versioned KV in this KIP, as I
>> > > would envision it being introduced as new params into the
>> > > KeyValuePluginParams in the future. And just to clarify on Sophie's
>> > > previous comment, I think ListStore should not be exposed in this API
>> > > until we see it in common usage and hence would want to move it from
>> > > the internal category to the public interface category (again, we need
>> > > to think very carefully, since doing so would potentially require all
>> > > implementers to adopt it). As for now, I think only having kv / window
>> > > / session as public store types is fine.
>> > >
>> > > A3: Seems I was not making myself very clear at the beginning :P The
>> > > major thing is that I'd like to avoid having two configs co-exist for
>> > > the same function, since that would be a confusing learning curve for
>> > > users. Hence what I was proposing is to just have the newly introduced
>> > > interface without introducing a new config, and I realize now that it
>> > > is actually more aligned with the CUSTOM idea, where the ordering would
>> > > be to look at the config first, and then the interface. I blushed as I
>> > > read that Almog likes it.. After thinking about it twice, I'm now
>> > > leaning towards just deprecating the old config with the new API+config
>> > > as well.
>> > >
>> > > A5: Among the names we have discussed so far:
>> > >
>> > > DslStorePlugin
>> > > StoreTypeSpec
>> > > StoreImplSpec
>> > > DslStoreSuppliers
>> > >
>> > > I am in favor of DslStoreSuppliers, as well as restricting its scope,
>> > > just to echo Bruno's comments above.
>> > >
>> > >
>> > >
>> > > Guozhang
>> > >
>> > > On Thu, Jul 27, 2023 at 4:15 AM Bruno Cadonna 
>> wrote:
>> > > >
>> > > > Hi,
>> > > >
>> > > > A5. I have to admit that
>> > > > "If we envision extending this beyond just StoreSupplier types, it
>> could
>> > > > be a good option."
>> > > > is scaring me a bit.
>> > > > I am wondering what would be an example of such an extension?
>> > > > In general, I would propose to limit the scope of a config. In this
>> case
>> > > > the config should provide suppliers for state stores for the DSL.
>> > > >
>> > > > BTW, maybe it is a good idea to let DslStorePlugin extend
>> Configurable.
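A minimal sketch of that suggestion, scoped to the kv / window / session store
types discussed above. All names and method parameters here are placeholders
rather than the final API; only Configurable and the *BytesStoreSupplier types
are existing Kafka interfaces.

{code:java}
import java.util.Map;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

// Extending Configurable lets Streams pass client configs to user-supplied
// implementations after instantiating them via reflection.
interface DslStoreSuppliers extends Configurable {
    KeyValueBytesStoreSupplier keyValueStore(String name);
    WindowBytesStoreSupplier windowStore(String name, long retentionMs, long windowSizeMs);
    SessionBytesStoreSupplier sessionStore(String name, long retentionMs);

    @Override
    default void configure(Map<String, ?> configs) {}  // no-op by default
}
{code}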
>> > > >
>> > > > Best,
>> > > > Bruno
>> > > >
>> > > > On 7/27/23 2:15 AM, Sophie Blee-Goldman wrote:
>> > > > > Thanks for the feedback Bruno -- sounds like we're getting close
>> to a
>> > > final
>> > > > > consensus here.
>> > > > > It sounds like the two main (only?) semi-unresolved questions that
>> > > still
>> > > > > have differing
>> > > > > opinions floating around are whether to deprecate the old config,
>> and
>> > > what
>> > > > > to name the new config
>> > > > > + interface.
>> > > > >
>> > > > > Although I won't personally push 

Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #2055

2023-07-28 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 295220 lines...]
Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testInnerInner[caching enabled = false] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testInnerInner[caching enabled = false] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testInnerOuter[caching enabled = false] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testInnerOuter[caching enabled = false] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testInnerLeft[caching enabled = false] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testInnerLeft[caching enabled = false] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testOuterInner[caching enabled = false] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testOuterInner[caching enabled = false] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testOuterOuter[caching enabled = false] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testOuterOuter[caching enabled = false] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testInnerWithRightVersionedOnly[caching enabled = false] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testInnerWithRightVersionedOnly[caching enabled = false] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testLeftWithLeftVersionedOnly[caching enabled = false] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testLeftWithLeftVersionedOnly[caching enabled = false] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testInnerWithLeftVersionedOnly[caching enabled = false] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testInnerWithLeftVersionedOnly[caching enabled = false] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
TaskAssignorIntegrationTest > shouldProperlyConfigureTheAssignor STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
TaskAssignorIntegrationTest > shouldProperlyConfigureTheAssignor PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
TaskMetadataIntegrationTest > shouldReportCorrectEndOffsetInformation STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
TaskMetadataIntegrationTest > shouldReportCorrectEndOffsetInformation PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
TaskMetadataIntegrationTest > shouldReportCorrectCommittedOffsetInformation 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
TaskMetadataIntegrationTest > shouldReportCorrectCommittedOffsetInformation 
PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
EmitOnChangeIntegrationTest > shouldEmitSameRecordAfterFailover() STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
EmitOnChangeIntegrationTest > shouldEmitSameRecordAfterFailover() PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
HighAvailabilityTaskAssignorIntegrationTest > 
shouldScaleOutWithWarmupTasksAndPersistentStores(TestInfo) STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
HighAvailabilityTaskAssignorIntegrationTest > 
shouldScaleOutWithWarmupTasksAndPersistentStores(TestInfo) PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
HighAvailabilityTaskAssignorIntegrationTest > 
shouldScaleOutWithWarmupTasksAndInMemoryStores(TestInfo) STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 

Build failed in Jenkins: Kafka » Kafka Branch Builder » 3.5 #52

2023-07-28 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 281693 lines...]
Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
VersionedKeyValueStoreIntegrationTest > shouldRestore STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
VersionedKeyValueStoreIntegrationTest > shouldRestore PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
VersionedKeyValueStoreIntegrationTest > shouldPutGetAndDelete STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
VersionedKeyValueStoreIntegrationTest > shouldPutGetAndDelete PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
VersionedKeyValueStoreIntegrationTest > 
shouldManualUpgradeFromNonVersionedTimestampedToVersioned STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
VersionedKeyValueStoreIntegrationTest > 
shouldManualUpgradeFromNonVersionedTimestampedToVersioned PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
HandlingSourceTopicDeletionIntegrationTest > 
shouldThrowErrorAfterSourceTopicDeleted STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
HandlingSourceTopicDeletionIntegrationTest > 
shouldThrowErrorAfterSourceTopicDeleted PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > testHighAvailabilityTaskAssignorLargeNumConsumers 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > testHighAvailabilityTaskAssignorLargeNumConsumers 
PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > 
testHighAvailabilityTaskAssignorLargePartitionCount STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > 
testHighAvailabilityTaskAssignorLargePartitionCount PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > 
testHighAvailabilityTaskAssignorManyThreadsPerClient STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > 
testHighAvailabilityTaskAssignorManyThreadsPerClient PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > testStickyTaskAssignorManyStandbys STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > testStickyTaskAssignorManyStandbys PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > testStickyTaskAssignorManyThreadsPerClient STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > testStickyTaskAssignorManyThreadsPerClient PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > testFallbackPriorTaskAssignorManyThreadsPerClient 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > testFallbackPriorTaskAssignorManyThreadsPerClient 
PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > testFallbackPriorTaskAssignorLargePartitionCount 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > testFallbackPriorTaskAssignorLargePartitionCount 
PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > testStickyTaskAssignorLargePartitionCount STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > testStickyTaskAssignorLargePartitionCount PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > testFallbackPriorTaskAssignorManyStandbys STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > testFallbackPriorTaskAssignorManyStandbys PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > testHighAvailabilityTaskAssignorManyStandbys 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > testHighAvailabilityTaskAssignorManyStandbys PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > testFallbackPriorTaskAssignorLargeNumConsumers 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > testFallbackPriorTaskAssignorLargeNumConsumers 
PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 178 > 
StreamsAssignmentScaleTest > testStickyTaskAssignorLargeNumConsumers STARTED

Gradle Test Run 

Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #2054

2023-07-28 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 396219 lines...]
[Pipeline] }

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testInnerOuter[caching enabled = false] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testInnerLeft[caching enabled = false] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testInnerLeft[caching enabled = false] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testOuterInner[caching enabled = false] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testOuterInner[caching enabled = false] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testOuterOuter[caching enabled = false] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testOuterOuter[caching enabled = false] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testInnerWithRightVersionedOnly[caching enabled = false] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testInnerWithRightVersionedOnly[caching enabled = false] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testLeftWithLeftVersionedOnly[caching enabled = false] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testLeftWithLeftVersionedOnly[caching enabled = false] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testInnerWithLeftVersionedOnly[caching enabled = false] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testInnerWithLeftVersionedOnly[caching enabled = false] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
TaskAssignorIntegrationTest > shouldProperlyConfigureTheAssignor STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
TaskAssignorIntegrationTest > shouldProperlyConfigureTheAssignor PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
TaskMetadataIntegrationTest > shouldReportCorrectEndOffsetInformation STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
TaskMetadataIntegrationTest > shouldReportCorrectEndOffsetInformation PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
TaskMetadataIntegrationTest > shouldReportCorrectCommittedOffsetInformation 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
TaskMetadataIntegrationTest > shouldReportCorrectCommittedOffsetInformation 
PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
EmitOnChangeIntegrationTest > shouldEmitSameRecordAfterFailover() STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
EmitOnChangeIntegrationTest > shouldEmitSameRecordAfterFailover() PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
HighAvailabilityTaskAssignorIntegrationTest > 
shouldScaleOutWithWarmupTasksAndPersistentStores(TestInfo) STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
HighAvailabilityTaskAssignorIntegrationTest > 
shouldScaleOutWithWarmupTasksAndPersistentStores(TestInfo) PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
HighAvailabilityTaskAssignorIntegrationTest > 
shouldScaleOutWithWarmupTasksAndInMemoryStores(TestInfo) STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
HighAvailabilityTaskAssignorIntegrationTest > 
shouldScaleOutWithWarmupTasksAndInMemoryStores(TestInfo) PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
KStreamAggregationDedupIntegrationTest > shouldReduce(TestInfo) STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
KStreamAggregationDedupIntegrationTest > shouldReduce(TestInfo) PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
KStreamAggregationDedupIntegrationTest > shouldGroupByKey(TestInfo) STARTED


Feedback on using 'confluent-kafka-go' for reliability and performance

2023-07-28 Thread Xinli shang
Hi all,

We are considering replacing the Sarama Go client with confluent-kafka-go,
which is supported and seems promising. Regarding reliability and
performance, does anybody want to share their experience? Whether it comes
from production or from testing, all input is appreciated.

-- 
Xinli Shang


[jira] [Resolved] (KAFKA-15263) KRaftMigrationDriver can run the migration twice

2023-07-28 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-15263.
--
Resolution: Fixed

> KRaftMigrationDriver can run the migration twice
> 
>
> Key: KAFKA-15263
> URL: https://issues.apache.org/jira/browse/KAFKA-15263
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.0, 3.5.1
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Blocker
> Fix For: 3.6.0, 3.5.2
>
>
> There is a narrow race condition in KRaftMigrationDriver where a PollEvent 
> can run that sees the internal state as ZK_MIGRATION and is immediately 
> followed by another poll event (due to an external call to {{wakeup()}}), 
> resulting in two MigrateMetadataEvents being enqueued. 
> Since MigrateMetadataEvent lacks a check on the internal state, this causes 
> the metadata migration to run twice. 
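The fix pattern can be illustrated with a small self-contained sketch (this is
not the actual KRaftMigrationDriver code): the event re-checks the driver
state when it runs, so a duplicate enqueue becomes a no-op.

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch only - not the actual KRaftMigrationDriver code.
public class MigrationGuardSketch {
    enum State { ZK_MIGRATION, MIGRATED }

    private State state = State.ZK_MIGRATION;
    private int migrationsRun = 0;
    private final Queue<Runnable> eventQueue = new ArrayDeque<>();

    void enqueueMigrateMetadataEvent() {
        eventQueue.add(() -> {
            // The guard KAFKA-15263 adds conceptually: without it, both
            // enqueued events would run the migration.
            if (state != State.ZK_MIGRATION) {
                return;
            }
            migrationsRun++;
            state = State.MIGRATED;
        });
    }

    public static void main(String[] args) {
        MigrationGuardSketch driver = new MigrationGuardSketch();
        driver.enqueueMigrateMetadataEvent(); // first PollEvent
        driver.enqueueMigrateMetadataEvent(); // racing poll triggered by wakeup()
        while (!driver.eventQueue.isEmpty()) {
            driver.eventQueue.poll().run();
        }
        System.out.println("migrations run: " + driver.migrationsRun); // prints 1
    }
}
{code}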



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-07-28 Thread Almog Gavra
Thanks Guozhang & Sophie:

A2. Will clarify in the KIP
A3. Will change back to the deprecated version!
A5. Seems like I'm outnumbered... DslStoreSuppliers it is.

Will update the KIP today.

- Almog

On Thu, Jul 27, 2023 at 12:42 PM Guozhang Wang 
wrote:

> Yes, that sounds right to me. Thanks Sophie.
>
> On Thu, Jul 27, 2023 at 12:35 PM Sophie Blee-Goldman
>  wrote:
> >
> > A2: Guozhang, just to close the book on the ListValue store thing, I
> fully
> > agree it seems like overreach
> > to expose/force this on users, especially if it's fully internal today.
> But
> > just to make sure we're on the same
> > page here, you're still ok with this KIP fixing the API gap that exists
> > today, in which these stores cannot be
> > customized by the user at all?
> >
> > In other words, after this KIP, the new behavior for the ListValue store
> in
> > a stream join will be:
> >
> > S1: First, check if the user passed in a `DSLStoreSuppliers` (or whatever
> > the name will be) to the
> >StreamJoined config object, and use that to obtain the
> > KVStoreSupplier for this ListValue store
> >
> > S2: If none was provided, check if the user has set a default
> > DSLStoreSuppliers via the new config,
> >and use that to get the KVStoreSupplier if so
> >
> > S3: If neither is set, fall back to the original logic as it is today,
> > which is to pass in a KVStoreSupplier
> >that is hard-coded to be either RocksDB or InMemory, based on what
> > is returned for the #persistent
> >API by the StreamJoined's WindowStoreSupplier
> >
> > Does that sound right? We can clarify this further in the KIP if need be
> >
> > On Thu, Jul 27, 2023 at 10:48 AM Guozhang Wang <
> guozhang.wang...@gmail.com>
> > wrote:
> >
> > > Hi all,
> > >
> > > Like Almog's secretary as well! Just following up on that index:
> > >
> > > A2: I'm also happy without introducing versioned KV in this KIP, as I
> > > would envision it being introduced as new params into the
> > > KeyValuePluginParams in the future. And just to clarify on Sophie's
> > > previous comment, I think ListStore should not be exposed in this API
> > > until we see it in common usage and hence would want to move it from
> > > the internal category to the public interface category (again, we need
> > > to think very carefully, since doing so would potentially require all
> > > implementers to adopt it). As for now, I think only having kv / window
> > > / session as public store types is fine.
> > >
> > > A3: Seems I was not making myself very clear at the beginning :P The
> > > major thing is that I'd like to avoid having two configs co-exist for
> > > the same function, since that would be a confusing learning curve for
> > > users. Hence what I was proposing is to just have the newly introduced
> > > interface without introducing a new config, and I realize now that it
> > > is actually more aligned with the CUSTOM idea, where the ordering would
> > > be to look at the config first, and then the interface. I blushed as I
> > > read that Almog likes it.. After thinking about it twice, I'm now
> > > leaning towards just deprecating the old config with the new API+config
> > > as well.
> > >
> > > A5: Among the names we have discussed so far:
> > >
> > > DslStorePlugin
> > > StoreTypeSpec
> > > StoreImplSpec
> > > DslStoreSuppliers
> > >
> > > I am in favor of DslStoreSuppliers, as well as restricting its scope,
> > > just to echo Bruno's comments above.
> > >
> > >
> > >
> > > Guozhang
> > >
> > > On Thu, Jul 27, 2023 at 4:15 AM Bruno Cadonna 
> wrote:
> > > >
> > > > Hi,
> > > >
> > > > A5. I have to admit that
> > > > "If we envision extending this beyond just StoreSupplier types, it
> could
> > > > be a good option."
> > > > is scaring me a bit.
> > > > I am wondering what would be an example of such an extension?
> > > > In general, I would propose to limit the scope of a config. In this
> case
> > > > the config should provide suppliers for state stores for the DSL.
> > > >
> > > > BTW, maybe it is a good idea to let DslStorePlugin extend
> Configurable.
> > > >
> > > > Best,
> > > > Bruno
> > > >
> > > > On 7/27/23 2:15 AM, Sophie Blee-Goldman wrote:
> > > > > Thanks for the feedback Bruno -- sounds like we're getting close
> to a
> > > final
> > > > > consensus here.
> > > > > It sounds like the two main (only?) semi-unresolved questions that
> > > still
> > > > > have differing
> > > > > opinions floating around are whether to deprecate the old config,
> and
> > > what
> > > > > to name the new config
> > > > > + interface.
> > > > >
> > > > > Although I won't personally push back on any of the options listed
> > > above,
> > > > > here's my final two cents:
> > > > >
> > > > > A3. I'm still a firm believer in deprecating the old config, and
> agree
> > > > > wholeheartedly with what Bruno said.
> > > > >
> > > > > A5. I also wasn't crazy about "Plugin" at first, but I will admit
> it's
> > > > > grown on me. I think it rubbed me the wrong
> > > > 

[jira] [Resolved] (KAFKA-14702) Extend server side assignor to support rack aware replica placement

2023-07-28 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-14702.
-
Fix Version/s: 3.6.0
   Resolution: Fixed

> Extend server side assignor to support rack aware replica placement
> ---
>
> Key: KAFKA-14702
> URL: https://issues.apache.org/jira/browse/KAFKA-14702
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Ritika Muduganti
>Priority: Major
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2053

2023-07-28 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-15271) TimelineHashMap.entrySet yield unexpected results with nested TimelineHashMap

2023-07-28 Thread David Jacot (Jira)
David Jacot created KAFKA-15271:
---

 Summary: TimelineHashMap.entrySet yield unexpected results with 
nested TimelineHashMap
 Key: KAFKA-15271
 URL: https://issues.apache.org/jira/browse/KAFKA-15271
 Project: Kafka
  Issue Type: Bug
Reporter: David Jacot


Example:
{code:java}
@Test
public void bug() {
    SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());

    // Topic -> Partition -> Offset
    TimelineHashMap<String, TimelineHashMap<Integer, Long>> offsets =
        new TimelineHashMap<>(snapshotRegistry, 0);

    snapshotRegistry.getOrCreateSnapshot(0);

    offsets
        .computeIfAbsent("foo", __ -> new TimelineHashMap<>(snapshotRegistry, 0))
        .put(0, 100L);

    snapshotRegistry.getOrCreateSnapshot(1);

    offsets
        .computeIfAbsent("foo", __ -> new TimelineHashMap<>(snapshotRegistry, 0))
        .put(1, 110L);

    snapshotRegistry.getOrCreateSnapshot(2);

    offsets
        .computeIfAbsent("foo", __ -> new TimelineHashMap<>(snapshotRegistry, 0))
        .put(1, 111L);

    assertNull(offsets.get("foo", 1).get(1, 1));

    offsets.entrySet(1).forEach(topicEntry -> {
        System.out.println(topicEntry.getKey());
        topicEntry.getValue().entrySet(1).forEach(partitionEntry -> {
            System.out.println(partitionEntry.getKey() + " : " + partitionEntry.getValue());
        });
    });

    /*
    The above code prints:
    foo
    0 : 100
    1 : 110

    but should rather print:
    foo
    0 : 100
    */
} {code}
It yields the expected result when the third put is removed. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15270) Integration tests for AsyncConsumer simple consume case

2023-07-28 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-15270:
--

 Summary: Integration tests for AsyncConsumer simple consume case
 Key: KAFKA-15270
 URL: https://issues.apache.org/jira/browse/KAFKA-15270
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans


This task involves writing integration tests covering the simple consume 
functionality of the AsyncConsumer. This should include validation of the 
assign, fetch and positions logic.

Committed-offset functionality is not covered as part of this task. 

Integration tests should have a similar form to the existing 
PlaintextConsumerTest, but scoped to the simple consume flow. 

  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15269) Clean up the RaftClient interface

2023-07-28 Thread Jira
José Armando García Sancio created KAFKA-15269:
--

 Summary: Clean up the RaftClient interface
 Key: KAFKA-15269
 URL: https://issues.apache.org/jira/browse/KAFKA-15269
 Project: Kafka
  Issue Type: Task
Reporter: José Armando García Sancio
Assignee: José Armando García Sancio


Make the following changes to the {{RaftClient}} interface and implementation:

Remove {{scheduleAtomicAppend}}; the controller doesn't use {{scheduleAppend}}, 
so we can revert to the original semantics.

{{logEndOffset}} is misleading when called on the leader, since it doesn't 
include records that have been appended to the {{BatchAccumulator}} but have 
not yet been written to the log. Rename it to {{endOffset}} in the process.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15268) Consider replacing Subscription Metadata by a hash

2023-07-28 Thread David Jacot (Jira)
David Jacot created KAFKA-15268:
---

 Summary: Consider replacing Subscription Metadata by a hash
 Key: KAFKA-15268
 URL: https://issues.apache.org/jira/browse/KAFKA-15268
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot


With the addition of racks, the subscription metadata record is getting 
large, too large in my opinion. We should consider replacing it with a hash. 
The subscription metadata is mainly used to detect changes in metadata, and a 
hash would provide similar functionality.
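A minimal sketch of the idea, assuming an illustrative per-topic string
encoding rather than the real record layout: store only the digest, recompute
it from the current metadata, and treat a mismatch as a metadata change.

{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: the per-topic value encoding (e.g. "3;rack1,rack2")
// is made up for the example.
public class SubscriptionMetadataHash {
    static byte[] hash(Map<String, String> topicMetadata) throws NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        // Sort entries so the digest is deterministic regardless of map order.
        for (Map.Entry<String, String> e : new TreeMap<>(topicMetadata).entrySet()) {
            digest.update(e.getKey().getBytes(StandardCharsets.UTF_8));
            digest.update((byte) 0); // separator to avoid ambiguous concatenation
            digest.update(e.getValue().getBytes(StandardCharsets.UTF_8));
            digest.update((byte) 0);
        }
        return digest.digest();
    }
}
{code}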



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-959 Add BooleanConverter to Kafka Connect

2023-07-28 Thread Hector Geraldino (BLOOMBERG/ 919 3RD A)
Hi everyone,

Thanks to everyone who has reviewed and voted on this KIP. 

So far it has received 3 non-binding votes (Andrew Schofield, Yash Mayya, Kamal 
Chandraprakash) and 2 binding votes (Chris Egerton, Greg Harris) - still one 
binding vote shy of passing.

Can we get help from a committer to push it through?

Thank you!
Hector

Sent from Bloomberg Professional for iPhone

- Original Message -
From: Greg Harris 
To: dev@kafka.apache.org
At: 07/26/23 12:23:20 UTC-04:00


Hey Hector,

Thanks for the straightforward and clear KIP!
+1 (binding)

Thanks,
Greg

On Wed, Jul 26, 2023 at 5:16 AM Chris Egerton  wrote:
>
> +1 (binding)
>
> Thanks Hector!
>
> On Wed, Jul 26, 2023 at 3:18 AM Kamal Chandraprakash <
> kamal.chandraprak...@gmail.com> wrote:
>
> > +1 (non-binding). Thanks for the KIP!
> >
> > On Tue, Jul 25, 2023 at 11:12 PM Yash Mayya  wrote:
> >
> > > Hi Hector,
> > >
> > > Thanks for the KIP!
> > >
> > > +1 (non-binding)
> > >
> > > Thanks,
> > > Yash
> > >
> > > On Tue, Jul 25, 2023 at 11:01 PM Andrew Schofield <
> > > andrew_schofield_j...@outlook.com> wrote:
> > >
> > > > Thanks for the KIP. As you say, not that controversial.
> > > >
> > > > +1 (non-binding)
> > > >
> > > > Thanks,
> > > > Andrew
> > > >
> > > > > On 25 Jul 2023, at 18:22, Hector Geraldino (BLOOMBERG/ 919 3RD A) <
> > > > hgerald...@bloomberg.net> wrote:
> > > > >
> > > > > Hi everyone,
> > > > >
> > > > > The changes proposed by KIP-959 (Add BooleanConverter to Kafka
> > Connect)
> > > > have a limited scope and shouldn't be controversial. I'm opening a
> > voting
> > > > thread with the hope that it can be included in the next upcoming 3.6
> > > > release.
> > > > >
> > > > > Here are some links:
> > > > >
> > > > > KIP:
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-959%3A+Add+BooleanConverter+to+Kafka+Connect
> > > > > JIRA: https://issues.apache.org/jira/browse/KAFKA-15248
> > > > > Discussion thread:
> > > > https://lists.apache.org/thread/15c2t0kl9bozmzjxmkl5n57kv4l4o1dt
> > > > > Pull Request: https://github.com/apache/kafka/pull/14093
> > > > >
> > > > > Thanks!
> > > >
> > > >
> > > >
> > >
> >


[jira] [Resolved] (KAFKA-14501) Implement Heartbeat API

2023-07-28 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-14501.
-
Fix Version/s: 3.6.0
 Reviewer: David Jacot
   Resolution: Fixed

> Implement Heartbeat API
> ---
>
> Key: KAFKA-14501
> URL: https://issues.apache.org/jira/browse/KAFKA-14501
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Jeff Kim
>Priority: Major
> Fix For: 3.6.0
>
>
> Implement Heartbeat API in the new Group Coordinator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15267) Cluster-wide disablement of Tiered Storage

2023-07-28 Thread Christo Lolov (Jira)
Christo Lolov created KAFKA-15267:
-

 Summary: Cluster-wide disablement of Tiered Storage
 Key: KAFKA-15267
 URL: https://issues.apache.org/jira/browse/KAFKA-15267
 Project: Kafka
  Issue Type: Sub-task
Reporter: Christo Lolov
Assignee: Christo Lolov


h2. Summary

KIP-405 defines the configuration {{remote.log.storage.system.enable}}, which 
controls whether all resources needed for Tiered Storage to function are 
instantiated properly in Kafka. However, the interaction between remote data 
and Kafka is undefined if that configuration is set to false while there are 
still topics with {{remote.storage.enable}} set. *We would like to give 
customers the ability to switch off Tiered Storage on a cluster level and as 
such need to define the behaviour.*

{{remote.log.storage.system.enable}} is a read-only configuration. This means 
that it can only be changed by *modifying server.properties* and restarting 
brokers. As such, the *validity of values contained in it is only checked at 
broker startup*.

This JIRA proposes a few behaviours and a recommendation on a way forward.
h2. Option 1: Change nothing

Pros:
 * No operation.

Cons:
 * We do not solve the problem of moving back to older (or newer) Kafka 
versions not supporting TS.

h2. Option 2: Remove the configuration, enable Tiered Storage on a cluster 
level and do not allow it to be disabled

Always instantiate all resources for tiered storage. If no special ones are 
selected use the default ones which come with Kafka.

Pros:
 * We solve the problem for moving between versions not allowing TS to be 
disabled.

Cons:
 * We do not solve the problem of moving back to older (or newer) Kafka 
versions not supporting TS.
 * We haven’t quantified how many compute resources (CPU, memory) idle TS 
components occupy.
 * TS is a feature not required for running Kafka. As such, while it is still 
under development we shouldn’t put it on the critical path of starting a 
broker. In this way, a stray memory leak won’t impact anything on the critical 
path of a broker.
 * We are potentially swapping one problem for another. How does TS behave if 
one decides to swap the TS plugin classes when data has already been written?

h2. Option 3: Hide topics with tiering enabled

Customers cannot interact with topics which have tiering enabled. They cannot 
create new topics with the same names. Retention (and compaction?) do not take 
effect on files already in local storage.

Pros:
 * We do not force data-deletion.

Cons:
 * This will be quite involved - the controller will need to know when a 
broker’s server.properties have been altered; the broker will need to not 
proceed to delete logs it is not the leader or follower for.

h2. Option 4: Do not start the broker if there are topics with tiering 
enabled - Recommended

This option has 2 different sub-options. The first is that TS cannot be 
disabled at the cluster level if there are *any* tiered topics - in other 
words, all tiered topics need to be deleted first. The second is that TS 
cannot be disabled at the cluster level if there are *any* topics with 
*tiering enabled* - they can have tiering disabled, but with a retention 
policy set to delete or retain (as per 
[KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]).
A topic can have tiering disabled and remain on the cluster as long as there 
is no *remote* data when TS is disabled cluster-wide.

Pros:
 * We force the customer to be very explicit in disabling tiering of topics 
prior to disabling TS on the whole cluster.

Cons:
 * You have to make certain that all data in remote storage is deleted (just 
disabling a tiered topic is not enough). How do you determine whether all 
remote data has expired if the policy is retain? If the retain policy in 
KIP-950 knows that there is data in remote storage, then this check should 
also be able to figure it out.

The common denominator is that there needs to be no *remote* data at the point 
of disabling TS. As such, the most straightforward option is to refuse to start 
brokers if there are topics with {{remote.storage.enable}} set. This in essence 
requires customers to clean up any tiered topics before switching off TS, which 
is a fair ask. Should we wish to revise this later, it should be possible.
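A minimal sketch of the recommended startup check; the config names come from 
KIP-405, while the validation itself is hypothetical (not actual broker code):

{code:java}
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;

// Illustrative sketch of the recommended option - not actual broker code.
public class TieredStorageStartupCheck {
    static void validate(boolean remoteLogStorageSystemEnable,
                         Map<String, Map<String, String>> topicConfigs) {
        if (remoteLogStorageSystemEnable) {
            return; // TS enabled cluster-wide: nothing to check
        }
        for (Map.Entry<String, Map<String, String>> topic : topicConfigs.entrySet()) {
            String enabled = topic.getValue().getOrDefault("remote.storage.enable", "false");
            if (Boolean.parseBoolean(enabled)) {
                throw new ConfigException(
                    "remote.log.storage.system.enable=false, but topic '" + topic.getKey() +
                    "' still has remote.storage.enable=true; delete or un-tier it first");
            }
        }
    }
}
{code}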
h2. Option 5: Make Kafka forget about all remote information

Pros:
 * Clean cut

Cons:
 * Data is lost the moment TS is disabled regardless of whether it is reenabled 
later on, which might not be the behaviour expected by customers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15246) CoordinatorContext should be protected by a lock

2023-07-28 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-15246.
-
Fix Version/s: 3.6.0
 Reviewer: Justine Olshan
   Resolution: Fixed

> CoordinatorContext should be protected by a lock
> 
>
> Key: KAFKA-15246
> URL: https://issues.apache.org/jira/browse/KAFKA-15246
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2052

2023-07-28 Thread Apache Jenkins Server
See 




Re: [VOTE] KIP-951: Leader discovery optimisations for the client

2023-07-28 Thread Mayank Shekhar Narula
Thanks Jose.

On Thu, Jul 27, 2023 at 5:46 PM José Armando García Sancio
 wrote:

> The KIP LGTM. Thanks for the design. I am looking forward to the
> implementation.
>
> +1 (binding).
>
> Thanks!
> --
> -José
>


-- 
Regards,
Mayank Shekhar Narula


[jira] [Created] (KAFKA-15266) Log configs ignore static configs set via non-primary synonyms

2023-07-28 Thread Aman Harish Gandhi (Jira)
Aman Harish Gandhi created KAFKA-15266:
--

 Summary: Log configs ignore static configs set via non-primary synonyms
 Key: KAFKA-15266
 URL: https://issues.apache.org/jira/browse/KAFKA-15266
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.6.0
Reporter: Aman Harish Gandhi
Assignee: Aman Harish Gandhi


In our server.properties we had the following config
{code:java}
log.retention.hours=48
{code}
We noticed that after running alter configs to update a broker-level config 
(for a config unrelated to retention), we were only deleting data after 7 days 
instead of the configured 2.

The alter-configs command we ran was similar to this:
{code:java}
sh kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 
"log.segment.bytes=50"
{code}
Digging deeper, the issue could be pinpointed to the reconfigure block of 
DynamicLogConfig inside DynamicBrokerConfig. Here we only look at the "primary" 
KafkaConfig synonym of the LogConfig, and if it is not set then we remove the 
value set in the default log config as well. This eventually leads to 
retention.ms not being set in the default log config, which leads to the 
default value of 7 days being used. The value set in "log.retention.hours" is 
completely ignored in this case.

Pasting the relevant code block here
{code:scala}
newConfig.valuesFromThisConfig.forEach { (k, v) =>
  if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) {
    DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName =>
      if (v == null)
        newBrokerDefaults.remove(configName)
      else
        newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
    }
  }
} {code}
In the above block, `DynamicLogConfig.ReconfigurableConfigs` contains only 
log.retention.ms. It does not contain the other synonyms like 
`log.retention.minutes` or `log.retention.hours`.

This issue seems prevalent in all cases where there is more than one 
KafkaConfig synonym for a LogConfig.
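A hedged sketch of the kind of synonym fallback that is missing (not the 
actual DynamicBrokerConfig code; the unit conversions follow the documented 
meaning of the synonyms):

{code:java}
import java.util.Map;

// Hypothetical sketch: when the primary synonym (log.retention.ms) is unset,
// consult the static non-primary synonyms, most specific first, before
// falling back to the 7-day default.
public class RetentionSynonyms {
    static Long retentionMs(Map<String, String> staticProps) {
        if (staticProps.containsKey("log.retention.ms"))
            return Long.parseLong(staticProps.get("log.retention.ms"));
        if (staticProps.containsKey("log.retention.minutes"))
            return Long.parseLong(staticProps.get("log.retention.minutes")) * 60_000L;
        if (staticProps.containsKey("log.retention.hours"))
            return Long.parseLong(staticProps.get("log.retention.hours")) * 3_600_000L;
        return null; // no static value; the broker falls back to the 7-day default
    }
}
{code}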



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2051

2023-07-28 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-953: partition method to be overloaded to accept headers as well.

2023-07-28 Thread Andrew Schofield
Hi Sagar,
Thanks for your comments.

1) Server-side partitioning doesn’t necessarily mean that there’s only one
way to do it. It just means that the partitioning logic runs on the broker and
any configuration of partitioning applies to the broker’s partitioner. If we 
ever
see a KIP for this, that’s the kind of thing I would expect to see.

2) In the priority example in the KIP, there is a kind of contract between the
producers and consumers so that some records can be processed before
others regardless of the order in which they were sent. The producer
wants to apply special significance to a particular header to control which
partition is used. I would simply achieve this by setting the partition number
in the ProducerRecord at the time of sending.
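That alternative in code form, using existing public producer API; the topic
name and the priority-to-partition mapping are made up for the example:

{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;

// The application decides the partition itself; no custom partitioner needed.
public class PriorityPartitionExample {
    public static void main(String[] args) {
        RecordHeaders headers = new RecordHeaders();
        headers.add("priority", "high".getBytes(StandardCharsets.UTF_8));
        int partition = 0; // application-level mapping, e.g. "high" -> partition 0
        ProducerRecord<String, String> record =
            new ProducerRecord<>("orders", partition, "key", "value", headers);
        System.out.println(record);
    }
}
{code}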

I don’t think the KIP proposes adjusting the built-in partitioner or adding to 
AK
a new one that uses headers in the partitioning decision. So, any configuration
for a partitioner that does support headers would be up to the implementation
of that specific partitioner. Partitioner implements Configurable.

I’m just providing an alternative view and I’m not particularly opposed to the 
KIP.
I just don’t think it quite merits the work involved to get it voted and merged.
As an aside, a long time ago, I created a small KIP that was never adopted
and I didn’t push it because I eventually didn’t need it.

Thanks,
Andrew

> On 28 Jul 2023, at 05:15, Sagar  wrote:
>
> Hey Andrew,
>
> Thanks for the review. Since I had reviewed the KIP I thought I would also
> respond. Of course Jack has the final say on this since he wrote the KIP.
>
> 1) This is an interesting point and I hadn't considered it. The
> comparison with KIP-848 is a valid one, but even within that KIP,
> client-side partitioning is still allowed for power users like Streams. So
> while we would want to move away from client-side partitioners as much as
> possible, we still shouldn't do away completely with client-side
> partitioning and end up in a state of inflexibility for different kinds of
> use cases. This is my opinion though, and you have more context on
> clients, so I would like to know your thoughts on this.
>
> 2) Regarding this, I assumed that since the headers are already part of the
> consumer records, consumers should have access to them, and if there is a
> contract between the applications producing and the applications consuming,
> that decisioning should be transparent. Was my assumption incorrect? But as
> you rightly pointed out, header-based partitioning with keys is going to
> lead to surprising results. Assuming there is merit in this proposal, do
> you think we should ignore the keys in this case (similar to the effect of
> setting the partitioner.ignore.keys config to true) and document it
> appropriately?
>
> Let me know what you think.
>
> Thanks!
> Sagar.
>
>
> On Thu, Jul 27, 2023 at 9:41 PM Andrew Schofield <
> andrew_schofield_j...@outlook.com> wrote:
>
>> Hi Jack,
>> Thanks for the KIP. I have a few concerns about the idea.
>>
>> 1) I think that while a client-side partitioner seems like a neat idea and
>> it’s an established part of Kafka,
>> it’s one of the things which makes Kafka clients quite complicated. Just
>> as KIP-848 is moving from
>> client-side assignors to server-side assignors, I wonder whether really we
>> should be looking to make
>> partitioning a server-side capability too over time. So, I’m not convinced
>> that making the Partitioner
>> interface richer is moving in the right direction.
>>
>> 2) For records with a key, the partitioner usually calculates the
>> partition from the key. This means
>> that records with the same key end up on the same partition. Many
>> applications expect this to give ordering.
>> Log compaction expects this. There are situations in which records have to
>> be repartitioned, such as
>> sometimes happens with Kafka Streams. I think that a header-based
>> partitioner for records which have
>> keys is going to be surprising and only going to have limited
>> applicability as a result.
>>
>> The tricky part about clever partitioning is that downstream systems have
>> no idea how the partition
>> number was arrived at, so they do not truly understand how the ordering
>> was derived. I do think that
>> perhaps there’s value to being able to influence the partitioning using
>> the headers, but I wonder if actually
>> transforming the headers into an “ordering context” that then flows with
>> the record as it moves through
>> the system would be a stronger solution. Today, the key is the ordering
>> context. Maybe it should be a
>> concept in its own right and the Producer could configure a converter from
>> headers to ordering context.
>> That is of course a much bigger change.
>>
>> In one of the examples you mention in the KIP, you mentioned using a
>> header to control priority. I guess the
>> idea is to preferentially process records off specific partitions so they
>> can overtake lower priority records.
>> I suggest just sending the 

Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-07-28 Thread Sagar
Hey Yash,

Thanks for your comments.

1) Hmm, the question is: how do you qualify a partition as stale or old?
Let's say a connector has implemented updateOffsets, and no records are
received for a certain partition; the connector will still update that
partition's offsets, so technically the offset can't be termed stale anymore.
Even though I can't think of a side effect at this point that would justify
disallowing offset deletion via this method, my opinion is to use a proper
mechanism, like the ones introduced in KIP-875, to delete offsets. Moreover,
if I also consider the option presented in point #2, for simplicity's sake it
seems better to not add this feature at this point. If we feel it's really
needed and users are requesting it, we can add support for it later on.

2) I get the point now. I can't think of cases where updating offsets would
be needed. As with point #1, we can always add it back if needed later on.
For now, I have removed that part from the KIP.

3) Yes; because the offset commit happens on a different thread, ordering
guarantees might be harder to ensure if we did it from that other thread. The
currently proposed mechanism, even though it gets invoked multiple times,
keeps things simpler to reason about.
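For reference, a hypothetical sketch of the hook being discussed. The method
name comes from this thread; the parameter and return types are illustrative
only, and the KIP defines the actual proposal:

{code:java}
import java.util.Map;
import java.util.Optional;

// Hypothetical shape - not a final Connect API.
public interface UpdatableOffsetsSourceTask {
    // Invoked on the task thread once all records from the previous poll have
    // been processed. Implementations may return offsets for source partitions
    // that produced no records, and must not mutate the map passed in.
    Optional<Map<Map<String, Object>, Map<String, Object>>> updateOffsets(
        Map<Map<String, Object>, Map<String, Object>> aboutToBeCommitted);
}
{code}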

Let me know how things look now. If it's all looking ok, I would go ahead
and create a Vote thread for the same.

Thanks!
Sagar.

On Tue, Jul 25, 2023 at 5:15 PM Yash Mayya  wrote:

> Hi Sagar,
>
> Thanks for the updates. I had a few more follow up questions:
>
> > I have added that a better way of doing that would be
> > via KIP-875. Also, I didn't want to include any mechanisms
> > for users to meddle with the offsets topic. Allowing tombstone
> > records via this method would be akin to publishing tombstone
> > records directly to the offsets topic which is not recommended
> > generally.
>
> KIP-875 would allow a way for cluster administrators and / or users to do
> so manually externally whereas allowing tombstones in
> SourceTask::updateOffsets would enable connectors to clean up offsets for
> old / stale partitions without user intervention right? I'm not sure I
> follow what you mean by "I didn't want to include any mechanisms for users
> to meddle with the offsets topic" here? Furthermore, I'm not sure why
> publishing tombstone records directly to the offsets topic would not be
> recommended? Isn't that currently the only way to manually clean up offsets
> for a source connector?
>
> > It could be useful in a scenario where the offset of a partition
> > doesn't update for some period of time. In such cases, the
> > connector can do some kind of state tracking and update the
> > offsets after the time period elapses.
>
> I'm not sure I follow? In this case, won't the offsets argument passed
> to SourceTask::updateOffsets *not* contain the source partition which
> hasn't had an update for a long period of time? Wouldn't it make more sense
> to reduce the surface of the API as Chris suggested, and only allow adding
> new partition-offset pairs to the about-to-be-committed offsets (since
> there don't seem to be any use cases outlined for allowing connectors to
> update offsets for source partitions that already have an offset about to
> be committed)?
>
> > All the records returned by the previous poll invocation
> >  got processed successfully
>
> Thanks for this clarification in the KIP, it looks like it does address the
> offsets ordering issue. As to Chris' point about invoking
> SourceTask::updateOffsets less frequently by calling it before offsets are
> committed rather than in every poll loop iteration - I guess that would
> make it a lot more tricky to address the ordering issue?
>
>
> Thanks,
> Yash
>
> On Thu, Jul 20, 2023 at 9:50 PM Sagar  wrote:
>
> > Hey All,
> >
> > Please let me know how the KIP looks now. Is it at a stage where I can
> > start with the Voting phase? Of course I am still open to
> > feedback/suggestions but planning to start the Vote for it.
> >
> > Thanks!
> > Sagar.
> >
> > On Tue, Jul 11, 2023 at 10:00 PM Sagar 
> wrote:
> >
> > > Hi Yash/Chris,
> > >
> > > Thanks for the feedback! I have updated the KIP with the suggestions
> > > provided. I would also update the PR with the suggestions.
> > >
> > > Also, I was hoping that this could make it to the 3.6 release given
> that
> > > it would benefit source connectors which have some of the problems
> listed
> > > in the Motivation Section.
> > >
> > > Responses Inline:
> > >
> > > Yash:
> > >
> > > 1) In the proposed changes section where you talk about modifying the
> > >> offsets, could you please clarify that tasks shouldn't modify the
> > offsets
> > >> map that is passed as an argument? Currently, the distinction between
> > the
> > >> offsets map passed as an argument and the offsets map that is returned
> > is
> > >> not very clear in numerous places.
> > >
> > >
> > >
> > > Added
> > >
> > > 2) The default return value of Optional.empty() seems to be fairly
> > >> non-intuitive considering that the return value is 

Re: Apache Kafka 3.6.0 release

2023-07-28 Thread Satish Duggana
Hi All,
Whoever has KIP entries in the 3.6.0 release plan, please update them
with the latest status by tomorrow (end of day, 29th Jul UTC).

Thanks
Satish.

On Fri, 28 Jul 2023 at 12:01, Satish Duggana  wrote:
>
> Thanks Ismael and Divij for the suggestions.
>
> One way was to follow the earlier guidelines that we set for any early
> access release. It looks like Ismael already mentioned the example of
> KRaft.
>
> KIP-405 mentions upgrade/downgrade and limitations sections. We can
> clarify that in the release notes for users on how this feature can be
> used for early access.
>
> Divij, We do not want users to enable this feature on production
> environments in early access release. Let us work together on the
> followups Ismael suggested.
>
> ~Satish.
>
> On Fri, 28 Jul 2023 at 02:24, Divij Vaidya  wrote:
> >
> > Those are great suggestions, thank you. We will continue this discussion
> > forward in a separate KIP for release plan for Tiered Storage.
> >
> > On Thu 27. Jul 2023 at 21:46, Ismael Juma  wrote:
> >
> > > Hi Divij,
> > >
> > > I think the points you bring up for discussion are all good. My main
> > > feedback is that they should be discussed in the context of KIPs vs the
> > > release template. That's why we have a backwards compatibility section for
> > > every KIP, it's precisely to ensure we think carefully about some of the
> > > points you're bringing up. When it comes to defining the meaning of early
> > > access, we have two options:
> > >
> > > 1. Have a KIP specifically for tiered storage.
> > > 2. Have a KIP to define general guidelines for what early access means.
> > >
> > > Does this make sense?
> > >
> > > Ismael
> > >
> > > On Thu, Jul 27, 2023 at 6:38 PM Divij Vaidya 
> > > wrote:
> > >
> > > > Thank you for the response, Ismael.
> > > >
> > > > 1. Specifically in context of 3.6, I wanted this compatibility
> > > > guarantee point to encourage a discussion on
> > > >
> > > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-952%3A+Regenerate+segment-aligned+producer+snapshots+when+upgrading+to+a+Kafka+version+supporting+Tiered+Storage
> > > > .
> > > > Due to lack of producer snapshots in <2.8 versions, a customer may not
> > > > be able to upgrade to 3.6 and use TS on a topic which was created when
> > > > the cluster was on <2.8 version (see motivation for details). We can
> > > > discuss and agree that it does not break compatibility, which is fine.
> > > > But I want to ensure that we have a discussion soon on this to reach a
> > > > conclusion.
> > > >
> > > > 2. I will start a KIP on this for further discussion.
> > > >
> > > > 3. In the context of 3.6, this would mean that there should be
> > > > no-regression, if a user does "not" turn-on remote storage (early
> > > > access feature) at a cluster level. We have some known cases (such as
> > > > https://issues.apache.org/jira/browse/KAFKA-15189) which violate this
> > > > compatibility requirement. Having this guarantee mentioned in the
> > > > release plan will ensure that we are all in agreement with which cases
> > > > are truly blockers and which aren't.
> > > >
> > > > 4. Fair, instead of a general goal, let me put it specifically in the
> > > > context of 3.6. Let me know if this is not the right forum for this
> > > > discussion.
> > > > Once a user "turns on" tiered storage (TS) at a cluster level, I am
> > > > proposing that they should have the ability to turn it off as well at
> > > > a cluster level. Since this is a topic level feature, folks may not
> > > > spin up a separate cluster to try this feature, hence, we need to
> > > > ensure that we provide them with the ability to try tiered storage for
> > > > a topic which could be deleted and featured turned-off, so that rest
> > > > of the production cases are not impacted.
> > > >
> > > > 5. Agree on not making public interface change as a requirement but we
> > > > should define what "early access" means in that case. Users may not be
> > > > aware that "early access" public APIs may change (unless I am missing
> > > > some documentation somewhere completely, in which case I apologize for
> > > > bringing this naive point).
> > > >
> > > > --
> > > > Divij Vaidya
> > > >
> > > > On Thu, Jul 27, 2023 at 2:27 PM Ismael Juma  wrote:
> > > > >
> > > > > Hi Divij,
> > > > >
> > > > > Some of these are launch checklist items (not really goals) and some
> > > are
> > > > > compatibility guarantees. More below.
> > > > >
> > > > > On Thu, Jul 27, 2023, 12:10 PM Divij Vaidya 
> > > > wrote:
> > > > >
> > > > > > Hey Satish
> > > > > >
> > > > > > Could we consider adding "launch goals" in the release plan. While
> > > > > > some of these may be implicit, it would be nice to list them down in
> > > > > > the release plan. For this release, our launch requirements would 
> > > > > > be:
> > > > > > 1. Users should be able to upgrade from any prior Kafka version to
> > > this
> > > > > > version.
> > > > > >
> > > > >
> > > > > This is part of the 

Re: Apache Kafka 3.6.0 release

2023-07-28 Thread Satish Duggana
Thanks Ismael and Divij for the suggestions.

One way was to follow the earlier guidelines that we set for any early
access release. It looks like Ismael already mentioned the example of
KRaft.

KIP-405 mentions upgrade/downgrade and limitations sections. We can
clarify that in the release notes for users on how this feature can be
used for early access.

Divij, We do not want users to enable this feature on production
environments in early access release. Let us work together on the
followups Ismael suggested.

~Satish.

On Fri, 28 Jul 2023 at 02:24, Divij Vaidya  wrote:
>
> Those are great suggestions, thank you. We will continue this discussion
> forward in a separate KIP for release plan for Tiered Storage.
>
> On Thu 27. Jul 2023 at 21:46, Ismael Juma  wrote:
>
> > Hi Divij,
> >
> > I think the points you bring up for discussion are all good. My main
> > feedback is that they should be discussed in the context of KIPs vs the
> > release template. That's why we have a backwards compatibility section for
> > every KIP, it's precisely to ensure we think carefully about some of the
> > points you're bringing up. When it comes to defining the meaning of early
> > access, we have two options:
> >
> > 1. Have a KIP specifically for tiered storage.
> > 2. Have a KIP to define general guidelines for what early access means.
> >
> > Does this make sense?
> >
> > Ismael
> >
> > On Thu, Jul 27, 2023 at 6:38 PM Divij Vaidya 
> > wrote:
> >
> > > Thank you for the response, Ismael.
> > >
> > > 1. Specifically in context of 3.6, I wanted this compatibility
> > > guarantee point to encourage a discussion on
> > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-952%3A+Regenerate+segment-aligned+producer+snapshots+when+upgrading+to+a+Kafka+version+supporting+Tiered+Storage
> > > .
> > > Due to lack of producer snapshots in <2.8 versions, a customer may not
> > > be able to upgrade to 3.6 and use TS on a topic which was created when
> > > the cluster was on <2.8 version (see motivation for details). We can
> > > discuss and agree that it does not break compatibility, which is fine.
> > > But I want to ensure that we have a discussion soon on this to reach a
> > > conclusion.
> > >
> > > 2. I will start a KIP on this for further discussion.
> > >
> > > 3. In the context of 3.6, this would mean that there should be
> > > no-regression, if a user does "not" turn-on remote storage (early
> > > access feature) at a cluster level. We have some known cases (such as
> > > https://issues.apache.org/jira/browse/KAFKA-15189) which violate this
> > > compatibility requirement. Having this guarantee mentioned in the
> > > release plan will ensure that we are all in agreement with which cases
> > > are truly blockers and which aren't.
> > >
> > > 4. Fair, instead of a general goal, let me put it specifically in the
> > > context of 3.6. Let me know if this is not the right forum for this
> > > discussion.
> > > Once a user "turns on" tiered storage (TS) at a cluster level, I am
> > > proposing that they should have the ability to turn it off as well at
> > > a cluster level. Since this is a topic level feature, folks may not
> > > spin up a separate cluster to try this feature, hence, we need to
> > > ensure that we provide them with the ability to try tiered storage for
> > > a topic which could be deleted and featured turned-off, so that rest
> > > of the production cases are not impacted.
> > >
> > > 5. Agree on not making public interface change as a requirement but we
> > > should define what "early access" means in that case. Users may not be
> > > aware that "early access" public APIs may change (unless I am missing
> > > some documentation somewhere completely, in which case I apologize for
> > > bringing this naive point).
> > >
> > > --
> > > Divij Vaidya
> > >
> > > On Thu, Jul 27, 2023 at 2:27 PM Ismael Juma  wrote:
> > > >
> > > > Hi Divij,
> > > >
> > > > Some of these are launch checklist items (not really goals) and some
> > are
> > > > compatibility guarantees. More below.
> > > >
> > > > On Thu, Jul 27, 2023, 12:10 PM Divij Vaidya 
> > > wrote:
> > > >
> > > > > Hey Satish
> > > > >
> > > > > Could we consider adding "launch goals" in the release plan. While
> > > > > some of these may be implicit, it would be nice to list them down in
> > > > > the release plan. For this release, our launch requirements would be:
> > > > > 1. Users should be able to upgrade from any prior Kafka version to
> > this
> > > > > version.
> > > > >
> > > >
> > > > This is part of the compatibility guarantees. The upgrade notes mention
> > > > this already. If there is a change in a given release, it should
> > > definitely
> > > > be highlighted.
> > > >
> > > > 2. On release, this version (or it's dependencies) would not have any
> > > > > known MEDIUM/HIGH CVE.
> > > > >
> > > >
> > > > This is a new policy and the details should be discussed. In
> > particular,
> > > > the threshold (medium or high).
> > > >
> > > > 3.