[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-15 Thread GitBox


prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769918971



##
File path: 
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {

Review comment:
   Agreed that extending `BaseProducerSendTest` would run unnecessary 
tests. Changed now to extend from `IntegrationTestHarness` and verified that 
the test fails w/o the fix and passes after the fix.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-15 Thread GitBox


prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769919454



##
File path: 
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+
+override def generateConfigs = {
+val overridingProps = new Properties()
+val numServers = 2
+overridingProps.put(KafkaConfig.NumPartitionsProp, 2.toString)
+overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 
2.toString)
+overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, 
false.toString)
+TestUtils.createBrokerConfigs(numServers, zkConnect, false, 
interBrokerSecurityProtocol = Some(securityProtocol),
+trustStoreFile = trustStoreFile, saslProperties = 
serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
+}
+
+/**
+ * Tests that Producer gets self-recovered when a topic is deleted mid-way 
of produce.
+ *
+ * Producer will attempt to send messages to the partition specified in 
each record, and should
+ * succeed as long as the partition is included in the metadata.
+ */
+@Test
+def testSendWithTopicDeletionMidWay(): Unit = {
+val numRecords = 10
+
+// create topic with leader as 0 for the 2 partitions.
+createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+val reassignment = Map(
+new TopicPartition(topic, 0) -> Seq(1, 0),
+new TopicPartition(topic, 1) -> Seq(1, 0)
+)
+
+// Change leader to 1 for both the partitions to increase leader Epoch 
from 0 -> 1
+zkClient.createPartitionReassignment(reassignment)
+TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+"failed to remove reassign partitions path after completion")
+
+val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L, 
deliveryTimeoutMs = 20 * 1000)
+
+(1 to numRecords).map { i =>

Review comment:
   done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-15 Thread GitBox


prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769918057



##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
 assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> 
assertEquals(leaderAndEpoch.intValue(), 12));
 }
 
+@Test
+public void testEpochUpdateAfterTopicDeletion() {
+TopicPartition tp = new TopicPartition("topic-1", 0);
+Map topicIds = Collections.singletonMap("topic-1", 
Uuid.randomUuid());
+
+MetadataResponse metadataResponse = emptyMetadataResponse();
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+// Start with a topic A with a topic ID foo

Review comment:
   agreed, modified accordingly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-15 Thread GitBox


prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769917809



##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
 assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> 
assertEquals(leaderAndEpoch.intValue(), 12));
 }
 
+@Test
+public void testEpochUpdateAfterTopicDeletion() {
+TopicPartition tp = new TopicPartition("topic-1", 0);
+Map topicIds = Collections.singletonMap("topic-1", 
Uuid.randomUuid());

Review comment:
   ack.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-15 Thread GitBox


prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769917597



##
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##
@@ -394,8 +394,12 @@ else if (metadata.error() == 
Errors.TOPIC_AUTHORIZATION_FAILED)
 if (hasReliableLeaderEpoch && 
partitionMetadata.leaderEpoch.isPresent()) {
 int newEpoch = partitionMetadata.leaderEpoch.get();
 Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-if (topicId != null && oldTopicId != null && 
!topicId.equals(oldTopicId)) {
-// If both topic IDs were valid and the topic ID changed, 
update the metadata
+// Between the time that a topic is deleted and re-created, the 
client may lose
+// track of the corresponding topicId (i.e. `oldTopicId` will be 
null). In this case,
+// when we discover the new topicId, we allow the corresponding 
leader epoch
+// to override the last seen value.

Review comment:
   moved as per the suggestion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-15 Thread GitBox


prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769903291



##
File path: 
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+

Review comment:
   removed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-15 Thread GitBox


prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769902172



##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
 assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> 
assertEquals(leaderAndEpoch.intValue(), 12));
 }
 
+@Test
+public void testEpochUpdateAfterTopicDeletion() {
+TopicPartition tp = new TopicPartition("topic-1", 0);
+Map topicIds = Collections.singletonMap("topic-1", 
Uuid.randomUuid());
+
+MetadataResponse metadataResponse = emptyMetadataResponse();
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+// Start with a topic A with a topic ID foo
+metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, 
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, 
topicIds);
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+// Topic A is now deleted so Response contains an Error. LeaderEpoch 
should still return maintain Old value

Review comment:
   removed.

##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
 assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> 
assertEquals(leaderAndEpoch.intValue(), 12));
 }
 
+@Test
+public void testEpochUpdateAfterTopicDeletion() {
+TopicPartition tp = new TopicPartition("topic-1", 0);
+Map topicIds = Collections.singletonMap("topic-1", 
Uuid.randomUuid());
+
+MetadataResponse metadataResponse = emptyMetadataResponse();
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+// Start with a topic A with a topic ID foo
+metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, 
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, 
topicIds);
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+// Topic A is now deleted so Response contains an Error. LeaderEpoch 
should still return maintain Old value
+metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, 
Collections.singletonMap("topic-1", Errors.UNKNOWN_TOPIC_OR_PARTITION), new 
HashMap<>());

Review comment:
   ack




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-15 Thread GitBox


prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769901576



##
File path: 
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+

Review comment:
   removed.

##
File path: 
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+
+override def generateConfigs = {
+val overridingProps = new Properties()
+val numServers = 2
+overridingProps.put(KafkaConfig.NumPartitionsProp, 2.toString)
+overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 
2.toString)
+overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, 
false.toString)
+TestUtils.createBrokerConfigs(numServers, zkConnect, false, 
interBrokerSecurityProtocol = Some(securityProtocol),
+trustStoreFile = trustStoreFile, saslProperties = 
serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
+}
+
+/**
+ * Tests that Producer gets self-recovered when a topic is deleted mid-way 
of produce.
+ *
+ * Producer will attempt to send messages to the partition specified in 
each record, and should
+ * succeed as long as the partition is included in the metadata.
+ */
+@Test
+def testSendWithTopicDeletionMidWay(): Unit = {
+val numRecords = 10
+
+// create topic with leader as 0 for the 2 partitions.
+createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+val reassignment = Map(
+new TopicPartition(topic, 0) -> Seq(1, 0),
+new TopicPartition(topic, 1) -> Seq(1, 0)
+)
+
+// Change leader to 1 for both the partitions to increase leader Epoch 
from 0 -> 1
+zkClient.createPartitionReassignment(reassignment)
+TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+"failed to remove reassign partitions path after completion")
+
+val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L, 
deliveryTimeoutMs = 20 * 1000)
+
+(1 to numRecords).map { i =>
+val resp = producer.send(new ProducerRecord(topic, null, ("value" 
+ i).getBytes(StandardCharsets.UTF_8))).get
+assertEquals(topic, resp.topic())
+}
+
+// start topic deletion
+adminZkClient.deleteTopic(topic)
+
+// Verify that the topic is deleted when no metadata request comes in
+TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+
+// Producer should be able to send messages even after topic gets 
deleted and auto-created
+assertEquals(topic, producer.send(new ProducerRecord(topic, null, 
("value").getBytes(StandardCharsets.UTF_8))).get.topic())

Review comment:
   ack




-- 
This is an automated message from the A

[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-15 Thread GitBox


prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769901214



##
File path: 
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+
+override def generateConfigs = {
+val overridingProps = new Properties()
+val numServers = 2
+overridingProps.put(KafkaConfig.NumPartitionsProp, 2.toString)
+overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 
2.toString)
+overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, 
false.toString)
+TestUtils.createBrokerConfigs(numServers, zkConnect, false, 
interBrokerSecurityProtocol = Some(securityProtocol),
+trustStoreFile = trustStoreFile, saslProperties = 
serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
+}
+
+/**
+ * Tests that Producer gets self-recovered when a topic is deleted mid-way 
of produce.
+ *
+ * Producer will attempt to send messages to the partition specified in 
each record, and should
+ * succeed as long as the partition is included in the metadata.
+ */
+@Test
+def testSendWithTopicDeletionMidWay(): Unit = {
+val numRecords = 10
+
+// create topic with leader as 0 for the 2 partitions.
+createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+val reassignment = Map(
+new TopicPartition(topic, 0) -> Seq(1, 0),
+new TopicPartition(topic, 1) -> Seq(1, 0)
+)
+
+// Change leader to 1 for both the partitions to increase leader Epoch 
from 0 -> 1
+zkClient.createPartitionReassignment(reassignment)
+TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+"failed to remove reassign partitions path after completion")
+
+val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L, 
deliveryTimeoutMs = 20 * 1000)
+
+(1 to numRecords).map { i =>
+val resp = producer.send(new ProducerRecord(topic, null, ("value" 
+ i).getBytes(StandardCharsets.UTF_8))).get
+assertEquals(topic, resp.topic())
+}
+
+// start topic deletion
+adminZkClient.deleteTopic(topic)
+
+// Verify that the topic is deleted when no metadata request comes in
+TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+
+// Producer should be able to send messages even after topic gets 
deleted and auto-created
+assertEquals(topic, producer.send(new ProducerRecord(topic, null, 
("value").getBytes(StandardCharsets.UTF_8))).get.topic())
+}
+
+

Review comment:
   removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-11-30 Thread GitBox


prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r759880730



##
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##
@@ -394,7 +394,8 @@ else if (metadata.error() == 
Errors.TOPIC_AUTHORIZATION_FAILED)
 if (hasReliableLeaderEpoch && 
partitionMetadata.leaderEpoch.isPresent()) {
 int newEpoch = partitionMetadata.leaderEpoch.get();
 Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-if (topicId != null && oldTopicId != null && 
!topicId.equals(oldTopicId)) {
+// oldTopicId can be null (when metadata is fetched during topic 
recreation), update the metadata in that case as well.

Review comment:
   Ack. Changed the comment as per the suggestion.
   
   > would it make sense to move this into the corresponding branch that it 
applies to?
   
   Sorry, couldn't get it. Can you elaborate on this please. (Do you mean a 
separate `if` branch? The current `If` branch deals with separate topicId, so 
that should be the one we should modify as part of this patch.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-11-30 Thread GitBox


prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r759882210



##
File path: 
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {

Review comment:
   I earlier thought of putting it in `PlaintextProducerTest` but the new 
case needs 2 as RF for reassignments to take place (to increase leader epoch) 
and disabling `AutoLeaderRebalance`. Modifying the configs for the whole test 
class would affect the other tests, so decided to go for a new test class.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-11-30 Thread GitBox


prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r759881263



##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -381,13 +381,12 @@ public void testEpochUpdateOnChangedTopicIds() {
 metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
 
 // Start with a topic with no topic ID
-metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, 
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, 
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 100);
 metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
-assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+assertEquals(Optional.of(100), metadata.lastSeenLeaderEpoch(tp));
 
-// We should treat an added topic ID as though it is the same topic. 
Handle only when epoch increases.
-// Don't update to an older one
-metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, 
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, 
topicIds);
+// If the Older topic Id is null, we should go with the new TopicId as 
the leader Epoch

Review comment:
   added a new testcase with the suggested flow. Old test-case still needs 
to be changed as that case fails now, so modified the case as per the new 
changes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-11-30 Thread GitBox


prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r759880730



##
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##
@@ -394,7 +394,8 @@ else if (metadata.error() == 
Errors.TOPIC_AUTHORIZATION_FAILED)
 if (hasReliableLeaderEpoch && 
partitionMetadata.leaderEpoch.isPresent()) {
 int newEpoch = partitionMetadata.leaderEpoch.get();
 Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-if (topicId != null && oldTopicId != null && 
!topicId.equals(oldTopicId)) {
+// oldTopicId can be null (when metadata is fetched during topic 
recreation), update the metadata in that case as well.

Review comment:
   Ack. Changed the comment as per the suggestion.
   
   > would it make sense to move this into the corresponding branch that it 
applies to?
   Sorry, couldn't get it. Can you elaborate on this please.

##
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##
@@ -394,7 +394,8 @@ else if (metadata.error() == 
Errors.TOPIC_AUTHORIZATION_FAILED)
 if (hasReliableLeaderEpoch && 
partitionMetadata.leaderEpoch.isPresent()) {
 int newEpoch = partitionMetadata.leaderEpoch.get();
 Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-if (topicId != null && oldTopicId != null && 
!topicId.equals(oldTopicId)) {
+// oldTopicId can be null (when metadata is fetched during topic 
recreation), update the metadata in that case as well.

Review comment:
   Ack. Changed the comment as per the suggestion.
   
   > would it make sense to move this into the corresponding branch that it 
applies to?
   
   Sorry, couldn't get it. Can you elaborate on this please.

##
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##
@@ -394,7 +394,8 @@ else if (metadata.error() == 
Errors.TOPIC_AUTHORIZATION_FAILED)
 if (hasReliableLeaderEpoch && 
partitionMetadata.leaderEpoch.isPresent()) {
 int newEpoch = partitionMetadata.leaderEpoch.get();
 Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-if (topicId != null && oldTopicId != null && 
!topicId.equals(oldTopicId)) {
+// oldTopicId can be null (when metadata is fetched during topic 
recreation), update the metadata in that case as well.
+if (topicId != null && !topicId.equals(oldTopicId)) {
 // If both topic IDs were valid and the topic ID changed, 
update the metadata

Review comment:
   updated.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org