[jira] [Created] (KAFKA-16582) Feature Request: Introduce max.record.size Configuration Parameter for Producers

2024-04-17 Thread Ramiz Mehran (Jira)
Ramiz Mehran created KAFKA-16582:


 Summary: Feature Request: Introduce max.record.size Configuration 
Parameter for Producers
 Key: KAFKA-16582
 URL: https://issues.apache.org/jira/browse/KAFKA-16582
 Project: Kafka
  Issue Type: New Feature
  Components: producer 
Affects Versions: 3.6.2
Reporter: Ramiz Mehran


{*}Summary{*}:

Currently, Kafka producers have a {{max.request.size}} configuration that 
limits the size of a request sent to Kafka brokers. The same setting also acts 
as the maximum size of an individual record before compression. This dual role 
can lead to inefficiencies and unexpected behaviours, particularly when records 
are very large before compression but, once compressed, fit into the 
{{max.request.size}} multiple times over.

{*}Problem{*}:

During spikes in data transmission, especially with large records, requests 
that stay within {{max.request.size}} after compression can still cause 
increased latency and a processing backlog, because the compressed records form 
very large batches. The problem is particularly pronounced with highly 
efficient compression algorithms such as zstd, where the compressed size allows 
batches that are inefficient to process.

{*}Proposed Solution{*}:

Introduce a new producer configuration parameter: {{max.record.size}}. This 
parameter would allow administrators to define the maximum size of a record 
before it is compressed, making system behaviour more predictable by separating 
the uncompressed record size limit from the compressed request size limit.

{*}Benefits{*}:
 # {*}Predictability{*}: Producers can reject records that exceed the 
{{max.record.size}} before spending resources on compression.
 # {*}Efficiency{*}: Helps in maintaining efficient batch sizes and system 
throughput, especially under high load conditions.
 # {*}System Stability{*}: Avoids very large batches, which can negatively 
affect latency and throughput.

{*}Example{*}: Consider a scenario where the producer sends records of up to 
20 MB that, once compressed, fit into a batch under the 25 MB 
{{max.request.size}} multiple times. These batches can be problematic to 
process efficiently, even though they meet the current maximum request size 
constraint. With {{max.record.size}} capping the uncompressed record size, 
{{max.request.size}} could be lowered to limit only the compressed request 
size, say to 5 MB, preventing very large requests and the latency spikes they 
cause.

{*}Steps to Reproduce{*}:
 # Configure a Kafka producer with {{max.request.size}} set to 25 MB.
 # Send multiple uncompressed records close to 20 MB that compress to less than 
25 MB.
 # Observe the impact on Kafka broker performance and client-side latency.

{*}Expected Behavior{*}: The producer should allow administrators to set both 
a pre-compression record size limit and a post-compression total request size 
limit.
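
For illustration only, a minimal sketch of how the two limits might be 
configured together, assuming the proposed {{max.record.size}} property were 
adopted with the semantics described above (the property name and the values 
are placeholders, not an existing producer setting):
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
// Existing limit: caps the size of the (compressed) request sent to the broker.
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, String.valueOf(5 * 1024 * 1024));  // 5 MB
// Proposed limit (hypothetical, not part of the current producer API): caps the
// size of an individual record before compression, so oversized records are
// rejected before any compression work is done.
props.put("max.record.size", String.valueOf(20 * 1024 * 1024));  // 20 MB

Producer<String, byte[]> producer =
    new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer());
{code}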



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #2823

2024-04-17 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 349676 lines...]
[2024-04-18T03:36:13.801Z] 
[2024-04-18T03:36:13.801Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > SynchronizedPartitionGroupTest > testAddRawRecords() PASSED
[2024-04-18T03:36:13.801Z] 
[2024-04-18T03:36:13.801Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > SynchronizedPartitionGroupTest > testUpdatePartitions() STARTED
[2024-04-18T03:36:13.801Z] 
[2024-04-18T03:36:13.801Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > SynchronizedPartitionGroupTest > testUpdatePartitions() PASSED
[2024-04-18T03:36:13.801Z] 
[2024-04-18T03:36:13.801Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > RackAwareGraphConstructorFactoryTest > shouldReturnMinCostConstructor() 
STARTED
[2024-04-18T03:36:13.801Z] 
[2024-04-18T03:36:13.801Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > RackAwareGraphConstructorFactoryTest > shouldReturnMinCostConstructor() 
PASSED
[2024-04-18T03:36:13.801Z] 
[2024-04-18T03:36:13.801Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > RackAwareGraphConstructorFactoryTest > 
shouldReturnBalanceSubtopologyConstructor() STARTED
[2024-04-18T03:36:13.801Z] 
[2024-04-18T03:36:13.801Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > RackAwareGraphConstructorFactoryTest > 
shouldReturnBalanceSubtopologyConstructor() PASSED
[2024-04-18T03:36:15.354Z] 
[2024-04-18T03:36:15.354Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldSetUncaughtStreamsException() STARTED
[2024-04-18T03:36:16.907Z] 
[2024-04-18T03:36:16.907Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldSetUncaughtStreamsException() PASSED
[2024-04-18T03:36:16.907Z] 
[2024-04-18T03:36:16.907Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldClearTaskTimeoutOnProcessed() STARTED
[2024-04-18T03:36:16.907Z] 
[2024-04-18T03:36:16.907Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldClearTaskTimeoutOnProcessed() PASSED
[2024-04-18T03:36:16.907Z] 
[2024-04-18T03:36:16.907Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldUnassignTaskWhenRequired() STARTED
[2024-04-18T03:36:16.907Z] 
[2024-04-18T03:36:16.907Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldUnassignTaskWhenRequired() PASSED
[2024-04-18T03:36:16.907Z] 
[2024-04-18T03:36:16.907Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldClearTaskReleaseFutureOnShutdown() STARTED
[2024-04-18T03:36:16.907Z] 
[2024-04-18T03:36:16.907Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldClearTaskReleaseFutureOnShutdown() PASSED
[2024-04-18T03:36:16.907Z] 
[2024-04-18T03:36:16.907Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldProcessTasks() STARTED
[2024-04-18T03:36:16.907Z] 
[2024-04-18T03:36:16.907Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldProcessTasks() PASSED
[2024-04-18T03:36:16.907Z] 
[2024-04-18T03:36:16.907Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldPunctuateStreamTime() STARTED
[2024-04-18T03:36:16.907Z] 
[2024-04-18T03:36:16.907Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldPunctuateStreamTime() PASSED
[2024-04-18T03:36:16.907Z] 
[2024-04-18T03:36:16.907Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldShutdownTaskExecutor() STARTED
[2024-04-18T03:36:16.907Z] 
[2024-04-18T03:36:16.907Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldShutdownTaskExecutor() PASSED
[2024-04-18T03:36:16.907Z] 
[2024-04-18T03:36:16.907Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldAwaitProcessableTasksIfNoneAssignable() 
STARTED
[2024-04-18T03:36:16.907Z] 
[2024-04-18T03:36:16.907Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldAwaitProcessableTasksIfNoneAssignable() 
PASSED
[2024-04-18T03:36:16.907Z] 
[2024-04-18T03:36:16.907Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > 
shouldRespectPunctuationDisabledByTaskExecutionMetadata() STARTED
[2024-04-18T03:36:16.907Z] 
[2024-04-18T03:36:16.907Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > 
shouldRespectPunctuationDisabledByTaskExecutionMetadata() PASSED
[2024-04-18T03:36:16.907Z] 
[2024-04-18T03:36:16.907Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldSetTaskTimeoutOnTimeoutException() 

Re: [PR] KAFKA-16467: Add how to integrate with kafka repo [kafka-site]

2024-04-17 Thread via GitHub


showuon merged PR #596:
URL: https://github.com/apache/kafka-site/pull/596


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16581) Implement KafkaRaftClient unittest for membership change

2024-04-17 Thread Jira
José Armando García Sancio created KAFKA-16581:
--

 Summary: Implement KafkaRaftClient unittest for membership change
 Key: KAFKA-16581
 URL: https://issues.apache.org/jira/browse/KAFKA-16581
 Project: Kafka
  Issue Type: Sub-task
Reporter: José Armando García Sancio
 Fix For: 3.8.0


Add a new test suite like KafkaRaftClientTest and KafkaRaftClientSnapshotTest 
that validates the implementation.

In addition, KafkaRaftClientTest and KafkaRaftClientSnapshotTest should work 
with all versions of the RPCs.

Test upgrades from kraft.version 0 to 1.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16580) Write simulation tests for kraft membership change

2024-04-17 Thread Jira
José Armando García Sancio created KAFKA-16580:
--

 Summary: Write simulation tests for kraft membership change
 Key: KAFKA-16580
 URL: https://issues.apache.org/jira/browse/KAFKA-16580
 Project: Kafka
  Issue Type: Sub-task
Reporter: José Armando García Sancio
 Fix For: 3.8.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2822

2024-04-17 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-16579) Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer

2024-04-17 Thread Kirk True (Jira)
Kirk True created KAFKA-16579:
-

 Summary: Revert changes to consumer_rolling_upgrade_test.py for 
the new async Consumer
 Key: KAFKA-16579
 URL: https://issues.apache.org/jira/browse/KAFKA-16579
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer, system tests
Affects Versions: 3.8.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated a 
slew of system tests to run both the "old" and "new" implementations. 
KAFKA-16272 updated the system tests in {{connect_distributed_test.py}} so it 
could test the new consumer with Connect. However, we are not supporting 
Connect with the new consumer in 3.8.0. Unsurprisingly, when we run the Connect 
system tests with the new {{{}AsyncKafkaConsumer{}}}, we get errors like the 
following:
{code:java}
test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=eager.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   6 minutes 3.899 seconds


InsufficientResourcesError('Not enough nodes available to allocate. linux 
nodes requested: 1. linux nodes available: 0')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py",
 line 919, in test_exactly_once_source
consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, consumer_timeout_ms=1000, print_key=True)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/console_consumer.py",
 line 97, in __init__
BackgroundThreadService.__init__(self, context, num_nodes)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py",
 line 26, in __init__
super(BackgroundThreadService, self).__init__(context, num_nodes, 
cluster_spec, *args, **kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py",
 line 107, in __init__
self.allocate_nodes()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py",
 line 217, in allocate_nodes
self.nodes = self.cluster.alloc(self.cluster_spec)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/cluster.py",
 line 54, in alloc
allocated = self.do_alloc(cluster_spec)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/finite_subcluster.py",
 line 31, in do_alloc
allocated = self._available_nodes.remove_spec(cluster_spec)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/node_container.py",
 line 117, in remove_spec
raise InsufficientResourcesError("Not enough nodes available to allocate. " 
+ msg)
ducktape.cluster.node_container.InsufficientResourcesError: Not enough nodes 
available to allocate. linux nodes requested: 1. linux nodes available: 0
{code}
The task here is to revert the changes made in KAFKA-16272 [PR 
15576|https://github.com/apache/kafka/pull/15576].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16578) Revert changes to connect_distributed_test.py for the new async Consumer

2024-04-17 Thread Kirk True (Jira)
Kirk True created KAFKA-16578:
-

 Summary: Revert changes to connect_distributed_test.py for the new 
async Consumer
 Key: KAFKA-16578
 URL: https://issues.apache.org/jira/browse/KAFKA-16578
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer, system tests
Affects Versions: 3.8.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated a 
slew of system tests to run both the "old" and "new" implementations. 
KAFKA-16272 updated the system tests in {{connect_distributed_test.py}} so it 
could test the new consumer with Connect. However, we are not supporting 
Connect with the new consumer in 3.8.0. Unsurprisingly, when we run the Connect 
system tests with the new {{{}AsyncKafkaConsumer{}}}, we get errors like the 
following:
{code:java}
test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=eager.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   6 minutes 3.899 seconds


InsufficientResourcesError('Not enough nodes available to allocate. linux 
nodes requested: 1. linux nodes available: 0')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py",
 line 919, in test_exactly_once_source
consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, consumer_timeout_ms=1000, print_key=True)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/console_consumer.py",
 line 97, in __init__
BackgroundThreadService.__init__(self, context, num_nodes)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py",
 line 26, in __init__
super(BackgroundThreadService, self).__init__(context, num_nodes, 
cluster_spec, *args, **kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py",
 line 107, in __init__
self.allocate_nodes()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py",
 line 217, in allocate_nodes
self.nodes = self.cluster.alloc(self.cluster_spec)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/cluster.py",
 line 54, in alloc
allocated = self.do_alloc(cluster_spec)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/finite_subcluster.py",
 line 31, in do_alloc
allocated = self._available_nodes.remove_spec(cluster_spec)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/node_container.py",
 line 117, in remove_spec
raise InsufficientResourcesError("Not enough nodes available to allocate. " 
+ msg)
ducktape.cluster.node_container.InsufficientResourcesError: Not enough nodes 
available to allocate. linux nodes requested: 1. linux nodes available: 0
{code}
The task here is to revert the changes made in KAFKA-16272 [PR 
15576|https://github.com/apache/kafka/pull/15576].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16577) New consumer fails with stop timeout in consumer_test.py’s test_consumer_bounce system test

2024-04-17 Thread Kirk True (Jira)
Kirk True created KAFKA-16577:
-

 Summary: New consumer fails with stop timeout in 
consumer_test.py’s test_consumer_bounce system test
 Key: KAFKA-16577
 URL: https://issues.apache.org/jira/browse/KAFKA-16577
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, system tests
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


The {{consumer_test.py}} system test intermittently fails with the following 
error:

{code}
test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_failure.clean_shutdown=True.enable_autocommit=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   42.582 seconds


AssertionError()
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
 line 399, in test_consumer_failure
assert partition_owner is not None
AssertionError
Notify
{code}

Affected tests:
 * {{test_consumer_failure}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16576) New consumer fails with assert in consumer_test.py’s test_consumer_failure system test

2024-04-17 Thread Kirk True (Jira)
Kirk True created KAFKA-16576:
-

 Summary: New consumer fails with assert in consumer_test.py’s 
test_consumer_failure system test
 Key: KAFKA-16576
 URL: https://issues.apache.org/jira/browse/KAFKA-16576
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, system tests
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


The {{consumer_test.py}} system test fails with the following errors:

{quote}
* Timed out waiting for consumption
{quote}

Affected tests:

* {{test_broker_failure}}
* {{test_consumer_bounce}}
* {{test_static_consumer_bounce}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is unstable: Kafka » Kafka Branch Builder » 3.7 #138

2024-04-17 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-1023: Follower fetch from tiered offset

2024-04-17 Thread Jun Rao
Hi, Abhijeet,

Thanks for the reply.

1. I am wondering if we could achieve the same result by just lowering
local.retention.ms and local.retention.bytes. This also allows the newly
started follower to build up the local data before serving the consumer
traffic.

2. Have you updated the KIP?

Thanks,

Jun

On Tue, Apr 9, 2024 at 3:36 AM Satish Duggana 
wrote:

> +1 to Jun for adding the consumer fetching from a follower scenario
> also to the existing section that talked about the drawback when a
> node built with last-tiered-offset has become a leader. As Abhijeet
> mentioned, we plan to have a follow-up KIP that will address these by
> having a deprioritzation of these brokers. The deprioritization of
> those brokers can be removed once they catchup until the local log
> retention.
>
> Thanks,
> Satish.
>
> On Tue, 9 Apr 2024 at 14:08, Luke Chen  wrote:
> >
> > Hi Abhijeet,
> >
> > Thanks for the KIP to improve the tiered storage feature!
> >
> > Questions:
> > 1. We could also get the "pending-upload-offset" and epoch via remote log
> > metadata, instead of adding a new API to fetch from the leader. Could you
> > explain why you choose the later approach, instead of the former?
> > 2.
> > > We plan to have a follow-up KIP that will address both the
> > deprioritization
> > of these brokers from leadership, as well as
> > for consumption (when fetching from followers is allowed).
> >
> > I agree with Jun that we might need to make it clear all possible
> drawbacks
> > that could have. So, could we add the drawbacks that Jun mentioned about
> > the performance issue when consumer fetch from follower?
> >
> > 3. Could we add "Rejected Alternatives" section to the end of the KIP to
> > add some of them?
> > Like the "ListOffsetRequest" approach VS "Earliest-Pending-Upload-Offset"
> > approach, or getting the "Earliest-Pending-Upload-Offset" from remote log
> > metadata... etc.
> >
> > Thanks.
> > Luke
> >
> >
> > On Tue, Apr 9, 2024 at 2:25 PM Abhijeet Kumar <
> abhijeet.cse@gmail.com>
> > wrote:
> >
> > > Hi Christo,
> > >
> > > Please find my comments inline.
> > >
> > > On Fri, Apr 5, 2024 at 12:36 PM Christo Lolov 
> > > wrote:
> > >
> > > > Hello Abhijeet and Jun,
> > > >
> > > > I have been mulling this KIP over a bit more in recent days!
> > > >
> > > > re: Jun
> > > >
> > > > I wasn't aware we apply 2.1 and 2.2 for reserving new timestamps - in
> > > > retrospect it should have been fairly obvious. I would need to go an
> > > update
> > > > KIP-1005 myself then, thank you for giving the useful reference!
> > > >
> > > > 4. I think Abhijeet wants to rebuild state from latest-tiered-offset
> and
> > > > fetch from latest-tiered-offset + 1 only for new replicas (or
> replicas
> > > > which experienced a disk failure) to decrease the time a partition
> spends
> > > > in under-replicated state. In other words, a follower which has just
> > > fallen
> > > > out of ISR, but has local data will continue using today's Tiered
> Storage
> > > > replication protocol (i.e. fetching from earliest-local). I further
> > > believe
> > > > he has taken this approach so that local state of replicas which have
> > > just
> > > > fallen out of ISR isn't forcefully wiped thus leading to situation 1.
> > > > Abhijeet, have I understood (and summarised) what you are proposing
> > > > correctly?
> > > >
> > > > Yes, your understanding is correct. We want to limit the behavior
> changes
> > > only to new replicas.
> > >
> > >
> > > > 5. I think in today's Tiered Storage we know the leader's
> > > log-start-offset
> > > > from the FetchResponse and we can learn its local-log-start-offset
> from
> > > the
> > > > ListOffsets by asking for earliest-local timestamp (-4). But granted,
> > > this
> > > > ought to be added as an additional API call in the KIP.
> > > >
> > > >
> > > Yes, I clarified this in my reply to Jun. I will add this missing
> detail in
> > > the KIP.
> > >
> > >
> > > > re: Abhijeet
> > > >
> > > > 101. I am still a bit confused as to why you want to include a new
> offset
> > > > (i.e. pending-upload-offset) when you yourself mention that you
> could use
> > > > an already existing offset (i.e. last-tiered-offset + 1). In
> essence, you
> > > > end your Motivation with "In this KIP, we will focus only on the
> follower
> > > > fetch protocol using the *last-tiered-offset*" and then in the
> following
> > > > sections you talk about pending-upload-offset. I understand this
> might be
> > > > classified as an implementation detail, but if you introduce a new
> offset
> > > > (i.e. pending-upload-offset) you have to make a change to the
> ListOffsets
> > > > API (i.e. introduce -6) and thus document it in this KIP as such.
> > > However,
> > > > the last-tiered-offset ought to already be exposed as part of
> KIP-1005
> > > > (under implementation). Am I misunderstanding something here?
> > > >
> > >
> > > I have tried to clarify this in my reply to Jun.
> > >
> > > > The follower needs to build the local 

[jira] [Created] (KAFKA-16575) Automatically remove KTable aggregation result when group becomes empty

2024-04-17 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16575:
---

 Summary: Automatically remove KTable aggregation result when group 
becomes empty
 Key: KAFKA-16575
 URL: https://issues.apache.org/jira/browse/KAFKA-16575
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


Using `KTable.groupBy(...).aggregate(...)` can handle updates (inserts, 
deletes, actual updates) of the input KTable, by calling the provided `Adder` 
and `Subtractor`. However, when all records from the input table (which map to 
the same group/row in the result table) get removed, the result entry is not 
removed automatically.

For example, if we implement a "count", the count would go to zero for a group 
by default, instead of removing the row from the result, if all input records 
for this group got deleted.

Users can let their `Subtractor` return `null` for this case, to actually 
delete the row, but this is not well documented, and removing "empty groups" 
from the result should arguably be a built-in feature of the table aggregation 
rather than something that relies on "correct" behavior of user code.

(Also, the built-in `count()` does not return `null`, but an actual zero...)

An internal counter of how many elements are in a group should be sufficient. Of 
course, there are backward-compatibility questions we need to answer.
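
For illustration, a minimal sketch (assuming a simple count-like aggregation 
with String keys and values; topic names and serdes are placeholders) of the 
workaround described above, where the subtractor returns {{null}} once a group 
becomes empty so the row is deleted:
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();

builder.table("input", Consumed.with(Serdes.String(), Serdes.String()))
    .groupBy((key, value) -> KeyValue.pair(value, key),
             Grouped.with(Serdes.String(), Serdes.String()))
    .aggregate(
        () -> 0L,                               // initializer
        (groupKey, newValue, agg) -> agg + 1L,  // adder: a record joined the group
        (groupKey, oldValue, agg) ->            // subtractor: a record left the group
            agg - 1L == 0L ? null : agg - 1L,   // null deletes the row once the group is empty
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("group-counts")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Long()));
{code}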



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16574) The metrics of LogCleaner disappear after reconfiguration

2024-04-17 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16574:
--

 Summary: The metrics of LogCleaner disappear after reconfiguration
 Key: KAFKA-16574
 URL: https://issues.apache.org/jira/browse/KAFKA-16574
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


see 
[https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/core/src/main/scala/kafka/log/LogCleaner.scala#L227]

We don't rebuild the metrics after calling shutdown. The following test 
demonstrates this:
{code:java}
@Test
def testMetricsAfterReconfiguration(): Unit = {
  val logCleaner = new LogCleaner(new CleanerConfig(true),
    logDirs = Array(TestUtils.tempDir()),
    logs = new Pool[TopicPartition, UnifiedLog](),
    logDirFailureChannel = new LogDirFailureChannel(1),
    time = time)

  // All LogCleaner metrics should be registered both before and after
  // reconfigure(), which internally shuts the cleaner down and restarts it.
  def check(): Unit =
    LogCleaner.MetricNames.foreach(name =>
      assertNotNull(KafkaYammerMetrics.defaultRegistry.allMetrics().get(
        logCleaner.metricsGroup.metricName(name, java.util.Collections.emptyMap())),
        s"$name is gone?"))

  try {
    check()
    logCleaner.reconfigure(
      new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181")),
      new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181")))
    check()  // fails today: the metrics removed on shutdown are not re-registered
  } finally logCleaner.shutdown()
} {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16405) Mismatch assignment error when running consumer rolling upgrade system tests

2024-04-17 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-16405.
---
  Reviewer: Lucas Brutschy
Resolution: Fixed

> Mismatch assignment error when running consumer rolling upgrade system tests
> 
>
> Key: KAFKA-16405
> URL: https://issues.apache.org/jira/browse/KAFKA-16405
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> relevant to [https://github.com/apache/kafka/pull/15578]
>  
> We are seeing:
> {code:java}
> 
> SESSION REPORT (ALL TESTS)
> ducktape version: 0.11.4
> session_id:   2024-03-21--001
> run time: 3 minutes 24.632 seconds
> tests run:7
> passed:   5
> flaky:0
> failed:   2
> ignored:  0
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=classic
> status: PASS
> run time:   24.599 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   26.638 seconds
> AssertionError("Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=3), 
> TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1), 
> TopicPartition(topic='test_topic', partition=2)})}")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py",
>  line 77, in rolling_update_test
> self._verify_range_assignment(consumer)
>   File 
> "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py",
>  line 38, in _verify_range_assignment
> assert assignment == set([
> AssertionError: Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=3), 
> TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1), 
> TopicPartition(topic='test_topic', partition=2)})}
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
> status: PASS
> run time:   29.815 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True
> status: PASS
> run time:   29.766 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic
> status: PASS
> run time:   30.086 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   35.965 seconds
> AssertionError("Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=3), 
> TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1), 
> TopicPartition(topic='test_topic', partition=2)})}")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
>   

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-17 Thread Sophie Blee-Goldman
Thanks Bruno! I can provide a bit of context behind some of these
decisions but I just want to say up front that I agree with every single one
of your points, though I am going to push back a bit on the first one.

[1] The idea here is to help avoid some confusion around the overloaded
term "client", which can mean either "an instance of Kafka Streams" or
"a consumer/producer client". The problem is that the former applies to
the entire Streams process and therefore should be interpreted as "all
of the StreamThreads on an instance" whereas the latter is typically used
interchangeably to mean the consumer client in the consumer group,
which implies a scope of just a single StreamThread on an instance.
The "Node" name here was an attempt to clear this up, since differentiating
between instance and thread level is critical to understanding and properly
implementing the custom assignor.

I do see what you mean about there not being a concept of Node in the
Kafka Streams codebase, and that we usually do use "instance" when we
need to differentiate between consumer client/one StreamThread and
Kafka Streams client/all StreamThreads. As I'm typing this I'm convincing
myself even more that we shouldn't just use "Client" without further
distinction, but I'm not sure "Node" has to be the answer either.

Could we replace "Node" with "KafkaStreamsClient" or is that too wordy?
I honestly do still like Node personally, and don't see what's wrong with
introducing a new term since the "node" terminology is used heavily
on the broker side and it means effectively the same thing in theory.
But if we can't compromise between "Node" and "Client" then maybe
we can settle on "Instance"? (Does feel a bit wordy too...maybe "Process"?)

[2] Good catch(es). Makes sense to me

[3] Totally agree, a single enum makes way more sense

[4] Here again I can provide some background -- this is actually following
a pattern that we used when refactoring the old PartitionAssignor into
the new (at the time) Consumer PartitionAssignor interface. The idea was
to wrap the return type to protect the assign method in case we ever wanted
to add something to what was returned, such as metadata for the entire
group. This way we could avoid a massively disruptive deprecation-and-
migration cycle for everyone who implements a custom assignor.
That said, I just checked the GroupAssignment class we added for this
in the ConsumerPartitionAssignor interface, and to this day we've never
added anything other than the map of consumer client to assignment.

So maybe that was overly cautious. I'd be ok with flattening this map out.
I guess the question is just, can we imagine any case in which we might
want the custom assignor to return additional metadata? To be honest
I think this might be more likely than with the plain consumer client case,
but again, I'm totally fine with just flattening it to a plain map return
type

[5] I guess not. I think ApplicationMetadata was added during the initial
KIP discussion so that's probably why it doesn't follow the same naming
pattern. Personally I'm fine either way (I do think ApplicationMetadata
sounds a bit better but that's not a good enough reason :P)

Thanks Bruno!

On Wed, Apr 17, 2024 at 7:08 AM Bruno Cadonna  wrote:

> Hi,
>
> sorry, I am late to the party.
>
> I have a couple of comments:
>
> (1)
> I would prefer Client* instead of Node* in the names. In Kafka Streams
> we do not really have the concept of node but we have the concept of
> client (admittedly, we sometimes also use instance). I would like to
> avoid introducing a new term to basically describe the Streams client.
> I know that we already have a ClientState but that would be in a
> different package.
>
> (2)
> Did you consider to use Instant instead of long as return type of
> followupRebalanceDeadline()? Instant is a bit more flexible and readable
> as a plain long, IMO. BTW, you list followupRebalanceDeadline() twice in
> interface NodeAssignment.
>
> (3)
> Did you consider to use an enum instead of class AssignedTask? As far as
> I understand not all combinations are possible. A stateless standby task
> does not exist. An enum with values STATELESS, STATEFUL, STANDBY would
> be clearer. Or even better instead of two methods in AssignedTask that
> return a boolean you could have one method -- say type() -- that returns
> the enum.
>
> (4)
> Does the return type of assignment need to be a map from task ID to
> AssignedTask? Wouldn't it be enough to be a collection of AssignedTasks
> with AssignedTask containing the task ID?
>
> (5)
> I there a semantic difference between *State and *Metadata? I was
> wondering whether ApplicationMetadata could also be ApplicationState for
> the sake of consistency.
>
> Best,
> Bruno
>
>
> On 4/5/24 11:18 PM, Sophie Blee-Goldman wrote:
> > Cool, looks good to me!
> >
> > Seems like there is no further feedback, so maybe we can start to call
> for
> > a vote?
> >
> > However, since as noted we are setting aside time 

Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-04-17 Thread Chris Egerton
Hi Alieh,

Thanks for the KIP! The issue with writing to non-existent topics is
particularly frustrating for users of Kafka Connect and has been the source
of a handful of Jira tickets over the years. My thoughts:

1. An additional detail we can add to the motivation (or possibly rejected
alternatives) section is that this kind of custom retry logic can't be
implemented by hand by, e.g., setting retries to 0 in the producer config
and handling exceptions at the application level. Or rather, it can, but 1)
it's a bit painful to have to reimplement at every call-site for
Producer::send (and any code that awaits the returned Future) and 2) it's
impossible to do this without losing idempotency on retries.

2. That said, I wonder if a pluggable interface is really the right call
here. Documenting the interactions of a producer with
a ClientExceptionHandler instance will be tricky, and implementing them
will also be a fair amount of work. I believe that there needs to be some
more granularity for how writes to non-existent topics (or really,
UNKNOWN_TOPIC_OR_PARTITION and related errors from the broker) are handled,
but I'm torn between keeping it simple with maybe one or two new producer
config properties, or a full-blown pluggable interface. If there are more
cases that would benefit from a pluggable interface, it would be nice to
identify these and add them to the KIP to strengthen the motivation. Right
now, I'm not sure the two that are mentioned in the motivation are
sufficient.

3. Alternatively, a possible compromise is for this KIP to introduce new
properties that dictate how to handle unknown-topic-partition and
record-too-large errors, with the thinking that if we introduce a pluggable
interface later on, these properties will be recognized by the default
implementation of that interface but could be completely ignored or
replaced by alternative implementations.

4. (Nit) You can remove the "This page is meant as a template for writing a
KIP..." part from the KIP. It's not a template anymore :)

5. If we do go the pluggable interface route, wouldn't we want to add the
possibility for retry logic? The simplest version of this could be to add a
RETRY value to the ClientExceptionHandlerResponse enum.

6. I think "SKIP" or "DROP" might be clearer instead of "CONTINUE" for
the ClientExceptionHandlerResponse enum, since they cause records to be
dropped.
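
(For illustration only, here is a rough sketch of what the decision enum might 
look like if we adopted both 5 and 6; none of these names are settled in the 
KIP, and RETRY/SKIP are just the suggestions above rather than anything that 
exists today:)

public enum ClientExceptionHandlerResponse {
    FAIL,   // surface the error and fail the send (and any active transaction)
    SKIP,   // drop the offending record and continue
    RETRY   // ask the producer to retry the operation
}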

Cheers,

Chris

On Wed, Apr 17, 2024 at 12:25 PM Justine Olshan
 wrote:

> Hey Alieh,
>
> I echo what Omnia says, I'm not sure I understand the implications of the
> change and I think more detail is needed.
>
> This comment also confused me a bit:
> * {@code ClientExceptionHandler} that continues the transaction even if a
> record is too large.
> * Otherwise, it makes the transaction to fail.
>
> Relatedly, I've been working with some folks on a KIP for transactions
> errors and how they are handled. Specifically for the
> RecordTooLargeException (and a few other errors), we want to give a new
> error category for this error that allows the application to choose how it
> is handled. Maybe this KIP is something that you are looking for? Stay
> tuned :)
>
> Justine
>
>
>
>
>
> On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim 
> wrote:
>
> > Hi Alieh,
> > Thanks for the KIP! I have couple of comments
> > - You mentioned in the KIP motivation,
> > > Another example for which a production exception handler could be
> useful
> > is if a user tries to write into a non-existing topic, which returns a
> > retryable error code; with infinite retries, the producer would hang
> > retrying forever. A handler could help to break the infinite retry loop.
> >
> > How the handler can differentiate between something that is temporary and
> > it should keep retrying and something permanent like forgot to create the
> > topic? temporary here could be
> >  the producer get deployed before the topic creation finish (specially if
> > the topic creation is handled via IaC)
> >  temporary offline partitions
> >  leadership changing
> > Isn’t this putting the producer at risk of dropping records
> > unintentionally?
> > - Can you elaborate more on what is written in the compatibility /
> > migration plan section please by explaining in bit more details what is
> the
> > changing behaviour and how this will impact client who are upgrading?
> > - In the proposal changes can you elaborate in the KIP where in the
> > producer lifecycle will ClientExceptionHandler and
> > TransactionExceptionHandler get triggered, and how will the producer
> > configure them to point to costumed implementation.
> >
> > Thanks
> > Omnia
> >
> > > On 17 Apr 2024, at 13:13, Alieh Saeedi 
> > wrote:
> > >
> > > Hi all,
> > > Here is the KIP-1038: Add Custom Error Handler to Producer.
> > > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer
> > >
> > > I look forward to your feedback!
> > >
> > > Cheers,
> > > Alieh
> >
> >
>


Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-04-17 Thread Lucas Brutschy
Hey Alieh,

some comments:

* "Compatibility" section wasn't clear to me. Are we just introducing
the interfaces or are we changing the default behavior? If so, that
should be explained in more detail.
* If we are introducing a new interface `ClientExceptionHandler`, what
is it going to be used for? Will TransactionExceptionHandler be the
only class implementing it? Should the existing streams exception
handlers also extend it?
* How do I configure a producer to use a custom exception handler?

* nit: the intro mentions "this ticket", but it's a KIP

Cheers,
Lucas



On Wed, Apr 17, 2024 at 6:25 PM Justine Olshan
 wrote:
>
> Hey Alieh,
>
> I echo what Omnia says, I'm not sure I understand the implications of the
> change and I think more detail is needed.
>
> This comment also confused me a bit:
> * {@code ClientExceptionHandler} that continues the transaction even if a
> record is too large.
> * Otherwise, it makes the transaction to fail.
>
> Relatedly, I've been working with some folks on a KIP for transactions
> errors and how they are handled. Specifically for the
> RecordTooLargeException (and a few other errors), we want to give a new
> error category for this error that allows the application to choose how it
> is handled. Maybe this KIP is something that you are looking for? Stay
> tuned :)
>
> Justine
>
>
>
>
>
> On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim 
> wrote:
>
> > Hi Alieh,
> > Thanks for the KIP! I have couple of comments
> > - You mentioned in the KIP motivation,
> > > Another example for which a production exception handler could be useful
> > is if a user tries to write into a non-existing topic, which returns a
> > retryable error code; with infinite retries, the producer would hang
> > retrying forever. A handler could help to break the infinite retry loop.
> >
> > How the handler can differentiate between something that is temporary and
> > it should keep retrying and something permanent like forgot to create the
> > topic? temporary here could be
> >  the producer get deployed before the topic creation finish (specially if
> > the topic creation is handled via IaC)
> >  temporary offline partitions
> >  leadership changing
> > Isn’t this putting the producer at risk of dropping records
> > unintentionally?
> > - Can you elaborate more on what is written in the compatibility /
> > migration plan section please by explaining in bit more details what is the
> > changing behaviour and how this will impact client who are upgrading?
> > - In the proposal changes can you elaborate in the KIP where in the
> > producer lifecycle will ClientExceptionHandler and
> > TransactionExceptionHandler get triggered, and how will the producer
> > configure them to point to costumed implementation.
> >
> > Thanks
> > Omnia
> >
> > > On 17 Apr 2024, at 13:13, Alieh Saeedi 
> > wrote:
> > >
> > > Hi all,
> > > Here is the KIP-1038: Add Custom Error Handler to Producer.
> > > <
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer
> > >
> > > I look forward to your feedback!
> > >
> > > Cheers,
> > > Alieh
> >
> >


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2821

2024-04-17 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-16573) Streams does not specify where a Serde is needed

2024-04-17 Thread Ayoub Omari (Jira)
Ayoub Omari created KAFKA-16573:
---

 Summary: Streams does not specify where a Serde is needed
 Key: KAFKA-16573
 URL: https://issues.apache.org/jira/browse/KAFKA-16573
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 3.7.0
Reporter: Ayoub Omari


Example topology:
{code:java}
 builder
   .table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
   .groupBy((key, value) => new KeyValue(value, key))
   .count()
   .toStream()
   .to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
 {code}
At runtime, we get the following exception 
{code:java}
Please specify a key serde or set one through 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
org.apache.kafka.common.config.ConfigException: Please specify a key serde or 
set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
    at 
org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
    at 
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
    at 
org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
    at 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
    at 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
    at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
    at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
    at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
    at 
org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code}
The error does not give information about the line or the processor causing the 
issue.

Here, a Grouped was missing inside the groupBy, but because the groupBy API 
doesn't force you to define a Grouped, it can easily be missed and can be 
difficult to spot in a more complex topology.

This is also a problem for someone who needs control over the serdes in the 
topology and doesn't want to define default serdes.
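
For illustration, a sketch in Java of the same topology with the missing serdes 
supplied explicitly through a Grouped (the serde choices are assumptions based 
on the example above), which avoids the fallback to the default serdes:
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();

builder.table("input", Consumed.with(Serdes.String(), Serdes.String()))
    // The mapper swaps key and value; Grouped tells Streams which serdes to use
    // for the repartitioning, so no default serde lookup happens at runtime.
    .groupBy((key, value) -> KeyValue.pair(value, key),
             Grouped.with(Serdes.String(), Serdes.String()))
    .count()
    .toStream()
    .to("output", Produced.with(Serdes.String(), Serdes.Long()));
{code}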

 

  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-04-17 Thread Justine Olshan
Hey Alieh,

I echo what Omnia says, I'm not sure I understand the implications of the
change and I think more detail is needed.

This comment also confused me a bit:
* {@code ClientExceptionHandler} that continues the transaction even if a
record is too large.
* Otherwise, it makes the transaction to fail.

Relatedly, I've been working with some folks on a KIP for transactions
errors and how they are handled. Specifically for the
RecordTooLargeException (and a few other errors), we want to give a new
error category for this error that allows the application to choose how it
is handled. Maybe this KIP is something that you are looking for? Stay
tuned :)

Justine





On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim 
wrote:

> Hi Alieh,
> Thanks for the KIP! I have couple of comments
> - You mentioned in the KIP motivation,
> > Another example for which a production exception handler could be useful
> is if a user tries to write into a non-existing topic, which returns a
> retryable error code; with infinite retries, the producer would hang
> retrying forever. A handler could help to break the infinite retry loop.
>
> How the handler can differentiate between something that is temporary and
> it should keep retrying and something permanent like forgot to create the
> topic? temporary here could be
>  the producer get deployed before the topic creation finish (specially if
> the topic creation is handled via IaC)
>  temporary offline partitions
>  leadership changing
> Isn’t this putting the producer at risk of dropping records
> unintentionally?
> - Can you elaborate more on what is written in the compatibility /
> migration plan section please by explaining in bit more details what is the
> changing behaviour and how this will impact client who are upgrading?
> - In the proposal changes can you elaborate in the KIP where in the
> producer lifecycle will ClientExceptionHandler and
> TransactionExceptionHandler get triggered, and how will the producer
> configure them to point to costumed implementation.
>
> Thanks
> Omnia
>
> > On 17 Apr 2024, at 13:13, Alieh Saeedi 
> wrote:
> >
> > Hi all,
> > Here is the KIP-1038: Add Custom Error Handler to Producer.
> > <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer
> >
> > I look forward to your feedback!
> >
> > Cheers,
> > Alieh
>
>


Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-04-17 Thread Omnia Ibrahim
Hi Alieh, 
Thanks for the KIP! I have a couple of comments
- You mentioned in the KIP motivation, 
> Another example for which a production exception handler could be useful is 
> if a user tries to write into a non-existing topic, which returns a retryable 
> error code; with infinite retries, the producer would hang retrying forever. 
> A handler could help to break the infinite retry loop.

How can the handler differentiate between something that is temporary, where it 
should keep retrying, and something permanent like a topic that was never 
created? Temporary causes here could be:
 the producer getting deployed before the topic creation finishes (especially if 
the topic creation is handled via IaC)
 temporarily offline partitions 
 leadership changes
Isn’t this putting the producer at risk of dropping records 
unintentionally?
- Can you please elaborate more on what is written in the compatibility / 
migration plan section by explaining in a bit more detail what the changing 
behaviour is and how it will impact clients who are upgrading?
- In the proposed changes, can you elaborate in the KIP on where in the 
producer lifecycle ClientExceptionHandler and TransactionExceptionHandler get 
triggered, and how the producer will be configured to point them to a custom 
implementation?

Thanks 
Omnia

> On 17 Apr 2024, at 13:13, Alieh Saeedi  wrote:
> 
> Hi all,
> Here is the KIP-1038: Add Custom Error Handler to Producer.
> 
> I look forward to your feedback!
> 
> Cheers,
> Alieh



Re: [VOTE] KIP-1031: Control offset translation in MirrorSourceConnector

2024-04-17 Thread Omnia Ibrahim
Thanks Chris and Mickael for the votes. 
Can I please get one last binding +1 vote?

Thanks
Omnia

> On 12 Apr 2024, at 13:21, Chris Egerton  wrote:
> 
> +1 (binding), thanks Omnia!
> 
> On Fri, Apr 12, 2024, 03:46 Mickael Maison  wrote:
> 
>> Hi Omnia,
>> 
>> +1 (binding), thanks for the KIP!
>> 
>> Mickael
>> 
>> On Fri, Apr 12, 2024 at 9:01 AM Omnia Ibrahim 
>> wrote:
>>> 
>>> Hi everyone, I would like to start a voting thread for KIP-1031: Control
>> offset translation in MirrorSourceConnector
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1031%3A+Control+offset+translation+in+MirrorSourceConnector
>>> 
>>> For comments or feedback please check the discussion thread here
>> https://lists.apache.org/thread/ym6zr0wrhglft5c000x9c8ych098s7h6
>>> 
>>> Thanks
>>> Omnia
>>> 
>> 



Re: [VOTE] KIP-899: Allow producer and consumer clients to rebootstrap

2024-04-17 Thread Omnia Ibrahim
Hi Ivan, 
Thanks for the KIP, this is a very nice feature to have. 
+1(non-binding) 
Omnia
> On 15 Apr 2024, at 14:33, Andrew Schofield  wrote:
> 
> Thanks for the KIP
> 
> +1 (non-binding)
> 
> Andrew
> 
>> On 15 Apr 2024, at 14:16, Chris Egerton  wrote:
>> 
>> Hi Ivan,
>> 
>> Thanks for the KIP. After the recent changes, this LGTM. +1 (binding)
>> 
>> Cheers,
>> 
>> Chris
>> 
>> On Wed, Aug 2, 2023 at 12:15 AM Ivan Yurchenko 
>> wrote:
>> 
>>> Hello,
>>> 
>>> The discussion [1] for KIP-899 [2] has been open for quite some time. I'd
>>> like to put the KIP up for a vote.
>>> 
>>> Best,
>>> Ivan
>>> 
>>> [1] https://lists.apache.org/thread/m0ncbmfxs5m87sszby2jbmtjx2bdpcdl
>>> [2]
>>> 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-899%3A+Allow+producer+and+consumer+clients+to+rebootstrap
>>> 
> 



Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-17 Thread Bruno Cadonna

Hi,

sorry, I am late to the party.

I have a couple of comments:

(1)
I would prefer Client* instead of Node* in the names. In Kafka Streams 
we do not really have the concept of node but we have the concept of 
client (admittedly, we sometimes also use instance). I would like to 
avoid introducing a new term to basically describe the Streams client.
I know that we already have a ClientState but that would be in a 
different package.


(2)
Did you consider using Instant instead of long as the return type of 
followupRebalanceDeadline()? Instant is a bit more flexible and readable 
than a plain long, IMO. BTW, you list followupRebalanceDeadline() twice in 
interface NodeAssignment.


(3)
Did you consider using an enum instead of class AssignedTask? As far as 
I understand, not all combinations are possible. A stateless standby task 
does not exist. An enum with values STATELESS, STATEFUL, STANDBY would 
be clearer. Or even better, instead of two methods in AssignedTask that 
return a boolean you could have one method -- say type() -- that returns 
the enum.
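
Just to sketch what I mean (placeholder names, not a concrete proposal):

public enum TaskType {
    STATELESS,   // active task without state
    STATEFUL,    // active task with state
    STANDBY      // standby replica, always stateful
}

AssignedTask#type() would then return a TaskType instead of exposing two 
boolean methods.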


(4)
Does the return type of assignment need to be a map from task ID to 
AssignedTask? Wouldn't it be enough to be a collection of AssignedTasks 
with AssignedTask containing the task ID?


(5)
Is there a semantic difference between *State and *Metadata? I was 
wondering whether ApplicationMetadata could also be ApplicationState for 
the sake of consistency.


Best,
Bruno


On 4/5/24 11:18 PM, Sophie Blee-Goldman wrote:

Cool, looks good to me!

Seems like there is no further feedback, so maybe we can start to call for
a vote?

However, since as noted we are setting aside time to discuss this during
the sync next Thursday, we can also wait until after that meeting to
officially kick off the vote.

On Fri, Apr 5, 2024 at 12:19 PM Rohan Desai  wrote:


Thanks for the feedback Sophie!

re1: Totally agree. The fact that it's related to the partition assignor is
clear from just `task.assignor`. I'll update (see the config sketch below).
re3: This is a good point, and something I would find useful personally. I
think it's worth adding an interface that lets the plugin observe the final
assignment. I'll add that.
re4: I like the new `NodeAssignment` type. I'll update the KIP with that.
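Regarding the `task.assignor` rename, purely as an illustration of what this could look like from an application's point of view (the config key, class name, and values here are assumptions based on this thread, not a settled API):

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class CustomAssignorConfigExample {

    public static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Hypothetical: assumes the final config key is "task.assignor" and that it
        // takes the fully-qualified class name of a custom assignor implementation.
        props.put("task.assignor", "com.example.MyTaskAssignor");
        return props;
    }
}
```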

On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai 
wrote:


Thanks for the feedback so far! I think pretty much all of it is
reasonable. I'll reply to it inline:


1. All the API logic is granular at the Task level, except the
previousOwnerForPartition func. I’m not clear what’s the motivation behind
it, does our controller also want to change how the partitions->tasks
mapping is formed?
You're right that this is out of place. I've removed this method as it's
not needed by the task assignor.


2. Just on the API layering itself: it feels a bit weird to have the
three built-in functions (defaultStandbyTaskAssignment etc) sitting in the
ApplicationMetadata class. If we consider them as some default util
functions, how about moving those into their own static util methods to
separate from the ApplicationMetadata “fact objects”?
Agreed. Updated in the latest revision of the kip. These have been moved
to TaskAssignorUtils


3. I personally prefer `NodeAssignment` to be a read-only object
containing the decisions made by the assignor, including the
requestFollowupRebalance flag. For manipulating the half-baked results
inside the assignor itself, maybe we can just be flexible to let users use
whatever structs / their own classes even, if they like. WDYT?
Agreed. Updated in the latest version of the kip.


1. For the API, thoughts on changing the method signature to return a
(non-Optional) TaskAssignor? Then we can either have the default
implementation return new HighAvailabilityTaskAssignor or just have a
default implementation class that people can extend if they don't want to
implement every method.
Based on some other discussion, I actually decided to get rid of the
plugin interface, and instead use config to specify individual plugin
behaviour. So the method you're referring to is no longer part of the
proposal.


3. Speaking of ApplicationMetadata, the javadoc says it's read only but
there are methods that return void on it? It's not totally clear to me how
that interface is supposed to be used by the assignor. It'd be nice if we
could flip that interface such that it becomes part of the output instead
of an input to the plugin.
I've moved those methods to a util class. They're really utility methods
the assignor might want to call to do some default or optimized assignment
for some cases like rack-awareness.


4. We should consider wrapping UUID in a ProcessID class so that we
control the interface (there are a few places where UUID is directly used).
I like it. Updated the proposal.
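A minimal sketch of such a wrapper (only the idea of wrapping UUID and the ProcessID name come from the point above; the rest is assumed):

```java
import java.util.Objects;
import java.util.UUID;

// Sketch of a ProcessId wrapper: callers depend on this type rather than on
// UUID directly, so the underlying representation can evolve independently.
public final class ProcessId {

    private final UUID id;

    public ProcessId(final UUID id) {
        this.id = Objects.requireNonNull(id, "id");
    }

    public static ProcessId random() {
        return new ProcessId(UUID.randomUUID());
    }

    public UUID id() {
        return id;
    }

    @Override
    public boolean equals(final Object other) {
        return other instanceof ProcessId && id.equals(((ProcessId) other).id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public String toString() {
        return id.toString();
    }
}
```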


5. What does NodeState#newAssignmentForNode() do? I thought the point was
for the plugin to make the assignment? Is that the result of the default
logic?
It doesn't need to be part of the interface. I've removed 

Re: [PR] KAFKA-16467: Add how to integrate with kafka repo [kafka-site]

2024-04-17 Thread via GitHub


FrankYang0529 commented on code in PR #596:
URL: https://github.com/apache/kafka-site/pull/596#discussion_r1568848118


##
README.md:
##
@@ -10,4 +10,32 @@ You can run it with the following command, note that it requires docker:
 
 Then you can open [localhost:8080](http://localhost:8080) on your browser and browse the documentation.
 
-To kill the process, just type ctrl + c
\ No newline at end of file
+To kill the process, just type ctrl + c.
+
+## How to preview the latest documentation changes in Kafka repository?
+
+1. Generating document from kafka repository:
+
+```shell
+# change directory into kafka repository
+cd KAFKA_REPO
+./gradlew clean siteDocTar
+# supposing built with scala 2.13
+tar zxvf core/build/distributions/kafka_2.13-$(./gradlew properties | grep version: | awk '{print $NF}' | head -n 1)-site-docs.tgz
+```
+
+2. Copying the generated documents from Kafka repository into kafka-site, and preview them (note that it requires docker):
+
+```shell
+# change directory into kafka-site repository
+cd KAFKA_SITE_REPO
+# copy the generated documents into dev folder
+rm -rf dev
+mkdir dev
+# change directory into kafka repository
+cp -r KAFKA_REPO/site-docs/* dev

Review Comment:
   I removed `# change directory into kafka repository`. Thank you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-17 Thread Bruno Cadonna

Hi Nick and Sophie,

I think the task ID is not enough to create a state store that can read 
the offsets of non-assigned tasks for lag computation during 
rebalancing. The state store also needs the state directory so that it 
knows where to find the information that it needs to return from 
changelogOffsets().


In general, I think we should proceed with the plain .checkpoint file 
for now and iterate back to the state store solution later since it 
seems it is not that straightforward. Alternatively, Nick could timebox 
an effort to better understand what would be needed for the state store 
solution. Nick, let us know your decision.


Regarding your question about the state store instance: I am not too 
familiar with that part of the code, but I think the state store is 
built when the processor topology is built, and the processor topology is 
built per stream task. So there is one instance of processor topology 
and state store per stream task. Try to follow the call in [1].


Best,
Bruno

[1] 
https://github.com/apache/kafka/blob/f52575b17225828d2ff11996030ab7304667deab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L153 





On 4/16/24 8:59 PM, Nick Telford wrote:

That does make sense. The one thing I can't figure out is how per-Task
StateStore instances are constructed.

It looks like we construct one StateStore instance for the whole Topology
(in InternalTopologyBuilder), and pass that into ProcessorStateManager (via
StateManagerUtil) for each Task, which then initializes it.

This can't be the case though, otherwise multiple partitions of the same
sub-topology (aka Tasks) would share the same StateStore instance, which
they don't.

What am I missing?

On Tue, 16 Apr 2024 at 16:22, Sophie Blee-Goldman 
wrote:


I don't think we need to *require* a constructor to accept the TaskId, but we
would definitely make sure that the RocksDB state store changes its
constructor to one that accepts the TaskID (which we can do without
deprecation since it's an internal API), and custom state stores can just
decide for themselves whether they want to opt-in/use the TaskId param
or not. I mean custom state stores would have to opt-in anyways by
implementing the new StoreSupplier#get(TaskId) API, and the only
reason to do that would be to have created a constructor that accepts
a TaskId.

Just to be super clear about the proposal, this is what I had in mind.
It's actually fairly simple and wouldn't add much to the scope of the
KIP (I think -- if it turns out to be more complicated than I'm assuming,
we should definitely do whatever has the smallest LOE to get this done).

Anyways, the (only) public API changes would be to add this new
method to the StoreSupplier API:

default T get(final TaskId taskId) {
    return get();
}

We can decide whether or not to deprecate the old #get but it's not
really necessary and might cause a lot of turmoil, so I'd personally
say we just leave both APIs in place.

And that's it for public API changes! Internally, we would just adapt
each of the rocksdb StoreSupplier classes to implement this new
API. So for example with the RocksDBKeyValueBytesStoreSupplier,
we just add

@Override
public KeyValueStore<Bytes, byte[]> get(final TaskId taskId) {
    return returnTimestampedStore ?
        new RocksDBTimestampedStore(name, metricsScope(), taskId) :
        new RocksDBStore(name, metricsScope(), taskId);
}

And of course add the TaskId parameter to each of the actual
state store constructors returned here.

Does that make sense? It's entirely possible I'm missing something
important here, but I think this would be a pretty small addition that
would solve the problem you mentioned earlier while also being
useful to anyone who uses custom state stores.
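To make the opt-in concrete, here is a minimal sketch of a custom supplier using the proposed #get(TaskId) hook. It simply delegates to a built-in in-memory store and logs the task ID; a real custom store would instead pass the TaskId into its own constructor. The class name and behaviour are illustrative only, and the TaskId-accepting method only becomes part of the interface once the proposed default is added:

```java
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class TaskAwareStoreSupplier implements KeyValueBytesStoreSupplier {

    // Delegate to a built-in store purely for illustration.
    private final KeyValueBytesStoreSupplier delegate =
        Stores.inMemoryKeyValueStore("task-aware-store");

    @Override
    public String name() {
        return delegate.name();
    }

    @Override
    public KeyValueStore<Bytes, byte[]> get() {
        return delegate.get();
    }

    // Opts in to the proposed StoreSupplier#get(TaskId); once the new default
    // method exists this would carry @Override.
    public KeyValueStore<Bytes, byte[]> get(final TaskId taskId) {
        System.out.println("Creating store for task " + taskId);
        return delegate.get();
    }

    @Override
    public String metricsScope() {
        return delegate.metricsScope();
    }
}
```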

On Mon, Apr 15, 2024 at 10:21 AM Nick Telford 
wrote:


Hi Sophie,

Interesting idea! Although what would that mean for the StateStore
interface? Obviously we can't require that the constructor take the TaskId.

Is it enough to add the parameter to the StoreSupplier?

Would doing this be in-scope for this KIP, or are we over-complicating it?


Nick

On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman 
Somewhat minor point overall, but it actually drives me crazy that you
can't get access to the taskId of a StateStore until #init is called. This
has caused me a huge headache personally (since the same is true for
processors and I was trying to do something that's probably too hacky to
actually complain about here lol)

Can we just change the StateStoreSupplier to receive and pass along the
taskId when creating a new store? Presumably by adding a new version of the
#get method that takes in a taskId parameter? We can have it default to
invoking the old one for compatibility reasons and it should be completely
safe to tack on.

Would also prefer the same for a ProcessorSupplier, but that's definitely
outside the scope of this KIP

On Fri, Apr 12, 2024 at 3:31 AM Nick Telford 
wrote:


On further thought, 

[DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-04-17 Thread Alieh Saeedi
Hi all,
Here is the KIP-1038: Add Custom Error Handler to Producer.

I look forward to your feedback!

Cheers,
Alieh


Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2024-04-17 Thread Nick Telford
Hi Walker,

Feel free to ask away, either on the mailing list or on the Confluent
Community Slack, where I hang out :-)

The implementation is *mostly* complete, although it needs some polishing.
It's worth noting that KIP-1035 is a hard prerequisite for this.

Regards,
Nick


Re: [DISCUSS] KIP-1028: Docker Official Image for Apache Kafka

2024-04-17 Thread Krish Vora
Hi Manikumar,

Thanks for the comments.

Maybe as part of the release process, RM can create a JIRA for this
> task. This can be taken by RM or any committer or any contributor (with
> some help from committers to run "Docker Image Preparation via GitHub
> Actions:"

This sounds like a good idea, and this step would be beneficial. Creating a
JIRA ticket will also serve as a reminder to complete the post-release
steps for the Docker Official Images. I have updated the KIP with this step.

Is this using GitHub Actions workflow? or manual testing?

This will be done by a Github Actions workflow, which will test the static
Docker Official Image assets for a specific release version.

Is it mandatory for RM/committers to raise the PR to Docker Hub’s
> official images repository (or) can it be done by any contributor.

I believe that it can be done by any contributor (ref: This link
quotes "*Anyone can provide feedback, contribute code, suggest process
changes, or even propose a new Official Image.*")

Also I was thinking, once the KIP gets voted, we should try to release
> kafka:3.7.0 (or 3.7.1) Docker Official image. This will help us to
> validate the process and allow us to fix any changes suggested by
> Dockerhub before the 3.8.0 release.

This sounds like a great idea. This KIP proposes release of DOI as a
post-release process, which can be done anytime post release. Since 3.7.0
is already released, we can perform these steps for that release too. By
the time the KIP gets implemented, if 3.7.1 is released, we could do these
steps for 3.7.1, instead of 3.7.0. This would allow us to make changes to
the Dockerfiles and other assets based on feedback from Docker Hub before
the release of version 3.8.0.

Thanks,
Krish.

On Mon, Apr 15, 2024 at 12:59 PM Manikumar 
wrote:

> Hi Krish,
>
> Thanks for the updated KIP. a few comments below.
>
> > "These actions can be carried out by the RM or any contributor post the
> release process."
> Maybe as part of the release process, RM can create a JIRA for this
> task. This can be taken by RM or any committer or any contributor (with
> some help from committers to run "Docker Image Preparation via GitHub
> Actions:"
>
> > "Perform Docker build tests to ensure image integrity"
> Is this using GitHub Actions workflow? or manual testing?
>
> > "The RM will manually raise the final PR to Docker Hub’s official images
> repository using the contents of the generated file"
 Is it mandatory for RM/committers to raise the PR to Docker Hub’s
> official images repository (or) can it be done by any contributor.
>
> Also I was thinking, once the KIP gets voted, we should try to release
> kafka:3.7.0 (or 3.7.1) Docker Official image. This will help us to
> validate the process and allow us to fix any changes suggested by
> Dockerhub before the 3.8.0 release.
>
>
> Thanks,
>
> On Mon, Apr 8, 2024 at 2:33 PM Krish Vora  wrote:
> >
> > Hi Manikumar and Luke.
> > Thanks for the questions.
> >
> > 1. No, the Docker inventory files and configurations will not be the same
> > for Open Source Software (OSS) Images and Docker Official Images (DOI).
> >
> > For OSS images, the Dockerfile located in docker/jvm/dockerfile is
> > utilized. This process is integrated with the existing release pipeline
> as
> > outlined in KIP-975
> > <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka#KIP975:DockerImageforApacheKafka-Status
> >,
> > where the Kafka URL is provided as a build argument. This method allows
> for
> > building, testing, and releasing OSS images dynamically. The OSS images
> > will continue to be released under the standard release process .
> >
> > In contrast, the release process for DOIs requires providing the Docker
> Hub
> > team with a specific directory for each version release that contains a
> > standalone Dockerfile. These Dockerfiles are designed to be
> > self-sufficient, hence require hardcoded values instead of relying on
> build
> > arguments. To accommodate this, in our proposed approach, a new directory
> > named docker_official_images has been created. This directory contains
> > version-specific directories, having Dockerfiles with hardcoded
> > configurations for each release, acting as the source of truth for DOI
> > releases. The hardcoded dockerfiles will be created using the
> > docker/jvm/dockerfile as a template. Thus, as part of post release we
> will
> > be creating a Dockerfile that will be reviewed by the Dockerhub community
> > and might need changes as per their review. This approach ensures that
> DOIs
> > are built consistently and meet the specific requirements set by Docker
> Hub.
> >
> > 2. Yes Manikumar, transitioning the release of Docker Official Images
> (DOI)
> > to a post-release activity does address the concerns about complicating
> the
> > release process. Initially, we considered incorporating DOI release
> > directly into Kafka's