Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #2837

2024-04-22 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-04-22 Thread Matthias J. Sax

Thanks for all the updates. Great discussion.

A few follow-up questions from my side:

99: I agree with Damien about Bruno's point (2). We should not return 
`Record` (cf. point 103 below for why).



100: Can we imagine a case, for which the `ProcessingExceptionHandler` 
would want/need to have access to a Processor's state stores? When the 
handler is executed, we are technically still in the right context, and 
could maybe allow accessing the state store. Just a wild idea; I also 
don't want to enlarge the scope unnecessarily, and we might be able to 
do this in a follow-up KIP, too, if we believe it would be useful. I 
thought I'd just throw out the idea for completeness.



101: Does the name `ErrorHandlerContext` align with what Sophie had in 
mind about using this interface somewhere else?



102 `ErrorHandlerContext`: Is it intentional to have both `partition()` 
and `taskId()` -- the `TaskId` encodes the partition implicitly, so it's 
kinda redundant to also have `partition()`. Don't feel strongly about 
it, but it might be worth calling out in the KIP why both are added.



103 `ErrorHandlerContext#header`: the return type is `Headers`, which 
does not ensure immutability. I believe we need to introduce a new 
`ReadOnlyHeaders` (or maybe some better name) interface...
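
Just to illustrate what I have in mind (name and method set below are only 
placeholders, not a concrete proposal):

  import org.apache.kafka.common.header.Header;

  // read-only view on headers: mirrors the accessors of
  // org.apache.kafka.common.header.Headers, but omits add()/remove()
  public interface ReadOnlyHeaders extends Iterable<Header> {
      Header lastHeader(String key);
      Iterable<Header> headers(String key);
      Header[] toArray();
  }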



104 `ErrorHandlerContext#convertToProcessorContext()`: I understand why 
this method was added, but I don't think that's the right approach to 
handle this case. We should not add this leaky abstraction IMHO, but 
instead add this method to a `DefaultImpl` class, and cast from the 
interface to that class inside the implementation to access it. (Not 
100% sure about the details of how to set up the code, so it would be 
great to get a POC PR up to see how we can do this w/o the need to add 
this method to the interface.)
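
To sketch the idea (names below are placeholders and `ProcessorContext` is 
elided as `Object`; the POC PR would need to settle the real setup):

  // public interface: metadata accessors only, no conversion method
  interface ErrorHandlerContext {
      // ... topic(), partition(), offset(), taskId(), ...
  }

  // internal implementation: carries the conversion
  class DefaultErrorHandlerContext implements ErrorHandlerContext {
      private final Object processorContext; // the internal ProcessorContext

      DefaultErrorHandlerContext(final Object processorContext) {
          this.processorContext = processorContext;
      }

      Object convertToProcessorContext() {
          return processorContext;
      }
  }

  // runtime code that knows the concrete type casts, instead of relying on
  // a conversion method exposed on the public interface
  class RuntimeSketch {
      static Object processorContextOf(final ErrorHandlerContext context) {
          return ((DefaultErrorHandlerContext) context).convertToProcessorContext();
      }
  }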



105 `ProductionExceptionHandler`: why does the existing method get a 
default implementation that throws an exception? Would be good to 
clarify in the KIP why this change is necessary in this way. -- Could we 
also let it `return FAIL` instead?
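
For example, the existing method could simply get a default along these 
lines inside the interface (sketch only, keeping the current signature):

  default ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                    final Exception exception) {
      // keep today's behavior as the default instead of throwing
      return ProductionExceptionHandlerResponse.FAIL;
  }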



106 `ProductionExceptionHandler`: why do we add two new methods? IIRC, 
we added `handleSerializationException(...)` only because we could not 
re-use the existing `handle(...)` method (cf KIP-399: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions). 
I think it's sufficient to only add a single new method which 
"blends/unifies" both exiting ones:


  handle(final ErrorHandlerContext context,
         final ProducerRecord record,    // no generic types
         final Exception exception)


107 `DeserializationExceptionHandler`: same question as above, about the 
default impl and letting it throw an exception.



108 `default.process.exception.handler`: we use the prefix `default.` 
for both existing handlers, because we allow passing in topic-specific 
handlers via `Consumed` and `Produced` overwrites, i.e., the default can 
be overwritten. We don't want to allow passing in a processor-specific 
handler, as pointed out in the "Rejected Alternatives" section, and thus 
the configured handler is not really a "default" as it cannot be 
overwritten. For this case, we should drop the `default.` prefix from 
the config name.
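
Just to make the naming concrete (illustration only; the handler class name 
below is hypothetical):

  final Properties props = new Properties();
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  // no `default.` prefix, since the handler cannot be overwritten per processor
  props.put("process.exception.handler", "com.example.MyProcessingExceptionHandler");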



109: Lucas brought up the idea on the KIP-1034 discussion to also 
include the `TimestampExtractor` interface for DLQ, which I think makes 
a lot of sense. Not sure if we would need any extensions in this KIP to 
get this done? I would rather include the timestamp extraction issue in 
the DLQ KIP from day one. The interface is quite different though, so we 
would need to think a little bit more about how to do this. Right now, 
the contract is that returning `-1` as the extracted timestamp is an 
implicit "drop record" signal to the runtime, which is rather subtle. 
Can we do anything about this in a meaningful way?
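
For reference, today's subtle contract looks roughly like this when 
implemented (illustration only, not a proposal):

  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.streams.processor.TimestampExtractor;

  public class DropInvalidTimestampExtractor implements TimestampExtractor {
      @Override
      public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
          final long timestamp = record.timestamp();
          // returning -1 is the implicit "drop record" signal to the runtime
          return timestamp >= 0 ? timestamp : -1L;
      }
  }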




-Matthias

On 4/22/24 8:20 AM, Damien Gasparina wrote:

Hi Bruno,

1) Good point, naming stuff is definitely hard, I renamed
ProcessingMetadata to ErrorHandlerContext


> Is there any reason you did not use something like
> Record sourceRecord()


2) The record class is used in many locations and, I assume, could be
expanded by new features.
As the error handler metadata could be stored in memory for a longer
duration due to the ProductionExceptionHandle, I think it is wiser to
have an independent class that is the leanest possible.
Maybe I am overthinking this, what do you think?

3) Oops, good point, I didn't notice that it was still there. I
removed the parameter.

4) To clarify the KIP, I specified the DeserializationExceptionHandler
interface in the KIP.

5) Good point, I updated the KIP with the order you mentioned:
context, record, exception

6) I was indeed thinking of extending the ProcessingContext to store
it there. Let me update the KIP to make it clear.

 From past experience, deprecating an interface is always a bit
painful, in this KIP, I relied on default 

[jira] [Created] (KAFKA-16603) Data loss when kafka connect sending data to Kafka

2024-04-22 Thread Anil Dasari (Jira)
Anil Dasari created KAFKA-16603:
---

 Summary: Data loss when kafka connect sending data to Kafka
 Key: KAFKA-16603
 URL: https://issues.apache.org/jira/browse/KAFKA-16603
 Project: Kafka
  Issue Type: Bug
  Components: clients, producer 
Affects Versions: 3.3.1
Reporter: Anil Dasari


We are experiencing data loss when the Kafka source connector fails to send 
data to the Kafka topic and the offsets topic. 

Kafka cluster and Kafka connect details:
 # Kafka version: Confluent community version 7.3.1, i.e. Kafka 3.3.1
 # Cluster size: 3 brokers
 # Number of partitions in all topics = 3
 # Replication factor = 3
 # Min ISR set to 2
 # No transformations in the Kafka connector
 # Uses default error tolerance, i.e. None.

Our connector checkpoints the offset info received in SourceTask#commitRecord 
and resumes data processing from the persisted checkpoint.
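
The checkpointing pattern is roughly the following (hypothetical sketch for 
illustration only, not our actual connector code; an in-memory map stands in 
for the real checkpoint store):
{code:java}
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class CheckpointingSourceTask extends SourceTask {

    private final Map<Map<String, ?>, Map<String, ?>> checkpoints = new ConcurrentHashMap<>();

    @Override
    public String version() {
        return "0.0.1";
    }

    @Override
    public void start(Map<String, String> props) {
        // on (re)start, resume reading the source system from the persisted
        // checkpoint instead of the Connect offsets topic
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        return null; // polling of the external system is omitted in this sketch
    }

    @Override
    public void commitRecord(SourceRecord record, RecordMetadata metadata) throws InterruptedException {
        // checkpoint the source offset of a record once Kafka has acknowledged it
        checkpoints.put(record.sourcePartition(), record.sourceOffset());
    }

    @Override
    public void stop() {
        // nothing to clean up in this sketch
    }
}
{code}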

The data loss is noticed when the broker is unresponsive for a few minutes due 
to high load and the Kafka connector is restarted. However, the Kafka 
connector's graceful shutdown failed.

Logs:

 
{code:java}
[Worker clientId=connect-1, groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] 
Discovered group coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
Apr 22, 2024 @ 15:56:16.152 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be 
attempted.
Apr 22, 2024 @ 15:56:16.153 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Requesting disconnect from 
last known coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
Apr 22, 2024 @ 15:56:16.514 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 0 disconnected.
Apr 22, 2024 @ 15:56:16.708 [Producer 
clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Node 0 
disconnected.
Apr 22, 2024 @ 15:56:16.710 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 2147483647 disconnected.
Apr 22, 2024 @ 15:56:16.731 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
to cause: coordinator unavailable. isDisconnected: true. Rediscovery will be 
attempted.
Apr 22, 2024 @ 15:56:19.103 == Trying to sleep while stop == (** custom log **)
Apr 22, 2024 @ 15:56:19.755 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Broker coordinator was 
unreachable for 3000ms. Revoking previous assignment Assignment{error=0, 
leader='connect-1-8f41a1d2-6cc9-4956-9be3-1fbae9c6d305', 
leaderUrl='http://10.75.100.46:8083/', offset=4, 
connectorIds=[d094a5d7bbb046b99d62398cb84d648c], 
taskIds=[d094a5d7bbb046b99d62398cb84d648c-0], revokedConnectorIds=[], 
revokedTaskIds=[], delay=0} to avoid running tasks while not being a member the 
group
Apr 22, 2024 @ 15:56:19.866 Stopping connector d094a5d7bbb046b99d62398cb84d648c
Apr 22, 2024 @ 15:56:19.874 Stopping task d094a5d7bbb046b99d62398cb84d648c-0
Apr 22, 2024 @ 15:56:19.880 Scheduled shutdown for 
WorkerConnectorWorkerConnector{id=d094a5d7bbb046b99d62398cb84d648c}
Apr 22, 2024 @ 15:56:24.105 Connector 'd094a5d7bbb046b99d62398cb84d648c' failed 
to properly shut down, has become unresponsive, and may be consuming external 
resources. Correct the configuration for this connector or remove the 
connector. After fixing the connector, it may be necessary to restart this 
worker to release any consumed resources.
Apr 22, 2024 @ 15:56:24.110 [Producer 
clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Closing the 
Kafka producer with timeoutMillis = 0 ms.
Apr 22, 2024 @ 15:56:24.110 [Producer 
clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Proceeding to 
force close the producer since pending requests could not be completed within 
timeout 0 ms.
Apr 22, 2024 @ 15:56:24.112 [Producer 
clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Beginning 
shutdown of Kafka producer I/O thread, sending remaining records.
Apr 22, 2024 @ 15:56:24.112 [Producer 
clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Aborting 
incomplete batches due to forced shutdown
Apr 22, 2024 @ 15:56:24.113 
WorkerSourceTaskWorkerSourceTask{id=d094a5d7bbb046b99d62398cb84d648c-0} 
Committing offsets
Apr 22, 2024 @ 15:56:24.113 
WorkerSourceTaskWorkerSourceTask{id=d094a5d7bbb046b99d62398cb84d648c-0} Either 
no records were produced by the task since the last offset commit, or every 
record has been filtered out by a transformation or dropped due to 
transformation or conversion errors.
Apr 22, 2024 @ 15:56:24.146 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Finished stopping tasks in 

[jira] [Created] (KAFKA-16602) Flaky test – org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord()

2024-04-22 Thread Igor Soarez (Jira)
Igor Soarez created KAFKA-16602:
---

 Summary: Flaky test – 
org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord()
 Key: KAFKA-16602
 URL: https://issues.apache.org/jira/browse/KAFKA-16602
 Project: Kafka
  Issue Type: Test
  Components: controller
Reporter: Igor Soarez


 
org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord()
 failed with:

 
h4. Error
{code:java}
org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: 
exception while renouncing leadership: Attempt to resign from epoch 1 which is 
larger than the current epoch 0{code}
h4. Stacktrace
{code:java}
org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: 
exception while renouncing leadership: Attempt to resign from epoch 1 which is 
larger than the current epoch 0  at 
app//org.apache.kafka.metalog.LocalLogManager.resign(LocalLogManager.java:808)  
at 
app//org.apache.kafka.controller.QuorumController.renounce(QuorumController.java:1270)
  at 
app//org.apache.kafka.controller.QuorumController.handleEventException(QuorumController.java:547)
  at 
app//org.apache.kafka.controller.QuorumController.access$800(QuorumController.java:179)
  at 
app//org.apache.kafka.controller.QuorumController$ControllerWriteEvent.complete(QuorumController.java:881)
  at 
app//org.apache.kafka.controller.QuorumController$ControllerWriteEvent.handleException(QuorumController.java:871)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventContext.completeWithException(KafkaEventQueue.java:149)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:138)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182)
  at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829) Caused by: 
java.lang.IllegalArgumentException: Attempt to resign from epoch 1 which is 
larger than the current epoch 0{code}
 

Source: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15701/3/tests/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16601) Flaky test – org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest.testClosingQuorumControllerClosesMetrics()

2024-04-22 Thread Igor Soarez (Jira)
Igor Soarez created KAFKA-16601:
---

 Summary: Flaky test – 
org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest.testClosingQuorumControllerClosesMetrics()
 Key: KAFKA-16601
 URL: https://issues.apache.org/jira/browse/KAFKA-16601
 Project: Kafka
  Issue Type: Test
  Components: controller
Reporter: Igor Soarez


 
org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest.testClosingQuorumControllerClosesMetrics()
 failed with:
h4. Error
{code:java}
org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: 
exception while renouncing leadership: Attempt to resign from epoch 1 which is 
larger than the current epoch 0{code}
h4. Stacktrace
{code:java}
org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: 
exception while renouncing leadership: Attempt to resign from epoch 1 which is 
larger than the current epoch 0  at 
app//org.apache.kafka.metalog.LocalLogManager.resign(LocalLogManager.java:808)  
at 
app//org.apache.kafka.controller.QuorumController.renounce(QuorumController.java:1270)
  at 
app//org.apache.kafka.controller.QuorumController.handleEventException(QuorumController.java:547)
  at 
app//org.apache.kafka.controller.QuorumController.access$800(QuorumController.java:179)
  at 
app//org.apache.kafka.controller.QuorumController$ControllerWriteEvent.complete(QuorumController.java:881)
  at 
app//org.apache.kafka.controller.QuorumController$ControllerWriteEvent.handleException(QuorumController.java:871)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventContext.completeWithException(KafkaEventQueue.java:149)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:138)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182)
  at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829) Caused by: 
java.lang.IllegalArgumentException: Attempt to resign from epoch 1 which is 
larger than the current epoch 0{code}
 

Source:

https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15701/3/tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is unstable: Kafka » Kafka Branch Builder » 3.7 #142

2024-04-22 Thread Apache Jenkins Server
See 




[DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-22 Thread Sebastien Viale
Hi,

Thanks for your remarks

L1. I would say "who can do the most can do the least": even though most people 
will fail and stop, we found it interesting to also offer the possibility to 
fail-and-send-to-DLQ.

L2: We did not consider extending the TimestampExtractor because we deem it 
out of scope for this KIP. Perhaps it will be possible to include it in an 
ExceptionHandler later.

L3: we will include an example in the KIP, but as we mentioned earlier, the DLQ 
topic can be different in each custom Exception Handler (a rough sketch follows 
below the list):

When providing custom handlers, users would have the possibility to return:
 * FAIL
 * CONTINUE
 * FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
 * CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")
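
A rough sketch of what a custom handler could look like (the handler 
signature and the withDeadLetterQueueRecord(...) builder below are the 
KIP-1033/KIP-1034 proposals still under discussion, not a released API):

  public class RoutingProductionExceptionHandler implements ProductionExceptionHandler {
      @Override
      public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                       final ProducerRecord<byte[], byte[]> record,
                                                       final Exception exception) {
          if (exception instanceof RecordTooLargeException) {
              // skip the record, but keep a copy in a dedicated DLQ topic
              return ProductionExceptionHandlerResponse.CONTINUE
                  .withDeadLetterQueueRecord(record, "dlq-too-large");
          }
          // otherwise stop processing, still capturing the record for inspection
          return ProductionExceptionHandlerResponse.FAIL
              .withDeadLetterQueueRecord(record, "dlq-fatal");
      }
  }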

cheers !
Sébastien



From: Lucas Brutschy 
Sent: Monday, 22 April 2024 14:36
To: dev@kafka.apache.org 
Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams


Hi!

Thanks for the KIP, great stuff.

L1. I was a bit confused that the default configuration (once you set
a DLQ topic) is going to be fail-and-send-to-DLQ, if I understood
correctly. Is this something that will be a common use-case, and is it
a configuration that we want to encourage? I expected that you either
want to fail or skip-and-send-to-DLQ.

L2. Have you considered extending the `TimestampExtractor` interface
so that it can also produce to DLQ? AFAIK it's not covered by any of
the existing exception handlers, but it can cause similar failures
(potentially custom logic, depending on the validity of the input record). There
could also be a default implementation as a subclass of
`ExtractRecordMetadataTimestamp`.

L3. It would be nice to include an example of how to produce to
multiple topics in the KIP, as I can imagine that this will be a
common use-case. I wasn't sure how much code would be involved to make
it work. If a lot of code is required, we may want to consider
exposing some utils that make it easier.

Cheers,
Lucas




On Sun, Apr 21, 2024 at 7:58 PM Damien Gasparina  wrote:
>
> Hi everyone,
>
> Following all the discussion on this KIP and KIP-1033, we introduced a
> new container class containing only processing context metadata:
> ProcessingMetadata. This new container class is actually part of
> KIP-1033, thus, I added a hard dependency for this KIP on KIP-1033, I
> think it's the wisest implementation wise.
>
> I also clarified the interface of the enums:
> withDeadLetterQueueRecords(Iterable<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords).
> Very likely most users would just send one DLQ record, but there might be
> specific use-cases and who can do more can do less, so I added an Iterable.
>
> I took some time to think about the impact of storing the
> ProcessingMetadata on the ProductionExceptionHandler. I think storing
> the topic/offset/partition should be fine, but I am concerned about
> storing the rawSourceKey/Value. I think it could impact some specific
> use-cases, for example, a high-throughput Kafka Streams application
> "counting" messages could have huge source input messages, and very
> small sink messages, here, I assume storing the rawSourceKey/Value
> could significantly require more memory than the actual Kafka Producer
> buffer.
>
> I think the safest approach is actually to only store the fixed-size
> metadata for the ProductionExceptionHandler.handle:
> topic/partition/offset/processorNodeId/taskId, it might be confusing
> for the user, but 1) it is still better than today, where there is
> no context information at all, 2) it would be clearly stated in the
> javadoc, 3) the rawSourceKey/Value are already nullable (e.g. the
> punctuate case).
>
> Do you think it would be a suitable design Sophie?
>
> Cheers,
> Damien
>
> On Sun, 14 Apr 2024 at 21:30, Loic Greffier  
> wrote:
> >
> > Hi Sophie,
> >
> > Thanks for your feedback.
> > Completing the Damien's comments here for points S1 and S5B.
> >
> > S1:
> > > I'm confused -- are you saying that we're introducing a new kind of 
> > > ProducerRecord class for this?
> >
> > I am wondering if it makes sense to alter the ProducerRecord from Clients 
> > API with a "deadLetterQueueTopicName" attribute dedicated to Kafka Streams 
> > DLQ.
> > Adding "deadLetterQueueTopicName" as an additional parameter to 
> > "withDeadLetterQueueRecord" is a good option, and may allow users to send 
> > records to different DLQ topics depending on conditions:
> > @Override
> > public ProductionExceptionHandlerResponse handle(final ProcessingContext context,
> >                                                  ProducerRecord record,
> >                                                  Exception exception) {
> >     if (condition1) {
> >         return ProductionExceptionHandlerResponse.CONTINUE
> >             .withDeadLetterQueueRecord(record, "dlq-topic-a");
> >     }
> >     if (condition2) {
> >         return 

[jira] [Created] (KAFKA-16600) Periodically receive "Failed to transition to PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN" during streams close

2024-04-22 Thread Alex Leung (Jira)
Alex Leung created KAFKA-16600:
--

 Summary: Periodically receive "Failed to transition to 
PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN" during streams close
 Key: KAFKA-16600
 URL: https://issues.apache.org/jira/browse/KAFKA-16600
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.3.2
Reporter: Alex Leung


From time to time, we observe the following ERROR message during streams close:
{code:java}
2024-04-13 07:40:16,222 INFO o.a.k.s.KafkaStreams [32] stream-client 
[testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] State transition from RUNNING to 
PENDING_SHUTDOWN
2024-04-13 07:40:16,222 ERROR o.a.k.s.KafkaStreams [55] stream-client 
[testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] Failed to transition to 
PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN
{code}
These ERRORs started showing up when we moved from 2.7.2 to 3.3.2 and we have 
not changed any code related to streams shutdown.



When the problem does not occur (most of the time), it looks like the following:
{code:java}
2024-04-13 07:40:11,333 INFO o.a.k.s.KafkaStreams [55] stream-client 
[testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] State transition from RUNNING to 
PENDING_SHUTDOWN
2024-04-13 07:40:11,341 INFO o.a.k.s.KafkaStreams [32] stream-client 
[testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] Streams client is in 
PENDING_SHUTDOWN, all resources are being closed and the client will be 
stopped. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1023: Follower fetch from tiered offset

2024-04-22 Thread Abhijeet Kumar
Hi Jun,

Please find my comments inline.


On Thu, Apr 18, 2024 at 3:26 AM Jun Rao  wrote:

> Hi, Abhijeet,
>
> Thanks for the reply.
>
> 1. I am wondering if we could achieve the same result by just lowering
> local.retention.ms and local.retention.bytes. This also allows the newly
> started follower to build up the local data before serving the consumer
> traffic.
>

I am not sure I fully followed this. Do you mean we could lower the
local.retention (by size and time) so that there is little data on the
leader's local storage and the follower can quickly catch up with the leader?

In that case, we would need to set a small local retention across brokers in
the cluster. That would have the undesired effect of increased remote log
fetches for serving consumer requests, which can cause degradations. Also,
this behaviour (of increased remote fetches) would happen on all brokers at
all times, whereas in the KIP we restrict the behavior only to newly
bootstrapped brokers and only until they fully build the local logs as per
the retention defined at the cluster level. (Deprioritization of the broker
could help reduce the impact even further.)


>
> 2. Have you updated the KIP?
>

The KIP has been updated now.


>
> Thanks,
>
> Jun
>
> On Tue, Apr 9, 2024 at 3:36 AM Satish Duggana 
> wrote:
>
> > +1 to Jun for adding the consumer fetching from a follower scenario
> > also to the existing section that talked about the drawback when a
> > node built with last-tiered-offset has become a leader. As Abhijeet
> > mentioned, we plan to have a follow-up KIP that will address these by
> > having a deprioritization of these brokers. The deprioritization of
> > those brokers can be removed once they catchup until the local log
> > retention.
> >
> > Thanks,
> > Satish.
> >
> > On Tue, 9 Apr 2024 at 14:08, Luke Chen  wrote:
> > >
> > > Hi Abhijeet,
> > >
> > > Thanks for the KIP to improve the tiered storage feature!
> > >
> > > Questions:
> > > 1. We could also get the "pending-upload-offset" and epoch via remote
> log
> > > metadata, instead of adding a new API to fetch from the leader. Could
> you
> > > explain why you choose the later approach, instead of the former?
> > > 2.
> > > > We plan to have a follow-up KIP that will address both the
> > > deprioritization
> > > of these brokers from leadership, as well as
> > > for consumption (when fetching from followers is allowed).
> > >
> > > I agree with Jun that we might need to make it clear all possible
> > drawbacks
> > > that could have. So, could we add the drawbacks that Jun mentioned
> about
> > > the performance issue when consumer fetch from follower?
> > >
> > > 3. Could we add "Rejected Alternatives" section to the end of the KIP
> to
> > > add some of them?
> > > Like the "ListOffsetRequest" approach VS
> "Earliest-Pending-Upload-Offset"
> > > approach, or getting the "Earliest-Pending-Upload-Offset" from remote
> log
> > > metadata... etc.
> > >
> > > Thanks.
> > > Luke
> > >
> > >
> > > On Tue, Apr 9, 2024 at 2:25 PM Abhijeet Kumar <
> > abhijeet.cse@gmail.com>
> > > wrote:
> > >
> > > > Hi Christo,
> > > >
> > > > Please find my comments inline.
> > > >
> > > > On Fri, Apr 5, 2024 at 12:36 PM Christo Lolov <
> christolo...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hello Abhijeet and Jun,
> > > > >
> > > > > I have been mulling this KIP over a bit more in recent days!
> > > > >
> > > > > re: Jun
> > > > >
> > > > > I wasn't aware we apply 2.1 and 2.2 for reserving new timestamps -
> in
> > > > > retrospect it should have been fairly obvious. I would need to go
> an
> > > > update
> > > > > KIP-1005 myself then, thank you for giving the useful reference!
> > > > >
> > > > > 4. I think Abhijeet wants to rebuild state from
> latest-tiered-offset
> > and
> > > > > fetch from latest-tiered-offset + 1 only for new replicas (or
> > replicas
> > > > > which experienced a disk failure) to decrease the time a partition
> > spends
> > > > > in under-replicated state. In other words, a follower which has
> just
> > > > fallen
> > > > > out of ISR, but has local data will continue using today's Tiered
> > Storage
> > > > > replication protocol (i.e. fetching from earliest-local). I further
> > > > believe
> > > > > he has taken this approach so that local state of replicas which
> have
> > > > just
> > > > > fallen out of ISR isn't forcefully wiped thus leading to situation
> 1.
> > > > > Abhijeet, have I understood (and summarised) what you are proposing
> > > > > correctly?
> > > > >
> > > > > Yes, your understanding is correct. We want to limit the behavior
> > changes
> > > > only to new replicas.
> > > >
> > > >
> > > > > 5. I think in today's Tiered Storage we know the leader's
> > > > log-start-offset
> > > > > from the FetchResponse and we can learn its local-log-start-offset
> > from
> > > > the
> > > > > ListOffsets by asking for earliest-local timestamp (-4). But
> granted,
> > > > this
> > > > > ought to be added as an 

Re: [DISCUSS] KIP-932: Queues for Kafka

2024-04-22 Thread Jun Rao
Hi, Andrew,

Thanks for the reply.

123. "The share group does not persist the target assignment."
  What's the impact of this? Every time the GC fails over, it needs to
recompute the assignment for every member. Do we expect the member
assignment to change on every GC failover?

125. Should the GC also write ShareGroupPartitionMetadata?

127. So, group epoch is only propagated to SC when
InitializeShareGroupState request is sent. This sounds good.

130. Should we have a group.share.min.record.lock.duration.ms to pair with
group.share.max.record.lock.duration.ms?

131. Sounds good. The name group.share.record.lock.partition.limit doesn't
seem very intuitive. How about something
like group.share.partition.max.records.pending.ack?

136. Could we describe the process of GC failover? I guess it needs to
compute member reassignment and check if there is any new topic/partition
matching the share subscription. Does it bump up the group epoch?

137. Metrics:
137.1 It would be useful to document who reports each metric. Is it any
broker, GC, SC or SPL?
137.2 partition-load-time: Is that the loading time at SPL or SC?
137.3 "The time taken in milliseconds to load the share-group state from
the share-group state partitions loaded in the last 30 seconds."
  The window depends on metrics.num.samples and metrics.sample.window.ms
and is not always 30 seconds, right?
137.4 Could you explain write/write-latency a bit more? Does it include the
time to write to the internal topic?

Jun

On Mon, Apr 22, 2024 at 2:57 AM Andrew Schofield 
wrote:

> Hi Jun,
> Thanks for your comments.
>
> 120. Thanks. Fixed.
>
> 121. ShareUpdateValue.SnapshotEpoch indicates which snapshot
> the update applies to. It should of course be the snapshot that precedes
> it in the log. It’s just there to provide a consistency check.
>
> I also noticed that ShareSnapshotValue was missing StateEpoch. It
> isn’t any more.
>
> 122. In KIP-848, ConsumerGroupMemberMetadataValue includes
> GroupEpoch, but in the code it does not. In fact, there is considerable
> divergence between the KIP and the code for this record value schema
> which I expect will be resolved when the migration code has been
> completed.
>
> 123. The share group does not persist the target assignment.
>
> 124. Share groups have three kinds of record:
> i) ShareGroupMetadata
>   - this contains the group epoch and is written whenever the group
> epoch changes.
> ii) ShareGroupMemberMetadata
>- this does not contain the group epoch.
> iii) ShareGroupPartitionMetadata
>- this currently contains the epoch, but I think that is unnecessary.
>  For one thing, the ConsumerGroupPartitionMetadata definition
>  contains the group epoch, but the value appears never to be set.
>  David Jacot confirms that it’s not necessary and is removing it.
>
> I have removed the Epoch from ShareGroupPartitionMetadata.
> The only purpose of persisting the epoch for a share group is so that
> when a group coordinator takes over the share group, it is able to
> continue the sequence of epochs. ShareGroupMetadataValue.Epoch
> is used for this.
>
> 125. The group epoch will be incremented in this case and
> consequently a ShareGroupMetadata will be written. KIP updated.
>
> 126. Not directly. A share group can only be deleted when it has no
> members, so the tombstones for ShareGroupMemberMetadata will
> have been written when the members left. I have clarified this.
>
> 127. The share coordinator is ignorant of the group epoch. When the
> group coordinator is initializing the share-group state the first time that
> a share-partition is being added to an assignment in the group, the
> group epoch is used as the state epoch. But as the group epoch
> increases over time, the share coordinator is entirely unaware.
>
> When the first consumer for a share-partition fetches records from a
> share-partition leader, the SPL calls the share coordinator to
> ReadShareGroupState. If the SPL has previously read the information
> and again it’s going from 0 to 1 consumer, it confirms it's up to date by
> calling ReadShareGroupOffsetsState.
>
> Even if many consumers are joining at the same time, any share-partition
> which is being initialized will not be included in their assignments. Once
> the initialization is complete, the next rebalance will assign the
> partition
> to some consumers which will discover this by ShareGroupHeartbeat
> response. And then, the fetching begins.
>
> If an SPL receives a ShareFetch request before it’s read the state
> from the SC, it can make the ShareFetch request wait up to MaxWaitMs
> and then it can return an empty set of records if it’s still not ready.
>
> So, I don’t believe there will be too much load. If a topic with many
> partitions is added to the subscribed topics for a share group, the fact
> that the assignments will only start to include the partitions as their
> initialization completes should soften the impact.
>
> 128, 129: The “proper” way to turn on 

Re: [VOTE] KIP-853: KRaft Controller Membership Changes

2024-04-22 Thread José Armando García Sancio
I am going to close the vote tomorrow morning (PST).

On Mon, Apr 22, 2024 at 10:06 AM José Armando García Sancio
 wrote:
>
> +1 binding.
>
> On Mon, Apr 22, 2024 at 9:28 AM Jason Gustafson
>  wrote:
> >
> > Thanks Jose. +1. Great KIP!
> >
> > On Fri, Mar 29, 2024 at 11:16 AM Jun Rao  wrote:
> >
> > > Hi, Jose,
> > >
> > > Thanks for the KIP. +1
> > >
> > > Jun
> > >
> > > On Fri, Mar 29, 2024 at 9:55 AM José Armando García Sancio
> > >  wrote:
> > >
> > > > Hi all,
> > > >
> > > > I would like to call a vote to adopt KIP-853.
> > > >
> > > > KIP: https://cwiki.apache.org/confluence/x/nyH1D
> > > > Discussion thread:
> > > > https://lists.apache.org/thread/6o3sjvcb8dx1ozqfpltb7p0w76b4nd46
> > > >
> > > > Thanks,
> > > > --
> > > > -José
> > > >
> > >
>
>
>
> --
> -José



-- 
-José


Re: [DISCUSS] KIP-1023: Follower fetch from tiered offset

2024-04-22 Thread Abhijeet Kumar
Hi Luke,

Thanks for your comments. Please find my responses inline.

On Tue, Apr 9, 2024 at 2:08 PM Luke Chen  wrote:

> Hi Abhijeet,
>
> Thanks for the KIP to improve the tiered storage feature!
>
> Questions:
> 1. We could also get the "pending-upload-offset" and epoch via remote log
> metadata, instead of adding a new API to fetch from the leader. Could you
> explain why you chose the latter approach, instead of the former?
>

The remote log metadata could be tracking overlapping log segments. The
maximum offset across the log segments it may be tracking may not be the
last-tiered-offset because of truncations during unclean leader election.
Remote log metadata alone is not sufficient to identify the
last-tiered-offset or the pending-upload-offset.

Only the leader knows the correct lineage of offsets that is required to
identify the "pending-upload-offset".
That is the reason we chose to add a new API to fetch this information from
the leader.


2.
> > We plan to have a follow-up KIP that will address both the
> deprioritization
> of these brokers from leadership, as well as
> for consumption (when fetching from followers is allowed).
>
> I agree with Jun that we might need to make it clear all possible drawbacks
> that could have. So, could we add the drawbacks that Jun mentioned about
> the performance issue when consumer fetch from follower?
>
>
Updated the KIP to call this out.


> 3. Could we add "Rejected Alternatives" section to the end of the KIP to
> add some of them?
> Like the "ListOffsetRequest" approach VS "Earliest-Pending-Upload-Offset"
> approach, or getting the "Earliest-Pending-Upload-Offset" from remote log
> metadata... etc.
>
> Added the section on Rejected Alternatives


> Thanks.
> Luke
>
>
> On Tue, Apr 9, 2024 at 2:25 PM Abhijeet Kumar 
> wrote:
>
> > Hi Christo,
> >
> > Please find my comments inline.
> >
> > On Fri, Apr 5, 2024 at 12:36 PM Christo Lolov 
> > wrote:
> >
> > > Hello Abhijeet and Jun,
> > >
> > > I have been mulling this KIP over a bit more in recent days!
> > >
> > > re: Jun
> > >
> > > I wasn't aware we apply 2.1 and 2.2 for reserving new timestamps - in
> > > retrospect it should have been fairly obvious. I would need to go an
> > update
> > > KIP-1005 myself then, thank you for giving the useful reference!
> > >
> > > 4. I think Abhijeet wants to rebuild state from latest-tiered-offset
> and
> > > fetch from latest-tiered-offset + 1 only for new replicas (or replicas
> > > which experienced a disk failure) to decrease the time a partition
> spends
> > > in under-replicated state. In other words, a follower which has just
> > fallen
> > > out of ISR, but has local data will continue using today's Tiered
> Storage
> > > replication protocol (i.e. fetching from earliest-local). I further
> > believe
> > > he has taken this approach so that local state of replicas which have
> > just
> > > fallen out of ISR isn't forcefully wiped thus leading to situation 1.
> > > Abhijeet, have I understood (and summarised) what you are proposing
> > > correctly?
> > >
> > > Yes, your understanding is correct. We want to limit the behavior
> changes
> > only to new replicas.
> >
> >
> > > 5. I think in today's Tiered Storage we know the leader's
> > log-start-offset
> > > from the FetchResponse and we can learn its local-log-start-offset from
> > the
> > > ListOffsets by asking for earliest-local timestamp (-4). But granted,
> > this
> > > ought to be added as an additional API call in the KIP.
> > >
> > >
> > Yes, I clarified this in my reply to Jun. I will add this missing detail
> in
> > the KIP.
> >
> >
> > > re: Abhijeet
> > >
> > > 101. I am still a bit confused as to why you want to include a new
> offset
> > > (i.e. pending-upload-offset) when you yourself mention that you could
> use
> > > an already existing offset (i.e. last-tiered-offset + 1). In essence,
> you
> > > end your Motivation with "In this KIP, we will focus only on the
> follower
> > > fetch protocol using the *last-tiered-offset*" and then in the
> following
> > > sections you talk about pending-upload-offset. I understand this might
> be
> > > classified as an implementation detail, but if you introduce a new
> offset
> > > (i.e. pending-upload-offset) you have to make a change to the
> ListOffsets
> > > API (i.e. introduce -6) and thus document it in this KIP as such.
> > However,
> > > the last-tiered-offset ought to already be exposed as part of KIP-1005
> > > (under implementation). Am I misunderstanding something here?
> > >
> >
> > I have tried to clarify this in my reply to Jun.
> >
> > > The follower needs to build the local data starting from the offset
> > > Earliest-Pending-Upload-Offset. Hence it needs the offset and the
> > > corresponding leader-epoch.
> > > There are two ways to do this:
> > >1. We add support in ListOffsetRequest to be able to fetch this
> offset
> > > (and leader epoch) from the leader.
> > >2. Or, fetch the tiered-offset (which is already supported). From
> this
> 

Re: [DISCUSS] KIP-1028: Docker Official Image for Apache Kafka

2024-04-22 Thread Vedarth Sharma
Hey folks,

Thanks a lot for reviewing the KIP and providing feedback.
The discussion thread seems resolved and KIP has been updated accordingly.
We will be starting the voting thread for this KIP in the next few days.
Please take a look at the KIP and let us know if any further discussion
is needed.

Thanks and regards,
Vedarth

On Fri, Apr 19, 2024 at 1:33 PM Manikumar  wrote:

> Thanks Krish. KIP looks good to me.
>
> On Wed, Apr 17, 2024 at 1:38 PM Krish Vora  wrote:
> >
> > Hi Manikumar,
> >
> > Thanks for the comments.
> >
> > Maybe as part of the release process, RM can create a JIRA for this
> > > task. This can be taken by RM or any comitter or any contributor (with
> > > some help from commiters to run "Docker Image Preparation via GitHub
> > > Actions:"
> >
> > This sounds like a good idea. This step would be beneficial. By creating
> a
> > JIRA ticket, it will also serve as a reminder to complete the
> post-release
> > steps for the Docker official images. Have updated the KIP with this
> step.
> >
> > Is this using GitHub Actions workflow? or manual testing?
> >
> > This will be done by a Github Actions workflow, which will test the
> static
> > Docker Official Image assets for a specific release version.
> >
> > Is it mandatory for RM/comitters to raise the PR to Docker Hub’s
> > > official images repository (or) can it be done by any contributor.
> >
> > I believe that it can be done by any contributor (ref: This link
> > 
> > quotes "*Anyone can provide feedback, contribute code, suggest process
> > changes, or even propose a new Official Image.*")
> >
> > Also I was thinking, once the KIP gets voted, we should try to release
> > > kafka:3.7.0 (or 3.7.1) Docker Official image. This will help us to
> > > validate the process and allow us to fix any changes suggested by
> > > Dockerhub before the 3.8.0 release.
> >
> > This sounds like a great idea. This KIP proposes release of DOI as a
> > post-release process, which can be done anytime post release. Since 3.7.0
> > is already released, we can perform these steps for that release too. By
> > the time the KIP gets implemented, if 3.7.1 is released, we could do
> these
> > steps for 3.7.1, instead of 3.7.0. This would allow us to make changes to
> > the Dockerfiles and other assets based on feedback from Docker Hub before
> > the release of version 3.8.0.
> >
> > Thanks,
> > Krish.
> >
> > On Mon, Apr 15, 2024 at 12:59 PM Manikumar 
> > wrote:
> >
> > > Hi Krish,
> > >
> > > Thanks for the updated KIP. a few comments below.
> > >
> > > > "These actions can be carried out by the RM or any contributor post
> the
> > > release process."
> > > Maybe as part of the release process, RM can create a JIRA for this
> > > task. This can be taken by RM or any comitter or any contributor (with
> > > some help from commiters to run "Docker Image Preparation via GitHub
> > > Actions:"
> > >
> > > > "Perform Docker build tests to ensure image integrity"
> > > Is this using GitHub Actions workflow? or manual testing?
> > >
> > > > "The RM will manually raise the final PR to Docker Hub’s official
> images
> > > repository using the contents of the generated file"
> > >  Is it mandatory for RM/comitters to raise the PR to Docker Hub’s
> > > official images repository (or) can it be done by any contributor.
> > >
> > > Also I was thinking, once the KIP gets voted, we should try to release
> > > kafka:3.7.0 (or 3.7.1) Docker Official image. This will help us to
> > > validate the process and allow us to fix any changes suggested by
> > > Dockerhub before the 3.8.0 release.
> > >
> > >
> > > Thanks,
> > >
> > > On Mon, Apr 8, 2024 at 2:33 PM Krish Vora 
> wrote:
> > > >
> > > > Hi Manikumar and Luke.
> > > > Thanks for the questions.
> > > >
> > > > 1. No, the Docker inventory files and configurations will not be the
> same
> > > > for Open Source Software (OSS) Images and Docker Official Images
> (DOI).
> > > >
> > > > For OSS images, the Dockerfile located in docker/jvm/dockerfile is
> > > > utilized. This process is integrated with the existing release
> pipeline
> > > as
> > > > outlined in KIP-975
> > > > <
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka#KIP975:DockerImageforApacheKafka-Status
> > > >,
> > > > where the Kafka URL is provided as a build argument. This method
> allows
> > > for
> > > > building, testing, and releasing OSS images dynamically. The OSS
> images
> > > > will continue to be released under the standard release process .
> > > >
> > > > In contrast, the release process for DOIs requires providing the
> Docker
> > > Hub
> > > > team with a specific directory for each version release that
> contains a
> > > > standalone Dockerfile. These Dockerfiles are designed to be
> > > > self-sufficient, hence require hardcoded values instead of relying on
> > > build
> > > > arguments. To accommodate this, in our proposed 

Re: [VOTE] KIP-853: KRaft Controller Membership Changes

2024-04-22 Thread José Armando García Sancio
+1 binding.

On Mon, Apr 22, 2024 at 9:28 AM Jason Gustafson
 wrote:
>
> Thanks Jose. +1. Great KIP!
>
> On Fri, Mar 29, 2024 at 11:16 AM Jun Rao  wrote:
>
> > Hi, Jose,
> >
> > Thanks for the KIP. +1
> >
> > Jun
> >
> > On Fri, Mar 29, 2024 at 9:55 AM José Armando García Sancio
> >  wrote:
> >
> > > Hi all,
> > >
> > > I would like to call a vote to adopt KIP-853.
> > >
> > > KIP: https://cwiki.apache.org/confluence/x/nyH1D
> > > Discussion thread:
> > > https://lists.apache.org/thread/6o3sjvcb8dx1ozqfpltb7p0w76b4nd46
> > >
> > > Thanks,
> > > --
> > > -José
> > >
> >



-- 
-José


Re: [VOTE] KIP-853: KRaft Controller Membership Changes

2024-04-22 Thread Jason Gustafson
Thanks Jose. +1. Great KIP!

On Fri, Mar 29, 2024 at 11:16 AM Jun Rao  wrote:

> Hi, Jose,
>
> Thanks for the KIP. +1
>
> Jun
>
> On Fri, Mar 29, 2024 at 9:55 AM José Armando García Sancio
>  wrote:
>
> > Hi all,
> >
> > I would like to call a vote to adopt KIP-853.
> >
> > KIP: https://cwiki.apache.org/confluence/x/nyH1D
> > Discussion thread:
> > https://lists.apache.org/thread/6o3sjvcb8dx1ozqfpltb7p0w76b4nd46
> >
> > Thanks,
> > --
> > -José
> >
>


[jira] [Resolved] (KAFKA-16103) Review client logic for triggering offset commit callbacks

2024-04-22 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy resolved KAFKA-16103.

Resolution: Fixed

> Review client logic for triggering offset commit callbacks
> --
>
> Key: KAFKA-16103
> URL: https://issues.apache.org/jira/browse/KAFKA-16103
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lucas Brutschy
>Priority: Critical
>  Labels: kip-848-client-support, offset
> Fix For: 3.8.0
>
>
> Review logic for triggering commit callbacks, ensuring that all callbacks are 
> triggered before returning from commitSync



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16599) Always await async commit callbacks in commitSync and close

2024-04-22 Thread Lucas Brutschy (Jira)
Lucas Brutschy created KAFKA-16599:
--

 Summary: Always await async commit callbacks in commitSync and 
close
 Key: KAFKA-16599
 URL: https://issues.apache.org/jira/browse/KAFKA-16599
 Project: Kafka
  Issue Type: Task
Reporter: Lucas Brutschy






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-04-22 Thread Damien Gasparina
Hi Bruno,

1) Good point, naming stuff is definitely hard, I renamed
ProcessingMetadata to ErrorHandlerContext

> Is there any reason you did not use something like
> Record sourceRecord()

2) The record class is used in many locations and, I assume, could be
expanded by new features.
As the error handler metadata could be stored in memory for a longer
duration due to the ProductionExceptionHandle, I think it is wiser to
have an independent class that is the leanest possible.
Maybe I am overthinking this, what do you think?

3) Oops, good point, I didn't notice that it was still there. I
removed the parameter.

4) To clarify the KIP, I specified the DeserializationExceptionHandler
interface in the KIP.

5) Good point, I updated the KIP with the order you mentioned:
context, record, exception

6) I was indeed thinking of extending the ProcessingContext to store
it there. Let me update the KIP to make it clear.

From past experience, deprecating an interface is always a bit
painful, in this KIP, I relied on default implementation to ensure
backward compatibility while encouraging people to implement the new
method signature. If you know a better approach, I'll take :-)
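
To illustrate the pattern (interface and method names below are placeholders,
not the exact KIP interfaces):

  interface ExampleExceptionHandler {

      enum Response { FAIL, CONTINUE }

      // old signature: deprecated, its default throws so that new
      // implementations are not forced to implement it
      @Deprecated
      default Response handle(final Exception exception) {
          throw new UnsupportedOperationException("deprecated handle() is not implemented");
      }

      // new signature: its default delegates to the old one, so existing
      // implementations keep working unchanged
      default Response handle(final Object context, final Exception exception) {
          return handle(exception);
      }
  }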

Cheers,
Damien

On Mon, 22 Apr 2024 at 11:01, Bruno Cadonna  wrote:
>
> Hi Damien,
>
> Thanks a lot for the updates!
>
> I have the following comments:
>
> (1)
> Could you rename ProcessingMetadata to ErrorHandlerContext or
> ErrorHandlerMetadata (I am preferring the former)? I think it makes it
> clearer for what this context/metadata is for.
>
>
> (2)
> Is there any reason you did not use something like
>
> Record sourceRecord()
>
> in ProcessingMetadata instead of sourceRawKey() and sourceRawValue() and
> headers()? The headers() method refers to the record read from the input
> topic of the sub-topology, right? If yes, maybe that is also something
> to mention more explicitly.
>
>
> (3)
> Since you added the processor node ID to the ProcessingMetadata, you can
> remove it from the signature of method handle() in
> ProcessingExceptionHandler.
>
>
> (4)
> Where are the mentioned changes to the DeserializationExceptionHandler?
>
>
> (5)
> To be consistent, the order of the parameters in the
> ProductionExceptionHandler should be:
> 1. context
> 2. record
> 3. exception
>
>
> (6)
> I am wondering where the implementation of ProcessingMetadata gets the
> sourceRawKey/Value from. Do we need additional changes in
> ProcessingContext and implementations?
>
>
> Best,
> Bruno
>
>
> On 4/21/24 2:23 PM, Damien Gasparina wrote:
> > Hi Everyone,
> >
> > Following the discussions on KIP-1033 and KIP-1034, we did a few changes:
> >- We introduced a new ProcessingMetadata class containing only the
> > ProcessorContext metadata: topic, partition, offset, headers[],
> > sourceRawKey, sourceRawValue, TaskId, ProcessorNodeName
> >- To be consistent, we propose to deprecate the existing
> > DeserializationExceptionHandler and ProductionExceptionHandler methods
> > to rely on the new ProcessingMetadata
> >- The creation and the ProcessingMetadata and the deprecation of old
> > methods is owned by KIP-1033, KIP-1034 (DLQ) is now only focusing on
> > Dead Letter Queue implementation without touching any interfaces. We
> > introduced a hard dependency for KIP-1034 regarding KIP-1033, we think
> > it's the wisest implementation wise.
> > - Instead of creating a new metric, KIP-1033 updates the
> > dropped-record metric.
> >
> > Let me know what you think, if everything's fine, I think we should be
> > good to start a VOTE?
> >
> > Cheers,
> > Damien
> >
> >
> >
> >
> >
> > On Fri, 12 Apr 2024 at 22:24, Sophie Blee-Goldman  
> > wrote:
> >>
> >> Fully agree about creating a new class for the bits of ProcessingContext
> >> that are specific to metadata only. In fact, more or less this same point
> >> just came up in the related KIP 1034 for DLQs, since the RecordMetadata
> >> can't always be trusted to remain immutable. Maybe it's possible to solve
> >> both issues at once, with the same class?
> >>
> >> On another related note -- I had actually also just proposed that we
> >> deprecate the existing DeserializationExceptionHandler method and replace
> >> it with one using the new PAPI as part of KIP-1034. But now that I'm
> >> reading this, I would say it probably makes more sense to do in this KIP.
> >> We can also push that out into a smaller-scoped third KIP if you want, but
> >> clearly, there is some overlap here and so however you guys (the authors)
> >> want to organize this part of the work is fine with me. I do think it
> >> should be done alongside/before this KIP and 1034 though, for all the
> >> reasons already stated.
> >>
> >> Everything else in the discussion so far I agree with! The
> >> ProcessingContext thing is the only open question in my mind
> >>
> >> On Thu, Apr 11, 2024 at 5:41 AM Damien Gasparina 
> >> wrote:
> >>
> >>> Hi Matthias, Bruno,
> >>>
> >>> 1.a During my previous comment, by Processor Node ID, I meant
> >>> 

[jira] [Created] (KAFKA-16598) Migrate `ResetConsumerGroupOffsetTest` to new test infra

2024-04-22 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16598:
--

 Summary: Migrate `ResetConsumerGroupOffsetTest` to new test infra
 Key: KAFKA-16598
 URL: https://issues.apache.org/jira/browse/KAFKA-16598
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


as title.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-04-22 Thread Alieh Saeedi
Thank you all for the feedback!

Addressing the main concern: The KIP is about giving the user the ability
to handle producer exceptions, but to be more conservative and avoid future
issues, we decided to limit it to a short list of exceptions. I included
*RecordTooLargeException* and *UnknownTopicOrPartitionException*. Open to
suggestions for adding some more ;-)

KIP Updates:
- clarified the way the user should configure the Producer to use the
custom handler. I think adding a producer config property is the cleanest
option (a rough sketch follows below this list).
- changed the *ClientExceptionHandler* to *ProducerExceptionHandler* to be
closer to what we are changing.
- added the ProducerRecord as the input parameter of the handle() method as
well.
- increased the response types to 3 to have fail and two types of continue.
- The default behaviour is having no custom handler, having the
corresponding config parameter set to null. Therefore, the KIP provides no
default implementation of the interface.
- We follow the interface solution as described in the
Rejected Alternatives section.
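
A rough sketch of the current proposal (interface name, method signature,
response values, and the config key below are all still under discussion and
partly placeholders, not a finalized API):

  import java.util.Properties;

  import org.apache.kafka.clients.producer.ProducerConfig;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.common.errors.RecordTooLargeException;

  interface ProducerExceptionHandler {
      // one "fail" response and two flavours of "continue", as mentioned above
      enum Response { FAIL, RETRY, SWALLOW }

      Response handle(ProducerRecord<byte[], byte[]> record, Exception exception);
  }

  // example handler: drop records that are too large, fail on everything else
  class DropTooLargeRecordsHandler implements ProducerExceptionHandler {
      @Override
      public Response handle(final ProducerRecord<byte[], byte[]> record, final Exception exception) {
          return exception instanceof RecordTooLargeException ? Response.SWALLOW : Response.FAIL;
      }
  }

  // hypothetical configuration; "custom.exception.handler" is a placeholder key
  class HandlerConfigExample {
      static Properties producerProps() {
          final Properties props = new Properties();
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          props.put("custom.exception.handler", DropTooLargeRecordsHandler.class.getName());
          return props;
      }
  }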


Cheers,
Alieh


On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax  wrote:

> Thanks for the KIP Alieh! It addresses an important case for error
> handling.
>
> I agree that using this handler would be an expert API, as mentioned by
> a few people. But I don't think it would be a reason to not add it. It's
> always a tricky tradeoff what to expose to users and to avoid foot guns,
> but we added similar handlers to Kafka Streams, and have good experience
> with it. Hence, I understand, but don't share the concern raised.
>
> I also agree that there is some responsibility by the user to understand
> how such a handler should be implemented to not drop data by accident.
> But it seem unavoidable and acceptable.
>
> While I understand that a "simpler / reduced" API (eg via configs) might
> also work, I personally prefer a full handler. Configs have the same
> issue that they could be misused, potentially leading to incorrectly
> dropped data, but at the same time are less flexible (and thus maybe
> even harder to use correctly...?). Based on my experience, there are also
> often weird corner cases for which it makes sense to also drop records for
> other exceptions, and a full handler has the advantage of full
> flexibility and "absolute power!".
>
> To be fair: I don't know the exact code paths of the producer in
> details, so please keep me honest. But my understanding is, that the KIP
> aims to allow users to react to internal exception, and decide to keep
> retrying internally, swallow the error and drop the record, or raise the
> error?
>
> Maybe the KIP would need to be a little bit more precise about what errors we
> want to cover -- I don't think this list must be exhaustive, as we can
> always do follow up KIP to also apply the handler to other errors to
> expand the scope of the handler. The KIP does mention examples, but it
> might be good to explicitly state for what cases the handler gets applied?
>
> I am also not sure if CONTINUE and FAIL are enough options? Don't we
> need three options? Or would `CONTINUE` have different meaning depending
> on the type of error? Ie, for a retryable error `CONTINUE` would mean
> keep retrying internally, but for a non-retryable error `CONTINUE` means
> swallow the error and drop the record? This semantic overload seems
> tricky to reason about for users, so it might be better to split `CONTINUE`
> into two cases -> `RETRY` and `SWALLOW` (or some better names).
>
> Additionally, should we just ship a `DefaultClientExceptionHandler`
> which would return `FAIL`, for backward compatibility. Or don't have any
> default handler to begin with and allow it to be `null`? I don't see the
> need for a specific `TransactionExceptionHandler`. To me, the goal
> should be to not modify the default behavior at all, but to just allow
> users to change the default behavior if there is a need.
>
> What is missing in the KIP though is how the handler is passed into the
> producer. Would we need a new config which allows setting a
> custom handler? And/or would we allow passing in an instance via the
> constructor or add a new method to set a handler?
>
>
> -Matthias
>
> On 4/18/24 10:02 AM, Andrew Schofield wrote:
> > Hi Alieh,
> > Thanks for the KIP.
> >
> > Exception handling in the Kafka producer and consumer is really not
> ideal.
> > It’s even harder working out what’s going on with the consumer.
> >
> > I’m a bit nervous about this KIP and I agree with Chris that it could do
> with additional
> > motivation. This would be an expert-level interface given how complicated
> > the exception handling for Kafka has become.
> >
> > 7. The application is not really aware of the batching being done on its
> behalf.
> > The ProduceResponse can actually return an array of records which failed
> > per batch. If you get RecordTooLargeException, and want to retry, you
> probably
> > need to remove the offending records from the batch and retry it. 

[jira] [Resolved] (KAFKA-16549) suppress the warnings from RemoteLogManager

2024-04-22 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16549.

Fix Version/s: 3.8.0
   Resolution: Fixed

> suppress the warnings from RemoteLogManager
> ---
>
> Key: KAFKA-16549
> URL: https://issues.apache.org/jira/browse/KAFKA-16549
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: charliecheng
>Priority: Trivial
> Fix For: 3.8.0
>
>
> {quote}
> /home/chia7712/project/kafka/core/src/main/java/kafka/log/remote/RemoteLogManager.java:234:
>  warning: [removal] AccessController in java.security has been deprecated and 
> marked for removal
> return java.security.AccessController.doPrivileged(new 
> PrivilegedAction() {
> ^
> /home/chia7712/project/kafka/core/src/main/java/kafka/log/remote/RemoteLogManager.java:256:
>  warning: [removal] AccessController in java.security has been deprecated and 
> marked for removal
> return java.security.AccessController.doPrivileged(new 
> PrivilegedAction() {
> {quote}
> we should add @SuppressWarnings("removal") to those methods
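A minimal illustration of the suggested fix (the method below is a stand-in, not the actual RemoteLogManager code):

// Illustrative only: annotate the methods that call the deprecated-for-removal
// java.security.AccessController so the build no longer emits the warning.
@SuppressWarnings("removal")
private <T> T withPrivileges(final java.security.PrivilegedAction<T> action) {
    return java.security.AccessController.doPrivileged(action);
}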



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-22 Thread Lucas Brutschy
Hi!

Thanks for the KIP, great stuff.

L1. I was a bit confused that the default configuration (once you set
a DLQ topic) is going to be fail-and-send-to-DLQ, if I understood
correctly. Is this something that will be a common use-case, and is it
a configuration that we want to encourage? I would expect that you either
want to fail or skip-and-send-to-DLQ.

L2. Have you considered extending the `TimestampExtractor` interface
so that it can also produce to the DLQ? AFAIK it's not covered by any of
the existing exception handlers, but it can cause similar failures
(potentially custom logic that depends on the validity of the input record).
There could also be a default implementation as a subclass of
`ExtractRecordMetadataTimestamp`.
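(To make L2 concrete, here is a rough sketch; the onInvalidTimestamp override is the real extension point of ExtractRecordMetadataTimestamp, but the DLQ forwarding inside it is purely hypothetical and would depend on how the KIP exposes a DLQ producer:)

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp;

// Hypothetical sketch: route records with invalid timestamps to a DLQ instead of failing.
public class DlqAwareTimestampExtractor extends ExtractRecordMetadataTimestamp {

    @Override
    public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
                                   final long recordTimestamp,
                                   final long partitionTime) {
        sendToDeadLetterQueue(record);  // hypothetical hook, not an existing API
        return -1;                      // a negative timestamp causes the record to be dropped
    }

    private void sendToDeadLetterQueue(final ConsumerRecord<Object, Object> record) {
        // placeholder: the KIP would need to define how a DLQ producer is made available here
    }
}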

L3. It would be nice to include an example of how to produce to
multiple topics in the KIP, as I can imagine that this will be a
common use-case. I wasn't sure how much code would be involved to make
it work. If a lot of code is required, we may want to consider
exposing some utils that make it easier.

Cheers,
Lucas

On Sun, Apr 21, 2024 at 7:58 PM Damien Gasparina  wrote:
>
> Hi everyone,
>
> Following all the discussion on this KIP and KIP-1033, we introduced a
> new container class containing only processing context metadata:
> ProcessingMetadata. This new container class is actually part of
> KIP-1033; thus, I added a hard dependency for this KIP on KIP-1033, which I
> think is the wisest choice implementation-wise.
>
> I also clarified the interface of the enums:
> withDeadLetterQueueRecords(Iterable<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords). Very likely most users would just
> send one DLQ record, but there might be specific use-cases, and supporting
> more also covers the simpler case, so I added an Iterable.
>
> I took some time to think about the impact of storing the
> ProcessingMetadata on the ProductionExceptionHandler. I think storing
> the topic/offset/partition should be fine, but I am concerned about
> storing the rawSourceKey/Value. I think it could impact some specific
> use-cases: for example, a high-throughput Kafka Streams application
> "counting" messages could have huge source input messages and very
> small sink messages; here, I assume storing the rawSourceKey/Value
> could require significantly more memory than the actual Kafka Producer
> buffer.
>
> I think the safest approach is actually to only store the fixed-size
> metadata for the ProductionExceptionHandler.handle:
> topic/partition/offset/processorNodeId/taskId. It might be confusing
> for the user, but 1) it is still better than today, where there is
> no context information at all, 2) it would be clearly stated in the
> javadoc, and 3) the rawSourceKey/Value are already nullable (e.g. the
> punctuate case).
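(Purely illustrative, a minimal sketch of such a fixed-size-only metadata view; the interface name and accessors below are assumptions, not the KIP's final API:)

import org.apache.kafka.streams.processor.TaskId;

// Hypothetical sketch: only fixed-size fields, no raw source key/value,
// so the handler metadata cannot blow up producer-side memory usage.
public interface FixedSizeProcessingMetadata {
    String topic();          // may be null, e.g. for punctuate-triggered sends
    int partition();
    long offset();
    String processorNodeId();
    TaskId taskId();
}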
>
> Do you think it would be a suitable design Sophie?
>
> Cheers,
> Damien
>
> On Sun, 14 Apr 2024 at 21:30, Loic Greffier  
> wrote:
> >
> > Hi Sophie,
> >
> > Thanks for your feedback.
> > Completing Damien's comments here for points S1 and S5B.
> >
> > S1:
> > > I'm confused -- are you saying that we're introducing a new kind of 
> > > ProducerRecord class for this?
> >
> > I am wondering if it makes sense to alter the ProducerRecord from the Clients
> > API with a "deadLetterQueueTopicName" attribute dedicated to the Kafka Streams
> > DLQ.
> > Adding "deadLetterQueueTopicName" as an additional parameter to 
> > "withDeadLetterQueueRecord" is a good option, and may allow users to send 
> > records to different DLQ topics depending on conditions:
> > @Override
> > public ProductionExceptionHandlerResponse handle(final ProcessingContext context,
> >                                                  final ProducerRecord<byte[], byte[]> record,
> >                                                  final Exception exception) {
> >     if (condition1) {
> >         return ProductionExceptionHandlerResponse.CONTINUE
> >             .withDeadLetterQueueRecord(record, "dlq-topic-a");
> >     }
> >     if (condition2) {
> >         return ProductionExceptionHandlerResponse.CONTINUE
> >             .withDeadLetterQueueRecord(record, "dlq-topic-b");
> >     }
> >     return ProductionExceptionHandlerResponse.CONTINUE
> >         .withDeadLetterQueueRecord(record, "dlq-topic-c");
> > }
> >
> > S5B:
> > > I was having a bit of trouble understanding what the behavior would be if 
> > > someone configured a "errors.deadletterqueue.topic.name" but didn't 
> > > implement the handlers.
> >
> > The provided LogAndContinueExceptionHandler, LogAndFailExceptionHandler and 
> > DefaultProductionExceptionHandler should be able to tell if records should 
> > be sent to DLQ or not.
> > The "errors.deadletterqueue.topic.name" takes place to:
> >
> >   *   Specifying if the provided handlers should or should not send records 
> > to DLQ.
> >  *   If the value is empty, the handlers should not send records to DLQ.
> >  *   If the value is not empty, the handlers should send records to DLQ.
> >   *   Define the name of the DLQ topic that should be used by the provided 
> > 
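(For illustration, a minimal sketch of how the proposed configuration could be set; the property name is the one discussed in this thread and may still change:)

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Illustrative only: an empty or absent value means the provided handlers
// do not send records to a DLQ; a non-empty value names the default DLQ topic.
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put("errors.deadletterqueue.topic.name", "my-app-dlq");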

Re: [VOTE] KIP-1037: Allow WriteTxnMarkers API with Alter Cluster Permission

2024-04-22 Thread Andrew Schofield
Hi Nikhil,
Thanks for the KIP. Looks good to me.

+1 (non-binding)

Thanks,
Andrew

> On 22 Apr 2024, at 09:17, Christo Lolov  wrote:
>
> Heya Nikhil,
>
> Thanks for the proposal, as mentioned before it makes sense to me!
>
> +1 (binding)
>
> Best,
> Christo
>
> On Sat, 20 Apr 2024 at 00:25, Justine Olshan 
> wrote:
>
>> Hey Nikhil,
>>
>> I meant to comment on the discussion thread, but my draft took so long, you
>> opened the vote.
>>
>> Regardless, I just wanted to say that it makes sense to me. +1 (binding)
>>
>> Justine
>>
>> On Fri, Apr 19, 2024 at 7:22 AM Nikhil Ramakrishnan <
>> ramakrishnan.nik...@gmail.com> wrote:
>>
>>> Hi everyone,
>>>
>>> I would like to start a voting thread for KIP-1037: Allow
>>> WriteTxnMarkers API with Alter Cluster Permission
>>> (
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1037%3A+Allow+WriteTxnMarkers+API+with+Alter+Cluster+Permission
>>> )
>>> as there have been no objections on the discussion thread.
>>>
>>> For comments or feedback please check the discussion thread here:
>>> https://lists.apache.org/thread/bbkyt8mrc8xp3jfyvhph7oqtjxl29xmn
>>>
>>> Thanks,
>>> Nikhil
>>>
>>



[jira] [Resolved] (KAFKA-16596) Flaky test – org.apache.kafka.clients.ClientUtilsTest.testParseAndValidateAddressesWithReverseLookup()

2024-04-22 Thread Viktor Somogyi-Vass (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass resolved KAFKA-16596.
-
Fix Version/s: 3.8.0
 Assignee: Andras Katona
   Resolution: Fixed

> Flaky test – 
> org.apache.kafka.clients.ClientUtilsTest.testParseAndValidateAddressesWithReverseLookup()
>  
> ---
>
> Key: KAFKA-16596
> URL: https://issues.apache.org/jira/browse/KAFKA-16596
> Project: Kafka
>  Issue Type: Test
>Reporter: Igor Soarez
>Assignee: Andras Katona
>Priority: Major
>  Labels: GoodForNewContributors, good-first-issue
> Fix For: 3.8.0
>
>
> org.apache.kafka.clients.ClientUtilsTest.testParseAndValidateAddressesWithReverseLookup()
>  failed in the following way:
>  
> {code:java}
> org.opentest4j.AssertionFailedError: Unexpected addresses [93.184.215.14, 
> 2606:2800:21f:cb07:6820:80da:af6b:8b2c] ==> expected:  but was:  
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)   
>  at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) 
> at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)  
>   at 
> app//org.apache.kafka.clients.ClientUtilsTest.testParseAndValidateAddressesWithReverseLookup(ClientUtilsTest.java:65)
>  {code}
> As a result of the following assertions:
>  
> {code:java}
> // With lookup of example.com, either one or two addresses are expected depending on
> // whether ipv4 and ipv6 are enabled
> List<InetSocketAddress> validatedAddresses = checkWithLookup(asList("example.com:1"));
> assertTrue(validatedAddresses.size() >= 1, "Unexpected addresses " + validatedAddresses);
> List<String> validatedHostNames = validatedAddresses.stream().map(InetSocketAddress::getHostName)
>         .collect(Collectors.toList());
> List<String> expectedHostNames = asList("93.184.216.34", "2606:2800:220:1:248:1893:25c8:1946"); {code}
> It seems that the DNS result has changed for example.com.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16597) Flaky test - org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificStalePartitionStoresMultiStreamThreads()

2024-04-22 Thread Igor Soarez (Jira)
Igor Soarez created KAFKA-16597:
---

 Summary: Flaky test - 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificStalePartitionStoresMultiStreamThreads()
 Key: KAFKA-16597
 URL: https://issues.apache.org/jira/browse/KAFKA-16597
 Project: Kafka
  Issue Type: Test
  Components: streams
Reporter: Igor Soarez


org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificStalePartitionStoresMultiStreamThreads()
 failed with:
{code:java}
Error

org.apache.kafka.streams.errors.InvalidStateStorePartitionException: The 
specified partition 1 for store source-table does not exist.

Stacktrace

org.apache.kafka.streams.errors.InvalidStateStorePartitionException: The 
specified partition 1 for store source-table does not exist.   at 
app//org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:63)
at 
app//org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:53)
 at 
app//org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificStalePartitionStoresMultiStreamThreads(StoreQueryIntegrationTest.java:411)
 {code}
 

https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15701/2/tests/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16596) Flaky test – org.apache.kafka.clients.ClientUtilsTest.testParseAndValidateAddressesWithReverseLookup()

2024-04-22 Thread Igor Soarez (Jira)
Igor Soarez created KAFKA-16596:
---

 Summary: Flaky test – 
org.apache.kafka.clients.ClientUtilsTest.testParseAndValidateAddressesWithReverseLookup()
 
 Key: KAFKA-16596
 URL: https://issues.apache.org/jira/browse/KAFKA-16596
 Project: Kafka
  Issue Type: Test
Reporter: Igor Soarez


org.apache.kafka.clients.ClientUtilsTest.testParseAndValidateAddressesWithReverseLookup()
 failed in the following way:

 
{code:java}
org.opentest4j.AssertionFailedError: Unexpected addresses [93.184.215.14, 
2606:2800:21f:cb07:6820:80da:af6b:8b2c] ==> expected:  but was:  
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
   at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) 
   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at 
app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)at 
app//org.apache.kafka.clients.ClientUtilsTest.testParseAndValidateAddressesWithReverseLookup(ClientUtilsTest.java:65)
 {code}
As a result of the following assertions:

 
{code:java}
// With lookup of example.com, either one or two addresses are expected depending on
// whether ipv4 and ipv6 are enabled
List<InetSocketAddress> validatedAddresses = checkWithLookup(asList("example.com:1"));
assertTrue(validatedAddresses.size() >= 1, "Unexpected addresses " + validatedAddresses);
List<String> validatedHostNames = validatedAddresses.stream().map(InetSocketAddress::getHostName)
        .collect(Collectors.toList());
List<String> expectedHostNames = asList("93.184.216.34", "2606:2800:220:1:248:1893:25c8:1946"); {code}
It seems that the DNS result has changed for example.com.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15736) KRaft support in PlaintextConsumerTest

2024-04-22 Thread Walter Hernandez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walter Hernandez resolved KAFKA-15736.
--
Resolution: Done

> KRaft support in PlaintextConsumerTest
> --
>
> Key: KAFKA-15736
> URL: https://issues.apache.org/jira/browse/KAFKA-15736
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
>
> The following tests in PlaintextConsumerTest in 
> core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala need to 
> be updated to support KRaft
> 49 : def testHeaders(): Unit = {
> 136 : def testDeprecatedPollBlocksForAssignment(): Unit = {
> 144 : def testHeadersSerializerDeserializer(): Unit = {
> 153 : def testMaxPollRecords(): Unit = {
> 169 : def testMaxPollIntervalMs(): Unit = {
> 194 : def testMaxPollIntervalMsDelayInRevocation(): Unit = {
> 234 : def testMaxPollIntervalMsDelayInAssignment(): Unit = {
> 258 : def testAutoCommitOnClose(): Unit = {
> 281 : def testAutoCommitOnCloseAfterWakeup(): Unit = {
> 308 : def testAutoOffsetReset(): Unit = {
> 319 : def testGroupConsumption(): Unit = {
> 339 : def testPatternSubscription(): Unit = {
> 396 : def testSubsequentPatternSubscription(): Unit = {
> 447 : def testPatternUnsubscription(): Unit = {
> 473 : def testCommitMetadata(): Unit = {
> 494 : def testAsyncCommit(): Unit = {
> 513 : def testExpandingTopicSubscriptions(): Unit = {
> 527 : def testShrinkingTopicSubscriptions(): Unit = {
> 541 : def testPartitionsFor(): Unit = {
> 551 : def testPartitionsForAutoCreate(): Unit = {
> 560 : def testPartitionsForInvalidTopic(): Unit = {
> 566 : def testSeek(): Unit = {
> 621 : def testPositionAndCommit(): Unit = {
> 653 : def testPartitionPauseAndResume(): Unit = {
> 671 : def testFetchInvalidOffset(): Unit = {
> 696 : def testFetchOutOfRangeOffsetResetConfigEarliest(): Unit = {
> 717 : def testFetchOutOfRangeOffsetResetConfigLatest(): Unit = {
> 743 : def testFetchRecordLargerThanFetchMaxBytes(): Unit = {
> 772 : def testFetchHonoursFetchSizeIfLargeRecordNotFirst(): Unit = {
> 804 : def testFetchHonoursMaxPartitionFetchBytesIfLargeRecordNotFirst(): Unit 
> = {
> 811 : def testFetchRecordLargerThanMaxPartitionFetchBytes(): Unit = {
> 819 : def testLowMaxFetchSizeForRequestAndPartition(): Unit = {
> 867 : def testRoundRobinAssignment(): Unit = {
> 903 : def testMultiConsumerRoundRobinAssignor(): Unit = {
> 940 : def testMultiConsumerStickyAssignor(): Unit = {
> 986 : def testMultiConsumerDefaultAssignor(): Unit = {
> 1024 : def testRebalanceAndRejoin(assignmentStrategy: String): Unit = {
> 1109 : def testMultiConsumerDefaultAssignorAndVerifyAssignment(): Unit = {
> 1141 : def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = {
> 1146 : def testMultiConsumerSessionTimeoutOnClose(): Unit = {
> 1151 : def testInterceptors(): Unit = {
> 1210 : def testAutoCommitIntercept(): Unit = {
> 1260 : def testInterceptorsWithWrongKeyValue(): Unit = {
> 1286 : def testConsumeMessagesWithCreateTime(): Unit = {
> 1303 : def testConsumeMessagesWithLogAppendTime(): Unit = {
> 1331 : def testListTopics(): Unit = {
> 1351 : def testUnsubscribeTopic(): Unit = {
> 1367 : def testPauseStateNotPreservedByRebalance(): Unit = {
> 1388 : def testCommitSpecifiedOffsets(): Unit = {
> 1415 : def testAutoCommitOnRebalance(): Unit = {
> 1454 : def testPerPartitionLeadMetricsCleanUpWithSubscribe(): Unit = {
> 1493 : def testPerPartitionLagMetricsCleanUpWithSubscribe(): Unit = {
> 1533 : def testPerPartitionLeadMetricsCleanUpWithAssign(): Unit = {
> 1562 : def testPerPartitionLagMetricsCleanUpWithAssign(): Unit = {
> 1593 : def testPerPartitionLagMetricsWhenReadCommitted(): Unit = {
> 1616 : def testPerPartitionLeadWithMaxPollRecords(): Unit = {
> 1638 : def testPerPartitionLagWithMaxPollRecords(): Unit = {
> 1661 : def testQuotaMetricsNotCreatedIfNoQuotasConfigured(): Unit = {
> 1809 : def testConsumingWithNullGroupId(): Unit = {
> 1874 : def testConsumingWithEmptyGroupId(): Unit = {
> 1923 : def testStaticConsumerDetectsNewPartitionCreatedAfterRestart(): Unit = 
> {
> Scanned 1951 lines. Found 0 KRaft tests out of 61 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-932: Queues for Kafka

2024-04-22 Thread Andrew Schofield
Hi Jun,
Thanks for your comments.

120. Thanks. Fixed.

121. ShareUpdateValue.SnapshotEpoch indicates which snapshot
the update applies to. It should of course be the snapshot that precedes
it in the log. It’s just there to provide a consistency check.

I also noticed that ShareSnapshotValue was missing StateEpoch. It
isn’t any more.

122. In KIP-848, ConsumerGroupMemberMetadataValue includes
GroupEpoch, but in the code it does not. In fact, there is considerable
divergence between the KIP and the code for this record value schema
which I expect will be resolved when the migration code has been
completed.

123. The share group does not persist the target assignment.

124. Share groups have three kinds of record:
i) ShareGroupMetadata
  - this contains the group epoch and is written whenever the group
epoch changes.
ii) ShareGroupMemberMetadata
   - this does not contain the group epoch.
iii) ShareGroupPartitionMetadata
   - this currently contains the epoch, but I think that is unnecessary.
 For one thing, the ConsumerGroupPartitionMetadata definition
 contains the group epoch, but the value appears never to be set.
 David Jacot confirms that it’s not necessary and is removing it.

I have removed the Epoch from ShareGroupPartitionMetadata.
The only purpose of persisting the epoch for a share group is so that
when a group coordinator takes over the share group, it is able to
continue the sequence of epochs. ShareGroupMetadataValue.Epoch
is used for this.

125. The group epoch will be incremented in this case and
consequently a ShareGroupMetadata will be written. KIP updated.

126. Not directly. A share group can only be deleted when it has no
members, so the tombstones for ShareGroupMemberMetadata will
have been written when the members left. I have clarified this.

127. The share coordinator is ignorant of the group epoch. When the
group coordinator is initializing the share-group state the first time that
a share-partition is being added to an assignment in the group, the
group epoch is used as the state epoch. But as the group epoch
increases over time, the share coordinator is entirely unaware.

When the first consumer for a share-partition fetches records from a
share-partition leader, the SPL calls the share coordinator to
ReadShareGroupState. If the SPL has previously read the information
and again it’s going from 0 to 1 consumer, it confirms it's up to date by
calling ReadShareGroupOffsetsState.

Even if many consumers are joining at the same time, any share-partition
which is being initialized will not be included in their assignments. Once
the initialization is complete, the next rebalance will assign the partition
to some consumers which will discover this by ShareGroupHeartbeat
response. And then, the fetching begins.

If an SPL receives a ShareFetch request before it’s read the state
from the SC, it can make the ShareFetch request wait up to MaxWaitMs
and then it can return an empty set of records if it’s still not ready.

So, I don’t believe there will be too much load. If a topic with many
partitions is added to the subscribed topics for a share group, the fact
that the assignments will only start to include the partitions as their
initialization completes should soften the impact.

128, 129: The “proper” way to turn on this feature when it’s finished will
be using `group.coordinator.rebalance.protocols` and `group.version`.
While it’s in Early Access and for test cases, the `group.share.enable`
configuration will turn it on.

I have described `group.share.enable` as an internal configuration in
the KIP.

130. The config `group.share.record.lock.duration.ms` applies to groups
which do not specify a group-level configuration for lock duration. The
minimum and maximum for this configuration are intended to give it
sensible bounds.

If a group does specify its own `group.share.record.lock.duration.ms`,
the broker-level `group.share.max.record.lock.duration.ms` gives the
cluster administrator a way of setting a maximum value for all groups.

While editing, I renamed `group.share.record.lock.duration.max.ms` to
`group.share.max.record.lock.duration.ms` for consistency with the
rest of the min/max configurations.

131. This is the limit per partition so you can go wider with multiple 
partitions.
I have set the initial value low for safety. I expect to be able to increase 
this
significantly when we have mature code which has been battle-tested.
Rather than try to guess how high it can safely go, I’ve erred on the side of
caution and expect to open it up in a future KIP.

132. Good catch. The problem is that I have missed two group configurations,
now added. These are group.share.session.timeout.ms and
group.share.heartbeat.timeout.ms . The configurations you mentioned
are the bounds for the group-level configurations.

133. The name `group.share.max.size` was chosen to mirror the existing
`group.consumer.max.size`.

134. It is intended to be a list of all of the valid 

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-22 Thread Bruno Cadonna

Hi Damien,

Thanks a lot for the updates!

I have the following comments:

(1)
Could you rename ProcessingMetadata to ErrorHandlerContext or 
ErrorHandlerMetadata (I prefer the former)? I think it makes it 
clearer what this context/metadata is for.



(2)
Is there any reason you did not use something like

Record sourceRecord()

in ProcessingMetadata instead of sourceRawKey() and sourceRawValue() and 
headers()? The headers() method refers to the record read from the input 
topic of the sub-topology, right? If yes, maybe that is also something 
to mention more explicitly.



(3)
Since you added the processor node ID to the ProcessingMetadata, you can 
remove it from the signature of method handle() in 
ProcessingExceptionHandler.
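(For illustration, the resulting signature could look roughly like the sketch below; it assumes the ProcessingMetadata naming from this thread and a response enum, neither of which is final:)

// Hypothetical sketch: the processor node ID is no longer a separate parameter
// because it is carried by the metadata object.
public interface ProcessingExceptionHandler {
    ProcessingExceptionHandlerResponse handle(final ProcessingMetadata context,
                                              final Record<?, ?> record,   // org.apache.kafka.streams.processor.api.Record
                                              final Exception exception);
}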



(4)
Where are the mentioned changes to the DeserializationExceptionHandler?


(5)
To be consistent, the order of the parameters in the 
ProductionExceptionHandler should be:

1. context
2. record
3. exception


(6)
I am wondering where the implementation of ProcessingMetadata gets the 
sourceRawKey/Value from. Do we need additional changes in 
ProcessingContext and implementations?



Best,
Bruno


On 4/21/24 2:23 PM, Damien Gasparina wrote:

Hi Everyone,

Following the discussions on KIP-1033 and KIP-1034, we made a few changes:
   - We introduced a new ProcessingMetadata class containing only the
ProcessorContext metadata: topic, partition, offset, headers[],
sourceRawKey, sourceRawValue, TaskId, ProcessorNodeName
   - To be consistent, we propose to deprecate the existing
DeserializationExceptionHandler and ProductionExceptionHandler methods
to rely on the new ProcessingMetadata
   - The creation of the ProcessingMetadata and the deprecation of the old
methods is owned by KIP-1033; KIP-1034 (DLQ) is now only focusing on the
Dead Letter Queue implementation without touching any interfaces. We
introduced a hard dependency of KIP-1034 on KIP-1033, which we think
is the wisest choice implementation-wise.
- Instead of creating a new metric, KIP-1033 updates the
dropped-record metric.

Let me know what you think, if everything's fine, I think we should be
good to start a VOTE?

Cheers,
Damien





On Fri, 12 Apr 2024 at 22:24, Sophie Blee-Goldman  wrote:


Fully agree about creating a new class for the bits of ProcessingContext
that are specific to metadata only. In fact, more or less this same point
just came up in the related KIP 1034 for DLQs, since the RecordMetadata
can't always be trusted to remain immutable. Maybe it's possible to solve
both issues at once, with the same class?

On another related note -- I had actually also just proposed that we
deprecate the existing DeserializationExceptionHandler method and replace
it with one using the new PAPI as part of KIP-1034. But now that I'm
reading this, I would say it probably makes more sense to do in this KIP.
We can also push that out into a smaller-scoped third KIP if you want, but
clearly, there is some overlap here and so however you guys (the authors)
want to organize this part of the work is fine with me. I do think it
should be done alongside/before this KIP and 1034 though, for all the
reasons already stated.

Everything else in the discussion so far I agree with! The
ProcessingContext thing is the only open question in my mind

On Thu, Apr 11, 2024 at 5:41 AM Damien Gasparina 
wrote:


Hi Matthias, Bruno,

1.a During my previous comment, by Processor Node ID, I meant
Processor name. This is important information to expose in the handler
as it allows users to identify the location of the exception in the
topology.
I assume this information could be useful in other places, that's why
I would lean toward adding this as an attribute in the
ProcessingContext.

1.b Looking at the ProcessingContext, I do think the following 3
methods should not be accessible in the exception handlers:
getStateStore(), schedule() and commit().
Having a separate interface would make a cleaner signature. It would
also be a great time to ensure that all exception handlers are
consistent; at the moment, the
DeserializationExceptionHandler.handle() relies on the old PAPI
ProcessorContext and the ProductionExceptionHandler.handle() has none.
It could make sense to build the new interface in this KIP and track
the effort to migrate the existing handlers in a separate KIP -- what do
you think?
Maybe I am overthinking this part and the ProcessingContext would be fine.

4. Good point regarding the dropped-record metric; as it is used by
the other handlers, I do think it makes sense to leverage it instead
of creating a new metric.
I will change the KIP to update the dropped-record metric instead.

8. Regarding the DSL, I am aligned with Bruno, I think we could close
the gaps in a future KIP.

Cheers,
Damien


On Thu, 11 Apr 2024 at 11:56, Bruno Cadonna  wrote:


Hi Matthias,


1.a
With processor node ID, I mean the ID that is exposed in the tags of
processor node metrics. That ID cannot be internal since it is exposed
in metrics. I think the processor 

Re: [VOTE] KIP-1037: Allow WriteTxnMarkers API with Alter Cluster Permission

2024-04-22 Thread Christo Lolov
Heya Nikhil,

Thanks for the proposal, as mentioned before it makes sense to me!

+1 (binding)

Best,
Christo

On Sat, 20 Apr 2024 at 00:25, Justine Olshan 
wrote:

> Hey Nikhil,
>
> I meant to comment on the discussion thread, but my draft took so long, you
> opened the vote.
>
> Regardless, I just wanted to say that it makes sense to me. +1 (binding)
>
> Justine
>
> On Fri, Apr 19, 2024 at 7:22 AM Nikhil Ramakrishnan <
> ramakrishnan.nik...@gmail.com> wrote:
>
> > Hi everyone,
> >
> > I would like to start a voting thread for KIP-1037: Allow
> > WriteTxnMarkers API with Alter Cluster Permission
> > (
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1037%3A+Allow+WriteTxnMarkers+API+with+Alter+Cluster+Permission
> > )
> > as there have been no objections on the discussion thread.
> >
> > For comments or feedback please check the discussion thread here:
> > https://lists.apache.org/thread/bbkyt8mrc8xp3jfyvhph7oqtjxl29xmn
> >
> > Thanks,
> > Nikhil
> >
>


Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-22 Thread Bruno Cadonna

Hi all,

How should we proceed here?

1. with the plain .checkpoint file
2. with a way to use the state store interface on unassigned but locally 
existing task state


While I like option 2, I think option 1 is less risky and will give us 
the benefits of transactional state stores sooner. We should consider 
the interface approach afterwards, though.



Best,
Bruno



On 4/17/24 3:15 PM, Bruno Cadonna wrote:

Hi Nick and Sophie,

I think the task ID is not enough to create a state store that can read 
the offsets of non-assigned tasks for lag computation during 
rebalancing. The state store also needs the state directory so that it 
knows where to find the information that it needs to return from 
changelogOffsets().


In general, I think we should proceed with the plain .checkpoint file 
for now and iterate back to the state store solution later since it 
seems it is not that straightforward. Alternatively, Nick could timebox 
an effort to better understand what would be needed for the state store 
solution. Nick, let us know your decision.


Regarding your question about the state store instance: I am not too 
familiar with that part of the code, but I think the state store is 
built when the processor topology is built, and the processor topology is 
built per stream task. So there is one instance of the processor topology 
and state store per stream task. Try to follow the call in [1].


Best,
Bruno

[1] 
https://github.com/apache/kafka/blob/f52575b17225828d2ff11996030ab7304667deab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L153




On 4/16/24 8:59 PM, Nick Telford wrote:

That does make sense. The one thing I can't figure out is how per-Task
StateStore instances are constructed.

It looks like we construct one StateStore instance for the whole Topology
(in InternalTopologyBuilder), and pass that into ProcessorStateManager 
(via

StateManagerUtil) for each Task, which then initializes it.

This can't be the case though, otherwise multiple partitions of the same
sub-topology (aka Tasks) would share the same StateStore instance, which
they don't.

What am I missing?

On Tue, 16 Apr 2024 at 16:22, Sophie Blee-Goldman 
wrote:

I don't think we need to *require* a constructor to accept the TaskId,
but we would definitely make sure that the RocksDB state store changes its
constructor to one that accepts the TaskId (which we can do without
deprecation since it's an internal API), and custom state stores can just
decide for themselves whether they want to opt-in/use the TaskId param
or not. I mean, custom state stores would have to opt in anyway by
implementing the new StoreSupplier#get(TaskId) API, and the only
reason to do that would be to have created a constructor that accepts
a TaskId.

Just to be super clear about the proposal, this is what I had in mind.
It's actually fairly simple and wouldn't add much to the scope of the
KIP (I think -- if it turns out to be more complicated than I'm assuming,
we should definitely do whatever has the smallest LOE to get this done).

Anyways, the (only) public API changes would be to add this new
method to the StoreSupplier API:

default T get(final TaskId taskId) {
    return get();
}

We can decide whether or not to deprecate the old #get but it's not
really necessary and might cause a lot of turmoil, so I'd personally
say we just leave both APIs in place.

And that's it for public API changes! Internally, we would just adapt
each of the rocksdb StoreSupplier classes to implement this new
API. So for example with the RocksDBKeyValueBytesStoreSupplier,
we just add

@Override
public KeyValueStore<Bytes, byte[]> get(final TaskId taskId) {
    return returnTimestampedStore ?
        new RocksDBTimestampedStore(name, metricsScope(), taskId) :
        new RocksDBStore(name, metricsScope(), taskId);
}

And of course add the TaskId parameter to each of the actual
state store constructors returned here.

Does that make sense? It's entirely possible I'm missing something
important here, but I think this would be a pretty small addition that
would solve the problem you mentioned earlier while also being
useful to anyone who uses custom state stores.
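(To make the opt-in concrete, here is a hedged sketch of a custom supplier overriding the proposed get(TaskId); it delegates to a built-in in-memory store purely for illustration, and the get(TaskId) override only compiles once the proposed default method exists on StoreSupplier:)

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

// Illustrative sketch of a custom supplier opting in to the proposed
// StoreSupplier#get(TaskId). A real custom store would pass the taskId
// into its own constructor instead of just logging it.
public class TaskAwareStoreSupplier implements KeyValueBytesStoreSupplier {
    private final String name;

    public TaskAwareStoreSupplier(final String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public KeyValueStore<Bytes, byte[]> get() {
        return Stores.inMemoryKeyValueStore(name).get();
    }

    // Override of the *proposed* method from this discussion -- not in Kafka today.
    @Override
    public KeyValueStore<Bytes, byte[]> get(final TaskId taskId) {
        System.out.println("Creating store " + name + " for task " + taskId);
        return Stores.inMemoryKeyValueStore(name).get();
    }

    @Override
    public String metricsScope() {
        return "task-aware-in-memory";
    }
}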

On Mon, Apr 15, 2024 at 10:21 AM Nick Telford 
wrote:


Hi Sophie,

Interesting idea! Although what would that mean for the StateStore
interface? Obviously we can't require that the constructor take the

TaskId.

Is it enough to add the parameter to the StoreSupplier?

Would doing this be in-scope for this KIP, or are we over-complicating

it?


Nick

On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman 


wrote:


Somewhat minor point overall, but it actually drives me crazy that you
can't get access to the taskId of a StateStore until #init is called.

This

has caused me a huge headache personally (since the same is true for
processors and I was trying to do something that's probably too hacky

to

actually complain about here lol)

Can we just change the StateStoreSupplier to receive and pass along