Re: [DISCUSS] KIP-953: partition method to be overloaded to accept headers as well.

2023-08-03 Thread Jack Tomy
All right, thanks, Andrew.

Hey everyone,
Please share your thoughts and feedback on the KIP :
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263424937



On Fri, Aug 4, 2023 at 2:50 AM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

> Hi Jack,
> I do understand the idea of extending the Partitioner interface so that
> people are now able to use headers in the partitioning decision, and I see
> that it’s filling in a gap in the interface which was left when headers were
> originally added.
>
> Experience with non-default partitioning schemes in the past makes me
> unlikely to use anything other than the default partitioning scheme.
> But I wouldn’t let that dissuade you.
>
> Thanks,
> Andrew
>
> > On 3 Aug 2023, at 13:43, Jack Tomy  wrote:
> >
> > Hey Andrew, Sagar
> >
> > Please share your thoughts. Thanks.
> >
> >
> >
> > On Mon, Jul 31, 2023 at 5:58 PM Jack Tomy  wrote:
> >
> >> Hey Andrew, Sagar
> >>
> >> Thanks. I'm travelling so sorry for being brief and getting back late.
> >>
> >> 1. For the first concern, moving in the direction of a server-side
> >> partitioner: the idea seems very promising, but I believe we still have
> >> a long way to go. Since the proposal/design for it is not yet
> >> available, it's hard for me to defend my proposal.
> >> 2. For the second concern:
> >> 2.1 Loss of order in messages: I believe the ordering of messages is
> >> never promised and the partitioner has no requirement to ensure the same.
> >> It is up to the user to implement/use a partitioner which ensures
> >> ordering based on keys.
> >> 2.2 Key deciding the partition: it is totally up to the user to decide
> >> the partition regardless of the key; we are also passing the value to
> >> the partitioner. Even the existing implementation receives the value,
> >> which lets the user decide the partition based on value.
> >> 2.3 Sending to a specific partition: for this I need to know the total
> >> number of partitions, and if I do it in the partitioner, the cluster
> >> param gives me all the information I need.
> >>
> >> I would also quote a line from KIP-82 where headers were added to the
> >> serializer : The payload is traditionally for the business object, and
> *headers
> >> are traditionally used for transport routing*, filtering etc. So I
> >> believe when a user wants to add some routing information (in this case
> >> which set of partitions to go for), headers seem to be the right place.
> >>
> >>
> >>
> >> On Sat, Jul 29, 2023 at 8:48 PM Sagar 
> wrote:
> >>
> >>> Hi Andrew,
> >>>
> >>> Thanks for your comments.
> >>>
> >>> 1) Yes, that makes sense and that's what I would expect to see as well.
> >>> I just wanted to highlight that we might still need a way to let
> >>> client-side partitioning logic be present as well. Anyway, I am good on
> >>> this point.
> >>> 2) The example provided does seem achievable by simply attaching the
> >>> partition number in the ProducerRecord. I guess if we can't find any
> >>> further examples which strengthen the case for this partitioner, it
> >>> might be harder to justify adding it.
> >>>
> >>>
> >>> Thanks!
> >>> Sagar.
> >>>
> >>> On Fri, Jul 28, 2023 at 2:05 PM Andrew Schofield <
> >>> andrew_schofield_j...@outlook.com> wrote:
> >>>
>  Hi Sagar,
>  Thanks for your comments.
> 
>  1) Server-side partitioning doesn’t necessarily mean that there’s only
> >>> one
>  way to do it. It just means that the partitioning logic runs on the
> >>> broker
>  and
>  any configuration of partitioning applies to the broker’s partitioner.
> >>> If
>  we ever
>  see a KIP for this, that’s the kind of thing I would expect to see.
> 
>  2) In the priority example in the KIP, there is a kind of contract
> >>> between
>  the
>  producers and consumers so that some records can be processed before
>  others regardless of the order in which they were sent. The producer
>  wants to apply special significance to a particular header to control
> >>> which
>  partition is used. I would simply achieve this by setting the
> partition
>  number
>  in the ProducerRecord at the time of sending.
> 
>  I don’t think the KIP proposes adjusting the built-in partitioner or
>  adding to AK
>  a new one that uses headers in the partitioning decision. So, any
>  configuration
>  for a partitioner that does support headers would be up to the
>  implementation
>  of that specific partitioner. Partitioner implements Configurable.
> 
>  I’m just providing an alternative view and I’m not particularly
> opposed
> >>> to
>  the KIP.
>  I just don’t think it quite merits the work involved to get it voted
> and
>  merged.
>  As an aside, a long time ago, I created a small KIP that was never
> >>> adopted
>  and I didn’t push it because I eventually didn’t need it.
> 
>  Thanks,
>  

Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #2067

2023-08-03 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 395922 lines...]

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > shouldRestoreStateFromChangelogTopic(boolean) > [1] 
true STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > shouldRestoreStateFromChangelogTopic(boolean) > [1] 
true PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > shouldRestoreStateFromChangelogTopic(boolean) > [2] 
false STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > 
shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(boolean) > 
[1] true PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > 
shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(boolean) > 
[2] false STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > shouldRestoreStateFromChangelogTopic(boolean) > [2] 
false PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
SmokeTestDriverIntegrationTest > shouldWorkWithRebalance(boolean) > [1] true 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > 
shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(boolean) > 
[2] false PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > 
shouldProcessDataFromStoresWithLoggingDisabled(boolean) > [1] true STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > 
shouldProcessDataFromStoresWithLoggingDisabled(boolean) > [1] true PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > 
shouldProcessDataFromStoresWithLoggingDisabled(boolean) > [2] false STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > 
shouldProcessDataFromStoresWithLoggingDisabled(boolean) > [2] false PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > shouldRestoreNullRecord() STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
SmokeTestDriverIntegrationTest > shouldWorkWithRebalance(boolean) > [1] true 
PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
SmokeTestDriverIntegrationTest > shouldWorkWithRebalance(boolean) > [2] false 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > shouldRestoreNullRecord() PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > shouldRestoreStateFromSourceTopic(boolean) > [1] true 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > shouldRestoreStateFromSourceTopic(boolean) > [1] true 
PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > shouldRestoreStateFromSourceTopic(boolean) > [2] false 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > shouldRestoreStateFromSourceTopic(boolean) > [2] false 
PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > shouldSuccessfullyStartWhenLoggingDisabled(boolean) > 
[1] true STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > shouldSuccessfullyStartWhenLoggingDisabled(boolean) > 
[1] true PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > shouldSuccessfullyStartWhenLoggingDisabled(boolean) > 
[2] false STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > shouldSuccessfullyStartWhenLoggingDisabled(boolean) > 
[2] false PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > shouldRestoreStateFromChangelogTopic(boolean) > [1] 
true STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > shouldRestoreStateFromChangelogTopic(boolean) > [1] 
true PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > shouldRestoreStateFromChangelogTopic(boolean) > [2] 
false STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > shouldRestoreStateFromChangelogTopic(boolean) > [2] 
false PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
SmokeTestDriverIntegrationTest > shouldWorkWithRebalance(boolean) > [1] true 
STARTED

Gradle Test Run 

Re: Apache Kafka 3.6.0 release

2023-08-03 Thread Satish Duggana
Hi Chris,
Thanks for the update. This looks to be a minor change and is also
useful for backward compatibility. I added it to the release plan as
an exceptional case.

~Satish.

On Thu, 3 Aug 2023 at 21:34, Chris Egerton  wrote:
>
> Hi Satish,
>
> Would it be possible to include KIP-949 (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-949%3A+Add+flag+to+enable+the+usage+of+topic+separator+in+MM2+DefaultReplicationPolicy)
> in the 3.6.0 release? It passed voting yesterday, and is a very small,
> low-risk change that we'd like to put out as soon as possible in order to
> patch an accidental break in backwards compatibility introduced a few versions
> ago.
>
> Best,
>
> Chris
>
> On Fri, Jul 28, 2023 at 2:35 AM Satish Duggana 
> wrote:
>
> > Hi All,
> > Whoever has KIP entries in the 3.6.0 release plan, please update them
> > with the latest status by tomorrow (end of day, 29th Jul UTC).
> >
> > Thanks
> > Satish.
> >
> > On Fri, 28 Jul 2023 at 12:01, Satish Duggana 
> > wrote:
> > >
> > > Thanks Ismael and Divij for the suggestions.
> > >
> > > One way was to follow the earlier guidelines that we set for any early
> > > access release. It looks like Ismael already mentioned the example of
> > > KRaft.
> > >
> > > KIP-405 mentions upgrade/downgrade and limitations sections. We can
> > > clarify that in the release notes for users on how this feature can be
> > > used for early access.
> > >
> > > Divij, we do not want users to enable this feature in production
> > > environments in an early access release. Let us work together on the
> > > followups Ismael suggested.
> > >
> > > ~Satish.
> > >
> > > On Fri, 28 Jul 2023 at 02:24, Divij Vaidya 
> > wrote:
> > > >
> > > > Those are great suggestions, thank you. We will continue this
> > > > discussion in a separate KIP for the Tiered Storage release plan.
> > > >
> > > > On Thu 27. Jul 2023 at 21:46, Ismael Juma  wrote:
> > > >
> > > > > Hi Divij,
> > > > >
> > > > > I think the points you bring up for discussion are all good. My main
> > > > > feedback is that they should be discussed in the context of KIPs vs
> > the
> > > > > release template. That's why we have a backwards compatibility
> > section for
> > > > > every KIP, it's precisely to ensure we think carefully about some of
> > the
> > > > > points you're bringing up. When it comes to defining the meaning of
> > early
> > > > > access, we have two options:
> > > > >
> > > > > 1. Have a KIP specifically for tiered storage.
> > > > > 2. Have a KIP to define general guidelines for what early access
> > means.
> > > > >
> > > > > Does this make sense?
> > > > >
> > > > > Ismael
> > > > >
> > > > > On Thu, Jul 27, 2023 at 6:38 PM Divij Vaidya <
> > divijvaidy...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Thank you for the response, Ismael.
> > > > > >
> > > > > > 1. Specifically in context of 3.6, I wanted this compatibility
> > > > > > guarantee point to encourage a discussion on
> > > > > >
> > > > > >
> > > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-952%3A+Regenerate+segment-aligned+producer+snapshots+when+upgrading+to+a+Kafka+version+supporting+Tiered+Storage
> > > > > > .
> > > > > > Due to the lack of producer snapshots in versions <2.8, a customer
> > > > > > may not be able to upgrade to 3.6 and use TS on a topic which was
> > > > > > created when the cluster was on a version <2.8 (see motivation for
> > > > > > details). We can discuss and agree that it does not break
> > > > > > compatibility, which is fine. But I want to ensure that we have a
> > > > > > discussion soon on this to reach a conclusion.
> > > > > >
> > > > > > 2. I will start a KIP on this for further discussion.
> > > > > >
> > > > > > 3. In the context of 3.6, this would mean that there should be
> > > > > > no regression if a user does "not" turn on remote storage (early
> > > > > > access feature) at a cluster level. We have some known cases (such
> > as
> > > > > > https://issues.apache.org/jira/browse/KAFKA-15189) which violate
> > this
> > > > > > compatibility requirement. Having this guarantee mentioned in the
> > > > > > release plan will ensure that we are all in agreement with which
> > cases
> > > > > > are truly blockers and which aren't.
> > > > > >
> > > > > > 4. Fair, instead of a general goal, let me put it specifically in
> > the
> > > > > > context of 3.6. Let me know if this is not the right forum for this
> > > > > > discussion.
> > > > > > Once a user "turns on" tiered storage (TS) at a cluster level, I am
> > > > > > proposing that they should have the ability to turn it off as well
> > at
> > > > > > a cluster level. Since this is a topic level feature, folks may not
> > > > > > spin up a separate cluster to try this feature, hence, we need to
> > > > > > ensure that we provide them with the ability to try tiered storage
> > for
> > > > > > a topic which could be deleted and the feature turned off, so that
> > rest
> > > > > > of the production cases are not 

[jira] [Resolved] (KAFKA-15106) AbstractStickyAssignor may stuck in 3.5

2023-08-03 Thread li xiangyuan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

li xiangyuan resolved KAFKA-15106.
--
Resolution: Fixed

> AbstractStickyAssignor may stuck in 3.5
> ---
>
> Key: KAFKA-15106
> URL: https://issues.apache.org/jira/browse/KAFKA-15106
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.5.0
>Reporter: li xiangyuan
>Assignee: li xiangyuan
>Priority: Major
> Fix For: 3.6.0
>
>
> This can be reproduced easily in a unit test: in
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest#testLargeAssignmentAndGroupWithNonEqualSubscription,
> set partitionCount=200 and consumerCount=20, and you can see that
> isBalanced will return false forever.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-759: Unneeded repartition canceling

2023-08-03 Thread Shay Lin
Hi all,

Thanks to everyone who participated in the vote and the discussion. I'll
close this since it has been open for over 72 hours and we have a
sufficient number of votes. KIP-759 has been accepted with the following +1
votes (binding): Matthias, Walker, Bill, Bruno, Sophie.

I'll start working on the implementation.

Best,
Shay

On Wed, Aug 2, 2023 at 3:56 PM Sophie Blee-Goldman 
wrote:

> +1 (binding)
>
> thanks Shay!
>
> On Wed, Aug 2, 2023 at 1:19 AM Bruno Cadonna  wrote:
>
> > Hi,
> >
> > +1 (binding)
> >
> > Thanks for the KIP!
> >
> > Best,
> > Bruno
> >
> > On 8/2/23 1:19 AM, Bill Bejeck wrote:
> > > I caught up on the discussion thread and the KIP LGTM.
> > >
> > > +1(binding)
> > >
> > > On Tue, Aug 1, 2023 at 3:07 PM Walker Carlson
> > 
> > > wrote:
> > >
> > >> +1 (binding)
> > >>
> > >> On Mon, Jul 31, 2023 at 10:43 PM Matthias J. Sax 
> > wrote:
> > >>
> > >>> +1 (binding)
> > >>>
> > >>> On 7/11/23 11:16 AM, Shay Lin wrote:
> >  Hi all,
> > 
> >  I'd like to call a vote on KIP-759: Unneeded repartition canceling
> >  The KIP has been under discussion for quite some time(two years).
> This
> > >>> is a
> >  valuable optimization for advanced users. I hope we can push this
> > >> toward
> >  the finish line this time.
> > 
> >  Link to the KIP:
> > 
> > >>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
> > 
> >  Best,
> >  Shay
> > 
> > >>>
> > >>
> > >
> >
>


Re: [DISCUSS] KIP-953: partition method to be overloaded to accept headers as well.

2023-08-03 Thread Andrew Schofield
Hi Jack,
I do understand the idea of extending the Partitioner interface so that
people are now able to use headers in the partitioning decision, and I see
that it’s filling in a gap in the interface which was left when headers were
originally added.

Experience with non-default partitioning schemes in the past makes me
unlikely to use anything other than the default partitioning scheme.
But I wouldn’t let that dissuade you.

Thanks,
Andrew
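To make the discussion concrete, here is a minimal sketch of the kind of header-based routing a KIP-953-style partitioner could perform. This is not code from the KIP: Kafka's Partitioner, Headers, and Cluster types are elided, and the "priority" header name and the reserved-partition layout are invented purely for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;

public class HeaderRoutingSketch {
    // Route records that carry a "priority" header to a reserved set of low
    // partitions; hash everything else over the remaining partitions.
    static int choosePartition(Map<String, byte[]> headers, byte[] keyBytes,
                               int numPartitions, int reserved) {
        byte[] prio = headers.get("priority");
        if (prio != null) {
            int level = Integer.parseInt(new String(prio, StandardCharsets.UTF_8));
            return Math.floorMod(level, reserved);          // priority lane
        }
        int hash = keyBytes == null ? 0 : Arrays.hashCode(keyBytes);
        return reserved + Math.floorMod(hash, numPartitions - reserved);
    }

    public static void main(String[] args) {
        byte[] key = "order-42".getBytes(StandardCharsets.UTF_8);
        Map<String, byte[]> urgent =
                Map.of("priority", "1".getBytes(StandardCharsets.UTF_8));
        System.out.println(choosePartition(urgent, key, 12, 2));   // partition 0 or 1
        System.out.println(choosePartition(Map.of(), key, 12, 2)); // partition 2..11
    }
}
```

In a real implementation this logic would live behind the overloaded partition method the KIP proposes, with the partition count coming from the Cluster parameter.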

> On 3 Aug 2023, at 13:43, Jack Tomy  wrote:
>
> Hey Andrew, Sagar
>
> Please share your thoughts. Thanks.
>
>
>
> On Mon, Jul 31, 2023 at 5:58 PM Jack Tomy  wrote:
>
>> Hey Andrew, Sagar
>>
>> Thanks. I'm travelling so sorry for being brief and getting back late.
>>
>> 1. For the first concern, moving in the direction of a server-side
>> partitioner: the idea seems very promising, but I believe we still have
>> a long way to go. Since the proposal/design for it is not yet
>> available, it's hard for me to defend my proposal.
>> 2. For the second concern:
>> 2.1 Loss of order in messages: I believe the ordering of messages is
>> never promised and the partitioner has no requirement to ensure the same.
>> It is up to the user to implement/use a partitioner which ensures
>> ordering based on keys.
>> 2.2 Key deciding the partition: it is totally up to the user to decide
>> the partition regardless of the key; we are also passing the value to
>> the partitioner. Even the existing implementation receives the value,
>> which lets the user decide the partition based on value.
>> 2.3 Sending to a specific partition: for this I need to know the total
>> number of partitions, and if I do it in the partitioner, the cluster
>> param gives me all the information I need.
>>
>> I would also quote a line from KIP-82 where headers were added to the
>> serializer : The payload is traditionally for the business object, and 
>> *headers
>> are traditionally used for transport routing*, filtering etc. So I
>> believe when a user wants to add some routing information (in this case
>> which set of partitions to go for), headers seem to be the right place.
>>
>>
>>
>> On Sat, Jul 29, 2023 at 8:48 PM Sagar  wrote:
>>
>>> Hi Andrew,
>>>
>>> Thanks for your comments.
>>>
>>> 1) Yes, that makes sense and that's what I would expect to see as well.
>>> I just wanted to highlight that we might still need a way to let client-side
>>> partitioning logic be present as well. Anyway, I am good on this point.
>>> 2) The example provided does seem achievable by simply attaching the
>>> partition number in the ProducerRecord. I guess if we can't find any
>>> further examples which strengthen the case for this partitioner, it might
>>> be harder to justify adding it.
>>>
>>>
>>> Thanks!
>>> Sagar.
>>>
>>> On Fri, Jul 28, 2023 at 2:05 PM Andrew Schofield <
>>> andrew_schofield_j...@outlook.com> wrote:
>>>
 Hi Sagar,
 Thanks for your comments.

 1) Server-side partitioning doesn’t necessarily mean that there’s only
>>> one
 way to do it. It just means that the partitioning logic runs on the
>>> broker
 and
 any configuration of partitioning applies to the broker’s partitioner.
>>> If
 we ever
 see a KIP for this, that’s the kind of thing I would expect to see.

 2) In the priority example in the KIP, there is a kind of contract
>>> between
 the
 producers and consumers so that some records can be processed before
 others regardless of the order in which they were sent. The producer
 wants to apply special significance to a particular header to control
>>> which
 partition is used. I would simply achieve this by setting the partition
 number
 in the ProducerRecord at the time of sending.

 I don’t think the KIP proposes adjusting the built-in partitioner or
 adding to AK
 a new one that uses headers in the partitioning decision. So, any
 configuration
 for a partitioner that does support headers would be up to the
 implementation
 of that specific partitioner. Partitioner implements Configurable.

 I’m just providing an alternative view and I’m not particularly opposed
>>> to
 the KIP.
 I just don’t think it quite merits the work involved to get it voted and
 merged.
 As an aside, a long time ago, I created a small KIP that was never
>>> adopted
 and I didn’t push it because I eventually didn’t need it.

 Thanks,
 Andrew

> On 28 Jul 2023, at 05:15, Sagar  wrote:
>
> Hey Andrew,
>
> Thanks for the review. Since I had reviewed the KIP I thought I would
 also
> respond. Of course Jack has the final say on this since he wrote the
>>> KIP.
>
> 1) This is an interesting point and I hadn't considered it. The
> comparison with KIP-848 is a valid one but even within that KIP, it
 allows
> client side partitioning for power users like Streams. So while we
>>> would
> want to move 

[jira] [Created] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2023-08-03 Thread Jinyong Choi (Jira)
Jinyong Choi created KAFKA-15302:


 Summary: Stale value returned when using store.all() in 
punctuation function.
 Key: KAFKA-15302
 URL: https://issues.apache.org/jira/browse/KAFKA-15302
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.5.1
Reporter: Jinyong Choi


When using the store.all() function within the punctuator scheduled via
this.context.schedule, a stale value is returned. In other words, even
though the value has been updated from 1 to 2, it doesn't return 2; instead, it
returns 1.

In the provided test code, you can see the output 'BROKEN !!!', and while this 
doesn't occur 100% of the time, by adding logs, it's evident that during the 
while loop after all() is called, the cache is flushed. As a result, the named 
cache holds a null value, causing the return of a value from RocksDB. This is 
observed as the value after the .get() call is different from the expected 
value. This is possibly due to the consistent read functionality of RocksDB, 
although the exact cause is not certain.

Of course, if you perform {{store.flush()}} before {{all()}} there won't be any 
errors.

 
 * test code (forked from balajirrao and modified for this)

[https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]

 
{code:java}
private void forwardAll(final long timestamp) {
    System.err.println("forwardAll Start");
    // Note: the generic type parameters below were stripped by the mailing-list
    // archive; they are restored here as <String, Integer> to match the
    // Integer store value used in this test.
    final KeyValueIterator<String, Integer> kvList = this.kvStore.all();
    while (kvList.hasNext()) {
        final KeyValue<String, Integer> entry = kvList.next();
        final Record<String, Integer> msg =
                new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
        final Integer storeValue = this.kvStore.get(entry.key);
        if (!entry.value.equals(storeValue)) {
            System.err.println("[" + instanceId + "] !!! BROKEN !!! Key: " + entry.key
                    + " Expected in stored (Cache or Store) value: " + storeValue
                    + " but KeyValueIterator value: " + entry.value);
            throw new RuntimeException("Broken!");
        }
        this.context.forward(msg);
    }
    kvList.close();
}
{code}
 * log file (add log in stream source)

 
{code:java}
# console log
sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
[info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
...
[info] running Coordinator 1
appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
[0] starting instance +1
forwardAll Start
[0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but 
KeyValueIterator value: 1


# log file
...
01:05:00.382 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
01:05:00.388 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush 
dirtyKeys.size():7873 entries:7873
01:05:00.434 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.p.i.ProcessorStateManager -- stream-thread 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 stream-task [0_0] Flushed cache or buffer Counts
...
01:05:00.587 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.s.i.CachingKeyValueStore --  KeyValueIterator 
all()
01:05:00.588 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.RocksDBStore --  RocksDB KeyValueIterator all
01:05:00.590 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.ThreadCache -- stream-thread 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
  MemoryLRUCacheBytesIterator cache all()
01:05:00.591 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.NamedCache --   NamedCache allKeys() 
size():325771
01:05:00.637 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.NamedCache --   NamedCache keySetIterator() 
TreeSet size():325771
...
01:05:07.052 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.NamedCache -- 
===0_0-Counts evict() isDirty() eldest.size():103
01:05:07.052 
[95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
 INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
flush: #hits=5636398, #misses=6233857, #overwrites=639857, #flushes=402
01:05:07.053 

Re: Apache Kafka 3.6.0 release

2023-08-03 Thread Chris Egerton
Hi Satish,

Would it be possible to include KIP-949 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-949%3A+Add+flag+to+enable+the+usage+of+topic+separator+in+MM2+DefaultReplicationPolicy)
in the 3.6.0 release? It passed voting yesterday, and is a very small,
low-risk change that we'd like to put out as soon as possible in order to
patch an accidental break in backwards compatibility introduced a few versions
ago.

Best,

Chris
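For readers following along, the KIP-949 change is a single boolean property on MirrorMaker 2's DefaultReplicationPolicy. The fragment below is a sketch, not authoritative documentation: the property name comes from the KIP title and discussion, while the file layout and example values are assumptions.

```properties
# MM2 configuration fragment (illustrative)
replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy
# KIP-949: set to false so internal topics (offset-syncs, checkpoints,
# heartbeats) keep the pre-existing "." separator even when
# replication.policy.separator is customized.
replication.policy.internal.topic.separator.enabled=false
```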

On Fri, Jul 28, 2023 at 2:35 AM Satish Duggana 
wrote:

> Hi All,
> Whoever has KIP entries in the 3.6.0 release plan, please update them
> with the latest status by tomorrow (end of day, 29th Jul UTC).
>
> Thanks
> Satish.
>
> On Fri, 28 Jul 2023 at 12:01, Satish Duggana 
> wrote:
> >
> > Thanks Ismael and Divij for the suggestions.
> >
> > One way was to follow the earlier guidelines that we set for any early
> > access release. It looks like Ismael already mentioned the example of
> > KRaft.
> >
> > KIP-405 mentions upgrade/downgrade and limitations sections. We can
> > clarify that in the release notes for users on how this feature can be
> > used for early access.
> >
> > Divij, we do not want users to enable this feature in production
> > environments in an early access release. Let us work together on the
> > followups Ismael suggested.
> >
> > ~Satish.
> >
> > On Fri, 28 Jul 2023 at 02:24, Divij Vaidya 
> wrote:
> > >
> > > Those are great suggestions, thank you. We will continue this
> > > discussion in a separate KIP for the Tiered Storage release plan.
> > >
> > > On Thu 27. Jul 2023 at 21:46, Ismael Juma  wrote:
> > >
> > > > Hi Divij,
> > > >
> > > > I think the points you bring up for discussion are all good. My main
> > > > feedback is that they should be discussed in the context of KIPs vs
> the
> > > > release template. That's why we have a backwards compatibility
> section for
> > > > every KIP, it's precisely to ensure we think carefully about some of
> the
> > > > points you're bringing up. When it comes to defining the meaning of
> early
> > > > access, we have two options:
> > > >
> > > > 1. Have a KIP specifically for tiered storage.
> > > > 2. Have a KIP to define general guidelines for what early access
> means.
> > > >
> > > > Does this make sense?
> > > >
> > > > Ismael
> > > >
> > > > On Thu, Jul 27, 2023 at 6:38 PM Divij Vaidya <
> divijvaidy...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thank you for the response, Ismael.
> > > > >
> > > > > 1. Specifically in context of 3.6, I wanted this compatibility
> > > > > guarantee point to encourage a discussion on
> > > > >
> > > > >
> > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-952%3A+Regenerate+segment-aligned+producer+snapshots+when+upgrading+to+a+Kafka+version+supporting+Tiered+Storage
> > > > > .
> > > > > Due to the lack of producer snapshots in versions <2.8, a customer
> > > > > may not be able to upgrade to 3.6 and use TS on a topic which was
> > > > > created when the cluster was on a version <2.8 (see motivation for
> > > > > details). We can discuss and agree that it does not break
> > > > > compatibility, which is fine. But I want to ensure that we have a
> > > > > discussion soon on this to reach a conclusion.
> > > > >
> > > > > 2. I will start a KIP on this for further discussion.
> > > > >
> > > > > 3. In the context of 3.6, this would mean that there should be
> > > > > no regression if a user does "not" turn on remote storage (early
> > > > > access feature) at a cluster level. We have some known cases (such
> as
> > > > > https://issues.apache.org/jira/browse/KAFKA-15189) which violate
> this
> > > > > compatibility requirement. Having this guarantee mentioned in the
> > > > > release plan will ensure that we are all in agreement with which
> cases
> > > > > are truly blockers and which aren't.
> > > > >
> > > > > 4. Fair, instead of a general goal, let me put it specifically in
> the
> > > > > context of 3.6. Let me know if this is not the right forum for this
> > > > > discussion.
> > > > > Once a user "turns on" tiered storage (TS) at a cluster level, I am
> > > > > proposing that they should have the ability to turn it off as well
> at
> > > > > a cluster level. Since this is a topic level feature, folks may not
> > > > > spin up a separate cluster to try this feature, hence, we need to
> > > > > ensure that we provide them with the ability to try tiered storage
> for
> > > > > a topic which could be deleted and the feature turned off, so that
> rest
> > > > > of the production cases are not impacted.
> > > > >
> > > > > 5. Agreed on not making public interface changes a requirement, but
> > > > > we should define what "early access" means in that case. Users may
> > > > > not be aware that "early access" public APIs may change (unless I am
> > > > > missing some documentation somewhere, in which case I apologize for
> > > > > bringing up this naive point).
> > > > >
> > > > > --
> > > > > Divij Vaidya
> > > > >
> > > > > On Thu, Jul 27, 2023 

Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2065

2023-08-03 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-953: partition method to be overloaded to accept headers as well.

2023-08-03 Thread Jack Tomy
 Hey Andrew, Sagar

Please share your thoughts. Thanks.



On Mon, Jul 31, 2023 at 5:58 PM Jack Tomy  wrote:

> Hey Andrew, Sagar
>
> Thanks. I'm travelling, so sorry for being brief and getting back late.
>
> 1. For the first concern, that is, moving in the direction of a
> server-side partitioner: the idea seems very promising, but I believe we
> still have a long way to go. Since the proposal/design for it is still
> not available, it's hard for me to defend my proposal against it.
> 2. For the second concern:
> 2.1 Loss of message ordering: I believe the ordering of messages is
> never promised, and the partitioner has no obligation to ensure it.
> It is up to the user to implement/use a partitioner which ensures
> ordering based on keys.
> 2.2 The key deciding the partition: it is totally up to the user to
> decide the partition regardless of the key; we are also passing the
> value to the partitioner. Even the existing implementation receives the
> value, which lets the user decide the partition based on the value.
> 2.3 Sending to a specific partition: for this I need to be aware of the
> total number of partitions; if I do the same in a partitioner, the
> cluster param gives me all the information I want.
>
> I would also quote a line from KIP-82, where headers were introduced:
> "The payload is traditionally for the business object, and *headers are
> traditionally used for transport routing*, filtering etc." So I believe
> that when a user wants to add some routing information (in this case,
> which set of partitions to go for), headers seem to be the right place.
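[Editor's illustration] The header-based routing Jack describes could be sketched as follows. This is a minimal, self-contained sketch: the class name, the "priority" header key, and the fast-lane partition layout are illustrative assumptions, not part of KIP-953's actual interface. With the KIP, logic like this would live inside an overloaded `partition(...)` method that additionally receives the record's `Headers`.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of header-aware partition routing.
// Header key "priority" and the reserved partition 0 are assumptions
// made up for this example, not anything defined by KIP-953 itself.
public class HeaderRouting {

    // Route high-priority records to a reserved partition; hash the rest
    // over the remaining partitions.
    static int choosePartition(String key, byte[] priorityHeader, int numPartitions) {
        if (priorityHeader != null
                && "high".equals(new String(priorityHeader, StandardCharsets.UTF_8))) {
            return 0; // reserved "fast lane" partition
        }
        // Non-negative hash spread over partitions 1..numPartitions-1.
        return 1 + Math.floorMod(key.hashCode(), numPartitions - 1);
    }

    public static void main(String[] args) {
        byte[] high = "high".getBytes(StandardCharsets.UTF_8);
        System.out.println(choosePartition("order-42", high, 6)); // fast lane
        System.out.println(choosePartition("order-42", null, 6)); // hashed
    }
}
```

Today, absent the KIP, the closest equivalent is computing the partition in application code and setting it explicitly on the ProducerRecord before sending.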
>
>
>
> On Sat, Jul 29, 2023 at 8:48 PM Sagar  wrote:
>
>> Hi Andrew,
>>
>> Thanks for your comments.
>>
>> 1) Yes, that makes sense, and that's what I would expect to see as
>> well. I just wanted to highlight that we might still need a way to let
>> client-side partitioning logic be present as well. Anyway, I am good on
>> this point.
>> 2) The example provided does seem achievable by simply attaching the
>> partition number to the ProducerRecord. I guess if we can't find any
>> further examples which strengthen the case for this partitioner, it
>> might be harder to justify adding it.
>>
>>
>> Thanks!
>> Sagar.
>>
>> On Fri, Jul 28, 2023 at 2:05 PM Andrew Schofield <
>> andrew_schofield_j...@outlook.com> wrote:
>>
>> > Hi Sagar,
>> > Thanks for your comments.
>> >
>> > 1) Server-side partitioning doesn’t necessarily mean that there’s
>> > only one way to do it. It just means that the partitioning logic runs
>> > on the broker, and any configuration of partitioning applies to the
>> > broker’s partitioner. If we ever see a KIP for this, that’s the kind
>> > of thing I would expect to see.
>> >
>> > 2) In the priority example in the KIP, there is a kind of contract
>> > between the producers and consumers so that some records can be
>> > processed before others, regardless of the order in which they were
>> > sent. The producer wants to apply special significance to a
>> > particular header to control which partition is used. I would simply
>> > achieve this by setting the partition number in the ProducerRecord at
>> > the time of sending.
>> >
>> > I don’t think the KIP proposes adjusting the built-in partitioner or
>> > adding to AK a new one that uses headers in the partitioning
>> > decision. So, any configuration for a partitioner that does support
>> > headers would be up to the implementation of that specific
>> > partitioner. Partitioner implements Configurable.
>> >
>> > I’m just providing an alternative view, and I’m not particularly
>> > opposed to the KIP. I just don’t think it quite merits the work
>> > involved to get it voted and merged. As an aside, a long time ago, I
>> > created a small KIP that was never adopted, and I didn’t push it
>> > because I eventually didn’t need it.
>> >
>> > Thanks,
>> > Andrew
>> >
>> > > On 28 Jul 2023, at 05:15, Sagar  wrote:
>> > >
>> > > Hey Andrew,
>> > >
>> > > Thanks for the review. Since I had reviewed the KIP, I thought I
>> > > would also respond. Of course, Jack has the final say on this since
>> > > he wrote the KIP.
>> > >
>> > > 1) This is an interesting point and I hadn't considered it. The
>> > > comparison with KIP-848 is a valid one, but even that KIP allows
>> > > client-side partitioning for power users like Streams. So while we
>> > > would want to move away from the client-side partitioner as much as
>> > > possible, we still shouldn't do away completely with client-side
>> > > partitioning and end up in a state of inflexibility for different
>> > > kinds of use cases. This is my opinion though, and you have more
>> > > context on clients, so I would like to know your thoughts on this.
>> > >
>> > > 2) Regarding this, I assumed that since the headers are already
>> > > part of the consumer records, they should have access to the
>> > > headers and if there is a
>> > > 

Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2064

2023-08-03 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-15301) [Tiered Storage] Historically compacted topics send request to remote for active segment

2023-08-03 Thread Mital Awachat (Jira)
Mital Awachat created KAFKA-15301:
-

 Summary: [Tiered Storage] Historically compacted topics send 
request to remote for active segment
 Key: KAFKA-15301
 URL: https://issues.apache.org/jira/browse/KAFKA-15301
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.6.0
Reporter: Mital Awachat


In AWS MSK (Kafka 2.8) tiered storage, a case surfaced where the tiered 
storage plugin received requests for active segments. The affected topics were 
historically compacted topics for which compaction had been disabled and 
tiering enabled.

Reproduction steps:
1. Create a topic with the compact cleanup policy.
2. Produce data with a few repeated keys and create multiple segments.
3. Let compaction happen.
4. Change the cleanup policy to delete.
5. Produce some more data to trigger segment rollover.
6. Enable tiering on the topic.
7. Wait for segments to be uploaded to S3 and cleaned up from local storage 
(the active segment would remain), consume from the beginning, and observe 
the logs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15274) support moving files to be deleted to other directories

2023-08-03 Thread jianbin.chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jianbin.chen resolved KAFKA-15274.
--
Resolution: Duplicate

> support moving files to be deleted to other directories
> ---
>
> Key: KAFKA-15274
> URL: https://issues.apache.org/jira/browse/KAFKA-15274
> Project: Kafka
>  Issue Type: Task
>Reporter: jianbin.chen
>Assignee: jianbin.chen
>Priority: Major
>
> Hello everyone, I am a Kafka user from China. Our company operates in public 
> clouds overseas, such as AWS, Alibaba Cloud, and Huawei Cloud. We face a 
> large amount of data exchange and business message delivery every day, and 
> daily messages consume a significant amount of disk space. Purchasing the 
> corresponding storage capacity from these cloud providers incurs substantial 
> costs, especially for SSDs with ultra-high IOPS. High IOPS is very effective 
> for disaster recovery, especially in the event of a sudden broker failure 
> where storage space becomes full or memory is exhausted, leading to OOM 
> kills; such storage greatly improves data recovery efficiency. This forces 
> us to adopt smaller storage specifications with high IO to save costs, 
> particularly because cloud providers only allow capacity expansion, not 
> reduction.
> Currently, we have come up with a solution and would like to contribute it 
> to the community for discussion. When we need to delete logs, we can 
> purchase S3 or MinIO storage from services like AWS and mount it on our 
> brokers. When a log needs to be deleted, we can decide how it leaves the 
> broker: the default is to delete it directly, while a move option moves it 
> to S3. Since most of the deleted data is cold data that won't be used in the 
> short term, this approach improves the retention period of historical data 
> while maintaining good cost control.





Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!

2023-08-03 Thread Erik van Oosten

Hi Philip, Colin, Chris, Matthias, Kirk, David, Xiangyuan LI,

KIP-944 was extended a bit more to explain why effect systems like Zio 
and Cats-effects make it impossible to run code on a specific thread.


I understand that using an effect system is pretty far removed from 
writing Java in transaction-script style, the style that is probably 
used by most Kafka committers. It took me quite some time to get 
comfortable with effects. It is not the academic fringe tool that many 
perceive it to be. For me it is a way to quickly and correctly write 
serious data processing applications.


Even so, we both use the same Kafka eco-system, and supporting different 
styles only makes it richer. IMHO it would be a shame if we cannot live 
together using the same code base.


Philip, thanks for your support. I hope I have convinced the others as 
well by now. If not, I am giving up and I will spend my energy elsewhere.


Kind regards,
    Erik.


Op 24-07-2023 om 18:12 schreef Erik van Oosten:

Hello Xiangyuan LI,

I am not familiar with coroutines, nor with Kotlin. You will have to 
work with the documentation: 
https://kotlinlang.org/docs/coroutines-overview.html


However, I am familiar with Zio and Cats-effects (both Scala 
libraries). In both Zio and Cats-effects one creates effects (aka 
workflows) which are descriptions of a computation. For example, when 
executing the Scala code `val effect = ZIO.attempt(println("Hello 
world!"))` one creates only a description; it does not print anything 
yet. The language to describe these effects is rich enough to describe 
entire applications including things like concurrency. In fact, the 
language is so rich, that it is the most convenient way that I know to 
safely write highly concurrent and async applications. For many 
developer teams the performance penalty (which is real but not big) is 
worth it.


To execute a Zio or Cats effect one gives it to the runtime. The 
runtime then schedules the work on one of the threads in its 
thread-pool. Neither Zio nor Cats-effects supports running an effect on 
the thread that manages the thread-pool.
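[Editor's illustration] The same scheduling point can be shown in plain Java with CompletableFuture — an analogy only, not Zio or Cats-effects, and the class and method names here are made up for the example: the pool's scheduler, not the caller, chooses the thread that executes the work.

```java
import java.util.concurrent.CompletableFuture;

// Analogy for effect-runtime scheduling: the runtime's pool, not the
// caller, decides which thread runs the submitted task. This mirrors why
// an effect runtime cannot pin work to a caller-chosen thread.
public class EffectThreading {

    // Returns the name of the thread that actually executed the task.
    static String executingThread() throws Exception {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .get();
    }

    public static void main(String[] args) throws Exception {
        String caller = Thread.currentThread().getName();
        String worker = executingThread();
        // The pool picked the worker thread; the caller had no say in it.
        System.out.println("caller=" + caller + ", worker=" + worker);
    }
}
```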


I hope this is clear enough.

Kind regards,
    Erik.


Op 24-07-2023 om 05:21 schreef Xiangyuan LI:

Hi Erik:
 I read KIP-944 and the email list roughly; it seems most Java 
developers are not familiar with the concept of a "coroutine", so they 
cannot imagine why the code of one function, without Thread.start(), may 
run on separate threads, with the developer unable to control it. Maybe 
you need a more elaborate description to demonstrate how coroutine code 
runs.

On Sun, 23 Jul 2023 at 17:47, Erik van Oosten  wrote:



--
Erik van Oosten
e.vanoos...@grons.nl
https://day-to-day-stuff.blogspot.com



Re: [DISCUSS] KIP-942: Add Power(ppc64le) support

2023-08-03 Thread Divij Vaidya
Hey Vaibhav

1. The KIP says "Enable CI for Power architecture and run tests with
Java 8, 11 and 17 and Scala 2.13". Do different JVM versions behave
differently on the Power architecture? Would it be sufficient if we just
ran it with the latest supported JDK (20) + the latest supported Scala
(2.13)?

2. Can you also please add that we plan to run this only on the branch
builder and not on every PR. Note that we have two CI runs configured
today: the "branch builder", which runs when a commit is merged to
trunk or preceding versions, and the "PR builder", which runs on every
commit on every PR. From our earlier discussion on this thread, we
agreed to only add it to the "branch builder". Also, please add the
option of adding the tests to the "PR builder" to the rejected
alternatives section.


--
Divij Vaidya

On Thu, Aug 3, 2023 at 8:40 AM Vaibhav Nazare
 wrote:
>
> Hi Divij
>
> Thanks for the response. I agree with you, and I have updated the KIP 
> accordingly.
>


Re: Request permission to contribute

2023-08-03 Thread Luke Chen
Hi Adrian,

Your account is all set.

Thanks.
Luke

On Thu, Aug 3, 2023 at 4:25 PM Adrian Preston  wrote:

> Hello,
> Please could my JIRA account (prestona) be given the permissions required
> to contribute to Kafka?
> Thanks,
> Adrian
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>


Request permission to contribute

2023-08-03 Thread Adrian Preston
Hello,
Please could my JIRA account (prestona) be given the permissions required to 
contribute to Kafka?
Thanks,
Adrian



[DISCUSS] KIP-942: Add Power(ppc64le) support

2023-08-03 Thread Vaibhav Nazare
Hi Divij

Thanks for the response. I agree with you, and I have updated the KIP 
accordingly.