Repository: kafka
Updated Branches:
  refs/heads/0.11.0 e47f2d3c4 -> d77c057dd


KAFKA-5644; Fix Reset Consumer Group Offset tool to handle minute component of TimeZone

Author: Manikumar Reddy <manikumar.re...@gmail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #3626 from omkreddy/KAFKA-5644

(cherry picked from commit 5b99a288cd2d4461380b041067768ee6c1b2efc7)
Signed-off-by: Jason Gustafson <ja...@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d77c057d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d77c057d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d77c057d

Branch: refs/heads/0.11.0
Commit: d77c057dddbe5950566b741a4c5e62f308b80130
Parents: e47f2d3
Author: Manikumar Reddy <manikumar.re...@gmail.com>
Authored: Wed Aug 23 14:30:20 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Aug 23 14:32:24 2017 -0700

----------------------------------------------------------------------
 .../kafka/admin/ConsumerGroupCommand.scala      | 13 ++-
 .../admin/ResetConsumerGroupOffsetTest.scala    | 96 ++++++++------------
 2 files changed, 47 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d77c057d/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 4c1f593..6690d89 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -17,7 +17,7 @@
 
 package kafka.admin
 
-import java.text.SimpleDateFormat
+import java.text.{ParseException, SimpleDateFormat}
 import java.util.{Date, Properties}
 import javax.xml.datatype.DatatypeFactory
 
@@ -665,13 +665,18 @@ object ConsumerGroupCommand extends Logging {
       }
     }
 
-    private def getDateTime: java.lang.Long = {
+    private[admin] def getDateTime: java.lang.Long = {
       val datetime: String = opts.options.valueOf(opts.resetToDatetimeOpt) 
match {
         case ts if ts.split("T")(1).contains("+") || 
ts.split("T")(1).contains("-") || ts.split("T")(1).contains("Z") => ts.toString
         case ts => s"${ts}Z"
       }
-      val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")
-      val date = format.parse(datetime)
+      val date = {
+        try {
+          new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse(datetime)
+        } catch {
+          case e: ParseException => new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(datetime)
+        }
+      }
       date.getTime
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d77c057d/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index 22958a9..effcd92 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -10,10 +10,10 @@
   * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
   * specific language governing permissions and limitations under the License.
   */
-package unit.kafka.admin
+package kafka.admin
 
 import java.io.{BufferedWriter, File, FileWriter}
-import java.text.SimpleDateFormat
+import java.text.{ParseException, SimpleDateFormat}
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 import java.util.{Calendar, Collections, Date, Properties}
 
@@ -25,7 +25,8 @@ import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.serialization.StringDeserializer
-import org.junit.{Before, After, Test}
+import org.junit.{After, Before, Test}
+
 import scala.collection.mutable.ArrayBuffer
 
 /**
@@ -156,7 +157,7 @@ class ResetConsumerGroupOffsetTest extends 
KafkaServerTestHarness {
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
     TestUtils.produceMessages(servers, topic1, 50, acks = 1, 100 * 1000)
 
-    val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")
+    val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
     val checkpoint = new Date()
 
     TestUtils.produceMessages(servers, topic1, 50, acks = 1, 100 * 1000)
@@ -196,6 +197,39 @@ class ResetConsumerGroupOffsetTest extends 
KafkaServerTestHarness {
   }
 
   @Test
+  def testDateTimeFormats() {
+    //check valid formats
+    invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"))
+    invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"))
+    invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"))
+    invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"))
+    invokeGetDateTimeMethod(new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))
+
+    //check some invalid formats
+    try {
+      invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"))
+      fail("Call to getDateTime should fail")
+    } catch {
+      case _: ParseException =>
+    }
+
+    try {
+      invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.X"))
+      fail("Call to getDateTime should fail")
+    } catch {
+      case _: ParseException =>
+    }
+  }
+
+  private def invokeGetDateTimeMethod(format: SimpleDateFormat) {
+    val checkpoint = new Date()
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics", "--to-datetime", format.format(checkpoint), 
"--execute")
+    val opts  = new ConsumerGroupCommandOptions(cgcArgs)
+    val consumerGroupCommand = createConsumerGroupService(opts)
+    consumerGroupCommand.getDateTime
+  }
+
+  @Test
   def testResetOffsetsByDuration() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics", "--by-duration", "PT1M", "--execute")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
@@ -571,57 +605,3 @@ class ResetConsumerGroupOffsetTest extends 
KafkaServerTestHarness {
     service
   }
 }
-
-
-class ConsumerThread(broker: String, id: Int, groupId: String, topic: String) 
extends Runnable {
-  val props = new Properties
-  props.put("bootstrap.servers", broker)
-  props.put("group.id", groupId)
-  props.put("key.deserializer", classOf[StringDeserializer].getName)
-  props.put("value.deserializer", classOf[StringDeserializer].getName)
-  val consumer = new KafkaConsumer(props)
-
-  def run() {
-    try {
-      consumer.subscribe(Collections.singleton(topic))
-      while (true)
-        consumer.poll(Long.MaxValue)
-    } catch {
-      case _: WakeupException => // OK
-    } finally {
-      consumer.close()
-    }
-  }
-
-  def shutdown() {
-    consumer.wakeup()
-  }
-}
-
-class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: 
String, topic: String) {
-  val executor: ExecutorService = Executors.newFixedThreadPool(numConsumers)
-  var consumers: List[ConsumerThread] = List[ConsumerThread]()
-
-  for (i <- 1 to numConsumers) {
-    val consumer = new ConsumerThread(broker, i, groupId, topic)
-    consumers ++= List(consumer)
-    executor.submit(consumer)
-  }
-
-  Runtime.getRuntime.addShutdownHook(new Thread() {
-    override def run() {
-      shutdown()
-    }
-  })
-
-  def shutdown() {
-    consumers.foreach(_.shutdown())
-    executor.shutdown()
-    try {
-      executor.awaitTermination(5000, TimeUnit.MILLISECONDS)
-    } catch {
-      case e: InterruptedException =>
-        e.printStackTrace()
-    }
-  }
-}

Reply via email to