[ https://issues.apache.org/jira/browse/KAFKA-6955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499159#comment-16499159 ]
ASF GitHub Bot commented on KAFKA-6955: --------------------------------------- ijuma closed pull request #5088: KAFKA-6955: Use Java AdminClient in DeleteRecordsCommand URL: https://github.com/apache/kafka/pull/5088 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index bcc11fd4917..ea42530e66d 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -18,7 +18,6 @@ import java.util.{Collections, Properties} import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{ConcurrentLinkedQueue, Future, TimeUnit} -import kafka.admin.AdminClient.DeleteRecordsResult import kafka.common.KafkaException import kafka.coordinator.group.GroupOverview import kafka.utils.Logging @@ -216,73 +215,6 @@ class AdminClient(val time: Time, broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker).asJava)) }.toMap - /* - * Remove all the messages whose offset is smaller than the given offset of the corresponding partition - * - * DeleteRecordsResult contains either lowWatermark of the partition or exception. We list the possible exception - * and their interpretations below: - * - * - DisconnectException if leader node of the partition is not available. Need retry by user. - * - PolicyViolationException if the topic is configured as non-deletable. - * - TopicAuthorizationException if the topic doesn't exist and the user doesn't have the authority to create the topic - * - TimeoutException if response is not available within the timeout specified by either Future's timeout or AdminClient's request timeout - * - UnknownTopicOrPartitionException if the partition doesn't exist or if the user doesn't have the authority to describe the topic - * - NotLeaderForPartitionException if broker is not leader of the partition. Need retry by user. 
- * - OffsetOutOfRangeException if the offset is larger than high watermark of this partition - * - */ - - def deleteRecordsBefore(offsets: Map[TopicPartition, Long]): Future[Map[TopicPartition, DeleteRecordsResult]] = { - val metadataRequest = new MetadataRequest.Builder(offsets.keys.map(_.topic).toSet.toList.asJava, true) - val response = sendAnyNode(ApiKeys.METADATA, metadataRequest).asInstanceOf[MetadataResponse] - val errors = response.errors - if (!errors.isEmpty) - error(s"Metadata request contained errors: $errors") - - val (partitionsWithoutError, partitionsWithError) = offsets.partition{ partitionAndOffset => - !response.errors().containsKey(partitionAndOffset._1.topic())} - - val (partitionsWithLeader, partitionsWithoutLeader) = partitionsWithoutError.partition{ partitionAndOffset => - response.cluster().leaderFor(partitionAndOffset._1) != null} - - val partitionsWithErrorResults = partitionsWithError.keys.map( partition => - partition -> DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, response.errors().get(partition.topic()).exception())).toMap - - val partitionsWithoutLeaderResults = partitionsWithoutLeader.mapValues( _ => - DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.LEADER_NOT_AVAILABLE.exception())) - - val partitionsGroupByLeader = partitionsWithLeader.groupBy(partitionAndOffset => - response.cluster().leaderFor(partitionAndOffset._1)) - - // prepare requests and generate Future objects - val futures = partitionsGroupByLeader.map{ case (node, partitionAndOffsets) => - val convertedMap: java.util.Map[TopicPartition, java.lang.Long] = partitionAndOffsets.mapValues(_.asInstanceOf[java.lang.Long]).asJava - val future = client.send(node, new DeleteRecordsRequest.Builder(requestTimeoutMs, convertedMap)) - pendingFutures.add(future) - future.compose(new RequestFutureAdapter[ClientResponse, Map[TopicPartition, DeleteRecordsResult]]() { - override def onSuccess(response: ClientResponse, future: RequestFuture[Map[TopicPartition, DeleteRecordsResult]]) { - val deleteRecordsResponse = response.responseBody().asInstanceOf[DeleteRecordsResponse] - val result = deleteRecordsResponse.responses().asScala.mapValues(v => DeleteRecordsResult(v.lowWatermark, v.error.exception())).toMap - future.complete(result) - pendingFutures.remove(future) - } - - override def onFailure(e: RuntimeException, future: RequestFuture[Map[TopicPartition, DeleteRecordsResult]]) { - val result = partitionAndOffsets.mapValues(_ => DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, e)) - future.complete(result) - pendingFutures.remove(future) - } - - }) - } - - // default output if not receiving DeleteRecordsResponse before timeout - val defaultResults = offsets.mapValues(_ => - DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.REQUEST_TIMED_OUT.exception())) ++ partitionsWithErrorResults ++ partitionsWithoutLeaderResults - - new CompositeFuture(time, defaultResults, futures.toList) - } - /** * Case class used to represent a consumer of a consumer group */ @@ -473,8 +405,6 @@ object AdminClient { config } - case class DeleteRecordsResult(lowWatermark: Long, error: Exception) - class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals.asJava, false) def createSimplePlaintext(brokerUrl: String): AdminClient = { diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala index 2715490ec23..14d38ecd37a 100644 --- 
a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala +++ b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala @@ -20,14 +20,17 @@ package kafka.admin import java.io.PrintStream import java.util.Properties -import kafka.admin.AdminClient.DeleteRecordsResult import kafka.common.AdminCommandFailedException -import kafka.utils.{CoreUtils, Json, CommandLineUtils} +import kafka.utils.{CommandLineUtils, CoreUtils, Json} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.utils.Utils +import org.apache.kafka.clients.admin +import org.apache.kafka.clients.admin.RecordsToDelete import org.apache.kafka.clients.CommonClientConfigs import joptsimple._ +import scala.collection.JavaConverters._ + /** * A command for delete records of the given partitions down to the specified offset. */ @@ -61,26 +64,31 @@ object DeleteRecordsCommand { if (duplicatePartitions.nonEmpty) throw new AdminCommandFailedException("Offset json file contains duplicate topic partitions: %s".format(duplicatePartitions.mkString(","))) + val recordsToDelete = offsetSeq.map { case (topicPartition, offset) => + (topicPartition, RecordsToDelete.beforeOffset(offset)) + }.toMap.asJava + out.println("Executing records delete operation") - val deleteRecordsResult: Map[TopicPartition, DeleteRecordsResult] = adminClient.deleteRecordsBefore(offsetSeq.toMap).get() + val deleteRecordsResult = adminClient.deleteRecords(recordsToDelete) out.println("Records delete operation completed:") - deleteRecordsResult.foreach{ case (tp, partitionResult) => { - if (partitionResult.error == null) - out.println(s"partition: $tp\tlow_watermark: ${partitionResult.lowWatermark}") - else - out.println(s"partition: $tp\terror: ${partitionResult.error.toString}") + deleteRecordsResult.lowWatermarks.asScala.foreach { case (tp, partitionResult) => { + try out.println(s"partition: $tp\tlow_watermark: ${partitionResult.get.lowWatermark}") + catch { + case e: Exception => out.println(s"partition: $tp\terror: ${e.getMessage}") + } }} + adminClient.close() } - private def createAdminClient(opts: DeleteRecordsCommandOptions): AdminClient = { + private def createAdminClient(opts: DeleteRecordsCommandOptions): admin.AdminClient = { val props = if (opts.options.has(opts.commandConfigOpt)) Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else new Properties() props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt)) - AdminClient.create(props) + admin.AdminClient.create(props) } class DeleteRecordsCommandOptions(args: Array[String]) { diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index 231b1e7ab26..5e4b893bf3f 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -49,8 +49,6 @@ import scala.collection.JavaConverters._ import java.lang.{Long => JLong} import kafka.zk.KafkaZkClient -import org.apache.kafka.common.internals.Topic -import org.scalatest.Assertions.intercept import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} @@ -291,8 +289,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { ) } } - - client.close() } @Test @@ -746,7 +742,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { assertEquals(0L, consumer.position(topicPartition)) val result = 
client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava) - val lowWatermark = result.lowWatermarks().get(topicPartition).get().lowWatermark() + val lowWatermark = result.lowWatermarks().get(topicPartition).get.lowWatermark assertEquals(5L, lowWatermark) consumer.seekToBeginning(Collections.singletonList(topicPartition)) @@ -755,7 +751,9 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { consumer.seek(topicPartition, 7L) assertEquals(7L, consumer.position(topicPartition)) - client.close() + client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(DeleteRecordsRequest.HIGH_WATERMARK)).asJava).all.get + consumer.seekToBeginning(Collections.singletonList(topicPartition)) + assertEquals(10L, consumer.position(topicPartition)) } @Test @@ -794,7 +792,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { e.getCause.isInstanceOf[NotLeaderForPartitionException] => false } }, s"Expected low watermark of the partition to be 5 but got ${lowWatermark.getOrElse("no response within the timeout")}") - client.close() } @Test @@ -807,13 +804,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { sendRecords(producers.head, 10, topicPartition) val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava) - val lowWatermark = result.lowWatermarks().get(topicPartition).get().lowWatermark() + val lowWatermark = result.lowWatermarks.get(topicPartition).get.lowWatermark assertEquals(3L, lowWatermark) for (i <- 0 until serverCount) assertEquals(3, servers(i).replicaManager.getReplica(topicPartition).get.logStartOffset) - - client.close() } @Test @@ -829,14 +824,68 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { assertEquals(0L, consumer.offsetsForTimes(Map(topicPartition -> JLong.valueOf(0L)).asJava).get(topicPartition).offset()) var result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava) - result.all().get() + result.all.get assertEquals(5L, consumer.offsetsForTimes(Map(topicPartition -> JLong.valueOf(0L)).asJava).get(topicPartition).offset()) result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(DeleteRecordsRequest.HIGH_WATERMARK)).asJava) - result.all().get() + result.all.get assertNull(consumer.offsetsForTimes(Map(topicPartition -> JLong.valueOf(0L)).asJava).get(topicPartition)) + } - client.close() + @Test + def testConsumeAfterDeleteRecords(): Unit = { + val consumer = consumers.head + subscribeAndWaitForAssignment(topic, consumer) + + client = AdminClient.create(createConfig) + + sendRecords(producers.head, 10, topicPartition) + var messageCount = 0 + TestUtils.waitUntilTrue(() => { + messageCount += consumer.poll(0).count + messageCount == 10 + }, "Expected 10 messages", 3000L) + + client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava).all.get + consumer.seek(topicPartition, 1) + messageCount = 0 + TestUtils.waitUntilTrue(() => { + messageCount += consumer.poll(0).count + messageCount == 7 + }, "Expected 7 messages", 3000L) + + client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(8L)).asJava).all.get + consumer.seek(topicPartition, 1) + messageCount = 0 + TestUtils.waitUntilTrue(() => { + messageCount += consumer.poll(0).count + messageCount == 2 + }, "Expected 2 messages", 3000L) + } + + @Test + def testDeleteRecordsWithException(): Unit = { + subscribeAndWaitForAssignment(topic, 
consumers.head) + + client = AdminClient.create(createConfig) + + sendRecords(producers.head, 10, topicPartition) + + assertEquals(5L, client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava) + .lowWatermarks.get(topicPartition).get.lowWatermark) + + // OffsetOutOfRangeException if offset > high_watermark + var cause = intercept[ExecutionException] { + client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(20L)).asJava).lowWatermarks.get(topicPartition).get + }.getCause + assertEquals(classOf[OffsetOutOfRangeException], cause.getClass) + + val nonExistPartition = new TopicPartition(topic, 3) + // LeaderNotAvailableException if non existent partition + cause = intercept[ExecutionException] { + client.deleteRecords(Map(nonExistPartition -> RecordsToDelete.beforeOffset(20L)).asJava).lowWatermarks.get(nonExistPartition).get + }.getCause + assertEquals(classOf[LeaderNotAvailableException], cause.getClass) } @Test @@ -856,8 +905,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { val describeResult2 = client.describeConfigs(Collections.singletonList(invalidTopic)) assertTrue(intercept[ExecutionException](describeResult2.values.get(invalidTopic).get).getCause.isInstanceOf[InvalidTopicException]) - - client.close() } private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Unit = { @@ -902,7 +949,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { classOf[SecurityDisabledException]) assertFutureExceptionTypeEquals(client.deleteAcls(Collections.singleton(ACL1.toFilter())).all(), classOf[SecurityDisabledException]) - client.close() } /** @@ -955,7 +1001,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { assertFutureExceptionTypeEquals(future, classOf[TimeoutException]) val endTimeMs = Time.SYSTEM.milliseconds() assertTrue("Expected the timeout to take at least one millisecond.", endTimeMs > startTimeMs); - client.close() } /** @@ -973,7 +1018,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { val future2 = client.createTopics(Seq("mytopic3", "mytopic4").map(new NewTopic(_, 1, 1)).asJava, new CreateTopicsOptions().validateOnly(true)).all() future2.get - client.close() assertEquals(1, factory.failuresInjected) } @@ -1091,6 +1135,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { Utils.closeQuietly(client, "adminClient") } } + } object AdminClientIntegrationTest { diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala index 2f6fa0126af..b78946cde30 100644 --- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala +++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala @@ -17,20 +17,19 @@ package kafka.api import java.util.Collections -import java.util.concurrent.TimeUnit import kafka.admin.AdminClient -import kafka.admin.AdminClient.DeleteRecordsResult import kafka.server.KafkaConfig import java.lang.{Long => JLong} + import kafka.utils.{Logging, TestUtils} -import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig} -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, ProducerConfig} +import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import 
org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.protocol.{Errors, ApiKeys} -import org.apache.kafka.common.requests.DeleteRecordsRequest +import org.apache.kafka.common.protocol.ApiKeys import org.junit.{After, Before, Test} import org.junit.Assert._ + import scala.collection.JavaConverters._ /** @@ -78,122 +77,12 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging { super.tearDown() } - @Test - def testSeekToBeginningAfterDeleteRecords() { - val consumer = consumers.head - subscribeAndWaitForAssignment(topic, consumer) - - sendRecords(producers.head, 10, tp) - consumer.seekToBeginning(Collections.singletonList(tp)) - assertEquals(0L, consumer.position(tp)) - - client.deleteRecordsBefore(Map((tp, 5L))).get() - consumer.seekToBeginning(Collections.singletonList(tp)) - assertEquals(5L, consumer.position(tp)) - - client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get() - consumer.seekToBeginning(Collections.singletonList(tp)) - assertEquals(10L, consumer.position(tp)) - } - - @Test - def testConsumeAfterDeleteRecords() { - val consumer = consumers.head - subscribeAndWaitForAssignment(topic, consumer) - - sendRecords(producers.head, 10, tp) - var messageCount = 0 - TestUtils.waitUntilTrue(() => { - messageCount += consumer.poll(0).count() - messageCount == 10 - }, "Expected 10 messages", 3000L) - - client.deleteRecordsBefore(Map((tp, 3L))).get() - consumer.seek(tp, 1) - messageCount = 0 - TestUtils.waitUntilTrue(() => { - messageCount += consumer.poll(0).count() - messageCount == 7 - }, "Expected 7 messages", 3000L) - - client.deleteRecordsBefore(Map((tp, 8L))).get() - consumer.seek(tp, 1) - messageCount = 0 - TestUtils.waitUntilTrue(() => { - messageCount += consumer.poll(0).count() - messageCount == 2 - }, "Expected 2 messages", 3000L) - } - - @Test - def testLogStartOffsetCheckpoint() { - subscribeAndWaitForAssignment(topic, consumers.head) - - sendRecords(producers.head, 10, tp) - assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp)) - - for (i <- 0 until serverCount) - killBroker(i) - restartDeadBrokers() - - client.close() - brokerList = TestUtils.bootstrapServers(servers, listenerName) - client = AdminClient.createSimplePlaintext(brokerList) - - TestUtils.waitUntilTrue(() => { - // Need to retry if leader is not available for the partition - client.deleteRecordsBefore(Map((tp, 0L))).get(1000L, TimeUnit.MILLISECONDS)(tp).equals(DeleteRecordsResult(5L, null)) - }, "Expected low watermark of the partition to be 5L") - } - - @Test - def testLogStartOffsetAfterDeleteRecords() { - subscribeAndWaitForAssignment(topic, consumers.head) - - sendRecords(producers.head, 10, tp) - client.deleteRecordsBefore(Map((tp, 3L))).get() - - for (i <- 0 until serverCount) - assertEquals(3, servers(i).replicaManager.getReplica(tp).get.logStartOffset) - } - @Test def testOffsetsForTimesWhenOffsetNotFound() { val consumer = consumers.head assertNull(consumer.offsetsForTimes(Map(tp -> JLong.valueOf(0L)).asJava).get(tp)) } - @Test - def testOffsetsForTimesAfterDeleteRecords() { - val consumer = consumers.head - subscribeAndWaitForAssignment(topic, consumer) - - sendRecords(producers.head, 10, tp) - assertEquals(0L, consumer.offsetsForTimes(Map(tp -> JLong.valueOf(0L)).asJava).get(tp).offset()) - - client.deleteRecordsBefore(Map((tp, 5L))).get() - assertEquals(5L, consumer.offsetsForTimes(Map(tp -> JLong.valueOf(0L)).asJava).get(tp).offset()) - - client.deleteRecordsBefore(Map((tp, 
DeleteRecordsRequest.HIGH_WATERMARK))).get() - assertNull(consumer.offsetsForTimes(Map(tp -> JLong.valueOf(0L)).asJava).get(tp)) - } - - @Test - def testDeleteRecordsWithException() { - subscribeAndWaitForAssignment(topic, consumers.head) - - sendRecords(producers.head, 10, tp) - // Should get success result - assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp)) - // OffsetOutOfRangeException if offset > high_watermark - assertEquals(DeleteRecordsResult(-1L, Errors.OFFSET_OUT_OF_RANGE.exception()), client.deleteRecordsBefore(Map((tp, 20))).get()(tp)) - - val nonExistPartition = new TopicPartition(topic, 3) - // UnknownTopicOrPartitionException if user tries to delete records of a non-existent partition - assertEquals(DeleteRecordsResult(-1L, Errors.LEADER_NOT_AVAILABLE.exception()), - client.deleteRecordsBefore(Map((nonExistPartition, 20))).get()(nonExistPartition)) - } - @Test def testListGroups() { subscribeAndWaitForAssignment(topic, consumers.head) diff --git a/docs/upgrade.html b/docs/upgrade.html index ba2d93024f9..03d1feb2c18 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -95,6 +95,7 @@ <h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in 2 timeout behavior for blocking APIs. In particular, a new <code>poll(Duration)</code> API has been added which does not block for dynamic partition assignment. The old <code>poll(long)</code> API has been deprecated and will be removed in a future version.</li> + <li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li> </ul> <h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol Versions</a></h5> ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Use Java AdminClient in DeleteRecordsCommand > -------------------------------------------- > > Key: KAFKA-6955 > URL: https://issues.apache.org/jira/browse/KAFKA-6955 > Project: Kafka > Issue Type: Improvement > Reporter: Ismael Juma > Assignee: Vahid Hashemian > Priority: Major > Labels: newbie > > The Scala AdminClient was introduced as a stop gap until we had an officially > supported API. The Java AdminClient is the supported API so we should migrate > all usages to it and remove the Scala AdminClient. This JIRA is for using the > Java AdminClient in DeleteRecordsCommand. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
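For readers following the migration described in this patch, below is a minimal, self-contained sketch (Scala, mirroring the pattern the new DeleteRecordsCommand uses in the diff above) of deleting records with the Java AdminClient. The bootstrap server "localhost:9092", the topic "my-topic", partition 0, and the offset 5 are illustrative placeholders, not values taken from the patch.

import java.util.Properties
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, RecordsToDelete}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

object DeleteRecordsExample extends App {
  // Placeholder connection settings; point these at a real cluster.
  val props = new Properties()
  props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  val client = AdminClient.create(props)
  try {
    // Delete all records of partition 0 of "my-topic" with offsets below 5,
    // i.e. what the removed kafka.admin.AdminClient.deleteRecordsBefore used to do.
    val recordsToDelete = Map(
      new TopicPartition("my-topic", 0) -> RecordsToDelete.beforeOffset(5L)
    ).asJava
    val result = client.deleteRecords(recordsToDelete)
    // Each partition gets its own KafkaFuture; per-partition failures surface
    // as exceptions when the future is read, as in the updated command above.
    result.lowWatermarks.asScala.foreach { case (tp, future) =>
      try println(s"partition: $tp\tlow_watermark: ${future.get.lowWatermark}")
      catch { case e: Exception => println(s"partition: $tp\terror: ${e.getMessage}") }
    }
  } finally client.close()
}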