[jira] [Created] (KAFKA-16510) java.lang.OutOfMemoryError in kafka-metadata-quorum.sh

2024-04-10 Thread HiroArai (Jira)
HiroArai created KAFKA-16510:


 Summary: java.lang.OutOfMemoryError in kafka-metadata-quorum.sh
 Key: KAFKA-16510
 URL: https://issues.apache.org/jira/browse/KAFKA-16510
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 3.4.1
Reporter: HiroArai


kafka-metadata-quorum.sh does not work with SASL_PLAIN.
I got the error below. I only use SASL_PLAIN, not SSL.
I found a person with a similar situation, but they are using mTLS:
https://issues.apache.org/jira/browse/KAFKA-16006

{code:java}
sh-4.2$ /opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server :9093 --command-config controller-admin.properties describe --replication
[2024-04-11 04:12:54,128] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
	at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
	at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:476)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:573)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:251)
	at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:585)
	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1504)
	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1435)
	at java.base/java.lang.Thread.run(Thread.java:840)
org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: describeMetadataQuorum
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: describeMetadataQuorum
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
	at org.apache.kafka.tools.MetadataQuorumCommand.handleDescribeReplication(MetadataQuorumCommand.java:158)
	at org.apache.kafka.tools.MetadataQuorumCommand.execute(MetadataQuorumCommand.java:106)
	at org.apache.kafka.tools.MetadataQuorumCommand.mainNoExit(MetadataQuorumCommand.java:62)
	at org.apache.kafka.tools.MetadataQuorumCommand.main(MetadataQuorumCommand.java:57)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: describeMetadataQuorum
{code}
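
For context, here is a minimal AdminClient sketch of the same call, assuming a plain SASL_PLAIN setup (the bootstrap address, user name and password are placeholders, not taken from this environment):

{code:java}
// Sketch only: the programmatic equivalent of the failing command.
// Bootstrap address and credentials below are placeholders.
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.config.SaslConfigs;

public class DescribeQuorumSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "controller-host:9093");
        props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"admin\" password=\"admin-secret\";");
        try (Admin admin = Admin.create(props)) {
            // Same call the tool makes; the OOM above kills the admin client
            // thread during the SASL handshake, so this future times out.
            System.out.println(admin.describeMetadataQuorum().quorumInfo().get());
        }
    }
}
{code}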



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2804

2024-04-10 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-04-10 Thread Matthias J. Sax

Thanks for the KIP. Great discussion.

I am not sure if I understand the proposal from Bruno to hand in the 
processor node id? Isn't this internal (I could not even find it quickly)? 
We do have a processor name, right? Or am I mixing something up?


Another question is about `ProcessingContext` -- it contains a lot of 
(potentially irrelevant?) metadata. We should think carefully about what 
we want to pass in and what not -- removing stuff is hard, but adding 
stuff is easy. It's always an option to create a new interface that only 
exposes stuff we find useful, and allows us to evolve this interface 
independently of others. Re-using an existing interface always carries the 
danger of introducing undesired coupling that could bite us in the 
future. -- It makes total sense to pass in `RecordMetadata`, but 
`ProcessingContext` (even if already limited compared to 
`ProcessorContext`) still seems to be too broad? For example, there are 
`getStateStore()` and `schedule()` methods which I think we should not 
expose.
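
To make that concrete, here is a rough sketch of what such a narrower 
interface could look like (all names below are hypothetical and not part 
of the KIP):

import org.apache.kafka.streams.processor.api.RecordMetadata;

// Hypothetical sketch only: a handler-specific view of the context that exposes
// record metadata and the failing node, but none of the state-store or
// scheduling methods of ProcessingContext.
public interface ErrorHandlerContext {
    RecordMetadata recordMetadata(); // topic / partition / offset of the input record
    String processorNodeName();      // the processor node where the exception was thrown
}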


The other interesting question is about "what record gets passed in". 
For the PAPI, passing in the Processor's input record makes a lot of 
sense. However, for DSL operators, I am not 100% sure? The DSL often 
uses internal types not exposed to the user, and thus I am not sure if 
users could write useful code for this case? -- In general, I still 
agree that the handler should be implemented with a try-catch around 
`Processor.process()`, but it might not be too useful for DSL processors. 
Hence, I am wondering if we need to do something more in the DSL? I 
don't have a concrete proposal (a few high level ideas only), and if we 
don't do anything special for the DSL I am ok with moving forward with 
this KIP as-is, but we should be aware of potential limitations for DSL 
users. We can always do a follow-up KIP to close gaps when we understand 
the impact better -- covering the DSL would also expand the scope of 
this KIP significantly...


About the metric: just to double check. Do we think it's worth adding a 
new metric? Or could we re-use the existing "dropped record" metric?




-Matthias


On 4/10/24 5:11 AM, Sebastien Viale wrote:

Hi,

You are right, it will simplify types.

We updated the KIP.

regards

Sébastien *VIALE*

*MICHELIN GROUP* - Information Technology
*Technical Expert Kafka*

  Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand


*From:* Bruno Cadonna 
*Sent:* Wednesday, 10 April 2024 10:38
*To:* dev@kafka.apache.org 
*Subject:* [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception 
handler for exceptions occurring during processing


Hi Loïc, Damien, and Sébastien,

Great that we are converging!


3.
Damien and Loïc, I think in your examples the handler will receive
a Record because a Record is passed to
the processor in the following code line:
https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152
 


I see that we do not need to pass into the handler a Record just because we do that for the DeserializationExceptionHandler
and the ProductionExceptionHandler. When those two handlers are called,
the record is already serialized. This is not the case for the
ProcessingExceptionHandler. However, I would propose to use Record
for the record that is passed to the ProcessingExceptionHandler because
it makes the handler API more flexible.


Best,
Bruno






On 4/9/24 9:09 PM, Loic Greffier wrote:
 > Hi Bruno and Bill,
 >
 > To complete Damien's points about point 3.
 >
 > Processing errors are caught and handled by the 
ProcessingErrorHandler, at the precise moment when records are processed 
by processor nodes. The handling will be performed in the "process" 
method of the ProcessorNode, such as:

 >
 > public void process(final Record record) {
 >     ...
 >
 >     try {
 >         ...
 >     } catch (final ClassCastException e) {
 >         ...
 >     } catch (Exception e) {
 >         ProcessingExceptionHandler.ProcessingHandlerResponse response = this.processingExceptionHandler
 >             .handle(internalProcessorContext, (Record) record, e);
 >
 >         if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
 >             throw new StreamsException("Processing exception handler is set to fail upon" +
 >                 " a processing error. If you would rather have the streaming pipeline" +
 >                 " continue after a processing error, please set the " +
 >                 DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
 >                 e);
 >         }
 >     }
 > }
 > As you can 

[jira] [Created] (KAFKA-16509) CurrentControllerId metric is unreliable in ZK mode

2024-04-10 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-16509:


 Summary: CurrentControllerId metric is unreliable in ZK mode
 Key: KAFKA-16509
 URL: https://issues.apache.org/jira/browse/KAFKA-16509
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe


The CurrentControllerId metric added by KIP-1001 is unreliable in ZK mode. 
Sometimes when there is no active ZK-based controller, it still shows the 
previous controller ID. Instead, it should show -1 in that situation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-10 Thread Matthias J. Sax

Thanks for the KIP Fred.

Couple of nits: it's not clear from the "Public API" section what is new 
and what is existing API w/o going back to the code. For existing 
methods which are not changed, it's also best to actually omit them. -- 
It would also be best to only put the interface itself down, but not the 
implementation (ie, no private members and no method body).


Thus, it might be better to do something like this:

+

public class RecordDeserializationException extends SerializationException {

   // newly added
   public RecordDeserializationException(TopicPartition partition,
                                         ConsumerRecord record,
                                         String message,
                                         Throwable cause);

   public ConsumerRecord getConsumerRecord();
}

+

From the description it's not clear to me if you propose to change the 
existing constructor, or propose to add a new constructor. From a 
compatibility POV, we cannot really change the existing constructor (but 
we could deprecate it, remove it in the future, and add a new one in 
parallel). But I also agree with Kirk that there could be cases for 
which we cannot pass in a `ConsumerRecord`, and thus keeping the old 
constructor could make sense (and change the new getter to return an 
`Optional`).
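
Just to illustrate that compatibility option, the evolution could look 
roughly like this (a sketch only, not the KIP's proposal; the concrete 
type parameters are illustrative):

+

public class RecordDeserializationException extends SerializationException {

   @Deprecated  // existing constructor, kept for compatibility
   public RecordDeserializationException(TopicPartition partition,
                                         long offset,
                                         String message,
                                         Throwable cause);

   // hypothetical new constructor carrying the raw record
   public RecordDeserializationException(TopicPartition partition,
                                         ConsumerRecord<byte[], byte[]> record,
                                         String message,
                                         Throwable cause);

   // empty when only partition/offset are known
   public Optional<ConsumerRecord<byte[], byte[]>> consumerRecord();
}

+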


Another small thing: in Kafka, getter methods are not using a `get` 
prefix, and thus it should be `consumerRecord()` w/o the "get".




-Matthias


On 4/10/24 4:21 PM, Kirk True wrote:

Hi Fred,

Thanks for the KIP!

Questions/comments:

1. How do we handle the case where CompletedFetch.parseRecord isn’t able to construct a well-formed ConsumerRecord (i.e. the values it needs are missing/corrupted/etc.)?
2. Please change RecordDeserializationException’s getConsumerRecord() method to be named consumerRecord() to be consistent.
3. Should we change the return type of consumerRecord() to be Optional> in the cases where even a “raw” ConsumerRecord can’t be created?
4. To avoid the above, does it make sense to include a Record object instead of a ConsumerRecord? The former doesn’t include the leaderEpoch or TimestampType, but maybe that’s OK?

Thanks,
Kirk


On Apr 10, 2024, at 8:47 AM, Frédérik Rouleau  
wrote:

Hi everyone,

To make implementation of DLQ in consumer easier, I would like to add the
raw ConsumerRecord into the RecordDeserializationException.

Details are in KIP-1036

.

Thanks for your feedback.

Regards,
Fred





Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-10 Thread Kirk True
Hi Fred,

Thanks for the KIP!

Questions/comments:

1. How do we handle the case where CompletedFetch.parseRecord isn’t able to construct a well-formed ConsumerRecord (i.e. the values it needs are missing/corrupted/etc.)?
2. Please change RecordDeserializationException’s getConsumerRecord() method to be named consumerRecord() to be consistent.
3. Should we change the return type of consumerRecord() to be Optional> in the cases where even a “raw” ConsumerRecord can’t be created?
4. To avoid the above, does it make sense to include a Record object instead of a ConsumerRecord? The former doesn’t include the leaderEpoch or TimestampType, but maybe that’s OK?

Thanks,
Kirk

> On Apr 10, 2024, at 8:47 AM, Frédérik Rouleau  
> wrote:
> 
> Hi everyone,
> 
> To make implementation of DLQ in consumer easier, I would like to add the
> raw ConsumerRecord into the RecordDeserializationException.
> 
> Details are in KIP-1036
> 
> .
> 
> Thanks for your feedback.
> 
> Regards,
> Fred



Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2803

2024-04-10 Thread Apache Jenkins Server
See 




[jira] [Resolved] (KAFKA-16294) Add group protocol migration enabling config

2024-04-10 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-16294.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

> Add group protocol migration enabling config
> 
>
> Key: KAFKA-16294
> URL: https://issues.apache.org/jira/browse/KAFKA-16294
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dongnuo Lyu
>Assignee: Dongnuo Lyu
>Priority: Major
> Fix For: 3.8.0
>
>
> The online upgrade is triggered when a consumer group heartbeat request is 
> received in a classic group. The downgrade is triggered when any old protocol 
> request is received in a consumer group. We only accept upgrade/downgrade if 
> the corresponding group migration config policy is enabled.
> This is the first part of the implementation of online group protocol 
> migration, adding the Kafka config for group protocol migration. The config has 
> four valid values – both (both upgrade and downgrade are allowed), 
> upgrade (only upgrade is allowed), downgrade (only downgrade is allowed) and 
> none (neither is allowed).
> At present the default value is NONE. When we start enabling the migration, 
> we expect to make BOTH the default so that it's easier to roll back to the old 
> protocol as a quick fix for anything wrong in the new protocol; when using 
> consumer groups becomes the default and the migration is nearly finished, we will 
> set the default policy to UPGRADE to prevent unwanted downgrades causing too 
> frequent migration. DOWNGRADE could be useful for revert or debug purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1006: Remove SecurityManager Support

2024-04-10 Thread Mickael Maison
Hi,

It looks like some of the SecurityManager APIs are starting to be
removed in JDK 23, see
- https://bugs.openjdk.org/browse/JDK-8296244
- https://github.com/quarkusio/quarkus/issues/39634

JDK 23 is currently planned for September 2024.
Considering the timelines and that we only drop support for Java
versions in major Kafka releases, I think the proposed approach of
detecting the APIs to use makes sense.
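
For illustration, the detection could be as small as a reflective probe
(a sketch only, not code from the KIP):

import java.lang.reflect.Method;

// Sketch only: returns true while the JDK still ships the SecurityManager API,
// so the same jar can pick the right code path on pre- and post-removal JDKs.
public final class SecurityManagerSupport {
    public static boolean isAvailable() {
        try {
            Method m = System.class.getMethod("getSecurityManager");
            return m != null;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}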

Thanks,
Mickael

On Tue, Nov 21, 2023 at 8:38 AM Greg Harris
 wrote:
>
> Hey Ashwin,
>
> Thanks for your question!
>
> I believe we have only removed support for two Java versions:
> 7: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-118%3A+Drop+Support+for+Java+7
> in 2.0
> 8: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223
> in 4.0
>
> In both cases, we changed the gradle sourceCompatibility and
> targetCompatibility at the same time, which I believe changes the
> "-target" option in javac.
>
> We have no plans currently for dropping support for 11 or 17, but I
> presume they would work in much the same way.
>
> Hope this helps!
> Greg
>
> On Mon, Nov 20, 2023 at 11:19 PM Ashwin  wrote:
> >
> > Hi Greg,
> >
> > Thanks for writing this KIP.
> > I agree with you that handling this now will help us react to the
> > deprecation of SecurityManager, whenever it happens.
> >
> > I had a question regarding how we deprecate JDKs supported by Apache Kafka.
> > When we drop support for JDK 17, will we set the “-target” option of Javac
> > such that the resulting JARs will not load in JVMs which are lesser than or
> > equal to that version ?
> >
> > Thanks,
> > Ashwin
> >
> >
> > On Tue, Nov 21, 2023 at 6:18 AM Greg Harris 
> > wrote:
> >
> > > Hi all,
> > >
> > > I'd like to invite you all to discuss removing SecurityManager support
> > > from Kafka. This affects the client and server SASL mechanism, Tiered
> > > Storage, and Connect classloading.
> > >
> > > Find the KIP here:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1006%3A+Remove+SecurityManager+Support
> > >
> > > I think this is a "code hygiene" effort that doesn't need to be dealt
> > > with urgently, but it would prevent a lot of headache later when Java
> > > does decide to remove support.
> > >
> > > If you are currently using the SecurityManager with Kafka, I'd really
> > > appreciate hearing how you're using it, and how you're planning around
> > > its removal.
> > >
> > > Thanks!
> > > Greg Harris
> > >


Re: [VOTE] KIP-1022 Formatting and Updating Features

2024-04-10 Thread José Armando García Sancio
Hi Justine,

+1 (binding)

Thanks for the improvement.
-- 
-José


Re: [DISCUSS] KIP-1022 Formatting and Updating Features

2024-04-10 Thread José Armando García Sancio
Hi Justine,

On Tue, Apr 9, 2024 at 4:19 PM Justine Olshan
 wrote:
> As for the validation criteria. It seems like one bit of code that
> validates whether a version is allowed is in the method
> `reasonNotSupported` which checks the range of features available for the
> given feature.
> For metadata.version we have a method to do "additional checks" and we
> could have those for the various other features as well. I have an
> (internal) FeatureVersion interface in mind that would work well for this.
> For any of these validations, we return the same error
> `INVALID_UPDATE_VERSION`. I would think continuing to use this error
> follows naturally, but if we think it is necessary to specify the error
> code, I can do so in my KIP.

Thanks for looking into this. The updates to the KIP look good to me.

-- 
-José


[jira] [Created] (KAFKA-16508) Infinite loop if output topic does not exist

2024-04-10 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16508:
---

 Summary: Infinite loop if output topic does not exist
 Key: KAFKA-16508
 URL: https://issues.apache.org/jira/browse/KAFKA-16508
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


Kafka Streams supports `ProductionExceptionHandler` to drop records on error 
when writing into an output topic.

However, if the output topic does not exist, the corresponding error cannot be 
skipped over because the handler is not called.

The issue is that the producer internally retries to fetch the output topic 
metadata until it times out, and a `TimeoutException` (which is a 
`RetriableException`) is returned via the registered `Callback`. However, for 
`RetriableException` there is a different code path and the 
`ProductionExceptionHandler` is not called.

In general, Kafka Streams correctly tries to handle as many errors as possible 
internally, and a `RetriableError` falls into this category (and thus there is 
no need to call the handler). However, for this particular case, just retrying 
does not solve the issue – it's unclear if throwing a retryable 
`TimeoutException` is actually the right thing to do for the Producer? Also not 
sure what the right way to address this ticket would be (currently, we cannot 
really detect this case, except by doing some nasty error-message String 
comparison, which sounds hacky...)
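
For reference, a handler of this kind looks roughly as follows (illustrative 
sketch only; as described above, it is never invoked for the retriable 
TimeoutException in the missing-topic case):

{code:java}
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Illustrative sketch: a handler that drops records on production errors.
// Registered via default.production.exception.handler in StreamsConfig.
public class DropOnErrorHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        return ProductionExceptionHandlerResponse.CONTINUE; // drop the record and keep going
    }

    @Override
    public void configure(final Map<String, ?> configs) {}
}
{code}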



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-10 Thread Frédérik Rouleau
Hi everyone,

To make implementation of DLQ in consumer easier, I would like to add the
raw ConsumerRecord into the RecordDeserializationException.

Details are in KIP-1036

.

Thanks for your feedback.

Regards,
Fred


[jira] [Created] (KAFKA-16507) Add raw record into RecordDeserializationException

2024-04-10 Thread Fred Rouleau (Jira)
Fred Rouleau created KAFKA-16507:


 Summary: Add raw record into RecordDeserializationException
 Key: KAFKA-16507
 URL: https://issues.apache.org/jira/browse/KAFKA-16507
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Fred Rouleau


[KIP-334|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=87297793]
 introduced into the Consumer the RecordDeserializationException with offset 
information. That is useful to skip a poison pill, but as you do not have access 
to the record, it still prevents easy implementation of a dead letter queue or 
simply logging the faulty data.

Changes are described in 
[KIP-1036|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception]
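
For illustration, the consumer-side pattern this enables could look like the 
sketch below (the consumerRecord() accessor is only proposed in KIP-1036 and 
does not exist yet; the DLQ topic name and forwarding are placeholders):

{code:java}
import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;

// Sketch only: skip a poison pill and (once KIP-1036 lands) forward the raw record to a DLQ.
public class DlqLoopSketch {
    static void pollLoop(KafkaConsumer<String, String> consumer) {
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(r -> System.out.println(r.value())); // real processing goes here
            } catch (RecordDeserializationException e) {
                TopicPartition tp = e.topicPartition();
                // Today: we can only skip past the bad offset.
                consumer.seek(tp, e.offset() + 1);
                // With the proposed accessor (not yet available):
                // dlqProducer.send(new ProducerRecord<>("my-dlq", e.consumerRecord().value()));
            }
        }
    }
}
{code}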



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-932: Queues for Kafka

2024-04-10 Thread Andrew Schofield
Hi Jun,
Thanks for your questions.

41.
41.1. The partition leader obtains the state epoch in the response from
ReadShareGroupState. When it becomes a share-partition leader,
it reads the share-group state and one of the things it learns is the
current state epoch. Then it uses the state epoch in all subsequent
calls to WriteShareGroupState. The fencing is to prevent writes for
a previous state epoch, which are very unlikely but which would mean
that a leader was using an out-of-date epoch and was likely no longer
the current leader at all, perhaps due to a long pause for some reason.

41.2. If the group coordinator were to set the SPSO, wouldn’t it need
to discover the initial offset? I’m trying to avoid yet another inter-broker
hop.

42.
42.1. I think I’ve confused things. When the share group offset is altered
using AdminClient.alterShareGroupOffsets, the group coordinator WILL
update the state epoch. I don’t think it needs to update the group epoch
at the same time (although it could) because the group epoch will have
been bumped when the group became empty. If the share group offset
is altered multiple times when the group remains empty, it would be
harmless if the same state epoch was reused to initialize the state.

When the share-partition leader updates the SPSO as a result of
the usual flow of record delivery, it does not update the state epoch.

42.2. The share-partition leader will notice the alteration because,
when it issues WriteShareGroupState, the response will contain the
error code FENCED_STATE_EPOCH. This is supposed to be the
last-resort way of catching this.

When the share-partition leader handles its first ShareFetch request,
it learns the state epoch from the response to ReadShareGroupState.

In normal running, the state epoch will remain constant, but, when there
are no consumers and the group is empty, it might change. As a result,
I think it would be sensible when the set of share sessions transitions
from 0 to 1, which is a reasonable proxy for the share group transitioning
from empty to non-empty, for the share-partition leader to issue
ReadShareGroupOffsetsState to validate the state epoch. If its state
epoch is out of date, it can then ReadShareGroupState to re-initialize.

I’ve changed the KIP accordingly.

47, 56. If I am to change BaseOffset to FirstOffset, we need to have
a clear view of which is the correct term. Having reviewed all of the
instances, my view is that BaseOffset should become FirstOffset in
ALL schemas defined in the KIP. Then, BaseOffset is just used in
record batches, which is already a known concept.

Please let me know if you agree.

60. I’ve added FindCoordinator to the top level index for protocol changes.

61. OK. I expect you are correct about how users will be using the
console share consumer. When I use the console consumer, I always get
a new consumer group. I have changed the default group ID for console
share consumer to “console-share-consumer” to match the console consumer
better and give more of an idea where this mysterious group has come from.

77. I will work on a proposal that does not use compaction and we can
make a judgement about whether it’s a better course for KIP-932. Personally,
until I’ve written it down and lived with the ideas for a few days, I won’t be
able to choose which I prefer.

I should be able to get the proposal written by the end of this week.

100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs matches
ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs from KIP-848.
I prefer to maintain the consistency.

101. Thanks for catching this. The ShareGroupHeartbeatResponse was originally
created from KIP-848. This part of the schema does not apply and I have removed
it. I have also renamed AssignedTopicPartitions to simply TopicPartitions which
aligns with the actual definition of ConsumerGroupHeartbeatResponse.

102. No, I don’t think we do. Removed.

103. I’ve changed the description for the error codes for ShareFetchResponse.

104. Interesting. I have added ErrorMessages to these RPCs as you suggest.
It’s a good improvement for problem determination.

105. The values are reserved in my brain. Actually, 1 is Acquired which
is not persisted, and I have another non-terminal state in mind for 3.

106. A few people have raised the question of whether OffsetAndMetadata
is sensible in this KIP, given that the optional Metadata part comes from when
a regular consumer commits offsets. However, it is correct that there will
never be metadata with a share group. I have changed
the KIP to replace OffsetAndMetadata with Long.

107. Yes, you are right. I have learnt during this process that a version bump
can be a logical not just a physical change to the schema. KIP updated.

108. I would prefer not to extend this RPC for all of the states at this point.
I think there is definitely scope for another KIP focused on administration
of share groups that might want this information so someone could build a
UI and other tools on top. 

Re: the migration of command tools

2024-04-10 Thread Federico Valeri
Hi, if a tool already has a wrapper bash script in bin and
bin/windows, then there is no need to create a redirection in Scala,
as users are supposed to use the script. If there is any ST left which
is using the tool directly, a bash script should be created, and the
ST changed to use the script with new releases only.

On Wed, Apr 10, 2024 at 3:08 PM Chia-Ping Tsai  wrote:
>
> hi David
>
> thanks for quickly response!!
>
> According to KIP-906, the BC rules are
>
> 1) The old package name must be deprecated in the target release (e.g. 3.5)
> and redirection removed in the next major release (e.g. 4.0).
> 2) Existing users will get a deprecation warning when using the old package
> name, while old SPIs and classes will be marked as deprecated.
>
> I will file a jira to make sure we don't violate that rules
>
> Best,
> Chia-Ping
>
> David Jacot  於 2024年4月10日 週三 下午8:57寫道:
>
> > Hey,
> >
> > I think that we discussed this in this KIP:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines
> > .
> > I don't remember all the details though.
> >
> > Best,
> > David
> >
> > On Wed, Apr 10, 2024 at 2:54 PM Chia-Ping Tsai  wrote:
> >
> > > Dear Kafka,
> > >
> > > Migrating command tools from core module to tools module is not news.
> > > However, I want to make sure I don't misunderstand the BC rules.
> > >
> > > The question is "Should we keep origin class?"
> > >
> > > FeatureCommand (
> > >
> > >
> > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala
> > > )
> > > is a good example. We keep the origin class file due to backward
> > > compatibility. However, we don't do that to other tools.
> > >
> > > It seems to me that we should align the BC rules for all tools. And here
> > is
> > > my two cents: the expected way of using command tool is by script file,
> > so
> > > we DON'T need to keep origin class file.
> > >
> > > WDYT?
> > >
> > > Best,
> > > Chia-Ping
> > >
> >


Re: [DISCUSS] KIP-932: Queues for Kafka

2024-04-10 Thread Omnia Ibrahim
Hi Andrew (and Tom)
> Imagine that we do have `group.type` as a group configuration. How would
> we end up with groups with
> the same ID but different types on the two ends of MM2? Assuming that both
> ends have KIP-932 enabled,
> either the configuration was not set, and a consumer group was made on one
> end while a share group was
> made on the other, OR, the configuration was set but its value changed,
> and again we get a divergence.
 The two use cases on top of my mind where a `group.id` can be duplicated 
between the two ends of MM2 with a different group type are:
group.id is set to a generic id (not unique enough) on a multi-tenant cluster 
by multiple teams who aren’t aware of each other’s existence. Especially since 
Kafka doesn’t really enforce `group.id` to be unique. And MM2 DR is not 
necessarily a DR for the whole source cluster all the time, but for a subset of 
use cases, e.g. the clusters’ use cases aren’t identical.  
The other use case is when an application with `group.id = G1` is in the middle of 
upgrading versions, where some instances upgraded to groupTypeA and the rest are 
registering as groupTypeB without changing the `group.id`. This might happen 
when clients are concerned about keeping the same offsets when they upgrade to a 
different group protocol. 
This question and concern maybe would have been a better fit when KIP-848 
was proposed; however, as KIP-932 is building on top of KIP-848, it is worth 
asking. 

> I think that on balance, having `group.type` as a configuration does at
> least mean there’s a better chance that
> the two ends of MM2 do agree on the type of group. I’m happy to consider
> other ways to do this better. 
Having a way to make group ids in Kafka unique and limited isn’t a bad idea in 
itself. I am not sure if there is a better solution for this than what you are 
proposing; I was just trying to see if this angle has been considered or not. 
In general this issue will impact applications that roll back from a version 
taking advantage of the newer group type to an older version that uses a 
different group type without changing their `group.id`. Maybe it just needs 
to be called out in the KIP that once a `group.id` registers as a 
specific `group.type`, it shouldn’t be rolled back to any incompatible type. 

> The fact that we have different kinds of group in the same namespace is the
> tricky thing. I think this was possible
> before this KIP, but it’s much more likely now.
True, especially since the `CONSUMER` and `CLASSIC` types don’t disallow a group 
from being created, as far as I know. 

> I don’t believe the MM2 is at all aware of dynamic group configurations and
> does not mirror these configurations across clusters. Personally, I’m slightly
> on the fence about whether it should. Group configurations are about
> ensuring that the consumers in a group have consistent configuration for
> timeouts and so on. So, MM2 could mirror these, but then why would it
> not also mirror broker configurations?
Configurations like timeouts are tricky, as MM2 doesn’t really know if synced 
timeouts fit the network between the client and both ends of MM2. 
However, if `group.type` is going to dictate the protocol or make a group 
unique for a specific type, then this type of config needs to be synced. 

> 1) Should MM2 mirror group configurations?
> Perhaps it should. MM2 also does not know how to mirror the
> persistent share-group state. I think there’s an MM2 KIP for share groups
> in the future.
So for DR for consumer applications, I would say MM2 should replicate anything 
that might prevent an application from consuming when it switches between the 
bootstrap.servers of either end of MM2.
This includes topic configuration like partition numbers and some other configs 
like retention, as well as topic ACLs. The only thing MM2 isn’t syncing for 
consumers at the moment is the authentication configs, as these are tricky to 
sync. 

> 2) How should MM2 cope with mismatches in the set of valid
> configurations when the clusters are at different versions?
> I have no good answer for this and I defer to people with better knowledge
> of MM2.

Do you mean if the source and destination aren’t on the same version? At the 
moment, if the two ends of MM2 are on different versions in a way that has an 
impact on the Admin client, then I believe MM2 will throw `UnsupportedVersionException` 
when the Admin client is triggered. 

And I believe no one will be notified until the producer/consumer clients try to 
move between clusters. The same goes for the clients themselves: for example, 
if a client on the source cluster relies on a new protocol being deployed on the 
brokers and the destination isn’t upgraded to a version >= the source, it will 
also fail when it moves to the destination. Which isn’t a great situation!
I am not aware of any easy solution for this besides coordinating the upgrades of 
clients/MM2/source and destination clusters. But I would defer to others 
who have more knowledge than me. 

> 3) Is 

[jira] [Created] (KAFKA-16506) add the scala version of tool-related class back to core module to follow KIP-906

2024-04-10 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16506:
--

 Summary: add the scala version of tool-related class back to core 
module to follow KIP-906
 Key: KAFKA-16506
 URL: https://issues.apache.org/jira/browse/KAFKA-16506
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai


According to 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines
 , we have to deprecate the scala version of tool-related classes instead of 
deleting them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-10 Thread Nick Telford
Hi Bruno,

Immediately after I sent my response, I looked at the codebase and came to
the same conclusion. If it's possible at all, it will need to be done by
creating temporary StateManagers and StateStores during rebalance. I think
it is possible, and probably not too expensive, but the devil will be in
the detail.

I'll try to find some time to explore the idea to see if it's possible and
report back, because we'll need to determine this before we can vote on the
KIP.

Regards,
Nick

On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna  wrote:

> Hi Nick,
>
> Thanks for reacting on my comments so quickly!
>
>
> 2.
> Some thoughts on your proposal.
> State managers (and state stores) are parts of tasks. If the task is not
> assigned locally, we do not create those tasks. To get the offsets with
> your approach, we would need to either create kind of inactive tasks
> besides active and standby tasks or store and manage state managers of
> non-assigned tasks differently than the state managers of assigned
> tasks. Additionally, the cleanup thread that removes unassigned task
> directories needs to concurrently delete those inactive tasks or
> task-less state managers of unassigned tasks. This seems all quite messy
> to me.
> Could we create those state managers (or state stores) for locally
> existing but unassigned tasks on demand when
> TaskManager#getTaskOffsetSums() is executed? Or have a different
> encapsulation for the unused task directories?
>
>
> Best,
> Bruno
>
>
>
> On 4/10/24 11:31 AM, Nick Telford wrote:
> > Hi Bruno,
> >
> > Thanks for the review!
> >
> > 1, 4, 5.
> > Done
> >
> > 3.
> > You're right. I've removed the offending paragraph. I had originally
> > adapted this from the guarantees outlined in KIP-892. But it's difficult
> to
> > provide these guarantees without the KIP-892 transaction buffers.
> Instead,
> > we'll add the guarantees back into the JavaDoc when KIP-892 lands.
> >
> > 2.
> > Good point! This is the only part of the KIP that was (significantly)
> > changed when I extracted it from KIP-892. My prototype currently
> maintains
> > this "cache" of changelog offsets in .checkpoint, but doing so becomes
> very
> > messy. My intent with this change was to try to better encapsulate this
> > offset "caching", especially for StateStores that can cheaply provide the
> > offsets stored directly in them without needing to duplicate them in this
> > cache.
> >
> > It's clear some more work is needed here to better encapsulate this. My
> > immediate thought is: what if we construct *but don't initialize* the
> > StateManager and StateStores for every Task directory on-disk? That
> should
> > still be quite cheap to do, and would enable us to query the offsets for
> > all on-disk stores, even if they're not open. If the StateManager (aka.
> > ProcessorStateManager/GlobalStateManager) proves too expensive to hold
> open
> > for closed stores, we could always have a "StubStateManager" in its
> place,
> > that enables the querying of offsets, but nothing else?
> >
> > IDK, what do you think?
> >
> > Regards,
> >
> > Nick
> >
> > On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna  wrote:
> >
> >> Hi Nick,
> >>
> >> Thanks for breaking out the KIP from KIP-892!
> >>
> >> Here a couple of comments/questions:
> >>
> >> 1.
> >> In Kafka Streams, we have a design guideline which says to not use the
> >> "get"-prefix for getters on the public API. Could you please change
> >> getCommittedOffsets() to committedOffsets()?
> >>
> >>
> >> 2.
> >> It is not clear to me how TaskManager#getTaskOffsetSums() should read
> >> offsets of tasks the stream thread does not own but that have a state
> >> directory on the Streams client by calling
> >> StateStore#getCommittedOffsets(). If the thread does not own a task it
> >> does also not create any state stores for the task, which means there is
> >> no state store on which to call getCommittedOffsets().
> >> I would have rather expected that a checkpoint file is written for all
> >> state stores on close -- not only for the RocksDBStore -- and that this
> >> checkpoint file is read in TaskManager#getTaskOffsetSums() for the tasks
> >> that have a state directory on the client but are not currently assigned
> >> to any stream thread of the Streams client.
> >>
> >>
> >> 3.
> >> In the javadocs for commit() you write
> >>
> >> "... all writes since the last commit(Map), or since init(StateStore)
> >> *MUST* be available to readers, even after a restart."
> >>
> >> This is only true for a clean close before the restart, isn't it?
> >> If the task fails with a dirty close, Kafka Streams cannot guarantee
> >> that the in-memory structures of the state store (e.g. memtable in the
> >> case of RocksDB) are flushed so that the records and the committed
> >> offsets are persisted.
> >>
> >>
> >> 4.
> >> The wrapper that provides the legacy checkpointing behavior is actually
> >> an implementation detail. I would remove it from the KIP, but still
> >> state that the legacy 

Re: the migration of command tools

2024-04-10 Thread Chia-Ping Tsai
hi David

thanks for the quick response!!

According to KIP-906, the BC rules are

1) The old package name must be deprecated in the target release (e.g. 3.5)
and redirection removed in the next major release (e.g. 4.0).
2) Existing users will get a deprecation warning when using the old package
name, while old SPIs and classes will be marked as deprecated.

I will file a jira to make sure we don't violate those rules

Best,
Chia-Ping

David Jacot  於 2024年4月10日 週三 下午8:57寫道:

> Hey,
>
> I think that we discussed this in this KIP:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines
> .
> I don't remember all the details though.
>
> Best,
> David
>
> On Wed, Apr 10, 2024 at 2:54 PM Chia-Ping Tsai  wrote:
>
> > Dear Kafka,
> >
> > Migrating command tools from core module to tools module is not news.
> > However, I want to make sure I don't misunderstand the BC rules.
> >
> > The question is "Should we keep origin class?"
> >
> > FeatureCommand (
> >
> >
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala
> > )
> > is a good example. We keep the origin class file due to backward
> > compatibility. However, we don't do that to other tools.
> >
> > It seems to me that we should align the BC rules for all tools. And here
> is
> > my two cents: the expected way of using command tool is by script file,
> so
> > we DON'T need to keep origin class file.
> >
> > WDYT?
> >
> > Best,
> > Chia-Ping
> >
>


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2802

2024-04-10 Thread Apache Jenkins Server
See 




Re: the migration of command tools

2024-04-10 Thread David Jacot
Hey,

I think that we discussed this in this KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines.
I don't remember all the details though.

Best,
David

On Wed, Apr 10, 2024 at 2:54 PM Chia-Ping Tsai  wrote:

> Dear Kafka,
>
> Migrating command tools from core module to tools module is not news.
> However, I want to make sure I don't misunderstand the BC rules.
>
> The question is "Should we keep origin class?"
>
> FeatureCommand (
>
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala
> )
> is a good example. We keep the origin class file due to backward
> compatibility. However, we don't do that to other tools.
>
> It seems to me that we should align the BC rules for all tools. And here is
> my two cents: the expected way of using command tool is by script file, so
> we DON'T need to keep origin class file.
>
> WDYT?
>
> Best,
> Chia-Ping
>


the migration of command tools

2024-04-10 Thread Chia-Ping Tsai
Dear Kafka,

Migrating command tools from core module to tools module is not news.
However, I want to make sure I don't misunderstand the BC rules.

The question is "Should we keep origin class?"

FeatureCommand (
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala)
is a good example. We keep the origin class file due to backward
compatibility. However, we don't do that to other tools.

It seems to me that we should align the BC rules for all tools. And here is
my two cents: the expected way of using command tool is by script file, so
we DON'T need to keep origin class file.

WDYT?

Best,
Chia-Ping


Re: [DISCUSS] KIP-899: Allow clients to rebootstrap

2024-04-10 Thread Ivan Yurchenko
Hi Andrew and all,

I did the mentioned change.

As there are no more comments, I'm letting this sit for e.g. a week and then I 
will call for the vote.

Best,
Ivan


On Tue, Apr 9, 2024, at 23:51, Andrew Schofield wrote:
> Hi Ivan,
> I think you have to go one way or the other with the cluster ID, so I think 
> removing that from this KIP might
> be the best. I think there’s another KIP waiting to be written for ensuring 
> consistency of clusters, but
> I think that wouldn’t conflict at all with this one.
> 
> Thanks,
> Andrew
> 
> > On 9 Apr 2024, at 19:11, Ivan Yurchenko  wrote:
> >
> > Hi Andrew and all,
> >
> > I looked deeper into the code [1] and it seems the Metadata class is OK 
> > with cluster ID changing. So I'm thinking that the rebootstrapping 
> > shouldn't introduce a new failure mode here. And I should remove the 
> > mention of this cluster ID checks from the KIP.
> >
> > Best,
> > Ivan
> >
> > [1] 
> > https://github.com/apache/kafka/blob/ff90f78c700c582f9800013faad827c36b45ceb7/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L355
> >
> > On Tue, Apr 9, 2024, at 09:28, Andrew Schofield wrote:
> >> Hi Ivan,
> >> Thanks for the KIP. I can see situations in which this would be helpful. I 
> >> have one question.
> >>
> >> The KIP says the client checks the cluster ID when it re-bootstraps and 
> >> that it will fail if the
> >> cluster ID doesn’t match the previously known one. How does it fail? Which 
> >> exception does
> >> it throw and when?
> >>
> >> In a similar vein, now that we are checking cluster IDs, I wonder if it 
> >> could be extended to
> >> cover all situations in which there are cluster ID mismatches, such as the 
> >> bootstrap server
> >> list erroneously pointing at brokers from different clusters and the 
> >> problem only being
> >> detectable later on.
> >>
> >> Thanks,
> >> Andrew
> >>
> >>> On 8 Apr 2024, at 18:24, Ivan Yurchenko  wrote:
> >>>
> >>> Hello!
> >>>
> >>> I changed the KIP a bit, specifying that the certain benefit goes to 
> >>> consumers not participating in a group, but that other clients can 
> >>> benefit as well in certain situations.
> >>>
> >>> You can see the changes in the history [1]
> >>>
> >>> Thank you!
> >>>
> >>> Ivan
> >>>
> >>> [1] 
> >>> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=240881396=10=11
> >>>
> >>> On 2023/07/15 16:37:52 Ivan Yurchenko wrote:
>  Hello!
> 
>  I've made several changes to the KIP based on the comments:
> 
>  1. Reduced the scope to producer and consumer clients only.
>  2. Added more details to the description of the rebootstrap process.
>  3. Documented the role of low values of reconnect.backoff.max.ms in
>  preventing rebootstrapping.
>  4. Some wording changes.
> 
>  You can see the changes in the history [1]
> 
>  I'm planning to put the KIP to a vote in some days if there are no new
>  comments.
> 
>  Thank you!
> 
>  Ivan
> 
>  [1]
>  https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=240881396=9=5
> 
>  On Tue, 30 May 2023 at 08:23, Ivan Yurchenko 
>  wrote:
> 
> > Hi Chris and all,
> >
> >> I believe the logic you've linked is only applicable for the producer 
> >> and
> >> consumer clients; the admin client does something different (see [1]).
> >
> > I see, thank you for the pointer. It seems the admin client is fairly
> > different from the producer and consumer. Probably it makes sense to 
> > reduce
> > the scope of the KIP to the producer and consumer clients only.
> >
> >> it'd be nice to have a definition of when re-bootstrapping
> >> would occur that doesn't rely on internal implementation details. What
> >> user-visible phenomena can we identify that would lead to a
> >> re-bootstrapping?
> >
> > Let's put it this way: "Re-bootstrapping means that the client forgets
> > about nodes it knows about and falls back on the bootstrap nodes as if 
> > it
> > had just been initialized. Re-bootstrapping happens when, during a 
> > metadata
> > update (which may be scheduled by `metadata.max.age.ms` or caused by
> > certain error responses like NOT_LEADER_OR_FOLLOWER, 
> > REPLICA_NOT_AVAILABLE,
> > etc.), the client doesn't have a node with an established connection or
> > establishable connection."
> > Does this sound good?
> >
> >> I also believe that if someone has "
> >> reconnect.backoff.max.ms" set to a low-enough value,
> >> NetworkClient::leastLoadedNode may never return null. In that case,
> >> shouldn't we still attempt a re-bootstrap at some point (if the user 
> >> has
> >> enabled this feature)?
> >
> > Yes, you're right. Particularly `canConnect` here [1] can always be
> > returning `true` if `reconnect.backoff.max.ms` is low enough.
> > It seems pretty difficult to find 

[jira] [Resolved] (KAFKA-15568) Use incrementalAlterConfigs to update the dynamic config of broker in ConfigCommand tool

2024-04-10 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-15568.

Resolution: Duplicate

duplicate to KAFKA-16181

> Use incrementalAlterConfigs to update the dynamic config of broker in 
> ConfigCommand tool
> 
>
> Key: KAFKA-15568
> URL: https://issues.apache.org/jira/browse/KAFKA-15568
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Aman Singh
>Assignee: Aman Singh
>Priority: Major
>
> As part of [this 
> KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API]
> incrementalAlterConfigs  API was introduced to change any config dynamically.
> - `kafka-configs.sh` (ConfigCommand) still uses `alterConfigs` to update the 
> config.
> - The tool first describes the configs and then replaces all the configs.
> -  We need to remember all the sensitive configs since sensitive fields are 
> not returned by DescribeConfigs.
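
For illustration, the incremental API referred to above can set a single dynamic 
broker config without describing and re-submitting everything (the broker id and 
config name below are placeholders):

{code:java}
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

// Sketch only: update one dynamic broker config via incrementalAlterConfigs,
// leaving all other (including sensitive) configs untouched.
public class IncrementalAlterSketch {
    static void setBrokerConfig(Admin admin) throws Exception {
        ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
        AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("log.cleaner.threads", "2"), AlterConfigOp.OpType.SET);
        Map<ConfigResource, Collection<AlterConfigOp>> updates =
                Collections.singletonMap(broker, Collections.singletonList(op));
        admin.incrementalAlterConfigs(updates).all().get();
    }
}
{code}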



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-10 Thread Bruno Cadonna

Hi Nick,

Thanks for reacting on my comments so quickly!


2.
Some thoughts on your proposal.
State managers (and state stores) are parts of tasks. If the task is not 
assigned locally, we do not create those tasks. To get the offsets with 
your approach, we would need to either create kind of inactive tasks 
besides active and standby tasks or store and manage state managers of 
non-assigned tasks differently than the state managers of assigned 
tasks. Additionally, the cleanup thread that removes unassigned task 
directories needs to concurrently delete those inactive tasks or 
task-less state managers of unassigned tasks. This seems all quite messy 
to me.
Could we create those state managers (or state stores) for locally 
existing but unassigned tasks on demand when 
TaskManager#getTaskOffsetSums() is executed? Or have a different 
encapsulation for the unused task directories?



Best,
Bruno



On 4/10/24 11:31 AM, Nick Telford wrote:

Hi Bruno,

Thanks for the review!

1, 4, 5.
Done

3.
You're right. I've removed the offending paragraph. I had originally
adapted this from the guarantees outlined in KIP-892. But it's difficult to
provide these guarantees without the KIP-892 transaction buffers. Instead,
we'll add the guarantees back into the JavaDoc when KIP-892 lands.

2.
Good point! This is the only part of the KIP that was (significantly)
changed when I extracted it from KIP-892. My prototype currently maintains
this "cache" of changelog offsets in .checkpoint, but doing so becomes very
messy. My intent with this change was to try to better encapsulate this
offset "caching", especially for StateStores that can cheaply provide the
offsets stored directly in them without needing to duplicate them in this
cache.

It's clear some more work is needed here to better encapsulate this. My
immediate thought is: what if we construct *but don't initialize* the
StateManager and StateStores for every Task directory on-disk? That should
still be quite cheap to do, and would enable us to query the offsets for
all on-disk stores, even if they're not open. If the StateManager (aka.
ProcessorStateManager/GlobalStateManager) proves too expensive to hold open
for closed stores, we could always have a "StubStateManager" in its place,
that enables the querying of offsets, but nothing else?

IDK, what do you think?

Regards,

Nick

On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna  wrote:


Hi Nick,

Thanks for breaking out the KIP from KIP-892!

Here a couple of comments/questions:

1.
In Kafka Streams, we have a design guideline which says to not use the
"get"-prefix for getters on the public API. Could you please change
getCommittedOffsets() to committedOffsets()?


2.
It is not clear to me how TaskManager#getTaskOffsetSums() should read
offsets of tasks the stream thread does not own but that have a state
directory on the Streams client by calling
StateStore#getCommittedOffsets(). If the thread does not own a task it
does also not create any state stores for the task, which means there is
no state store on which to call getCommittedOffsets().
I would have rather expected that a checkpoint file is written for all
state stores on close -- not only for the RocksDBStore -- and that this
checkpoint file is read in TaskManager#getTaskOffsetSums() for the tasks
that have a state directory on the client but are not currently assigned
to any stream thread of the Streams client.


3.
In the javadocs for commit() you write

"... all writes since the last commit(Map), or since init(StateStore)
*MUST* be available to readers, even after a restart."

This is only true for a clean close before the restart, isn't it?
If the task fails with a dirty close, Kafka Streams cannot guarantee
that the in-memory structures of the state store (e.g. memtable in the
case of RocksDB) are flushed so that the records and the committed
offsets are persisted.


4.
The wrapper that provides the legacy checkpointing behavior is actually
an implementation detail. I would remove it from the KIP, but still
state that the legacy checkpointing behavior will be supported when the
state store does not manage the checkpoints.


5.
Regarding the metrics, could you please add the tags, and the recording
level (DEBUG or INFO) as done in KIP-607 or KIP-444.


Best,
Bruno

On 4/7/24 5:35 PM, Nick Telford wrote:

Hi everyone,

Based on some offline discussion, I've split out the "Atomic

Checkpointing"

section from KIP-892: Transactional Semantics for StateStores, into its

own

KIP

KIP-1035: StateStore managed changelog offsets


https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets


While KIP-892 was adopted *with* the changes outlined in KIP-1035, these
changes were always the most contentious part, and continued to spur
discussion even after KIP-892 was adopted.

All the changes introduced in KIP-1035 have been removed from KIP-892,

and

a hard dependency on KIP-1035 has been added to KIP-892 in their place.

I'm 

Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-04-10 Thread Bruno Cadonna

Hi Walker,

Thanks for the updates!


(1) While I like naming the methods differently, I have also to say that 
I do not like addIsomorphicGlobalStore() because it does not really tell 
what the method does. I could also not come up with a better name than 
addGlobalStoreWithReprocessingOnRestore(). However, I had two ideas on 
which I would like to have your opinion.


(a) Add a new GlobalStoreBuilder in which users can set if the global 
state store should reprocess on restore. Additionally, to the option to 
enable or disable reprocessing on restore, you could also NOT offer a 
way to enable or disable logging in the GlobalStoreBuilder. Currently, 
if users enable logging for a store builder that they pass into 
addGlobalStore(), Kafka Streams needs to explicitly disable it again, 
which is not ideal.


(b) Add a new GlobalProcessorSupplier in which users can set if the 
global state store should reprocess on restore. Another ugliness that 
could be fixed with this is passing Void, Void to ProcessorSupplier. The 
GlobalProcessorSupplier would just have two type parameters . 
The nice aspect of this idea is that the option to enable/disable 
reprocessing on restore is only needed when a processor supplier is 
passed into the methods. That is not true for idea (a).



(2) Yes, that was my intent.


Best,
Bruno

On 4/9/24 9:33 PM, Walker Carlson wrote:

Hey all,

(1) no I hadn't considered just naming the methods differently. I actually
really like this idea and am for it. Except we need 3 different methods
now. One for no processor, one for a processor that should restore and one
that reprocesses. How about `addCustomGlobalStore` and
`addIsomorphicGlobalStore` and then just `addGlobalStateStore` for the no
processor case? If everyone likes that I can add that to the KIP and rename
the methods.

(2) we can have the built-in case use StoreBuilder and manually check for the TimestampedKeyValueStore. That is
fine with me.

Bruno I hope that was what you were intending.

(3) For the scala api, do we need to make it match the java api or are we
just making the minimum changes? as if we take point 1 I don't know how
much we need to change.

Thanks,
Walker


On Tue, Apr 2, 2024 at 8:38 AM Matthias J. Sax  wrote:


One more thing:

I was just looking into the WIP PR, and it seems we will also need to
change `StreamsBuilder.scala`. The KIP needs to cover this changes as well.


-Matthias

On 4/1/24 10:33 PM, Bruno Cadonna wrote:

Hi Walker and Matthias,

(2)
That is exactly my point about having a compile time error versus a
runtime error. The added flexibility as proposed by Matthias sounds good
to me.

Regarding the Named parameter, I was not aware that the processor that
writes records to the global state store is named according to the name
passed in by Consumed. I thought Consumed strictly specifies the names
of source processors. So I am fine with not having an overload with a
Named parameter.

Best,
Bruno

On 3/31/24 11:30 AM, Matthias J. Sax wrote:

Two more follow up thoughts:

(1) I am still not a big fan of the boolean parameter we introduce.
Did you consider to use different method names, like
`addReadOnlyGlobalStore()` (for the optimized method, that would not
reprocess data on restore), and maybe add `addModifiableGlobalStore()`
(not a good name, but we cannot re-use existing `addGlobalStore()` --
maybe somebody else has a good idea about a better `addXxxGlobalStore`
that would describe it well).

(2) I was thinking about Bruno's comment to limit the scope the store
builder for the optimized case. I think we should actually do
something about it, because in the end, the runtime (ie, the
`Processor` we hard wire) would need to pick a store it supports and
cast to the corresponding store? If the cast fails, we hit a runtime
exception, but by putting the store we cast to into the signature we
can actually convert it into a compile time error what seems better.
-- If we want, we could make it somewhat flexible and support both
`KeyValueStore` and `TimestampedKeyValueStore` -- ie, the signature
would be `KeyValueStore` but we explicitly check if the builder gives
us a `TimestampedKeyValueStore` instance and use it properly.

If putting the signature does not work for some reason, we should at
least clearly call it out in the JavaDocs what store type is expected.
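A minimal sketch of the check described above, as it could appear inside the hard-wired global-store processor (K, V, record, and storeBuilder are assumptions taken from the surrounding processor, not proposed API):

// Sketch only: branch on the store type the builder actually produced.
final StateStore store = storeBuilder.build();   // user-supplied StoreBuilder<?>

if (store instanceof TimestampedKeyValueStore) {
    @SuppressWarnings("unchecked")
    final TimestampedKeyValueStore<K, V> timestamped = (TimestampedKeyValueStore<K, V>) store;
    // Keep the record timestamp alongside the value.
    timestamped.put(record.key(), ValueAndTimestamp.make(record.value(), record.timestamp()));
} else if (store instanceof KeyValueStore) {
    @SuppressWarnings("unchecked")
    final KeyValueStore<K, V> plain = (KeyValueStore<K, V>) store;
    plain.put(record.key(), record.value());
} else {
    // Anything else cannot be handled by the built-in processor.
    throw new IllegalArgumentException("Global store must be a (Timestamped)KeyValueStore");
}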



-Matthias



On 3/28/24 5:05 PM, Walker Carlson wrote:

Hey all,

Thanks for the feedback Bruno, Almog and Matthias!

Almog: I like the idea, but I agree with Matthais. I actually looked at
that ticket a bit when doing this and found that while similar they are
actually pretty unrelated codewise. I would love to see it get taken
care
of.

Bruno and Matthias: The Named parameter doesn't really make sense to me to
put it here. The store in the Store builder is already named through what
Matthias described, and the processor doesn't actually have a name. That
would be the processor node that gets named via the Named parameter (in
the DSL) 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-10 Thread Nick Telford
Hi Bruno,

Thanks for the review!

1, 4, 5.
Done

3.
You're right. I've removed the offending paragraph. I had originally
adapted this from the guarantees outlined in KIP-892. But it's difficult to
provide these guarantees without the KIP-892 transaction buffers. Instead,
we'll add the guarantees back into the JavaDoc when KIP-892 lands.

2.
Good point! This is the only part of the KIP that was (significantly)
changed when I extracted it from KIP-892. My prototype currently maintains
this "cache" of changelog offsets in .checkpoint, but doing so becomes very
messy. My intent with this change was to try to better encapsulate this
offset "caching", especially for StateStores that can cheaply provide the
offsets stored directly in them without needing to duplicate them in this
cache.

It's clear some more work is needed here to better encapsulate this. My
immediate thought is: what if we construct *but don't initialize* the
StateManager and StateStores for every Task directory on-disk? That should
still be quite cheap to do, and would enable us to query the offsets for
all on-disk stores, even if they're not open. If the StateManager (aka.
ProcessorStateManager/GlobalStateManager) proves too expensive to hold open
for closed stores, we could always have a "StubStateManager" in its place,
that enables the querying of offsets, but nothing else?

IDK, what do you think?
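For illustration, a hedged sketch of how that could be used from TaskManager#getTaskOffsetSums(), assuming the store method returns a Map<TopicPartition, Long> and is named committedOffsets() per Bruno's suggestion below; taskDirectoriesOnDisk() and storesFor() are purely hypothetical helpers, not proposed APIs:

// Hypothetical sketch only -- helper names are invented for illustration.
final Map<TopicPartition, Long> changelogOffsets = new HashMap<>();
for (final TaskId taskId : taskDirectoriesOnDisk()) {          // all task dirs, assigned or not
    for (final StateStore store : storesFor(taskId)) {         // constructed, but init() never called
        // Cheap to query: no RocksDB instance is opened and no restoration is triggered.
        changelogOffsets.putAll(store.committedOffsets());
    }
}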

Regards,

Nick

On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna  wrote:

> Hi Nick,
>
> Thanks for breaking out the KIP from KIP-892!
>
> Here a couple of comments/questions:
>
> 1.
> In Kafka Streams, we have a design guideline which says to not use the
> "get"-prefix for getters on the public API. Could you please change
> getCommittedOffsets() to committedOffsets()?
>
>
> 2.
> It is not clear to me how TaskManager#getTaskOffsetSums() should read
> offsets of tasks the stream thread does not own but that have a state
> directory on the Streams client by calling
> StateStore#getCommittedOffsets(). If the thread does not own a task it
> does also not create any state stores for the task, which means there is
> no state store on which to call getCommittedOffsets().
> I would have rather expected that a checkpoint file is written for all
> state stores on close -- not only for the RocksDBStore -- and that this
> checkpoint file is read in TaskManager#getTaskOffsetSums() for the tasks
> that have a state directory on the client but are not currently assigned
> to any stream thread of the Streams client.
>
>
> 3.
> In the javadocs for commit() you write
>
> "... all writes since the last commit(Map), or since init(StateStore)
> *MUST* be available to readers, even after a restart."
>
> This is only true for a clean close before the restart, isn't it?
> If the task fails with a dirty close, Kafka Streams cannot guarantee
> that the in-memory structures of the state store (e.g. memtable in the
> case of RocksDB) are flushed so that the records and the committed
> offsets are persisted.
>
>
> 4.
> The wrapper that provides the legacy checkpointing behavior is actually
> an implementation detail. I would remove it from the KIP, but still
> state that the legacy checkpointing behavior will be supported when the
> state store does not manage the checkpoints.
>
>
> 5.
> Regarding the metrics, could you please add the tags, and the recording
> level (DEBUG or INFO) as done in KIP-607 or KIP-444.
>
>
> Best,
> Bruno
>
> On 4/7/24 5:35 PM, Nick Telford wrote:
> > Hi everyone,
> >
> > Based on some offline discussion, I've split out the "Atomic
> Checkpointing"
> > section from KIP-892: Transactional Semantics for StateStores, into its
> own
> > KIP
> >
> > KIP-1035: StateStore managed changelog offsets
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets
> >
> > While KIP-892 was adopted *with* the changes outlined in KIP-1035, these
> > changes were always the most contentious part, and continued to spur
> > discussion even after KIP-892 was adopted.
> >
> > All the changes introduced in KIP-1035 have been removed from KIP-892,
> and
> > a hard dependency on KIP-1035 has been added to KIP-892 in their place.
> >
> > I'm hopeful that with some more focus on this set of changes, we can
> > deliver something that we're all happy with.
> >
> > Regards,
> > Nick
> >
>


[DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-10 Thread Sebastien Viale
Hi,

You are right, it will simplify types.

We have updated the KIP.

regards


Sébastien VIALE

MICHELIN GROUP - InfORMATION Technology
Technical Expert Kafka

Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand


From: Bruno Cadonna 
Sent: Wednesday, 10 April 2024 10:38
To: dev@kafka.apache.org 
Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for 
exceptions occuring during processing


Hi Loïc, Damien, and Sébastien,

Great that we are converging!


3.
Damien and Loïc, I think in your examples the handler will receive a
Record<Object, Object> because a Record<Object, Object> is passed to
the processor in the following code line:
https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152

I see that we do not need to pass a Record<byte[], byte[]> into the handler just because we do that for the DeserializationExceptionHandler
and the ProductionExceptionHandler. When those two handlers are called,
the record is already serialized. This is not the case for the
ProcessingExceptionHandler. However, I would propose to use Record<Object, Object>
for the record that is passed to the ProcessingExceptionHandler because
it makes the handler API more flexible.
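For reference, a sketch of the handler shape this implies (names follow this discussion and the code snippet quoted further below; the final KIP-1033 interface may differ):

// Sketch based on this thread -- not the final KIP-1033 interface.
public interface ProcessingExceptionHandler extends Configurable {

    // Called with the processor's input record, typed Record<Object, Object>,
    // since it may be any POJO and is not serialized at this point.
    ProcessingHandlerResponse handle(final ProcessingContext context,
                                     final Record<Object, Object> record,
                                     final Exception exception);

    enum ProcessingHandlerResponse {
        CONTINUE,   // drop the record and continue processing
        FAIL        // rethrow as a StreamsException and stop processing
    }
}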


Best,
Bruno





On 4/9/24 9:09 PM, Loic Greffier wrote:
> Hi Bruno and Bill,
>
> To complete Damien's points regarding point 3.
>
> Processing errors are caught and handled by the ProcessingErrorHandler, at 
> the precise moment when records are processed by processor nodes. The 
> handling will be performed in the "process" method of the ProcessorNode, such 
> as:
>
> public void process(final Record<KIn, VIn> record) {
> ...
>
> try {
> ...
> } catch (final ClassCastException e) {
> ...
> } catch (Exception e) {
> ProcessingExceptionHandler.ProcessingHandlerResponse response = 
> this.processingExceptionHandler
> .handle(internalProcessorContext, (Record<Object, Object>) record, e);
>
> if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
> throw new StreamsException("Processing exception handler is set to fail upon" 
> +
> " a processing error. If you would rather have the streaming pipeline" +
> " continue after a processing error, please set the " +
> DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
> e);
> }
> }
> }
> As you can see, the record is transmitted to the ProcessingExceptionHandler 
> as a Record<Object, Object>, as we are dealing with the input record of the 
> processor at this point. It can be any type, including non-serializable 
> types, as suggested by Damien's example. As the ProcessingErrorHandler is 
> not intended to perform any serialization, there should be no issue for the 
> users to handle a Record<Object, Object>.
>
> I follow Damien on the other points.
>
> For point 6, underlying public interfaces are renamed as well:
> - The ProcessingHandlerResponse
> - The 
> ProcessingLogAndContinueExceptionHandler/ProcessingLogAndFailExceptionHandler
> - The configuration DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG 
> (default.processing.exception.handler)
>
> Regards,
>
> Loïc
>
> From: Damien Gasparina 
> Sent: Tuesday, 9 April 2024 20:08
> To: dev@kafka.apache.org
> Subject: Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler 
> for exceptions occuring during processing
>
>
> Hi Bruno, Bill,
>
> First of all, thanks a lot for all your useful comments.
>
>> 1. and 2.
>> I am wondering whether we should expose the processor node ID -- which
>> basically is the processor node name -- in the ProcessingContext
>> interface. I think the processor node ID fits well in the
>> ProcessingContext interface since it already contains application ID and
>> task ID and it would make the API for the handler cleaner.
>
> That's a good point, the actual ProcessorContextImpl is already holding the
> current node in an attribute (currentNode), thus exposing the node ID should
> not be a problem. Let me sleep on it and get back to you regarding this
> point.
>
>> 3.
>> Could you elaborate -- maybe with an example -- when a record is in a
>> state in which it cannot be serialized? This is not completely clear to
> me.
>
> The Record passed to the handler is the input record to the processor. In
> the Kafka Streams API, it could be any POJO.
> e.g. with the following topology:
> streamsBuilder.stream("x")
>     .map((k, v) -> new KeyValue<>("foo", Pair.of("hello",
> "world")))
> 

[jira] [Created] (KAFKA-16505) KIP-1034: Dead letter queue in Kafka Streams

2024-04-10 Thread Damien Gasparina (Jira)
Damien Gasparina created KAFKA-16505:


 Summary: KIP-1034: Dead letter queue in Kafka Streams
 Key: KAFKA-16505
 URL: https://issues.apache.org/jira/browse/KAFKA-16505
 Project: Kafka
  Issue Type: Improvement
Reporter: Damien Gasparina


See KIP: KIP-1034: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-15793) Flaky test ZkMigrationIntegrationTest.testMigrateTopicDeletions

2024-04-10 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez reopened KAFKA-15793:
-

This has come up again:

 
{code:java}
[2024-04-09T21:06:17.307Z] Gradle Test Run :core:test > Gradle Test Executor 97 
> DeleteTopicsRequestTest > 
testTopicDeletionClusterHasOfflinePartitions(String) > 
"testTopicDeletionClusterHasOfflinePartitions(String).quorum=zk" STARTED
[2024-04-09T21:06:17.307Z] 
kafka.zk.ZkMigrationIntegrationTest.testMigrateTopicDeletions(ClusterInstance)[7]
 failed, log available in 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15522/core/build/reports/testOutput/kafka.zk.ZkMigrationIntegrationTest.testMigrateTopicDeletions(ClusterInstance)[7].test.stdout
[2024-04-09T21:06:17.307Z] 
[2024-04-09T21:06:17.307Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZkMigrationIntegrationTest > testMigrateTopicDeletions(ClusterInstance) > 
testMigrateTopicDeletions [7] Type=ZK, MetadataVersion=3.7-IV4, 
Security=PLAINTEXT FAILED
[2024-04-09T21:06:17.307Z]     
org.apache.kafka.server.fault.FaultHandlerException: nonFatalFaultHandler: 
Unhandled error in MetadataChangeEvent: Check op on KRaft Migration ZNode 
failed. Expected zkVersion = 5. This indicates that another KRaft controller is 
making writes to ZooKeeper.
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.KafkaZkClient.handleUnwrappedMigrationResult$1(KafkaZkClient.scala:2001)
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.KafkaZkClient.unwrapMigrationResponse$1(KafkaZkClient.scala:2027)
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.KafkaZkClient.$anonfun$retryMigrationRequestsUntilConnected$2(KafkaZkClient.scala:2052)
[2024-04-09T21:06:17.307Z]         at 
app//scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
[2024-04-09T21:06:17.307Z]         at 
app//scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
[2024-04-09T21:06:17.307Z]         at 
app//scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:43)
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.KafkaZkClient.retryMigrationRequestsUntilConnected(KafkaZkClient.scala:2052)
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.migration.ZkTopicMigrationClient.$anonfun$updateTopicPartitions$1(ZkTopicMigrationClient.scala:265)
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.migration.ZkTopicMigrationClient.updateTopicPartitions(ZkTopicMigrationClient.scala:255)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsDelta$20(KRaftMigrationZkWriter.java:334)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:248)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$300(KRaftMigrationDriver.java:62)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver$MetadataChangeEvent.lambda$run$1(KRaftMigrationDriver.java:532)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver.lambda$countingOperationConsumer$6(KRaftMigrationDriver.java:845)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsDelta$21(KRaftMigrationZkWriter.java:331)
[2024-04-09T21:06:17.307Z]         at 
java.base@17.0.7/java.util.HashMap.forEach(HashMap.java:1421)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleTopicsDelta(KRaftMigrationZkWriter.java:297)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleDelta(KRaftMigrationZkWriter.java:112)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver$MetadataChangeEvent.run(KRaftMigrationDriver.java:531)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:128)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182)
[2024-04-09T21:06:17.307Z]         at 
java.base@17.0.7/java.lang.Thread.run(Thread.java:833)
[2024-04-09T21:06:17.307Z] 
[2024-04-09T21:06:17.307Z]         Caused by:
[2024-04-09T21:06:17.307Z]         java.lang.RuntimeException: Check op on 
KRaft Migration ZNode failed. Expected zkVersion = 5. This indicates that 
another KRaft controller is making writes to ZooKeeper.
[2024-04-09T21:06:17.307Z]             at 
kafka.zk.KafkaZkClient.handleUnwrappedMigrationResult$1(KafkaZkClient.scala:2001)
[2024-04-09T21:06:17.307Z]             ... 22 

[jira] [Created] (KAFKA-16504) Flaky test org.apache.kafka.controller.QuorumControllerTest.testConfigurationOperations

2024-04-10 Thread Igor Soarez (Jira)
Igor Soarez created KAFKA-16504:
---

 Summary: Flaky test 
org.apache.kafka.controller.QuorumControllerTest.testConfigurationOperations
 Key: KAFKA-16504
 URL: https://issues.apache.org/jira/browse/KAFKA-16504
 Project: Kafka
  Issue Type: Test
  Components: controller
Reporter: Igor Soarez


{code:java}
[2024-04-09T20:26:55.840Z] Gradle Test Run :metadata:test > Gradle Test 
Executor 54 > QuorumControllerTest > testConfigurationOperations() STARTED
[2024-04-09T20:26:55.840Z] 
org.apache.kafka.controller.QuorumControllerTest.testConfigurationOperations() 
failed, log available in 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15522/metadata/build/reports/testOutput/org.apache.kafka.controller.QuorumControllerTest.testConfigurationOperations().test.stdout
[2024-04-09T20:26:55.840Z] 
[2024-04-09T20:26:55.840Z] Gradle Test Run :metadata:test > Gradle Test 
Executor 54 > QuorumControllerTest > testConfigurationOperations() FAILED
[2024-04-09T20:26:55.840Z]     java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.NotControllerException: No controller appears to 
be active.
[2024-04-09T20:26:55.840Z]         at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
[2024-04-09T20:26:55.840Z]         at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
[2024-04-09T20:26:55.840Z]         at 
org.apache.kafka.controller.QuorumControllerTest.testConfigurationOperations(QuorumControllerTest.java:202)
[2024-04-09T20:26:55.840Z] 
[2024-04-09T20:26:55.840Z]         Caused by:
[2024-04-09T20:26:55.840Z]         
org.apache.kafka.common.errors.NotControllerException: No controller appears to 
be active. {code}
 

[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15522/5/tests/]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-10 Thread Bruno Cadonna

Hi Loïc, Damien, and Sébastien,

Great that we are converging!


3.
Damien and Loïc, I think in your examples the handler will receive a 
Record<Object, Object> because a Record<Object, Object> is passed to 
the processor in the following code line:

https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152

I see that we do not need to pass a Record<byte[], byte[]> into the handler 
just because we do that for the DeserializationExceptionHandler 
and the ProductionExceptionHandler. When those two handlers are called, 
the record is already serialized. This is not the case for the 
ProcessingExceptionHandler. However, I would propose to use Record<Object, Object> 
for the record that is passed to the ProcessingExceptionHandler because 
it makes the handler API more flexible.



Best,
Bruno


On 4/9/24 9:09 PM, Loic Greffier wrote:

Hi Bruno and Bill,

To complete Damien's points regarding point 3.

Processing errors are caught and handled by the ProcessingErrorHandler, at the precise 
moment when records are processed by processor nodes. The handling will be performed in 
the "process" method of the ProcessorNode, such as:

public void process(final Record<KIn, VIn> record) {
 ...

 try {
 ...
 } catch (final ClassCastException e) {
 ...
 } catch (Exception e) {
 ProcessingExceptionHandler.ProcessingHandlerResponse response = 
this.processingExceptionHandler
 .handle(internalProcessorContext, (Record<Object, Object>) record, e);

 if (response == 
ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
 throw new StreamsException("Processing exception handler is set to 
fail upon" +
 " a processing error. If you would rather have the 
streaming pipeline" +
 " continue after a processing error, please set the " +
 DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " 
appropriately.",
 e);
 }
 }
}
As you can see, the record is transmitted to the ProcessingExceptionHandler as a 
Record<Object, Object>, as we are dealing with the input record of the processor at 
this point. It can be any type, including non-serializable types, as suggested by 
Damien's example. As the ProcessingErrorHandler is not intended to perform any 
serialization, there should be no issue for the users to handle a 
Record<Object, Object>.

I follow Damien on the other points.

For point 6, underlying public interfaces are renamed as well:
- The ProcessingHandlerResponse
- The 
ProcessingLogAndContinueExceptionHandler/ProcessingLogAndFailExceptionHandler
- The configuration DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG 
(default.processing.exception.handler)

Regards,

Loïc

From: Damien Gasparina 
Sent: Tuesday, 9 April 2024 20:08
To: dev@kafka.apache.org
Subject: Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler 
for exceptions occuring during processing


Hi Bruno, Bill,

First of all, thanks a lot for all your useful comments.


1. and 2.
I am wondering whether we should expose the processor node ID -- which
basically is the processor node name -- in the ProcessingContext
interface. I think the processor node ID fits well in the
ProcessingContext interface since it already contains application ID and
task ID and it would make the API for the handler cleaner.


That's a good point, the actual ProcessorContextImpl is already holding the
current node in an attribute (currentNode), thus exposing the node ID should
not be a problem. Let me sleep on it and get back to you regarding this
point.


3.
Could you elaborate -- maybe with an example -- when a record is in a
state in which it cannot be serialized? This is not completely clear to

me.

The Record passed to the handler is the input record to the processor. In
the Kafka Streams API, it could be any POJO.
e.g. with the following topology:
streamsBuilder.stream("x")
    .map((k, v) -> new KeyValue<>("foo", Pair.of("hello", "world")))
    .forEach((k, v) -> { throw new RuntimeException(); });
I would expect the handler to receive a Record<String, Pair<String, String>>.
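For illustration, a log-and-continue handler for this example could look roughly like the sketch below (interface and enum names follow the KIP draft discussed in this thread; imports are omitted and this is not the shipped implementation):

public class MyLogAndContinueHandler implements ProcessingExceptionHandler {

    private static final Logger log = LoggerFactory.getLogger(MyLogAndContinueHandler.class);

    @Override
    public ProcessingHandlerResponse handle(final ProcessingContext context,
                                            final Record<Object, Object> record,
                                            final Exception exception) {
        // The value here is a Pair<String, String> POJO -- no serde is involved,
        // we only rely on toString() for logging.
        log.warn("Skipping record in task {}: key={}, value={}",
                 context.taskId(), record.key(), record.value(), exception);
        return ProcessingHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // nothing to configure in this sketch
    }
}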


4.
Regarding the metrics, it is not entirely clear to me what the metric
measures. Is it the number of calls to the process handler or is it the
number of calls to process handler that returned FAIL?
If it is the former, I was also wondering whether it would be better to
put the task-level metrics to INFO reporting level and remove the
thread-level metric, similar to the dropped-records metric. You can
always roll-up the metrics to the thread level in your preferred
monitoring system. Or do you think we end up with too many metrics?


We were thinking of the former, measuring the number of calls to the
process handler. That's a good point, having the information at 

[DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-10 Thread Damien Gasparina
Hi everyone,

To continue our effort to improve Kafka Streams error handling, we
propose a new KIP to add out-of-the-box support for a Dead Letter Queue.
The goal of this KIP is to provide a default implementation that
should be suitable for most applications and allow users to override
it if they have specific requirements.

In order to build a suitable payload, some additional changes are
included in this KIP:
  1. extend the ProcessingContext to hold, when available, the source
node raw key/value byte[]
  2. expose the ProcessingContext to the ProductionExceptionHandler,
it is currently not available in the handle parameters.

Regarding point 2.,  to expose the ProcessingContext to the
ProductionExceptionHandler, we considered two choices:
  1. exposing the ProcessingContext as a parameter in the handle()
method. That's the cleanest way IMHO, but we would need to deprecate
the old method.
  2. exposing the ProcessingContext as an attribute in the interface.
This way, no method is deprecated, but we would not be consistent with
the other ExceptionHandler.

In the KIP, we chose the 1. solution (new handle signature with old
one deprecated), but we could use other opinions on this part.
More information is available directly on the KIP.
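For option 1, the change could look roughly like this (a sketch only -- the exact context parameter type and the default fallback behaviour are assumptions pending the KIP text):

public interface ProductionExceptionHandler extends Configurable {

    // Existing method, deprecated under option 1.
    @Deprecated
    ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                              final Exception exception);

    // Proposed overload: the context exposes topic/partition/offset and, per this
    // KIP, the raw source key/value needed to build a DLQ payload.
    default ProductionExceptionHandlerResponse handle(final ProcessingContext context,
                                                      final ProducerRecord<byte[], byte[]> record,
                                                      final Exception exception) {
        return handle(record, exception);
    }
}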

KIP link: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams

Feedbacks and suggestions are welcome,

Cheers,
Damien, Sebastien and Loic


[jira] [Created] (KAFKA-16503) getOrMaybeCreateClassicGroup should not throw GroupIdNotFoundException

2024-04-10 Thread David Jacot (Jira)
David Jacot created KAFKA-16503:
---

 Summary: getOrMaybeCreateClassicGroup should not throw 
GroupIdNotFoundException
 Key: KAFKA-16503
 URL: https://issues.apache.org/jira/browse/KAFKA-16503
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot


It looks like the `getOrMaybeCreateClassicGroup` method throws a 
`GroupIdNotFoundException` when the group exists but has the wrong type. 
As `getOrMaybeCreateClassicGroup` is mainly used by the join-group/sync-group 
APIs, this seems incorrect. We need to double-check and fix it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-15538) Client support for java regex based subscription

2024-04-10 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reopened KAFKA-15538:
---

> Client support for java regex based subscription
> 
>
> Key: KAFKA-15538
> URL: https://issues.apache.org/jira/browse/KAFKA-15538
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Blocker
>  Labels: kip-848-client-support, newbie, regex
> Fix For: 3.8.0
>
>
> When using subscribe with a java regex (Pattern), we need to resolve it on 
> the client side to send the broker a list of topic names to subscribe to.
> Context:
> The new consumer group protocol uses [Google 
> RE2/J|https://github.com/google/re2j] for regular expressions and introduces 
> new methods in the consumer API to subscribe using a `SubscriptionPattern`. The 
> subscribe using a java `Pattern` will still be supported for a while but 
> eventually removed.
>  * When the subscribe with SubscriptionPattern is used, the client should 
> just send the regex to the broker and it will be resolved on the server side.
>  * In the case of the subscribe with Pattern, the regex should be resolved on 
> the client side.
> As part of this task, we should re-enable all integration tests defined in 
> the PlainTextAsyncConsumer that relate to subscription with pattern and that 
> are currently disabled for the new consumer + new protocol
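To illustrate the distinction described above, the existing client-side java.util.regex path looks like this (a minimal sketch; standard client imports and configs assumed). The new SubscriptionPattern overload is not shown, since its final API is defined by KIP-848:

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "pattern-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    // With java.util.regex.Pattern the regex is resolved on the client: the
    // consumer matches it against topic metadata and subscribes to the
    // resulting list of topic names.
    consumer.subscribe(Pattern.compile("orders-.*"));
    final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
}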



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-1022 Formatting and Updating Features

2024-04-10 Thread David Jacot
+1 (binding). Thanks for the KIP!

On Mon, Apr 8, 2024 at 7:23 PM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

> Hi Justine,
> Thanks for the KIP.
>
> +1 (non-binding)
>
> Thanks,
> Andrew
>
> > On 8 Apr 2024, at 18:07, Justine Olshan 
> wrote:
> >
> > Hello all,
> > I would like to start a vote for KIP-1022 Formatting and Updating
> Features
> > <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1022%3A+Formatting+and+Updating+Features
> >
> >
> > Please take a look and cast your vote.
> >
> > Thanks,
> > Justine
>
>