This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4bba2c8a32a KAFKA-14591: Move DeleteRecordsCommand to tools (#13278)
4bba2c8a32a is described below
commit 4bba2c8a32a3e35f6870cf3f738c0eef8bb652d2
Author: Nikolay <[email protected]>
AuthorDate: Fri Jul 21 18:30:28 2023 +0300
KAFKA-14591: Move DeleteRecordsCommand to tools (#13278)
Reviewers: Mickael Maison <[email protected]>, Federico Valeri <[email protected]>
---
bin/kafka-delete-records.sh | 2 +-
bin/windows/kafka-delete-records.bat | 2 +-
core/src/main/scala/kafka/admin/AdminUtils.scala | 2 +-
.../scala/kafka/admin/DeleteRecordsCommand.scala | 137 ---------------
.../scala/kafka/admin/LeaderElectionCommand.scala | 2 +-
.../kafka/admin/ReassignPartitionsCommand.scala | 2 +-
core/src/main/scala/kafka/admin/TopicCommand.scala | 2 +-
.../scala/kafka/controller/KafkaController.scala | 3 +-
.../main/scala/kafka/server/ZkAdminManager.scala | 3 +-
core/src/main/scala/kafka/zk/AdminZkClient.scala | 3 +-
.../scala/unit/kafka/admin/AddPartitionsTest.scala | 1 +
.../admin/LeaderElectionCommandErrorTest.scala | 2 +-
.../kafka/admin/LeaderElectionCommandTest.scala | 3 +-
.../kafka/admin/ReassignPartitionsUnitTest.scala | 2 +-
.../scala/unit/kafka/admin/TopicCommandTest.scala | 2 +-
.../unit/kafka/server/DynamicConfigTest.scala | 2 +-
.../scala/unit/kafka/zk/AdminZkClientTest.scala | 1 +
.../server/common/AdminCommandFailedException.java | 21 ++-
.../server/common/AdminOperationException.java | 19 ++-
.../apache/kafka/tools/DeleteRecordsCommand.java | 183 +++++++++++++++++++++
.../kafka/tools/DeleteRecordsCommandTest.java | 182 ++++++++++++++++++++
21 files changed, 408 insertions(+), 168 deletions(-)
diff --git a/bin/kafka-delete-records.sh b/bin/kafka-delete-records.sh
index 8726f919992..e9db8f95c58 100755
--- a/bin/kafka-delete-records.sh
+++ b/bin/kafka-delete-records.sh
@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-exec $(dirname $0)/kafka-run-class.sh kafka.admin.DeleteRecordsCommand "$@"
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.DeleteRecordsCommand "$@"
diff --git a/bin/windows/kafka-delete-records.bat b/bin/windows/kafka-delete-records.bat
index d07e05f88a2..a883ec707e9 100644
--- a/bin/windows/kafka-delete-records.bat
+++ b/bin/windows/kafka-delete-records.bat
@@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
-"%~dp0kafka-run-class.bat" kafka.admin.DeleteRecordsCommand %*
+"%~dp0kafka-run-class.bat" org.apache.kafka.tools.DeleteRecordsCommand %*
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index a37e3d68e64..5ac09ab5348 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -18,9 +18,9 @@
package kafka.admin
import java.util.Random
-
import kafka.utils.Logging
import org.apache.kafka.common.errors.{InvalidPartitionsException,
InvalidReplicationFactorException}
+import org.apache.kafka.server.common.AdminOperationException
import collection.{Map, mutable, _}
diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
deleted file mode 100644
index b1747313708..00000000000
--- a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-
-import java.io.PrintStream
-import java.util.Properties
-import kafka.common.AdminCommandFailedException
-import kafka.utils.json.JsonValue
-import kafka.utils.{CoreUtils, Json}
-import org.apache.kafka.clients.admin.{Admin, RecordsToDelete}
-import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
-
-import scala.jdk.CollectionConverters._
-import scala.collection.Seq
-
-/**
- * A command for delete records of the given partitions down to the specified
offset.
- */
-object DeleteRecordsCommand {
-
- private[admin] val EarliestVersion = 1
-
- def main(args: Array[String]): Unit = {
- execute(args, System.out)
- }
-
- def parseOffsetJsonStringWithoutDedup(jsonData: String):
Seq[(TopicPartition, Long)] = {
- Json.parseFull(jsonData) match {
- case Some(js) =>
- val version = js.asJsonObject.get("version") match {
- case Some(jsonValue) => jsonValue.to[Int]
- case None => EarliestVersion
- }
- parseJsonData(version, js)
- case None => throw new AdminOperationException("The input string is not
a valid JSON")
- }
- }
-
- def parseJsonData(version: Int, js: JsonValue): Seq[(TopicPartition, Long)]
= {
- version match {
- case 1 =>
- js.asJsonObject.get("partitions") match {
- case Some(partitions) =>
- partitions.asJsonArray.iterator.map(_.asJsonObject).map {
partitionJs =>
- val topic = partitionJs("topic").to[String]
- val partition = partitionJs("partition").to[Int]
- val offset = partitionJs("offset").to[Long]
- new TopicPartition(topic, partition) -> offset
- }.toBuffer
- case _ => throw new AdminOperationException("Missing partitions
field");
- }
- case _ => throw new AdminOperationException(s"Not supported version
field value $version")
- }
- }
-
- def execute(args: Array[String], out: PrintStream): Unit = {
- val opts = new DeleteRecordsCommandOptions(args)
- val adminClient = createAdminClient(opts)
- val offsetJsonFile = opts.options.valueOf(opts.offsetJsonFileOpt)
- val offsetJsonString = Utils.readFileAsString(offsetJsonFile)
- val offsetSeq = parseOffsetJsonStringWithoutDedup(offsetJsonString)
-
- val duplicatePartitions = CoreUtils.duplicates(offsetSeq.map { case (tp,
_) => tp })
- if (duplicatePartitions.nonEmpty)
- throw new AdminCommandFailedException("Offset json file contains
duplicate topic partitions: %s".format(duplicatePartitions.mkString(",")))
-
- val recordsToDelete = offsetSeq.map { case (topicPartition, offset) =>
- (topicPartition, RecordsToDelete.beforeOffset(offset))
- }.toMap.asJava
-
- out.println("Executing records delete operation")
- val deleteRecordsResult = adminClient.deleteRecords(recordsToDelete)
- out.println("Records delete operation completed:")
-
- deleteRecordsResult.lowWatermarks.forEach { (tp, partitionResult) =>
- try out.println(s"partition: $tp\tlow_watermark:
${partitionResult.get.lowWatermark}")
- catch {
- case e: Exception => out.println(s"partition: $tp\terror:
${e.getMessage}")
- }
- }
-
- adminClient.close()
- }
-
- private def createAdminClient(opts: DeleteRecordsCommandOptions): Admin = {
- val props = if (opts.options.has(opts.commandConfigOpt))
- Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
- else
- new Properties()
- props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
opts.options.valueOf(opts.bootstrapServerOpt))
- Admin.create(props)
- }
-
- class DeleteRecordsCommandOptions(args: Array[String]) extends
CommandDefaultOptions(args) {
- val BootstrapServerDoc = "REQUIRED: The server to connect to."
- val offsetJsonFileDoc = "REQUIRED: The JSON file with offset per
partition. The format to use is:\n" +
- "{\"partitions\":\n [{\"topic\": \"foo\",
\"partition\": 1, \"offset\": 1}],\n \"version\":1\n}"
- val CommandConfigDoc = "A property file containing configs to be passed to
Admin Client."
-
- val bootstrapServerOpt = parser.accepts("bootstrap-server",
BootstrapServerDoc)
- .withRequiredArg
- .describedAs("server(s) to use for
bootstrapping")
- .ofType(classOf[String])
- val offsetJsonFileOpt = parser.accepts("offset-json-file",
offsetJsonFileDoc)
- .withRequiredArg
- .describedAs("Offset json file path")
- .ofType(classOf[String])
- val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
- .withRequiredArg
- .describedAs("command config property file
path")
- .ofType(classOf[String])
-
- options = parser.parse(args : _*)
-
- CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to delete
records of the given partitions down to the specified offset.")
-
- CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt,
offsetJsonFileOpt)
- }
-}
diff --git a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
index 140f4b70177..868c54916e9 100644
--- a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
@@ -19,7 +19,6 @@ package kafka.admin
import java.util.Properties
import java.util.concurrent.ExecutionException
import joptsimple.util.EnumConverter
-import kafka.common.AdminCommandFailedException
import kafka.utils.CoreUtils
import kafka.utils.Implicits._
import kafka.utils.Json
@@ -31,6 +30,7 @@ import org.apache.kafka.common.errors.ClusterAuthorizationException
import org.apache.kafka.common.errors.ElectionNotNeededException
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.common.{AdminCommandFailedException,
AdminOperationException}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index d1b75c483eb..40f688c7085 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -19,7 +19,6 @@ package kafka.admin
import java.util
import java.util.Optional
import java.util.concurrent.ExecutionException
-import kafka.common.AdminCommandFailedException
import kafka.server.DynamicConfig
import kafka.utils.{CoreUtils, Exit, Json, Logging}
import kafka.utils.Implicits._
@@ -30,6 +29,7 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{ReplicaNotAvailableException,
UnknownTopicOrPartitionException}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, KafkaFuture, TopicPartition,
TopicPartitionReplica}
+import org.apache.kafka.server.common.{AdminCommandFailedException,
AdminOperationException}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.storage.internals.log.LogConfig
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index b4c74d35cdf..568dd1798a1 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -20,7 +20,6 @@ package kafka.admin
import java.util
import java.util.{Collections, Optional, Properties}
import joptsimple._
-import kafka.common.AdminCommandFailedException
import kafka.utils._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.CreatePartitionsOptions
@@ -33,6 +32,7 @@ import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.{ClusterAuthorizationException,
TopicExistsException, UnsupportedVersionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.common.{AdminCommandFailedException,
AdminOperationException}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.kafka.server.util.TopicFilter.IncludeList
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index c3f76e83d12..fa2575d9d8b 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -19,7 +19,6 @@ package kafka.controller
import com.yammer.metrics.core.Timer
import java.util.concurrent.TimeUnit
-import kafka.admin.AdminOperationException
import kafka.api._
import kafka.common._
import kafka.cluster.Broker
@@ -45,7 +44,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError,
LeaderAndIsrResponse, UpdateFeaturesRequest, UpdateMetadataResponse}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.metadata.LeaderRecoveryState
-import org.apache.kafka.server.common.ProducerIdsBlock
+import org.apache.kafka.server.common.{AdminOperationException,
ProducerIdsBlock}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.KafkaScheduler
import org.apache.zookeeper.KeeperException
diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index 97cf2b2f713..31ab40430d7 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -18,7 +18,7 @@ package kafka.server
import java.util
import java.util.Properties
-import kafka.admin.{AdminOperationException, AdminUtils}
+import kafka.admin.AdminUtils
import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.server.ConfigAdminManager.{prepareIncrementalConfigs,
toLoggableProps}
import kafka.server.DynamicConfig.QuotaConfigs
@@ -47,6 +47,7 @@ import org.apache.kafka.common.requests.CreateTopicsRequest._
import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError}
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils,
ScramFormatter}
import org.apache.kafka.common.utils.Sanitizer
+import org.apache.kafka.server.common.AdminOperationException
import org.apache.kafka.storage.internals.log.LogConfig
import scala.collection.{Map, mutable, _}
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index c735d53eba9..07ca35cc9ae 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -17,7 +17,7 @@
package kafka.zk
import java.util.Properties
-import kafka.admin.{AdminOperationException, AdminUtils, BrokerMetadata,
RackAwareMode}
+import kafka.admin.{AdminUtils, BrokerMetadata, RackAwareMode}
import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.controller.ReplicaAssignment
import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
@@ -26,6 +26,7 @@ import kafka.utils.Implicits._
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.server.common.AdminOperationException
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.zookeeper.KeeperException.NodeExistsException
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 07d00963d52..ea57207e278 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.admin.{Admin, NewPartitions, NewTopic}
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
+import org.apache.kafka.server.common.AdminOperationException
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala
index eaef9367a1d..6d36120b136 100644
--- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala
@@ -16,8 +16,8 @@
*/
package kafka.admin
-import kafka.common.AdminCommandFailedException
import org.apache.kafka.common.errors.TimeoutException
+import org.apache.kafka.server.common.AdminCommandFailedException
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
index 278bcf0e68c..ff6cd2cad60 100644
--- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
@@ -18,8 +18,6 @@ package kafka.admin
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
-
-import kafka.common.AdminCommandFailedException
import kafka.server.IntegrationTestUtils.createTopic
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
@@ -29,6 +27,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
+import org.apache.kafka.server.common.AdminCommandFailedException
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{BeforeEach, Tag}
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala
index 71bbcb91601..fab7907e17b 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala
@@ -20,12 +20,12 @@ package kafka.admin
import java.util.concurrent.ExecutionException
import java.util.{Arrays, Collections}
import kafka.admin.ReassignPartitionsCommand._
-import kafka.common.AdminCommandFailedException
import kafka.utils.Exit
import org.apache.kafka.clients.admin.{Config, MockAdminClient,
PartitionReassignment}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{InvalidReplicationFactorException,
UnknownTopicOrPartitionException}
import org.apache.kafka.common.{Node, TopicPartition, TopicPartitionInfo,
TopicPartitionReplica}
+import org.apache.kafka.server.common.{AdminCommandFailedException,
AdminOperationException}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertThrows, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout}
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 59def64ec85..088c9dd0205 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -17,13 +17,13 @@
package kafka.admin
import kafka.admin.TopicCommand.{PartitionDescription, TopicCommandOptions,
TopicService}
-import kafka.common.AdminCommandFailedException
import kafka.utils.Exit
import org.apache.kafka.clients.admin.{Admin, AdminClientTestUtils,
CreatePartitionsOptions, CreateTopicsOptions, DeleteTopicsOptions,
NewPartitions, NewTopic, PartitionReassignment, TopicDescription}
import org.apache.kafka.common.Node
import org.apache.kafka.common.TopicPartitionInfo
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.server.common.{AdminCommandFailedException,
AdminOperationException}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatcher
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
index 621c94f67e6..ddcb6377357 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
@@ -16,11 +16,11 @@
*/
package kafka.server
-import kafka.admin.AdminOperationException
import kafka.utils.CoreUtils._
import kafka.server.QuorumTestHarness
import org.apache.kafka.common.config._
import org.apache.kafka.common.config.internals.QuotaConfigs
+import org.apache.kafka.server.common.AdminOperationException
import org.junit.jupiter.api.Assertions.assertThrows
import org.junit.jupiter.api.Test
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index 833fd930414..ba0a2583598 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.config.internals.QuotaConfigs
import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException,
InvalidTopicException, TopicExistsException}
import org.apache.kafka.common.metrics.Quota
+import org.apache.kafka.server.common.AdminOperationException
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions._
diff --git a/core/src/main/scala/kafka/admin/AdminOperationException.scala b/server-common/src/main/java/org/apache/kafka/server/common/AdminCommandFailedException.java
similarity index 60%
rename from core/src/main/scala/kafka/admin/AdminOperationException.scala
rename to server-common/src/main/java/org/apache/kafka/server/common/AdminCommandFailedException.java
index a45b3f7e93a..62bb8425546 100644
--- a/core/src/main/scala/kafka/admin/AdminOperationException.scala
+++ b/server-common/src/main/java/org/apache/kafka/server/common/AdminCommandFailedException.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -15,9 +15,14 @@
* limitations under the License.
*/
-package kafka.admin
+package org.apache.kafka.server.common;
-class AdminOperationException(val error: String, cause: Throwable) extends
RuntimeException(error, cause) {
- def this(error: Throwable) = this(error.getMessage, error)
- def this(msg: String) = this(msg, null)
-}
\ No newline at end of file
+public class AdminCommandFailedException extends RuntimeException {
+ public AdminCommandFailedException(String message) {
+ super(message);
+ }
+
+ public AdminCommandFailedException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/core/src/main/scala/kafka/common/AdminCommandFailedException.scala b/server-common/src/main/java/org/apache/kafka/server/common/AdminOperationException.java
similarity index 61%
rename from core/src/main/scala/kafka/common/AdminCommandFailedException.scala
rename to server-common/src/main/java/org/apache/kafka/server/common/AdminOperationException.java
index 94e28641dd0..03826d1ac00 100644
--- a/core/src/main/scala/kafka/common/AdminCommandFailedException.scala
+++ b/server-common/src/main/java/org/apache/kafka/server/common/AdminOperationException.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -15,9 +15,14 @@
* limitations under the License.
*/
-package kafka.common
+package org.apache.kafka.server.common;
-class AdminCommandFailedException(message: String, cause: Throwable) extends
RuntimeException(message, cause) {
- def this(message: String) = this(message, null)
- def this() = this(null, null)
+public class AdminOperationException extends RuntimeException {
+ public AdminOperationException(String message) {
+ super(message);
+ }
+
+ public AdminOperationException(Throwable cause) {
+ super(cause.getMessage(), cause);
+ }
}
diff --git a/tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java b/tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java
new file mode 100644
index 00000000000..5e44865b200
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DeleteRecordsResult;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * A command for delete records of the given partitions down to the specified
offset.
+ */
+public class DeleteRecordsCommand {
+ private static final int EARLIEST_VERSION = 1;
+
+ private static final DecodeJson.DecodeInteger INT = new
DecodeJson.DecodeInteger();
+
+ private static final DecodeJson.DecodeLong LONG = new
DecodeJson.DecodeLong();
+
+ private static final DecodeJson.DecodeString STRING = new
DecodeJson.DecodeString();
+
+ public static void main(String[] args) throws Exception {
+ execute(args, System.out);
+ }
+
+ static Map<TopicPartition, List<Long>>
parseOffsetJsonStringWithoutDedup(String jsonData) throws
JsonProcessingException {
+ JsonValue js = Json.parseFull(jsonData)
+ .orElseThrow(() -> new AdminOperationException("The input string
is not a valid JSON"));
+
+ Optional<JsonValue> version = js.asJsonObject().get("version");
+
+ return parseJsonData(version.isPresent() ? version.get().to(INT) :
EARLIEST_VERSION, js);
+ }
+
+ private static Map<TopicPartition, List<Long>> parseJsonData(int version,
JsonValue js) throws JsonMappingException {
+ if (version == 1) {
+ JsonValue partitions = js.asJsonObject().get("partitions")
+ .orElseThrow(() -> new AdminOperationException("Missing
partitions field"));
+
+ Map<TopicPartition, List<Long>> res = new HashMap<>();
+
+ Iterator<JsonValue> iterator = partitions.asJsonArray().iterator();
+
+ while (iterator.hasNext()) {
+ JsonObject partitionJs = iterator.next().asJsonObject();
+
+ String topic = partitionJs.apply("topic").to(STRING);
+ int partition = partitionJs.apply("partition").to(INT);
+ long offset = partitionJs.apply("offset").to(LONG);
+
+ res.computeIfAbsent(new TopicPartition(topic, partition), k ->
new ArrayList<>()).add(offset);
+ }
+
+ return res;
+ }
+
+ throw new AdminOperationException("Not supported version field value "
+ version);
+ }
+
+ public static void execute(String[] args, PrintStream out) throws
IOException {
+ DeleteRecordsCommandOptions opts = new
DeleteRecordsCommandOptions(args);
+
+ try (Admin adminClient = createAdminClient(opts)) {
+ execute(adminClient,
Utils.readFileAsString(opts.options.valueOf(opts.offsetJsonFileOpt)), out);
+ }
+ }
+
+ static void execute(Admin adminClient, String offsetJsonString,
PrintStream out) throws JsonProcessingException {
+ Map<TopicPartition, List<Long>> offsetSeq =
parseOffsetJsonStringWithoutDedup(offsetJsonString);
+
+ Set<TopicPartition> duplicatePartitions = offsetSeq.entrySet().stream()
+ .filter(e -> e.getValue().size() > 1)
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
+
+ if (!duplicatePartitions.isEmpty()) {
+ StringJoiner duplicates = new StringJoiner(",");
+ duplicatePartitions.forEach(tp -> duplicates.add(tp.toString()));
+ throw new AdminCommandFailedException(
+ String.format("Offset json file contains duplicate topic
partitions: %s", duplicates)
+ );
+ }
+
+ Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
+
+ for (Map.Entry<TopicPartition, List<Long>> e : offsetSeq.entrySet())
+ recordsToDelete.put(e.getKey(),
RecordsToDelete.beforeOffset(e.getValue().get(0)));
+
+ out.println("Executing records delete operation");
+ DeleteRecordsResult deleteRecordsResult =
adminClient.deleteRecords(recordsToDelete);
+ out.println("Records delete operation completed:");
+
+ deleteRecordsResult.lowWatermarks().forEach((tp, partitionResult) -> {
+ try {
+ out.printf("partition: %s\tlow_watermark: %s%n", tp,
partitionResult.get().lowWatermark());
+ } catch (InterruptedException | ExecutionException e) {
+ out.printf("partition: %s\terror: %s%n", tp, e.getMessage());
+ }
+ });
+ }
+
+ private static Admin createAdminClient(DeleteRecordsCommandOptions opts)
throws IOException {
+ Properties props = opts.options.has(opts.commandConfigOpt)
+ ? Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+ : new Properties();
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
opts.options.valueOf(opts.bootstrapServerOpt));
+ return Admin.create(props);
+ }
+
+ private static class DeleteRecordsCommandOptions extends
CommandDefaultOptions {
+ private final OptionSpec<String> bootstrapServerOpt;
+ private final OptionSpec<String> offsetJsonFileOpt;
+ private final OptionSpec<String> commandConfigOpt;
+
+ public DeleteRecordsCommandOptions(String[] args) {
+ super(args);
+
+ bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED:
The server to connect to.")
+ .withRequiredArg()
+ .describedAs("server(s) to use for bootstrapping")
+ .ofType(String.class);
+
+ offsetJsonFileOpt = parser.accepts("offset-json-file", "REQUIRED:
The JSON file with offset per partition. " +
+ "The format to use is:\n" +
+ "{\"partitions\":\n [{\"topic\": \"foo\", \"partition\":
1, \"offset\": 1}],\n \"version\":1\n}")
+ .withRequiredArg()
+ .describedAs("Offset json file path")
+ .ofType(String.class);
+
+ commandConfigOpt = parser.accepts("command-config", "A property
file containing configs to be passed to Admin Client.")
+ .withRequiredArg()
+ .describedAs("command config property file path")
+ .ofType(String.class);
+
+ options = parser.parse(args);
+
+ CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to
delete records of the given partitions down to the specified offset.");
+
+ CommandLineUtils.checkRequiredArgs(parser, options,
bootstrapServerOpt, offsetJsonFileOpt);
+ }
+ }
+}
diff --git a/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
new file mode 100644
index 00000000000..2c06fb66ba0
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test of the {@link DeleteRecordsCommand} tool, run against the clusters
+ * supplied by {@link ClusterTestExtensions}.
+ */
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL)
+@Tag("integration")
+public class DeleteRecordsCommandTest {
+
+    private final ClusterInstance cluster;
+
+    public DeleteRecordsCommandTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    /**
+     * Exercises {@code DeleteRecordsCommand.execute} end to end:
+     * <ol>
+     *   <li>an offset JSON that lists the same topic partition twice is rejected;</li>
+     *   <li>after producing three records to a one-partition topic, deleting up to offset 1
+     *       prints the new low watermark;</li>
+     *   <li>a request for partition 42, which the topic does not have, prints an error line.</li>
+     * </ol>
+     */
+    @ClusterTest
+    public void testCommand() throws Exception {
+        Properties adminProps = new Properties();
+
+        adminProps.put(AdminClientConfig.RETRIES_CONFIG, 1);
+
+        try (Admin admin = cluster.createAdminClient(adminProps)) {
+            // The third argument of assertThrows is only the failure message, so the thrown
+            // exception is captured and its message is checked explicitly.
+            AdminCommandFailedException duplicateError = assertThrows(
+                AdminCommandFailedException.class,
+                () -> DeleteRecordsCommand.execute(admin, "{\"partitions\":[" +
+                    "{\"topic\":\"t\", \"partition\":0, \"offset\":1}," +
+                    "{\"topic\":\"t\", \"partition\":0, \"offset\":1}]" +
+                    "}", System.out)
+            );
+            assertEquals(
+                "Offset json file contains duplicate topic partitions: t-0",
+                duplicateError.getMessage()
+            );
+
+            admin.createTopics(Collections.singleton(new NewTopic("t", 1, (short) 1))).all().get();
+
+            Properties props = new Properties();
+
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+            try (KafkaProducer<?, String> producer = new KafkaProducer<>(props)) {
+                // Wait for each send so all three records exist before any deletion is requested.
+                producer.send(new ProducerRecord<>("t", "1")).get();
+                producer.send(new ProducerRecord<>("t", "2")).get();
+                producer.send(new ProducerRecord<>("t", "3")).get();
+            }
+
+            executeAndAssertOutput(
+                "{\"partitions\":[{\"topic\":\"t\", \"partition\":0, \"offset\":1}]}",
+                "partition: t-0\tlow_watermark: 1",
+                admin
+            );
+
+            executeAndAssertOutput(
+                "{\"partitions\":[{\"topic\":\"t\", \"partition\":42, \"offset\":42}]}",
+                "partition: t-42\terror",
+                admin
+            );
+        }
+    }
+
+    /**
+     * Runs the command with the given offset JSON and asserts that the text captured from
+     * standard out contains the expected fragment.
+     *
+     * @param json   offset JSON passed to {@code DeleteRecordsCommand.execute}
+     * @param expOut fragment that must appear in the captured output
+     * @param admin  admin client the command runs with
+     */
+    private static void executeAndAssertOutput(String json, String expOut, Admin admin) {
+        String output = ToolsTestUtils.captureStandardOut(() -> {
+            try {
+                DeleteRecordsCommand.execute(admin, json, System.out);
+            } catch (JsonProcessingException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        // Include the captured output so a failure shows what the tool actually printed.
+        assertTrue(
+            output.contains(expOut),
+            "Expected output to contain \"" + expOut + "\" but was: " + output
+        );
+    }
+}
+
+/**
+ * Unit test of {@link DeleteRecordsCommand} tool.
+ */
+class DeleteRecordsCommandUnitTest {
+ @Test
+ public void testOffsetFileNotExists() {
+ assertThrows(IOException.class, () -> DeleteRecordsCommand.main(new
String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--offset-json-file", "/not/existing/file"
+ }));
+ }
+
+ @Test
+ public void testCommandConfigNotExists() {
+ assertThrows(NoSuchFileException.class, () ->
DeleteRecordsCommand.main(new String[] {
+ "--bootstrap-server", "localhost:9092",
+ "--offset-json-file", "/not/existing/file",
+ "--command-config", "/another/not/existing/file"
+ }));
+ }
+
+ @Test
+ public void testWrongVersion() {
+ assertCommandThrows(JsonProcessingException.class,
"{\"version\":\"string\"}");
+ assertCommandThrows(AdminOperationException.class, "{\"version\":2}");
+ }
+
+ @Test
+ public void testWrongPartitions() {
+ assertCommandThrows(AdminOperationException.class, "{\"version\":1}");
+ assertCommandThrows(JsonProcessingException.class,
"{\"partitions\":2}");
+ assertCommandThrows(JsonProcessingException.class,
"{\"partitions\":{}}");
+ assertCommandThrows(JsonProcessingException.class,
"{\"partitions\":[{}]}");
+ assertCommandThrows(JsonProcessingException.class,
"{\"partitions\":[{\"topic\":\"t\"}]}");
+ assertCommandThrows(JsonProcessingException.class,
"{\"partitions\":[{\"topic\":\"t\", \"partition\": \"\"}]}");
+ assertCommandThrows(JsonProcessingException.class,
"{\"partitions\":[{\"topic\":\"t\", \"partition\": 0}]}");
+ assertCommandThrows(JsonProcessingException.class,
"{\"partitions\":[{\"topic\":\"t\", \"offset\":0}]}");
+ }
+
+ @Test
+ public void testParse() throws Exception {
+ Map<TopicPartition, List<Long>> res =
DeleteRecordsCommand.parseOffsetJsonStringWithoutDedup(
+ "{\"partitions\":[" +
+ "{\"topic\":\"t\", \"partition\":0, \"offset\":0}," +
+ "{\"topic\":\"t\", \"partition\":1, \"offset\":1,
\"ignored\":\"field\"}," +
+ "{\"topic\":\"t\", \"partition\":0, \"offset\":2}," +
+ "{\"topic\":\"t\", \"partition\":0, \"offset\":0}" +
+ "]}"
+ );
+
+ assertEquals(2, res.size());
+ assertEquals(Arrays.asList(0L, 2L, 0L), res.get(new
TopicPartition("t", 0)));
+ assertEquals(Collections.singletonList(1L), res.get(new
TopicPartition("t", 1)));
+ }
+
+ /**
+ * Asserts that {@link
DeleteRecordsCommand#parseOffsetJsonStringWithoutDedup(String)} throws {@link
AdminOperationException}.
+ * @param jsonData Data to check.
+ */
+ private static void assertCommandThrows(Class<? extends Exception>
expectedException, String jsonData) {
+ assertThrows(
+ expectedException,
+ () ->
DeleteRecordsCommand.parseOffsetJsonStringWithoutDedup(jsonData)
+ );
+ }
+}
\ No newline at end of file