[jira] [Commented] (KAFKA-16377) Fix flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores

2024-03-16 Thread Kuan Po Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827766#comment-17827766
 ] 

Kuan Po Tseng commented on KAFKA-16377:
---

I'm willing to take over this issue, many thanks!

> Fix flaky 
> HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
> --
>
> Key: KAFKA-16377
> URL: https://issues.apache.org/jira/browse/KAFKA-16377
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Priority: Major
>
> {quote}
> [2024-03-13T16:07:11.125Z] Gradle Test Run :streams:test > Gradle Test 
> Executor 95 > HighAvailabilityTaskAssignorIntegrationTest > 
> shouldScaleOutWithWarmupTasksAndPersistentStores(String, TestInfo) > 
> "shouldScaleOutWithWarmupTasksAndPersistentStores(String, 
> TestInfo).balance_subtopology" FAILED
> [2024-03-13T16:07:11.125Z] java.lang.AssertionError: the first assignment 
> after adding a node should be unstable while we warm up the state.
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.assertFalseNoRetry(HighAvailabilityTaskAssignorIntegrationTest.java:310)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.lambda$shouldScaleOutWithWarmupTasks$7(HighAvailabilityTaskAssignorIntegrationTest.java:237)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:395)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:443)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:392)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:232)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:130)
> {quote}
> https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-pr/branches/PR-15474/runs/12/nodes/9/steps/88/log/?start=0
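
For context, the failing check relies on Kafka's TestUtils polling helper. A minimal sketch of the pattern (not the actual test code; assignmentIsStable() and stateIsUnrecoverable() are hypothetical stand-ins for the test's own checks):

```java
// Poll until the rebalance settles, but abort immediately (no retry) on states that can
// never recover; the quoted failure is one of those no-retry assertions firing.
TestUtils.waitForCondition(
    () -> {
        assertFalseNoRetry(
            stateIsUnrecoverable(), // hypothetical helper
            "the first assignment after adding a node should be unstable while we warm up the state.");
        return assignmentIsStable(); // hypothetical helper
    },
    120_000L,
    "waiting for the group to converge on a stable assignment after scale-out"
);
```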



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16377) Fix flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores

2024-03-16 Thread Kuan Po Tseng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kuan Po Tseng reassigned KAFKA-16377:
-

Assignee: Kuan Po Tseng

> Fix flaky 
> HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
> --
>
> Key: KAFKA-16377
> URL: https://issues.apache.org/jira/browse/KAFKA-16377
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Assignee: Kuan Po Tseng
>Priority: Major
>
> {quote}
> [2024-03-13T16:07:11.125Z] Gradle Test Run :streams:test > Gradle Test 
> Executor 95 > HighAvailabilityTaskAssignorIntegrationTest > 
> shouldScaleOutWithWarmupTasksAndPersistentStores(String, TestInfo) > 
> "shouldScaleOutWithWarmupTasksAndPersistentStores(String, 
> TestInfo).balance_subtopology" FAILED
> [2024-03-13T16:07:11.125Z] java.lang.AssertionError: the first assignment 
> after adding a node should be unstable while we warm up the state.
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.assertFalseNoRetry(HighAvailabilityTaskAssignorIntegrationTest.java:310)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.lambda$shouldScaleOutWithWarmupTasks$7(HighAvailabilityTaskAssignorIntegrationTest.java:237)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:395)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:443)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:392)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:232)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:130)
> {quote}
> https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-pr/branches/PR-15474/runs/12/nodes/9/steps/88/log/?start=0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15302: Stale value returned when using store.all() with key deletion [docs] [kafka]

2024-03-16 Thread via GitHub


jinyongchoi commented on code in PR #15495:
URL: https://github.com/apache/kafka/pull/15495#discussion_r1527410280


##
docs/streams/developer-guide/memory-mgmt.html:
##
@@ -151,6 +151,10 @@
 Serdes.Long())
   .withCachingEnabled();
   Record caches are not supported for versioned state stores.
+  Caution: When using withCachingEnabled(),
+if you delete() a key from the Store while iterating(e.g., 
KeyValueIterator) through the keys,
+the value of the key that hasn't been flushed from the cache may be 
returned as a stale value.
+Therefore, when deleting, you must flush() the cache before the 
iterator.

Review Comment:
   > I would rephrase this sentence:
   > ```
   > To avoid reading stale data, you can `flush()` the store before creating
   > the iterator. Note that flushing too often can lead to performance degradation
   > if RocksDB is used, so we advise avoiding manual flushes in general.
   > ```
   
   Ok, I will change it.
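   
   For illustration, a minimal sketch of the recommended pattern, assuming a caching-enabled KeyValueStore<String, Long> named `store` obtained from the processor context (the zero-value filter is just an example condition):
   ```java
   // Flush first so writes/deletes still buffered in the record cache reach RocksDB;
   // otherwise the iterator below may hand back a value that was already deleted or overwritten.
   store.flush();
   try (final KeyValueIterator<String, Long> iter = store.all()) {
       while (iter.hasNext()) {
           final KeyValue<String, Long> entry = iter.next();
           if (entry.value != null && entry.value == 0L) {
               store.delete(entry.key); // deleting while iterating; the upfront flush() avoids the stale read
           }
       }
   }
   ```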



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16273: Update consumer_bench_test.py to use consumer group protocol [kafka]

2024-03-16 Thread via GitHub


philipnee commented on PR #15548:
URL: https://github.com/apache/kafka/pull/15548#issuecomment-2002293547

   Hi @lucasbru @kirktrue - this is the ask to add the consumer group protocol to the test. It should be pretty straightforward, and the results are here:
   ```
   

   SESSION REPORT (ALL TESTS)
   ducktape version: 0.11.4
   session_id:   2024-03-16--002
   run time: 76 minutes 36.150 seconds
   tests run:28
   passed:   25
   flaky:0
   failed:   3
   ignored:  0
   

   test_id:
kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
   status: PASS
   run time:   2 minutes 5.067 seconds
   

   test_id:
kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic
   status: PASS
   run time:   2 minutes 3.908 seconds
   

   test_id:
kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
   status: FAIL
   run time:   12 minutes 10.546 seconds
   
   
   TimeoutError('consume_workload failed to finish in the expected amount 
of time.')
   Traceback (most recent call last):
 File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
   data = self.run_test()
 File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
   return self.test_context.function(self.test)
 File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
   return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
 File "/opt/kafka-dev/tests/kafkatest/tests/core/consume_bench_test.py", 
line 111, in test_consume_bench
   consume_workload.wait_for_done(timeout_sec=360)
 File "/opt/kafka-dev/tests/kafkatest/services/trogdor/trogdor.py", line 
352, in wait_for_done
   wait_until(lambda: self.done(),
 File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
58, in wait_until
   raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
   ducktape.errors.TimeoutError: consume_workload failed to finish in the 
expected amount of time.
   
   

   test_id:
kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ZK.use_new_coordinator=False
   status: PASS
   run time:   2 minutes 2.461 seconds
   

   test_id:
kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
   status: PASS
   run time:   2 minutes 6.186 seconds
   

   test_id:
kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic
   status: PASS
   run time:   2 minutes 4.417 seconds
   

   test_id:
kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
   status: PASS
   run time:   2 minutes 4.510 seconds
   

   test_id:
kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.metadata_quorum=ZK.use_new_coordinator=False
   status: PASS
   run time:   2 minutes 0.361 seconds
   

   test_id:
kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_multiple_consumers_random_group_partitions.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
   status: PASS
   run time:   2 minutes 14.046 seconds
   

   test_id:

[PR] KAFKA-16273: Update consumer_bench_test.py to use consumer group protocol [kafka]

2024-03-16 Thread via GitHub


philipnee opened a new pull request, #15548:
URL: https://github.com/apache/kafka/pull/15548

   Adding this as part of the greater effort to modify the system tests to incorporate the consumer group protocol from KIP-848. The test results follow; the tests using protocol = consumer are expected to fail:
   ```
   

   SESSION REPORT (ALL TESTS)
   ducktape version: 0.11.4
   session_id:   2024-03-16--002
   run time: 76 minutes 36.150 seconds
   tests run:28
   passed:   25
   flaky:0
   failed:   3
   ignored:  0
   

   test_id:
kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
   status: PASS
   run time:   2 minutes 5.067 seconds
   

   test_id:
kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic
   status: PASS
   run time:   2 minutes 3.908 seconds
   

   test_id:
kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
   status: FAIL
   run time:   12 minutes 10.546 seconds
   
   
   TimeoutError('consume_workload failed to finish in the expected amount 
of time.')
   Traceback (most recent call last):
 File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
   data = self.run_test()
 File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
   return self.test_context.function(self.test)
 File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
   return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
 File "/opt/kafka-dev/tests/kafkatest/tests/core/consume_bench_test.py", 
line 111, in test_consume_bench
   consume_workload.wait_for_done(timeout_sec=360)
 File "/opt/kafka-dev/tests/kafkatest/services/trogdor/trogdor.py", line 
352, in wait_for_done
   wait_until(lambda: self.done(),
 File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
58, in wait_until
   raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
   ducktape.errors.TimeoutError: consume_workload failed to finish in the 
expected amount of time.
   
   

   test_id:
kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ZK.use_new_coordinator=False
   status: PASS
   run time:   2 minutes 2.461 seconds
   

   test_id:
kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
   status: PASS
   run time:   2 minutes 6.186 seconds
   

   test_id:
kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic
   status: PASS
   run time:   2 minutes 4.417 seconds
   

   test_id:
kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
   status: PASS
   run time:   2 minutes 4.510 seconds
   

   test_id:
kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.metadata_quorum=ZK.use_new_coordinator=False
   status: PASS
   run time:   2 minutes 0.361 seconds
   

   test_id:
kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_multiple_consumers_random_group_partitions.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
   status: PASS
   run time:   2 minutes 14.046 seconds
   

   test_id:

Re: [PR] KAFKA-12187: replace assertTrue(obj instanceof X) with assertInstanceOf [kafka]

2024-03-16 Thread via GitHub


chia7712 commented on PR #15512:
URL: https://github.com/apache/kafka/pull/15512#issuecomment-2002292346

   > should we include this one too?
   
   yep, please


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-12187: replace assertTrue(obj instanceof X) with assertInstanceOf [kafka]

2024-03-16 Thread via GitHub


chia7712 commented on code in PR #15512:
URL: https://github.com/apache/kafka/pull/15512#discussion_r1527404009


##
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##
@@ -850,7 +851,7 @@ public void testMetadataTimeoutWithMissingTopic(boolean isIdempotenceEnabled) th
 try {
 future.get();

Review Comment:
   yep, we are on the same page :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15282) Implement client support for KIP-848 client-side assignors

2024-03-16 Thread clownxc (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827765#comment-17827765
 ] 

clownxc commented on KAFKA-15282:
-

Hi team, if this ticket is open I will pick it up and try to complete it.

> Implement client support for KIP-848 client-side assignors
> --
>
> Key: KAFKA-15282
> URL: https://issues.apache.org/jira/browse/KAFKA-15282
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> The client-side assignor provides the logic for the partition assignments 
> instead of on the server. Client-side assignment is the main approach used by 
> the “old protocol” for divvying up partitions. While the “new protocol” 
> favors server-side assignors, the client-side assignor will continue to be 
> used for backward compatibility, including KSQL, Connect, etc.
> Note: I _*think*_ that the client-side assignor logic and the reconciliation 
> logic can remain separate from each other. We should strive to keep the two 
> pieces unencumbered, unless it’s unavoidable.
> This task includes:
>  * Validate the client’s configuration for assignor selection
>  * Integrate with the new {{PartitionAssignor}} interface to invoke the logic 
> from the user-provided assignor implementation
>  * Implement the necessary logic around the request/response from the 
> {{ConsumerGroupPrepareAssignment}} RPC call using the information from the 
> {{PartitionAssignor}} above
>  * Implement the necessary logic around the request/response from the 
> {{ConsumerGroupInstallAssignment}} RPC call, again using the information 
> calculated by the {{PartitionAssignor}}
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.
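
As a purely illustrative aid (not an existing Kafka interface; every name below is a hypothetical placeholder for the KIP-848 client-side PartitionAssignor this ticket is meant to integrate with), a client-side assignor plug-in point along these lines might look roughly like:

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch only; the real interface shape is defined by KIP-848, not by this example.
public interface ClientSideAssignorSketch {
    // Input would come from the ConsumerGroupPrepareAssignment response (member subscriptions,
    // topic metadata); the returned member-to-partitions map would then be installed via
    // ConsumerGroupInstallAssignment.
    Map<String, List<TopicPartition>> assign(Map<String, Set<String>> subscriptionsByMember,
                                             Map<String, Integer> partitionCountsByTopic);
}
```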



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15551) Evaluate conditions for short circuiting consumer API calls

2024-03-16 Thread clownxc (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827764#comment-17827764
 ] 

clownxc commented on KAFKA-15551:
-

It doesn't seem too difficult... Could I pick this up?

> Evaluate conditions for short circuiting consumer API calls
> ---
>
> Key: KAFKA-15551
> URL: https://issues.apache.org/jira/browse/KAFKA-15551
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, events
> Fix For: 3.8.0
>
>
> For conditions like:
>  * Committing empty offset
>  * Fetching offsets for empty partitions
>  * Getting empty topic partition position
> These should possibly be short-circuited at the API level.
> As a bonus, we should double-check whether the existing {{KafkaConsumer}} 
> implementation suffers from this.
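
To make the intent concrete, a minimal sketch of the kind of short-circuit described above (an assumed shape for illustration only, not the actual consumer internals):

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical wrapper used only to illustrate the short-circuit idea.
public final class CommitShortCircuitSketch {

    public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (offsets == null || offsets.isEmpty()) {
            // Nothing to commit: return before building an application event or waking
            // the background thread, so the call costs next to nothing.
            return;
        }
        // ... otherwise enqueue the commit event and wait for it as usual (omitted).
    }
}
```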



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-16 Thread via GitHub


mjsax commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1527393681


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -727,7 +801,7 @@ public void testWindowing() {
 }
 
 @Test
-public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {
+public void testShouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {

Review Comment:
   For this PR, let it be, but for the future, please avoid unnecessary 
re-naming.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -223,57 +224,51 @@ private void emitNonJoinedOuterRecords(
         try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
             TimestampedKeyAndJoinSide<K> prevKey = null;
 
+            boolean outerJoinLeftWindowOpen = false;
+            boolean outerJoinRightWindowOpen = false;
             while (it.hasNext()) {
-                boolean outerJoinLeftBreak = false;
-                boolean outerJoinRightBreak = false;
                 final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> next = it.next();
                 final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = next.key;
-                final LeftOrRightValue<V1, V2> value = next.value;
-                final K key = timestampedKeyAndJoinSide.getKey();
                 final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
                 sharedTimeTracker.minTime = timestamp;
 
-                // Skip next records if window has not closed
+                // Skip next records if window has not closed yet
+                // We rely on the ordering of KeyValueIterator
                 final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
                 if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) {
                     if (timestampedKeyAndJoinSide.isLeftSide()) {
-                        outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side
-                    } else {
-                        outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side
-                    }
-                    if (outerJoinLeftBreak && outerJoinRightBreak) {
-                        break; // there are no more candidates to emit on left-outerJoin-side and right-outerJoin-side
+                        outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side
                     } else {
-                        continue; // there are possibly candidates left on the other outerJoin-side
+                        outerJoinRightWindowOpen = true; // there are no more candidates to emit on right-outerJoin-side
                     }
                 }
 
-                final VOut nullJoinedValue;
-                if (isLeftSide) {
-                    nullJoinedValue = joiner.apply(key,
-                        value.getLeftValue(),
-                        value.getRightValue());
-                } else {
-                    nullJoinedValue = joiner.apply(key,
-                        (V1) value.getRightValue(),
-                        (V2) value.getLeftValue());
+                if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {
+                    // if windows are open for both joinSides we can break since there are no more candidates to emit
+                    break;
+                } else if (windowOpenForJoinSide(outerJoinLeftWindowOpen, outerJoinRightWindowOpen, timestampedKeyAndJoinSide)) {
+                    // else if window is open only for this joinSide we continue with the next outer record
+                    continue;
                 }
 
-                context().forward(
-                    record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
-                );
-
-                if (prevKey != null && !prevKey.equals(timestampedKeyAndJoinSide)) {
-                    // blind-delete the previous key from the outer window store now it is emitted;
-                    // we do this because this delete would remove the whole list of values of the same key,
-                    // and hence if we delete eagerly and then fail, we would miss emitting join results of the later values in the list.
-                    // we do not use delete() calls since it would incur extra get()
-                    store.put(prevKey, null);
+                final K key = timestampedKeyAndJoinSide.getKey();
+

[jira] [Commented] (KAFKA-15538) Client support for java regex based subscription

2024-03-16 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827763#comment-17827763
 ] 

Phuc Hong Tran commented on KAFKA-15538:


[~kirktrue] regarding this comment in KafkaConsumerTest 
(https://github.com/apache/kafka/blob/313574e329ef55ce2da3641430add3c4473d621a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L1151),
 which rebalance logic were you referring to? TIA

> Client support for java regex based subscription
> 
>
> Key: KAFKA-15538
> URL: https://issues.apache.org/jira/browse/KAFKA-15538
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Blocker
>  Labels: kip-848-client-support, newbie, regex
> Fix For: 3.8.0
>
>
> When using subscribe with a java regex (Pattern), we need to resolve it on 
> the client side to send the broker a list of topic names to subscribe to.
> Context:
> The new consumer group protocol uses [Google 
> RE2/J|https://github.com/google/re2j] for regular expressions and introduces 
> new methods in the consumer API to subscribe using a `SubscriptionPattern`. Subscribing
> with a java `Pattern` will still be supported for a while but will eventually be removed.
>  * When the subscribe with SubscriptionPattern is used, the client should 
> just send the regex to the broker and it will be resolved on the server side.
>  * In the case of the subscribe with Pattern, the regex should be resolved on 
> the client side.
> As part of this task, we should re-enable all integration tests defined in 
> the PlainTextAsyncConsumer that relate to subscription with pattern and that 
> are currently disabled for the new consumer + new protocol
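
For reference, a minimal sketch of the client-side resolution described above (the `consumer` instance and the `knownTopics` set are placeholders assumed to come from elsewhere, e.g. topic metadata):

```java
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Resolve the java Pattern locally and subscribe with explicit topic names,
// so the broker never sees the regex itself.
final Pattern pattern = Pattern.compile("orders-.*");
final Set<String> matched = knownTopics.stream()
        .filter(topic -> pattern.matcher(topic).matches())
        .collect(Collectors.toSet());
consumer.subscribe(matched);
```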



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15302: Stale value returned when using store.all() with key deletion [docs] [kafka]

2024-03-16 Thread via GitHub


jinyongchoi commented on code in PR #15495:
URL: https://github.com/apache/kafka/pull/15495#discussion_r1527393749


##
docs/streams/developer-guide/memory-mgmt.html:
##
@@ -151,6 +151,10 @@
 Serdes.Long())
   .withCachingEnabled();
   Record caches are not supported for versioned state stores.
+  Caution: When using withCachingEnabled(),
+if you delete() a key from the Store while iterating(e.g., 
KeyValueIterator) through the keys,

Review Comment:
   > Cannot remember from the ticket, but was this limited to deletes, or also 
updates?
   
   Yes, that's correct.
   This affects the snapshot created in store.all(), causing it to read the values from RocksDB before the cache is flushed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-16 Thread via GitHub


mjsax commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1527393295


##
streams/src/test/java/org/apache/kafka/streams/integration/GlobalStateReprocessTest.java:
##
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+
+@Timeout(600)
+@Tag("integration")
+public class GlobalStateReprocessTest {
+private static final int NUM_BROKERS = 1;
+private static final Properties BROKER_CONFIG;
+
+static {
+BROKER_CONFIG = new Properties();
+BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1);
+BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
+}
+
+public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
+
+@BeforeAll
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterAll
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+
+private final MockTime mockTime = CLUSTER.time;
+private final String globalStore = "globalStore";
+private StreamsBuilder builder;
+private Properties streamsConfiguration;
+private KafkaStreams kafkaStreams;
+private String globalStoreTopic;
+
+
+@BeforeEach
+public void before(final TestInfo testInfo) throws Exception {
+builder = new StreamsBuilder();
+
+createTopics();
+streamsConfiguration = new Properties();
+final String safeTestName = safeUniqueTestName(testInfo);
+streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
+streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+
+final KeyValueStoreBuilder storeBuilder = new KeyValueStoreBuilder<>(
+

Re: [PR] MINOR: fix flaky EosIntegrationTest [kafka]

2024-03-16 Thread via GitHub


mjsax merged PR #15494:
URL: https://github.com/apache/kafka/pull/15494


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-12187: replace assertTrue(obj instanceof X) with assertInstanceOf [kafka]

2024-03-16 Thread via GitHub


brandboat commented on PR #15512:
URL: https://github.com/apache/kafka/pull/15512#issuecomment-2002043253

   gentle ping @chia7712, there is another case that could be replaced with assertInstanceOf
   (in IntelliJ, Find in Files with the regex `assertTrue\(\n?.*isInstance`)
   
https://github.com/apache/kafka/blob/3f666026260472179c17ded967a8d9577107e1f0/clients/src/test/java/org/apache/kafka/test/TestUtils.java#L559
   could be replaced with
   ```
   assertInstanceOf(exceptionCauseClass, exception.getCause(),
   ```
   should we include this one too?
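   
   For completeness (the snippet above is cut off in the mail), the JUnit 5 call takes an optional message and returns the cause already cast; roughly (a sketch, not the exact TestUtils change):
   ```java
   // assertInstanceOf fails with a descriptive message if the cause has the wrong type,
   // and returns it cast to the expected class for any follow-up checks.
   final Throwable cause = assertInstanceOf(exceptionCauseClass, exception.getCause(),
           "Unexpected exception cause " + exception.getCause());
   ```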
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-12187: replace assertTrue(obj instanceof X) with assertInstanceOf [kafka]

2024-03-16 Thread via GitHub


brandboat commented on code in PR #15512:
URL: https://github.com/apache/kafka/pull/15512#discussion_r1527189077


##
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##
@@ -850,7 +851,7 @@ public void testMetadataTimeoutWithMissingTopic(boolean isIdempotenceEnabled) th
 try {
 future.get();

Review Comment:
   Thanks for the detailed review! :smiley: 
   
   Here's the revised version for a final check before I proceed with applying 
it.
   ```java
   try {
   assertInstanceOf(TimeoutException.class, 
assertThrows(ExecutionException.class, future::get).getCause());
   } finally {
   producer.close(Duration.ofMillis(0));
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-16 Thread via GitHub


johnnychhsu commented on code in PR #15476:
URL: https://github.com/apache/kafka/pull/15476#discussion_r1527123510


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -79,9 +79,34 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+  def testThreeNonCompressedRecordsInOneBatch(quorum: String): Unit = {
+produceMessagesInOneBatch()
+verifyListOffsets()
+
+// test LogAppendTime case
+val props: Properties = new Properties()
+props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+createTopicWithConfig(topicNameWithCustomConfigs, props)
+produceMessagesInOneBatch(topic=topicNameWithCustomConfigs)
+// In LogAppendTime's case, if the timestamps are the same, we choose the offset of the first record

Review Comment:
   this explains the difference compared to the previous behaviour
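   
   For readers less familiar with the API this test exercises: the max-timestamp offset can be queried through the admin client roughly as below (an illustrative, hedged example, not code from the PR):
   ```java
   import java.util.Map;
   import java.util.Properties;

   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.ListOffsetsResult;
   import org.apache.kafka.clients.admin.OffsetSpec;
   import org.apache.kafka.common.TopicPartition;

   public final class MaxTimestampOffsetLookup {
       public static void main(final String[] args) throws Exception {
           final Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               final TopicPartition tp = new TopicPartition("topic-with-custom-configs", 0);
               final ListOffsetsResult.ListOffsetsResultInfo info =
                       admin.listOffsets(Map.of(tp, OffsetSpec.maxTimestamp())).partitionResult(tp).get();
               // Under LogAppendTime all records in a batch share the broker-assigned timestamp,
               // so this resolves to the earliest offset carrying that timestamp.
               System.out.println("offset with max timestamp: " + info.offset());
           }
       }
   }
   ```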



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-16 Thread via GitHub


johnnychhsu commented on PR #15476:
URL: https://github.com/apache/kafka/pull/15476#issuecomment-2001881984

   test with
   ```
   ./gradlew clean core:test --tests kafka.log.LogSegmentTest
   ./gradlew clean core:test --tests kafka.log.LocalLogTest 
   ```
   and it passed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-16 Thread via GitHub


johnnychhsu commented on code in PR #15476:
URL: https://github.com/apache/kafka/pull/15476#discussion_r1527119875


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java:
##
@@ -209,7 +209,7 @@ private static FilterResult filterTo(TopicPartition partition, Iterable


[jira] [Resolved] (KAFKA-16342) Fix compressed records

2024-03-16 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-16342.
---
Fix Version/s: 3.6.2
   3.8.0
   3.7.1
   Resolution: Fixed

> Fix compressed records
> --
>
> Key: KAFKA-16342
> URL: https://issues.apache.org/jira/browse/KAFKA-16342
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-16 Thread via GitHub


showuon merged PR #15542:
URL: https://github.com/apache/kafka/pull/15542


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-16 Thread via GitHub


johnnychhsu commented on code in PR #15476:
URL: https://github.com/apache/kafka/pull/15476#discussion_r1527116740


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -82,6 +82,13 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
   def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
 produceMessagesInSeparateBatch()
 verifyListOffsets()
+
+// test LogAppendTime case
+val props: Properties = new Properties()
+props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+createTopicWithConfig(topicNameWithCustomConfigs, props)
+produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs)
+verifyListOffsets(topic = topicNameWithCustomConfigs, 2)

Review Comment:
   agree, let me add that. Thanks for the suggestion!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-16 Thread via GitHub


johnnychhsu commented on code in PR #15476:
URL: https://github.com/apache/kafka/pull/15476#discussion_r1527116473


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -77,11 +77,34 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
 verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testThreeNonCompressedRecordsInOneBatch(quorum: String): Unit = {
+produceMessagesInOneBatch()
+verifyListOffsets()
+
+// test LogAppendTime case
+val props: Properties = new Properties()
+props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+createTopicWithConfig(topicNameWithCustomConfigs, props)
+produceMessagesInOneBatch(topic=topicNameWithCustomConfigs)
+// In LogAppendTime's case, the maxTimestampOffset should be the first record of the batch.
+// So in this one batch test, it'll be the first offset which is 0
+verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
+  }
+
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testThreeRecordsInSeparateBatch(quorum: String): Unit = {

Review Comment:
   thanks for the suggestions! totally agree, let me change it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org