Repository: kafka
Updated Branches:
  refs/heads/0.11.0 f16fc68ec -> 3cda63697


MINOR: Fix ClassCastException in ConsumerGroupCommand

Offset and partition are not converted from String to
long and int correctly.

Running the command line with --from-file option causes
the following exception:

java.lang.ClassCastException: java.lang.String cannot be
cast to java.lang.Integer

Reason: asInstanceOf used for the conversion.

Also, unit test is using --to-earliest and --from-file
together when executing the test. This is executing
--to-earliest option only and ignoring --from-file
option. Since the preparation part is also using
--to-earliest to create the file, this unit test
passes without testing --from-file option. Fixed
the unit test too.

Author: Erkan Unal <eu...@cisco.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #3938 from eu657/eu657-patch-1

(cherry picked from commit 71a65d95a257d98925b5f0f6f2227504cf5043a2)
Signed-off-by: Ismael Juma <ism...@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3cda6369
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3cda6369
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3cda6369

Branch: refs/heads/0.11.0
Commit: 3cda636973e4c1bd303a7913da55d7d66c201d3d
Parents: f16fc68
Author: Erkan Unal <eu...@cisco.com>
Authored: Fri Sep 22 22:34:05 2017 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Fri Sep 22 22:36:19 2017 +0100

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/ConsumerGroupCommand.scala |  4 ++--
 .../unit/kafka/admin/ResetConsumerGroupOffsetTest.scala   | 10 +++++-----
 2 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3cda6369/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 6690d89..d7033de 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -564,8 +564,8 @@ object ConsumerGroupCommand extends Logging {
       resetPlanCsv.split("\n")
         .map { line =>
           val Array(topic, partition, offset) = line.split(",").map(_.trim)
-          val topicPartition = new TopicPartition(topic, 
partition.asInstanceOf[Int])
-          val offsetAndMetadata = new 
OffsetAndMetadata(offset.asInstanceOf[Long])
+          val topicPartition = new TopicPartition(topic, partition.toInt)
+          val offsetAndMetadata = new OffsetAndMetadata(offset.toLong)
           (topicPartition, offsetAndMetadata)
         }.toMap
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3cda6369/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala 
b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index effcd92..e8822de 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -549,7 +549,7 @@ class ResetConsumerGroupOffsetTest extends 
KafkaServerTestHarness {
 
   @Test
   def testResetOffsetsExportImportPlan() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics", "--to-earliest", "--export")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics", "--to-offset","2", "--export")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
@@ -564,18 +564,18 @@ class ResetConsumerGroupOffsetTest extends 
KafkaServerTestHarness {
       val bw = new BufferedWriter(new FileWriter(file))
       bw.write(consumerGroupCommand.exportOffsetsToReset(assignmentsToReset))
       bw.close()
-      assignmentsToReset.exists { assignment => assignment._2.offset() == 0 } 
&& file.exists()
+      assignmentsToReset.exists { assignment => assignment._2.offset() == 2 } 
&& file.exists()
     }, "Expected the consume all messages and save reset offsets plan to file")
 
 
-    val cgcArgsExec = Array("--bootstrap-server", brokerList, 
"--reset-offsets", "--group", group, "--all-topics", "--to-earliest", 
"--from-file", file.getCanonicalPath)
+    val cgcArgsExec = Array("--bootstrap-server", brokerList, 
"--reset-offsets", "--group", group, "--all-topics", "--from-file", 
file.getCanonicalPath)
     val optsExec = new ConsumerGroupCommandOptions(cgcArgsExec)
     val consumerGroupCommandExec = createConsumerGroupService(optsExec)
 
     TestUtils.waitUntilTrue(() => {
         val assignmentsToReset = consumerGroupCommandExec.resetOffsets()
-        assignmentsToReset.exists { assignment => assignment._2.offset() == 0 }
-    }, "Expected the consumer group to reset to offset 0 (earliest) by file.")
+        assignmentsToReset.exists { assignment => assignment._2.offset() == 2 }
+    }, "Expected the consumer group to reset to offset 2 according to the plan 
in the file.")
 
     file.deleteOnExit()
 

Reply via email to