This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4bba2c8a32a KAFKA-14591: Move DeleteRecordsCommand to tools (#13278)
4bba2c8a32a is described below

commit 4bba2c8a32a3e35f6870cf3f738c0eef8bb652d2
Author: Nikolay <[email protected]>
AuthorDate: Fri Jul 21 18:30:28 2023 +0300

    KAFKA-14591: Move DeleteRecordsCommand to tools (#13278)
    
    
    Reviewers: Mickael Maison <[email protected]>, Federico Valeri 
<[email protected]>
---
 bin/kafka-delete-records.sh                        |   2 +-
 bin/windows/kafka-delete-records.bat               |   2 +-
 core/src/main/scala/kafka/admin/AdminUtils.scala   |   2 +-
 .../scala/kafka/admin/DeleteRecordsCommand.scala   | 137 ---------------
 .../scala/kafka/admin/LeaderElectionCommand.scala  |   2 +-
 .../kafka/admin/ReassignPartitionsCommand.scala    |   2 +-
 core/src/main/scala/kafka/admin/TopicCommand.scala |   2 +-
 .../scala/kafka/controller/KafkaController.scala   |   3 +-
 .../main/scala/kafka/server/ZkAdminManager.scala   |   3 +-
 core/src/main/scala/kafka/zk/AdminZkClient.scala   |   3 +-
 .../scala/unit/kafka/admin/AddPartitionsTest.scala |   1 +
 .../admin/LeaderElectionCommandErrorTest.scala     |   2 +-
 .../kafka/admin/LeaderElectionCommandTest.scala    |   3 +-
 .../kafka/admin/ReassignPartitionsUnitTest.scala   |   2 +-
 .../scala/unit/kafka/admin/TopicCommandTest.scala  |   2 +-
 .../unit/kafka/server/DynamicConfigTest.scala      |   2 +-
 .../scala/unit/kafka/zk/AdminZkClientTest.scala    |   1 +
 .../server/common/AdminCommandFailedException.java |  21 ++-
 .../server/common/AdminOperationException.java     |  19 ++-
 .../apache/kafka/tools/DeleteRecordsCommand.java   | 183 +++++++++++++++++++++
 .../kafka/tools/DeleteRecordsCommandTest.java      | 182 ++++++++++++++++++++
 21 files changed, 408 insertions(+), 168 deletions(-)

diff --git a/bin/kafka-delete-records.sh b/bin/kafka-delete-records.sh
index 8726f919992..e9db8f95c58 100755
--- a/bin/kafka-delete-records.sh
+++ b/bin/kafka-delete-records.sh
@@ -14,4 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-exec $(dirname $0)/kafka-run-class.sh kafka.admin.DeleteRecordsCommand "$@"
+exec $(dirname $0)/kafka-run-class.sh 
org.apache.kafka.tools.DeleteRecordsCommand "$@"
diff --git a/bin/windows/kafka-delete-records.bat 
b/bin/windows/kafka-delete-records.bat
index d07e05f88a2..a883ec707e9 100644
--- a/bin/windows/kafka-delete-records.bat
+++ b/bin/windows/kafka-delete-records.bat
@@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied.
 rem See the License for the specific language governing permissions and
 rem limitations under the License.
 
-"%~dp0kafka-run-class.bat" kafka.admin.DeleteRecordsCommand %*
+"%~dp0kafka-run-class.bat" org.apache.kafka.tools.DeleteRecordsCommand %*
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala 
b/core/src/main/scala/kafka/admin/AdminUtils.scala
index a37e3d68e64..5ac09ab5348 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -18,9 +18,9 @@
 package kafka.admin
 
 import java.util.Random
-
 import kafka.utils.Logging
 import org.apache.kafka.common.errors.{InvalidPartitionsException, 
InvalidReplicationFactorException}
+import org.apache.kafka.server.common.AdminOperationException
 
 import collection.{Map, mutable, _}
 
diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala 
b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
deleted file mode 100644
index b1747313708..00000000000
--- a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-
-import java.io.PrintStream
-import java.util.Properties
-import kafka.common.AdminCommandFailedException
-import kafka.utils.json.JsonValue
-import kafka.utils.{CoreUtils, Json}
-import org.apache.kafka.clients.admin.{Admin, RecordsToDelete}
-import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
-
-import scala.jdk.CollectionConverters._
-import scala.collection.Seq
-
-/**
- * A command for delete records of the given partitions down to the specified 
offset.
- */
-object DeleteRecordsCommand {
-
-  private[admin] val EarliestVersion = 1
-
-  def main(args: Array[String]): Unit = {
-    execute(args, System.out)
-  }
-
-  def parseOffsetJsonStringWithoutDedup(jsonData: String): 
Seq[(TopicPartition, Long)] = {
-    Json.parseFull(jsonData) match {
-      case Some(js) =>
-        val version = js.asJsonObject.get("version") match {
-          case Some(jsonValue) => jsonValue.to[Int]
-          case None => EarliestVersion
-        }
-        parseJsonData(version, js)
-      case None => throw new AdminOperationException("The input string is not 
a valid JSON")
-    }
-  }
-
-  def parseJsonData(version: Int, js: JsonValue): Seq[(TopicPartition, Long)] 
= {
-    version match {
-      case 1 =>
-        js.asJsonObject.get("partitions") match {
-          case Some(partitions) =>
-            partitions.asJsonArray.iterator.map(_.asJsonObject).map { 
partitionJs =>
-              val topic = partitionJs("topic").to[String]
-              val partition = partitionJs("partition").to[Int]
-              val offset = partitionJs("offset").to[Long]
-              new TopicPartition(topic, partition) -> offset
-            }.toBuffer
-          case _ => throw new AdminOperationException("Missing partitions 
field");
-        }
-      case _ => throw new AdminOperationException(s"Not supported version 
field value $version")
-    }
-  }
-
-  def execute(args: Array[String], out: PrintStream): Unit = {
-    val opts = new DeleteRecordsCommandOptions(args)
-    val adminClient = createAdminClient(opts)
-    val offsetJsonFile =  opts.options.valueOf(opts.offsetJsonFileOpt)
-    val offsetJsonString = Utils.readFileAsString(offsetJsonFile)
-    val offsetSeq = parseOffsetJsonStringWithoutDedup(offsetJsonString)
-
-    val duplicatePartitions = CoreUtils.duplicates(offsetSeq.map { case (tp, 
_) => tp })
-    if (duplicatePartitions.nonEmpty)
-      throw new AdminCommandFailedException("Offset json file contains 
duplicate topic partitions: %s".format(duplicatePartitions.mkString(",")))
-
-    val recordsToDelete = offsetSeq.map { case (topicPartition, offset) =>
-      (topicPartition, RecordsToDelete.beforeOffset(offset))
-    }.toMap.asJava
-
-    out.println("Executing records delete operation")
-    val deleteRecordsResult = adminClient.deleteRecords(recordsToDelete)
-    out.println("Records delete operation completed:")
-
-    deleteRecordsResult.lowWatermarks.forEach { (tp, partitionResult) =>
-      try out.println(s"partition: $tp\tlow_watermark: 
${partitionResult.get.lowWatermark}")
-      catch {
-        case e: Exception => out.println(s"partition: $tp\terror: 
${e.getMessage}")
-      }
-    }
-
-    adminClient.close()
-  }
-
-  private def createAdminClient(opts: DeleteRecordsCommandOptions): Admin = {
-    val props = if (opts.options.has(opts.commandConfigOpt))
-      Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
-    else
-      new Properties()
-    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
opts.options.valueOf(opts.bootstrapServerOpt))
-    Admin.create(props)
-  }
-
-  class DeleteRecordsCommandOptions(args: Array[String]) extends 
CommandDefaultOptions(args) {
-    val BootstrapServerDoc = "REQUIRED: The server to connect to."
-    val offsetJsonFileDoc = "REQUIRED: The JSON file with offset per 
partition. The format to use is:\n" +
-                                 "{\"partitions\":\n  [{\"topic\": \"foo\", 
\"partition\": 1, \"offset\": 1}],\n \"version\":1\n}"
-    val CommandConfigDoc = "A property file containing configs to be passed to 
Admin Client."
-
-    val bootstrapServerOpt = parser.accepts("bootstrap-server", 
BootstrapServerDoc)
-                                   .withRequiredArg
-                                   .describedAs("server(s) to use for 
bootstrapping")
-                                   .ofType(classOf[String])
-    val offsetJsonFileOpt = parser.accepts("offset-json-file", 
offsetJsonFileDoc)
-                                   .withRequiredArg
-                                   .describedAs("Offset json file path")
-                                   .ofType(classOf[String])
-    val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
-                                   .withRequiredArg
-                                   .describedAs("command config property file 
path")
-                                   .ofType(classOf[String])
-
-    options = parser.parse(args : _*)
-
-    CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to delete 
records of the given partitions down to the specified offset.")
-
-    CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, 
offsetJsonFileOpt)
-  }
-}
diff --git a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala 
b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
index 140f4b70177..868c54916e9 100644
--- a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
@@ -19,7 +19,6 @@ package kafka.admin
 import java.util.Properties
 import java.util.concurrent.ExecutionException
 import joptsimple.util.EnumConverter
-import kafka.common.AdminCommandFailedException
 import kafka.utils.CoreUtils
 import kafka.utils.Implicits._
 import kafka.utils.Json
@@ -31,6 +30,7 @@ import 
org.apache.kafka.common.errors.ClusterAuthorizationException
 import org.apache.kafka.common.errors.ElectionNotNeededException
 import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.common.{AdminCommandFailedException, 
AdminOperationException}
 import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 
 import scala.jdk.CollectionConverters._
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index d1b75c483eb..40f688c7085 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -19,7 +19,6 @@ package kafka.admin
 import java.util
 import java.util.Optional
 import java.util.concurrent.ExecutionException
-import kafka.common.AdminCommandFailedException
 import kafka.server.DynamicConfig
 import kafka.utils.{CoreUtils, Exit, Json, Logging}
 import kafka.utils.Implicits._
@@ -30,6 +29,7 @@ import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors.{ReplicaNotAvailableException, 
UnknownTopicOrPartitionException}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{KafkaException, KafkaFuture, TopicPartition, 
TopicPartitionReplica}
+import org.apache.kafka.server.common.{AdminCommandFailedException, 
AdminOperationException}
 import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 import org.apache.kafka.storage.internals.log.LogConfig
 
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala 
b/core/src/main/scala/kafka/admin/TopicCommand.scala
index b4c74d35cdf..568dd1798a1 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -20,7 +20,6 @@ package kafka.admin
 import java.util
 import java.util.{Collections, Optional, Properties}
 import joptsimple._
-import kafka.common.AdminCommandFailedException
 import kafka.utils._
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.CreatePartitionsOptions
@@ -33,6 +32,7 @@ import org.apache.kafka.common.config.{ConfigResource, 
TopicConfig}
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, 
TopicExistsException, UnsupportedVersionException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.common.{AdminCommandFailedException, 
AdminOperationException}
 import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 import org.apache.kafka.storage.internals.log.LogConfig
 import org.apache.kafka.server.util.TopicFilter.IncludeList
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index c3f76e83d12..fa2575d9d8b 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -19,7 +19,6 @@ package kafka.controller
 import com.yammer.metrics.core.Timer
 
 import java.util.concurrent.TimeUnit
-import kafka.admin.AdminOperationException
 import kafka.api._
 import kafka.common._
 import kafka.cluster.Broker
@@ -45,7 +44,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, 
LeaderAndIsrResponse, UpdateFeaturesRequest, UpdateMetadataResponse}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.metadata.LeaderRecoveryState
-import org.apache.kafka.server.common.ProducerIdsBlock
+import org.apache.kafka.server.common.{AdminOperationException, 
ProducerIdsBlock}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.KafkaScheduler
 import org.apache.zookeeper.KeeperException
diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala 
b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index 97cf2b2f713..31ab40430d7 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -18,7 +18,7 @@ package kafka.server
 
 import java.util
 import java.util.Properties
-import kafka.admin.{AdminOperationException, AdminUtils}
+import kafka.admin.AdminUtils
 import kafka.common.TopicAlreadyMarkedForDeletionException
 import kafka.server.ConfigAdminManager.{prepareIncrementalConfigs, 
toLoggableProps}
 import kafka.server.DynamicConfig.QuotaConfigs
@@ -47,6 +47,7 @@ import org.apache.kafka.common.requests.CreateTopicsRequest._
 import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError}
 import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, 
ScramFormatter}
 import org.apache.kafka.common.utils.Sanitizer
+import org.apache.kafka.server.common.AdminOperationException
 import org.apache.kafka.storage.internals.log.LogConfig
 
 import scala.collection.{Map, mutable, _}
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala 
b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index c735d53eba9..07ca35cc9ae 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -17,7 +17,7 @@
 package kafka.zk
 
 import java.util.Properties
-import kafka.admin.{AdminOperationException, AdminUtils, BrokerMetadata, 
RackAwareMode}
+import kafka.admin.{AdminUtils, BrokerMetadata, RackAwareMode}
 import kafka.common.TopicAlreadyMarkedForDeletionException
 import kafka.controller.ReplicaAssignment
 import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
@@ -26,6 +26,7 @@ import kafka.utils.Implicits._
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.server.common.AdminOperationException
 import org.apache.kafka.storage.internals.log.LogConfig
 import org.apache.zookeeper.KeeperException.NodeExistsException
 
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 
b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 07d00963d52..ea57207e278 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.admin.{Admin, NewPartitions, 
NewTopic}
 import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
 import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
 import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
+import org.apache.kafka.server.common.AdminOperationException
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
diff --git 
a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala 
b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala
index eaef9367a1d..6d36120b136 100644
--- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala
@@ -16,8 +16,8 @@
  */
 package kafka.admin
 
-import kafka.common.AdminCommandFailedException
 import org.apache.kafka.common.errors.TimeoutException
+import org.apache.kafka.server.common.AdminCommandFailedException
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 
diff --git 
a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
index 278bcf0e68c..ff6cd2cad60 100644
--- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
@@ -18,8 +18,6 @@ package kafka.admin
 
 import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Path}
-
-import kafka.common.AdminCommandFailedException
 import kafka.server.IntegrationTestUtils.createTopic
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
@@ -29,6 +27,7 @@ import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AdminClientConfig
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
+import org.apache.kafka.server.common.AdminCommandFailedException
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.extension.ExtendWith
 import org.junit.jupiter.api.{BeforeEach, Tag}
diff --git 
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala 
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala
index 71bbcb91601..fab7907e17b 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala
@@ -20,12 +20,12 @@ package kafka.admin
 import java.util.concurrent.ExecutionException
 import java.util.{Arrays, Collections}
 import kafka.admin.ReassignPartitionsCommand._
-import kafka.common.AdminCommandFailedException
 import kafka.utils.Exit
 import org.apache.kafka.clients.admin.{Config, MockAdminClient, 
PartitionReassignment}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors.{InvalidReplicationFactorException, 
UnknownTopicOrPartitionException}
 import org.apache.kafka.common.{Node, TopicPartition, TopicPartitionInfo, 
TopicPartitionReplica}
+import org.apache.kafka.server.common.{AdminCommandFailedException, 
AdminOperationException}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertThrows, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout}
 
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 59def64ec85..088c9dd0205 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -17,13 +17,13 @@
 package kafka.admin
 
 import kafka.admin.TopicCommand.{PartitionDescription, TopicCommandOptions, 
TopicService}
-import kafka.common.AdminCommandFailedException
 import kafka.utils.Exit
 import org.apache.kafka.clients.admin.{Admin, AdminClientTestUtils, 
CreatePartitionsOptions, CreateTopicsOptions, DeleteTopicsOptions, 
NewPartitions, NewTopic, PartitionReassignment, TopicDescription}
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.TopicPartitionInfo
 import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.server.common.{AdminCommandFailedException, 
AdminOperationException}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.mockito.ArgumentMatcher
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
index 621c94f67e6..ddcb6377357 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
@@ -16,11 +16,11 @@
   */
 package kafka.server
 
-import kafka.admin.AdminOperationException
 import kafka.utils.CoreUtils._
 import kafka.server.QuorumTestHarness
 import org.apache.kafka.common.config._
 import org.apache.kafka.common.config.internals.QuotaConfigs
+import org.apache.kafka.server.common.AdminOperationException
 import org.junit.jupiter.api.Assertions.assertThrows
 import org.junit.jupiter.api.Test
 
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala 
b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index 833fd930414..ba0a2583598 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.config.internals.QuotaConfigs
 import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, 
InvalidTopicException, TopicExistsException}
 import org.apache.kafka.common.metrics.Quota
+import org.apache.kafka.server.common.AdminOperationException
 import org.apache.kafka.storage.internals.log.LogConfig
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions._
diff --git a/core/src/main/scala/kafka/admin/AdminOperationException.scala 
b/server-common/src/main/java/org/apache/kafka/server/common/AdminCommandFailedException.java
similarity index 60%
rename from core/src/main/scala/kafka/admin/AdminOperationException.scala
rename to 
server-common/src/main/java/org/apache/kafka/server/common/AdminCommandFailedException.java
index a45b3f7e93a..62bb8425546 100644
--- a/core/src/main/scala/kafka/admin/AdminOperationException.scala
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/AdminCommandFailedException.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -15,9 +15,14 @@
  * limitations under the License.
  */
 
-package kafka.admin
+package org.apache.kafka.server.common;
 
-class AdminOperationException(val error: String, cause: Throwable) extends 
RuntimeException(error, cause) {
-  def this(error: Throwable) = this(error.getMessage, error)
-  def this(msg: String) = this(msg, null)
-}
\ No newline at end of file
+public class AdminCommandFailedException extends RuntimeException {
+    public AdminCommandFailedException(String message) {
+        super(message);
+    }
+
+    public AdminCommandFailedException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/core/src/main/scala/kafka/common/AdminCommandFailedException.scala 
b/server-common/src/main/java/org/apache/kafka/server/common/AdminOperationException.java
similarity index 61%
rename from core/src/main/scala/kafka/common/AdminCommandFailedException.scala
rename to 
server-common/src/main/java/org/apache/kafka/server/common/AdminOperationException.java
index 94e28641dd0..03826d1ac00 100644
--- a/core/src/main/scala/kafka/common/AdminCommandFailedException.scala
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/AdminOperationException.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -15,9 +15,14 @@
  * limitations under the License.
  */
 
-package kafka.common
+package org.apache.kafka.server.common;
 
-class AdminCommandFailedException(message: String, cause: Throwable) extends 
RuntimeException(message, cause) {
-  def this(message: String) = this(message, null)
-  def this() = this(null, null)
+public class AdminOperationException extends RuntimeException {
+    public AdminOperationException(String message) {
+        super(message);
+    }
+
+    public AdminOperationException(Throwable cause) {
+        super(cause.getMessage(), cause);
+    }
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java
new file mode 100644
index 00000000000..5e44865b200
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DeleteRecordsResult;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * A command for delete records of the given partitions down to the specified 
offset.
+ */
+public class DeleteRecordsCommand {
+    private static final int EARLIEST_VERSION = 1;
+
+    private static final DecodeJson.DecodeInteger INT = new 
DecodeJson.DecodeInteger();
+
+    private static final DecodeJson.DecodeLong LONG = new 
DecodeJson.DecodeLong();
+
+    private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString();
+
+    public static void main(String[] args) throws Exception {
+        execute(args, System.out);
+    }
+
+    static Map<TopicPartition, List<Long>> 
parseOffsetJsonStringWithoutDedup(String jsonData) throws 
JsonProcessingException {
+        JsonValue js = Json.parseFull(jsonData)
+            .orElseThrow(() -> new AdminOperationException("The input string 
is not a valid JSON"));
+
+        Optional<JsonValue> version = js.asJsonObject().get("version");
+
+        return parseJsonData(version.isPresent() ? version.get().to(INT) : 
EARLIEST_VERSION, js);
+    }
+
+    private static Map<TopicPartition, List<Long>> parseJsonData(int version, 
JsonValue js) throws JsonMappingException {
+        if (version == 1) {
+            JsonValue partitions = js.asJsonObject().get("partitions")
+                .orElseThrow(() -> new AdminOperationException("Missing 
partitions field"));
+
+            Map<TopicPartition, List<Long>> res = new HashMap<>();
+
+            Iterator<JsonValue> iterator = partitions.asJsonArray().iterator();
+
+            while (iterator.hasNext()) {
+                JsonObject partitionJs = iterator.next().asJsonObject();
+
+                String topic = partitionJs.apply("topic").to(STRING);
+                int partition = partitionJs.apply("partition").to(INT);
+                long offset = partitionJs.apply("offset").to(LONG);
+
+                res.computeIfAbsent(new TopicPartition(topic, partition), k -> 
new ArrayList<>()).add(offset);
+            }
+
+            return res;
+        }
+
+        throw new AdminOperationException("Not supported version field value " 
+ version);
+    }
+
+    public static void execute(String[] args, PrintStream out) throws 
IOException {
+        DeleteRecordsCommandOptions opts = new 
DeleteRecordsCommandOptions(args);
+
+        try (Admin adminClient = createAdminClient(opts)) {
+            execute(adminClient, 
Utils.readFileAsString(opts.options.valueOf(opts.offsetJsonFileOpt)), out);
+        }
+    }
+
+    static void execute(Admin adminClient, String offsetJsonString, 
PrintStream out) throws JsonProcessingException {
+        Map<TopicPartition, List<Long>> offsetSeq = 
parseOffsetJsonStringWithoutDedup(offsetJsonString);
+
+        Set<TopicPartition> duplicatePartitions = offsetSeq.entrySet().stream()
+            .filter(e -> e.getValue().size() > 1)
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toSet());
+
+        if (!duplicatePartitions.isEmpty()) {
+            StringJoiner duplicates = new StringJoiner(",");
+            duplicatePartitions.forEach(tp -> duplicates.add(tp.toString()));
+            throw new AdminCommandFailedException(
+                String.format("Offset json file contains duplicate topic 
partitions: %s", duplicates)
+            );
+        }
+
+        Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
+
+        for (Map.Entry<TopicPartition, List<Long>> e : offsetSeq.entrySet())
+            recordsToDelete.put(e.getKey(), 
RecordsToDelete.beforeOffset(e.getValue().get(0)));
+
+        out.println("Executing records delete operation");
+        DeleteRecordsResult deleteRecordsResult = 
adminClient.deleteRecords(recordsToDelete);
+        out.println("Records delete operation completed:");
+
+        deleteRecordsResult.lowWatermarks().forEach((tp, partitionResult) -> {
+            try {
+                out.printf("partition: %s\tlow_watermark: %s%n", tp, 
partitionResult.get().lowWatermark());
+            } catch (InterruptedException | ExecutionException e) {
+                out.printf("partition: %s\terror: %s%n", tp, e.getMessage());
+            }
+        });
+    }
+
+    private static Admin createAdminClient(DeleteRecordsCommandOptions opts) 
throws IOException {
+        Properties props = opts.options.has(opts.commandConfigOpt)
+            ? Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+            : new Properties();
+        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
opts.options.valueOf(opts.bootstrapServerOpt));
+        return Admin.create(props);
+    }
+
+    private static class DeleteRecordsCommandOptions extends 
CommandDefaultOptions {
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<String> offsetJsonFileOpt;
+        private final OptionSpec<String> commandConfigOpt;
+
+        public DeleteRecordsCommandOptions(String[] args) {
+            super(args);
+
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: 
The server to connect to.")
+                .withRequiredArg()
+                .describedAs("server(s) to use for bootstrapping")
+                .ofType(String.class);
+
+            offsetJsonFileOpt = parser.accepts("offset-json-file", "REQUIRED: 
The JSON file with offset per partition. " +
+                    "The format to use is:\n" +
+                    "{\"partitions\":\n  [{\"topic\": \"foo\", \"partition\": 
1, \"offset\": 1}],\n \"version\":1\n}")
+                .withRequiredArg()
+                .describedAs("Offset json file path")
+                .ofType(String.class);
+
+            commandConfigOpt = parser.accepts("command-config", "A property 
file containing configs to be passed to Admin Client.")
+                .withRequiredArg()
+                .describedAs("command config property file path")
+                .ofType(String.class);
+
+            options = parser.parse(args);
+
+            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to 
delete records of the given partitions down to the specified offset.");
+
+            CommandLineUtils.checkRequiredArgs(parser, options, 
bootstrapServerOpt, offsetJsonFileOpt);
+        }
+    }
+}
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
new file mode 100644
index 00000000000..2c06fb66ba0
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
/**
 * Integration test of {@link DeleteRecordsCommand} against a running cluster
 * provided by the {@code ClusterTestExtensions} JUnit extension.
 */
@ExtendWith(value = ClusterTestExtensions.class)
@ClusterTestDefaults(clusterType = Type.ALL)
@Tag("integration")
public class DeleteRecordsCommandTest {

    // Cluster handle injected by the test extension via the constructor.
    private final ClusterInstance cluster;
    public DeleteRecordsCommandTest(ClusterInstance cluster) {
        this.cluster = cluster;
    }

    @ClusterTest
    public void testCommand() throws Exception {
        Properties adminProps = new Properties();

        adminProps.put(AdminClientConfig.RETRIES_CONFIG, 1);

        try (Admin admin = cluster.createAdminClient(adminProps)) {
            // Listing the same partition twice must be rejected before any request is sent.
            assertThrows(
                AdminCommandFailedException.class,
                () -> DeleteRecordsCommand.execute(admin, "{\"partitions\":[" +
                    "{\"topic\":\"t\", \"partition\":0, \"offset\":1}," +
                    "{\"topic\":\"t\", \"partition\":0, \"offset\":1}]" +
                    "}", System.out),
                "Offset json file contains duplicate topic partitions: t-0"
            );

            admin.createTopics(Collections.singleton(new NewTopic("t", 1, (short) 1))).all().get();

            Properties props = new Properties();

            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            // Seed the topic with three records so there is something to delete.
            try (KafkaProducer<?, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("t", "1")).get();
                producer.send(new ProducerRecord<>("t", "2")).get();
                producer.send(new ProducerRecord<>("t", "3")).get();
            }

            // Deleting up to offset 1 moves the reported low watermark to 1.
            executeAndAssertOutput(
                "{\"partitions\":[{\"topic\":\"t\", \"partition\":0, \"offset\":1}]}",
                "partition: t-0\tlow_watermark: 1",
                admin
            );

            // A non-existent partition is reported as a per-partition error, not a crash.
            executeAndAssertOutput(
                "{\"partitions\":[{\"topic\":\"t\", \"partition\":42, \"offset\":42}]}",
                "partition: t-42\terror",
                admin
            );
        }
    }

    // Runs the command against the given JSON and asserts that the expected
    // fragment appears in the captured standard output.
    private static void executeAndAssertOutput(String json, String expOut, Admin admin) {
        String output = ToolsTestUtils.captureStandardOut(() -> {
            try {
                DeleteRecordsCommand.execute(admin, json, System.out);
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        });
        assertTrue(output.contains(expOut));
    }
}
+
+/**
+ * Unit test of {@link DeleteRecordsCommand} tool.
+ */
+class DeleteRecordsCommandUnitTest {
+    @Test
+    public void testOffsetFileNotExists() {
+        assertThrows(IOException.class, () -> DeleteRecordsCommand.main(new 
String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--offset-json-file", "/not/existing/file"
+        }));
+    }
+
+    @Test
+    public void testCommandConfigNotExists() {
+        assertThrows(NoSuchFileException.class, () -> 
DeleteRecordsCommand.main(new String[] {
+            "--bootstrap-server", "localhost:9092",
+            "--offset-json-file", "/not/existing/file",
+            "--command-config", "/another/not/existing/file"
+        }));
+    }
+
+    @Test
+    public void testWrongVersion() {
+        assertCommandThrows(JsonProcessingException.class, 
"{\"version\":\"string\"}");
+        assertCommandThrows(AdminOperationException.class, "{\"version\":2}");
+    }
+
+    @Test
+    public void testWrongPartitions() {
+        assertCommandThrows(AdminOperationException.class, "{\"version\":1}");
+        assertCommandThrows(JsonProcessingException.class, 
"{\"partitions\":2}");
+        assertCommandThrows(JsonProcessingException.class, 
"{\"partitions\":{}}");
+        assertCommandThrows(JsonProcessingException.class, 
"{\"partitions\":[{}]}");
+        assertCommandThrows(JsonProcessingException.class, 
"{\"partitions\":[{\"topic\":\"t\"}]}");
+        assertCommandThrows(JsonProcessingException.class, 
"{\"partitions\":[{\"topic\":\"t\", \"partition\": \"\"}]}");
+        assertCommandThrows(JsonProcessingException.class, 
"{\"partitions\":[{\"topic\":\"t\", \"partition\": 0}]}");
+        assertCommandThrows(JsonProcessingException.class, 
"{\"partitions\":[{\"topic\":\"t\", \"offset\":0}]}");
+    }
+
+    @Test
+    public void testParse() throws Exception {
+        Map<TopicPartition, List<Long>> res = 
DeleteRecordsCommand.parseOffsetJsonStringWithoutDedup(
+            "{\"partitions\":[" +
+                "{\"topic\":\"t\", \"partition\":0, \"offset\":0}," +
+                "{\"topic\":\"t\", \"partition\":1, \"offset\":1, 
\"ignored\":\"field\"}," +
+                "{\"topic\":\"t\", \"partition\":0, \"offset\":2}," +
+                "{\"topic\":\"t\", \"partition\":0, \"offset\":0}" +
+            "]}"
+        );
+
+        assertEquals(2, res.size());
+        assertEquals(Arrays.asList(0L, 2L, 0L), res.get(new 
TopicPartition("t", 0)));
+        assertEquals(Collections.singletonList(1L), res.get(new 
TopicPartition("t", 1)));
+    }
+
+    /**
+     * Asserts that {@link 
DeleteRecordsCommand#parseOffsetJsonStringWithoutDedup(String)} throws {@link 
AdminOperationException}.
+     * @param jsonData Data to check.
+     */
+    private static void assertCommandThrows(Class<? extends Exception> 
expectedException, String jsonData) {
+        assertThrows(
+            expectedException,
+            () -> 
DeleteRecordsCommand.parseOffsetJsonStringWithoutDedup(jsonData)
+        );
+    }
+}
\ No newline at end of file


Reply via email to