[jira] [Resolved] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-04-09 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16310.

Resolution: Fixed

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.8.0
>
>
> Updated: This is confirmed to be a regression introduced in v3.7.0.
> The impact of this issue is that when a batch contains records whose 
> timestamps are not in order, the offset reported for the max timestamp will 
> be wrong (e.g. the timestamp for t0 should map to offset 10, but offset 12 
> is returned instead). The time index then stores the wrong offset, so the 
> results are unexpected.
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing; it checks that 
> the offset with the max timestamp is the middle one, not the last one. The 
> test passes with 3.6.0 and previous versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> There are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0, 1, 2.
> Then a ListOffsets request with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is issued.
> It should return offset 1, but 3.7.0 and trunk return offset 2.
> Even 5 seconds after producing, it still returns 2 as the offset with the 
> max timestamp.
> The ProduceRequest and ListOffsets were sent to the same broker (2); the 
> leader didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}
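
A minimal sketch (not part of the original report) of how the same check can be driven from the Java AdminClient, assuming a broker on localhost:9092 and a topic named test-topic holding the three records above:

{code:java}
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class MaxTimestampOffsetCheck {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(Map.of(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            TopicPartition tp = new TopicPartition("test-topic", 0);
            // OffsetSpec.maxTimestamp() asks for the offset of the record with the
            // largest timestamp; with the timestamps above this should be offset 1.
            ListOffsetsResult result = admin.listOffsets(Map.of(tp, OffsetSpec.maxTimestamp()));
            ListOffsetsResult.ListOffsetsResultInfo info = result.partitionResult(tp).get();
            System.out.printf("offset=%d timestamp=%d%n", info.offset(), info.timestamp());
        }
    }
}
{code}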



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16487) Support to define server properties by ClusterTestDefaults

2024-04-09 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16487.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Support to define server properties by ClusterTestDefaults
> --
>
> Key: KAFKA-16487
> URL: https://issues.apache.org/jira/browse/KAFKA-16487
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
> Fix For: 3.8.0
>
>
> Sometimes we want to define server properties for all test cases, and using 
> `BeforeEach` to modify the `ClusterConfig` is currently the only way. The 
> side effect is that the IDE complains about the style, since it can't 
> recognize the custom ParameterResolver for `ClusterConfig`.
> The alternative is to take `ClusterInstance` from the constructor first and 
> then modify the inner `ClusterConfig` in the `BeforeEach` phase. However, 
> that may confuse users about the life cycle of "configs".
> In short, I prefer to define the server properties via `ClusterTestDefaults`. 
> It already includes some server-side default properties, and we can enhance 
> it to cover more of the existing test cases.
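
A purely illustrative sketch of what the proposed usage could look like, assuming `ClusterTestDefaults` gains a `serverProperties` attribute analogous to the one already accepted per test; the annotation and package names follow the existing kafka.test framework, while the class-level attribute itself is the hypothetical part:

{code:java}
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterConfigProperty;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;

// Hypothetical: serverProperties on ClusterTestDefaults applies to every test in the class.
@ClusterTestDefaults(serverProperties = {
    @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
    @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1")
})
public class SomeIntegrationTest {

    private final ClusterInstance cluster;

    SomeIntegrationTest(ClusterInstance cluster) {
        this.cluster = cluster;
    }

    @ClusterTest
    void testSomething() {
        // Brokers for this test would already start with the class-level defaults
        // above, with no @BeforeEach hook mutating ClusterConfig.
    }
}
{code}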



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16502) Fix flaky EOSUncleanShutdownIntegrationTest#shouldWorkWithUncleanShutdownWipeOutStateStore

2024-04-09 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16502:
--

 Summary: Fix flaky 
EOSUncleanShutdownIntegrationTest#shouldWorkWithUncleanShutdownWipeOutStateStore
 Key: KAFKA-16502
 URL: https://issues.apache.org/jira/browse/KAFKA-16502
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai


org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
Expected ERROR state but driver is on RUNNING ==> expected: <true> but was: <false>
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
at 
app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)
at 
app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)
at 
app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)
at 
app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
at 
app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:350)
at 
app//org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore(EOSUncleanShutdownIntegrationTest.java:169)
at 
java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
 Method)
at 
java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
at 
app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
at 
app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
at 
java.base@11.0.16.1/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16501) Fix flaky DelegationTokenEndToEndAuthorizationWithOwnerTest#testCreateUserWithDelegationToken

2024-04-09 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16501:
--

 Summary: Fix flaky 
DelegationTokenEndToEndAuthorizationWithOwnerTest#testCreateUserWithDelegationToken
 Key: KAFKA-16501
 URL: https://issues.apache.org/jira/browse/KAFKA-16501
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai


java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication 
failed during authentication due to invalid credentials with SASL mechanism 
SCRAM-SHA-256
at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at 
kafka.api.DelegationTokenEndToEndAuthorizationTest.createDelegationTokens(DelegationTokenEndToEndAuthorizationTest.scala:165)
at 
kafka.api.DelegationTokenEndToEndAuthorizationTest.createDelegationTokens(DelegationTokenEndToEndAuthorizationTest.scala:157)
at 
kafka.api.DelegationTokenEndToEndAuthorizationTest.configureSecurityAfterServersStart(DelegationTokenEndToEndAuthorizationTest.scala:100)
at 
kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.configureSecurityAfterServersStart(DelegationTokenEndToEndAuthorizationWithOwnerTest.scala:77)
at 
kafka.integration.KafkaServerTestHarness.setUp(KafkaServerTestHarness.scala:125)
at 
kafka.api.IntegrationTestHarness.doSetup(IntegrationTestHarness.scala:139)
at 
kafka.api.IntegrationTestHarness.setUp(IntegrationTestHarness.scala:119)
at 
kafka.api.EndToEndAuthorizationTest.setUp(EndToEndAuthorizationTest.scala:167)
at 
kafka.api.DelegationTokenEndToEndAuthorizationTest.setUp(DelegationTokenEndToEndAuthorizationTest.scala:135)
at jdk.internal.reflect.GeneratedMethodAccessor30.invoke(Unknown Source)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptLifecycleMethod(TimeoutExtension.java:128)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptBeforeEachMethod(TimeoutExtension.java:78)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
at 
org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeMethodInExtensionContext(ClassBasedTestDescriptor.java:521)
at 
org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$synthesizeBeforeEachMethodAdapter$23(ClassBasedTestDescriptor.java:506)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeEachMethods$3(TestMethodTestDescriptor.java:175)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeMethodsOrCallbacksUntilExceptionOccurs$6(TestMethodTestDescriptor.java:203)
at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeMethodsOrCallbacksUntilExceptionOccurs(TestMethodTestDescriptor.java:203)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeEachMethods(TestMethodTestDescriptor.java:172)
at 
org.junit.jupiter.engine.descriptor.TestMet

[jira] [Created] (KAFKA-16500) Fix flaky DynamicBrokerReconfigurationTest#testTrustStoreAlter

2024-04-09 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16500:
--

 Summary: Fix flaky 
DynamicBrokerReconfigurationTest#testTrustStoreAlter
 Key: KAFKA-16500
 URL: https://issues.apache.org/jira/browse/KAFKA-16500
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai


java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.InvalidRequestException: Invalid value 
org.apache.kafka.common.config.ConfigException: Validation of dynamic config 
update of SSLFactory failed: javax.net.ssl.SSLHandshakeException: PKIX path 
validation failed: java.security.cert.CertPathValidatorException: signature 
check failed for configuration Invalid dynamic configuration
at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180)
at 
kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(DynamicBrokerReconfigurationTest.scala:514)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at 
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at 
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
at 
org.junit.platform.engine.support.hierarchical.N

[jira] [Created] (KAFKA-16499) Fix flaky ClientMetricsManagerTest#testCacheEviction

2024-04-09 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16499:
--

 Summary: Fix flaky ClientMetricsManagerTest#testCacheEviction
 Key: KAFKA-16499
 URL: https://issues.apache.org/jira/browse/KAFKA-16499
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai


org.opentest4j.AssertionFailedError: execution timed out after 300 ms
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:152)
at 
app//org.junit.jupiter.api.AssertTimeoutPreemptively.createAssertionFailure(AssertTimeoutPreemptively.java:132)
at 
app//org.junit.jupiter.api.AssertTimeoutPreemptively.resolveFutureAndHandleException(AssertTimeoutPreemptively.java:116)
at 
app//org.junit.jupiter.api.AssertTimeoutPreemptively.assertTimeoutPreemptively(AssertTimeoutPreemptively.java:82)
at 
app//org.junit.jupiter.api.AssertTimeoutPreemptively.assertTimeoutPreemptively(AssertTimeoutPreemptively.java:65)
at 
app//org.junit.jupiter.api.AssertTimeoutPreemptively.assertTimeoutPreemptively(AssertTimeoutPreemptively.java:47)
at 
app//org.junit.jupiter.api.AssertTimeoutPreemptively.assertTimeoutPreemptively(AssertTimeoutPreemptively.java:43)
at 
app//org.junit.jupiter.api.Assertions.assertTimeoutPreemptively(Assertions.java:3428)
at 
app//org.apache.kafka.server.ClientMetricsManagerTest.testCacheEviction(ClientMetricsManagerTest.java:1126)
at 
java.base@21.0.2/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base@21.0.2/java.lang.reflect.Method.invoke(Method.java:580)
at 
app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
at 
app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
at 
app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
at 
app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
at 
app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at 
app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at 
app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at 
app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
app//org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at 
app//org.junit.platform.engine.support.hierarchi

[jira] [Created] (KAFKA-16498) fix flaky OffsetsApiIntegrationTest34s#testGetSinkConnectorOffsets

2024-04-09 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16498:
--

 Summary: fix flaky 
OffsetsApiIntegrationTest34s#testGetSinkConnectorOffsets
 Key: KAFKA-16498
 URL: https://issues.apache.org/jira/browse/KAFKA-16498
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai


org.opentest4j.AssertionFailedError: Condition not met within timeout 3. 
Sink connector consumer group offsets should catch up to the topic end offsets 
==> expected: <true> but was: <false>
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
at 
app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)
at 
app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)
at 
app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)
at 
app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
at 
app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:367)
at 
app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.verifyExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:989)
at 
app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:224)
at 
app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsets(OffsetsApiIntegrationTest.java:171)
at 
java.base@21.0.2/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base@21.0.2/java.lang.reflect.Method.invoke(Method.java:580)
at 
app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at app//org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at 
app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at 
app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
at 
java.base@21.0.2/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base@21.0.2/java.lang.reflect.Method.invoke(Method.java:580)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch

[jira] [Created] (KAFKA-16497) Fix flaky SaslAuthenticatorFailureNoDelayTest16s#testInvalidPasswordSaslScram

2024-04-09 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16497:
--

 Summary: Fix flaky 
SaslAuthenticatorFailureNoDelayTest16s#testInvalidPasswordSaslScram
 Key: KAFKA-16497
 URL: https://issues.apache.org/jira/browse/KAFKA-16497
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai


org.opentest4j.AssertionFailedError: Condition not met within timeout 14999. 
Metric not updated failed-authentication-total expected:<1.0> but was:<31.0> 
==> expected: <true> but was: <false>
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
at 
app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)
at 
app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)
at 
app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)
at 
app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
at 
app//org.apache.kafka.common.network.NioEchoServer.waitForMetrics(NioEchoServer.java:210)
at 
app//org.apache.kafka.common.network.NioEchoServer.verifyAuthenticationMetrics(NioEchoServer.java:169)
at 
app//org.apache.kafka.common.security.authenticator.SaslAuthenticatorFailureDelayTest.testInvalidPasswordSaslScram(SaslAuthenticatorFailureDelayTest.java:121)
at 
java.base@21.0.2/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base@21.0.2/java.lang.reflect.Method.invoke(Method.java:580)
at 
app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
at 
app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
at 
app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
at 
app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
at 
app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at 
app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at 
app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at 
app//org.junit.platform

[jira] [Resolved] (KAFKA-15160) Message bytes duplication in Kafka headers when compression is enabled

2024-04-09 Thread Phuc Hong Tran (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phuc Hong Tran resolved KAFKA-15160.

Resolution: Won't Fix

> Message bytes duplication in Kafka headers when compression is enabled
> --
>
> Key: KAFKA-15160
> URL: https://issues.apache.org/jira/browse/KAFKA-15160
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, compression, consumer
>Affects Versions: 3.2.3, 3.3.2
>Reporter: Vikash Mishra
>Assignee: Phuc Hong Tran
>Priority: Critical
> Attachments: dump-compressed-data-.7z, java heap dump.png, 
> wireshark-min.png
>
>
> I created a Spring Kafka consumer using @KafkaListener.
> I encountered a scenario where, when data is compressed (with any compression, 
> snappy/gzip) and consumed by the consumer, the heap dump shows a "byte[]" 
> occupying the same amount of memory as the message value.
> This behavior is seen only when compressed data is consumed, not in the case 
> of uncompressed data.
> I captured Kafka's messages with Wireshark; it shows the proper size of the 
> data coming from the Kafka server and no extra bytes in the headers. So this 
> is definitely something in the Kafka client. Spring doesn't do anything about 
> compression; that functionality is handled internally by the Kafka client 
> library.
> Attached are screenshots of the heap dump and Wireshark.
> This seems like a critical issue, as the message size in memory almost 
> doubles, impacting consumer memory and performance. It feels as if the actual 
> message value is copied into the headers.
> *To Reproduce*
>  # Produce compressed data on any topic.
>  # Create a simple consumer consuming from the above-created topic.
>  # Capture a heap dump.
> *Expected behavior*
> Headers should not show bytes consuming memory equivalent to the value.
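
A minimal reproduction sketch along the lines of the steps above, assuming a broker on localhost:9092 and a topic named compressed-topic (names are placeholders):

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class CompressedConsumeRepro {
    public static void main(String[] args) throws Exception {
        // 1. Produce a large compressed record.
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("compression.type", "gzip"); // the report says snappy shows the same behavior
        p.put("key.serializer", ByteArraySerializer.class.getName());
        p.put("value.serializer", ByteArraySerializer.class.getName());
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(p)) {
            producer.send(new ProducerRecord<>("compressed-topic", new byte[1024 * 1024])).get();
        }

        // 2. Consume it and keep the process alive long enough to capture a heap dump,
        //    e.g. with jmap -dump:live,format=b,file=consumer.hprof <pid>
        Properties c = new Properties();
        c.put("bootstrap.servers", "localhost:9092");
        c.put("group.id", "repro-group");
        c.put("auto.offset.reset", "earliest");
        c.put("key.deserializer", ByteArrayDeserializer.class.getName());
        c.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(c)) {
            consumer.subscribe(List.of("compressed-topic"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(10));
            System.out.println("Consumed " + records.count() + " record(s); capture the heap dump now.");
            Thread.sleep(60_000);
        }
    }
}
{code}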



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15558) Determine if Timer should be used elsewhere in PrototypeAsyncConsumer.updateFetchPositions()

2024-04-09 Thread Phuc Hong Tran (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phuc Hong Tran resolved KAFKA-15558.

Resolution: Fixed

> Determine if Timer should be used elsewhere in 
> PrototypeAsyncConsumer.updateFetchPositions()
> 
>
> Key: KAFKA-15558
> URL: https://issues.apache.org/jira/browse/KAFKA-15558
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor, fetcher, timeout
> Fix For: 3.8.0
>
>
> This is a followup ticket based on a question from [~junrao] when reviewing 
> the [fetch request manager pull 
> request|https://github.com/apache/kafka/pull/14406]:
> {quote}It still seems weird that we only use the timer for 
> {{{}refreshCommittedOffsetsIfNeeded{}}}, but not for other cases where we 
> don't have valid fetch positions. For example, if all partitions are in 
> {{AWAIT_VALIDATION}} state, it seems that {{PrototypeAsyncConsumer.poll()}} 
> will just go in a busy loop, which is not efficient.
> {quote}
> The goal here is to determine if we should also be propagating the Timer to 
> the validate positions and reset positions operations.
> Note: we should also investigate if the existing {{KafkaConsumer}} 
> implementation should be fixed, too.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1022 Formatting and Updating Features

2024-04-09 Thread Justine Olshan
I took a quick look at the code -- looks like the previous behavior was not
to set a top level error if one particular feature had an issue. We can do
that.

I think it could make some sense to specify errors on features that were
not valid and use the top level error to indicate that the request didn't
update any features. The handling now is to complete the futures with the
top level error anyway.

As for the validation criteria: it seems like one bit of code that
validates whether a version is allowed is in the method
`reasonNotSupported`, which checks the range of versions available for the
given feature.
For metadata.version we have a method to do "additional checks" and we
could have those for the various other features as well. I have an
(internal) FeatureVersion interface in mind that would work well for this.
For any of these validations, we return the same error
`INVALID_UPDATE_VERSION`. I would think continuing to use this error
follows naturally, but if we think it is necessary to specify the error
code, I can do so in my KIP.

Justine

On Tue, Apr 9, 2024 at 1:46 PM Justine Olshan  wrote:

> José,
>
> INVALID_UPDATE_VERSION was added as part of KIP-497. The KIP seems to be
> lacking some details on the error.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-497%3A+Add+inter-broker+API+to+alter+ISR
>
> https://github.com/apache/kafka/commit/57de67db22eb373f92ec5dd449d317ed2bc8b8d1
>
> The error seems to be used in the feature update path as well, though that
> was also not included in KIP-584. I wonder if we were missing necessary
> details for many KIPs in 2020...
>
> I'm not sure I fully understand the proposal. Is the question for the
> exact error to use in UpdatableFeatureResult.ErrorCode and what to write
> in  UpdatableFeatureResult.ErrorMessage? If so, those errors and adding a
> message (the dependency that was violated for example) makes sense.
> I agree that it makes sense that any errors in updates should be a top
> level error and not have a partial update.
>
> I thought these were part of KIP-584, but I will take a look and update
> this KIP if they are not.
>
> Justine
>
> On Tue, Apr 9, 2024 at 1:10 PM José Armando García Sancio
>  wrote:
>
>> Hi Justine,
>>
>> Thanks for the KIP. I see that the KIP doesn't make any updates to the
>> UpdateFeatures RPC. I was trying to understand how errors will be
>> communicated to the client.
>>
>> Are you planning to use the INVALID_UPDATE_VERSION error and overwrite
>> the ErrorMessage field for all of the validations you mentioned in the
>> KIP? I see that INVALID_UPDATE_VERSION is in the code for Apache Kafka
>> but I couldn't find the KIP that adds this error. It is not in KIP-584
>> or KIP-778. If you agree, do you think we should document this error
>> in this KIP?
>>
>> It is also not clear to me when the UpdateFeaturesResponse will return
>> an error per feature versus an error for the entire RPC. KIP-584
>> defines this relationship but it doesn't specify when exactly a top
>> level error will be returned versus when a feature level error will be
>> returned. I think that most users wouldn't want partial failures. They
>> instead would like to be guaranteed that all of the feature updates
>> succeeded or none did. Do you agree? Should we update the KIP to make
>> this clear?
>>
>> Thanks!
>> --
>> -José
>>
>


Re: [DISCUSS] KIP-932: Queues for Kafka

2024-04-09 Thread Jun Rao
Hi, Andrew,

Thanks for the reply. A few more comments.

41.
41.1 How does the partition leader obtain the group epoch to set
WriteShareGroupStateRequest.StateEpoch?
41.2 What's the benefit of having the group coordinator initialize the
state and the partition leader set the SPSO? It seems simpler to have the
partition leader initialize both the state and the SPSO together?

42.
42.1 "I don’t think the group epoch needs to be bumped when the share group
offset is altered."
But the part on handling Alter share group offsets says "The share
coordinator writes a ShareCheckpoint record with the new state epoch to the
__share_group_state  topic." So, which is correct? We have two paths to
update the state in the share coordinator, one from the group coordinator
and another from the partition leader. I thought the benefit of bumping up
the epoch is to fence off a late request in the previous epoch from another
path.
42.2 When the group coordinator alters the share group offset in share
coordinator, how does the partition leader know the share group state has
been altered so that it could clear its in-memory state?

47. 56. BaseOffset typically refers to the base offset for the batch and
can be confusing. FirstOffset is clearer and matches LastOffset.

60. Could we include FindCoordinatorRequest in the top level index for
Kafka protocol changes?

61. I think it's probably ok to add time-based expiration later. But using
a default group in console-share-consumer probably won't help reduce the
garbage. In the common case, the user of the console consumer likely wants
to see the recently produced records for verification. If the default group
doesn't provide that (because of the stale state), the user will likely
just use a new group. It's true that we don't garbage collect idle topics.
However, the share groups are similar to consumer groups, which do support
garbage collection. Typically, we read topics more often than we create them.

77. If the generic compaction is inconvenient, we could use customized
logic. If we go with that route, option (b) seems cleaner and more
optimized. Since the share states for all groups fit in memory, we could
generate snapshots more efficiently than going through compaction. Having a
separate log per share partition is probably too much overhead. It's more
efficient to put the state changes for multiple share partitions in a
single log.

100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs: Should we name it
SessionTimeoutMs?

101. ShareGroupHeartbeatResponse.Assignment.Error: What kind of error could
we have when assigning partitions? What are the corresponding error codes?

102. Do we still need
ShareGroupDescribeResponse.Members.Assignment.{MetadataVersion,MetadataBytes}?

103. Could we list the error codes separately for
ShareFetchResponse.Responses.Partitions.ErrorCode and
ShareFetchResponse.Responses.Partitions.AcknowledgeErrorCode?

104. Should we add error message for the errorCode in ShareFetchResponse,
ShareAcknowledgeResponse, ReadShareGroupStateResponse,
WriteShareGroupStateResponse, DeleteShareGroupStateResponse,
ReadShareGroupOffsetsStateResponse and InitializeShareGroupStateResponse?

105. "about": "The state - 0:Available,2:Acked,4:Archived.": What about 1
and 3? Are we leaving them out intentionally?

106. Do we have usage of metadata in OffsetAndMetadata? If not, could we
remove it from AdminClient and KafkaShareConsumer?

107. ListGroupsRequest: Should we bump up the version since it now supports
a new group type "share"?

108. AdminClient.listShareGroupOffsets: Should it expose all the states
from ReadShareGroupStateResponse, instead of just SPSO?

109. DeleteShareGroupOffsetsResult exposes
  public KafkaFuture<Void> partitionResult(final TopicPartition partition)
DeleteShareGroupsResult exposes
  public Map<String, KafkaFuture<Void>> deletedGroups()
Should we make them more consistent?

110. Should ShareGroupDescription include fields like GroupEpoch,
AssignmentEpoch, MemberEpoch, and SubscribedTopicNames?

111. Should GroupListing include GroupState?

112. Do we need ListShareGroupOffsetsSpec? Could we just use
Set<TopicPartition> directly?

113. ListShareGroupsResult.errors(): How do we know which group has an
error?

Jun

On Mon, Apr 8, 2024 at 9:32 AM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

> Hi David,
> Thanks for your questions.
>
> 70. The Group Coordinator communicates with the Share Coordinator over
> RPCs.
> In the general case, it’s an inter-broker call. It is probably possible to
> optimise
> for the situation in which the appropriate GC and SC shards are
> co-located, but the
> KIP does not delve that deep into potential performance optimisations.
>
> 71. Avoiding collisions would be a good idea, but I do worry about
> retrospectively
> introducing a naming convention for groups. I feel that naming conventions
> will
> typically be the responsibility of the cluster administrators based on
> organizational
> factors, such as the name of an application.
>
> 72. Personally, I don’t like 

Re: [jira] [Created] (KAFKA-16496) Make the default of receive.buffer.bytes -1 instead of 5

2024-04-09 Thread Thomas Lane
unsubscribe

> On Apr 9, 2024, at 5:56 PM, Hechao Li (Jira)  wrote:
> 
> Hechao Li created KAFKA-16496:
> -
> 
> Summary: Make the default of receive.buffer.bytes -1 instead of 5
> Key: KAFKA-16496
> URL: https://issues.apache.org/jira/browse/KAFKA-16496
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Hechao Li
> 
> 
> Currently, kafka has a default config receive.buffer.bytes=64k, which 
> translates to SO_RCVBUF = 65536. As a result, the Linux kernel can't auto 
> tune the receive window. This will cripple performance in many scenarios, 
> especially for internet traffic.
> 
> In our environment, we have seen a timeout after [a kernel 
> patch|https://lore.kernel.org/netdev/20230717152917.751987-1-eduma...@google.com/T/]
>  that makes the initial receive window 25% instead of 50% of SO_RCVBUF, 
> together with the default request.timeout.ms=3, it causes an 
> application-level timeout.
> 
> See also https://lore.kernel.org/all/20240409164355.1721078-1-...@netflix.com/
> 
> 
> 
> --
> This message was sent by Atlassian Jira
> (v8.20.10#820010)



[jira] [Created] (KAFKA-16496) Make the default of receive.buffer.bytes -1 instead of 5

2024-04-09 Thread Hechao Li (Jira)
Hechao Li created KAFKA-16496:
-

 Summary: Make the default of receive.buffer.bytes -1 instead of 5
 Key: KAFKA-16496
 URL: https://issues.apache.org/jira/browse/KAFKA-16496
 Project: Kafka
  Issue Type: Improvement
Reporter: Hechao Li


Currently, Kafka has a default config receive.buffer.bytes=64k, which 
translates to SO_RCVBUF = 65536. As a result, the Linux kernel can't auto-tune 
the receive window. This cripples performance in many scenarios, especially 
for internet traffic.

In our environment, we have seen a timeout after [a kernel 
patch|https://lore.kernel.org/netdev/20230717152917.751987-1-eduma...@google.com/T/]
 that makes the initial receive window 25% instead of 50% of SO_RCVBUF, 
together with the default request.timeout.ms=3, it causes an 
application-level timeout.

See also https://lore.kernel.org/all/20240409164355.1721078-1-...@netflix.com/
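
Setting receive.buffer.bytes to -1 per client is already possible today (per the config docs, -1 means the OS default is used, so the kernel's autotuning is not capped); the ticket only proposes changing the default. A minimal sketch, assuming a consumer pointed at a placeholder bootstrap address:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OsTunedReceiveBuffer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker.example.com:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // -1 means "use the OS default" for SO_RCVBUF, so receive-window autotuning
        // is not capped at a fixed 64 KB.
        props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, -1);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // ... subscribe and poll as usual
        }
    }
}
{code}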



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-899: Allow clients to rebootstrap

2024-04-09 Thread Andrew Schofield
Hi Ivan,
I think you have to go one way or the other with the cluster ID, so I think 
removing that from this KIP might
be the best. I think there’s another KIP waiting to be written for ensuring 
consistency of clusters, but
I think that wouldn’t conflict at all with this one.

Thanks,
Andrew

> On 9 Apr 2024, at 19:11, Ivan Yurchenko  wrote:
>
> Hi Andrew and all,
>
> I looked deeper into the code [1] and it seems the Metadata class is OK with 
> cluster ID changing. So I'm thinking that the rebootstrapping shouldn't 
> introduce a new failure mode here. And I should remove the mention of this 
> cluster ID checks from the KIP.
>
> Best,
> Ivan
>
> [1] 
> https://github.com/apache/kafka/blob/ff90f78c700c582f9800013faad827c36b45ceb7/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L355
>
> On Tue, Apr 9, 2024, at 09:28, Andrew Schofield wrote:
>> Hi Ivan,
>> Thanks for the KIP. I can see situations in which this would be helpful. I 
>> have one question.
>>
>> The KIP says the client checks the cluster ID when it re-bootstraps and that 
>> it will fail if the
>> cluster ID doesn’t match the previously known one. How does it fail? Which 
>> exception does
>> it throw and when?
>>
>> In a similar vein, now that we are checking cluster IDs, I wonder if it 
>> could be extended to
>> cover all situations in which there are cluster ID mismatches, such as the 
>> bootstrap server
>> list erroneously pointing at brokers from different clusters and the problem 
>> only being
>> detectable later on.
>>
>> Thanks,
>> Andrew
>>
>>> On 8 Apr 2024, at 18:24, Ivan Yurchenko  wrote:
>>>
>>> Hello!
>>>
>>> I changed the KIP a bit, specifying that the certain benefit goes to 
>>> consumers not participating in a group, but that other clients can benefit 
>>> as well in certain situations.
>>>
>>> You can see the changes in the history [1]
>>>
>>> Thank you!
>>>
>>> Ivan
>>>
>>> [1] 
>>> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=240881396&originalVersion=10&revisedVersion=11
>>>
>>> On 2023/07/15 16:37:52 Ivan Yurchenko wrote:
 Hello!

 I've made several changes to the KIP based on the comments:

 1. Reduced the scope to producer and consumer clients only.
 2. Added more details to the description of the rebootstrap process.
 3. Documented the role of low values of reconnect.backoff.max.ms in
 preventing rebootstrapping.
 4. Some wording changes.

 You can see the changes in the history [1]

 I'm planning to put the KIP to a vote in some days if there are no new
 comments.

 Thank you!

 Ivan

 [1]
 https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=240881396&selectedPageVersions=9&selectedPageVersions=5

 On Tue, 30 May 2023 at 08:23, Ivan Yurchenko 
 wrote:

> Hi Chris and all,
>
>> I believe the logic you've linked is only applicable for the producer and
>> consumer clients; the admin client does something different (see [1]).
>
> I see, thank you for the pointer. It seems the admin client is fairly
> different from the producer and consumer. Probably it makes sense to 
> reduce
> the scope of the KIP to the producer and consumer clients only.
>
>> it'd be nice to have a definition of when re-bootstrapping
>> would occur that doesn't rely on internal implementation details. What
>> user-visible phenomena can we identify that would lead to a
>> re-bootstrapping?
>
> Let's put it this way: "Re-bootstrapping means that the client forgets
> about nodes it knows about and falls back on the bootstrap nodes as if it
> had just been initialized. Re-bootstrapping happens when, during a 
> metadata
> update (which may be scheduled by `metadata.max.age.ms` or caused by
> certain error responses like NOT_LEADER_OR_FOLLOWER, 
> REPLICA_NOT_AVAILABLE,
> etc.), the client doesn't have a node with an established connection or
> establishable connection."
> Does this sound good?
>
>> I also believe that if someone has "
>> reconnect.backoff.max.ms" set to a low-enough value,
>> NetworkClient::leastLoadedNode may never return null. In that case,
>> shouldn't we still attempt a re-bootstrap at some point (if the user has
>> enabled this feature)?
>
> Yes, you're right. Particularly `canConnect` here [1] can always be
> returning `true` if `reconnect.backoff.max.ms` is low enough.
> It seems pretty difficult to find a good criteria when re-bootstrapping
> should be forced in this case, so it'd be difficult to configure and 
> reason
> about. I think it's worth mentioning in the KIP and later in the
> documentation, but we should not try to do anything special here.
>
>> Would it make sense to re-bootstrap only after "
>> metadata.max.age.ms" has elapsed since the last metadata update, and
> when
>> at
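
For reference, a hedged sketch of the client settings touched on in this thread. The metadata.max.age.ms and reconnect.backoff.max.ms configs are existing ones; the opt-in switch shown for the KIP's proposed behaviour uses an assumed name purely for illustration:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class RebootstrapConfigSketch {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092,broker-2:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        // Existing settings referenced in the thread: metadata refreshes are scheduled
        // by metadata.max.age.ms, and a very low reconnect.backoff.max.ms can keep
        // leastLoadedNode() non-null, which effectively prevents re-bootstrapping.
        props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 300_000);
        props.put(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 1_000);
        // Hypothetical opt-in switch for the behaviour proposed by KIP-899; the actual
        // config name and values are whatever the KIP finally adopts.
        props.put("metadata.recovery.strategy", "rebootstrap");
        return props;
    }
}
{code}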

Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2799

2024-04-09 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-1022 Formatting and Updating Features

2024-04-09 Thread Justine Olshan
José,

INVALID_UPDATE_VERSION was added as part of KIP-497. The KIP seems to be
lacking some details on the error.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-497%3A+Add+inter-broker+API+to+alter+ISR
https://github.com/apache/kafka/commit/57de67db22eb373f92ec5dd449d317ed2bc8b8d1

The error seems to be used in the feature update path as well, though that
was also not included in KIP-584. I wonder if we were missing necessary
details for many KIPs in 2020...

I'm not sure I fully understand the proposal. Is the question for the exact
error to use in UpdatableFeatureResult.ErrorCode and what to write in
UpdatableFeatureResult.ErrorMessage? If so, those errors and adding a
message (the dependency that was violated for example) makes sense.
I agree that it makes sense that any errors in updates should be a top
level error and not have a partial update.

I thought these were part of KIP-584, but I will take a look and update
this KIP if they are not.

Justine

On Tue, Apr 9, 2024 at 1:10 PM José Armando García Sancio
 wrote:

> Hi Justine,
>
> Thanks for the KIP. I see that the KIP doesn't make any updates to the
> UpdateFeatures RPC. I was trying to understand how errors will be
> communicated to the client.
>
> Are you planning to use the INVALID_UPDATE_VERSION error and overwrite
> the ErrorMessage field for all of the validations you mentioned in the
> KIP? I see that INVALID_UPDATE_VERSION is in the code for Apache Kafka
> but I couldn't find the KIP that adds this error. It is not in KIP-584
> or KIP-778. If you agree, do you think we should document this error
> in this KIP?
>
> It is also not clear to me when the UpdateFeaturesResponse will return
> an error per feature versus an error for the entire RPC. KIP-584
> defines this relationship but it doesn't specify when exactly a top
> level error will be returned versus when a feature level error will be
> returned. I think that most users wouldn't want partial failures. They
> instead would like to be guaranteed that all of the feature updates
> succeeded or none did. Do you agree? Should we update the KIP to make
> this clear?
>
> Thanks!
> --
> -José
>


Re: [DISCUSS] KIP-1022 Formatting and Updating Features

2024-04-09 Thread José Armando García Sancio
Hi Justine,

Thanks for the KIP. I see that the KIP doesn't make any updates to the
UpdateFeatures RPC. I was trying to understand how errors will be
communicated to the client.

Are you planning to use the INVALID_UPDATE_VERSION error and overwrite
the ErrorMessage field for all of the validations you mentioned in the
KIP? I see that INVALID_UPDATE_VERSION is in the code for Apache Kafka
but I couldn't find the KIP that adds this error. It is not in KIP-584
or KIP-778. If you agree, do you think we should document this error
in this KIP?

It is also not clear to me when the UpdateFeaturesResponse will return
an error per feature versus an error for the entire RPC. KIP-584
defines this relationship but it doesn't specify when exactly a top
level error will be returned versus when a feature level error will be
returned. I think that most users wouldn't want partial failures. They
instead would like to be guaranteed that all of the feature updates
succeeded or none did. Do you agree? Should we update the KIP to make
this clear?
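
(For context, here is a minimal sketch of how a client observes these errors today through the
existing Admin API from KIP-584 -- the feature name and version level below are made up for
illustration, and which error code/message actually surfaces is exactly what this thread is
asking to pin down:)

    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.FeatureUpdate;
    import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
    import org.apache.kafka.clients.admin.UpdateFeaturesResult;

    public class UpdateFeaturesErrorSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                // "transaction.version" at level 2 is an illustrative update, not a final name.
                FeatureUpdate update = new FeatureUpdate((short) 2, FeatureUpdate.UpgradeType.UPGRADE);
                UpdateFeaturesResult result =
                        admin.updateFeatures(Map.of("transaction.version", update), new UpdateFeaturesOptions());
                // Each feature has its own future, so a per-feature validation failure
                // (e.g. INVALID_UPDATE_VERSION plus an ErrorMessage) surfaces here, while a
                // request-level problem fails all futures with the top-level error.
                result.values().get("transaction.version").get();
            }
        }
    }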

Thanks!
-- 
-José


Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-04-09 Thread Walker Carlson
Hey all,

(1) no I hadn't considered just naming the methods differently. I actually
really like this idea and am for it. Except we need 3 different methods
now. One for no processor, one for a processor that should restore and one
that reprocesses. How about `addCustomGlobalStore` and
`addIsomorphicGlobalStore` and then just `addGlobalStateStore` for the no
processor case? If everyone likes that I can add that to the KIP and rename
the methods.
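
(To make that naming idea concrete -- a sketch of what the three variants might look like; the
method names are just the proposals from this thread, and the parameter lists are my guess,
modeled on the existing addGlobalStore() signature, not an agreed API:)

    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.processor.api.ProcessorSupplier;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;

    // Hypothetical StreamsBuilder additions (names from this thread, signatures assumed):
    interface GlobalStoreApiSketch {
        // Custom processor; the input topic is reprocessed through it on restore.
        <K, V> void addCustomGlobalStore(StoreBuilder<?> storeBuilder, String topic,
                                         Consumed<K, V> consumed,
                                         ProcessorSupplier<K, V, Void, Void> stateUpdateSupplier);

        // Processor that does not transform records, so restore can bulk-load them as-is.
        <K, V> void addIsomorphicGlobalStore(StoreBuilder<?> storeBuilder, String topic,
                                             Consumed<K, V> consumed,
                                             ProcessorSupplier<K, V, Void, Void> stateUpdateSupplier);

        // No processor at all; records are loaded straight into the store.
        <K, V> void addGlobalStateStore(StoreBuilder<KeyValueStore<Bytes, byte[]>> storeBuilder,
                                        String topic, Consumed<K, V> consumed);
    }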

(2) we can have the built-in case use a StoreBuilder and manually check for
the TimestampedKeyValueStore. That is fine with me.

Bruno I hope that was what you were intending.

(3) For the Scala API, do we need to make it match the Java API, or are we
just making the minimum changes? If we take point 1, I don't know how much
we need to change.

Thanks,
Walker


On Tue, Apr 2, 2024 at 8:38 AM Matthias J. Sax  wrote:

> One more thing:
>
> I was just looking into the WIP PR, and it seems we will also need to
> change `StreamsBuilder.scala`. The KIP needs to cover this changes as well.
>
>
> -Matthias
>
> On 4/1/24 10:33 PM, Bruno Cadonna wrote:
> > Hi Walker and Matthias,
> >
> > (2)
> > That is exactly my point about having a compile time error versus a
> > runtime error. The added flexibility as proposed by Matthias sounds good
> > to me.
> >
> > Regarding the Named parameter, I was not aware that the processor that
> > writes records to the global state store is named according to the name
> > passed in by Consumed. I thought Consumed strictly specifies the names
> > of source processors. So I am fine with not having an overload with a
> > Named parameter.
> >
> > Best,
> > Bruno
> >
> > On 3/31/24 11:30 AM, Matthias J. Sax wrote:
> >> Two more follow up thoughts:
> >>
> >> (1) I am still not a big fan of the boolean parameter we introduce.
> >> Did you consider to use different method names, like
> >> `addReadOnlyGlobalStore()` (for the optimized method, that would not
> >> reprocess data on restore), and maybe add `addModifiableGlobalStore()`
> >> (not a good name, but we cannot re-use existing `addGlobalStore()` --
> >> maybe somebody else has a good idea about a better `addXxxGlobalStore`
> >> that would describe it well).
> >>
> >> (2) I was thinking about Bruno's comment to limit the scope the store
> >> builder for the optimized case. I think we should actually do
> >> something about it, because in the end, the runtime (ie, the
> >> `Processor` we hard wire) would need to pick a store it supports and
> >> cast to the corresponding store? If the cast fails, we hit a runtime
> >> exception, but by putting the store we cast to into the signature we
> >> can actually convert it into a compile time error what seems better.
> >> -- If we want, we could make it somewhat flexible and support both
> >> `KeyValueStore` and `TimestampedKeyValueStore` -- ie, the signature
> >> would be `KeyValueStore` but we explicitly check if the builder gives
> >> us a `TimestampedKeyValueStore` instance and use it properly.
> >>
> >> If putting the signature does not work for some reason, we should at
> >> least clearly call it out in the JavaDocs what store type is expected.
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 3/28/24 5:05 PM, Walker Carlson wrote:
> >>> Hey all,
> >>>
> >>> Thanks for the feedback Bruno, Almog and Matthias!
> >>>
> >>> Almog: I like the idea, but I agree with Matthais. I actually looked at
> >>> that ticket a bit when doing this and found that while similar they are
> >>> actually pretty unrelated codewise. I would love to see it get taken
> >>> care
> >>> of.
> >>>
> >>> Bruno and Matthias: The Named parameter doesn't really make sense to
> >>> me to
> >>> put it here. The store in the Store builder is already named through
> >>> what
> >>> Matthais described and the processor doesn't actually have a name. That
> >>> would be the processor node that gets named via the Named parameter
> (in
> >>> the DSL) and the internal streams builder uses the consumed object to
> >>> make
> >>> a source name. I think we should just keep the Consumed object and used
> >>> that for the processor node name.
> >>>
> >>> As for the limitation of the store builder interface I don't think it
> is
> >>> necessary. It could be something we add later if we really want to.
> >>>
> >>> Anyways I think we are getting close enough to consensus that I'm
> >>> going to
> >>> open a vote and hopefully we can get it voted on soon!
> >>>
> >>> best,
> >>> Walker
> >>>
> >>> On Thu, Mar 28, 2024 at 5:55 AM Matthias J. Sax 
> >>> wrote:
> >>>
>  Hey,
> 
>  looking into the API, I am wondering why we would need to add an
>  overload talking a `Named` parameter?
> 
>  StreamsBuilder.addGlobalStore() (and .addGlobalTable()) already takes
> a
>  `Consumed` parameter that allows to set a name.
> 
> 
> > 2.
> > I do not understand what you mean with "maximum flexibility". The
>  built-in processor needs to assume a given state store 

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-09 Thread Loic Greffier
Hi Bruno and Bill,

To complete the Damien's purposes about the point 3.

Processing errors are caught and handled by the ProcessingExceptionHandler at the
precise moment when records are processed by processor nodes. The handling will
be performed in the "process" method of the ProcessorNode, such as:

public void process(final Record<KIn, VIn> record) {
    ...

    try {
        ...
    } catch (final ClassCastException e) {
        ...
    } catch (Exception e) {
        ProcessingExceptionHandler.ProcessingHandlerResponse response =
            this.processingExceptionHandler
                .handle(internalProcessorContext, (Record<Object, Object>) record, e);

        if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
            throw new StreamsException("Processing exception handler is set to fail upon" +
                " a processing error. If you would rather have the streaming pipeline" +
                " continue after a processing error, please set the " +
                DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", e);
        }
    }
}
As you can see, the record is transmitted to the ProcessingExceptionHandler as
a Record<Object, Object>, as we are dealing with the input record of the
processor at this point. Its key and value can be of any type, including
non-serializable types, as suggested by Damien's example. As the
ProcessingExceptionHandler is not intended to perform any serialization, there
should be no issue for users handling a Record<Object, Object>.
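
(To make the user-facing side concrete, here is a minimal sketch of a log-and-continue handler
against the interface proposed in this KIP -- the interface, enum and context types are the KIP
draft names, not an existing API, so details may still change:)

    import java.util.Map;
    import org.apache.kafka.streams.processor.api.ProcessingContext;
    import org.apache.kafka.streams.processor.api.Record;

    // ProcessingExceptionHandler and ProcessingHandlerResponse are the names proposed by KIP-1033.
    public class MyLogAndContinueHandler implements ProcessingExceptionHandler {

        @Override
        public ProcessingHandlerResponse handle(final ProcessingContext context,
                                                final Record<Object, Object> record,
                                                final Exception exception) {
            // The key/value can be any POJO produced upstream, so only log generic information.
            System.err.printf("Dropping record in task %s: %s%n", context.taskId(), exception.getMessage());
            return ProcessingHandlerResponse.CONTINUE;
        }

        @Override
        public void configure(final Map<String, ?> configs) {
            // Nothing to configure; assuming the handler is Configurable like the existing handlers.
        }
    }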

I follow Damien on the other points.

For point 6, underlying public interfaces are renamed as well:
- The ProcessingHandlerResponse
- The 
ProcessingLogAndContinueExceptionHandler/ProcessingLogAndFailExceptionHandler
- The configuration DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG 
(default.processing.exception.handler)

Regards,

Loïc

From: Damien Gasparina 
Sent: Tuesday, 9 April 2024 20:08
To: dev@kafka.apache.org
Subject: Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler 
for exceptions occuring during processing


Hi Bruno, Bill,

First of all, thanks a lot for all your useful comments.

> 1. and 2.
> I am wondering whether we should expose the processor node ID -- which
> basically is the processor node name -- in the ProcessingContext
> interface. I think the processor node ID fits well in the
> ProcessingContext interface since it already contains application ID and
> task ID and it would make the API for the handler cleaner.

That's a good point, the actual ProcessorContextImpl is already holding the
current node in an attribute (currentNode), thus exposing the node ID should
not be a problem. Let me sleep on it and get back to you regarding this
point.

> 3.
> Could you elaborate -- maybe with an example -- when a record is in a
> state in which it cannot be serialized? This is not completely clear to
me.

The Record passed to the handler is the input record to the processor. In
the Kafka Streams API, it could be any POJO.
E.g. with the following topology:

    streamsBuilder.stream("x")
        .map((k, v) -> new KeyValue<>("foo", Pair.of("hello", "world")))
        .foreach((k, v) -> { throw new RuntimeException(); });

I would expect the handler to receive a Record<String, Pair<String, String>>.

> 4.
> Regarding the metrics, it is not entirely clear to me what the metric
> measures. Is it the number of calls to the process handler or is it the
> number of calls to process handler that returned FAIL?
> If it is the former, I was also wondering whether it would be better to
> put the task-level metrics to INFO reporting level and remove the
> thread-level metric, similar to the dropped-records metric. You can
> always roll-up the metrics to the thread level in your preferred
> monitoring system. Or do you think we end up with too many metrics?

We were thinking of the former, measuring the number of calls to the
process handler. That's a good point, having the information at the task
level could be beneficial. I updated the KIP to change the metric level
and to clarify the wording.

> 5.
> What do you think about naming the handler ProcessingExceptionHandler
> instead of ProcessExceptionHandler?
> The DeserializationExceptionHanlder and the ProductionExceptionHandler
> also use the noun of the action in their name and not the verb.

Good catch, I updated the KIP to rename it ProcessingExceptionHandler.

> 6.
> What record is exactly passed to the handler?
> Is it the input record to the task? Is it the input record to the
> processor node? Is it the input record to the processor?

The input record of the processor. I assume that is the most user-friendly
record in this context.

> 7.
> Could you please add the packages of the Java classes/interfaces/enums
> you want to add?

Done, without any surprises: package org.apache.kafka.streams.errors;


Th

Re: [DISCUSS] KIP-899: Allow clients to rebootstrap

2024-04-09 Thread Ivan Yurchenko
Hi Andrew and all,

I looked deeper into the code [1] and it seems the Metadata class is OK with 
cluster ID changing. So I'm thinking that the rebootstrapping shouldn't 
introduce a new failure mode here. And I should remove the mention of the
cluster ID check from the KIP.

Best,
Ivan

[1] 
https://github.com/apache/kafka/blob/ff90f78c700c582f9800013faad827c36b45ceb7/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L355
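
(For readers following along, a small sketch of the two client settings this thread keeps coming
back to -- the values below are purely illustrative:)

    import java.util.Properties;
    import org.apache.kafka.clients.CommonClientConfigs;

    public class RebootstrapRelatedConfigs {
        public static Properties clientProps() {
            Properties props = new Properties();
            props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
            // Metadata is refreshed at least this often; re-bootstrap would only be considered
            // during such an update (or one triggered by certain error responses).
            props.put(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, "300000");
            // As discussed below, a very low value here can keep leastLoadedNode() non-null
            // and thereby effectively prevent re-bootstrapping.
            props.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1000");
            return props;
        }
    }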

On Tue, Apr 9, 2024, at 09:28, Andrew Schofield wrote:
> Hi Ivan,
> Thanks for the KIP. I can see situations in which this would be helpful. I 
> have one question.
> 
> The KIP says the client checks the cluster ID when it re-bootstraps and that 
> it will fail if the
> cluster ID doesn’t match the previously known one. How does it fail? Which 
> exception does
> it throw and when?
> 
> In a similar vein, now that we are checking cluster IDs, I wonder if it could 
> be extended to
> cover all situations in which there are cluster ID mismatches, such as the 
> bootstrap server
> list erroneously pointing at brokers from different clusters and the problem 
> only being
> detectable later on.
> 
> Thanks,
> Andrew
> 
> > On 8 Apr 2024, at 18:24, Ivan Yurchenko  wrote:
> > 
> > Hello!
> > 
> > I changed the KIP a bit, specifying that the certain benefit goes to 
> > consumers not participating in a group, but that other clients can benefit 
> > as well in certain situations.
> > 
> > You can see the changes in the history [1]
> > 
> > Thank you!
> > 
> > Ivan
> > 
> > [1] 
> > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=240881396&originalVersion=10&revisedVersion=11
> > 
> > On 2023/07/15 16:37:52 Ivan Yurchenko wrote:
> >> Hello!
> >> 
> >> I've made several changes to the KIP based on the comments:
> >> 
> >> 1. Reduced the scope to producer and consumer clients only.
> >> 2. Added more details to the description of the rebootstrap process.
> >> 3. Documented the role of low values of reconnect.backoff.max.ms in
> >> preventing rebootstrapping.
> >> 4. Some wording changes.
> >> 
> >> You can see the changes in the history [1]
> >> 
> >> I'm planning to put the KIP to a vote in some days if there are no new
> >> comments.
> >> 
> >> Thank you!
> >> 
> >> Ivan
> >> 
> >> [1]
> >> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=240881396&selectedPageVersions=9&selectedPageVersions=5
> >> 
> >> On Tue, 30 May 2023 at 08:23, Ivan Yurchenko 
> >> wrote:
> >> 
> >>> Hi Chris and all,
> >>> 
>  I believe the logic you've linked is only applicable for the producer and
>  consumer clients; the admin client does something different (see [1]).
> >>> 
> >>> I see, thank you for the pointer. It seems the admin client is fairly
> >>> different from the producer and consumer. Probably it makes sense to 
> >>> reduce
> >>> the scope of the KIP to the producer and consumer clients only.
> >>> 
>  it'd be nice to have a definition of when re-bootstrapping
>  would occur that doesn't rely on internal implementation details. What
>  user-visible phenomena can we identify that would lead to a
>  re-bootstrapping?
> >>> 
> >>> Let's put it this way: "Re-bootstrapping means that the client forgets
> >>> about nodes it knows about and falls back on the bootstrap nodes as if it
> >>> had just been initialized. Re-bootstrapping happens when, during a 
> >>> metadata
> >>> update (which may be scheduled by `metadata.max.age.ms` or caused by
> >>> certain error responses like NOT_LEADER_OR_FOLLOWER, 
> >>> REPLICA_NOT_AVAILABLE,
> >>> etc.), the client doesn't have a node with an established connection or
> >>> establishable connection."
> >>> Does this sound good?
> >>> 
>  I also believe that if someone has "
>  reconnect.backoff.max.ms" set to a low-enough value,
>  NetworkClient::leastLoadedNode may never return null. In that case,
>  shouldn't we still attempt a re-bootstrap at some point (if the user has
>  enabled this feature)?
> >>> 
> >>> Yes, you're right. Particularly `canConnect` here [1] can always be
> >>> returning `true` if `reconnect.backoff.max.ms` is low enough.
> >>> It seems pretty difficult to find a good criterion for when re-bootstrapping
> >>> should be forced in this case, so it'd be difficult to configure and 
> >>> reason
> >>> about. I think it's worth mentioning in the KIP and later in the
> >>> documentation, but we should not try to do anything special here.
> >>> 
>  Would it make sense to re-bootstrap only after "
>  metadata.max.age.ms" has elapsed since the last metadata update, and
> >>> when
>  at least one request has been made to contact each known server and been
>  met with failure?
> >>> 
> >>> The first condition is satisfied by the check in the beginning of
> >>> `maybeUpdate` [2].
> >>> It seems to me, the second one is also satisfied by `leastLoadedNode`.
> >>> Admittedly, it's more relaxed than you propose: it tracks unavailability 
> >>> of

Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-09 Thread Damien Gasparina
Hi Bruno, Bill,

First of all, thanks a lot for all your useful comments.

> 1. and 2.
> I am wondering whether we should expose the processor node ID -- which
> basically is the processor node name -- in the ProcessingContext
> interface. I think the processor node ID fits well in the
> ProcessingContext interface since it already contains application ID and
> task ID and it would make the API for the handler cleaner.

That's a good point, the actual ProcessorContextImpl is already holding the
current node in an attribute (currentNode), thus exposing the node ID should
not be a problem. Let me sleep on it and get back to you regarding this
point.

> 3.
> Could you elaborate -- maybe with an example -- when a record is in a
> state in which it cannot be serialized? This is not completely clear to
me.

The Record passed to the handler is the input record to the processor. In
the Kafka Streams API, it could be any POJO.
E.g. with the following topology:

    streamsBuilder.stream("x")
        .map((k, v) -> new KeyValue<>("foo", Pair.of("hello", "world")))
        .foreach((k, v) -> { throw new RuntimeException(); });

I would expect the handler to receive a Record<String, Pair<String, String>>.

> 4.
> Regarding the metrics, it is not entirely clear to me what the metric
> measures. Is it the number of calls to the process handler or is it the
> number of calls to process handler that returned FAIL?
> If it is the former, I was also wondering whether it would be better to
> put the task-level metrics to INFO reporting level and remove the
> thread-level metric, similar to the dropped-records metric. You can
> always roll-up the metrics to the thread level in your preferred
> monitoring system. Or do you think we end up with too many metrics?

We were thinking of the former, measuring the number of calls to the
process handler. That's a good point, having the information at the task
 level could be beneficial. I updated the KIP to change the metric level
and to clarify the wording.

> 5.
> What do you think about naming the handler ProcessingExceptionHandler
> instead of ProcessExceptionHandler?
> The DeserializationExceptionHanlder and the ProductionExceptionHandler
> also use the noun of the action in their name and not the verb.

Good catch, I updated the KIP to rename it ProcessingExceptionHandler.

> 6.
> What record is exactly passed to the handler?
> Is it the input record to the task? Is it the input record to the
> processor node? Is it the input record to the processor?

The input record of the processor. I assume that is the most user-friendly
record in this context.

> 7.
> Could you please add the packages of the Java classes/interfaces/enums
> you want to add?

Done, without any surprises: package org.apache.kafka.streams.errors;


Thanks a lot for your reviews! Cheers,
Damien


On Tue, 9 Apr 2024 at 18:04, Bill Bejeck  wrote:

> Hi Damien, Sebastien and Loic,
>
> Thanks for the KIP, this is a much-needed addition.
> I like the approach of getting the plumbing in for handling processor
> errors, allowing users to implement more complex solutions as needed.
>
Overall, the KIP as it is now LGTM, modulo outstanding comments.  I
> think adding the example you included in this thread to the KIP is a great
> idea.
>
> Regarding the metrics, I'm thinking along the same lines as Bruno.  I'm
> wondering if we can make do with a task-level metric at the INFO level and
> the processor metric at DEBUG.  IMHO, when it comes to tracking exceptions
> in processing, these two areas are where users will want to focus, higher
> level metrics wouldn't be as useful in this case.
>
> Thanks,
> Bill
>
> On Tue, Apr 9, 2024 at 6:54 AM Bruno Cadonna  wrote:
>
> > Hi again,
> >
> > I have additional questions/comments.
> >
> > 6.
> > What record is exactly passed to the handler?
> > Is it the input record to the task? Is it the input record to the
> > processor node? Is it the input record to the processor?
> >
> >
> > 7.
> > Could you please add the packages of the Java classes/interfaces/enums
> > you want to add?
> >
> >
> > Best,
> > Bruno
> >
> >
> > On 4/9/24 10:17 AM, Bruno Cadonna wrote:
> > > Hi Loïc, Damien, and Sébastien,
> > >
> > > Thanks for the KIP!
> > > I find it really great that you contribute back to Kafka Streams
> > > concepts you developed for kstreamplify so that everybody can take
> > > advantage from your improvements.
> > >
> > > I have a couple of questions/comments:
> > >
> > > 1. and 2.
> > > I am wondering whether we should expose the processor node ID -- which
> > > basically is the processor node name -- in the ProcessingContext
> > > interface. I think the processor node ID fits well in the
> > > ProcessingContext interface since it already contains application ID
> and
> > > task ID and it would make the API for the handler cleaner.
> > >
> > >
> > > 3.
> > > Could you elaborate -- maybe with an example -- when a record is in a
> > > state in which it cannot be serialized? This is not c

[jira] [Created] (KAFKA-16495) Fix flaky TransactionsWithTieredStoreTest#testCommitTransactionTimeout

2024-04-09 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16495:
--

 Summary: Fix flaky 
TransactionsWithTieredStoreTest#testCommitTransactionTimeout
 Key: KAFKA-16495
 URL: https://issues.apache.org/jira/browse/KAFKA-16495
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai


org.apache.kafka.common.errors.TimeoutException: Timeout expired after 3000ms 
while awaiting InitProducerId



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16494) Fix flaky PlaintextConsumerFetchTest#testLowMaxFetchSizeForRequestAndPartition

2024-04-09 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16494:
--

 Summary: Fix flaky 
PlaintextConsumerFetchTest#testLowMaxFetchSizeForRequestAndPartition
 Key: KAFKA-16494
 URL: https://issues.apache.org/jira/browse/KAFKA-16494
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai


org.opentest4j.AssertionFailedError: Timed out while awaiting expected 
assignment HashSet(topic1-21, topic2-17, topic3-18, topic3-14, topic2-12, 
topic3-10, topic1-26, topic1-18, topic3-23, topic1-8, topic2-27, topic2-9, 
topic3-13, topic1-29, topic2-19, topic1-27, topic2-15, topic3-0, topic3-4, 
topic3-24, topic3-16, topic2-22, topic2-3, topic3-12, topic1-2, topic3-20, 
topic3-25, topic1-14, topic1-6, topic1-15, topic2-0, topic2-16, topic2-24, 
topic3-1, topic2-10, topic3-6, topic2-5, topic1-4, topic3-17, topic3-22, 
topic1-20, topic3-7, topic2-29, topic3-11, topic1-28, topic2-21, topic1-9, 
topic2-20, topic2-7, topic3-2, topic3-5, topic3-21, topic2-1, topic2-6, 
topic1-0, topic1-3, topic1-19, topic1-11, topic3-26, topic2-4, topic3-28, 
topic1-23, topic3-8, topic1-16, topic1-13, topic2-28, topic2-14, topic3-29, 
topic1-10, topic1-5, topic2-11, topic2-25, topic1-24, topic3-9, topic1-1, 
topic2-8, topic1-22, topic1-7, topic3-27, topic3-15, topic2-23, topic3-19, 
topic2-13, topic1-25, topic1-17, topic2-18, topic2-2, topic1-12, topic3-3, 
topic2-26). The current assignment is []
at 
app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:138)
at 
app//kafka.api.AbstractConsumerTest.awaitAssignment(AbstractConsumerTest.scala:87)
at 
app//kafka.api.PlaintextConsumerFetchTest.testLowMaxFetchSizeForRequestAndPartition(PlaintextConsumerFetchTest.scala:219)
at 
java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
 Method)
at 
java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
at 
app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
at 
app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at 
app//org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45)
at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
at 
app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
at 
app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
at 
app//org.junit.platform.engine.su

[jira] [Created] (KAFKA-16493) Avoid unneeded subscription regex check if metadata version unchanged

2024-04-09 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16493:
--

 Summary: Avoid unneeded subscription regex check if metadata 
version unchanged
 Key: KAFKA-16493
 URL: https://issues.apache.org/jira/browse/KAFKA-16493
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans


When using pattern subscription (java pattern), the new consumer regularly 
checks if the list of topics that match the regex has changed. This is done as 
part of the consumer poll loop, and it evaluates the regex using the latest 
cluster metadata. As an improvement, we should avoid evaluating the regex if 
the metadata version hasn't changed (similar to what the legacy coordinator 
does 
[here|https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L435C10-L435C41])
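
(A minimal sketch of the intended guard, modeled on the legacy ConsumerCoordinator code linked
above; the surrounding field and method names are taken from that internal class and may well
differ in the eventual patch:)

    // Re-evaluate the subscription regex only when the cluster metadata has actually changed.
    private void maybeUpdateSubscriptionMetadata() {
        final int version = metadata.updateVersion();
        if (version > metadataVersionSnapshot) {
            metadataVersionSnapshot = version;
            if (subscriptions.hasPatternSubscription()) {
                // Matches the java.util.regex pattern against the topics in the latest metadata.
                updatePatternSubscription(metadata.fetch());
            }
        }
    }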




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13907) Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable

2024-04-09 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-13907.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable
> --
>
> Key: KAFKA-13907
> URL: https://issues.apache.org/jira/browse/KAFKA-13907
> Project: Kafka
>  Issue Type: Bug
>Reporter: Deng Ziming
>Assignee: Igor Soarez
>Priority: Major
>  Labels: newbie
> Fix For: 3.8.0
>
>
> ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable will hang 
> waiting for controlled shutdown; there may be some bug related to it.
> Since this bug can be reproduced locally, it won't be hard to investigate.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1014: Managing Unstable Metadata Versions in Apache Kafka

2024-04-09 Thread Justine Olshan
Looks like this KIP has gone quiet for a bit, but I'm here with an update
(maybe this will revitalize the conversation too :) )

For KIP-1022 we want to introduce more features and will have a config
`unstable.version.enable`.
I know that this KIP proposed a config `unstable.metadata.versions.enable`, so I
just want to figure out how these will interact.

I would propose just replacing the metadata specific one, but I know this
config is actually already in code and being used, so this makes it a bit
more tricky. An alternative approach is to have both configs. The metadata
config would just control metadata, and the general one all features.

What do folks think,

Justine

On Mon, Jan 29, 2024 at 1:42 PM Jun Rao  wrote:

> Hi, Colin,
>
> Thanks for the reply.
>
> "If Y changed the same file as X, we can just edit the file so that Y's
changes come first."
>
> It seems the effort is more than just editing the file. One has to change
> all the logic around the old IV. Also, what happens to the client? A client
> may have implemented a version for a request corresponding to an unstable
> feature. When the unstable feature is re-ordered, do all clients (including
those non-Java ones) need to change the RPC implementation for the existing
> underlying requests?
>
> Jun
>
> On Mon, Jan 29, 2024 at 9:57 AM Colin McCabe  wrote:
>
> > On Tue, Jan 23, 2024, at 11:21, Jun Rao wrote:
> > > Hi, Proven,
> > >
> > > Thanks for the KIP.
> > >
> > > I am not sure about the reordering approach proposed in the KIP. Let's
> > say
> > > in a release we have features X and Y, depending on MV IV1 and IV2,
> > > respectively. At the release time, feature Y is ready, but X is not. I
> > > guess the proposal is to move IV1 to a new MV IV3?
> >
> > Hi Jun,
> >
> > In your example, if X is not ready, it should be moved into an unstable
> > MV. Then if Y is ready, it can be moved into a stable MV and we can
> advance
> > the last stable MV.
> >
> > > The issue is that IV2
> > > could have made changes on top of IV1. For example, IV2 could have
> > evolved
> > > the schema of the same inter broker request as IV1. In that case, what
> > does
> > > IV3 represent? We can't simply take the changes associated with IV1
> since
> > > it could have conflicts with IV2.
> >
> > If Y changed the same file as X, we can just edit the file so that Y's
> > changes come first.
> >
> > Nobody using a stable MV should be affected, since they will be
> generating
> > records on an older (stable) version.
> >
> > > Even when there are no conflicts, I am not sure if the approach
> supports
> > > the trunk deployment model. Let's say we move two unstable MV IV3 and
> IV4
> > > to IV7 and IV8. If someone deployed code corresponding to IV4 and later
> > > upgraded the code to IV7, he/she (1) wouldn't be able to set IV4 in the
> > > upgrade code since it no longer exists and (2) would be surprised that
> > the
> > > request protocol that worked under IV4 doesn't work now.
> > >
> >
> > Upgrades are not supported if you are running an unstable MV. It's
> > intended only for testing.
> >
> > Colin
> >
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Fri, Jan 19, 2024 at 2:13 PM Artem Livshits
> > >  wrote:
> > >
> > >> Hi Colin,
> > >>
> > >> >  I think feature flags are somewhat orthogonal to the stable /
> > unstable
> > >> discussion
> > >>
> > >> I think feature flags can be used as an alternative to achieve similar
> > >> results as stable / unstable functionality.  As well as long-lived
> > feature
> > >> branches.  In my experience, I've seen feature flags to be more
> > successful
> > >> than feature branches for changes of existing functionality.  I also
> > think
> > >> that stable / unstable MV would work better than feature branches. I
> > just
> > >> wanted to mention it for completeness, not sure if we should start a
> > full
> > >> fledged discussion on these topics.
> > >>
> > >> > I'm struggling a bit with your phrasing. Are you suggesting that
> > unstable
> > >> MVs would not be able to be in trunk?
> > >>
> > >> Unstable MV should be in trunk.  The wording is related to when we can
> > >> promote "unstable" to "stable".
> > >>
> > >> -Artem
> > >>
> > >>
> > >> On Mon, Jan 15, 2024 at 10:03 PM Colin McCabe 
> > wrote:
> > >>
> > >> > On Fri, Jan 12, 2024, at 11:32, Artem Livshits wrote:
> > >> > > I think using feature flags (whether we support a framework and
> > tooling
> > >> > for
> > >> > > feature flags or just an ad-hoc XyzEnabled flag) can be an
> > alternative
> > >> to
> > >> > > this KIP.  I think the value of this KIP is that it's trying to
> > >> propose a
> > >> > > systemic approach for gating functionality that may take multiple
> > >> > releases
> > >> > > to develop.  A problem with ad-hoc feature flags is that it's
> useful
> > >> > during
> > >> > > feature development, so that people who are working on this
> feature
> > (or
> > >> > are
> > >> > > interested in beta-testing the feature) can get early access
> > (without
> > 

Re: [DISCUSS] KIP-1022 Formatting and Updating Features

2024-04-09 Thread Justine Olshan
Hi Jun,

Makes sense to me. It seems like KIP-1014 has been inactive recently. I can
update my KIP and mention this change on that discussion thread.

Justine

On Tue, Apr 9, 2024 at 9:01 AM Jun Rao  wrote:

> Hi, Justine,
>
> A single config makes sense to me too. We just need to reach consensus with
> KIP-1014.
>
> Thanks,
>
> Jun
>
> On Mon, Apr 8, 2024 at 5:06 PM Justine Olshan  >
> wrote:
>
> > Hey Jun,
> >
> > That's a good question. I think maybe for simplicity, we can have a
> single
> > config?
> > If that makes sense, I will update the KIP.
> >
> > Justine
> >
> > On Mon, Apr 8, 2024 at 3:20 PM Jun Rao  wrote:
> >
> > > Hi, Justine,
> > >
> > > Thanks for the updated KIP.
> > >
> > > One more question related to KIP-1014. It introduced a new
> > > config unstable.metadata.versions.enable. Does each new feature need to
> > > have a corresponding config to enable the testing of unstable features
> or
> > > should we have a generic config enabling the testing of all unstable
> > > features?
> > >
> > > Jun
> > >
> > > On Thu, Apr 4, 2024 at 8:24 PM Justine Olshan
> >  > > >
> > > wrote:
> > >
> > > > I'm hoping this covers the majority of comments. I will go ahead and
> > open
> > > > the vote in the next day or so.
> > > >
> > > > Thanks,
> > > > Justine
> > > >
> > > > On Wed, Apr 3, 2024 at 3:31 PM Justine Olshan 
> > > > wrote:
> > > >
> > > > > Find and replace has failed me :(
> > > > >
> > > > > Group version seems a little vague, but we can update it. Hopefully
> > > find
> > > > > and replace won't fail me again, otherwise I will get another email
> > on
> > > > this.
> > > > >
> > > > > Justine
> > > > >
> > > > > On Wed, Apr 3, 2024 at 12:15 PM David Jacot
> > >  > > > >
> > > > > wrote:
> > > > >
> > > > >> Thanks, Justine.
> > > > >>
> > > > >> * Should we also use `group.version` (GV) as I suggested in my
> > > previous
> > > > >> message in order to be consistent?
> > > > >> * Should we add both names to the `Public Interfaces` section?
> > > > >> * There is still at least one usage of
> > `transaction.protocol.verison`
> > > in
> > > > >> the KIP too.
> > > > >>
> > > > >> Best,
> > > > >> David
> > > > >>
> > > > >> On Wed, Apr 3, 2024 at 6:29 PM Justine Olshan
> > > > >> 
> > > > >> wrote:
> > > > >>
> > > > >> > I had missed the David's message yesterday about the naming for
> > > > >> transaction
> > > > >> > version vs transaction protocol version.
> > > > >> >
> > > > >> > After some offline discussion with Jun, Artem, and David, we
> > agreed
> > > > that
> > > > >> > transaction version is simpler and conveys more than just
> protocol
> > > > >> changes
> > > > >> > (flexible records for example)
> > > > >> >
> > > > >> > I will update the KIP as well as KIP-890
> > > > >> >
> > > > >> > Thanks,
> > > > >> > Justine
> > > > >> >
> > > > >> > On Tue, Apr 2, 2024 at 2:50 PM Justine Olshan <
> > jols...@confluent.io
> > > >
> > > > >> > wrote:
> > > > >> >
> > > > >> > > Updated!
> > > > >> > >
> > > > >> > > Justine
> > > > >> > >
> > > > >> > > On Tue, Apr 2, 2024 at 2:40 PM Jun Rao
>  > >
> > > > >> wrote:
> > > > >> > >
> > > > >> > >> Hi, Justine,
> > > > >> > >>
> > > > >> > >> Thanks for the reply.
> > > > >> > >>
> > > > >> > >> 21. Sounds good. It would be useful to document that.
> > > > >> > >>
> > > > >> > >> 22. Should we add the IV in "metadata.version=17 has no
> > > > dependencies"
> > > > >> > too?
> > > > >> > >>
> > > > >> > >> Jun
> > > > >> > >>
> > > > >> > >>
> > > > >> > >> On Tue, Apr 2, 2024 at 11:31 AM Justine Olshan
> > > > >> > >> 
> > > > >> > >> wrote:
> > > > >> > >>
> > > > >> > >> > Jun,
> > > > >> > >> >
> > > > >> > >> > 21. Next producer ID field doesn't need to be populated for
> > TV
> > > 1.
> > > > >> We
> > > > >> > >> don't
> > > > >> > >> > have the same need to retain this since it is written
> > directly
> > > to
> > > > >> the
> > > > >> > >> > transaction log in InitProducerId. It is only needed for
> > > KIP-890
> > > > >> part
> > > > >> > 2
> > > > >> > >> /
> > > > >> > >> > TV 2.
> > > > >> > >> >
> > > > >> > >> > 22. We can do that.
> > > > >> > >> >
> > > > >> > >> > Justine
> > > > >> > >> >
> > > > >> > >> > On Tue, Apr 2, 2024 at 10:41 AM Jun Rao
> > >  > > > >
> > > > >> > >> wrote:
> > > > >> > >> >
> > > > >> > >> > > Hi, Justine,
> > > > >> > >> > >
> > > > >> > >> > > Thanks for the reply.
> > > > >> > >> > >
> > > > >> > >> > > 21. What about the new NextProducerId field? Will that be
> > > > >> populated
> > > > >> > >> with
> > > > >> > >> > TV
> > > > >> > >> > > 1?
> > > > >> > >> > >
> > > > >> > >> > > 22. In the dependencies output, should we show both IV
> and
> > > > level
> > > > >> for
> > > > >> > >> > > metadata.version too?
> > > > >> > >> > >
> > > > >> > >> > > Jun
> > > > >> > >> > >
> > > > >> > >> > > On Mon, Apr 1, 2024 at 4:43 PM Justine Olshan
> > > > >> > >> >  > > > >> > >> > > >
> > > > >> > >> > > wrote:
> > > > >> > >> > >
> > > > >> > >> > > > Hi Jun,
> > > > >> > >> > > >
> > > > >> > >

[jira] [Resolved] (KAFKA-16485) Fix broker metrics to follow kebab/hyphen case

2024-04-09 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-16485.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

Merged the PR to trunk.

> Fix broker metrics to follow kebab/hyphen case
> --
>
> Key: KAFKA-16485
> URL: https://issues.apache.org/jira/browse/KAFKA-16485
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-09 Thread Bill Bejeck
Hi Damien, Sebastien and Loic,

Thanks for the KIP, this is a much-needed addition.
I like the approach of getting the plumbing in for handling processor
errors, allowing users to implement more complex solutions as needed.

Overall, the KIP as it is now LGTM, modulo outstanding comments.  I
think adding the example you included in this thread to the KIP is a great
idea.

Regarding the metrics, I'm thinking along the same lines as Bruno.  I'm
wondering if we can make do with a task-level metric at the INFO level and
the processor metric at DEBUG.  IMHO, when it comes to tracking exceptions
in processing, these two areas are where users will want to focus, higher
level metrics wouldn't be as useful in this case.

Thanks,
Bill

On Tue, Apr 9, 2024 at 6:54 AM Bruno Cadonna  wrote:

> Hi again,
>
> I have additional questions/comments.
>
> 6.
> What record is exactly passed to the handler?
> Is it the input record to the task? Is it the input record to the
> processor node? Is it the input record to the processor?
>
>
> 7.
> Could you please add the packages of the Java classes/interfaces/enums
> you want to add?
>
>
> Best,
> Bruno
>
>
> On 4/9/24 10:17 AM, Bruno Cadonna wrote:
> > Hi Loïc, Damien, and Sébastien,
> >
> > Thanks for the KIP!
> > I find it really great that you contribute back to Kafka Streams
> > concepts you developed for kstreamplify so that everybody can take
> > advantage from your improvements.
> >
> > I have a couple of questions/comments:
> >
> > 1. and 2.
> > I am wondering whether we should expose the processor node ID -- which
> > basically is the processor node name -- in the ProcessingContext
> > interface. I think the processor node ID fits well in the
> > ProcessingContext interface since it already contains application ID and
> > task ID and it would make the API for the handler cleaner.
> >
> >
> > 3.
> > Could you elaborate -- maybe with an example -- when a record is in a
> > state in which it cannot be serialized? This is not completely clear to
> me.
> >
> >
> > 4.
> > Regarding the metrics, it is not entirely clear to me what the metric
> > measures. Is it the number of calls to the process handler or is it the
> > number of calls to process handler that returned FAIL?
> > If it is the former, I was also wondering whether it would be better to
> > put the task-level metrics to INFO reporting level and remove the
> > thread-level metric, similar to the dropped-records metric. You can
> > always roll-up the metrics to the thread level in your preferred
> > monitoring system. Or do you think we end up with too many metrics?
> >
> >
> > 5.
> > What do you think about naming the handler ProcessingExceptionHandler
> > instead of ProcessExceptionHandler?
> > The DeserializationExceptionHanlder and the ProductionExceptionHandler
> > also use the noun of the action in their name and not the verb.
> >
> >
> > Best,
> > Bruno
> >
> >
> > On 4/8/24 3:48 PM, Sebastien Viale wrote:
> >> Thanks for your review!
> >>
> >>   All the points make sense for us!
> >>
> >>
> >>
> >> We updated the KIP for points 1 and 4.
> >>
> >>
> >>
> >> 2/ We followed the DeserializationExceptionHandler interface
> >> signature, it was not on our mind that the record be forwarded with
> >> the ProcessorContext.
> >>
> >> The ProcessingContext is sufficient, we do expect that most people
> >> would need to access the RecordMetadata.
> >>
> >>
> >>
> >> 3/ The use of Record<Object, Object> is required, as the error could
> >> occur in the middle of a processor where records could be
> >> non-serializable objects.
> >>
> >>   As it is a global error catching, the user may need little
> >> information about the faulty record.
> >>
> >>   Assuming that users want to make some specific treatments to the
> >> record, they can add a try / catch block in the topology.
> >>
> >>   It is up to users to cast record value and key in the implementation
> >> of the ProcessorExceptionHandler.
> >>
> >>
> >>
> >> Cheers
> >>
> >> Loïc, Damien and Sébastien
> >>
> >> 
> >> From: Sophie Blee-Goldman 
> >> Sent: Saturday, 6 April 2024 01:08
> >> To: dev@kafka.apache.org 
> >> Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception
> >> handler for exceptions occuring during processing
> >>
> >>
> >> Hi Damien,
> >>
> >> First off thanks for the KIP, this is definitely a much needed
> >> feature. On
> >> the
> >> whole it seems pretty straightforward and I am in favor of the proposal.
> >> Just
> >> a few questions and suggestions here and there:
> >>
> >> 1. One of the #handle method's parameters is "ProcessorNode node", but
> >> ProcessorNode is an internal class (and would expose a lot of internals
> >> that we probably don't want to pass in to an exception handler). Would
> it
> >> be sufficient to just make this a String and pass in the processor name

Re: [DISCUSS] KIP-1022 Formatting and Updating Features

2024-04-09 Thread Jun Rao
Hi, Justine,

A single config makes sense to me too. We just need to reach consensus with
KIP-1014.

Thanks,

Jun

On Mon, Apr 8, 2024 at 5:06 PM Justine Olshan 
wrote:

> Hey Jun,
>
> That's a good question. I think maybe for simplicity, we can have a single
> config?
> If that makes sense, I will update the KIP.
>
> Justine
>
> On Mon, Apr 8, 2024 at 3:20 PM Jun Rao  wrote:
>
> > Hi, Justine,
> >
> > Thanks for the updated KIP.
> >
> > One more question related to KIP-1014. It introduced a new
> > config unstable.metadata.versions.enable. Does each new feature need to
> > have a corresponding config to enable the testing of unstable features or
> > should we have a generic config enabling the testing of all unstable
> > features?
> >
> > Jun
> >
> > On Thu, Apr 4, 2024 at 8:24 PM Justine Olshan
>  > >
> > wrote:
> >
> > > I'm hoping this covers the majority of comments. I will go ahead and
> open
> > > the vote in the next day or so.
> > >
> > > Thanks,
> > > Justine
> > >
> > > On Wed, Apr 3, 2024 at 3:31 PM Justine Olshan 
> > > wrote:
> > >
> > > > Find and replace has failed me :(
> > > >
> > > > Group version seems a little vague, but we can update it. Hopefully
> > find
> > > > and replace won't fail me again, otherwise I will get another email
> on
> > > this.
> > > >
> > > > Justine
> > > >
> > > > On Wed, Apr 3, 2024 at 12:15 PM David Jacot
> >  > > >
> > > > wrote:
> > > >
> > > >> Thanks, Justine.
> > > >>
> > > >> * Should we also use `group.version` (GV) as I suggested in my
> > previous
> > > >> message in order to be consistent?
> > > >> * Should we add both names to the `Public Interfaces` section?
> > > >> * There is still at least one usage of
> `transaction.protocol.verison`
> > in
> > > >> the KIP too.
> > > >>
> > > >> Best,
> > > >> David
> > > >>
> > > >> On Wed, Apr 3, 2024 at 6:29 PM Justine Olshan
> > > >> 
> > > >> wrote:
> > > >>
> > > >> > I had missed the David's message yesterday about the naming for
> > > >> transaction
> > > >> > version vs transaction protocol version.
> > > >> >
> > > >> > After some offline discussion with Jun, Artem, and David, we
> agreed
> > > that
> > > >> > transaction version is simpler and conveys more than just protocol
> > > >> changes
> > > >> > (flexible records for example)
> > > >> >
> > > >> > I will update the KIP as well as KIP-890
> > > >> >
> > > >> > Thanks,
> > > >> > Justine
> > > >> >
> > > >> > On Tue, Apr 2, 2024 at 2:50 PM Justine Olshan <
> jols...@confluent.io
> > >
> > > >> > wrote:
> > > >> >
> > > >> > > Updated!
> > > >> > >
> > > >> > > Justine
> > > >> > >
> > > >> > > On Tue, Apr 2, 2024 at 2:40 PM Jun Rao  >
> > > >> wrote:
> > > >> > >
> > > >> > >> Hi, Justine,
> > > >> > >>
> > > >> > >> Thanks for the reply.
> > > >> > >>
> > > >> > >> 21. Sounds good. It would be useful to document that.
> > > >> > >>
> > > >> > >> 22. Should we add the IV in "metadata.version=17 has no
> > > dependencies"
> > > >> > too?
> > > >> > >>
> > > >> > >> Jun
> > > >> > >>
> > > >> > >>
> > > >> > >> On Tue, Apr 2, 2024 at 11:31 AM Justine Olshan
> > > >> > >> 
> > > >> > >> wrote:
> > > >> > >>
> > > >> > >> > Jun,
> > > >> > >> >
> > > >> > >> > 21. Next producer ID field doesn't need to be populated for
> TV
> > 1.
> > > >> We
> > > >> > >> don't
> > > >> > >> > have the same need to retain this since it is written
> directly
> > to
> > > >> the
> > > >> > >> > transaction log in InitProducerId. It is only needed for
> > KIP-890
> > > >> part
> > > >> > 2
> > > >> > >> /
> > > >> > >> > TV 2.
> > > >> > >> >
> > > >> > >> > 22. We can do that.
> > > >> > >> >
> > > >> > >> > Justine
> > > >> > >> >
> > > >> > >> > On Tue, Apr 2, 2024 at 10:41 AM Jun Rao
> >  > > >
> > > >> > >> wrote:
> > > >> > >> >
> > > >> > >> > > Hi, Justine,
> > > >> > >> > >
> > > >> > >> > > Thanks for the reply.
> > > >> > >> > >
> > > >> > >> > > 21. What about the new NextProducerId field? Will that be
> > > >> populated
> > > >> > >> with
> > > >> > >> > TV
> > > >> > >> > > 1?
> > > >> > >> > >
> > > >> > >> > > 22. In the dependencies output, should we show both IV and
> > > level
> > > >> for
> > > >> > >> > > metadata.version too?
> > > >> > >> > >
> > > >> > >> > > Jun
> > > >> > >> > >
> > > >> > >> > > On Mon, Apr 1, 2024 at 4:43 PM Justine Olshan
> > > >> > >> >  > > >> > >> > > >
> > > >> > >> > > wrote:
> > > >> > >> > >
> > > >> > >> > > > Hi Jun,
> > > >> > >> > > >
> > > >> > >> > > > 20. I can update the KIP.
> > > >> > >> > > >
> > > >> > >> > > > 21. This is used to complete some of the work with
> KIP-360.
> > > (We
> > > >> > use
> > > >> > >> > > > previous producer ID there, but never persisted it which
> > was
> > > in
> > > >> > the
> > > >> > >> KIP
> > > >> > >> > > >
> > > >> > >> > >
> > > >> > >> >
> > > >> > >>
> > > >> >
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89068820
> > > >> > >> )
> > > >> > >> > > > The KIP also mentions including previous epoch but we
> > > >> explained in
>

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-09 Thread Bruno Cadonna

Hi Nick,

Thanks for breaking out the KIP from KIP-892!

Here a couple of comments/questions:

1.
In Kafka Streams, we have a design guideline which says to not use the 
"get"-prefix for getters on the public API. Could you please change 
getCommittedOffsets() to committedOffsets()?
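
(Just to make the naming suggestion concrete -- a sketch of the two methods with the convention
applied; the signatures are my reading of the KIP draft, not the final API:)

    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical additions to org.apache.kafka.streams.processor.StateStore per the KIP-1035 draft.
    interface StateStoreOffsetsSketch {
        // Atomically flush pending writes and persist the given changelog offsets alongside them.
        void commit(Map<TopicPartition, Long> changelogOffsets);

        // Accessor without the "get" prefix, following the Kafka Streams design guideline.
        Map<TopicPartition, Long> committedOffsets();
    }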



2.
It is not clear to me how TaskManager#getTaskOffsetSums() should read 
offsets of tasks the stream thread does not own but that have a state 
directory on the Streams client by calling 
StateStore#getCommittedOffsets(). If the thread does not own a task, it 
also does not create any state stores for that task, which means there is 
no state store on which to call getCommittedOffsets().
I would have rather expected that a checkpoint file is written for all 
state stores on close -- not only for the RocksDBStore -- and that this 
checkpoint file is read in TaskManager#getTaskOffsetSums() for the tasks 
that have a state directory on the client but are not currently assigned 
to any stream thread of the Streams client.



3.
In the javadocs for commit() you write

"... all writes since the last commit(Map), or since init(StateStore) 
*MUST* be available to readers, even after a restart."


This is only true for a clean close before the restart, isn't it?
If the task fails with a dirty close, Kafka Streams cannot guarantee 
that the in-memory structures of the state store (e.g. memtable in the 
case of RocksDB) are flushed so that the records and the committed 
offsets are persisted.



4.
The wrapper that provides the legacy checkpointing behavior is actually 
an implementation detail. I would remove it from the KIP, but still 
state that the legacy checkpointing behavior will be supported when the 
state store does not manage the checkpoints.



5.
Regarding the metrics, could you please add the tags, and the recording 
level (DEBUG or INFO) as done in KIP-607 or KIP-444.



Best,
Bruno

On 4/7/24 5:35 PM, Nick Telford wrote:

Hi everyone,

Based on some offline discussion, I've split out the "Atomic Checkpointing"
section from KIP-892: Transactional Semantics for StateStores, into its own
KIP

KIP-1035: StateStore managed changelog offsets
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets

While KIP-892 was adopted *with* the changes outlined in KIP-1035, these
changes were always the most contentious part, and continued to spur
discussion even after KIP-892 was adopted.

All the changes introduced in KIP-1035 have been removed from KIP-892, and
a hard dependency on KIP-1035 has been added to KIP-892 in their place.

I'm hopeful that with some more focus on this set of changes, we can
deliver something that we're all happy with.

Regards,
Nick



Re: [DISCUSS] KIP-950: Tiered Storage Disablement

2024-04-09 Thread Doğuşcan Namal
+1, let's not introduce a new API and mark it immediately as deprecated :)

On your second comment Luke, one thing we need to clarify is when we
consider remote storage to be DISABLED for a topic.
In particular, what is the state while the remote storage is being deleted in
the case of disablement.policy=delete? Is it DISABLING or DISABLED?

If we move directly to the DISABLED state,

a) in case of failures, the leaders should continue remote storage deletion
even if the topic is moved to the DISABLED state, otherwise we risk having
stray data on remote storage.
b) on each restart, we should initiate the remote storage deletion because
although we replayed a record with a DISABLED state, we can not be sure if
the remote data is deleted or not.

We could either consider keeping the remote topic in DISABLING state until
all of the remote storage data is deleted, or we need an additional
mechanism to handle the remote stray data.

The existing topic deletion, for instance, handles stray logs on disk by
detecting them on KafkaBroker startup and deleting before the
ReplicaManager is started.
Maybe we need a similar mechanism here as well if we don't want a DISABLING
state. Otherwise, we need a callback from the brokers to validate that the remote
storage data has been deleted before we can move to the DISABLED state.

Thanks.

On Tue, 9 Apr 2024 at 12:45, Luke Chen  wrote:

> Hi Christo,
>
> > I would then opt for moving information from DisableRemoteTopic
> within the StopReplicas API which will then disappear in KRaft world as it
> is already scheduled for deprecation. What do you think?
>
> Sounds good to me.
>
> Thanks.
> Luke
>
> On Tue, Apr 9, 2024 at 6:46 PM Christo Lolov 
> wrote:
>
> > Heya Luke!
> >
> > I thought a bit more about it and I reached the same conclusion as you
> for
> > 2 as a follow-up from 1. In other words, in KRaft world I don't think the
> > controller needs to wait for acknowledgements for the brokers. All we
> care
> > about is that the leader (who is responsible for archiving/deleting data
> in
> > tiered storage) knows about the change and applies it properly. If there
> is
> > a leadership change halfway through the operation then the new leader
> still
> > needs to apply the message from the state topic and we know that a
> > disable-message will be applied before a reenablement-message. I will
> > change the KIP later today/tomorrow morning to reflect this reasoning.
> >
> > However, with this I believe that introducing a new API just for
> > Zookeeper-based clusters (i.e. DisableRemoteTopic) becomes a bit of an
> > overkill. I would then opt for moving information from DisableRemoteTopic
> > within the StopReplicas API which will then disappear in KRaft world as
> it
> > is already scheduled for deprecation. What do you think?
> >
> > Best,
> > Christo
> >
> > On Wed, 3 Apr 2024 at 07:59, Luke Chen  wrote:
> >
> > > Hi Christo,
> > >
> > > 1. I agree with Doguscan that in KRaft mode, the controller won't send
> > RPCs
> > > to the brokers (except in the migration path).
> > > So, I think we could adopt the similar way we did to
> > `AlterReplicaLogDirs`
> > > (
> > > KIP-858
> > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft#KIP858:HandleJBODbrokerdiskfailureinKRaft-Intra-brokerreplicamovement
> > > >)
> > > that let the broker notify controller any update, instead of controller
> > to
> > > broker. And once the controller receives all the complete requests from
> > > brokers, it'll enter "Disabled" state. WDYT?
> > >
> > > 2. Why should we wait until all brokers to respond before moving to
> > > "Disabled" state in "KRaft mode"?
> > > Currently, only the leader node does the remote log upload/fetch tasks,
> > so
> > > does that mean the controller only need to make sure the leader
> completes
> > > the stopPartition?
> > > If during the leader node stopPartition process triggered leadership
> > > change, then the new leader should receive and apply the configRecord
> > > update before the leadership change record based on the KRaft design,
> > which
> > > means there will be no gap that the follower node becomes the leader
> and
> > > starting doing unexpected upload/fetch tasks, right?
> > > I agree we should make sure in ZK mode, all brokers are completed the
> > > stopPartitions before moving to "Disabled" state because ZK node
> watcher
> > is
> > > working in a separate thread. But not sure about KRaft mode.
> > >
> > > Thanks.
> > > Luke
> > >
> > >
> > > On Fri, Mar 29, 2024 at 4:14 PM Christo Lolov 
> > > wrote:
> > >
> > > > Heya everyone!
> > > >
> > > > re: Doguscan
> > > >
> > > > I believe the answer to 101 needs a bit more discussion. As far as I
> > > know,
> > > > tiered storage today has methods to update a metadata of a segment to
> > say
> > > > "hey, I would like this deleted", but actual deletion is left to
> plugin
> > > > implementations (or any background cleaners). In other words, there
> is
> >

Re: [DISCUSS] Minimum constraint for segment.ms

2024-04-09 Thread Tommi Vainikainen
Hi,

I support changing the constraints for the mentioned settings.

I've noticed that first producing big messages and then setting
`segment.bytes` to a low value causes unwanted consequences. I did not
observe it deleting all the records, but then I did not set it to one; in
my case the value was in the kilobytes range.

If such a topic is then repartitioned, e.g. to balance load between brokers,
reassignment of those partitions gets stuck with
`RecordBatchTooLargeException` because an existing big message no longer
fits within the new, lower `segment.bytes` limit. If the repartitioning
happens not to rebalance but to replace broken nodes, the consequences can
be more severe. I would therefore argue that in some cases a low
`segment.bytes` can indirectly cause data loss if the repair operations do
not succeed in time. Has anyone else seen such cases?
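
For reference, the `kafka-delete-records` alternative mentioned in the reply
quoted below can also be done programmatically. Here is a minimal sketch
using the AdminClient; the topic name, partition and bootstrap address are
made up for illustration.

{code:java}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class PurgeTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            // Find the current end offset of the partition.
            ListOffsetsResultInfo end =
                admin.listOffsets(Map.of(tp, OffsetSpec.latest())).partitionResult(tp).get();
            // Delete everything before the end offset, i.e. all records
            // currently in the partition, without touching segment.bytes.
            admin.deleteRecords(Map.of(tp, RecordsToDelete.beforeOffset(end.offset())))
                 .all().get();
        }
    }
}
{code}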


On Thu, Mar 14, 2024 at 6:09 AM Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:

> One use case I see for setting the `segment.bytes` to 1 is to delete all
> the records from the topic.
> We can mention about it in the doc to use the `kafka-delete-records` API
> instead.
>
>
>
>
> On Wed, Mar 13, 2024 at 6:59 PM Divij Vaidya 
> wrote:
>
> > + users@kafka
> >
> > Hi users of Apache Kafka
> >
> > With the upcoming 4.0 release, we have an opportunity to improve the
> > constraints and default values for various Kafka configurations.
> >
> > We are soliciting your feedback and suggestions on configurations where
> the
> > default values and/or constraints should be adjusted. Please reply in
> this
> > thread directly.
> >
> > --
> > Divij Vaidya
> > Apache Kafka PMC
> >
> >
> >
> > On Wed, Mar 13, 2024 at 12:56 PM Divij Vaidya 
> > wrote:
> >
> > > Thanks for the discussion folks. I have started a KIP
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations
> > > to keep track of the changes that we are discussion. Please consider
> this
> > > as a collaborative work-in-progress KIP and once it is ready to be
> > > published, we can start a discussion thread on it.
> > >
> > > I am also going to start a thread to solicit feedback from users@
> > mailing
> > > list as well.
> > >
> > > --
> > > Divij Vaidya
> > >
> > >
> > >
> > > On Wed, Mar 13, 2024 at 12:55 PM Christopher Shannon <
> > > christopher.l.shan...@gmail.com> wrote:
> > >
> > >> I think it's a great idea to raise a KIP to look at adjusting defaults
> > and
> > >> minimum/maximum config values for version 4.0.
> > >>
> > >> As pointed out, the minimum values for segment.ms and segment.bytes
> > don't
> > >> make sense and would probably bring down a cluster pretty quickly if
> set
> > >> that low, so version 4.0 is a good time to fix it and to also look at
> > the
> > >> other configs as well for adjustments.
> > >>
> > >> On Wed, Mar 13, 2024 at 4:39 AM Sergio Daniel Troiano
> > >>  wrote:
> > >>
> > >> > hey guys,
> > >> >
> > >> > Regarding to num.recovery.threads.per.data.dir: I agree, in our
> > company
> > >> we
> > >> > use the number of vCPUs to do so as this is not competing with ready
> > >> > cluster traffic.
> > >> >
> > >> >
> > >> > On Wed, 13 Mar 2024 at 09:29, Luke Chen  wrote:
> > >> >
> > >> > > Hi Divij,
> > >> > >
> > >> > > Thanks for raising this.
> > >> > > The valid minimum value 1 for `segment.ms` is completely
> > >> unreasonable.
> > >> > > Similarly for `segment.bytes`, `metadata.log.segment.ms`,
> > >> > > `metadata.log.segment.bytes`.
> > >> > >
> > >> > > In addition to that, there are also some config default values
> we'd
> > >> like
> > >> > to
> > >> > > propose to change in v4.0.
> > >> > > We can collect more comments from the community, and come out
> with a
> > >> KIP
> > >> > > for them.
> > >> > >
> > >> > > 1. num.recovery.threads.per.data.dir:
> > >> > > The current default value is 1. But the log recovery is happening
> > >> before
> > >> > > brokers are in ready state, which means, we should use all the
> > >> available
> > >> > > resource to speed up the log recovery to bring the broker to ready
> > >> state
> > >> > > soon. Default value should be... maybe 4 (to be decided)?
> > >> > >
> > >> > > 2. Other configs might be able to consider to change the default,
> > but
> > >> > open
> > >> > > for comments:
> > >> > >2.1. num.replica.fetchers: default is 1, but that's not enough
> > when
> > >> > > there are multiple partitions in the cluster
> > >> > >2.2. `socket.send.buffer.bytes`/`socket.receive.buffer.bytes`:
> > >> > > Currently, we set 100kb as default value, but that's not enough
> for
> > >> > > high-speed network.
> > >> > >
> > >> > > Thank you.
> > >> > > Luke
> > >> > >
> > >> > >
> > >> > > On Tue, Mar 12, 2024 at 1:32 AM Divij Vaidya <
> > divijvaidy...@gmail.com
> > >> >
> > >> > > wrote:
> > >> > >
> > >> > > > Hey folks
> > >> > > >
> > >> > > > Before I file a KIP to change this in 4.0, I wanted to
> understand
> > >> the
> > >> > 

Re: [DISCUSS] KIP-936 Throttle number of active PIDs

2024-04-09 Thread Claude Warren
I should also note that the probability of false positives does not exceed
shape.P, because as a layer's false-positive probability approaches shape.P
a new layer is created and subsequent filters are added to that.  So no
layer in the LayeredBloomFilter exceeds shape.P, and thus the entire filter
does not exceed shape.P.

Claude
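
(For context, the textbook Bloom filter estimate behind the Shape sizing is
p ≈ (1 - e^(-k*n/m))^k for m bits, k hash functions and n inserted items.
Shape(N, P) chooses m and k so that p stays at or below P as long as n <= N,
which is why capping each layer at shape.N items keeps every individual
layer at or below shape.P. This is the standard formula, not something taken
from the KIP itself.)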

On Tue, Apr 9, 2024 at 2:26 PM Claude Warren  wrote:

> The overall design for KIP-936 seems sound to me.  I would make the
> following changes:
>
> Replace the "TimedBloomFilter" with a "LayeredBloomFilter" from
> commons-collections v4.5
>
> Define the producer.id.quota.window.size.seconds to be the length of time
> that a Bloom filter of PIDs will exist.
> Define a new configuration option "producer.id.quota.window.count" as the
> number of windows active in window.size.seconds.
>
> Define the "Shape" (See commons-collections bloomfilters v4.5) of the
> bloom filter from the average number of PIDs expected in
> window.size.seconds/window.count (call this N) and the probability of false
> positives (call this P).  Due to the way the LayeredBloomFilter works the
> number of items can be a lower number than the max.  I'll explain that in a
> minute.
>
> The LayeredBloomFilter implements the standard BloomFilter interface but
> internally keeps an ordered list of filters (called layers) from oldest
> created to newest.  It adds new layers when a specified Predicate
> (checkExtend) returns true.  It will remove filters as defined by a
> specified Consumer (filterCleanup).
>
> Everytime a BloomFilter is merged into the LayeredBloomFilter the filter
> checks to the "checkExtend" predicate.  If it fires the "filterCleanup" is
> called to remove any layers that should be removed and a new layer is added.
>
> Define the layers of the LayeredBloomFilter to comprise a standard
> BloomFilter and an associated expiration timestamp.
>
> We can thus define
>
>- "checkExtend" to require a new layer window.size.seconds /
>window.count seconds or when the current layer contains shape.N items.
>- "filterCleanup" to start at the head of the list of layers and
>remove any expired filters, usually 0, every window.size.seconds 1,
>infrequently more than 1.
>
> This system will correctly handle bursty loads.  There are 3 cases to
> consider:
>
>1. If the producer is producing fewer than shape.N PIDs the layer will
>not fill up before the next layer is added.
>2. If the producer is producing shape.N PIDs the layer will be
>processed as either a 1 or a 3 depending on system timings.
>3. If the producer is producing more than shape.N PIDs the layer will
>fill up and a new layer will be created with an expiration timestamp
>window.size.seconds from when it was created.  This is the case that leads
>to the filterCleanup infrequently having more than 1 layer to remove.
>
> The last case to consider is if a producer stops generating PIDs, in this
> case we should walk the map of producers,  call "filterCleanup", and then
> check to see if the LayeredBloomFilter is empty.  If so, remove it from the
> map.  It will be empty if the producer has not produced a PID for
> window.size.seconds.
>
> I have this solution mostly coded, though I must admit I do not know where
> to plugin the ProducerIdQuotaManager defined in the KIP
>
> Claude
>


[DISCUSS] KIP-936 Throttle number of active PIDs

2024-04-09 Thread Claude Warren
The overall design for KIP-936 seems sound to me.  I would make the
following changes:

Replace the "TimedBloomFilter" with a "LayeredBloomFilter" from
commons-collections v4.5

Define the producer.id.quota.window.size.seconds to be the length of time
that a Bloom filter of PIDs will exist.
Define a new configuration option "producer.id.quota.window.count" as the
number of windows active in window.size.seconds.

Define the "Shape" (See commons-collections bloomfilters v4.5) of the bloom
filter from the average number of PIDs expected in
window.size.seconds/window.count (call this N) and the probability of false
positives (call this P).  Due to the way the LayeredBloomFilter works, the
number of items can be lower than the max.  I'll explain that in a
minute.

The LayeredBloomFilter implements the standard BloomFilter interface but
internally keeps an ordered list of filters (called layers) from oldest
created to newest.  It adds new layers when a specified Predicate
(checkExtend) returns true.  It will remove filters as defined by a
specified Consumer (filterCleanup).

Every time a BloomFilter is merged into the LayeredBloomFilter, the filter
checks the "checkExtend" predicate.  If it fires, "filterCleanup" is called
to remove any layers that should be removed and a new layer is added.

Define the layers of the LayeredBloomFilter to comprise a standard
BloomFilter and an associated expiration timestamp.

We can thus define

   - "checkExtend" to require a new layer window.size.seconds /
   window.count seconds or when the current layer contains shape.N items.
   - "filterCleanup" to start at the head of the list of layers and remove
   any expired filters, usually 0, every window.size.seconds 1, infrequently
   more than 1.

This system will correctly handle bursty loads.  There are 3 cases to
consider:

   1. If the producer is producing fewer than shape.N PIDs the layer will
   not fill up before the next layer is added.
   2. If the producer is producing about shape.N PIDs the layer will be
   handled as either case 1 or case 3, depending on system timing.
   3. If the producer is producing more than shape.N PIDs the layer will
   fill up and a new layer will be created with an expiration timestamp
   window.size.seconds from when it was created.  This is the case that leads
   to the filterCleanup infrequently having more than 1 layer to remove.

The last case to consider is when a producer stops generating PIDs. In this
case we should walk the map of producers, call "filterCleanup", and then
check whether the LayeredBloomFilter is empty.  If so, remove it from the
map.  It will be empty if the producer has not produced a PID for
window.size.seconds.

I have this solution mostly coded, though I must admit I do not know where
to plug in the ProducerIdQuotaManager defined in the KIP.

Claude
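
To make the mechanics described above easier to follow, here is a
self-contained conceptual sketch of the layering scheme. It deliberately
avoids the commons-collections API (Shape, LayerManager and friends are
replaced with plain Java), and the sizes, names and the toy two-position
"Bloom filter" below are illustrative only, not the proposed implementation.

{code:java}
import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.Deque;

final class LayeredPidFilter {
    private static final int BITS_PER_LAYER = 1 << 16;    // stand-in for Shape(N, P)
    private static final int MAX_ITEMS_PER_LAYER = 1000;  // shape.N
    private final long layerLifetimeMs;                   // window.size.seconds
    private final long layerIntervalMs;                   // window.size.seconds / window.count

    private static final class Layer {
        final BitSet bits = new BitSet(BITS_PER_LAYER);
        final long createdAtMs;
        int items;
        Layer(long now) { this.createdAtMs = now; }
    }

    private final Deque<Layer> layers = new ArrayDeque<>();

    LayeredPidFilter(long layerLifetimeMs, long layerIntervalMs) {
        this.layerLifetimeMs = layerLifetimeMs;
        this.layerIntervalMs = layerIntervalMs;
    }

    /** Returns true if the PID was (probably) seen before in any live layer. */
    synchronized boolean track(long producerId, long nowMs) {
        filterCleanup(nowMs);  // drop expired layers (usually 0, sometimes 1 or more)
        boolean seen = layers.stream().anyMatch(l -> contains(l, producerId));
        Layer current = layers.peekLast();
        if (current == null || checkExtend(current, nowMs)) {
            current = new Layer(nowMs);  // new layer for the new sub-window
            layers.addLast(current);
        }
        set(current, producerId);
        current.items++;
        return seen;
    }

    // "checkExtend": start a new layer when the current one is old enough or full.
    private boolean checkExtend(Layer current, long nowMs) {
        return nowMs - current.createdAtMs >= layerIntervalMs
            || current.items >= MAX_ITEMS_PER_LAYER;
    }

    // "filterCleanup": remove layers older than the full window.
    private void filterCleanup(long nowMs) {
        while (!layers.isEmpty() && nowMs - layers.peekFirst().createdAtMs >= layerLifetimeMs) {
            layers.removeFirst();
        }
    }

    /** Used to decide whether an idle producer's filter can be dropped from the map. */
    synchronized boolean isEmpty(long nowMs) {
        filterCleanup(nowMs);
        return layers.isEmpty();
    }

    // Toy "Bloom filter": two bit positions derived from the PID.
    private static void set(Layer l, long pid)         { l.bits.set(idx(pid, 17)); l.bits.set(idx(pid, 31)); }
    private static boolean contains(Layer l, long pid) { return l.bits.get(idx(pid, 17)) && l.bits.get(idx(pid, 31)); }
    private static int idx(long pid, int salt)         { return Math.floorMod(Long.hashCode(pid * salt), BITS_PER_LAYER); }
}
{code}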


[jira] [Created] (KAFKA-16492) Flaky test: testAlterSinkConnectorOffsetsDifferentKafkaClusterTargeted – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest

2024-04-09 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-16492:
-

 Summary: Flaky test: 
testAlterSinkConnectorOffsetsDifferentKafkaClusterTargeted – 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
 Key: KAFKA-16492
 URL: https://issues.apache.org/jira/browse/KAFKA-16492
 Project: Kafka
  Issue Type: Bug
Reporter: Apoorv Mittal


Build: 
[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15680/1/tests/]

 
{code:java}
Errororg.opentest4j.AssertionFailedError: Condition not met within timeout 
3. Sink connector consumer group offsets should catch up to the topic end 
offsets ==> expected:  but was: 
Stacktraceorg.opentest4j.AssertionFailedError: Condition not met within 
timeout 3. Sink connector consumer group offsets should catch up to the 
topic end offsets ==> expected:  but was:at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
   at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) 
   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at 
app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)at 
app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)
   at 
app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)   
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)   
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:367)   
 at 
app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.verifyExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:989)
   at 
app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.alterAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:381)
   at 
app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:362)
   at 
java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
 Method) at 
java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at 
java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566) at 
app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
  at 
app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
   at 
app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
   at 
app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
 at app//org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) at 
app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)at 
app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)   at 
app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
  at 
app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
   at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) 
at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at 
app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)   at 
app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at 
app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)at 
app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
 at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)   
 at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413)   at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
   at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
   at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
  

[jira] [Created] (KAFKA-16491) Flaky test: randomClusterPerturbationsShouldConverge[rackAwareStrategy=balance_subtopology] – org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConverg

2024-04-09 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-16491:
-

 Summary: Flaky test: 
randomClusterPerturbationsShouldConverge[rackAwareStrategy=balance_subtopology] 
– 
org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest
 Key: KAFKA-16491
 URL: https://issues.apache.org/jira/browse/KAFKA-16491
 Project: Kafka
  Issue Type: Bug
Reporter: Apoorv Mittal


Build: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15680/1/tests/

 

 
{code:java}
Errorjava.lang.AssertionError: Assertion failed in randomized test. Reproduce 
with: 
`runRandomizedScenario(-3500059697111741230)`.Stacktracejava.lang.AssertionError:
 Assertion failed in randomized test. Reproduce with: 
`runRandomizedScenario(-3500059697111741230)`.   at 
org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545)
  at 
org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479)
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)   at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
 at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:568)   at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
  at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
   at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)   
 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
   at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)  at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)  at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)  at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at 
org.junit.runners.ParentRunner.run(ParentRunner.java:413)at 
org.junit.runners.Suite.runChild(Suite.java:128) at 
org.junit.runners.Suite.runChild(Suite.java:27)  at 
org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)  at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)  at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)  at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)   
 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
org.junit.runners.ParentRunner.run(ParentRunner.java:413)at 
org.junit.runner.JUnitCore.run(JUnitCore.java:137)   at 
org.junit.runner.JUnitCore.run(JUnitCore.java:115)   at 
org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
at 
org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
 at 
org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)   
 at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
   at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54){code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-950: Tiered Storage Disablement

2024-04-09 Thread Luke Chen
Hi Christo,

> I would then opt for moving information from DisableRemoteTopic
within the StopReplicas API which will then disappear in KRaft world as it
is already scheduled for deprecation. What do you think?

Sounds good to me.

Thanks.
Luke

On Tue, Apr 9, 2024 at 6:46 PM Christo Lolov  wrote:

> Heya Luke!
>
> I thought a bit more about it and I reached the same conclusion as you for
> 2 as a follow-up from 1. In other words, in KRaft world I don't think the
> controller needs to wait for acknowledgements for the brokers. All we care
> about is that the leader (who is responsible for archiving/deleting data in
> tiered storage) knows about the change and applies it properly. If there is
> a leadership change halfway through the operation then the new leader still
> needs to apply the message from the state topic and we know that a
> disable-message will be applied before a reenablement-message. I will
> change the KIP later today/tomorrow morning to reflect this reasoning.
>
> However, with this I believe that introducing a new API just for
> Zookeeper-based clusters (i.e. DisableRemoteTopic) becomes a bit of an
> overkill. I would then opt for moving information from DisableRemoteTopic
> within the StopReplicas API which will then disappear in KRaft world as it
> is already scheduled for deprecation. What do you think?
>
> Best,
> Christo
>
> On Wed, 3 Apr 2024 at 07:59, Luke Chen  wrote:
>
> > Hi Christo,
> >
> > 1. I agree with Doguscan that in KRaft mode, the controller won't send
> RPCs
> > to the brokers (except in the migration path).
> > So, I think we could adopt the similar way we did to
> `AlterReplicaLogDirs`
> > (
> > KIP-858
> > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft#KIP858:HandleJBODbrokerdiskfailureinKRaft-Intra-brokerreplicamovement
> > >)
> > that let the broker notify controller any update, instead of controller
> to
> > broker. And once the controller receives all the complete requests from
> > brokers, it'll enter "Disabled" state. WDYT?
> >
> > 2. Why should we wait until all brokers to respond before moving to
> > "Disabled" state in "KRaft mode"?
> > Currently, only the leader node does the remote log upload/fetch tasks,
> so
> > does that mean the controller only need to make sure the leader completes
> > the stopPartition?
> > If during the leader node stopPartition process triggered leadership
> > change, then the new leader should receive and apply the configRecord
> > update before the leadership change record based on the KRaft design,
> which
> > means there will be no gap that the follower node becomes the leader and
> > starting doing unexpected upload/fetch tasks, right?
> > I agree we should make sure in ZK mode, all brokers are completed the
> > stopPartitions before moving to "Disabled" state because ZK node watcher
> is
> > working in a separate thread. But not sure about KRaft mode.
> >
> > Thanks.
> > Luke
> >
> >
> > On Fri, Mar 29, 2024 at 4:14 PM Christo Lolov 
> > wrote:
> >
> > > Heya everyone!
> > >
> > > re: Doguscan
> > >
> > > I believe the answer to 101 needs a bit more discussion. As far as I
> > know,
> > > tiered storage today has methods to update a metadata of a segment to
> say
> > > "hey, I would like this deleted", but actual deletion is left to plugin
> > > implementations (or any background cleaners). In other words, there is
> no
> > > "immediate" deletion. In this KIP, we would like to continue doing the
> > same
> > > if the retention policy is set to delete. So I believe the answer is
> > > actually that a) we will update the metadata of the segments to mark
> them
> > > as deleted and b) we will advance the log start offset. Any deletion of
> > > actual files will still be delegated to plugin implementations. I
> believe
> > > this is further supported by "*remote.log.disable.policy=delete:* Logs
> > that
> > > are archived in the remote storage will not be part of the contiguous
> > > "active" log and will be deleted asynchronously as part of the
> > disablement
> > > process"
> > >
> > > Following from the above, I believe for 102 it is fine to allow setting
> > of
> > > remote.log.disable.policy on a disabled topic in much the same way we
> > allow
> > > other remote-related configurations to be set on a topic (i.e.
> > > local.retention.*) - it just won't have an effect. Granted, I do
> believe
> > we
> > > should restrict the policy being changed while a disablement is
> ongoing.
> > >
> > > re: Satish and Kamal
> > >
> > > 104, 1 and 2 are fair asks, I will work with Doguscan to update the KIP
> > > with the information!
> > >
> > > Best,
> > > Christo
> > >
> > > On Thu, 28 Mar 2024 at 10:31, Doğuşcan Namal  >
> > > wrote:
> > >
> > > > Hi Satish, I will try to answer as much as I can and the others could
> > > chime
> > > > in with further details.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > *101. For remote.log.disable.policy=delete: Does it delete the re

[jira] [Created] (KAFKA-16490) Upgrade gradle from 8.6 to 8.7

2024-04-09 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16490:
--

 Summary: Upgrade gradle from 8.6 to 8.7
 Key: KAFKA-16490
 URL: https://issues.apache.org/jira/browse/KAFKA-16490
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai


gradle 8.7: 
https://docs.gradle.org/8.7/release-notes.html?_gl=1*meg7rg*_ga*MTA4Mzk2MzA3MC4xNzEwOTI1MjQx*_ga_7W7NC6YNPT*MTcxMjY2MjM3My4yMC4wLjE3MTI2NjIzNzMuNjAuMC4w

As there is an unresolved issue with 8.6 [0], it would be nice to test all 
instructions in the readme when running this update.

[0] https://github.com/apache/kafka/pull/15553



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16488) fix flaky MirrorConnectorsIntegrationExactlyOnceTest#testReplication

2024-04-09 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16488:
--

 Summary: fix flaky 
MirrorConnectorsIntegrationExactlyOnceTest#testReplication
 Key: KAFKA-16488
 URL: https://issues.apache.org/jira/browse/KAFKA-16488
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai


org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
reset connector offsets. Error response: {"error_code":500,"message":"Failed to 
perform zombie fencing for source connector prior to modifying offsets"}
at 
app//org.apache.kafka.connect.util.clusters.EmbeddedConnect.resetConnectorOffsets(EmbeddedConnect.java:646)
at 
app//org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.resetConnectorOffsets(EmbeddedConnectCluster.java:48)
at 
app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.resetAllMirrorMakerConnectorOffsets(MirrorConnectorsIntegrationBaseTest.java:1063)
at 
app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testReplication(MirrorConnectorsIntegrationExactlyOnceTest.java:90)
at 
java.base@21.0.2/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base@21.0.2/java.lang.reflect.Method.invoke(Method.java:580)
at 
app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
at 
app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
at 
app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
at 
app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
at 
app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at 
app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at 
app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at 
app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
app//org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at 
app//org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at java.base@21.0.2/java.util.ArrayList.forEach(ArrayList.java:1596)
at 
app//org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
   

[jira] [Created] (KAFKA-16489) Fix flaky ZkMigrationIntegrationTest#testDualWrite

2024-04-09 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16489:
--

 Summary: Fix flaky ZkMigrationIntegrationTest#testDualWrite
 Key: KAFKA-16489
 URL: https://issues.apache.org/jira/browse/KAFKA-16489
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai


java.util.concurrent.TimeoutException
at 
java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960)
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180)
at 
kafka.zk.ZkMigrationIntegrationTest.testDualWrite(ZkMigrationIntegrationTest.scala:541)
at 
java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at 
org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at 
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at 
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:226)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:204)
at 
org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:142)
at 
org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.lambda$execute$2(TestTemplateTestDescriptor.java:110)
at 
java.base/java.util.strea

Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-09 Thread Bruno Cadonna

Hi again,

I have additional questions/comments.

6.
Which record exactly is passed to the handler?
Is it the input record to the task? Is it the input record to the
processor node? Is it the input record to the processor?



7.
Could you please add the packages of the Java classes/interfaces/enums 
you want to add?



Best,
Bruno
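
For readers following the thread, here is a rough sketch of the handler
shape being discussed. The package, the interface and method names, and the
exact parameter list are precisely what is still being debated above, so
treat everything below as illustrative rather than as the KIP-1033 API.

{code:java}
import org.apache.kafka.streams.processor.api.ProcessingContext;
import org.apache.kafka.streams.processor.api.Record;

// Illustrative only: the signature follows the direction of the discussion
// (ProcessingContext instead of ProcessorContext, the processor node passed
// by name, a generic Record), not a committed design.
public interface ProcessingExceptionHandler {

    ProcessingHandlerResponse handle(ProcessingContext context,
                                     String processorNodeName,
                                     Record<?, ?> record,
                                     Exception exception);

    enum ProcessingHandlerResponse {
        CONTINUE, // drop the failed record and keep processing
        FAIL      // stop processing and let the stream thread fail
    }
}
{code}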


On 4/9/24 10:17 AM, Bruno Cadonna wrote:

Hi Loïc, Damien, and Sébastien,

Thanks for the KIP!
I find it really great that you contribute back to Kafka Streams 
concepts you developed for kstreamplify so that everybody can take 
advantage from your improvements.


I have a couple of questions/comments:

1. and 2.
I am wondering whether we should expose the processor node ID -- which 
basically is the processor node name -- in the ProcessingContext 
interface. I think the processor node ID fits well in the 
ProcessingContext interface since it already contains application ID and 
task ID and it would make the API for the handler cleaner.



3.
Could you elaborate -- maybe with an example -- when a record is in a 
state in which it cannot be serialized? This is not completely clear to me.



4.
Regarding the metrics, it is not entirely clear to me what the metric 
measures. Is it the number of calls to the process handler or is it the 
number of calls to the process handler that returned FAIL?
If it is the former, I was also wondering whether it would be better to 
put the task-level metrics at the INFO reporting level and remove the 
thread-level metric, similar to the dropped-records metric. You can 
always roll up the metrics to the thread level in your preferred 
monitoring system. Or do you think we would end up with too many metrics?



5.
What do you think about naming the handler ProcessingExceptionHandler 
instead of ProcessExceptionHandler?
The DeserializationExceptionHandler and the ProductionExceptionHandler 
also use the noun of the action in their name and not the verb.



Best,
Bruno


On 4/8/24 3:48 PM, Sebastien Viale wrote:

Thanks for your review!

  All the points make sense for us!



We updated the KIP for points 1 and 4.



2/ We followed the DeserializationExceptionHandler interface 
signature; it had not occurred to us that the record could be forwarded 
with the ProcessorContext.


    The ProcessingContext is sufficient; we expect that most people 
would only need to access the RecordMetadata.




3/ The use of Record is required, as the error could 
occur in the middle of a processor where records could be 
non-serializable objects.


  As it is a global error-catching mechanism, the user may need only 
minimal information about the faulty record.


  Assuming users want to apply some specific treatment to the 
record, they can add a try / catch block in the topology.


  It is up to users to cast record value and key in the implementation 
of the ProcessorExceptionHandler.




Cheers

Loïc, Damien and Sébastien


From: Sophie Blee-Goldman 
Sent: Saturday, 6 April 2024 01:08
To: dev@kafka.apache.org 
Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception 
handler for exceptions occuring during processing




Hi Damien,

First off thanks for the KIP, this is definitely a much needed feature. On
the whole it seems pretty straightforward and I am in favor of the proposal.
Just a few questions and suggestions here and there:
a few questions and suggestions here and there:

1. One of the #handle method's parameters is "ProcessorNode node", but
ProcessorNode is an internal class (and would expose a lot of internals
that we probably don't want to pass in to an exception handler). Would it
be sufficient to just make this a String and pass in the processor name?

2. Another of the parameters is the ProcessorContext. This would enable
the handler to potentially forward records, which imo should not be done
from the handler since it could only ever call #forward but not direct
where the record is actually forwarded to, and could cause confusion if
users aren't aware that the handler is effectively calling from the context
of the processor that threw the exception.
2a. If you don't explicitly want the ability to forward records, I would
suggest changing the type of this parameter to ProcessingContext, which
has all the metadata and useful info of the ProcessorContext but without
the forwarding APIs. This would also let us sidestep the following issue:
2b. If you *do* want the ability to forward records, setting aside whether
that in and of itself makes sense to do, we would need to pass in either a
regular ProcessorContext or a FixedKeyProcessorContext, depending on what
kind of processor it is. I'm not quite sure how we could design a clean API
here, so I'll hold off until you clarify whether you even want forwarding
or not. We would also need to split the input record into a Record vs
FixedKeyRecord.


3. One notable difference between this handler and the existin

Re: [DISCUSS] KIP-950: Tiered Storage Disablement

2024-04-09 Thread Christo Lolov
Heya Luke!

I thought a bit more about it and I reached the same conclusion as you for
2 as a follow-up from 1. In other words, in KRaft world I don't think the
controller needs to wait for acknowledgements for the brokers. All we care
about is that the leader (who is responsible for archiving/deleting data in
tiered storage) knows about the change and applies it properly. If there is
a leadership change halfway through the operation then the new leader still
needs to apply the message from the state topic and we know that a
disable-message will be applied before a reenablement-message. I will
change the KIP later today/tomorrow morning to reflect this reasoning.

However, with this I believe that introducing a new API just for
Zookeeper-based clusters (i.e. DisableRemoteTopic) becomes a bit of
overkill. I would then opt for moving the information from DisableRemoteTopic
into the StopReplicas API, which will then disappear in the KRaft world as it
is already scheduled for deprecation. What do you think?

Best,
Christo

On Wed, 3 Apr 2024 at 07:59, Luke Chen  wrote:

> Hi Christo,
>
> 1. I agree with Doguscan that in KRaft mode, the controller won't send RPCs
> to the brokers (except in the migration path).
> So, I think we could adopt the similar way we did to `AlterReplicaLogDirs`
> (
> KIP-858
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft#KIP858:HandleJBODbrokerdiskfailureinKRaft-Intra-brokerreplicamovement
> >)
> that let the broker notify controller any update, instead of controller to
> broker. And once the controller receives all the complete requests from
> brokers, it'll enter "Disabled" state. WDYT?
>
> 2. Why should we wait until all brokers to respond before moving to
> "Disabled" state in "KRaft mode"?
> Currently, only the leader node does the remote log upload/fetch tasks, so
> does that mean the controller only need to make sure the leader completes
> the stopPartition?
> If during the leader node stopPartition process triggered leadership
> change, then the new leader should receive and apply the configRecord
> update before the leadership change record based on the KRaft design, which
> means there will be no gap that the follower node becomes the leader and
> starting doing unexpected upload/fetch tasks, right?
> I agree we should make sure in ZK mode, all brokers are completed the
> stopPartitions before moving to "Disabled" state because ZK node watcher is
> working in a separate thread. But not sure about KRaft mode.
>
> Thanks.
> Luke
>
>
> On Fri, Mar 29, 2024 at 4:14 PM Christo Lolov 
> wrote:
>
> > Heya everyone!
> >
> > re: Doguscan
> >
> > I believe the answer to 101 needs a bit more discussion. As far as I
> know,
> > tiered storage today has methods to update a metadata of a segment to say
> > "hey, I would like this deleted", but actual deletion is left to plugin
> > implementations (or any background cleaners). In other words, there is no
> > "immediate" deletion. In this KIP, we would like to continue doing the
> same
> > if the retention policy is set to delete. So I believe the answer is
> > actually that a) we will update the metadata of the segments to mark them
> > as deleted and b) we will advance the log start offset. Any deletion of
> > actual files will still be delegated to plugin implementations. I believe
> > this is further supported by "*remote.log.disable.policy=delete:* Logs
> that
> > are archived in the remote storage will not be part of the contiguous
> > "active" log and will be deleted asynchronously as part of the
> disablement
> > process"
> >
> > Following from the above, I believe for 102 it is fine to allow setting
> of
> > remote.log.disable.policy on a disabled topic in much the same way we
> allow
> > other remote-related configurations to be set on a topic (i.e.
> > local.retention.*) - it just won't have an effect. Granted, I do believe
> we
> > should restrict the policy being changed while a disablement is ongoing.
> >
> > re: Satish and Kamal
> >
> > 104, 1 and 2 are fair asks, I will work with Doguscan to update the KIP
> > with the information!
> >
> > Best,
> > Christo
> >
> > On Thu, 28 Mar 2024 at 10:31, Doğuşcan Namal 
> > wrote:
> >
> > > Hi Satish, I will try to answer as much as I can and the others could
> > chime
> > > in with further details.
> > >
> > >
> > >
> > >
> > >
> > > *101. For remote.log.disable.policy=delete: Does it delete the remote
> log
> > > data immediately and the data in remote storage will not be taken into
> > > account by any replica? That means log-start-offset is moved to the
> > earlier
> > > local-log-start-offset.*
> > > *Exactly. RemoteLogData will be deleted immediately. *
> > > *So before the deletion starts we move LogStart offset to
> > > LocalLogStartOffset to ensure that no RemoteLog will be accessed after
> > that
> > > point.*
> > >
> > >
> > > * 102. Can we update the remote.log.disable.policy after tiered storage
> > is
> > > disabled

Re: [DISCUSS] KIP-950: Tiered Storage Disablement

2024-04-09 Thread Satish Duggana
Hi Christo, Doguscan,
Can you please address the comments in this mail thread and update the
KIP accordingly?

Thanks,
Satish.

On Wed, 3 Apr 2024 at 12:30, Luke Chen  wrote:
>
> Hi Christo,
>
> 1. I agree with Doguscan that in KRaft mode, the controller won't send RPCs
> to the brokers (except in the migration path).
> So, I think we could adopt the similar way we did to `AlterReplicaLogDirs` (
> KIP-858
> )
> that let the broker notify controller any update, instead of controller to
> broker. And once the controller receives all the complete requests from
> brokers, it'll enter "Disabled" state. WDYT?
>
> 2. Why should we wait until all brokers to respond before moving to
> "Disabled" state in "KRaft mode"?
> Currently, only the leader node does the remote log upload/fetch tasks, so
> does that mean the controller only need to make sure the leader completes
> the stopPartition?
> If during the leader node stopPartition process triggered leadership
> change, then the new leader should receive and apply the configRecord
> update before the leadership change record based on the KRaft design, which
> means there will be no gap that the follower node becomes the leader and
> starting doing unexpected upload/fetch tasks, right?
> I agree we should make sure in ZK mode, all brokers are completed the
> stopPartitions before moving to "Disabled" state because ZK node watcher is
> working in a separate thread. But not sure about KRaft mode.
>
> Thanks.
> Luke
>
>
> On Fri, Mar 29, 2024 at 4:14 PM Christo Lolov 
> wrote:
>
> > Heya everyone!
> >
> > re: Doguscan
> >
> > I believe the answer to 101 needs a bit more discussion. As far as I know,
> > tiered storage today has methods to update a metadata of a segment to say
> > "hey, I would like this deleted", but actual deletion is left to plugin
> > implementations (or any background cleaners). In other words, there is no
> > "immediate" deletion. In this KIP, we would like to continue doing the same
> > if the retention policy is set to delete. So I believe the answer is
> > actually that a) we will update the metadata of the segments to mark them
> > as deleted and b) we will advance the log start offset. Any deletion of
> > actual files will still be delegated to plugin implementations. I believe
> > this is further supported by "*remote.log.disable.policy=delete:* Logs that
> > are archived in the remote storage will not be part of the contiguous
> > "active" log and will be deleted asynchronously as part of the disablement
> > process"
> >
> > Following from the above, I believe for 102 it is fine to allow setting of
> > remote.log.disable.policy on a disabled topic in much the same way we allow
> > other remote-related configurations to be set on a topic (i.e.
> > local.retention.*) - it just won't have an effect. Granted, I do believe we
> > should restrict the policy being changed while a disablement is ongoing.
> >
> > re: Satish and Kamal
> >
> > 104, 1 and 2 are fair asks, I will work with Doguscan to update the KIP
> > with the information!
> >
> > Best,
> > Christo
> >
> > On Thu, 28 Mar 2024 at 10:31, Doğuşcan Namal 
> > wrote:
> >
> > > Hi Satish, I will try to answer as much as I can and the others could
> > chime
> > > in with further details.
> > >
> > >
> > >
> > >
> > >
> > > *101. For remote.log.disable.policy=delete: Does it delete the remote log
> > > data immediately and the data in remote storage will not be taken into
> > > account by any replica? That means log-start-offset is moved to the
> > earlier
> > > local-log-start-offset.*
> > > *Exactly. RemoteLogData will be deleted immediately. *
> > > *So before the deletion starts we move LogStart offset to
> > > LocalLogStartOffset to ensure that no RemoteLog will be accessed after
> > that
> > > point.*
> > >
> > >
> > > * 102. Can we update the remote.log.disable.policy after tiered storage
> > is
> > > disabled on a topic?*
> > >
> > > *This is a good point. I think we should not allow modifying this
> > > configuration*
> > > *because changing the policy from Deletion to Retain when there is an
> > > ongoing Deletion will result in an undefined behaviour and where we
> > retain
> > > half of the remote log and delete the other half.*
> > >
> > > * 103. Do we plan to add any metrics related to this feature?*
> > >
> > >
> > >
> > > *Any recommendations?*
> > > *We may emit a gauge showing the enablement state of a topic but we could
> > > gather that info from the logs as well.*
> > > *The total duration for remote topic deletion could be added as well but
> > > this is more of a metric for the RemotePartitionRemover itself.*
> > >
> > >
> > >
> > > *104. Please add configuration details about copier thread pool,
> > expiration
> > > thread pool and the migration of the existing
> > > RemoteLogManagerScheduledThreadPoo

Re: [DISCUSS] KIP-1023: Follower fetch from tiered offset

2024-04-09 Thread Satish Duggana
+1 to Jun for adding the consumer-fetching-from-a-follower scenario
to the existing section that talks about the drawback when a node
built from the last-tiered-offset becomes the leader. As Abhijeet
mentioned, we plan to have a follow-up KIP that will address these by
deprioritizing such brokers. The deprioritization of those brokers can
be removed once they catch up to the local log retention.

Thanks,
Satish.

On Tue, 9 Apr 2024 at 14:08, Luke Chen  wrote:
>
> Hi Abhijeet,
>
> Thanks for the KIP to improve the tiered storage feature!
>
> Questions:
> 1. We could also get the "pending-upload-offset" and epoch via remote log
> metadata, instead of adding a new API to fetch from the leader. Could you
> explain why you choose the later approach, instead of the former?
> 2.
> > We plan to have a follow-up KIP that will address both the
> deprioritization
> of these brokers from leadership, as well as
> for consumption (when fetching from followers is allowed).
>
> I agree with Jun that we might need to make it clear all possible drawbacks
> that could have. So, could we add the drawbacks that Jun mentioned about
> the performance issue when consumer fetch from follower?
>
> 3. Could we add "Rejected Alternatives" section to the end of the KIP to
> add some of them?
> Like the "ListOffsetRequest" approach VS "Earliest-Pending-Upload-Offset"
> approach, or getting the "Earliest-Pending-Upload-Offset" from remote log
> metadata... etc.
>
> Thanks.
> Luke
>
>
> On Tue, Apr 9, 2024 at 2:25 PM Abhijeet Kumar 
> wrote:
>
> > Hi Christo,
> >
> > Please find my comments inline.
> >
> > On Fri, Apr 5, 2024 at 12:36 PM Christo Lolov 
> > wrote:
> >
> > > Hello Abhijeet and Jun,
> > >
> > > I have been mulling this KIP over a bit more in recent days!
> > >
> > > re: Jun
> > >
> > > I wasn't aware we apply 2.1 and 2.2 for reserving new timestamps - in
> > > retrospect it should have been fairly obvious. I would need to go an
> > update
> > > KIP-1005 myself then, thank you for giving the useful reference!
> > >
> > > 4. I think Abhijeet wants to rebuild state from latest-tiered-offset and
> > > fetch from latest-tiered-offset + 1 only for new replicas (or replicas
> > > which experienced a disk failure) to decrease the time a partition spends
> > > in under-replicated state. In other words, a follower which has just
> > fallen
> > > out of ISR, but has local data will continue using today's Tiered Storage
> > > replication protocol (i.e. fetching from earliest-local). I further
> > believe
> > > he has taken this approach so that local state of replicas which have
> > just
> > > fallen out of ISR isn't forcefully wiped thus leading to situation 1.
> > > Abhijeet, have I understood (and summarised) what you are proposing
> > > correctly?
> > >
> > > Yes, your understanding is correct. We want to limit the behavior changes
> > only to new replicas.
> >
> >
> > > 5. I think in today's Tiered Storage we know the leader's
> > log-start-offset
> > > from the FetchResponse and we can learn its local-log-start-offset from
> > the
> > > ListOffsets by asking for earliest-local timestamp (-4). But granted,
> > this
> > > ought to be added as an additional API call in the KIP.
> > >
> > >
> > Yes, I clarified this in my reply to Jun. I will add this missing detail in
> > the KIP.
> >
> >
> > > re: Abhijeet
> > >
> > > 101. I am still a bit confused as to why you want to include a new offset
> > > (i.e. pending-upload-offset) when you yourself mention that you could use
> > > an already existing offset (i.e. last-tiered-offset + 1). In essence, you
> > > end your Motivation with "In this KIP, we will focus only on the follower
> > > fetch protocol using the *last-tiered-offset*" and then in the following
> > > sections you talk about pending-upload-offset. I understand this might be
> > > classified as an implementation detail, but if you introduce a new offset
> > > (i.e. pending-upload-offset) you have to make a change to the ListOffsets
> > > API (i.e. introduce -6) and thus document it in this KIP as such.
> > However,
> > > the last-tiered-offset ought to already be exposed as part of KIP-1005
> > > (under implementation). Am I misunderstanding something here?
> > >
> >
> > I have tried to clarify this in my reply to Jun.
> >
> > > The follower needs to build the local data starting from the offset
> > > Earliest-Pending-Upload-Offset. Hence it needs the offset and the
> > > corresponding leader-epoch.
> > > There are two ways to do this:
> > >1. We add support in ListOffsetRequest to be able to fetch this offset
> > > (and leader epoch) from the leader.
> > >2. Or, fetch the tiered-offset (which is already supported). From this
> > > offset, we can get the Earliest-Pending-Upload-Offset. We can just add 1
> > to
> > > the tiered-offset.
> > >   However, we still need the leader epoch for offset, since there is
> > > no guarantee that the leader epoch for Earliest-Pending-Upload-Offset
> > will
> 

Re: [DISCUSS] KIP-1023: Follower fetch from tiered offset

2024-04-09 Thread Luke Chen
Hi Abhijeet,

Thanks for the KIP to improve the tiered storage feature!

Questions:
1. We could also get the "pending-upload-offset" and epoch via remote log
metadata, instead of adding a new API to fetch from the leader. Could you
explain why you chose the latter approach instead of the former? (See the
rough sketch after these questions for what the metadata-based option might
look like.)
2.
> We plan to have a follow-up KIP that will address both the
deprioritization
of these brokers from leadership, as well as
for consumption (when fetching from followers is allowed).

I agree with Jun that we might need to make clear all the possible drawbacks
this could have. So, could we add the drawbacks that Jun mentioned about
the performance issue when consumers fetch from followers?

3. Could we add a "Rejected Alternatives" section to the end of the KIP to
capture some of them?
Like the "ListOffsetRequest" approach VS "Earliest-Pending-Upload-Offset"
approach, or getting the "Earliest-Pending-Upload-Offset" from remote log
metadata... etc.
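
To make question 1 concrete, here is a rough sketch of what the metadata-based
alternative could look like on the follower, using the existing
RemoteLogMetadataManager API. This is purely illustrative and not taken from
the KIP, and it deliberately ignores the leader-epoch question discussed later
in this thread (the epoch would still have to come from the segment metadata
or another API call):

import java.util.Iterator;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;

public class PendingUploadOffsetSketch {
    // Derive the earliest-pending-upload-offset as last-tiered-offset + 1 by
    // scanning the remote log segment metadata the broker already consumes.
    public static long earliestPendingUploadOffset(RemoteLogMetadataManager rlmm,
                                                   TopicIdPartition partition)
            throws RemoteStorageException {
        long lastTieredOffset = -1L;
        Iterator<RemoteLogSegmentMetadata> segments = rlmm.listRemoteLogSegments(partition);
        while (segments.hasNext()) {
            RemoteLogSegmentMetadata segment = segments.next();
            // Only segments whose copy to remote storage has finished count as tiered.
            if (segment.state() == RemoteLogSegmentState.COPY_SEGMENT_FINISHED) {
                lastTieredOffset = Math.max(lastTieredOffset, segment.endOffset());
            }
        }
        return lastTieredOffset + 1;
    }
}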

Thanks.
Luke


On Tue, Apr 9, 2024 at 2:25 PM Abhijeet Kumar 
wrote:

> Hi Christo,
>
> Please find my comments inline.
>
> On Fri, Apr 5, 2024 at 12:36 PM Christo Lolov 
> wrote:
>
> > Hello Abhijeet and Jun,
> >
> > I have been mulling this KIP over a bit more in recent days!
> >
> > re: Jun
> >
> > I wasn't aware we apply 2.1 and 2.2 for reserving new timestamps - in
> > retrospect it should have been fairly obvious. I would need to go and
> update
> > KIP-1005 myself then, thank you for giving the useful reference!
> >
> > 4. I think Abhijeet wants to rebuild state from latest-tiered-offset and
> > fetch from latest-tiered-offset + 1 only for new replicas (or replicas
> > which experienced a disk failure) to decrease the time a partition spends
> > in under-replicated state. In other words, a follower which has just
> fallen
> > out of ISR, but has local data will continue using today's Tiered Storage
> > replication protocol (i.e. fetching from earliest-local). I further
> believe
> > he has taken this approach so that local state of replicas which have
> just
> > fallen out of ISR isn't forcefully wiped thus leading to situation 1.
> > Abhijeet, have I understood (and summarised) what you are proposing
> > correctly?
> >
> > Yes, your understanding is correct. We want to limit the behavior changes
> only to new replicas.
>
>
> > 5. I think in today's Tiered Storage we know the leader's
> log-start-offset
> > from the FetchResponse and we can learn its local-log-start-offset from
> the
> > ListOffsets by asking for earliest-local timestamp (-4). But granted,
> this
> > ought to be added as an additional API call in the KIP.
> >
> >
> Yes, I clarified this in my reply to Jun. I will add this missing detail in
> the KIP.
>
>
> > re: Abhijeet
> >
> > 101. I am still a bit confused as to why you want to include a new offset
> > (i.e. pending-upload-offset) when you yourself mention that you could use
> > an already existing offset (i.e. last-tiered-offset + 1). In essence, you
> > end your Motivation with "In this KIP, we will focus only on the follower
> > fetch protocol using the *last-tiered-offset*" and then in the following
> > sections you talk about pending-upload-offset. I understand this might be
> > classified as an implementation detail, but if you introduce a new offset
> > (i.e. pending-upload-offset) you have to make a change to the ListOffsets
> > API (i.e. introduce -6) and thus document it in this KIP as such.
> However,
> > the last-tiered-offset ought to already be exposed as part of KIP-1005
> > (under implementation). Am I misunderstanding something here?
> >
>
> I have tried to clarify this in my reply to Jun.
>
> > The follower needs to build the local data starting from the offset
> > Earliest-Pending-Upload-Offset. Hence it needs the offset and the
> > corresponding leader-epoch.
> > There are two ways to do this:
> >1. We add support in ListOffsetRequest to be able to fetch this offset
> > (and leader epoch) from the leader.
> >2. Or, fetch the tiered-offset (which is already supported). From this
> > offset, we can get the Earliest-Pending-Upload-Offset. We can just add 1
> to
> > the tiered-offset.
> >   However, we still need the leader epoch for offset, since there is
> > no guarantee that the leader epoch for Earliest-Pending-Upload-Offset
> will
> > be the same as the
> >   leader epoch for tiered-offset. We may need another API call to the
> > leader for this.
> > I prefer the first approach. The only problem with the first approach is
> > that it introduces one more offset. The second approach avoids this
> problem
> > but is a little complicated.
>
>
>
> >
> > Best,
> > Christo
> >
> > On Thu, 4 Apr 2024 at 19:37, Jun Rao  wrote:
> >
> > > Hi, Abhijeet,
> > >
> > > Thanks for the KIP. Left a few comments.
> > >
> > > 1. "A drawback of using the last-tiered-offset is that this new
> follower
> > > would possess only a limited number of locally stored segments. Should
> it
> > > ascend to the role of leader, there is a risk of n

Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-09 Thread Bruno Cadonna

Hi Loïc, Damien, and Sébastien,

Thanks for the KIP!
I find it really great that you contribute back to Kafka Streams the 
concepts you developed for kstreamplify so that everybody can take 
advantage of your improvements.


I have a couple of questions/comments:

1. and 2.
I am wondering whether we should expose the processor node ID -- which is 
basically the processor node name -- in the ProcessingContext interface. 
I think the processor node ID fits well there, since the context already 
contains the application ID and task ID, and it would make the API for the 
handler cleaner.
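
To make this concrete, here is a rough sketch of what that could look like
(purely illustrative; applicationId(), taskId(), and recordMetadata() already
exist on ProcessingContext, while the name of the new accessor is just a
placeholder):

import java.util.Optional;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.RecordMetadata;

public interface ProcessingContext {
    String applicationId();                    // existing
    TaskId taskId();                           // existing
    Optional<RecordMetadata> recordMetadata(); // existing

    // Hypothetical addition: the ID (name) of the processor node currently
    // processing the record, so the exception handler would not need to
    // receive it as a separate parameter.
    String processorNodeId();
}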



3.
Could you elaborate -- maybe with an example -- when a record is in a 
state in which it cannot be serialized? This is not completely clear to me.



4.
Regarding the metrics, it is not entirely clear to me what the metric 
measures. Is it the number of calls to the process handler, or is it the 
number of calls to the process handler that returned FAIL?
If it is the former, I was also wondering whether it would be better to 
put the task-level metrics at the INFO reporting level and remove the 
thread-level metric, similar to the dropped-records metric. You can 
always roll up the metrics to the thread level in your preferred 
monitoring system. Or do you think we would end up with too many metrics?



5.
What do you think about naming the handler ProcessingExceptionHandler 
instead of ProcessExceptionHandler?
The DeserializationExceptionHandler and the ProductionExceptionHandler 
also use the noun of the action in their name and not the verb.



Best,
Bruno


On 4/8/24 3:48 PM, Sebastien Viale wrote:

Thanks for your review!

  All the points make sense for us!



We updated the KIP for points 1 and 4.



2/ We followed the DeserializationExceptionHandler interface signature; it was 
not our intention that the record be forwarded with the ProcessorContext.

The ProcessingContext is sufficient; we do expect that most people would 
need to access the RecordMetadata.



3/ The use of Record is required, as the error could occur 
in the middle of a processor where records could be non-serializable objects.

  As it is a global error-catching mechanism, the user may need only a little information about 
the faulty record.

  If users want to apply some specific treatment to the record, they 
can add a try/catch block in the topology.

  It is up to users to cast the record key and value in their implementation of the 
ProcessExceptionHandler.



Cheers

Loïc, Damien and Sébastien


From: Sophie Blee-Goldman
Sent: Saturday, April 6, 2024 01:08
To: dev@kafka.apache.org
Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for 
exceptions occuring during processing


Hi Damien,

First off thanks for the KIP, this is definitely a much needed feature. On the
whole it seems pretty straightforward and I am in favor of the proposal. Just
a few questions and suggestions here and there:

1. One of the #handle method's parameters is "ProcessorNode node", but
ProcessorNode is an internal class (and would expose a lot of internals
that we probably don't want to pass in to an exception handler). Would it
be sufficient to just make this a String and pass in the processor name?

2. Another of the parameters is the ProcessorContext. This would enable
the handler to potentially forward records, which imo should not be done
from the handler since it could only ever call #forward but not direct where
the record is actually forwarded to, and could cause confusion if users
aren't aware that the handler is effectively calling from the context of the
processor that threw the exception.
2a. If you don't explicitly want the ability to forward records, I would
suggest changing the type of this parameter to ProcessingContext, which
has all the metadata and useful info of the ProcessorContext but without the
forwarding APIs. This would also let us sidestep the following issue:
2b. If you *do* want the ability to forward records, setting aside whether
that
in of itself makes sense to do, we would need to pass in either a regular
ProcessorContext or a FixedKeyProcessorContext, depending on what kind
of processor it is. I'm not quite sure how we could design a clean API here,
so I'll hold off until you clarify whether you even want forwarding or not.
We would also need to split the input record into a Record vs FixedKeyRecord

3. One notable difference between this handler and the existing ones you
pointed out, the Deserialization/ProductionExceptionHandler, is that the
records passed in to those are in serialized bytes, whereas the records
here would be POJOs. You account for this by making the parameter
type a Record, but I just wonder how users would be
able to read the key/value and figure out what type it should be. For
example, would they need to maintain a map from processo

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-09 Thread Loic Greffier
Hi,

To complete Sébastien's answer about point 3, here is an example of how 
users could simply cast the record key or value in a custom process 
exception handler:

Properties streamProps = new Properties();
streamProps.put(StreamsConfig.DEFAULT_PROCESS_EXCEPTION_HANDLER_CLASS_CONFIG,
        CustomProcessExceptionHandler.class);

public class CustomProcessExceptionHandler implements ProcessExceptionHandler {

    @Override
    public ProcessHandlerResponse handle(ProcessingContext context, String nodeName,
                                         Record record, Exception exception) {
        log.info("Error in node: {}, key: {}, value: {}, exception: {}",
                nodeName, record.key(), record.value(), exception);

        if (record.value() instanceof Animal) {
            Animal value = (Animal) record.value();
            // Do something
        }

        return ProcessHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map configs) {

    }
}

The example will be added to the KIP.

Regards

Sébastien, Damien and Loïc

From: Sebastien Viale
Sent: Monday, April 8, 2024 15:49
To: dev@kafka.apache.org
Subject: RE: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler 
for exceptions occuring during processing

Thanks for your review!

All the points make sense for us!



We updated the KIP for points 1 and 4.



2/ We followed the DeserializationExceptionHandler interface signature; it was 
not our intention that the record be forwarded with the ProcessorContext.

The ProcessingContext is sufficient; we do expect that most people would need 
to access the RecordMetadata.



3/ The use of Record is required, as the error could occur 
in the middle of a processor where records could be non-serializable objects.

As it is a global error-catching mechanism, the user may need only a little information about 
the faulty record.

If users want to apply some specific treatment to the record, they 
can add a try/catch block in the topology.

It is up to users to cast the record key and value in their implementation of the 
ProcessExceptionHandler.



Cheers

Loïc, Damien and Sébastien


From: Sophie Blee-Goldman <sop...@responsive.dev>
Sent: Saturday, April 6, 2024 01:08
To: dev@kafka.apache.org
Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for 
exceptions occuring during processing


Hi Damien,

First off thanks for the KIP, this is definitely a much needed feature. On the
whole it seems pretty straightforward and I am in favor of the proposal. Just
a few questions and suggestions here and there:

1. One of the #handle method's parameters is "ProcessorNode node", but
ProcessorNode is an internal class (and would expose a lot of internals
that we probably don't want to pass in to an exception handler). Would it
be sufficient to just make this a String and pass in the processor name?

2. Another of the parameters is the ProcessorContext. This would enable
the handler to potentially forward records, which imo should not be done
from the handler since it could only ever call #forward but not direct where
the record is actually forwarded to, and could cause confusion if users
aren't aware that the handler is effectively calling from the context of the
processor that threw the exception.
2a. If you don't explicitly want the ability to forward records, I would
suggest changing the type of this parameter to ProcessingContext, which
has all the metadata and useful info of the ProcessorContext but without the
forwarding APIs. This would also let us sidestep the following issue:
2b. If you *do* want the ability to forward records, setting aside whether
that
in of itself makes sense to do, we would need to pass in either a regular
ProcessorContext or a FixedKeyProcessorContext, depending on what kind
of processor it is. I'm not quite sure how we could design a clean API here,
so I'll hold off until you clarify whether you even want forwarding or not.
We would also need to split the input record into a Record vs FixedKeyRecord

3. One notable difference between this handler and the existing ones you
pointed out, the Deserialization/ProductionExceptionHandler, is that the
records passed in to those are in serialized bytes, whereas the records
here would be POJOs. You account for this by making the parameter
type a Record, but I just wonder how users would be
able to read the key/value and figure out what type it should be. For
example, would they need to maintain a map from processor name to
input record types?

If you could provide an example of this new feature in the KIP, it would be
very helpful in understanding whether we could do something to make it
easier for users to use, or if it would be fine as-is

4. We should include all the relevant info for a new metric, such as the
me

[jira] [Created] (KAFKA-16487) Support to define server properties by ClusterTestDefaults

2024-04-09 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16487:
--

 Summary: Support to define server properties by ClusterTestDefaults
 Key: KAFKA-16487
 URL: https://issues.apache.org/jira/browse/KAFKA-16487
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai
Assignee: PoAn Yang


Sometimes we want to define server properties for all test cases, and using 
`BeforeEach` to modify the `ClusterConfig` is the only way. The side effect is 
that the IDE does not like this style, since it cannot recognize the custom 
ParameterResolver for `ClusterConfig`.

The alternative is to take `ClusterInstance` from the constructor first, 
and then modify the inner `ClusterConfig` in the `BeforeEach` phase. However, 
that may confuse users about the life cycle of "configs".

In short, I prefer to define the server properties via `ClusterTestDefaults`. It 
already includes some server-side default properties, and we can enhance it to 
cover more existing test cases.
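
A rough sketch of the intended usage (the `serverProperties` attribute on 
`ClusterTestDefaults` is the proposed addition; the annotation and package names 
follow the existing test framework, but treat the whole snippet as illustrative):
{code:java}
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterConfigProperty;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.junit.ClusterTestExtensions;
import org.junit.jupiter.api.extension.ExtendWith;

// Every @ClusterTest in this class would start its brokers with the given
// property, without needing a @BeforeEach that mutates ClusterConfig.
@ClusterTestDefaults(serverProperties = {
    @ClusterConfigProperty(key = "log.segment.bytes", value = "1048576")
})
@ExtendWith(ClusterTestExtensions.class)
public class MyClusterTest {

    @ClusterTest
    public void testSomething(ClusterInstance cluster) {
        // cluster is started with log.segment.bytes=1048576
    }
}
{code}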



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16456) Can't stop kafka debug logs

2024-04-09 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash resolved KAFKA-16456.
--
Resolution: Not A Problem

You can also dynamically change the broker loggers using the 
{{kafka-configs.sh}} script:
(e.g.)
{code}
sh kafka-configs.sh --bootstrap-server localhost:9092 --entity-type 
broker-loggers --entity-name <broker-id> --add-config 
org.apache.kafka.clients.NetworkClient=INFO --alter
{code}
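
For completeness, the same broker-logger change can also be made programmatically 
with the Admin client. This is a minimal sketch; the broker id ("1") and the 
bootstrap address are placeholders:
{code:java}
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class SetBrokerLoggerLevel {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // BROKER_LOGGER targets the log4j loggers of a single broker;
            // the resource name is the broker id.
            ConfigResource brokerLoggers =
                new ConfigResource(ConfigResource.Type.BROKER_LOGGER, "1");
            AlterConfigOp setToInfo = new AlterConfigOp(
                new ConfigEntry("org.apache.kafka.clients.NetworkClient", "INFO"),
                AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> update =
                Map.of(brokerLoggers, List.of(setToInfo));
            admin.incrementalAlterConfigs(update).all().get();
        }
    }
}
{code}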

> Can't stop kafka debug logs
> ---
>
> Key: KAFKA-16456
> URL: https://issues.apache.org/jira/browse/KAFKA-16456
> Project: Kafka
>  Issue Type: Bug
>  Components: logging
>Affects Versions: 3.6.0
>Reporter: Rajan Choudhary
>Priority: Major
>
> I am getting kafka debug logs, which are flooding our logs. Sample below
>  
> {code:java}
> 09:50:38.073 [kafka-producer-network-thread | maximo-mp] DEBUG 
> org.apache.kafka.clients.NetworkClient - [Producer clientId=maximo-mp, 
> transactionalId=sqout-3664816744674374805414] Received API_VERSIONS response 
> from node 5 for request with header RequestHeader(apiKey=API_VERSIONS, 
> apiVersion=3, clientId=maximo-mp, correlationId=8): 
> ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, 
> minVersion=0, maxVersion=9), ApiVersion(apiKey=1, minVersion=0, 
> maxVersion=13), ApiVersion(apiKey=2, minVersion=0, maxVersion=7), 
> ApiVersion(apiKey=3, minVersion=0, maxVersion=12), ApiVersion(apiKey=4, 
> minVersion=0, maxVersion=5), ApiVersion(apiKey=5, minVersion=0, 
> maxVersion=3), ApiVersion(apiKey=6, minVersion=0, maxVersion=7), 
> ApiVersion(apiKey=7, minVersion=0, maxVersion=3), ApiVersion(apiKey=8, 
> minVersion=0, maxVersion=8), ApiVersion(apiKey=9, minVersion=0, 
> maxVersion=8), ApiVersion(apiKey=10, minVersion=0, maxVersion=4), 
> ApiVersion(apiKey=11, minVersion=0, maxVersion=7), ApiVersion(apiKey=12, 
> minVersion=0, m...
> 09:50:38.073 [kafka-producer-network-thread | maximo-mp] DEBUG 
> org.apache.kafka.clients.NetworkClient - [Producer clientId=maximo-mp, 
> transactionalId=sqout-3664816744674374805414] Node 5 has finalized features 
> epoch: 1, finalized features: [], supported features: [], API versions: 
> (Produce(0): 0 to 9 [usable: 9], Fetch(1): 0 to 13 [usable: 12], 
> ListOffsets(2): 0 to 7 [usable: 6], Metadata(3): 0 to 12 [usable: 11], 
> LeaderAndIsr(4): 0 to 5 [usable: 5], StopReplica(5): 0 to 3 [usable: 3], 
> UpdateMetadata(6): 0 to 7 [usable: 7], ControlledShutdown(7): 0 to 3 [usable: 
> 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 7], 
> FindCoordinator(10): 0 to 4 [usable: 3], JoinGroup(11): 0 to 7 [usable: 7], 
> Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], 
> SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], 
> ListGroups(16): 0 to 4 [usable: 4], SaslHandshake(17): 0 to 1 [usable: 1], 
> ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 7], 
> Del...
> 09:50:38.074 [kafka-producer-network-thread | maximo-mp] DEBUG 
> org.apache.kafka.clients.producer.internals.TransactionManager - [Producer 
> clientId=maximo-mp, transactionalId=sqout-3664816744674374805414] ProducerId 
> of partition sqout-0 set to 43458621 with epoch 0. Reinitialize sequence at 
> beginning.
> 09:50:38.074 [kafka-producer-network-thread | maximo-mp] DEBUG 
> org.apache.kafka.clients.producer.internals.RecordAccumulator - [Producer 
> clientId=maximo-mp, transactionalId=sqout-3664816744674374805414] Assigned 
> producerId 43458621 and producerEpoch 0 to batch with base sequence 0 being 
> sent to partition sqout-0
> 09:50:38.075 [kafka-producer-network-thread | maximo-mp] DEBUG 
> org.apache.kafka.clients.NetworkClient - [Producer clientId=maximo-mp, 
> transactionalId=sqout-3664816744674374805414] Sending PRODUCE request with 
> header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=maximo-mp, 
> correlationId=9) and timeout 3 to node 5: 
> {acks=-1,timeout=3,partitionSizes=[sqout-0=4181]}
> 09:50:38.095 [kafka-producer-network-thread | maximo-mp] DEBUG 
> org.apache.kafka.clients.NetworkClient - [Producer clientId=maximo-mp, 
> transactionalId=sqout-3664816744674374805414] Received PRODUCE response from 
> node 5 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, 
> clientId=maximo-mp, correlationId=9): 
> ProduceResponseData(responses=[TopicProduceResponse(name='sqout', 
> partitionResponses=[PartitionProduceResponse(index=0, errorCode=0, 
> baseOffset=796494, logAppendTimeMs=-1, logStartOffset=768203, 
> recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
> 09:50:38.095 [kafka-producer-network-thread | maximo-mp] DEBUG 
> org.apache.kafka.clients.producer.internals.TransactionManager - [Producer 
> clientId=maximo-mp, transactionalId=sqout-3664816744674374805414] ProducerId: 
> 43458621; Set l

Re: [DISCUSS] KIP-899: Allow clients to rebootstrap

2024-04-09 Thread Andrew Schofield
Hi Ivan,
Thanks for the KIP. I can see situations in which this would be helpful. I have 
one question.

The KIP says the client checks the cluster ID when it re-bootstraps and that it 
will fail if the
cluster ID doesn’t match the previously known one. How does it fail? Which 
exception does
it throw and when?

In a similar vein, now that we are checking cluster IDs, I wonder if it could 
be extended to
cover all situations in which there are cluster ID mismatches, such as the 
bootstrap server
list erroneously pointing at brokers from different clusters and the problem 
only being
detectable later on.
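
For my own understanding, the opt-in would presumably look something like the
following on the client side. This is only a sketch: the configuration name is
taken from the KIP as I read it today and may well change as the discussion
evolves.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RebootstrapOptIn {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Proposed by KIP-899 (name per the KIP draft, may change): fall back to the
        // bootstrap servers when no known broker is connectable during a metadata update.
        props.put("metadata.recovery.strategy", "rebootstrap");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // ... assign/subscribe and poll as usual
        }
    }
}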

Thanks,
Andrew

> On 8 Apr 2024, at 18:24, Ivan Yurchenko  wrote:
> 
> Hello!
> 
> I changed the KIP a bit, specifying that the certain benefit goes to 
> consumers not participating in a group, but that other clients can benefit as 
> well in certain situations.
> 
> You can see the changes in the history [1]
> 
> Thank you!
> 
> Ivan
> 
> [1] 
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=240881396&originalVersion=10&revisedVersion=11
> 
> On 2023/07/15 16:37:52 Ivan Yurchenko wrote:
>> Hello!
>> 
>> I've made several changes to the KIP based on the comments:
>> 
>> 1. Reduced the scope to producer and consumer clients only.
>> 2. Added more details to the description of the rebootstrap process.
>> 3. Documented the role of low values of reconnect.backoff.max.ms in
>> preventing rebootstrapping.
>> 4. Some wording changes.
>> 
>> You can see the changes in the history [1]
>> 
>> I'm planning to put the KIP to a vote in some days if there are no new
>> comments.
>> 
>> Thank you!
>> 
>> Ivan
>> 
>> [1]
>> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=240881396&selectedPageVersions=9&selectedPageVersions=5
>> 
>> On Tue, 30 May 2023 at 08:23, Ivan Yurchenko 
>> wrote:
>> 
>>> Hi Chris and all,
>>> 
 I believe the logic you've linked is only applicable for the producer and
 consumer clients; the admin client does something different (see [1]).
>>> 
>>> I see, thank you for the pointer. It seems the admin client is fairly
>>> different from the producer and consumer. Probably it makes sense to reduce
>>> the scope of the KIP to the producer and consumer clients only.
>>> 
 it'd be nice to have a definition of when re-bootstrapping
 would occur that doesn't rely on internal implementation details. What
 user-visible phenomena can we identify that would lead to a
 re-bootstrapping?
>>> 
>>> Let's put it this way: "Re-bootstrapping means that the client forgets
>>> about nodes it knows about and falls back on the bootstrap nodes as if it
>>> had just been initialized. Re-bootstrapping happens when, during a metadata
>>> update (which may be scheduled by `metadata.max.age.ms` or caused by
>>> certain error responses like NOT_LEADER_OR_FOLLOWER, REPLICA_NOT_AVAILABLE,
>>> etc.), the client doesn't have a node with an established connection or
>>> establishable connection."
>>> Does this sound good?
>>> 
 I also believe that if someone has "
 reconnect.backoff.max.ms" set to a low-enough value,
 NetworkClient::leastLoadedNode may never return null. In that case,
 shouldn't we still attempt a re-bootstrap at some point (if the user has
 enabled this feature)?
>>> 
>>> Yes, you're right. Particularly `canConnect` here [1] can always be
>>> returning `true` if `reconnect.backoff.max.ms` is low enough.
>>> It seems pretty difficult to find a good criterion for when re-bootstrapping
>>> should be forced in this case, so it'd be difficult to configure and reason
>>> about. I think it's worth mentioning in the KIP and later in the
>>> documentation, but we should not try to do anything special here.
>>> 
 Would it make sense to re-bootstrap only after "
 metadata.max.age.ms" has elapsed since the last metadata update, and
>>> when
 at least one request has been made to contact each known server and been
 met with failure?
>>> 
>>> The first condition is satisfied by the check in the beginning of
>>> `maybeUpdate` [2].
>>> It seems to me, the second one is also satisfied by `leastLoadedNode`.
>>> Admittedly, it's more relaxed than you propose: it tracks unavailability of
>>> nodes that was detected by all types of requests, not only by metadata
>>> requests.
>>> What do you think, would this be enough?
>>> 
>>> [1]
>>> https://github.com/apache/kafka/blob/c9a42c85e2c903329b3550181d230527e90e3646/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L698
>>> [2]
>>> https://github.com/apache/kafka/blob/c9a42c85e2c903329b3550181d230527e90e3646/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1034-L1041
>>> 
>>> Best,
>>> Ivan
>>> 
>>> 
>>> On Tue, 21 Feb 2023 at 20:07, Chris Egerton 
>>> wrote:
>>> 
 Hi Ivan,
 
 I believe the logic you've linked is only applicable for the producer and
 consumer clients; the admin client does something different (see [1]).