[ https://issues.apache.org/jira/browse/KAFKA-6955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499159#comment-16499159 ]

ASF GitHub Bot commented on KAFKA-6955:
---------------------------------------

ijuma closed pull request #5088: KAFKA-6955: Use Java AdminClient in DeleteRecordsCommand
URL: https://github.com/apache/kafka/pull/5088
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff once a forked pull request is merged, the diff is reproduced
below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index bcc11fd4917..ea42530e66d 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -18,7 +18,6 @@ import java.util.{Collections, Properties}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.{ConcurrentLinkedQueue, Future, TimeUnit}
 
-import kafka.admin.AdminClient.DeleteRecordsResult
 import kafka.common.KafkaException
 import kafka.coordinator.group.GroupOverview
 import kafka.utils.Logging
@@ -216,73 +215,6 @@ class AdminClient(val time: Time,
      broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker).asJava))
     }.toMap
 
-  /*
-   * Remove all the messages whose offset is smaller than the given offset of the corresponding partition
-   *
-   * DeleteRecordsResult contains either lowWatermark of the partition or exception. We list the possible exception
-   * and their interpretations below:
-   *
-   * - DisconnectException if leader node of the partition is not available. Need retry by user.
-   * - PolicyViolationException if the topic is configured as non-deletable.
-   * - TopicAuthorizationException if the topic doesn't exist and the user doesn't have the authority to create the topic
-   * - TimeoutException if response is not available within the timeout specified by either Future's timeout or AdminClient's request timeout
-   * - UnknownTopicOrPartitionException if the partition doesn't exist or if the user doesn't have the authority to describe the topic
-   * - NotLeaderForPartitionException if broker is not leader of the partition. Need retry by user.
-   * - OffsetOutOfRangeException if the offset is larger than high watermark of this partition
-   *
-   */
-
-  def deleteRecordsBefore(offsets: Map[TopicPartition, Long]): Future[Map[TopicPartition, DeleteRecordsResult]] = {
-    val metadataRequest = new MetadataRequest.Builder(offsets.keys.map(_.topic).toSet.toList.asJava, true)
-    val response = sendAnyNode(ApiKeys.METADATA, metadataRequest).asInstanceOf[MetadataResponse]
-    val errors = response.errors
-    if (!errors.isEmpty)
-      error(s"Metadata request contained errors: $errors")
-
-    val (partitionsWithoutError, partitionsWithError) = offsets.partition{ partitionAndOffset =>
-      !response.errors().containsKey(partitionAndOffset._1.topic())}
-
-    val (partitionsWithLeader, partitionsWithoutLeader) = partitionsWithoutError.partition{ partitionAndOffset =>
-      response.cluster().leaderFor(partitionAndOffset._1) != null}
-
-    val partitionsWithErrorResults = partitionsWithError.keys.map( partition =>
-      partition -> DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, response.errors().get(partition.topic()).exception())).toMap
-
-    val partitionsWithoutLeaderResults = partitionsWithoutLeader.mapValues( _ =>
-      DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.LEADER_NOT_AVAILABLE.exception()))
-
-    val partitionsGroupByLeader = partitionsWithLeader.groupBy(partitionAndOffset =>
-      response.cluster().leaderFor(partitionAndOffset._1))
-
-    // prepare requests and generate Future objects
-    val futures = partitionsGroupByLeader.map{ case (node, partitionAndOffsets) =>
-      val convertedMap: java.util.Map[TopicPartition, java.lang.Long] = partitionAndOffsets.mapValues(_.asInstanceOf[java.lang.Long]).asJava
-      val future = client.send(node, new DeleteRecordsRequest.Builder(requestTimeoutMs, convertedMap))
-      pendingFutures.add(future)
-      future.compose(new RequestFutureAdapter[ClientResponse, Map[TopicPartition, DeleteRecordsResult]]() {
-          override def onSuccess(response: ClientResponse, future: RequestFuture[Map[TopicPartition, DeleteRecordsResult]]) {
-            val deleteRecordsResponse = response.responseBody().asInstanceOf[DeleteRecordsResponse]
-            val result = deleteRecordsResponse.responses().asScala.mapValues(v => DeleteRecordsResult(v.lowWatermark, v.error.exception())).toMap
-            future.complete(result)
-            pendingFutures.remove(future)
-          }
-
-          override def onFailure(e: RuntimeException, future: RequestFuture[Map[TopicPartition, DeleteRecordsResult]]) {
-            val result = partitionAndOffsets.mapValues(_ => DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, e))
-            future.complete(result)
-            pendingFutures.remove(future)
-          }
-
-        })
-    }
-
-    // default output if not receiving DeleteRecordsResponse before timeout
-    val defaultResults = offsets.mapValues(_ =>
-      DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.REQUEST_TIMED_OUT.exception())) ++ partitionsWithErrorResults ++ partitionsWithoutLeaderResults
-
-    new CompositeFuture(time, defaultResults, futures.toList)
-  }
-
   /**
    * Case class used to represent a consumer of a consumer group
    */
@@ -473,8 +405,6 @@ object AdminClient {
     config
   }
 
-  case class DeleteRecordsResult(lowWatermark: Long, error: Exception)
-
  class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals.asJava, false)
 
   def createSimplePlaintext(brokerUrl: String): AdminClient = {
diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
index 2715490ec23..14d38ecd37a 100644
--- a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
+++ b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
@@ -20,14 +20,17 @@ package kafka.admin
 import java.io.PrintStream
 import java.util.Properties
 
-import kafka.admin.AdminClient.DeleteRecordsResult
 import kafka.common.AdminCommandFailedException
-import kafka.utils.{CoreUtils, Json, CommandLineUtils}
+import kafka.utils.{CommandLineUtils, CoreUtils, Json}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.clients.admin
+import org.apache.kafka.clients.admin.RecordsToDelete
 import org.apache.kafka.clients.CommonClientConfigs
 import joptsimple._
 
+import scala.collection.JavaConverters._
+
 /**
 * A command for delete records of the given partitions down to the specified offset.
  */
@@ -61,26 +64,31 @@ object DeleteRecordsCommand {
     if (duplicatePartitions.nonEmpty)
      throw new AdminCommandFailedException("Offset json file contains duplicate topic partitions: %s".format(duplicatePartitions.mkString(",")))
 
+    val recordsToDelete = offsetSeq.map { case (topicPartition, offset) =>
+      (topicPartition, RecordsToDelete.beforeOffset(offset))
+    }.toMap.asJava
+
     out.println("Executing records delete operation")
-    val deleteRecordsResult: Map[TopicPartition, DeleteRecordsResult] = adminClient.deleteRecordsBefore(offsetSeq.toMap).get()
+    val deleteRecordsResult = adminClient.deleteRecords(recordsToDelete)
     out.println("Records delete operation completed:")
 
-    deleteRecordsResult.foreach{ case (tp, partitionResult) => {
-      if (partitionResult.error == null)
-        out.println(s"partition: $tp\tlow_watermark: ${partitionResult.lowWatermark}")
-      else
-        out.println(s"partition: $tp\terror: ${partitionResult.error.toString}")
+    deleteRecordsResult.lowWatermarks.asScala.foreach { case (tp, partitionResult) => {
+      try out.println(s"partition: $tp\tlow_watermark: ${partitionResult.get.lowWatermark}")
+      catch {
+        case e: Exception => out.println(s"partition: $tp\terror: ${e.getMessage}")
+      }
     }}
+
     adminClient.close()
   }
 
-  private def createAdminClient(opts: DeleteRecordsCommandOptions): AdminClient = {
+  private def createAdminClient(opts: DeleteRecordsCommandOptions): admin.AdminClient = {
     val props = if (opts.options.has(opts.commandConfigOpt))
       Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
     else
       new Properties()
    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
-    AdminClient.create(props)
+    admin.AdminClient.create(props)
   }
 
   class DeleteRecordsCommandOptions(args: Array[String]) {
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 231b1e7ab26..5e4b893bf3f 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -49,8 +49,6 @@ import scala.collection.JavaConverters._
 import java.lang.{Long => JLong}
 
 import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.internals.Topic
-import org.scalatest.Assertions.intercept
 
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}
@@ -291,8 +289,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
         )
       }
     }
-
-    client.close()
   }
 
   @Test
@@ -746,7 +742,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     assertEquals(0L, consumer.position(topicPartition))
 
     val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava)
-    val lowWatermark = result.lowWatermarks().get(topicPartition).get().lowWatermark()
+    val lowWatermark = result.lowWatermarks().get(topicPartition).get.lowWatermark
     assertEquals(5L, lowWatermark)
 
     consumer.seekToBeginning(Collections.singletonList(topicPartition))
@@ -755,7 +751,9 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     consumer.seek(topicPartition, 7L)
     assertEquals(7L, consumer.position(topicPartition))
 
-    client.close()
+    client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(DeleteRecordsRequest.HIGH_WATERMARK)).asJava).all.get
+    consumer.seekToBeginning(Collections.singletonList(topicPartition))
+    assertEquals(10L, consumer.position(topicPartition))
   }
 
   @Test
@@ -794,7 +792,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
           e.getCause.isInstanceOf[NotLeaderForPartitionException] => false
         }
     }, s"Expected low watermark of the partition to be 5 but got ${lowWatermark.getOrElse("no response within the timeout")}")
-    client.close()
   }
 
   @Test
@@ -807,13 +804,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
 
     sendRecords(producers.head, 10, topicPartition)
     val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava)
-    val lowWatermark = result.lowWatermarks().get(topicPartition).get().lowWatermark()
+    val lowWatermark = result.lowWatermarks.get(topicPartition).get.lowWatermark
     assertEquals(3L, lowWatermark)
 
     for (i <- 0 until serverCount)
      assertEquals(3, servers(i).replicaManager.getReplica(topicPartition).get.logStartOffset)
-
-    client.close()
   }
 
   @Test
@@ -829,14 +824,68 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     assertEquals(0L, consumer.offsetsForTimes(Map(topicPartition -> JLong.valueOf(0L)).asJava).get(topicPartition).offset())
 
     var result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava)
-    result.all().get()
+    result.all.get
     assertEquals(5L, consumer.offsetsForTimes(Map(topicPartition -> JLong.valueOf(0L)).asJava).get(topicPartition).offset())
 
     result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(DeleteRecordsRequest.HIGH_WATERMARK)).asJava)
-    result.all().get()
+    result.all.get
     assertNull(consumer.offsetsForTimes(Map(topicPartition -> JLong.valueOf(0L)).asJava).get(topicPartition))
+  }
 
-    client.close()
+  @Test
+  def testConsumeAfterDeleteRecords(): Unit = {
+    val consumer = consumers.head
+    subscribeAndWaitForAssignment(topic, consumer)
+
+    client = AdminClient.create(createConfig)
+
+    sendRecords(producers.head, 10, topicPartition)
+    var messageCount = 0
+    TestUtils.waitUntilTrue(() => {
+      messageCount += consumer.poll(0).count
+      messageCount == 10
+    }, "Expected 10 messages", 3000L)
+
+    client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava).all.get
+    consumer.seek(topicPartition, 1)
+    messageCount = 0
+    TestUtils.waitUntilTrue(() => {
+      messageCount += consumer.poll(0).count
+      messageCount == 7
+    }, "Expected 7 messages", 3000L)
+
+    client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(8L)).asJava).all.get
+    consumer.seek(topicPartition, 1)
+    messageCount = 0
+    TestUtils.waitUntilTrue(() => {
+      messageCount += consumer.poll(0).count
+      messageCount == 2
+    }, "Expected 2 messages", 3000L)
+  }
+
+  @Test
+  def testDeleteRecordsWithException(): Unit = {
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    client = AdminClient.create(createConfig)
+
+    sendRecords(producers.head, 10, topicPartition)
+
+    assertEquals(5L, client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava)
+      .lowWatermarks.get(topicPartition).get.lowWatermark)
+
+    // OffsetOutOfRangeException if offset > high_watermark
+    var cause = intercept[ExecutionException] {
+      client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(20L)).asJava).lowWatermarks.get(topicPartition).get
+    }.getCause
+    assertEquals(classOf[OffsetOutOfRangeException], cause.getClass)
+
+    val nonExistPartition = new TopicPartition(topic, 3)
+    // LeaderNotAvailableException if non existent partition
+    cause = intercept[ExecutionException] {
+      client.deleteRecords(Map(nonExistPartition -> RecordsToDelete.beforeOffset(20L)).asJava).lowWatermarks.get(nonExistPartition).get
+    }.getCause
+    assertEquals(classOf[LeaderNotAvailableException], cause.getClass)
   }
 
   @Test
@@ -856,8 +905,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     val describeResult2 = client.describeConfigs(Collections.singletonList(invalidTopic))
 
     assertTrue(intercept[ExecutionException](describeResult2.values.get(invalidTopic).get).getCause.isInstanceOf[InvalidTopicException])
-
-    client.close()
   }
 
   private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Unit = {
@@ -902,7 +949,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
         classOf[SecurityDisabledException])
     assertFutureExceptionTypeEquals(client.deleteAcls(Collections.singleton(ACL1.toFilter())).all(),
       classOf[SecurityDisabledException])
-    client.close()
   }
 
   /**
@@ -955,7 +1001,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
     val endTimeMs = Time.SYSTEM.milliseconds()
     assertTrue("Expected the timeout to take at least one millisecond.", endTimeMs > startTimeMs);
-    client.close()
   }
 
   /**
@@ -973,7 +1018,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     val future2 = client.createTopics(Seq("mytopic3", "mytopic4").map(new NewTopic(_, 1, 1)).asJava,
       new CreateTopicsOptions().validateOnly(true)).all()
     future2.get
-    client.close()
     assertEquals(1, factory.failuresInjected)
   }
 
@@ -1091,6 +1135,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
       Utils.closeQuietly(client, "adminClient")
     }
   }
+
 }
 
 object AdminClientIntegrationTest {
diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
index 2f6fa0126af..b78946cde30 100644
--- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
@@ -17,20 +17,19 @@
 package kafka.api
 
 import java.util.Collections
-import java.util.concurrent.TimeUnit
 
 import kafka.admin.AdminClient
-import kafka.admin.AdminClient.DeleteRecordsResult
 import kafka.server.KafkaConfig
 import java.lang.{Long => JLong}
+
 import kafka.utils.{Logging, TestUtils}
-import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, ProducerConfig}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.{Errors, ApiKeys}
-import org.apache.kafka.common.requests.DeleteRecordsRequest
+import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
+
 import scala.collection.JavaConverters._
 
 /**
@@ -78,122 +77,12 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
     super.tearDown()
   }
 
-  @Test
-  def testSeekToBeginningAfterDeleteRecords() {
-    val consumer = consumers.head
-    subscribeAndWaitForAssignment(topic, consumer)
-
-    sendRecords(producers.head, 10, tp)
-    consumer.seekToBeginning(Collections.singletonList(tp))
-    assertEquals(0L, consumer.position(tp))
-
-    client.deleteRecordsBefore(Map((tp, 5L))).get()
-    consumer.seekToBeginning(Collections.singletonList(tp))
-    assertEquals(5L, consumer.position(tp))
-
-    client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
-    consumer.seekToBeginning(Collections.singletonList(tp))
-    assertEquals(10L, consumer.position(tp))
-  }
-
-  @Test
-  def testConsumeAfterDeleteRecords() {
-    val consumer = consumers.head
-    subscribeAndWaitForAssignment(topic, consumer)
-
-    sendRecords(producers.head, 10, tp)
-    var messageCount = 0
-    TestUtils.waitUntilTrue(() => {
-      messageCount += consumer.poll(0).count()
-      messageCount == 10
-    }, "Expected 10 messages", 3000L)
-
-    client.deleteRecordsBefore(Map((tp, 3L))).get()
-    consumer.seek(tp, 1)
-    messageCount = 0
-    TestUtils.waitUntilTrue(() => {
-      messageCount += consumer.poll(0).count()
-      messageCount == 7
-    }, "Expected 7 messages", 3000L)
-
-    client.deleteRecordsBefore(Map((tp, 8L))).get()
-    consumer.seek(tp, 1)
-    messageCount = 0
-    TestUtils.waitUntilTrue(() => {
-      messageCount += consumer.poll(0).count()
-      messageCount == 2
-    }, "Expected 2 messages", 3000L)
-  }
-
-  @Test
-  def testLogStartOffsetCheckpoint() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
-
-    sendRecords(producers.head, 10, tp)
-    assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
-
-    for (i <- 0 until serverCount)
-      killBroker(i)
-    restartDeadBrokers()
-
-    client.close()
-    brokerList = TestUtils.bootstrapServers(servers, listenerName)
-    client = AdminClient.createSimplePlaintext(brokerList)
-
-    TestUtils.waitUntilTrue(() => {
-      // Need to retry if leader is not available for the partition
-      client.deleteRecordsBefore(Map((tp, 0L))).get(1000L, TimeUnit.MILLISECONDS)(tp).equals(DeleteRecordsResult(5L, null))
-    }, "Expected low watermark of the partition to be 5L")
-  }
-
-  @Test
-  def testLogStartOffsetAfterDeleteRecords() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
-
-    sendRecords(producers.head, 10, tp)
-    client.deleteRecordsBefore(Map((tp, 3L))).get()
-
-    for (i <- 0 until serverCount)
-      assertEquals(3, servers(i).replicaManager.getReplica(tp).get.logStartOffset)
-  }
-
   @Test
   def testOffsetsForTimesWhenOffsetNotFound() {
     val consumer = consumers.head
     assertNull(consumer.offsetsForTimes(Map(tp -> JLong.valueOf(0L)).asJava).get(tp))
   }
 
-  @Test
-  def testOffsetsForTimesAfterDeleteRecords() {
-    val consumer = consumers.head
-    subscribeAndWaitForAssignment(topic, consumer)
-
-    sendRecords(producers.head, 10, tp)
-    assertEquals(0L, consumer.offsetsForTimes(Map(tp -> JLong.valueOf(0L)).asJava).get(tp).offset())
-
-    client.deleteRecordsBefore(Map((tp, 5L))).get()
-    assertEquals(5L, consumer.offsetsForTimes(Map(tp -> JLong.valueOf(0L)).asJava).get(tp).offset())
-
-    client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
-    assertNull(consumer.offsetsForTimes(Map(tp -> JLong.valueOf(0L)).asJava).get(tp))
-  }
-
-  @Test
-  def testDeleteRecordsWithException() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
-
-    sendRecords(producers.head, 10, tp)
-    // Should get success result
-    assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
-    // OffsetOutOfRangeException if offset > high_watermark
-    assertEquals(DeleteRecordsResult(-1L, Errors.OFFSET_OUT_OF_RANGE.exception()), client.deleteRecordsBefore(Map((tp, 20))).get()(tp))
-
-    val nonExistPartition = new TopicPartition(topic, 3)
-    // UnknownTopicOrPartitionException if user tries to delete records of a non-existent partition
-    assertEquals(DeleteRecordsResult(-1L, Errors.LEADER_NOT_AVAILABLE.exception()),
-                 client.deleteRecordsBefore(Map((nonExistPartition, 20))).get()(nonExistPartition))
-  }
-
   @Test
   def testListGroups() {
     subscribeAndWaitForAssignment(topic, consumers.head)
diff --git a/docs/upgrade.html b/docs/upgrade.html
index ba2d93024f9..03d1feb2c18 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -95,6 +95,7 @@ <h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in 2
         timeout behavior for blocking APIs. In particular, a new <code>poll(Duration)</code> API has been added which
         does not block for dynamic partition assignment. The old <code>poll(long)</code> API has been deprecated and
         will be removed in a future version.</li>
+    <li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li>
 </ul>
 
 <h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New 
Protocol Versions</a></h5>
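
For reviewers who only want the net effect of this patch on callers, here is a minimal sketch of the migration that the upgrade note describes. It is not code taken from the PR; the bootstrap server, topic, partition and offsets are hypothetical placeholders.

    // Sketch: replacing kafka.admin.AdminClient.deleteRecordsBefore with the Java AdminClient.
    import java.util.Properties
    import org.apache.kafka.clients.CommonClientConfigs
    import org.apache.kafka.clients.admin.{AdminClient, RecordsToDelete}
    import org.apache.kafka.common.TopicPartition
    import scala.collection.JavaConverters._

    object DeleteRecordsMigrationSketch extends App {
      val props = new Properties()
      props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // hypothetical broker
      val adminClient = AdminClient.create(props)

      val tp = new TopicPartition("my-topic", 0) // hypothetical topic/partition

      // Old (removed): adminClient.deleteRecordsBefore(Map(tp -> 5L)).get()
      // New: express the target offset as RecordsToDelete and call deleteRecords
      val result = adminClient.deleteRecords(Map(tp -> RecordsToDelete.beforeOffset(5L)).asJava)

      // Each partition's low watermark is returned as a KafkaFuture[DeletedRecords]
      val lowWatermark = result.lowWatermarks.get(tp).get.lowWatermark
      println(s"partition: $tp\tlow_watermark: $lowWatermark")

      adminClient.close()
    }

The DeleteRecordsCommand change above follows the same shape: it builds the RecordsToDelete map from the offsets in the JSON file and prints either the returned low watermark or the exception raised by the per-partition future.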


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Use Java AdminClient in DeleteRecordsCommand
> --------------------------------------------
>
>                 Key: KAFKA-6955
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6955
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Ismael Juma
>            Assignee: Vahid Hashemian
>            Priority: Major
>              Labels: newbie
>
> The Scala AdminClient was introduced as a stop gap until we had an officially 
> supported API. The Java AdminClient is the supported API so we should migrate 
> all usages to it and remove the Scala AdminClient. This JIRA is for using the 
> Java AdminClient in DeleteRecordsCommand.
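
As additional context for this migration (not part of the issue text above): with the Java AdminClient the per-partition outcome is a KafkaFuture, so broker-side errors such as OffsetOutOfRangeException or LeaderNotAvailableException surface as the cause of an ExecutionException thrown by get(), rather than as an error field on a Scala case class. A hedged sketch, assuming an already-constructed client and partition (names and the helper itself are illustrative, not from the patch):

    import java.util.concurrent.ExecutionException
    import org.apache.kafka.clients.admin.{AdminClient, RecordsToDelete}
    import org.apache.kafka.common.TopicPartition
    import scala.collection.JavaConverters._

    // Hypothetical helper: delete records below `offset` and report the result for one partition.
    def deleteBefore(adminClient: AdminClient, tp: TopicPartition, offset: Long): Unit = {
      val watermarks = adminClient
        .deleteRecords(Map(tp -> RecordsToDelete.beforeOffset(offset)).asJava)
        .lowWatermarks
      try {
        // KafkaFuture.get() throws ExecutionException wrapping any broker-side error
        println(s"partition: $tp\tlow_watermark: ${watermarks.get(tp).get.lowWatermark}")
      } catch {
        case e: ExecutionException =>
          // e.getCause is, e.g., OffsetOutOfRangeException or LeaderNotAvailableException
          println(s"partition: $tp\terror: ${e.getCause}")
      }
    }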



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
