[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway
prat0318 commented on a change in pull request #11552: URL: https://github.com/apache/kafka/pull/11552#discussion_r769918971 ## File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala ## @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.clients.producer.ProducerRecord + +import java.util.Properties +import org.apache.kafka.common.TopicPartition +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +import java.nio.charset.StandardCharsets + + +class ProducerSendWhileDeletionTest extends BaseProducerSendTest { Review comment: Agreed that extending `BaseProducerSendTest` would run unnecessary tests. Changed now to extend from `IntegrationTestHarness` and verified that the test fails w/o the fix and passes after the fix. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway
prat0318 commented on a change in pull request #11552: URL: https://github.com/apache/kafka/pull/11552#discussion_r769919454 ## File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala ## @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.api + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.clients.producer.ProducerRecord + +import java.util.Properties +import org.apache.kafka.common.TopicPartition +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +import java.nio.charset.StandardCharsets + + +class ProducerSendWhileDeletionTest extends BaseProducerSendTest { + + +override def generateConfigs = { +val overridingProps = new Properties() +val numServers = 2 +overridingProps.put(KafkaConfig.NumPartitionsProp, 2.toString) +overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString) +overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString) +TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol), +trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps)) +} + +/** + * Tests that Producer gets self-recovered when a topic is deleted mid-way of produce. + * + * Producer will attempt to send messages to the partition specified in each record, and should + * succeed as long as the partition is included in the metadata. + */ +@Test +def testSendWithTopicDeletionMidWay(): Unit = { +val numRecords = 10 + +// create topic with leader as 0 for the 2 partitions. +createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1))) + +val reassignment = Map( +new TopicPartition(topic, 0) -> Seq(1, 0), +new TopicPartition(topic, 1) -> Seq(1, 0) +) + +// Change leader to 1 for both the partitions to increase leader Epoch from 0 -> 1 +zkClient.createPartitionReassignment(reassignment) +TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress, +"failed to remove reassign partitions path after completion") + +val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L, deliveryTimeoutMs = 20 * 1000) + +(1 to numRecords).map { i => Review comment: done. 
[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway
prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769918057

## File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ##
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
     assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
 }
+@Test
+public void testEpochUpdateAfterTopicDeletion() {
+    TopicPartition tp = new TopicPartition("topic-1", 0);
+    Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+    MetadataResponse metadataResponse = emptyMetadataResponse();
+    metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+    // Start with a topic A with a topic ID foo

Review comment:
agreed, modified accordingly.
[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway
prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769917809

## File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ##
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
     assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
 }
+@Test
+public void testEpochUpdateAfterTopicDeletion() {
+    TopicPartition tp = new TopicPartition("topic-1", 0);
+    Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());

Review comment:
ack.
[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway
prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769917597

## File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java ##
@@ -394,8 +394,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
 if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
     int newEpoch = partitionMetadata.leaderEpoch.get();
     Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-    if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
-        // If both topic IDs were valid and the topic ID changed, update the metadata
+    // Between the time that a topic is deleted and re-created, the client may lose
+    // track of the corresponding topicId (i.e. `oldTopicId` will be null). In this case,
+    // when we discover the new topicId, we allow the corresponding leader epoch
+    // to override the last seen value.

Review comment:
moved as per the suggestion.
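The rule described in the code comment above can be sketched in isolation. The following is a minimal model, not the real `Metadata` class: `LeaderEpochSketch`, `update`, and `forgetTopicId` are hypothetical names, and topic IDs and partitions are represented as plain strings for brevity. It assumes, as the test comments in this thread state, that an error response for a deleted topic leaves the last seen epoch in place while the topic ID is lost.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Simplified model of the client's leader-epoch bookkeeping; NOT the real Metadata class. */
public class LeaderEpochSketch {
    // Hypothetical stand-ins: topic IDs and partitions are plain strings here.
    private final Map<String, String> topicIds = new HashMap<>();
    private final Map<String, Integer> lastSeenLeaderEpochs = new HashMap<>();

    /** Applies one partition entry from a metadata response; returns true if the epoch was accepted. */
    public boolean update(String topic, int partition, String topicId, int newEpoch) {
        String tp = topic + "-" + partition;
        String oldTopicId = topicIds.get(topic);
        Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
        if (topicId != null) {
            topicIds.put(topic, topicId);
        }
        // A newly discovered topic ID overrides the epoch, even when the old ID was lost (null).
        boolean topicIdChanged = topicId != null && !topicId.equals(oldTopicId);
        if (topicIdChanged || currentEpoch == null || newEpoch >= currentEpoch) {
            lastSeenLeaderEpochs.put(tp, newEpoch);
            return true;
        }
        // Stale epoch for the same topic incarnation: keep the old value.
        return false;
    }

    /** Models an error response for a deleted topic: the ID is forgotten, the epoch is kept. */
    public void forgetTopicId(String topic) {
        topicIds.remove(topic);
    }

    public Optional<Integer> lastSeenLeaderEpoch(String topic, int partition) {
        return Optional.ofNullable(lastSeenLeaderEpochs.get(topic + "-" + partition));
    }

    public static void main(String[] args) {
        LeaderEpochSketch metadata = new LeaderEpochSketch();
        metadata.update("topic-1", 0, "id-A", 10);   // original topic, epoch 10
        metadata.forgetTopicId("topic-1");           // topic deleted, ID lost
        boolean accepted = metadata.update("topic-1", 0, "id-B", 0); // re-created topic, epoch 0
        System.out.println(accepted + " " + metadata.lastSeenLeaderEpoch("topic-1", 0));
    }
}
```

With the earlier condition, which also required `oldTopicId != null`, the last `update` call in `main` would fall through to the epoch comparison and be rejected because 0 is lower than 10. That is the stuck-producer situation the pull request title describes.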
[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway
prat0318 commented on a change in pull request #11552: URL: https://github.com/apache/kafka/pull/11552#discussion_r769903291 ## File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala ## @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.clients.producer.ProducerRecord + +import java.util.Properties +import org.apache.kafka.common.TopicPartition +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +import java.nio.charset.StandardCharsets + + +class ProducerSendWhileDeletionTest extends BaseProducerSendTest { + + Review comment: removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway
prat0318 commented on a change in pull request #11552: URL: https://github.com/apache/kafka/pull/11552#discussion_r769902172 ## File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ## @@ -372,6 +372,31 @@ public void testUpdateLastEpoch() { assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12)); } +@Test +public void testEpochUpdateAfterTopicDeletion() { +TopicPartition tp = new TopicPartition("topic-1", 0); +Map topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid()); + +MetadataResponse metadataResponse = emptyMetadataResponse(); +metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L); + +// Start with a topic A with a topic ID foo +metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds); +metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L); +assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp)); + +// Topic A is now deleted so Response contains an Error. LeaderEpoch should still return maintain Old value Review comment: removed. 
## File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ## @@ -372,6 +372,31 @@ public void testUpdateLastEpoch() { assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12)); } +@Test +public void testEpochUpdateAfterTopicDeletion() { +TopicPartition tp = new TopicPartition("topic-1", 0); +Map topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid()); + +MetadataResponse metadataResponse = emptyMetadataResponse(); +metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L); + +// Start with a topic A with a topic ID foo +metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds); +metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L); +assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp)); + +// Topic A is now deleted so Response contains an Error. LeaderEpoch should still return maintain Old value +metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.singletonMap("topic-1", Errors.UNKNOWN_TOPIC_OR_PARTITION), new HashMap<>()); Review comment: ack -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway
prat0318 commented on a change in pull request #11552: URL: https://github.com/apache/kafka/pull/11552#discussion_r769901576 ## File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala ## @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + Review comment: removed. ## File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala ## @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.api + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.clients.producer.ProducerRecord + +import java.util.Properties +import org.apache.kafka.common.TopicPartition +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +import java.nio.charset.StandardCharsets + + +class ProducerSendWhileDeletionTest extends BaseProducerSendTest { + + +override def generateConfigs = { +val overridingProps = new Properties() +val numServers = 2 +overridingProps.put(KafkaConfig.NumPartitionsProp, 2.toString) +overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString) +overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString) +TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol), +trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps)) +} + +/** + * Tests that Producer gets self-recovered when a topic is deleted mid-way of produce. + * + * Producer will attempt to send messages to the partition specified in each record, and should + * succeed as long as the partition is included in the metadata. + */ +@Test +def testSendWithTopicDeletionMidWay(): Unit = { +val numRecords = 10 + +// create topic with leader as 0 for the 2 partitions. 
+createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+val reassignment = Map(
+  new TopicPartition(topic, 0) -> Seq(1, 0),
+  new TopicPartition(topic, 1) -> Seq(1, 0)
+)
+
+// Change leader to 1 for both the partitions to increase leader Epoch from 0 -> 1
+zkClient.createPartitionReassignment(reassignment)
+TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+  "failed to remove reassign partitions path after completion")
+
+val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L, deliveryTimeoutMs = 20 * 1000)
+
+(1 to numRecords).map { i =>
+  val resp = producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8))).get
+  assertEquals(topic, resp.topic())
+}
+
+// start topic deletion
+adminZkClient.deleteTopic(topic)
+
+// Verify that the topic is deleted when no metadata request comes in
+TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+
+// Producer should be able to send messages even after topic gets deleted and auto-created
+assertEquals(topic, producer.send(new ProducerRecord(topic, null, ("value").getBytes(StandardCharsets.UTF_8))).get.topic())

Review comment:
ack
[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway
prat0318 commented on a change in pull request #11552: URL: https://github.com/apache/kafka/pull/11552#discussion_r769901214 ## File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala ## @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.api + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.clients.producer.ProducerRecord + +import java.util.Properties +import org.apache.kafka.common.TopicPartition +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +import java.nio.charset.StandardCharsets + + +class ProducerSendWhileDeletionTest extends BaseProducerSendTest { + + +override def generateConfigs = { +val overridingProps = new Properties() +val numServers = 2 +overridingProps.put(KafkaConfig.NumPartitionsProp, 2.toString) +overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString) +overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString) +TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol), +trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps)) +} + +/** + * Tests that Producer gets self-recovered when a topic is deleted mid-way of produce. + * + * Producer will attempt to send messages to the partition specified in each record, and should + * succeed as long as the partition is included in the metadata. + */ +@Test +def testSendWithTopicDeletionMidWay(): Unit = { +val numRecords = 10 + +// create topic with leader as 0 for the 2 partitions. 
+createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+val reassignment = Map(
+  new TopicPartition(topic, 0) -> Seq(1, 0),
+  new TopicPartition(topic, 1) -> Seq(1, 0)
+)
+
+// Change leader to 1 for both the partitions to increase leader Epoch from 0 -> 1
+zkClient.createPartitionReassignment(reassignment)
+TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+  "failed to remove reassign partitions path after completion")
+
+val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L, deliveryTimeoutMs = 20 * 1000)
+
+(1 to numRecords).map { i =>
+  val resp = producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8))).get
+  assertEquals(topic, resp.topic())
+}
+
+// start topic deletion
+adminZkClient.deleteTopic(topic)
+
+// Verify that the topic is deleted when no metadata request comes in
+TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+
+// Producer should be able to send messages even after topic gets deleted and auto-created
+assertEquals(topic, producer.send(new ProducerRecord(topic, null, ("value").getBytes(StandardCharsets.UTF_8))).get.topic())
+}
+
+

Review comment:
removed.
[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway
prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r759880730

## File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java ##
@@ -394,7 +394,8 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
 if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
     int newEpoch = partitionMetadata.leaderEpoch.get();
     Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-    if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
+    // oldTopicId can be null (when metadata is fetched during topic recreation), update the metadata in that case as well.

Review comment:
Ack. Changed the comment as per the suggestion.

> would it make sense to move this into the corresponding branch that it applies to?

Sorry, I didn't follow. Could you elaborate, please? (Do you mean a separate `if` branch? The current `if` branch deals with a differing topicId, so that should be the one we modify as part of this patch.)
[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway
prat0318 commented on a change in pull request #11552: URL: https://github.com/apache/kafka/pull/11552#discussion_r759882210 ## File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala ## @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.clients.producer.ProducerRecord + +import java.util.Properties +import org.apache.kafka.common.TopicPartition +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +import java.nio.charset.StandardCharsets + + +class ProducerSendWhileDeletionTest extends BaseProducerSendTest { Review comment: I earlier thought of putting it in `PlaintextProducerTest` but the new case needs 2 as RF for reassignments to take place (to increase leader epoch) and disabling `AutoLeaderRebalance`. Modifying the configs for the whole test class would affect the other tests, so decided to go for a new test class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
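For readers without the Scala test harness at hand, the broker overrides the comment above refers to can be written out as plain properties. This is a sketch only: the class name `SendWhileDeletionBrokerOverrides` is hypothetical, and the key strings are the public broker configuration names that the `KafkaConfig.NumPartitionsProp`, `KafkaConfig.DefaultReplicationFactorProp`, and `KafkaConfig.AutoLeaderRebalanceEnableProp` constants in the quoted diff are assumed to resolve to.

```java
import java.util.Properties;

/** Hypothetical helper listing the broker overrides used by the new test class. */
public class SendWhileDeletionBrokerOverrides {

    public static Properties overrides() {
        Properties props = new Properties();
        // Two partitions, matching the two-partition assignment created in the test.
        props.put("num.partitions", "2");
        // Replication factor 2 so a reassignment can move leadership and bump the leader epoch.
        props.put("default.replication.factor", "2");
        // Keep the controller from moving leadership back to the preferred replica mid-test.
        props.put("auto.leader.rebalance.enable", "false");
        return props;
    }

    public static void main(String[] args) {
        overrides().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Applying these to a shared test class would change the cluster for every test in it, which is the reason given above for introducing a separate class.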
[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway
prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r759881263

## File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ##
@@ -381,13 +381,12 @@ public void testEpochUpdateOnChangedTopicIds() {
 metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
 // Start with a topic with no topic ID
-metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 100);
 metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
-assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+assertEquals(Optional.of(100), metadata.lastSeenLeaderEpoch(tp));
-// We should treat an added topic ID as though it is the same topic. Handle only when epoch increases.
-// Don't update to an older one
-metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, topicIds);
+// If the Older topic Id is null, we should go with the new TopicId as the leader Epoch

Review comment:
Added a new test case with the suggested flow. The old test case still had to change because it fails with this patch, so I modified it to match the new behavior.
[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway
prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r759880730

## File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java ##
@@ -394,7 +394,8 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
 if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
     int newEpoch = partitionMetadata.leaderEpoch.get();
     Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-    if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
+    // oldTopicId can be null (when metadata is fetched during topic recreation), update the metadata in that case as well.

Review comment:
Ack. Changed the comment as per the suggestion.

> would it make sense to move this into the corresponding branch that it applies to?

Sorry, I didn't follow. Could you elaborate, please?

## File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java ##
@@ -394,7 +394,8 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
 if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
     int newEpoch = partitionMetadata.leaderEpoch.get();
     Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-    if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
+    // oldTopicId can be null (when metadata is fetched during topic recreation), update the metadata in that case as well.
+    if (topicId != null && !topicId.equals(oldTopicId)) {
         // If both topic IDs were valid and the topic ID changed, update the metadata

Review comment:
updated.