[jira] [Created] (KAFKA-15238) Connect workers can be disabled by DLQ related stuck admin client calls

2023-07-23 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15238:
--

 Summary: Connect workers can be disabled by DLQ related stuck 
admin client calls
 Key: KAFKA-15238
 URL: https://issues.apache.org/jira/browse/KAFKA-15238
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


When Kafka Connect is run in distributed mode, if a sink connector's task is 
restarted (via a worker's REST API), the following sequence of steps occurs 
(on the DistributedHerder's thread):

 
 # The existing sink task will be stopped 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1367])
 # A new sink task will be started 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1867C40-L1867C40])
 # As a part of the above step, a new {{WorkerSinkTask}} will be instantiated 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L656-L663])
 # The DLQ reporter (see 
[KIP-298|https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect])
 for the sink task is also instantiated and configured as a part of this 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1800])
 # The DLQ reporter setup involves two synchronous admin client calls to list 
topics and create the DLQ topic if it isn't already created 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L84-L87])

 

All of these steps occur synchronously on the herder's tick thread - in the 
portion 
[here|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L457-L469]
 where external requests are run. If the admin client call in the DLQ reporter 
setup blocks for some time (due to auth failures and retries, network issues, 
or any other reason), it can render the Connect worker non-functional (REST 
API requests will time out) and even cause it to fall out of the Connect 
cluster and become a zombie (since the tick thread also drives group 
membership functions).
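The failure mode above is easy to reproduce in miniature. Below is a rough sketch in plain Java (all names are hypothetical, not Connect APIs) of the general mitigation: running a potentially-blocking setup call off the caller's thread with a bounded wait, so a stuck call cannot freeze the calling thread.

```java
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Main {
    // Hypothetical stand-in for the blocking admin client calls
    // (list topics / create the DLQ topic) done during DLQ reporter setup.
    static Set<String> dlqTopicSetup(long delayMs) throws InterruptedException {
        Thread.sleep(delayMs);
        return Set.of("my-connector-dlq");
    }

    // Runs the setup off the calling thread and bounds the wait, so a stuck
    // call cannot freeze the caller (here, standing in for the tick thread).
    static String boundedSetup(long setupDelayMs, long timeoutMs) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Set<String>> topics = pool.submit(() -> dlqTopicSetup(setupDelayMs));
            topics.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "completed";
        } catch (TimeoutException e) {
            return "timed out"; // the caller can move on instead of hanging
        } finally {
            pool.shutdownNow(); // interrupts the still-sleeping setup task
        }
    }

    public static void main(String[] args) throws Exception {
        // A setup call that takes 5 s against a 100 ms budget times out
        // instead of blocking the caller indefinitely.
        System.out.println(boundedSetup(5_000, 100));
    }
}
```

Whether the actual fix should apply a timeout, move the setup off the tick thread, or both, is of course a design decision for the ticket.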



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15237) Implement `offsets.commit.timeout.ms` setting

2023-07-23 Thread David Jacot (Jira)
David Jacot created KAFKA-15237:
---

 Summary: Implement `offsets.commit.timeout.ms` setting
 Key: KAFKA-15237
 URL: https://issues.apache.org/jira/browse/KAFKA-15237
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot








Re: [VOTE] KIP-949: Add flag to enable the usage of topic separator in MM2 DefaultReplicationPolicy

2023-07-23 Thread Federico Valeri
+1 (non binding)

Thanks
Fede


On Sun, Jul 23, 2023 at 6:30 PM Omnia Ibrahim  wrote:
>
> Hi everyone,
> I would like to open a vote for KIP-949. The proposal is here
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-949%3A+Add+flag+to+enable+the+usage+of+topic+separator+in+MM2+DefaultReplicationPolicy
> .
> 
>
> Thanks
> Omnia


Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #2030

2023-07-23 Thread Apache Jenkins Server
See 




[jira] [Resolved] (KAFKA-12283) Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining

2023-07-23 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-12283.
---
Resolution: Fixed

> Flaky Test 
> RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
> 
>
> Key: KAFKA-12283
> URL: https://issues.apache.org/jira/browse/KAFKA-12283
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
>
> https://github.com/apache/kafka/pull/1/checks?check_run_id=1820092809
> {quote} {{java.lang.AssertionError: Tasks are imbalanced: 
> localhost:36037=[seq-source13-0, seq-source13-1, seq-source13-2, 
> seq-source13-3, seq-source12-0, seq-source12-1, seq-source12-2, 
> seq-source12-3]
> localhost:43563=[seq-source11-0, seq-source11-2, seq-source10-0, 
> seq-source10-2]
> localhost:46539=[seq-source11-1, seq-source11-3, seq-source10-1, 
> seq-source10-3]
>   at org.junit.Assert.fail(Assert.java:89)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:362)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining(RebalanceSourceConnectorsIntegrationTest.java:313)}}
> {quote}





[jira] [Resolved] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector

2023-07-23 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-8391.
--
Resolution: Fixed

Fixed with https://github.com/apache/kafka/pull/12561

> Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
> ---
>
> Key: KAFKA-8391
> URL: https://issues.apache.org/jira/browse/KAFKA-8391
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Sagar Rao
>Priority: Critical
>  Labels: flaky-test
> Attachments: 100-gradle-builds.tar
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4747/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testDeleteConnector/]
> {quote}java.lang.AssertionError: Condition not met within timeout 3. 
> Connector tasks did not stop in time. at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352) at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:166){quote}





[DISCUSS] KIP-930: Tiered Storage Metrics

2023-07-23 Thread Abhijeet Kumar
Hi All,

I created KIP-930 for adding RemoteIndexCache stats and also to rename some
tiered storage metrics added as part of KIP-405, to remove ambiguity.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-930%3A+Tiered+Storage+Metrics

Feedback and suggestions are welcome.

Regards,
Abhijeet.


Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!

2023-07-23 Thread Xiangyuan LI
Hi Erik:
I read KIP-944 and the email list roughly; it seems most Java developers are
not familiar with the concept of a "coroutine", so they cannot imagine why the
code of one function may run on separate threads without any Thread.start(),
and why the developer cannot even control it. Maybe you need a more elaborate
description to demonstrate how coroutine code runs.
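For readers unfamiliar with coroutines, Java's own async machinery shows the same effect being described: a continuation can end up running on a thread the calling code never started or chose. A minimal sketch:

```java
import java.util.concurrent.CompletableFuture;

public class Main {
    public static void main(String[] args) throws Exception {
        String mainThread = Thread.currentThread().getName();
        // No Thread.start() appears anywhere here, yet the continuation
        // passed to thenApplyAsync runs on a pool thread chosen by the
        // runtime -- the same effect coroutine runtimes produce when code
        // resumes after a suspension point.
        String callbackThread = CompletableFuture
                .supplyAsync(() -> "ignored")
                .thenApplyAsync(v -> Thread.currentThread().getName())
                .get();
        System.out.println(mainThread + " vs " + callbackThread);
    }
}
```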

Erik van Oosten wrote on Sun, Jul 23, 2023 at 17:47:

> Hi David,
>
>  > Could you elaborate a bit more on why the callbacks must be run in
> another thread vs in the invoker thread?
>
> I have been thinking about how to explain this for 2 months now, and it is
> not easy. It has something to do with the fact that you cannot control what
> a thread is doing if you also have to run on that thread. But I just
> realized that /for me/ it really comes down to this:
>
>  We want to use Zio in the callback. Zio does not support it.
>
> There are more reasons as can be read in KAFKA-7143. But I do not know
> anything about Kotlin so I cannot elaborate on that.
>
> Kind regards,
>   Erik.
>
> Op 22-07-2023 om 21:39 schreef David Jacot:
> > Hi Erik,
> >
> > Thanks for the KIP. I would like to better understand the motivation of
> > this KIP. I am not familiar with async runtimes so please excuse me if I
> > ask stupid questions.
> >
> > Could you elaborate a bit more on why the callbacks must be run in
> another
> > thread vs in the invoker thread? This is not clear to me. In the example
> > that you use with the ConsumerRebalanceListener, I would have thought
> that
> > calling commitSync (without changing thread) would have achieved the
> same.
> > The invoker has to wait anyway on the offset commit completion so using
> > another thread does not bring any benefit here.  I suppose that I am
> > missing something here…
> >
> > Regarding Chris’ proposal, this feels like a hack to me. The issue with
> it
> > is that we cannot guarantee it in the long term if it is not part of
> *the*
> > Consumer interface.
> >
> > I second what Chris said. We are all trying to understand the motivation
> in
> > order to find a good solution for Kafka. I apologize if this creates
> > frustration. This is definitely not our goal.
> >
> > Best,
> > David
> >
> > PS: I just saw that you opened a new KIP based on Chris’ idea. This is
> not
> > necessary. You can just update the current KIP based on the discussion.
> >
> > Le sam. 22 juil. 2023 à 18:34, Erik van Oosten
> 
> > a écrit :
> >
> >> Colin, Matthias, Chris,
> >>
> >> I have expanded the use case description in KIP-944. I hope it is more
> >> clear what we're trying to achieve.
> >>
> >> https://cwiki.apache.org/confluence/x/chw0Dw
> >>
> >> Kind regards,
> >>   Erik.
> >>
> >>
> >> Op 22-07-2023 om 17:23 schreef Erik van Oosten:
> >>> Hello Chris,
> >>>
> >>> Thanks for elaborating Matthias' words. Apparently the use case
> >>> description is too terse. Indeed, that is not FUD and that is
> >>> something I can work with.
> >>>
>  It's also worth mentioning that what's proposed in the KIP is only
> >>> blocked by the private access modifier on the KafkaConsumer::acquire
> >>> and KafkaConsumer::release methods. If we upgraded the visibility of
> >>> these methods from private to protected, it would be possible for
> >>> subclasses to implement the proposal in KIP-944, without any KIPs or
> >>> other changes to the official Java clients library.
> >>>
> >>> That is absolutely brilliant! Since I am pretty sure I am using the
> >>> consumer correctly, I could replace acquire and release with an empty
> >>> method body and be done.
> >>>
> >>> /Is making acquire and release protected something that other people
> >>> can live with?/
> >>> If yes, I will create a new PR with just that change.
> >>>
> >>> Kind regards,
> >>>  Erik.
> >>>
> >>>
> >>> Op 22-07-2023 om 16:39 schreef Chris Egerton:
>  Hi Erik,
> 
>  I don't think Matthias is bringing FUD to the discussion. Many of the
>  people who maintain Kafka are familiar with Kafka client internals
>  and the
>  Java programming language, but not necessarily other JVM languages or
>  asynchronous runtimes. I think it's reasonable to ask for a code
>  snippet or
>  two that demonstrates what you'd like to do with the consumer today
> that
>  you can't because of restrictions around concurrent access, and this
>  is not
>  already addressed in the KIP. Linking to a docs page on Kotlin
>  coroutines
>  is helpful but still requires reviewers to gather a lot of context on
>  their
>  own that could more easily be provided in the KIP, and although the
>  description of KAFKA-7143 is more detailed, I find it a little hard to
>  follow as someone who isn't already familiar with the environment the
>  user
>  is working in.
> 
>  It's also worth mentioning that what's proposed in the KIP is only
>  blocked
>  by the private access modifier on the KafkaConsumer::acquire and
>  KafkaConsumer::release methods. 
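For context, the acquire/release pair under discussion is a lightweight single-threaded-access guard. The following is a simplified sketch of the idea only (the real KafkaConsumer implementation also maintains a reentrancy refcount and differs in detail):

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

public class Main {
    // Simplified sketch of KafkaConsumer's single-threaded-access guard.
    static final long NO_CURRENT_THREAD = -1L;
    static final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);

    static void acquire() {
        long threadId = Thread.currentThread().getId();
        // Allow reentrant calls from the owning thread; reject all others.
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException(
                    "KafkaConsumer is not safe for multi-threaded access");
        }
    }

    static void release() {
        currentThread.set(NO_CURRENT_THREAD);
    }

    public static void main(String[] args) throws Exception {
        acquire(); // main thread now "owns" the consumer
        Thread other = new Thread(() -> {
            try {
                acquire();
                System.out.println("unexpected: second acquire succeeded");
            } catch (ConcurrentModificationException e) {
                System.out.println("second thread rejected, as expected");
            }
        });
        other.start();
        other.join();
        release();
    }
}
```

A subclass with empty acquire/release bodies, as proposed, would simply disable this guard and shift the responsibility for safe access to the async runtime.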

[jira] [Resolved] (KAFKA-14712) Confusing error when writing downgraded FeatureImage

2023-07-23 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-14712.
---
Resolution: Fixed

> Confusing error when writing downgraded FeatureImage
> 
>
> Key: KAFKA-14712
> URL: https://issues.apache.org/jira/browse/KAFKA-14712
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: David Arthur
>Assignee: Owen C.H. Leung
>Priority: Minor
>  Labels: good-first-issue
> Fix For: 3.6.0
>
>
> We have logic in ImageWriterOptions which forces any MetadataVersion lower 
> than 3.3-IV0 to be treated as 3.0-IV1. This was because FeatureLevel records 
> were not supported before 3.3-IV0. 
> When FeatureLevel is written at an older version, the "loss handler" produces 
> an error message warning the user that some metadata is being lost.
> For example, when writing a FeatureImage with flag "foo" at MetadataVersion 
> 3.2-IV0, we get a message like:
> > Metadata has been lost because the following could not be represented in 
> > metadata version 3.0-IV1: feature flag(s): foo
> This is confusing since we told the image builder to use MetadataVersion 
> 3.2-IV0, but 3.0-IV1 appears in the text.
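The clamping behavior described above can be sketched with toy stand-ins (the constants and method names below are illustrative, not the actual ImageWriterOptions code):

```java
public class Main {
    // Toy stand-ins for MetadataVersion levels (hypothetical ordinals).
    static final int IBP_3_0_IV1 = 1;
    static final int IBP_3_2_IV0 = 2;
    static final int IBP_3_3_IV0 = 3;

    // Mirrors the described behavior: any version below 3.3-IV0 is treated
    // as 3.0-IV1, because FeatureLevel records did not exist before 3.3-IV0.
    static int effectiveVersion(int requested) {
        return requested < IBP_3_3_IV0 ? IBP_3_0_IV1 : requested;
    }

    static String name(int v) {
        switch (v) {
            case IBP_3_0_IV1: return "3.0-IV1";
            case IBP_3_2_IV0: return "3.2-IV0";
            default:          return "3.3-IV0";
        }
    }

    public static void main(String[] args) {
        int requested = IBP_3_2_IV0;
        int effective = effectiveVersion(requested);
        // The loss handler reports the *effective* version, not the one the
        // caller asked for -- the source of the confusion in this ticket.
        System.out.println("Metadata has been lost because the following could "
                + "not be represented in metadata version " + name(effective)
                + ": feature flag(s): foo (caller requested "
                + name(requested) + ")");
    }
}
```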





Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-07-23 Thread Sophie Blee-Goldman
Guozhang:

On your 2nd point:

> "impl types" (in hindsight it may not be a good name) for rocksdb /
memory / custom, and we used "store types" for kv / windowed / sessioned
etc,
First off, thanks so much for this clarification -- using "store type" here
was definitely making me uncomfortable as this usually refers to KV vs
window, etc -- but I just couldn't for the life of me think of the right
term for rocks vs IM. We should 100% change to something like StoreImplSpec
for this kind of interface.

> As for list-value store (for stream-stream Join)
Again, glad you mentioned this -- I forgot how the extra stream-stream join
store is not a "regular" KV Store but rather this special list-value store.
If we proceed with something like the current approach, perhaps that should
be a boolean (or enum) parameter in the KVConfig, similar to the
EmitStrategy? After all, the high-level goal of this KIP is to be able to
fully customize all DSL state stores, and this is currently not possible
due to KAFKA-14976.

If we expect there to be further customizations like this going forward,
perhaps we could instead have each of the three StoreConfig classes accept
a single enum parameter for the "sub-type" (or whatever you want to call
it), which would encompass (and replace) things like the EmitStrategy as
well as the list-value type (we could define one enum for each Config class
so there is no accidentally requesting a LIST_VALUE subtype on a
WindowStore). Thoughts?

Lastly, regarding 3.b:

I love that you brought this up because that is actually what I first
proposed to Almog, ie introducing a param class to clean up the
StoreBuilder API, during our chat that led to this KIP. He pushed back,
claiming (rightly so) that this change would be huge in scope for a purely
aesthetic/API change that doesn't add any functionality, and that it makes
more sense to start with the DSL config since there is a clear gap in
functionality there, particularly when it comes to custom state stores
(reasons 1 & 3 in the Motivation section). He did agree that the param
classes were a better API, which is why you see them in this KIP. In other
words: I fully agree that we should apply this improvement to the
PAPI/StoreBuilder interface as well, but I think that's a distinct concept
for the time-being and should not block the DSL improvement. Rather, I
consider this KIP as setting the stage for a followup KIP down the line to
clean up the StoreBuilders and bring them in line with the new param/config
class approach.

That said,  A) we should definitely make sure whatever we introduce here
can be extended to the PAPI StoreBuilders in a natural way, and B) I should
clarify that I personally would be happy to see this included in the
current KIP, but as Almog's KIP it's up to him to decide whether he's
comfortable expanding the scope like this. If you can convince him where I
could not, more power to you! :P

On Sun, Jul 23, 2023 at 4:48 PM Sophie Blee-Goldman 
wrote:

> Matthias:
>
> I'm not sure I agree with (or maybe don't follow) this take:
>>
>> we need all kinds of `StoreTypeSpec` implementations,
>> and it might also imply that we need follow-up KIPs for new features
>> (like an in-memory versioned store) that might not need a KIP otherwise.
>>
> I see this feature as being a nice add-on/convenience API for any store
> types which have a full DSL implementation. I don't think it's unreasonable
> to just say that this feature is only going to be available for store types
> that have KV, Window, and Session implementations. I can't think of any
> case besides versioned stores where this would force a KIP for a new
> feature that would not otherwise have to go through a KIP, and even for
> versioned state stores, the only issue is that the KIP for that was already
> accepted.
>
> However, I think I agree on your main point -- that things like "regular"
> vs timestamped vs versioned are/should be an implementation detail that's
> hidden from the user. As I noted previously, the current KIP actually
> greatly improves the situation for timestamped stores, as this would be
> handled completely transparently by the OOTB RocksDBStoreSpec. To me, this
> provides a very natural way to let the DSL operators using the default
> store type/spec to specify which kind of store (eg
> versioned/timestamped/etc) it wants, and choose the correct default. If the
> eventual intention is to have versioned state stores replace timestamped
> stores as the default in the DSL, then we can simply swap out the versioned
> stores for the timestamped stores in the RocksDBStoreTypeSpec, when that
> time comes. Until then, users who want to use the versioned store will have
> to do what they do today, which is individually override operators via
> Materialized/StoreSuppliers.
>
> All in all, it sounds like we should not offer a versioned store type
> spec, as "versioned" is more akin to "timestamped" than to a true
> differenc

Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-07-23 Thread Sophie Blee-Goldman
Matthias:

I'm not sure I agree with (or maybe don't follow) this take:
>
> we need all kinds of `StoreTypeSpec` implementations,
> and it might also imply that we need follow-up KIPs for new features
> (like an in-memory versioned store) that might not need a KIP otherwise.
>
I see this feature as being a nice add-on/convenience API for any store
types which have a full DSL implementation. I don't think it's unreasonable
to just say that this feature is only going to be available for store types
that have KV, Window, and Session implementations. I can't think of any
case besides versioned stores where this would force a KIP for a new
feature that would not otherwise have to go through a KIP, and even for
versioned state stores, the only issue is that the KIP for that was already
accepted.

However, I think I agree on your main point -- that things like "regular"
vs timestamped vs versioned are/should be an implementation detail that's
hidden from the user. As I noted previously, the current KIP actually
greatly improves the situation for timestamped stores, as this would be
handled completely transparently by the OOTB RocksDBStoreSpec. To me, this
provides a very natural way to let the DSL operators using the default
store type/spec to specify which kind of store (eg
versioned/timestamped/etc) it wants, and choose the correct default. If the
eventual intention is to have versioned state stores replace timestamped
stores as the default in the DSL, then we can simply swap out the versioned
stores for the timestamped stores in the RocksDBStoreTypeSpec, when that
time comes. Until then, users who want to use the versioned store will have
to do what they do today, which is individually override operators via
Materialized/StoreSuppliers.

All in all, it sounds like we should not offer a versioned store type spec,
as "versioned" is more akin to "timestamped" than to a true difference in
underlying store implementation type (eg rocks vs in-memory). W.r.t whether
to deprecate the old config or introduce a new CUSTOM enum type, either
seems fine to me, and we can go with that alternative instead. The only
other con to this approach that I can think of, and I'm honestly not sure
if this is something users would care about or only devs, is that the
advantage to moving rocks and IM to the store type spec interface is that
it helps to keep the relevant logic encapsulated in one easy place you can
quickly check to tell what kind of state store is used where. In the
current code, I found it extremely annoying and difficult to track down all
usages of the StoreType enum to see which actual rocksdb store was being
used where (for example some stores using the TimeOrderedBuffer variants in
some special cases, or to understand whether the DSL was defaulting to
plain, timestamped, or versioned stores for RocksDB vs InMemory -- both of
which seem like they could be of interest to a user). This would be much
easier if everything was handled in one place, and you can just go to the
(eg) RocksDBStoreTypeSpec and see what it's doing, or find usages of the
methods to understand what stores are being handed to which DSL operators.

I suppose we could still clean up the API and solve this problem by having
the old (and new) config delegate to a StoreTypeSpec no matter what, but
make RocksDBStoreTypeSpec and InMemoryStoreTypeSpec internal classes that
are simply implementation details of the ROCKSDB vs IN_MEMORY enums. WDYT?


On Sun, Jul 23, 2023 at 11:14 AM Guozhang Wang 
wrote:

> Thanks everyone for the great discussions so far! I first saw the JIRA
> and left some quick thoughts without being aware of the
> already-written KIP (kudos to Almog, very great one) and the DISCUSS
> thread here. And I happily find some of my initial thoughts align with
> the KIP already :)
>
> Would like to add a bit more of my 2c after reading through the KIP
> and the thread here:
>
> 1. On the high level, I'm in favor of pushing this KIP through without
> waiting on the other gaps to be closed. In my back pocket's
> "dependency graph" of Kafka Streams roadmap of large changes or
> feature gaps, the edges of dependencies are defined based on my
> understanding of whether doing one first would largely complicate /
> negate the effort of the other but not vice versa, in which case we
> should consider getting the other done first. In this case, I feel
> such a dependency is not strong enough, so encouraging the KIP
> contributor to finish what he/she would love to do to close some gaps
> early would be higher priorities. I did not see by just doing this we
> could end up in a worse intermediate stage yet, but I could be
> corrected.
>
> 2. Regarding the store types --- again, here I'd like to just clarify
> the terms a bit since in the past there has been some confusion: we used
> "impl types" (in hindsight it may not be a good name) for rocksdb /
> memory / custom, and we used "store types" for kv / windowed /
> sessioned etc, as I said in the JIRA I think the curr

Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-07-23 Thread Guozhang Wang
Thanks everyone for the great discussions so far! I first saw the JIRA
and left some quick thoughts without being aware of the
already-written KIP (kudos to Almog, very great one) and the DISCUSS
thread here. And I happily find some of my initial thoughts align with
the KIP already :)

Would like to add a bit more of my 2c after reading through the KIP
and the thread here:

1. On the high level, I'm in favor of pushing this KIP through without
waiting on the other gaps to be closed. In my back pocket's
"dependency graph" of Kafka Streams roadmap of large changes or
feature gaps, the edges of dependencies are defined based on my
understanding of whether doing one first would largely complicate /
negate the effort of the other but not vice versa, in which case we
should consider getting the other done first. In this case, I feel
such a dependency is not strong enough, so encouraging the KIP
contributor to finish what he/she would love to do to close some gaps
early would be higher priorities. I did not see by just doing this we
could end up in a worse intermediate stage yet, but I could be
corrected.

2. Regarding the store types --- again, here I'd like to just clarify
the terms a bit since in the past there has been some confusion: we used
"impl types" (in hindsight it may not be a good name) for rocksdb /
memory / custom, and we used "store types" for kv / windowed /
sessioned etc. As I said in the JIRA, I think the current proposal also
has a good side effect as a quality bar, really forcing us to think
twice before adding more store types in the future, since doing so will
impact API instantiations. In the ideal world, I would consider:

* We have (timestamped) kv store, versioned kv store, window store,
session store as first-class DSL store types. Some DSL operators could
accept multiple store types (e.g. versioned and non versioned
kv-store) for semantics / efficiency trade-offs. But I think we would
remove un-timestamped kv stores eventually since that efficiency
trade-off is so minimal compared to its usage limitations.
* As for list-value store (for stream-stream Join), memory-lru-cache
(for PAPI use only), memory-time-ordered-buffer (for suppression),
they would not be exposed as DSL first-class store types in the
future. Instead, they would be treated as internal used stores (e.g.
list-value store is built on key-value store with specialized serde
and putInternal), or continue to be just convenient OOTB PAPI used
stores only.
* As we move on, we will continue to be very, very strict on what
would be added as DSL store types (and hence requires changes to the
proposed APIs), what to be added as convenient OOTB PAPI store impls
only, what to be added as internal used store types that should not be
exposed to users nor customizable at all.

3. Some more detailed thoughts below:

3.a) I originally also think that we can extend the existing config,
rather than replacing it. The difference was that I was thinking that
order-wise, the runtime would look at the API first, and then the
config, whereas in your rejected alternative it was looking at the
config first, and then the API --- that I think is a minor thing and
either is fine. I'm in agreement that having two configs would be more
confusing to users to learn about their precedence rather than
helpful, but if we keep both a config and a public API, then the
precedence ordering would not be so confusing as long as we state them
clearly. For example:

* We have DefaultStoreTypeSpec OOTB, in that impl we look at the
config only, and would only expect either ROCKS or MEMORY, and return
corresponding OOTB store impls; if any other values configured, we
error out.
* Users extend that by having MyStoreTypeSpec, in which they could do
arbitrary things without respecting the config at all, but our
recommended pattern in docs would still say "look into the config: if
it is ROCKS or MEMORY, just fall back to DefaultStoreTypeSpec;
otherwise, if it's some String you recognize, return your customized
store based on the string value; otherwise error out".
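The recommended pattern in 3.a could be sketched roughly as follows; all type and method names here are hypothetical, since the KIP's final API is still under discussion:

```java
public class Main {
    // Hypothetical spec interface, loosely following the KIP discussion.
    interface StoreTypeSpec {
        String resolve(String configuredType);
    }

    // OOTB spec: accepts only the built-in enum-like values, errors otherwise.
    static class DefaultStoreTypeSpec implements StoreTypeSpec {
        @Override
        public String resolve(String configuredType) {
            switch (configuredType) {
                case "ROCKS":  return "RocksDB store";
                case "MEMORY": return "in-memory store";
                default:
                    throw new IllegalArgumentException(
                            "Unknown store type: " + configuredType);
            }
        }
    }

    // Custom spec: recognizes its own string, otherwise falls back to the
    // default -- the precedence pattern recommended above.
    static class MyStoreTypeSpec implements StoreTypeSpec {
        private final StoreTypeSpec fallback = new DefaultStoreTypeSpec();

        @Override
        public String resolve(String configuredType) {
            if (configuredType.equals("MY_CUSTOM")) {
                return "my custom store";
            }
            return fallback.resolve(configuredType);
        }
    }

    public static void main(String[] args) {
        StoreTypeSpec spec = new MyStoreTypeSpec();
        System.out.println(spec.resolve("MY_CUSTOM"));
        System.out.println(spec.resolve("ROCKS"));
    }
}
```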

3.b) About the struct-like Params classes, I like the idea a lot and
wished we would pursue this in the first place, but if we only do this
in Spec it would leave some inconsistencies with the StoreBuilders
though arguably the latter is only for PAPI. I'm wondering if we
should consider including the changes in StoreBuilders (e.g.
WindowStoreBuilder(WindowSupplierParams)), and if yes, maybe we should
also consider renaming that e.g. `WindowSupplierParams` to
`WindowStoreSpecParams` too? For this one I only have a "weak feeling"
so I can be convinced otherwise :P

Thanks,
Guozhang



On Sun, Jul 23, 2023 at 9:52 AM Matthias J. Sax  wrote:
>
> Thanks for all the input. My intention was not to block the KIP, but
> just to take a step back and try to get a holistic picture and discussion,
> to explore if there are good/viable alternative designs. As said
> originally, I really like to close this gap, and was always aware th

Re: Request permission to contribute

2023-07-23 Thread Guozhang Wang
Hello Taras,

I saw your ID in the contributors list already, could you check if you
can create JIRAs now?

Guozhang

On Fri, Jul 21, 2023 at 4:00 AM Taras Ledkov  wrote:
>
> Hi, Kafka Team.
>
> I'm following this wiki to request permission to contribute to Apache Kafka
> https://cwiki.apache.org/confluence/display/kafka/kafka+improvement+proposals#KafkaImprovementProposals-GettingStarted
>
> I'll propose custom SSL factory for Kafka Connect REST server 
> [org.apache.kafka.connect.runtime.rest.RestServer].
> Kafka connect REST server can be configured only with file based key stores 
> in current implementation.
>
> My wiki ID and jira ID are both: tledkov (tled...@apache.org)
> Can I get permission please?
>
> --
> With best regards,
> Taras Ledkov
>


Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-07-23 Thread Matthias J. Sax
Thanks for all the input. My intention was not to block the KIP, but 
just to take a step back and try to get a holistic picture and discussion, 
to explore whether there are good/viable alternative designs. As said 
originally, I would really like to close this gap, and was always aware that 
the current config is not flexible enough.



I guess my "concern" is that the KIP increases the API surface area 
significantly, as we need all kinds of `StoreTypeSpec` implementations, 
and it might also imply that we need follow-up KIPs for new features 
(like an in-memory versioned store) that might not need a KIP otherwise.


The second question is whether it might make the already "patchy" situation 
with regard to customization worse.


We did de-scope the original KIP-591 for this reason, and given the new 
situation of the DSL, it seems that it actually got worse compared to 
back in the days.


Lastly, I hope to make the new versioned stores the default in the DSL 
and we did not do it in the previous KIP due to backward compatibility 
issues. Thus, from a DSL point of view, I believe there should be only 
"RocksDB", "InMemory", and "Custom" in an ideal world. Introducing (I am 
exaggerating to highlight my point) "KvRocksDbSpec", 
"TimestampeKvRocksDbSpec", "VersionedRocksDbSpec", plus the 
corresponding in-memory specs seems to head into the opposite direction. 
-- My goal is to give users a handle of the _physical_ store (RocksDB vs 
InMemory vs Custom) but not the _logical_ stores (plain kv, ts-kv, 
versioned) which is "dictated" by the DSL itself and should not be 
customizable (we are just in a weird intermediate situation that we need 
to clean up, but not "lean into" IMHO).


Thus, I am also not sure if adding "VersionedRocksDbSpec" would be ideal 
(also, given that it only changes a single store, but not the two 
windowed stores)?


Furthermore, I actually hope that we could use the new versioned store 
to replace the window- and sessions- stores, and thus to decrease the 
number of required store types.



Admittedly, I am talking a lot about a potential future, but the goal is 
only to explore opportunities to avoid getting into a "worse" 
intermediate state that will require a huge deprecation surface area 
later on. Of course, if there is no better way, and my concerns are not 
shared, I am ok to move forward with the KIP.



Bottom line: I would personally prefer to keep the current config and 
add a `Custom` option to it, plus adding one new config that allows 
people to set their custom `StoreTypeSpec` class. -- I would not add a 
built-in spec for versioned stores at this point (or any other built-in 
`StoreTypeSpec` implementations). I guess users could write a custom 
spec if they want to enable versioned stores across the board for now 
(until we make them the default anyway)?
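To make the "bottom line" proposal concrete, here is a rough sketch of the two-config approach: the existing enum-style store config gains a CUSTOM value, and a second config names the user's spec class. All names here (the enum values, config keys, and the `StoreTypeSpec` shape) are illustrative assumptions, not the actual Kafka Streams API:

```java
import java.util.Map;

public class DslStoreConfigSketch {

    // Stand-in for a user-pluggable store spec (hypothetical interface).
    public interface StoreTypeSpec {
        String name();
    }

    // The existing built-in choices, plus the proposed CUSTOM option.
    enum DefaultDslStore { ROCKS_DB, IN_MEMORY, CUSTOM }

    static final String DSL_STORE_CONFIG = "default.dsl.store";              // existing config
    static final String CUSTOM_SPEC_CONFIG = "default.dsl.store.spec.class"; // hypothetical new config

    // Resolve which spec to use: a built-in one unless CUSTOM is chosen,
    // in which case the user-supplied class is instantiated reflectively.
    static StoreTypeSpec resolve(Map<String, String> props) throws Exception {
        DefaultDslStore store =
            DefaultDslStore.valueOf(props.getOrDefault(DSL_STORE_CONFIG, "ROCKS_DB"));
        if (store != DefaultDslStore.CUSTOM) {
            return () -> store.name();
        }
        String cls = props.get(CUSTOM_SPEC_CONFIG);
        if (cls == null) {
            throw new IllegalArgumentException(
                CUSTOM_SPEC_CONFIG + " must be set when " + DSL_STORE_CONFIG + "=CUSTOM");
        }
        return (StoreTypeSpec) Class.forName(cls).getDeclaredConstructor().newInstance();
    }
}
```

The point of the sketch is that only one new built-in value (CUSTOM) and one new config are added; no per-store-flavor specs are exposed.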



Hope my train of thoughts is halfway reasonable and not totally off track?


-Matthias

On 7/21/23 15:27, Sophie Blee-Goldman wrote:

I agree with everything Almog said above, and will just add on to two
points:

1. Regarding whether to block this KIP on the completion of any or all
future implementations of in-memory versioned stores (or persistent
suppression buffers), I feel that would be unfair to this feature, which is
completely unrelated to the semantic improvements offered by versioned state
stores.
It seems like the responsibility of those driving the versioned state
stores feature, not Almog/this KIP, to make sure that those bases are
covered. Further, if anything, this KIP will help with the massive
proliferation of StoreSuppliers on the Stores factory class, and provide
users with a much easier way to leverage the versioned stores without
having to muck around directly with the StoreSuppliers.

I also thought about it a bit, and really like Almog's suggestion to
introduce an additional StoreSpec for the Versioned state stores. Obviously
we can add the RocksDB one to this KIP now, and then as he mentioned,
there's an easy way to get users onto the IMVersionedStateStore types once
they are completed.

Lastly, on this note, I want to point out how smoothly this solved the
issue of timestamped stores, which are intended to be the DSL default but
are only a special case for RocksDB. Right now it can be confusing for a
user scrolling through the endless Stores class and seeing a timestamped
version of the persistent but not in-memory stores. One could easily assume
there was no timestamped option for IM stores and that this feature was
incomplete, if they weren't acutely aware of the internal implementation
details (namely that it's only required for RocksDB for compatibility
reasons). However, with this KIP, all that is handled completely
transparently to the user, and we the devs, who *are* aware of the internal
implementation details, are now appropriately the ones responsible for
handing the correct store type to a particular operator. While versioned
state stores may not be completely comparable, depending on whether we want
users t

Re: [DISCUSS] KIP-949: Add flag to enable the usage of topic separator in MM2 DefaultReplicationPolicy

2023-07-23 Thread Omnia Ibrahim
Thanks for both of your feedback and time. I updated the rejected
alternatives section in the KIP and opened a voting thread here:
https://lists.apache.org/thread/cy04n445noyp0pqztlp8rk74crvhlrk7
I'll work on the PR in the meantime so we are ready to go once we get
three binding votes, in order to get into the 3.6 release.

Cheers
Omnia
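The flag-based fix under discussion can be sketched roughly as follows: the configured topic separator always applies to replicated topic names, but is only applied to MM2's internal topic names when the flag is enabled; otherwise the pre-KIP-690 "." is kept so existing mirrors keep finding their old internal topics. The flag name and method shapes below are assumptions for illustration, not the exact Kafka API:

```java
// Illustrative stand-in for the behaviour proposed for
// DefaultReplicationPolicy in KIP-949; not Kafka's actual class.
public class SeparatorFlagSketch {

    private final String separator;                  // e.g. "." or "-"
    private final boolean separatorInInternalTopics; // the proposed flag

    public SeparatorFlagSketch(String separator, boolean separatorInInternalTopics) {
        this.separator = separator;
        this.separatorInInternalTopics = separatorInInternalTopics;
    }

    // Replicated topics always use the configured separator.
    public String formatRemoteTopic(String sourceCluster, String topic) {
        return sourceCluster + separator + topic;
    }

    // Internal topics only use the configured separator when the flag is
    // enabled; otherwise they keep the legacy "." for backward compatibility.
    public String checkpointsTopic(String clusterAlias) {
        String sep = separatorInInternalTopics ? separator : ".";
        return clusterAlias + sep + "checkpoints" + sep + "internal";
    }
}
```

With the flag disabled, a cluster configured with a "-" separator still reads and writes the old "."-separated internal topics across an upgrade.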

On Fri, Jul 21, 2023 at 3:43 PM Federico Valeri 
wrote:

> Hi, the point that the legacy approach can only be taken once is
> valid, so LGTM. Thanks.
>
> Cheers
> Fede
>
> On Fri, Jul 21, 2023 at 4:28 PM Chris Egerton 
> wrote:
> >
> > Hi Omnia,
> >
> > LGTM, thanks! We may want to note the LegacyReplicationPolicy option in
> the
> > rejected alternatives section in case others prefer that option.
> >
> > Given that we'd like this to land in time for 3.6.0, you may want to
> start
> > a vote thread soon.
> >
> > Cheers,
> >
> > Chris
> >
> > On Fri, Jul 21, 2023 at 10:08 AM Omnia Ibrahim 
> > wrote:
> >
> > > Hi Chris and Federico,
> > > Thinking about it, I think Chris's concern is valid, and the bigger
> > > concern here is that having this `LegacyReplicationPolicy` will
> > > eventually open the door to divergence at some point between
> > > `LegacyReplicationPolicy` and the default one.
> > > For now, let's have the flag properly fix this bug, and we can keep it
> > > as an option for people to switch between both behaviours. I know
> > > having a bug-fix property is not great, but we can treat it as a
> > > backward-compatibility property instead, in order to keep old mirrors
> > > using the old internal topics.
> > >
> > > Hope this is reasonable for the time being.
> > >
> > > Cheers,
> > > Omnia
> > >
> > > On Wed, Jul 19, 2023 at 11:16 PM Chris Egerton  >
> > > wrote:
> > >
> > > > Hi Federico,
> > > >
> > > > Ah yes, sorry about that! You're correct that this would keep the two
> > > > classes in line and largely eliminate the concern I posed about
> porting
> > > > changes to both. Still, I'm a bit hesitant, and I'm not actually
> certain
> > > > that this alternative is more intuitive. The name isn't very
> descriptive,
> > > > and this is the kind of approach we can only really take once; if we
> > > break
> > > > compatibility again, would we introduce a
> LegacyLegacyReplicationPolicy?
> > > > LegacyReplicationPolicy2? Finally, it seems a bit strange to
> introduce a
> > > > new class to implement a change in behavior this small.
> > > >
> > > > That said, I don't think this is worth blocking on and wouldn't be
> > > opposed
> > > > if others felt strongly that a new replication policy class is
> superior
> > > to
> > > > a new property on the existing class.
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > > On Wed, Jul 19, 2023 at 2:53 PM Federico Valeri <
> fedeval...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Chris, the KIP says it would be a subclass of
> > > DefaultReplicationPolicy
> > > > > that overrides the ReplicationPolicy.offsetSyncsTopic and
> > > > > ReplicationPolicy.checkpointsTopic. So, not much to maintain and it
> > > would
> > > > > be more intuitive, as you say.
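The rejected LegacyReplicationPolicy alternative described above could look roughly like this sketch, where the parent class is a minimal stand-in rather than Kafka's actual DefaultReplicationPolicy, and the internal topic name formats are illustrative:

```java
// Minimal stand-in for a post-KIP-690 default policy, where the configured
// separator is also used in internal topic names.
class DefaultPolicyStandIn {
    protected final String separator;

    DefaultPolicyStandIn(String separator) { this.separator = separator; }

    public String offsetSyncsTopic(String clusterAlias) {
        return "mm2-offset-syncs" + separator + clusterAlias + separator + "internal";
    }

    public String checkpointsTopic(String clusterAlias) {
        return clusterAlias + separator + "checkpoints" + separator + "internal";
    }
}

// The legacy policy would only override the two internal-topic methods,
// hard-coding the pre-KIP-690 "." regardless of the configured separator.
class LegacyPolicySketch extends DefaultPolicyStandIn {
    LegacyPolicySketch(String separator) { super(separator); }

    @Override
    public String offsetSyncsTopic(String clusterAlias) {
        return "mm2-offset-syncs." + clusterAlias + ".internal";
    }

    @Override
    public String checkpointsTopic(String clusterAlias) {
        return clusterAlias + ".checkpoints.internal";
    }
}
```

This illustrates both sides of the trade-off discussed in the thread: the subclass is tiny, but any future change to the internal-topic logic in the parent would have to be mirrored here.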
> > > > >
> > > > > On Wed, Jul 19, 2023, 4:50 PM Chris Egerton
> 
> > > > > wrote:
> > > > >
> > > > > > HI all,
> > > > > >
> > > > > > I'm not sure I understand the benefits of introducing a separate
> > > > > > replication policy class, besides maybe being more
> readable/intuitive
> > > > to
> > > > > > users who would want to know when to use one or the other. It
> feels
> > > > like
> > > > > > we've swapped out a "fix the bug" property for an entire "fix the
> > > bug"
> > > > > > class, and it still leaves the problem of graceful migration from
> > > > legacy
> > > > > > behavior to new behavior unsolved. It would also require us to
> > > consider
> > > > > > whether any future changes we make to the
> DefaultReplicationPolicy
> > > > class
> > > > > > would have to be ported over to the LegacyReplicationPolicy
> class as
> > > > > well.
> > > > > >
> > > > > > Perhaps I'm missing something; are there other benefits of
> > > introducing
> > > > a
> > > > > > separate replication policy class?
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Chris
> > > > > >
> > > > > > On Wed, Jul 19, 2023 at 5:45 AM Omnia Ibrahim <
> > > o.g.h.ibra...@gmail.com
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Federico,
> > > > > > > I like the idea of implementing `LegacyReplicationPolicy` and
> > > > avoiding
> > > > > > bug
> > > > > > > fixes properties. We can drop the idea of the property and
> just go
> > > > > ahead
> > > > > > > with introducing the `LegacyReplicationPolicy` and any user
> upgrade
> > > > > from
> > > > > > > pre-KIP-690 can use this policy instead of default and no
> impact
> > > will
> > > > > > > happen to users upgrading from 3.x to post-KIP-949. We can mark
> > > > > > > `LegacyReplicationPolicy` as deprecated later if we want (but
> not
> > > in
> > > > > 4.0
> > > > > > as
> > > > > > > this is very soon) and we can drop it entir

[VOTE] KIP-949: Add flag to enable the usage of topic separator in MM2 DefaultReplicationPolicy

2023-07-23 Thread Omnia Ibrahim
Hi everyone,
I would like to open a vote for KIP-949. The proposal is here
https://cwiki.apache.org/confluence/display/KAFKA/KIP-949%3A+Add+flag+to+enable+the+usage+of+topic+separator+in+MM2+DefaultReplicationPolicy
.


Thanks
Omnia


Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!

2023-07-23 Thread Erik van Oosten

Hi David,

> Could you elaborate a bit more on why the callbacks must be run in
> another thread vs in the invoker thread?


I have been thinking about how to explain this for 2 months now, and it 
is not easy. It has to do with the fact that you cannot control what a 
thread is doing if you also have to run on that thread. But I just 
realized that /for me/ it really comes down to this:


    We want to use Zio in the callback. Zio does not support it.

There are more reasons, as can be read in KAFKA-7143. But I do not know 
anything about Kotlin, so I cannot elaborate on that.


Kind regards,
 Erik.

Op 22-07-2023 om 21:39 schreef David Jacot:

Hi Erik,

Thanks for the KIP. I would like to better understand the motivation of
this KIP. I am not familiar with async runtimes so please excuse me if I
ask stupid questions.

Could you elaborate a bit more on why the callbacks must be run in another
thread vs in the invoker thread? This is not clear to me. In the example
that you use with the ConsumerRebalanceListener, I would have thought that
calling commitSync (without changing threads) would have achieved the same.
The invoker has to wait on the offset commit completion anyway, so using
another thread does not bring any benefit here. I suppose that I am
missing something…

Regarding Chris’ proposal, this feels like a hack to me. The issue with it
is that we cannot guarantee it in the long term if it is not part of *the*
Consumer interface.

I second what Chris said. We are all trying to understand the motivation in
order to find a good solution for Kafka. I apologize if this creates
frustration. This is definitely not our goal.

Best,
David

PS: I just saw that you opened a new KIP based on Chris’ idea. This is not
necessary. You can just update the current KIP based on the discussion.

Le sam. 22 juil. 2023 à 18:34, Erik van Oosten 
a écrit :


Colin, Matthias, Chris,

I have expanded the use case description in KIP-944. I hope it is more
clear what we're trying to achieve.

https://cwiki.apache.org/confluence/x/chw0Dw

Kind regards,
  Erik.


Op 22-07-2023 om 17:23 schreef Erik van Oosten:

Hello Chris,

Thanks for elaborating Matthias' words. Apparently the use case
description is too terse. Indeed, that is not FUD and that is
something I can work with.


It's also worth mentioning that what's proposed in the KIP is only
blocked by the private access modifier on the KafkaConsumer::acquire
and KafkaConsumer::release methods. If we upgraded the visibility of
these methods from private to protected, it would be possible for
subclasses to implement the proposal in KIP-944, without any KIPs or
other changes to the official Java clients library.

That is absolutely brilliant! Since I am pretty sure I am using the
consumer correctly, I could replace acquire and release with an empty
method body and be done.

/Is making acquire and release protected something that other people
can live with?/
If yes, I will create a new PR with just that change.

Kind regards,
 Erik.
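If acquire and release were indeed opened up from private to protected, the no-op override described above would look roughly like this sketch. ConsumerStandIn is a simplified stand-in mimicking KafkaConsumer's single-thread guard, not the real class:

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

// Simplified model of the consumer's thread-affinity guard.
class ConsumerStandIn {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);

    // Guard: only one thread at a time may use the consumer. In the real
    // KafkaConsumer these methods are private; the KIP discussion is about
    // making them protected so subclasses can override them.
    protected void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
            throw new ConcurrentModificationException(
                "consumer is not safe for multi-threaded access");
    }

    protected void release() {
        currentThread.set(NO_CURRENT_THREAD);
    }

    public void commitSync() {
        acquire();
        try { /* commit offsets... */ } finally { release(); }
    }
}

// An async runtime that already serializes all access externally could
// then opt out of the per-thread check by making the guard a no-op.
class RuntimeManagedConsumer extends ConsumerStandIn {
    @Override protected void acquire() { }
    @Override protected void release() { }
}
```

The design question in the thread is exactly whether handing this responsibility to subclasses (which must then guarantee serialized access themselves) is acceptable.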


Op 22-07-2023 om 16:39 schreef Chris Egerton:

Hi Erik,

I don't think Matthias is bringing FUD to the discussion. Many of the
people who maintain Kafka are familiar with Kafka client internals
and the
Java programming language, but not necessarily other JVM languages or
asynchronous runtimes. I think it's reasonable to ask for a code
snippet or
two that demonstrates what you'd like to do with the consumer today that
you can't because of restrictions around concurrent access, and this
is not
already addressed in the KIP. Linking to a docs page on Kotlin
coroutines
is helpful but still requires reviewers to gather a lot of context on
their
own that could more easily be provided in the KIP, and although the
description of KAFKA-7143 is more detailed, I find it a little hard to
follow as someone who isn't already familiar with the environment the
user
is working in.

It's also worth mentioning that what's proposed in the KIP is only
blocked
by the private access modifier on the KafkaConsumer::acquire and
KafkaConsumer::release methods. If we upgraded the visibility of these
methods from private to protected, it would be possible for
subclasses to
implement the proposal in KIP-944, without any KIPs or other changes
to the
official Java clients library.

Best,

Chris

On Sat, Jul 22, 2023 at 4:24 AM Erik van Oosten
  wrote:


Hi Matthias,

I am getting a bit frustrated here. All the concerns and questions I
have seen so far are addressed in KIP-944.

Please let me know if they are not clear enough, but please do not come
with FUD.

Kind regards,
   Erik.


Op 21-07-2023 om 21:13 schreef Matthias J. Sax:

I am not a clients (or threading) expert, but I tend to agree to
Colin's concerns.

In particular, it would be nice to see an example of how you intend to 
use the API (I am not familiar with Kotlin or its coroutines), to 
better understand what these changes help to solve to begin with.

Opening up the consumer sounds potentially dangerous