[GitHub] [kafka] dengziming commented on pull request #11681: KAFKA-8785: fix request timeout by waiting for metadata cache up-to-date

2022-03-25 Thread GitBox


dengziming commented on pull request #11681:
URL: https://github.com/apache/kafka/pull/11681#issuecomment-1078739838


   > if we don't advertise the offset to controller,
   
   @showuon, here Jason means we postpone advertising the offset until after the 
publisher has finished publishing, i.e. after the MetadataCache has been updated.
   
   We can achieve this in a simpler way here (a rough sketch follows below):
   1. Add a method/field `def highestMetadataOffset: Long` to `MetadataPublisher`.
   2. Every time `BrokerMetadataPublisher.publish()` is called, set 
`highestMetadataOffset` to the newest offset; we may need to add a `logEndOffset` 
parameter to `BrokerMetadataPublisher.publish()`.
   3. In `BrokerServer`, change `lifecycleManager.start(() => 
metadataListener.highestMetadataOffset)` to `lifecycleManager.start(() => 
metadataPublisher.highestMetadataOffset)`.
   
   PTAL. 
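   
   For illustration, a rough Scala sketch of these three steps (the trait/class 
names mirror the real ones, but the signatures here are assumptions, not the 
actual trunk API):
   
   ```scala
   trait MetadataPublisher {
     // 1. Expose the highest metadata offset that has been fully published.
     def highestMetadataOffset: Long
   }
   
   class BrokerMetadataPublisher extends MetadataPublisher {
     @volatile private var _highestMetadataOffset: Long = -1L
   
     override def highestMetadataOffset: Long = _highestMetadataOffset
   
     // 2. Only advance the offset after the new image has been applied,
     //    i.e. after the MetadataCache is up to date.
     def publish(newImage: AnyRef, logEndOffset: Long): Unit = {
       // ... apply newImage to the MetadataCache and other components ...
       _highestMetadataOffset = logEndOffset
     }
   }
   
   // 3. In BrokerServer, advertise readiness based on the publisher's offset:
   // lifecycleManager.start(() => metadataPublisher.highestMetadataOffset)
   ```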
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11681: KAFKA-8785: fix request timeout by waiting for metadata cache up-to-date

2022-03-25 Thread GitBox


showuon commented on pull request #11681:
URL: https://github.com/apache/kafka/pull/11681#issuecomment-1078741456


   @dengziming, cool! Thanks for the info. Are you interested in submitting 
another PR to address it? That would be helpful and would let the v3.2.0 release 
come out sooner!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #11681: KAFKA-8785: fix request timeout by waiting for metadata cache up-to-date

2022-03-25 Thread GitBox


dengziming commented on pull request #11681:
URL: https://github.com/apache/kafka/pull/11681#issuecomment-1078747626


   @showuon Thank you, I will give it a try.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on pull request #11919: MINOR: Unify the log output of JaasContext.defaultContext

2022-03-25 Thread GitBox


RivenSun2 commented on pull request #11919:
URL: https://github.com/apache/kafka/pull/11919#issuecomment-1078748642


   Hi @dajac, 
   please help review this PR.
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on pull request #11911: KAFKA-13463: Make pause behavior consistent between cooperative and eager protocols

2022-03-25 Thread GitBox


RivenSun2 commented on pull request #11911:
URL: https://github.com/apache/kafka/pull/11911#issuecomment-1078749227


   @guozhangwang @hachikuji, 
   please help review this PR.
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11942: KAFKA-13767: Fetch response did not need delay return when the preferred read replica not local replica

2022-03-25 Thread GitBox


dajac commented on a change in pull request #11942:
URL: https://github.com/apache/kafka/pull/11942#discussion_r835027340



##
File path: core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
##
@@ -166,6 +171,34 @@ class DelayedFetchTest {
 assertTrue(fetchResultOpt.isDefined)
   }
 
+
+  @Test
+  def testHasPreferredReadReplica(): Unit = {

Review comment:
   It seems that we don't really test the changes made in the 
`ReplicaManager.fetchMessages` method here, do we? Overall, I think the 
unit test should be in `ReplicaManagerTest` and should ensure that a fetch 
does not go to the fetch purgatory if it has a preferred read replica. We might 
consider updating `testPreferredReplicaAsLeader` directly. That test currently 
passes only because it fetches with min bytes equal to zero; if we change this 
to 1, it fails right away.
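   
   For illustration, a rough Scala sketch of such a test; 
`fetchAsConsumerWithPreferredReadReplica` is a hypothetical helper standing in 
for the existing `ReplicaManagerTest` setup, not a real method in the test class:
   
   ```scala
   import org.junit.jupiter.api.Assertions.assertTrue
   import org.junit.jupiter.api.Test
   
   class ReplicaManagerPreferredReplicaSketch {
   
     @Test
     def testFetchWithPreferredReadReplicaIsNotDelayed(): Unit = {
       var completedImmediately = false
   
       // minBytes = 1: without the fix the fetch would sit in the fetch purgatory
       // waiting for data; with the fix the callback must fire right away because
       // a preferred read replica was selected for the consumer.
       fetchAsConsumerWithPreferredReadReplica(minBytes = 1) { _ =>
         completedImmediately = true
       }
   
       assertTrue(completedImmediately,
         "A fetch with a preferred read replica should not enter the fetch purgatory")
     }
   
     // Hypothetical helper: wires up a ReplicaManager whose replica selector picks
     // a follower, issues a consumer fetch, and invokes the callback once the
     // fetch completes.
     private def fetchAsConsumerWithPreferredReadReplica(minBytes: Int)(onComplete: AnyRef => Unit): Unit = ???
   }
   ```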




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10405) Flaky Test org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState

2022-03-25 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-10405:
-

Assignee: Luke Chen

> Flaky Test 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState
> ---
>
> Key: KAFKA-10405
> URL: https://issues.apache.org/jira/browse/KAFKA-10405
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Luke Chen
>Priority: Major
>
> From build [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1979/]
>  
> {noformat}
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest > 
> shouldRestoreState FAILED
> 14:25:19 java.lang.AssertionError: Condition not met within timeout 
> 6. Repartition topic 
> restore-test-KSTREAM-AGGREGATE-STATE-STORE-02-repartition not purged 
> data after 6 ms.
> 14:25:19 at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:388)
> 14:25:19 at 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState(PurgeRepartitionTopicIntegrationTest.java:206){noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] showuon commented on a change in pull request #11948: KAFKA-10405: set purge interval explicitly

2022-03-25 Thread GitBox


showuon commented on a change in pull request #11948:
URL: https://github.com/apache/kafka/pull/11948#discussion_r835060883



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
##
@@ -203,10 +204,11 @@ public void shouldRestoreState() throws Exception {
 TestUtils.waitForCondition(new 
RepartitionTopicCreatedWithExpectedConfigs(), 6,
 "Repartition topic " + REPARTITION_TOPIC + " not created with 
the expected configs after 6 ms.");
 
+// wait until we received more than 1 segment of data, so that we can 
confirm the purge succeeds in next verification
 TestUtils.waitForCondition(
-new RepartitionTopicVerified(currentSize -> currentSize > 0),
+new RepartitionTopicVerified(currentSize -> currentSize > 
PURGE_SEGMENT_BYTES),

Review comment:
   Side fix: we used to only make sure we had received some data before verifying 
the purge result. But it might be that `0 < current size < PURGE_SEGMENT_BYTES`, 
in which case the next verification, `currentSize <= PURGE_SEGMENT_BYTES`, passes 
even though no purge has happened. Fix it by making sure we have received more 
than `PURGE_SEGMENT_BYTES` bytes of data before checking the purge result.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11948: KAFKA-10405: set purge interval explicitly

2022-03-25 Thread GitBox


showuon commented on pull request #11948:
URL: https://github.com/apache/kafka/pull/11948#issuecomment-1078788088


   @guozhangwang @mjsax , call for review for a simple fix. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-4801) Transient test failure (part 2): ConsumerBounceTest.testConsumptionWithBrokerFailures

2022-03-25 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-4801:


Assignee: Luke Chen

> Transient test failure (part 2): 
> ConsumerBounceTest.testConsumptionWithBrokerFailures
> -
>
> Key: KAFKA-4801
> URL: https://issues.apache.org/jira/browse/KAFKA-4801
> Project: Kafka
>  Issue Type: Sub-task
>  Components: unit tests
>Reporter: Armin Braun
>Assignee: Luke Chen
>Priority: Minor
>  Labels: transient-system-test-failure
>
> There is still some (but very little ... when reproducing this you need more 
> than 100 runs in half the cases statistically) instability left in the test
> {code}
> ConsumerBounceTest.testConsumptionWithBrokerFailures
> {code}
> Resulting in this exception being thrown at a relatively low rate (I'd say 
> def less than 0.5% of all runs on my machine).
> {code}
> kafka.api.ConsumerBounceTest > testConsumptionWithBrokerFailures FAILED
> java.lang.IllegalArgumentException: You can only check the position for 
> partitions assigned to this consumer.
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1271)
> at 
> kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:96)
> at 
> kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:69)
> {code}
> this was also reported in a comment to the original KAFKA-4198
> https://issues.apache.org/jira/browse/KAFKA-4198?focusedCommentId=15765468&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15765468



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] showuon opened a new pull request #11949: KAFKA-4801: don't verify assignment during broker up and down

2022-03-25 Thread GitBox


showuon opened a new pull request #11949:
URL: https://github.com/apache/kafka/pull/11949


   Test failed with 
   ```
   org.opentest4j.AssertionFailedError: expected:  but was: 

   ```
   In this test, we have another thread that brings a broker down and up, to test 
whether the consumer can still work as expected. While the broker is going down 
and up, we tried to verify that the assignment is what we expect. But rebalances 
keep being triggered while the broker goes down and up, so it doesn't make sense 
to verify the assignment here. Remove it to make the test reliable.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11949: KAFKA-4801: don't verify assignment during broker up and down

2022-03-25 Thread GitBox


showuon commented on pull request #11949:
URL: https://github.com/apache/kafka/pull/11949#issuecomment-1078793585


   @guozhangwang , call for review for this simple fix. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10405) Flaky Test org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState

2022-03-25 Thread Liam Clarke-Hutchinson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17512263#comment-17512263
 ] 

Liam Clarke-Hutchinson commented on KAFKA-10405:


Kia ora korua [~mjsax] [~ableegoldman], 

I've been looking into this test and its failures. 

While I can say _why_ it's failing (it expects a log segment to be deleted that 
isn't being deleted, because the next segment's base offset isn't less than or 
equal to the log start offset (LSO) set when the StreamThread asks for records 
with committed offsets to be deleted, so the segment can't be deleted), I can't 
explain why it's failing _now_. And from the comments, this failure appears to 
be sporadic.

That said, I would like to propose that the test either be deleted or seriously 
revised, for the following reasons:
 # The test, as it stands, seems more focused on testing segment deletion 
behaviour, which I expect is being tested elsewhere.
 # The code under test has at least two, if not three, concurrently moving 
parts executing on timers with default periods that aren't being explicitly set 
- e.g., StreamThread "purges" committed offsets on a timer, UnifiedLog deletes 
segments on a timer, and LocalLog rolls segments on a timer (and whether or not 
a segment is rolled is based on another configured duration). I believe this 
makes the test incredibly fragile and non-deterministic from the get-go.
 # There are a massive number of uncommented, or unexpressed, assumptions in the 
test, and it's very hard to maintain without the knowledge of the original 
author.

I'm very happy to either delete it, or revise it. 

But if we go for revision, how do we test what we're trying to test?

My understanding, based on my code reading, is that once the purge timer in the 
StreamThread fires, a request is sent in the TaskManager through the 
adminClient to delete records up to the offset of the last committed 
(consumed?) record in the repartition topic. 

And this is accomplished by setting the log start offset (LSO) to the last 
committed offset + 1. 

So would it be sufficient for this test to verify that the LSO on the topic is 
the highest offset sent by the streams app in the deletion request + 1? 

I'm not, at this stage, even sure whether testing it like this is possible, as it 
would involve capturing the offsets sent by the streams app via the admin 
client, but I feel that if it is possible to test this, it would be a better 
test of the desired functionality than peering into topic sizes and relying on 
assumed behaviour of the Log-side functionality of the broker.

Alternatively, is there a possibility that this failure is actually exposing a 
bug? I honestly don't have enough knowledge to answer that.

I have scattered a bunch of logging statements through UnifiedLog and LocalLog 
to understand what's happening, and I can provide that output if it's of use in 
answering my questions :) 
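
For what it's worth, a minimal sketch of how the LSO could be read via the admin 
client (the expected value would still need to be captured from the streams app's 
deletion request, which is the open question above; the topic name and partition 
here are placeholders):

{code}
import java.util.Collections
import org.apache.kafka.clients.admin.{Admin, OffsetSpec}
import org.apache.kafka.common.TopicPartition

object LsoCheck {
  // Reads the current log start offset (LSO) of partition 0 of the given topic
  // and asserts it matches the value implied by the streams app's purge request.
  def verifyLogStartOffset(admin: Admin, repartitionTopic: String, expectedLso: Long): Unit = {
    val tp = new TopicPartition(repartitionTopic, 0)
    // OffsetSpec.earliest() resolves to the partition's current log start offset.
    val lso = admin
      .listOffsets(Collections.singletonMap[TopicPartition, OffsetSpec](tp, OffsetSpec.earliest()))
      .partitionResult(tp)
      .get()
      .offset()
    assert(lso == expectedLso, s"Expected log start offset $expectedLso but found $lso")
  }
}
{code}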

> Flaky Test 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState
> ---
>
> Key: KAFKA-10405
> URL: https://issues.apache.org/jira/browse/KAFKA-10405
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Luke Chen
>Priority: Major
>
> From build [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1979/]
>  
> {noformat}
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest > 
> shouldRestoreState FAILED
> 14:25:19 java.lang.AssertionError: Condition not met within timeout 
> 6. Repartition topic 
> restore-test-KSTREAM-AGGREGATE-STATE-STORE-02-repartition not purged 
> data after 6 ms.
> 14:25:19 at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:388)
> 14:25:19 at 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState(PurgeRepartitionTopicIntegrationTest.java:206){noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-10405) Flaky Test org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState

2022-03-25 Thread Liam Clarke-Hutchinson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17512263#comment-17512263
 ] 

Liam Clarke-Hutchinson edited comment on KAFKA-10405 at 3/25/22, 8:54 AM:
--

Kia ora rā kōrua [~mjsax] [~ableegoldman], 

I've been looking into this test and its failures.

While I can say _why_ it's failing (it's expecting a log segment to be deleted 
that isn't being deleted, as the next segment's base offset isn't less than or 
equal to the log start offset (LSO) that is being set by the StreamThread 
asking for records with committed offsets to be deleted, so can't be deleted), 
I can't explain why it's failing {_}now{_}. And from the comments, this failure 
appears to be sporadic.

That said, I would like to propose that the test either be deleted or seriously 
revised, for the following reasons:
 # The test, as it stands, seems more focused on testing segment deletion 
behaviour, which I expect is being tested elsewhere
 # The code under test has at least two, if not three, concurrently moving 
parts executing on timers with default periods that aren't being explicitly set 
- e.g., StreamThread "purges" committed offsets on a timer, UnifiedLog deletes 
segments on a timer, LocalLog rolls segments on a timer (and whether or not a 
segment is rolled is based on another configured duration. I believe this makes 
this test incredibly fragile and non-deterministic from the get-go.
 # There's a massive amount of uncommented, or unexpressed, assumptions in the 
test, and it's very hard to maintain without the knowledge of the original 
author.

I'm very happy to either delete it, or revise it.

But if we go for revision, how do we test what we're trying to test?

My understanding, based on my code reading, is that once the purge timer in the 
StreamThread fires, a request is sent in the TaskManager through the 
adminClient to delete records up to the offset of the last committed 
(consumed?) record in the repartition topic. 

And this is accomplished by setting the log start offset (LSO) to the last 
committed offset + 1. 

So would it be sufficient for this test, to verify that the LSO on the topic, 
is the highest offset sent by the streams app in the deletion request +1? 

I'm not, at this stage, even sure if testing it like this as possible, as it 
would involve capturing the offsets sent by the streams app via the admin 
client, but I feel that if it's possible to test this, it would be a better 
test of the desired functionality than peering into topic sizes and relying on 
assumed behaviour of the Log side functionality of the broker?

Alternatively, is there a possibility that this failure is actually exposing a 
bug? I honestly don't have enough knowledge to answer that.

I have scattered a bunch of logging statements through UnifiedLog and LocalLog 
to understand what's happening, and I can provide that output if it's of use in 
answering my questions :) 

Ngā mihi, 

Liam Clarke


was (Author: JIRAUSER279886):
Kia ora korua [~mjsax] [~ableegoldman], 

I've been looking into this test and its failures. 

While I can say _why_ it's failing (it's expecting a log segment to be deleted 
that isn't being deleted, as the next segment's base offset isn't less than or 
equal to the log start offset (LSO) that is being set by the StreamThread 
asking for records with committed offsets to be deleted, so can't be deleted), 
I can't explain why it's failing {_}now{_}. And from the comments, this failure 
appears to be sporadic.

That said, I would like to propose that the test either be deleted or seriously 
revised, for the following reasons:
 # The test, as it stands, seems more focused on testing segment deletion 
behaviour, which I expect is being tested elsewhere
 # The code under test has at least two, if not three, concurrently moving 
parts executing on timers with default periods that aren't being explicitly set 
- e.g., StreamThread "purges" committed offsets on a timer, UnifiedLog deletes 
segments on a timer, LocalLog rolls segments on a timer (and whether or not a 
segment is rolled is based on another configured duration. I believe this makes 
this test incredibly fragile and non-deterministic from the get-go.
 # There's a massive amount of uncommented, or unexpressed, assumptions in the 
test, and it's very hard to maintain without the knowledge of the original 
author.

I'm very happy to either delete it, or revise it. 

But if we go for revision, how do we test what we're trying to test?

My understanding, based on my code reading, is that once the purge timer in the 
StreamThread fires, a request is sent in the TaskManager through the 
adminClient to delete records up to the offset of the last committed 
(consumed?) record in the repartition topic. 

And this is accomplished by setting the log start offset (LSO) to the last 
committed offset + 1

[jira] [Comment Edited] (KAFKA-10405) Flaky Test org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState

2022-03-25 Thread Liam Clarke-Hutchinson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17512263#comment-17512263
 ] 

Liam Clarke-Hutchinson edited comment on KAFKA-10405 at 3/25/22, 8:55 AM:
--

Kia ora rā kōrua [~mjsax] [~ableegoldman], 

I've been looking into this test and its failures.

While I can say _why_ it's failing (it's expecting a log segment to be deleted 
that isn't being deleted, as the next segment's base offset isn't less than or 
equal to the log start offset (LSO) which was set by the StreamThread when it 
asked for records with committed offsets to be deleted), I can't explain why 
it's failing {_}now{_}. And from the comments, this failure appears to be 
sporadic.

That said, I would like to propose that the test either be deleted or seriously 
revised, for the following reasons:
 # The test, as it stands, seems more focused on testing segment deletion 
behaviour, which I expect is being tested elsewhere
 # The code under test has at least two, if not three, concurrently moving 
parts executing on timers with default periods that aren't being explicitly set 
- e.g., StreamThread "purges" committed offsets on a timer, UnifiedLog deletes 
segments on a timer, LocalLog rolls segments on a timer (and whether or not a 
segment is rolled is based on another configured duration. I believe this makes 
this test incredibly fragile and non-deterministic from the get-go.
 # There's a massive amount of uncommented, or unexpressed, assumptions in the 
test, and it's very hard to maintain without the knowledge of the original 
author.

I'm very happy to either delete it, or revise it.

But if we go for revision, how do we test what we're trying to test?

My understanding, based on my code reading, is that once the purge timer in the 
StreamThread fires, a request is sent in the TaskManager through the 
adminClient to delete records up to the offset of the last committed 
(consumed?) record in the repartition topic. 

And this is accomplished by setting the log start offset (LSO) to the last 
committed offset + 1. 

So would it be sufficient for this test, to verify that the LSO on the topic, 
is the highest offset sent by the streams app in the deletion request +1? 

I'm not, at this stage, even sure if testing it like this as possible, as it 
would involve capturing the offsets sent by the streams app via the admin 
client, but I feel that if it's possible to test this, it would be a better 
test of the desired functionality than peering into topic sizes and relying on 
assumed behaviour of the Log side functionality of the broker?

Alternatively, is there a possibility that this failure is actually exposing a 
bug? I honestly don't have enough knowledge to answer that.

I have scattered a bunch of logging statements through UnifiedLog and LocalLog 
to understand what's happening, and I can provide that output if it's of use in 
answering my questions :) 

Ngā mihi, 

Liam Clarke


was (Author: JIRAUSER279886):
Kia ora rā kōrua [~mjsax] [~ableegoldman], 

I've been looking into this test and its failures.

While I can say _why_ it's failing (it's expecting a log segment to be deleted 
that isn't being deleted, as the next segment's base offset isn't less than or 
equal to the log start offset (LSO) that is being set by the StreamThread 
asking for records with committed offsets to be deleted, so can't be deleted), 
I can't explain why it's failing {_}now{_}. And from the comments, this failure 
appears to be sporadic.

That said, I would like to propose that the test either be deleted or seriously 
revised, for the following reasons:
 # The test, as it stands, seems more focused on testing segment deletion 
behaviour, which I expect is being tested elsewhere
 # The code under test has at least two, if not three, concurrently moving 
parts executing on timers with default periods that aren't being explicitly set 
- e.g., StreamThread "purges" committed offsets on a timer, UnifiedLog deletes 
segments on a timer, LocalLog rolls segments on a timer (and whether or not a 
segment is rolled is based on another configured duration. I believe this makes 
this test incredibly fragile and non-deterministic from the get-go.
 # There's a massive amount of uncommented, or unexpressed, assumptions in the 
test, and it's very hard to maintain without the knowledge of the original 
author.

I'm very happy to either delete it, or revise it.

But if we go for revision, how do we test what we're trying to test?

My understanding, based on my code reading, is that once the purge timer in the 
StreamThread fires, a request is sent in the TaskManager through the 
adminClient to delete records up to the offset of the last committed 
(consumed?) record in the repartition topic. 

And this is accomplished by setting the log start offset (LSO) to the last 
committed offset + 1. 

So would it b

[jira] [Comment Edited] (KAFKA-10405) Flaky Test org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState

2022-03-25 Thread Liam Clarke-Hutchinson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17512263#comment-17512263
 ] 

Liam Clarke-Hutchinson edited comment on KAFKA-10405 at 3/25/22, 8:57 AM:
--

Kia ora rā kōrua [~mjsax] [~ableegoldman], 

I've been looking into this test and its failures.

While I can say _why_ it's failing (it's expecting a log segment to be deleted 
that isn't being deleted, as the next segment's base offset isn't less than or 
equal to the log start offset (LSO) which was set by the StreamThread when it 
asked for records with committed offsets to be deleted), I can't explain why 
it's failing {_}now{_}. And from the comments, this failure appears to be 
sporadic.

That said, I would like to propose that the test either be deleted or seriously 
revised, for the following reasons:
 # The test, as it stands, seems more focused on testing segment deletion 
behaviour, which I expect is being tested elsewhere
 # The code under test has at least two, if not three, concurrently moving 
parts executing on timers with default periods that aren't being explicitly set 
- e.g., StreamThread "purges" committed offsets on a timer, UnifiedLog deletes 
segments on a timer, LocalLog rolls segments on a timer (and whether or not a 
segment is rolled is based on another configured duration). I believe this 
makes this test significantly fragile and likely to be non-deterministic on a 
fundamental level.
 # There's a massive amount of uncommented, or unexpressed, assumptions in the 
test, and it's very hard to refactor or fix without the knowledge of the 
original author.

I'm very happy to either delete it, or revise it.

But if we go for revision, how do we test what we're trying to test?

My understanding, based on my code reading, is that once the purge timer in the 
StreamThread fires, a request is sent in the TaskManager through the 
adminClient to delete records up to the offset of the last committed/consumed 
record in the repartition topic. 

And this is accomplished by setting the log start offset (LSO) to the last 
committed offset + 1. 

So would it be sufficient for this test, to verify that the LSO on the topic, 
is the highest offset sent by the streams app in the deletion request +1? 

I'm not, at this stage, even sure if testing it like this as possible, as it 
would involve capturing the offsets sent by the streams app via the admin 
client, but I feel that if it's possible to test this, it would be a better 
test of the desired functionality than peering into topic sizes and relying on 
assumed behaviour of the Log side functionality of the broker?

Alternatively, is there a possibility that this failure is actually exposing a 
bug? I honestly don't have enough knowledge to answer that.

I have scattered a bunch of logging statements through UnifiedLog and LocalLog 
to understand what's happening, and I can provide that output if it's of use in 
answering my questions :) 

Ngā mihi, 

Liam Clarke


was (Author: JIRAUSER279886):
Kia ora rā kōrua [~mjsax] [~ableegoldman], 

I've been looking into this test and its failures.

While I can say _why_ it's failing (it's expecting a log segment to be deleted 
that isn't being deleted, as the next segment's base offset isn't less than or 
equal to the log start offset (LSO) which was set by the StreamThread when it 
asked for records with committed offsets to be deleted), I can't explain why 
it's failing {_}now{_}. And from the comments, this failure appears to be 
sporadic.

That said, I would like to propose that the test either be deleted or seriously 
revised, for the following reasons:
 # The test, as it stands, seems more focused on testing segment deletion 
behaviour, which I expect is being tested elsewhere
 # The code under test has at least two, if not three, concurrently moving 
parts executing on timers with default periods that aren't being explicitly set 
- e.g., StreamThread "purges" committed offsets on a timer, UnifiedLog deletes 
segments on a timer, LocalLog rolls segments on a timer (and whether or not a 
segment is rolled is based on another configured duration. I believe this makes 
this test incredibly fragile and non-deterministic from the get-go.
 # There's a massive amount of uncommented, or unexpressed, assumptions in the 
test, and it's very hard to maintain without the knowledge of the original 
author.

I'm very happy to either delete it, or revise it.

But if we go for revision, how do we test what we're trying to test?

My understanding, based on my code reading, is that once the purge timer in the 
StreamThread fires, a request is sent in the TaskManager through the 
adminClient to delete records up to the offset of the last committed 
(consumed?) record in the repartition topic. 

And this is accomplished by setting the log start offset (LSO) to the last 
committed offset + 1. 

So

[jira] (KAFKA-10405) Flaky Test org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState

2022-03-25 Thread Liam Clarke-Hutchinson (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-10405 ]


Liam Clarke-Hutchinson deleted comment on KAFKA-10405:


was (Author: JIRAUSER279886):
Kia ora rā kōrua [~mjsax] [~ableegoldman], 

I've been looking into this test and its failures.

While I can say _why_ it's failing (it's expecting a log segment to be deleted 
that isn't being deleted, as the next segment's base offset isn't less than or 
equal to the log start offset (LSO) which was set by the StreamThread when it 
asked for records with committed offsets to be deleted), I can't explain why 
it's failing {_}now{_}. And from the comments, this failure appears to be 
sporadic.

That said, I would like to propose that the test either be deleted or seriously 
revised, for the following reasons:
 # The test, as it stands, seems more focused on testing segment deletion 
behaviour, which I expect is being tested elsewhere
 # The code under test has at least two, if not three, concurrently moving 
parts executing on timers with default periods that aren't being explicitly set 
- e.g., StreamThread "purges" committed offsets on a timer, UnifiedLog deletes 
segments on a timer, LocalLog rolls segments on a timer (and whether or not a 
segment is rolled is based on another configured duration). I believe this 
makes this test significantly fragile and likely to be non-deterministic on a 
fundamental level.
 # There's a massive amount of uncommented, or unexpressed, assumptions in the 
test, and it's very hard to refactor or fix without the knowledge of the 
original author.

I'm very happy to either delete it, or revise it.

But if we go for revision, how do we test what we're trying to test?

My understanding, based on my code reading, is that once the purge timer in the 
StreamThread fires, a request is sent in the TaskManager through the 
adminClient to delete records up to the offset of the last committed/consumed 
record in the repartition topic. 

And this is accomplished by setting the log start offset (LSO) to the last 
committed offset + 1. 

So would it be sufficient for this test, to verify that the LSO on the topic, 
is the highest offset sent by the streams app in the deletion request +1? 

I'm not, at this stage, even sure if testing it like this as possible, as it 
would involve capturing the offsets sent by the streams app via the admin 
client, but I feel that if it's possible to test this, it would be a better 
test of the desired functionality than peering into topic sizes and relying on 
assumed behaviour of the Log side functionality of the broker?

Alternatively, is there a possibility that this failure is actually exposing a 
bug? I honestly don't have enough knowledge to answer that.

I have scattered a bunch of logging statements through UnifiedLog and LocalLog 
to understand what's happening, and I can provide that output if it's of use in 
answering my questions :) 

Ngā mihi, 

Liam Clarke

> Flaky Test 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState
> ---
>
> Key: KAFKA-10405
> URL: https://issues.apache.org/jira/browse/KAFKA-10405
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Luke Chen
>Priority: Major
>
> From build [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1979/]
>  
> {noformat}
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest > 
> shouldRestoreState FAILED
> 14:25:19 java.lang.AssertionError: Condition not met within timeout 
> 6. Repartition topic 
> restore-test-KSTREAM-AGGREGATE-STATE-STORE-02-repartition not purged 
> data after 6 ms.
> 14:25:19 at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:388)
> 14:25:19 at 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState(PurgeRepartitionTopicIntegrationTest.java:206){noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (KAFKA-13737) Flaky kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection

2022-03-25 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming reassigned KAFKA-13737:
--

Assignee: dengziming

> Flaky kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection
> 
>
> Key: KAFKA-13737
> URL: https://issues.apache.org/jira/browse/KAFKA-13737
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: dengziming
>Priority: Blocker
> Fix For: 3.2.0
>
>
> Examples:
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests
> {code}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
> assignment. Call: describeTopics
>   at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
>   at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>   at 
> kafka.utils.TestUtils$.$anonfun$waitForLeaderToBecome$1(TestUtils.scala:1812)
>   at scala.util.Try$.apply(Try.scala:210)
>   at kafka.utils.TestUtils$.currentLeader$1(TestUtils.scala:1811)
>   at kafka.utils.TestUtils$.waitForLeaderToBecome(TestUtils.scala:1819)
>   at kafka.utils.TestUtils$.assertLeader(TestUtils.scala:1789)
>   at 
> kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection(LeaderElectionCommandTest.scala:172)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13768) Transactional producer exits because of expiration in RecordAccumulator

2022-03-25 Thread xuexiaoyue (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuexiaoyue updated KAFKA-13768:
---
Issue Type: Bug  (was: Improvement)

> Transactional producer exits because of expiration in RecordAccumulator
> ---
>
> Key: KAFKA-13768
> URL: https://issues.apache.org/jira/browse/KAFKA-13768
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.0.0
>Reporter: xuexiaoyue
>Priority: Major
>
> Hi team, I'm using a transactional producer with request.timeout.ms set to a 
> rather small value such as 10s, while zookeeper.session.timeout.ms is set 
> longer, such as 30s. 
> When the producer was sending records and one broker accidentally shut down, I 
> noticed the producer threw 'org.apache.kafka.common.KafkaException: The 
> client hasn't received acknowledgment for some previously sent messages and 
> can no longer retry them. It isn't safe to continue' and exited.
> Looking into the code, I found that when a batch expires in the 
> RecordAccumulator, it is marked as unresolved in Sender#sendProducerData. 
> And if it's a transactional producer, it is then doomed to 
> transitionToFatalError later.
> I'm wondering why we need to transitionToFatalError here. Would it be better 
> to abort this transaction instead? I know it's necessary to bump the epoch 
> during idempotent sending, but why do we let the producer crash in this 
> case?
> I found that KAFKA-8805 (Bump producer epoch on recoverable errors, #7389) 
> fixes this by automatically bumping the producer epoch after aborting the 
> transaction, but why is it necessary to bump the epoch? What problem would 
> occur if we called transitionToAbortableError directly and let the user abort 
> it?
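
For context, a minimal sketch of the producer setup described above (the 
bootstrap servers and transactional id are placeholders):

{code}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.StringSerializer

object TransactionalProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-txn")
    // Deliberately small request timeout (10s) relative to the broker's
    // zookeeper.session.timeout.ms (30s), so batches can expire in the
    // RecordAccumulator while a broker is down.
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")

    val producer = new KafkaProducer[String, String](props)
    producer.initTransactions()
    // ... beginTransaction / send / commitTransaction while a broker is restarted ...
    producer.close()
  }
}
{code}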



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector

2022-03-25 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17512275#comment-17512275
 ] 

Luke Chen commented on KAFKA-8391:
--

This issue might be related to (or caused by) KAFKA-12495.

> Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
> ---
>
> Key: KAFKA-8391
> URL: https://issues.apache.org/jira/browse/KAFKA-8391
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.2.0
>
> Attachments: 100-gradle-builds.tar
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4747/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testDeleteConnector/]
> {quote}java.lang.AssertionError: Condition not met within timeout 3. 
> Connector tasks did not stop in time. at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352) at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:166){quote}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] Gerrrr commented on a change in pull request #11945: KAFKA-13769: Explicitly route FK join results to correct partitions

2022-03-25 Thread GitBox


Gerrrr commented on a change in pull request #11945:
URL: https://github.com/apache/kafka/pull/11945#discussion_r835106474



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##
@@ -1197,7 +1197,7 @@ boolean sendingOldValueEnabled() {
 
 final StreamPartitioner> 
foreignResponseSinkPartitioner =
 tableJoinedInternal.partitioner() == null
-? null
+? (topic, key, subscriptionResponseWrapper, numPartitions) -> 
subscriptionResponseWrapper.getPrimaryPartition()

Review comment:
   Yes, this is correct. For completeness, here is how I reasoned about it. 
Without this patch the partitioner is `null`, so `RecordCollectorImpl` sets the 
partition to `null` here:
   
   
https://github.com/apache/kafka/blob/ce883892270a02a72e91afbdb1fabdd50d7da474/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L136
   
   With this patch, the partitioner is never `null`, so `RecordCollector` 
uses it to find the desired partition here:
   
   
https://github.com/apache/kafka/blob/ce883892270a02a72e91afbdb1fabdd50d7da474/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L130
   
   In the case where the new partitioner returns `null` for the v0 data, we end 
up in the same state as without this patch: the subsequent call to the other 
overload of `RecordCollectorImpl#send` has `partition` set to `null`:
   
   
https://github.com/apache/kafka/blob/ce883892270a02a72e91afbdb1fabdd50d7da474/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L139




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7957) Flaky Test DynamicBrokerReconfigurationTest#testMetricsReporterUpdate

2022-03-25 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17512290#comment-17512290
 ] 

Luke Chen commented on KAFKA-7957:
--

We didn't include the latest failure's stack trace, which makes this hard to 
investigate.

> Flaky Test DynamicBrokerReconfigurationTest#testMetricsReporterUpdate
> -
>
> Key: KAFKA-7957
> URL: https://issues.apache.org/jira/browse/KAFKA-7957
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.2.0
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/18/]
> {quote}java.lang.AssertionError: Messages not sent at 
> kafka.utils.TestUtils$.fail(TestUtils.scala:356) at 
> kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:766) at 
> kafka.server.DynamicBrokerReconfigurationTest.startProduceConsume(DynamicBrokerReconfigurationTest.scala:1270)
>  at 
> kafka.server.DynamicBrokerReconfigurationTest.testMetricsReporterUpdate(DynamicBrokerReconfigurationTest.scala:650){quote}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-8280) Flaky Test DynamicBrokerReconfigurationTest#testUncleanLeaderElectionEnable

2022-03-25 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17512288#comment-17512288
 ] 

Luke Chen commented on KAFKA-8280:
--

We didn't include the latest failure's stack trace, which makes this hard to 
investigate.

> Flaky Test DynamicBrokerReconfigurationTest#testUncleanLeaderElectionEnable
> ---
>
> Key: KAFKA-8280
> URL: https://issues.apache.org/jira/browse/KAFKA-8280
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.3.0
>Reporter: John Roesler
>Priority: Blocker
> Fix For: 3.2.0
>
>
> I saw this fail again on 
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3979/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/testUncleanLeaderElectionEnable/
> {noformat}
> Error Message
> java.lang.AssertionError: Unclean leader not elected
> Stacktrace
> java.lang.AssertionError: Unclean leader not elected
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable(DynamicBrokerReconfigurationTest.scala:510)
> {noformat}
> {noformat}
> Standard Output
> Completed Updating config for entity: brokers '0'.
> Completed Updating config for entity: brokers '1'.
> Completed Updating config for entity: brokers '2'.
> [2019-04-23 01:17:11,690] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition testtopic-6 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,690] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition testtopic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,722] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition testtopic-7 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,722] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition testtopic-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,760] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=1] Error for partition testtopic-6 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,761] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=1] Error for partition testtopic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,765] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition testtopic-3 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,765] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition testtopic-9 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:13,779] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition __consumer_offsets-8 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition __consumer_offsets-38 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition __consumer_offsets-2 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition __consumer_offsets-14 at offset 0 
> (kafka.server.R

[jira] [Commented] (KAFKA-6527) Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig

2022-03-25 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17512291#comment-17512291
 ] 

Luke Chen commented on KAFKA-6527:
--

We didn't include the latest failure's stack trace, which makes this hard to 
investigate. Maybe we can re-enable the test and gather the failure message when 
it occurs again?

> Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig
> 
>
> Key: KAFKA-6527
> URL: https://issues.apache.org/jira/browse/KAFKA-6527
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 3.2.0
>
>
> {code:java}
> java.lang.AssertionError: Log segment size increase not applied
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:355)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.testDefaultTopicConfig(DynamicBrokerReconfigurationTest.scala:348)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-8280) Flaky Test DynamicBrokerReconfigurationTest#testUncleanLeaderElectionEnable

2022-03-25 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17512288#comment-17512288
 ] 

Luke Chen edited comment on KAFKA-8280 at 3/25/22, 9:59 AM:


We didn't include the latest failure's stack trace, which makes this hard to 
investigate. Maybe we can re-enable the test and gather the failure message when 
it occurs again?


was (Author: showuon):
We didn't put the latest failed error stack trace, which is not easy to 
investigate

> Flaky Test DynamicBrokerReconfigurationTest#testUncleanLeaderElectionEnable
> ---
>
> Key: KAFKA-8280
> URL: https://issues.apache.org/jira/browse/KAFKA-8280
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.3.0
>Reporter: John Roesler
>Priority: Blocker
> Fix For: 3.2.0
>
>
> I saw this fail again on 
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3979/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/testUncleanLeaderElectionEnable/
> {noformat}
> Error Message
> java.lang.AssertionError: Unclean leader not elected
> Stacktrace
> java.lang.AssertionError: Unclean leader not elected
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable(DynamicBrokerReconfigurationTest.scala:510)
> {noformat}
> {noformat}
> Standard Output
> Completed Updating config for entity: brokers '0'.
> Completed Updating config for entity: brokers '1'.
> Completed Updating config for entity: brokers '2'.
> [2019-04-23 01:17:11,690] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition testtopic-6 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,690] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition testtopic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,722] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition testtopic-7 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,722] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition testtopic-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,760] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=1] Error for partition testtopic-6 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,761] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=1] Error for partition testtopic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,765] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition testtopic-3 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,765] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition testtopic-9 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:13,779] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition __consumer_offsets-8 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition __consumer_offsets-38 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition __consumer_offsets-2 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopic

[jira] [Comment Edited] (KAFKA-7957) Flaky Test DynamicBrokerReconfigurationTest#testMetricsReporterUpdate

2022-03-25 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17512290#comment-17512290
 ] 

Luke Chen edited comment on KAFKA-7957 at 3/25/22, 10:00 AM:
-

We didn't include the latest failed error stack trace, which makes this hard to 
investigate. Maybe we can re-enable it and gather the failure message the next time it 
occurs?


was (Author: showuon):
We didn't put the latest failed error stack trace, which is not easy to 
investigate

> Flaky Test DynamicBrokerReconfigurationTest#testMetricsReporterUpdate
> -
>
> Key: KAFKA-7957
> URL: https://issues.apache.org/jira/browse/KAFKA-7957
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.2.0
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/18/]
> {quote}java.lang.AssertionError: Messages not sent at 
> kafka.utils.TestUtils$.fail(TestUtils.scala:356) at 
> kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:766) at 
> kafka.server.DynamicBrokerReconfigurationTest.startProduceConsume(DynamicBrokerReconfigurationTest.scala:1270)
>  at 
> kafka.server.DynamicBrokerReconfigurationTest.testMetricsReporterUpdate(DynamicBrokerReconfigurationTest.scala:650){quote}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] Gerrrr commented on a change in pull request #11945: KAFKA-13769: Explicitly route FK join results to correct partitions

2022-03-25 Thread GitBox


Gerrrr commented on a change in pull request #11945:
URL: https://github.com/apache/kafka/pull/11945#discussion_r835135226



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
##
@@ -23,12 +23,13 @@
 
 
 public class SubscriptionWrapper {
-static final byte CURRENT_VERSION = 0;
+static final byte CURRENT_VERSION = 1;
 
 private final long[] hash;
 private final Instruction instruction;
 private final byte version;
 private final K primaryKey;
+private final Integer primaryPartition;

Review comment:
   Added in fcfe805cd1847cb9f441535a30c6a8b87c27e8e1

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
##
@@ -79,16 +79,27 @@ public void setIfUnset(final SerdeGetter getter) {
 );
 
 final ByteBuffer buf;
+int dataLength = 2 + primaryKeySerializedData.length;

Review comment:
   Done in a59a4f6fe2a0d61d15656742f1d42862918f80a6




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bozhao12 commented on a change in pull request #11942: KAFKA-13767: Fetch response did not need delay return when the preferred read replica not local replica

2022-03-25 Thread GitBox


bozhao12 commented on a change in pull request #11942:
URL: https://github.com/apache/kafka/pull/11942#discussion_r835142708



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1046,7 +1049,9 @@ class ReplicaManager(val config: KafkaConfig,
 //3) has enough data to respond
 //4) some error happens while reading data
 //5) we found a diverging epoch
-if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes 
|| errorReadingData || hasDivergingEpoch) {
+//6) the preferred read replica not local replica

Review comment:
   Following your suggestion, I modified the unit test.
   I also found some problems while modifying it, which I think are bugs.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bozhao12 commented on a change in pull request #11942: KAFKA-13767: Fetch response did not need delay return when the preferred read replica not local replica

2022-03-25 Thread GitBox


bozhao12 commented on a change in pull request #11942:
URL: https://github.com/apache/kafka/pull/11942#discussion_r835149333



##
File path: core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
##
@@ -166,6 +171,34 @@ class DelayedFetchTest {
 assertTrue(fetchResultOpt.isDefined)
   }
 
+
+  @Test
+  def testHasPreferredReadReplica(): Unit = {

Review comment:
   @dajac Following your suggestion, I have modified the unit test.
   I also found a problem while modifying the unit test, which I think is a bug:
   if KafkaConfig.ReplicaSelectorClassProp is not configured, 
ReplicaManager.replicaSelectorOpt will be None, which causes most of the unit tests 
on preferred read replicas to miss their test purpose, e.g. 
ReplicaManagerTest.testReplicaSelector




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bozhao12 commented on a change in pull request #11942: KAFKA-13767: Fetch response did not need delay return when the preferred read replica not local replica

2022-03-25 Thread GitBox


bozhao12 commented on a change in pull request #11942:
URL: https://github.com/apache/kafka/pull/11942#discussion_r835149333



##
File path: core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
##
@@ -166,6 +171,34 @@ class DelayedFetchTest {
 assertTrue(fetchResultOpt.isDefined)
   }
 
+
+  @Test
+  def testHasPreferredReadReplica(): Unit = {

Review comment:
   @dajac Following your suggestion, I have modified the unit test.
   I also found a problem while modifying the unit test, which I think is a bug:
   if KafkaConfig.ReplicaSelectorClassProp is not configured, 
ReplicaManager.replicaSelectorOpt will be None, which causes most of the unit tests 
on preferred read replicas to miss their test purpose, e.g. 
ReplicaManagerTest.testReplicaSelector.
   Should I open another PR or fix it in this PR?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bozhao12 commented on a change in pull request #11942: KAFKA-13767: Fetch response did not need delay return when the preferred read replica not local replica

2022-03-25 Thread GitBox


bozhao12 commented on a change in pull request #11942:
URL: https://github.com/apache/kafka/pull/11942#discussion_r835149333



##
File path: core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
##
@@ -166,6 +171,34 @@ class DelayedFetchTest {
 assertTrue(fetchResultOpt.isDefined)
   }
 
+
+  @Test
+  def testHasPreferredReadReplica(): Unit = {

Review comment:
   @dajac Following your suggestion, I have modified the unit test.
   I also found a problem while modifying the unit test, which I think is a bug:
   If `KafkaConfig.ReplicaSelectorClassProp` is not configured, 
`ReplicaManager.replicaSelectorOpt` will be None, which causes most of the unit 
tests on preferred read replicas to miss their test purpose, e.g. 
`ReplicaManagerTest.testReplicaSelector`.
   Should I open another PR or fix it in this PR?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] viktorsomogyi commented on a change in pull request #10738: KAFKA-6945: KIP-373, allow users to create delegation token for others

2022-03-25 Thread GitBox


viktorsomogyi commented on a change in pull request #10738:
URL: https://github.com/apache/kafka/pull/10738#discussion_r835156392



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -2926,24 +2926,36 @@ class KafkaApis(val requestChannel: RequestChannel,
   trace(s"Sending create token response for correlation id 
${request.header.correlationId} " +
 s"to client ${request.header.clientId}.")
   requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-CreateDelegationTokenResponse.prepareResponse(requestThrottleMs, 
createResult.error, request.context.principal, createResult.issueTimestamp,
-  createResult.expiryTimestamp, createResult.maxTimestamp, 
createResult.tokenId, ByteBuffer.wrap(createResult.hmac)))
+
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, 
requestThrottleMs, createResult.error, createResult.owner,
+  createResult.tokenRequester, createResult.issueTimestamp, 
createResult.expiryTimestamp, createResult.maxTimestamp, createResult.tokenId,
+  ByteBuffer.wrap(createResult.hmac)))
 }
 
+val ownerPrincipalName = createTokenRequest.data().ownerPrincipalName()
+val owner = if (ownerPrincipalName == null || ownerPrincipalName.isEmpty) {
+  request.context.principal
+} else {
+  new KafkaPrincipal(createTokenRequest.data().ownerPrincipalType(), 
ownerPrincipalName)
+}
+val requester = request.context.principal
+
 if (!allowTokenRequests(request))

Review comment:
   I'll solve it like this (and I've added tests too):
   ```
   if (!allowTokenRequests(request)) {
 requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
   
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, 
requestThrottleMs,
 Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, owner, requester))
   } else if (!owner.equals(requester) && 
!authHelper.authorize(request.context, CREATE_TOKENS, USER, owner.toString)) {
 requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
   
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, 
requestThrottleMs,
 Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED, owner, requester))
   } else {
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bozhao12 commented on a change in pull request #11942: KAFKA-13767: Fetch response did not need delay return when the preferred read replica not local replica

2022-03-25 Thread GitBox


bozhao12 commented on a change in pull request #11942:
URL: https://github.com/apache/kafka/pull/11942#discussion_r835149333



##
File path: core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
##
@@ -166,6 +171,34 @@ class DelayedFetchTest {
 assertTrue(fetchResultOpt.isDefined)
   }
 
+
+  @Test
+  def testHasPreferredReadReplica(): Unit = {

Review comment:
   @dajac Following your suggestion, I have modified the unit test.
   I also found a problem while modifying the unit test, which I think is a bug:
   If `KafkaConfig.ReplicaSelectorClassProp` is not configured, 
`ReplicaManager.replicaSelectorOpt` will be None, which causes some of the unit 
tests on preferred read replicas to miss their test purpose, e.g. 
`ReplicaManagerTest.testReplicaSelector`.
   Should I open another PR or fix it in this PR?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11942: KAFKA-13767: Fetch response did not need delay return when the preferred read replica not local replica

2022-03-25 Thread GitBox


dajac commented on a change in pull request #11942:
URL: https://github.com/apache/kafka/pull/11942#discussion_r835170244



##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -1253,9 +1253,11 @@ class ReplicaManagerTest {
 val countDownLatch = new CountDownLatch(1)
 
 // Prepare the mocked components for the test
+val props = new Properties()
+props.put(KafkaConfig.ReplicaSelectorClassProp, 
"org.apache.kafka.common.replica.RackAwareReplicaSelector")

Review comment:
   Hmm... now that I see the diff, it seems my suggestion to update this test was 
perhaps not a good one. I misread the test. It might be better to keep it and 
add another, somewhat similar one. Sorry for the confusion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bozhao12 commented on a change in pull request #11942: KAFKA-13767: Fetch response did not need delay return when the preferred read replica not local replica

2022-03-25 Thread GitBox


bozhao12 commented on a change in pull request #11942:
URL: https://github.com/apache/kafka/pull/11942#discussion_r835186358



##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -1253,9 +1253,11 @@ class ReplicaManagerTest {
 val countDownLatch = new CountDownLatch(1)
 
 // Prepare the mocked components for the test
+val props = new Properties()
+props.put(KafkaConfig.ReplicaSelectorClassProp, 
"org.apache.kafka.common.replica.RackAwareReplicaSelector")

Review comment:
   Thanks a lot for your suggestion, I added a new unit test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bozhao12 commented on a change in pull request #11942: KAFKA-13767: Fetch response did not need delay return when the preferred read replica not local replica

2022-03-25 Thread GitBox


bozhao12 commented on a change in pull request #11942:
URL: https://github.com/apache/kafka/pull/11942#discussion_r835186358



##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -1253,9 +1253,11 @@ class ReplicaManagerTest {
 val countDownLatch = new CountDownLatch(1)
 
 // Prepare the mocked components for the test
+val props = new Properties()
+props.put(KafkaConfig.ReplicaSelectorClassProp, 
"org.apache.kafka.common.replica.RackAwareReplicaSelector")

Review comment:
   @dajac Thanks a lot for your suggestion, I added a new unit test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10405) Flaky Test org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState

2022-03-25 Thread Liam Clarke-Hutchinson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17512383#comment-17512383
 ] 

Liam Clarke-Hutchinson commented on KAFKA-10405:


Nice work [~showuon], last build looks great!

> Flaky Test 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState
> ---
>
> Key: KAFKA-10405
> URL: https://issues.apache.org/jira/browse/KAFKA-10405
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Luke Chen
>Priority: Major
>
> From build [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1979/]
>  
> {noformat}
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest > 
> shouldRestoreState FAILED
> 14:25:19 java.lang.AssertionError: Condition not met within timeout 
> 6. Repartition topic 
> restore-test-KSTREAM-AGGREGATE-STATE-STORE-02-repartition not purged 
> data after 6 ms.
> 14:25:19 at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:388)
> 14:25:19 at 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState(PurgeRepartitionTopicIntegrationTest.java:206){noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] cadonna commented on a change in pull request #11923: KAFKA-6718 / Documentation

2022-03-25 Thread GitBox


cadonna commented on a change in pull request #11923:
URL: https://github.com/apache/kafka/pull/11923#discussion_r835303600



##
File path: docs/streams/developer-guide/config-streams.html
##
@@ -677,6 +685,30 @@ default.windowed.value.serde.innerThis is discussed in more detail in Data types and serialization.
 
 
+  
+rack.aware.assignment.tags
+
+
+  
+
+  This configuration sets list of tag keys used to distribute 
standby replicas across Kafka Streams
+  instances. When configured, Kafka Streams will make a 
best-effort to distribute the standby tasks over
+  each client tag dimension.
+
+
+  Tags for the Kafka Streams instances can be set via client.tag.
+  prefix. Example:
+

Review comment:
   I think the doc would benefit from a more complete example. Could you 
try to put the example from the KIP here in a concise way?
   ```
   # Kafka Streams Client 1
   client.tag.zone: eu-central-1a
   client.tag.cluster: k8s-cluster1
   rack.aware.assignment.tags: zone,cluster

   # Kafka Streams Client 2
   client.tag.zone: eu-central-1b
   client.tag.cluster: k8s-cluster1
   rack.aware.assignment.tags: zone,cluster

   # Kafka Streams Client 3
   client.tag.zone: eu-central-1a
   client.tag.cluster: k8s-cluster2
   rack.aware.assignment.tags: zone,cluster

   # Kafka Streams Client 4
   client.tag.zone: eu-central-1b
   client.tag.cluster: k8s-cluster2
   rack.aware.assignment.tags: zone,cluster
   ```
   Maybe it is possible to put the clients next to each other, or two clients in 
one row and the other two clients below. Below the clients you could describe 
who gets which standbys for the active tasks on each client.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] yyu1993 opened a new pull request #11950: Fix missing active segment

2022-03-25 Thread GitBox


yyu1993 opened a new pull request #11950:
URL: https://github.com/apache/kafka/pull/11950


   In the log layer, if all segments of a log are deleted before a new 
segment is created, the log briefly has no active segment. This allows a race 
condition if other threads are reading the active segment at the same time, 
which can result in a NoSuchElementException.
   
   This PR:
   - introduces a new function which creates the new segment and then deletes the 
given segment, ensuring the log has an active segment at all times (see the sketch below).
   - fixes two places where segments were deleted before a new segment was created.
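   To make the ordering concrete, here is a minimal, self-contained sketch of the idea 
(simplified bookkeeping and made-up names, not the real UnifiedLog/LocalLog code): create 
the replacement segment before removing the old ones, so a concurrent reader never 
observes an empty segment list.
   ```scala
   import java.util.concurrent.ConcurrentSkipListMap

   // Not the real kafka.log code: segments are reduced to their base offsets here.
   class SketchLog {
     private val segments = new ConcurrentSkipListMap[Long, String]()
     segments.put(0L, "segment-0")

     // Readers assume there is always at least one segment; Option.get throws
     // NoSuchElementException if the map is ever observed empty.
     def activeSegment: String = Option(segments.lastEntry()).map(_.getValue).get

     // Unsafe ordering: delete everything first, then create the new segment.
     // A concurrent reader calling activeSegment in between hits the empty window.
     def deleteAllThenRoll(newBaseOffset: Long): Unit = {
       segments.clear()
       segments.put(newBaseOffset, s"segment-$newBaseOffset")
     }

     // Safe ordering: create the replacement segment first, then drop the old ones,
     // so the map is never empty and activeSegment always succeeds.
     def rollThenDeleteOld(newBaseOffset: Long): Unit = {
       segments.put(newBaseOffset, s"segment-$newBaseOffset")
       segments.headMap(newBaseOffset, false).clear()
     }
   }
   ```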
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] yyu1993 commented on pull request #11950: Fix missing active segment

2022-03-25 Thread GitBox


yyu1993 commented on pull request #11950:
URL: https://github.com/apache/kafka/pull/11950#issuecomment-1079087291


   Hi @kowshik and @junrao,
   could you help review this PR? This is the same as the fix we made for ce-kafka.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11942: KAFKA-13767: Fetch response did not need delay return when the preferred read replica not local replica

2022-03-25 Thread GitBox


dajac commented on a change in pull request #11942:
URL: https://github.com/apache/kafka/pull/11942#discussion_r835364568



##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -1300,6 +1300,67 @@ class ReplicaManagerTest {
 TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testHasPreferredReplica(): Unit = {
+val topicPartition = 0
+val topicId = Uuid.randomUuid()
+val followerBrokerId = 0
+val leaderBrokerId = 1
+val leaderEpoch = 1
+val leaderEpochIncrement = 2
+val countDownLatch = new CountDownLatch(1)
+
+// Prepare the mocked components for the test
+val props = new Properties()
+props.put(KafkaConfig.ReplicaSelectorClassProp, 
"org.apache.kafka.common.replica.RackAwareReplicaSelector")
+val (replicaManager, _) = prepareReplicaManagerAndLogManager(new 
MockTimer(time),
+  topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
+  leaderBrokerId, countDownLatch, expectTruncation = true, topicId = 
Some(topicId), extraProps = props)

Review comment:
   Using this method creates a lot of unnecessary noise in my opinion. 
Nowadays, we tend to use `setupReplicaManagerWithMockedPurgatories`, which is 
simpler. `setupReplicaManagerWithMockedPurgatories` does not support racks 
though. I suppose that we could make it accept an optional Map from broker id to 
rack. Could we try using that one instead? There are many examples in this file.

##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -1300,6 +1300,67 @@ class ReplicaManagerTest {
 TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testHasPreferredReplica(): Unit = {
+val topicPartition = 0
+val topicId = Uuid.randomUuid()
+val followerBrokerId = 0
+val leaderBrokerId = 1
+val leaderEpoch = 1
+val leaderEpochIncrement = 2
+val countDownLatch = new CountDownLatch(1)
+
+// Prepare the mocked components for the test
+val props = new Properties()
+props.put(KafkaConfig.ReplicaSelectorClassProp, 
"org.apache.kafka.common.replica.RackAwareReplicaSelector")
+val (replicaManager, _) = prepareReplicaManagerAndLogManager(new 
MockTimer(time),
+  topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
+  leaderBrokerId, countDownLatch, expectTruncation = true, topicId = 
Some(topicId), extraProps = props)
+
+try {
+  val brokerList = Seq[Integer](0, 1).asJava
+
+  val tp0 = new TopicPartition(topic, 0)
+  val tidp0 = new TopicIdPartition(topicId, tp0)
+
+  initializeLogAndTopicId(replicaManager, tp0, topicId)
+
+  // Make this replica the follower

Review comment:
   I suppose the replica is a leader, no?

##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -1300,6 +1300,67 @@ class ReplicaManagerTest {
 TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testHasPreferredReplica(): Unit = {
+val topicPartition = 0
+val topicId = Uuid.randomUuid()
+val followerBrokerId = 0
+val leaderBrokerId = 1
+val leaderEpoch = 1
+val leaderEpochIncrement = 2
+val countDownLatch = new CountDownLatch(1)
+
+// Prepare the mocked components for the test
+val props = new Properties()
+props.put(KafkaConfig.ReplicaSelectorClassProp, 
"org.apache.kafka.common.replica.RackAwareReplicaSelector")
+val (replicaManager, _) = prepareReplicaManagerAndLogManager(new 
MockTimer(time),
+  topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
+  leaderBrokerId, countDownLatch, expectTruncation = true, topicId = 
Some(topicId), extraProps = props)
+
+try {
+  val brokerList = Seq[Integer](0, 1).asJava
+
+  val tp0 = new TopicPartition(topic, 0)
+  val tidp0 = new TopicIdPartition(topicId, tp0)
+
+  initializeLogAndTopicId(replicaManager, tp0, topicId)
+
+  // Make this replica the follower
+  val leaderAndIsrRequest2 = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
+Seq(new LeaderAndIsrPartitionState()
+  .setTopicName(topic)
+  .setPartitionIndex(0)
+  .setControllerEpoch(0)
+  .setLeader(0)
+  .setLeaderEpoch(1)
+  .setIsr(brokerList)
+  .setZkVersion(0)
+  .setReplicas(brokerList)
+  .setIsNew(false)).asJava,
+Collections.singletonMap(topic, topicId),
+Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+  replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => 
())
+  replicaManager.getPartitionOrException(tp0).updateFollowerFetchState(1, 
new LogOffsetMetadata(0), 0, 0, 0)
+
+  val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", 
"client-id",
+InetAddress.getByName("localhost"), KafkaPrincipal.ANO

[jira] [Commented] (KAFKA-13771) Support to explicitly delete delegationTokens that have expired but have not been automatically cleaned up

2022-03-25 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17512444#comment-17512444
 ] 

Guozhang Wang commented on KAFKA-13771:
---

cc [~rsivaram] could you take a look at this ticket when you are free?

> Support to explicitly delete delegationTokens that have expired but have not 
> been automatically cleaned up
> --
>
> Key: KAFKA-13771
> URL: https://issues.apache.org/jira/browse/KAFKA-13771
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: RivenSun
>Priority: Major
>
> Quoting the official documentation
> {quote}
> Tokens can also be cancelled explicitly. If a token is not renewed by the 
> token’s expiration time or if token is beyond the max life time, it will be 
> deleted from all broker caches as well as from zookeeper.
> {quote}
> 1. The first point above means that after the `AdminClient` initiates the 
> EXPIRE_DELEGATION_TOKEN request, in the DelegationTokenManager.expireToken() 
> method on the KafkaServer side, if the user passes in expireLifeTimeMs less 
> than 0, KafkaServer will delete the corresponding delegationToken directly.
> 2. There is a thread named "delete-expired-tokens" on the KafkaServer side, 
> which is responsible for regularly cleaning up expired tokens. The execution 
> interval is `delegation.token.expiry.check.interval.ms`, and the default 
> value is one hour.
> But if we look carefully at the logic in DelegationTokenManager.expireToken(), 
> *Kafka currently does not let users delete an expired delegationToken that 
> they no longer use or renew. If a user tries to do this, they will receive a 
> DelegationTokenExpiredException.*
> In the worst case, an expired delegationToken may still be usable for up to 
> {*}an hour{*} (the default), even though the broker can shorten 
> delegation.token.expiry.check.interval.ms as much as it likes.
> The solution is simple: adjust the `if` order in 
> DelegationTokenManager.expireToken().
> {code:java}
> if (!allowedToRenew(principal, tokenInfo)) {
>   expireResponseCallback(Errors.DELEGATION_TOKEN_OWNER_MISMATCH, -1)
> } else if (expireLifeTimeMs < 0) { //expire immediately
>   removeToken(tokenInfo.tokenId)
>   info(s"Token expired for token: ${tokenInfo.tokenId} for owner: 
> ${tokenInfo.owner}")
>   expireResponseCallback(Errors.NONE, now)
> } else if (tokenInfo.maxTimestamp < now || tokenInfo.expiryTimestamp < now) {
>   expireResponseCallback(Errors.DELEGATION_TOKEN_EXPIRED, -1)
> } else {
>   //set expiry time stamp
>  ..
> } {code}
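As a purely illustrative aside (simplified types and return values, not the real 
DelegationTokenManager), the effect of the reordering on an already-expired token can be 
sketched like this:
```scala
object ExpireOrderSketch {
  final case class TokenInfo(expiryTimestamp: Long, maxTimestamp: Long)

  // Current order: the expiry check runs before the explicit-delete branch, so an
  // explicit "expire now" request on an already-expired token is rejected.
  def currentOrder(expireLifeTimeMs: Long, token: TokenInfo, now: Long): String =
    if (token.maxTimestamp < now || token.expiryTimestamp < now) "DELEGATION_TOKEN_EXPIRED"
    else if (expireLifeTimeMs < 0) "token removed"
    else "expiry timestamp updated"

  // Reordered, as in the snippet above: handle explicit deletion first.
  def proposedOrder(expireLifeTimeMs: Long, token: TokenInfo, now: Long): String =
    if (expireLifeTimeMs < 0) "token removed"
    else if (token.maxTimestamp < now || token.expiryTimestamp < now) "DELEGATION_TOKEN_EXPIRED"
    else "expiry timestamp updated"

  def main(args: Array[String]): Unit = {
    val expired = TokenInfo(expiryTimestamp = 10L, maxTimestamp = 20L)
    println(currentOrder(-1L, expired, now = 100L))   // DELEGATION_TOKEN_EXPIRED
    println(proposedOrder(-1L, expired, now = 100L))  // token removed
  }
}
```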



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] Gerrrr commented on a change in pull request #11945: KAFKA-13769: Explicitly route FK join results to correct partitions

2022-03-25 Thread GitBox


Gerrrr commented on a change in pull request #11945:
URL: https://github.com/apache/kafka/pull/11945#discussion_r835404011



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##
@@ -64,7 +64,7 @@ public void setIfUnset(final SerdeGetter getter) {
 
 @Override
 public byte[] serialize(final String topic, final 
SubscriptionResponseWrapper data) {
-
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized 
data}
+
//{1-bit-isHashNull}{7-bits-version}{4-bytes-primaryPartition}{Optional-16-byte-Hash}{n-bytes
 serialized data}

Review comment:
   What will happen when an old processor receives a v1 record and throws 
an exception? Will it throw that record on the floor or just block the 
processing until the upgrade?
   
   As John suggested in 
[KAFKA-10336](https://issues.apache.org/jira/browse/KAFKA-10336), we can pass 
`UPGRADE_FROM` into the serde so that it produces v0 records if the option is 
set. I will also add `3.3` as a valid choice for that option. WDYT?
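   
   As an aside for readers following the layout discussion above, here is a small, 
self-contained sketch (in Scala with made-up helper names; the actual serde is Java) of 
how a 7-bit version and a 1-bit isHashNull flag can be packed into the leading byte and 
unpacked again, with a 4-byte field appended after it:
   ```scala
   import java.nio.ByteBuffer

   object VersionByteSketch {
     // Pack: high bit set when the hash is absent, low 7 bits carry the version.
     def packHeader(version: Byte, hashIsNull: Boolean): Byte =
       if (hashIsNull) (version | 0x80).toByte else version

     def unpackVersion(header: Byte): Byte = (header & 0x7F).toByte
     def unpackHashIsNull(header: Byte): Boolean = (header & 0x80) != 0

     def main(args: Array[String]): Unit = {
       val buf = ByteBuffer.allocate(1 + Integer.BYTES)
       buf.put(packHeader(version = 1.toByte, hashIsNull = true))
       buf.putInt(42)                       // e.g. a 4-byte partition field
       buf.flip()

       val header = buf.get()
       println(unpackVersion(header))       // 1
       println(unpackHashIsNull(header))    // true
       println(buf.getInt())                // 42
     }
   }
   ```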




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] yyu1993 edited a comment on pull request #11950: Fix missing active segment

2022-03-25 Thread GitBox


yyu1993 edited a comment on pull request #11950:
URL: https://github.com/apache/kafka/pull/11950#issuecomment-1079087291


   Hi @kowshik and @junrao 
   could you help review this PR? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #11948: KAFKA-10405: set purge interval explicitly

2022-03-25 Thread GitBox


guozhangwang merged pull request #11948:
URL: https://github.com/apache/kafka/pull/11948


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10405) Flaky Test org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState

2022-03-25 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-10405:
--
Fix Version/s: 3.2.0

> Flaky Test 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState
> ---
>
> Key: KAFKA-10405
> URL: https://issues.apache.org/jira/browse/KAFKA-10405
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.2.0
>
>
> From build [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1979/]
>  
> {noformat}
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest > 
> shouldRestoreState FAILED
> 14:25:19 java.lang.AssertionError: Condition not met within timeout 
> 6. Repartition topic 
> restore-test-KSTREAM-AGGREGATE-STATE-STORE-02-repartition not purged 
> data after 6 ms.
> 14:25:19 at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:388)
> 14:25:19 at 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState(PurgeRepartitionTopicIntegrationTest.java:206){noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-10405) Flaky Test org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState

2022-03-25 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-10405:
--
Fix Version/s: 3.3.0
   (was: 3.2.0)

> Flaky Test 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState
> ---
>
> Key: KAFKA-10405
> URL: https://issues.apache.org/jira/browse/KAFKA-10405
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.3.0
>
>
> From build [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1979/]
>  
> {noformat}
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest > 
> shouldRestoreState FAILED
> 14:25:19 java.lang.AssertionError: Condition not met within timeout 
> 6. Repartition topic 
> restore-test-KSTREAM-AGGREGATE-STATE-STORE-02-repartition not purged 
> data after 6 ms.
> 14:25:19 at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:388)
> 14:25:19 at 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState(PurgeRepartitionTopicIntegrationTest.java:206){noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] junrao commented on a change in pull request #11950: Fix missing active segment

2022-03-25 Thread GitBox


junrao commented on a change in pull request #11950:
URL: https://github.com/apache/kafka/pull/11950#discussion_r835434811



##
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##
@@ -1355,15 +1355,19 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   val numToDelete = deletable.size
   if (numToDelete > 0) {
 // we must always have at least one segment, so if we are going to 
delete all the segments, create a new one first
-if (localLog.segments.numberOfSegments == numToDelete)
-  roll()
-lock synchronized {
-  localLog.checkIfMemoryMappedBufferClosed()
-  // remove the segments for lookups
-  localLog.removeAndDeleteSegments(deletable, asyncDelete = true, 
reason)
-  deleteProducerSnapshots(deletable, asyncDelete = true)
-  
maybeIncrementLogStartOffset(localLog.segments.firstSegmentBaseOffset.get, 
SegmentDeletion)
+var segmentsToDelete = deletable
+if (localLog.segments.numberOfSegments == numToDelete) {
+  val newSegment = roll()
+  if (deletable.last.baseOffset == newSegment.baseOffset) {
+warn(s"Empty active segment at ${deletable.last.baseOffset} was 
deleted and recreated due to $reason")
+segmentsToDelete = deletable.dropRight(1)

Review comment:
   Hmm, I am not sure this part of the logic is completely right. roll() 
eventually calls createAndDeleteSegment(), which always deletes the current 
active segment. So, when the new segment's offset differs from the last 
segment's offset, it seems we will be deleting the last segment twice: 
once in createAndDeleteSegment() and again here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #11949: KAFKA-4801: don't verify assignment during broker up and down

2022-03-25 Thread GitBox


guozhangwang merged pull request #11949:
URL: https://github.com/apache/kafka/pull/11949


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #11927: KAFKA-13735/KAFKA-13736: Reenable SocketServerTest.closingChannelWithBufferedReceives and SocketServerTest.remoteCloseWithoutBufferedReceives

2022-03-25 Thread GitBox


guozhangwang merged pull request #11927:
URL: https://github.com/apache/kafka/pull/11927


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lihaosky closed pull request #11858: [Emit final][3/N][KIP-825] introduce a new API to control when aggregated results are produced

2022-03-25 Thread GitBox


lihaosky closed pull request #11858:
URL: https://github.com/apache/kafka/pull/11858


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lihaosky commented on pull request #11858: [Emit final][3/N][KIP-825] introduce a new API to control when aggregated results are produced

2022-03-25 Thread GitBox


lihaosky commented on pull request #11858:
URL: https://github.com/apache/kafka/pull/11858#issuecomment-1079238287


   I'll just merge this together with https://github.com/apache/kafka/pull/11896


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-4609) KTable/KTable join followed by groupBy and aggregate/count can result in duplicated results

2022-03-25 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16959455#comment-16959455
 ] 

Matthias J. Sax edited comment on KAFKA-4609 at 3/25/22, 5:31 PM:
--

As you can see, the ticket is open. It was never addressed and thus it's not a 
surprise that you may still hit it in newer versions.


was (Author: mjsax):
As you can see, the ticket is open. It was never addressed and thus it's not a 
surprise that you may still hit it in never versions.

> KTable/KTable join followed by groupBy and aggregate/count can result in 
> duplicated results
> ---
>
> Key: KAFKA-4609
> URL: https://issues.apache.org/jira/browse/KAFKA-4609
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1, 0.10.2.0
>Reporter: Damian Guy
>Priority: Major
>  Labels: architecture
>
> When caching is enabled, KTable/KTable joins can result in duplicate values 
> being emitted. This will occur if there were updates to the same key in both 
> tables. Each table is flushed independently, and each table will trigger the 
> join, so you get two results for the same key. 
> If we subsequently perform a groupBy and then aggregate operation we will now 
> process these duplicates resulting in incorrect aggregated values. For 
> example count will be double the value it should be.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] bozhao12 commented on a change in pull request #11942: KAFKA-13767: Fetch response did not need delay return when the preferred read replica not local replica

2022-03-25 Thread GitBox


bozhao12 commented on a change in pull request #11942:
URL: https://github.com/apache/kafka/pull/11942#discussion_r835480386



##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -1300,6 +1300,67 @@ class ReplicaManagerTest {
 TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testHasPreferredReplica(): Unit = {
+val topicPartition = 0
+val topicId = Uuid.randomUuid()
+val followerBrokerId = 0
+val leaderBrokerId = 1
+val leaderEpoch = 1
+val leaderEpochIncrement = 2
+val countDownLatch = new CountDownLatch(1)
+
+// Prepare the mocked components for the test
+val props = new Properties()
+props.put(KafkaConfig.ReplicaSelectorClassProp, 
"org.apache.kafka.common.replica.RackAwareReplicaSelector")
+val (replicaManager, _) = prepareReplicaManagerAndLogManager(new 
MockTimer(time),
+  topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
+  leaderBrokerId, countDownLatch, expectTruncation = true, topicId = 
Some(topicId), extraProps = props)

Review comment:
   @dajac Thanks for your review. I made changes to the test and added some 
comments.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bozhao12 commented on a change in pull request #11942: KAFKA-13767: Fetch response did not need delay return when the preferred read replica not local replica

2022-03-25 Thread GitBox


bozhao12 commented on a change in pull request #11942:
URL: https://github.com/apache/kafka/pull/11942#discussion_r835480386



##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -1300,6 +1300,67 @@ class ReplicaManagerTest {
 TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testHasPreferredReplica(): Unit = {
+val topicPartition = 0
+val topicId = Uuid.randomUuid()
+val followerBrokerId = 0
+val leaderBrokerId = 1
+val leaderEpoch = 1
+val leaderEpochIncrement = 2
+val countDownLatch = new CountDownLatch(1)
+
+// Prepare the mocked components for the test
+val props = new Properties()
+props.put(KafkaConfig.ReplicaSelectorClassProp, 
"org.apache.kafka.common.replica.RackAwareReplicaSelector")
+val (replicaManager, _) = prepareReplicaManagerAndLogManager(new 
MockTimer(time),
+  topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
+  leaderBrokerId, countDownLatch, expectTruncation = true, topicId = 
Some(topicId), extraProps = props)

Review comment:
   @dajac Thanks for your review. I made changes to the unit test and 
added some necessary comments.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #11908: KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default

2022-03-25 Thread GitBox


kkonstantine commented on a change in pull request #11908:
URL: https://github.com/apache/kafka/pull/11908#discussion_r835510543



##
File path: docs/connect.html
##
@@ -48,6 +48,8 @@ Running 
Kafka Connectbootstrap.servers - List of Kafka servers used to 
bootstrap connections to Kafka
 key.converter - Converter class used to convert 
between Kafka Connect format and the serialized form that is written to Kafka. 
This controls the format of the keys in messages written to or read from Kafka, 
and since this is independent of connectors it allows any connector to work 
with any serialization format. Examples of common formats include JSON and 
Avro.
 value.converter - Converter class used to convert 
between Kafka Connect format and the serialized form that is written to Kafka. 
This controls the format of the values in messages written to or read from 
Kafka, and since this is independent of connectors it allows any connector to 
work with any serialization format. Examples of common formats include JSON and 
Avro.
+plugin.path (default empty) - a list of 
paths that contain
+plugins (connectors, converters, transformations). For the purpose 
of quick starts users will have to add the path that contains the 
FileStreamSourceConnector and FileStreamSinkConnector packaged in 
connect-file-"version".jar, because these connectors are not 
included by default to the CLASSPATH or the 
plugin.path of the Connect worker.

Review comment:
   Thanks for the comment @rhauch 
   One note I have is that I wouldn't recommend adding the relative path 
even if it works, because it may not lead to robust deployments. 
`plugin.path` works when following symlinks, so that can help. 
   
   `plugin.path` works with uber-jars, but I wouldn't necessarily consider 
`connect-file-.jar` an uber jar. I'd recommend the mainstream way, 
which is to add the parent directory of any directories containing connector 
jars. Let me try to see what an example looks like here, because that's currently 
just a bullet point.
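   For illustration only, not a recommendation from this thread (the directory names 
below are made up), the "parent directory" idea could look like:
   ```
   # hypothetical layout:
   #   /opt/connect-plugins/connect-file/connect-file-<version>.jar
   # the worker config then points plugin.path at the parent of the per-plugin directories:
   plugin.path=/opt/connect-plugins
   ```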
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #11908: KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default

2022-03-25 Thread GitBox


kkonstantine commented on a change in pull request #11908:
URL: https://github.com/apache/kafka/pull/11908#discussion_r835511649



##
File path: tests/kafkatest/services/connect.py
##
@@ -279,12 +282,34 @@ def append_to_environment_variable(self, envvar, value):
 env_opts = "\"%s %s\"" % (env_opts.strip('\"'), value)
 self.environment[envvar] = env_opts
 
+def append_filestream_connectors_to_classpath(self):

Review comment:
   Append and return. Easier to write than finding a name for it. Let me 
give it a try :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #11908: KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default

2022-03-25 Thread GitBox


kkonstantine commented on a change in pull request #11908:
URL: https://github.com/apache/kafka/pull/11908#discussion_r835512365



##
File path: docs/connect.html
##
@@ -74,6 +74,7 @@ Running 
Kafka Connectconfig.storage.topic (default 
connect-configs) - topic to use for storing connector and task 
configurations; note that this should be a single partition, highly replicated, 
compacted topic. You may need to manually create the topic to ensure the 
correct configuration as auto created topics may have multiple partitions or be 
automatically configured for deletion rather than compaction
 offset.storage.topic (default 
connect-offsets) - topic to use for storing offsets; this topic 
should have many partitions, be replicated, and be configured for 
compaction
 status.storage.topic (default 
connect-status) - topic to use for storing statuses; this topic 
can have multiple partitions, and should be replicated and configured for 
compaction
+plugin.path (default empty) - a list of 
paths that contain plugins (connectors, converters, transformations). For the 
purpose of quick starts users will have to add the path that contains the 
FileStreamSourceConnector and FileStreamSinkConnector packaged in 
connect-file-"version".jar, because these connectors are not 
included by default to the CLASSPATH or the 
plugin.path of the Connect worker

Review comment:
   Thanks for the link @mimaison. In that case, it's worth updating the 
quick start here too. Let me do that




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #11908: KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default

2022-03-25 Thread GitBox


kkonstantine commented on a change in pull request #11908:
URL: https://github.com/apache/kafka/pull/11908#discussion_r835514732



##
File path: tests/kafkatest/services/connect.py
##
@@ -279,12 +282,34 @@ def append_to_environment_variable(self, envvar, value):
 env_opts = "\"%s %s\"" % (env_opts.strip('\"'), value)
 self.environment[envvar] = env_opts
 
+def append_filestream_connectors_to_classpath(self):

Review comment:
   In a sense it is append, because we are re-using the existing classpath. 
Multiple export commands will be cumulative. We don't need to have one such 
command as long as any export call keeps the existing classpath. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] edoardocomar commented on pull request #11743: KAFKA-13660: Switch log4j12 to reload4j

2022-03-25 Thread GitBox


edoardocomar commented on pull request #11743:
URL: https://github.com/apache/kafka/pull/11743#issuecomment-1079291632


   thanks @FireBurn 
   I had a black box go at building and running Kafka built with this PR and it 
looked good to me


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #11945: KAFKA-13769: Explicitly route FK join results to correct partitions

2022-03-25 Thread GitBox


mjsax commented on a change in pull request #11945:
URL: https://github.com/apache/kafka/pull/11945#discussion_r835536651



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##
@@ -64,7 +64,7 @@ public void setIfUnset(final SerdeGetter getter) {
 
 @Override
 public byte[] serialize(final String topic, final 
SubscriptionResponseWrapper data) {
-
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized 
data}
+
//{1-bit-isHashNull}{7-bits-version}{4-bytes-primaryPartition}{Optional-16-byte-Hash}{n-bytes
 serialized data}

Review comment:
   > What will happen when an old processor receives a v1 record and throws 
an exception? Will it throw that record on the floor or just block processing 
until the upgrade?
   
   The thread will crash, and we will eventually rebalance. So there is no 
data loss, but with at-least-once there will be duplicate processing. It's a very bad user 
experience if they upgrade one instance and suddenly all the others might crash...
   
   Re-using `UPGRADE_FROM` should work. The config is already there, so maybe 
it's ok if we just add a new accepted value without doing a KIP? We need to 
cover this case in the upgrade docs though!
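   
   A rough, purely illustrative sketch of the gating idea (the flag name and wiring are 
assumptions, not the actual Streams config plumbing): keep emitting the old wire format 
while an upgrade-from value is set, and only switch once it is cleared.
   ```scala
   object UpgradeGateSketch {
     sealed trait WireVersion
     case object V0 extends WireVersion
     case object V1 extends WireVersion

     // `upgradeFrom` stands in for whatever config value (e.g. an UPGRADE_FROM-style
     // setting) tells us that old readers may still be running.
     def formatToWrite(upgradeFrom: Option[String]): WireVersion =
       upgradeFrom match {
         case Some(_) => V0 // old instances may still be around: stick to the old format
         case None    => V1 // everyone understands the new format: safe to emit it
       }

     def main(args: Array[String]): Unit = {
       println(formatToWrite(Some("3.2"))) // V0
       println(formatToWrite(None))        // V1
     }
   }
   ```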




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #11945: KAFKA-13769: Explicitly route FK join results to correct partitions

2022-03-25 Thread GitBox


mjsax commented on a change in pull request #11945:
URL: https://github.com/apache/kafka/pull/11945#discussion_r835538117



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##
@@ -92,6 +103,32 @@ public void setIfUnset(final SerdeGetter getter) {
 return buf.array();
 }
 
+private byte[] serializeV1(final String topic, final 
SubscriptionResponseWrapper data) {
+final byte[] serializedData = data.getForeignValue() == null ? 
null : serializer.serialize(topic, data.getForeignValue());
+final int serializedDataLength = serializedData == null ? 0 : 
serializedData.length;
+final long[] originalHash = data.getOriginalValueHash();
+final int hashLength = originalHash == null ? 0 : 2 * Long.BYTES;
+final int primaryPartitionLength = Integer.BYTES;
+final int dataLength = 1 + hashLength + serializedDataLength + 
primaryPartitionLength;
+
+final ByteBuffer buf = ByteBuffer.allocate(dataLength);
+
+if (originalHash != null) {
+buf.put(data.getVersion());
+} else {
+buf.put((byte) (data.getVersion() | (byte) 0x80));
+}
+buf.putInt(data.getPrimaryPartition());

Review comment:
   Can we move this to the end of the byte array and thus re-use 
`serializeV0` to write the prefix?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on pull request #11950: Fix missing active segment

2022-03-25 Thread GitBox


kowshik commented on pull request #11950:
URL: https://github.com/apache/kafka/pull/11950#issuecomment-1079354780


   @yyu1993 KAFKA-12875 is the jira we had created for this earlier, we could 
associate this PR with the same.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #11908: KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default

2022-03-25 Thread GitBox


kkonstantine commented on a change in pull request #11908:
URL: https://github.com/apache/kafka/pull/11908#discussion_r835564791



##
File path: tests/kafkatest/services/connect.py
##
@@ -279,12 +282,34 @@ def append_to_environment_variable(self, envvar, value):
 env_opts = "\"%s %s\"" % (env_opts.strip('\"'), value)
 self.environment[envvar] = env_opts
 
+def append_filestream_connectors_to_classpath(self):

Review comment:
   Given the above, and after taking a second look, I think I'll keep the 
current name to avoid encoding implementation details in the name. Indeed, 
looking at the code, each individual command is an append to the classpath. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #11908: KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default

2022-03-25 Thread GitBox


kkonstantine commented on pull request #11908:
URL: https://github.com/apache/kafka/pull/11908#issuecomment-1079379309


   @mimaison @rhauch I tried to address your comments. Please take another look 
when you get the chance. It'd be great to make 3.2 with this change. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei opened a new pull request #11952: MINOR: Fix stream-join metadata

2022-03-25 Thread GitBox


vvcephei opened a new pull request #11952:
URL: https://github.com/apache/kafka/pull/11952


   https://github.com/apache/kafka/pull/11356 inadvertently changed
   the (undefined) header forwarding behavior of stream-stream joins.
   
   This change does not define the behavior, but just restores the prior
   undefined behavior for continuity's sake. Defining the header-forwarding
   behavior is future work.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #11952: MINOR: Fix stream-join metadata

2022-03-25 Thread GitBox


vvcephei commented on pull request #11952:
URL: https://github.com/apache/kafka/pull/11952#issuecomment-1079392675


   ping @jeqo , in case you have a chance to review as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #11908: KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default

2022-03-25 Thread GitBox


rhauch commented on a change in pull request #11908:
URL: https://github.com/apache/kafka/pull/11908#discussion_r835615520



##
File path: docs/quickstart.html
##
@@ -173,7 +173,11 @@ 
 
 
 
-First, we'll start by creating some seed data to test with:
+First, make sure to add connect-file-{{fullDotVersion}}.jar to the 
plugin.path property in the Connect worker's configuration (see plugin.path for examples).
+

Review comment:
   Will users that are trying this for the first time understand what this 
means, or will this be enough of a barrier to cause them to abandon the 
quickstart? I'm not sure, but it might be worth being more explicit here.
   
   First, since this is a quickstart I think it's okay to duplicate some 
information here just so that users don't have to go elsewhere to learn what 
they need to do to run the quickstart.
   
   Second, the referenced plugin.path configuration section says (emphasis 
mine):
   > The list should consist of top level _directories_ that include any 
combination of ...
   
   And, to run the quickstarts we want them to add the _path to that JAR file_ 
to the `plugin.path` configuration property, not add the _path to the `lib/` 
directory_ that contains the `connect-file-.JAR` file, as the 
referenced documentation suggests to do.
   
   So, I suggest we be much more explicit here, and say something like:
   ```suggestion
   First, instruct Connect to use the example file system 
connectors by editing the configuration for the Connect standalone worker to 
changing the following line from:
   
   
   #plugin.path=
   to:
   
   
   plugin.path=lib/connect-file-{{fullDotVersion}}.jar
   ```
   
   We could make this a little easier if we changed the 
`connect-standalone.properties` and `connect-distributed.properties` files to 
add a few lines to the existing commented out section, so that all a user has 
to do is uncomment one line. For example:
   ```
   ...
   # Set to a list of filesystem paths separated by commas (,) to enable class 
loading isolation for plugins
   # (connectors, converters, transformations). The list should consist of top 
level directories that include 
   # any combination of: 
   # a) directories immediately containing jars with plugins and their 
dependencies
   # b) uber-jars with plugins and their dependencies
   # c) directories immediately containing the package directory structure of 
classes of plugins and their dependencies
   # Note: symlinks will be followed to discover dependencies or plugins.
   # Examples: 
   # 
plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
   #
   # To run the quickstart that just uses the example file system connectors, 
uncomment this line:
   # plugin.path=lib/connect-file-.jar,
   #
   #plugin.path=
   ```
   These are comments, and I think we can improve the comments without a KIP, 
especially since with this PR we're trying to limit the impact on the 
quickstarts.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #11952: MINOR: Fix stream-join metadata

2022-03-25 Thread GitBox


mjsax commented on a change in pull request #11952:
URL: https://github.com/apache/kafka/pull/11952#discussion_r835654066



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
##
@@ -887,6 +892,98 @@ public void 
shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() {
 }
 }
 
+/**
+ * NOTE: Header forwarding is undefined behavior, but we still want to 
understand the
+ * behavior so that we can make decisions about defining it in the future.
+ */
+@Test
+public void shouldForwardCurrentHeaders() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final KStream stream1;
+final KStream stream2;
+final KStream joined;
+final MockApiProcessorSupplier supplier = 
new MockApiProcessorSupplier<>();
+stream1 = builder.stream(topic1, consumed);
+stream2 = builder.stream(topic2, consumed);
+
+joined = stream1.outerJoin(
+stream2,
+MockValueJoiner.TOSTRING_JOINER,
+JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), 
ofMillis(10L)),
+StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String())
+);
+joined.process(supplier);
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), PROPS)) {
+final TestInputTopic inputTopic1 =
+driver.createInputTopic(topic1, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final TestInputTopic inputTopic2 =
+driver.createInputTopic(topic2, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final MockApiProcessor processor = 
supplier.theCapturedProcessor();
+
+inputTopic1.pipeInput(new TestRecord<>(
+0,
+"A0",
+new RecordHeaders(new Header[]{new RecordHeader("h", new 
byte[]{0x1})}),
+0L
+));
+inputTopic2.pipeInput(new TestRecord<>(
+1,
+"a0",
+new RecordHeaders(new Header[]{new RecordHeader("h", new 
byte[]{0x2})}),
+0L
+));
+// bump stream-time to trigger outer-join results
+inputTopic2.pipeInput(new TestRecord<>(
+3,
+"dummy",
+new RecordHeaders(new Header[]{new RecordHeader("h", new 
byte[]{0x3})}),
+(long) 211
+));
+
+// Again, header forwarding is undefined, but the current observed 
behavior is that
+// the headers pass through the forwarding record.
+processor.checkAndClearProcessedRecords(
+new Record<>(
+1,
+"null+a0",
+0L,
+new RecordHeaders(new Header[]{new RecordHeader("h", new 
byte[]{0x3})})

Review comment:
   Semantically really bad to forward `0x3`, but well, it is what it is.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #11952: MINOR: Fix stream-join metadata

2022-03-25 Thread GitBox


mjsax commented on a change in pull request #11952:
URL: https://github.com/apache/kafka/pull/11952#discussion_r835654720



##
File path: streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
##
@@ -92,6 +93,20 @@ public void process(final Record record) {
 }
 
 public void checkAndClearProcessResult(final KeyValueTimestamp... 
expected) {

Review comment:
   Seems we should get rid of this one after we have updated the full DSL 
to use the new `Record` type.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2022-03-25 Thread GitBox


mjsax commented on pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#issuecomment-1079513140


   > Sorry I didn't check the entire implementation before posting
   
   No problem. It's much harder to reverse engineer the code than asking :) 
   
   > But if a key is expired from the inner store (what this PR addresses for 
persistent stores) but is still present in the cache, then do we keep it or 
remove it from the cache? And do we return that key?
   
   For this PR, I would propose to still return the data. It's exactly the 
problem that should be fixed in a follow-up PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] yyu1993 commented on a change in pull request #11950: KAFKA-12875: Change Log layer segment map mutations to avoid absence of active segment

2022-03-25 Thread GitBox


yyu1993 commented on a change in pull request #11950:
URL: https://github.com/apache/kafka/pull/11950#discussion_r835676088



##
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##
@@ -1355,15 +1355,19 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   val numToDelete = deletable.size
   if (numToDelete > 0) {
 // we must always have at least one segment, so if we are going to 
delete all the segments, create a new one first
-if (localLog.segments.numberOfSegments == numToDelete)
-  roll()
-lock synchronized {
-  localLog.checkIfMemoryMappedBufferClosed()
-  // remove the segments for lookups
-  localLog.removeAndDeleteSegments(deletable, asyncDelete = true, 
reason)
-  deleteProducerSnapshots(deletable, asyncDelete = true)
-  
maybeIncrementLogStartOffset(localLog.segments.firstSegmentBaseOffset.get, 
SegmentDeletion)
+var segmentsToDelete = deletable
+if (localLog.segments.numberOfSegments == numToDelete) {
+  val newSegment = roll()
+  if (deletable.last.baseOffset == newSegment.baseOffset) {
+warn(s"Empty active segment at ${deletable.last.baseOffset} was 
deleted and recreated due to $reason")
+segmentsToDelete = deletable.dropRight(1)

Review comment:
   In roll(), we only call createAndDeleteSegment() when the new segment's 
offset is the same as the last segment's offset and the last segment is empty. 
So when the new segment's offset is different, we will not delete the last 
segment in roll(); it will be deleted as part of segmentsToDelete. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11948: KAFKA-10405: set purge interval explicitly

2022-03-25 Thread GitBox


showuon commented on pull request #11948:
URL: https://github.com/apache/kafka/pull/11948#issuecomment-1079575157


   > @showuon Thank you for setting the purge interval! Nice catch that we 
missed to set this after KIP-811 was merged. However, I am not sure that this 
is the root cause of the flakiness as described in the ticket since the test 
was flaky before KIP-811 was merged. But maybe there were multiple causes to 
the flakiness one of which is fixed with this PR.
   
   @cadonna , yes, I agree there could be other issues causing the flakiness. 
What I tried to do is make the test much more stable than what we see now. 
Let's see if it still fails, and we can improve it later. Thanks.
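   For reference, a minimal sketch of pinning the purge interval in a test's 
streams configuration, assuming the KIP-811 config name 
`repartition.purge.interval.ms` (the exact values used in the PR may differ):
   
   ```java
   import java.util.Properties;
   import org.apache.kafka.streams.StreamsConfig;
   
   public class PurgeIntervalTestConfig {
       static Properties streamsTestProps() {
           final Properties props = new Properties();
           props.put(StreamsConfig.APPLICATION_ID_CONFIG, "purge-repartition-test");
           props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           // Purging only happens on commit, so keep the commit interval small too.
           props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
           // Explicit purge interval (KIP-811) so the test does not rely on the default.
           props.put("repartition.purge.interval.ms", 1000L);
           return props;
       }
   }
   ```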


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming opened a new pull request #9770: MINOR: Add ByteBufferAccessorTest

2022-03-25 Thread GitBox


dengziming opened a new pull request #9770:
URL: https://github.com/apache/kafka/pull/9770


   *More detailed description of your change*
   Add ByteBufferAccessorTest, similar to SendBuilderTest:
   1. test basic read and write (a rough sketch of this case is shown below)
   2. test non-zero-copy behavior when writing ByteBuffer and Records
   3. verify that writing a ByteBuffer respects its position
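   A minimal sketch of the basic read/write case, for illustration only (it 
assumes `ByteBufferAccessor` from `org.apache.kafka.common.protocol`; the 
actual test covers more):
   
   ```java
   import java.nio.ByteBuffer;
   import org.apache.kafka.common.protocol.ByteBufferAccessor;
   
   public class ByteBufferAccessorSketch {
       public static void main(String[] args) {
           final ByteBuffer buffer = ByteBuffer.allocate(32);
   
           // Write a few primitive values through the accessor.
           final ByteBufferAccessor writer = new ByteBufferAccessor(buffer);
           writer.writeByte((byte) 1);
           writer.writeInt(42);
           writer.writeLong(123456789L);
   
           // Flip the underlying buffer and read the values back in the same order.
           buffer.flip();
           final ByteBufferAccessor reader = new ByteBufferAccessor(buffer);
           System.out.println(reader.readByte());  // 1
           System.out.println(reader.readInt());   // 42
           System.out.println(reader.readLong());  // 123456789
       }
   }
   ```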
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] denglinfan commented on pull request #8656: KAFKA-9981; dedicated mm2 cluster lose the update operation.

2022-03-25 Thread GitBox


denglinfan commented on pull request #8656:
URL: https://github.com/apache/kafka/pull/8656#issuecomment-1079624320


   In my case, I cannot successfully get the leaderUrl to ask the leader task 
to store the current change, and the leaderUrl is NOTUSED/PA.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org