Repository: kafka Updated Branches: refs/heads/0.11.0 e47f2d3c4 -> d77c057dd
KAFKA-5644; Fix Reset Consumer Group Offset tool to handle minute component of TimeZone Author: Manikumar Reddy <manikumar.re...@gmail.com> Reviewers: Jason Gustafson <ja...@confluent.io> Closes #3626 from omkreddy/KAFKA-5644 (cherry picked from commit 5b99a288cd2d4461380b041067768ee6c1b2efc7) Signed-off-by: Jason Gustafson <ja...@confluent.io> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d77c057d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d77c057d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d77c057d Branch: refs/heads/0.11.0 Commit: d77c057dddbe5950566b741a4c5e62f308b80130 Parents: e47f2d3 Author: Manikumar Reddy <manikumar.re...@gmail.com> Authored: Wed Aug 23 14:30:20 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Wed Aug 23 14:32:24 2017 -0700 ---------------------------------------------------------------------- .../kafka/admin/ConsumerGroupCommand.scala | 13 ++- .../admin/ResetConsumerGroupOffsetTest.scala | 96 ++++++++------------ 2 files changed, 47 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d77c057d/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 4c1f593..6690d89 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -17,7 +17,7 @@ package kafka.admin -import java.text.SimpleDateFormat +import java.text.{ParseException, SimpleDateFormat} import java.util.{Date, Properties} import javax.xml.datatype.DatatypeFactory @@ -665,13 +665,18 @@ object ConsumerGroupCommand extends Logging { } } - private def getDateTime: java.lang.Long = { + private[admin] def getDateTime: java.lang.Long = { val datetime: String = opts.options.valueOf(opts.resetToDatetimeOpt) match { case ts if ts.split("T")(1).contains("+") || ts.split("T")(1).contains("-") || ts.split("T")(1).contains("Z") => ts.toString case ts => s"${ts}Z" } - val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX") - val date = format.parse(datetime) + val date = { + try { + new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse(datetime) + } catch { + case e: ParseException => new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(datetime) + } + } date.getTime } http://git-wip-us.apache.org/repos/asf/kafka/blob/d77c057d/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala index 22958a9..effcd92 100644 --- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala @@ -10,10 +10,10 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package unit.kafka.admin +package kafka.admin import java.io.{BufferedWriter, File, FileWriter} -import java.text.SimpleDateFormat +import java.text.{ParseException, SimpleDateFormat} import java.util.concurrent.{ExecutorService, Executors, TimeUnit} import java.util.{Calendar, Collections, Date, Properties} @@ -25,7 +25,8 @@ import kafka.utils.TestUtils import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.errors.WakeupException import org.apache.kafka.common.serialization.StringDeserializer -import org.junit.{Before, After, Test} +import org.junit.{After, Before, Test} + import scala.collection.mutable.ArrayBuffer /** @@ -156,7 +157,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { AdminUtils.createTopic(zkUtils, topic1, 1, 1) TestUtils.produceMessages(servers, topic1, 50, acks = 1, 100 * 1000) - val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX") + val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX") val checkpoint = new Date() TestUtils.produceMessages(servers, topic1, 50, acks = 1, 100 * 1000) @@ -196,6 +197,39 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { } @Test + def testDateTimeFormats() { + //check valid formats + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")) + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")) + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")) + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX")) + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")) + + //check some invalid formats + try { + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")) + fail("Call to getDateTime should fail") + } catch { + case _: ParseException => + } + + try { + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.X")) + fail("Call to getDateTime should fail") + } catch { + case _: ParseException => + } + } + + private def invokeGetDateTimeMethod(format: SimpleDateFormat) { + val checkpoint = new Date() + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(checkpoint), "--execute") + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupCommand = createConsumerGroupService(opts) + consumerGroupCommand.getDateTime + } + + @Test def testResetOffsetsByDuration() { val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT1M", "--execute") val opts = new ConsumerGroupCommandOptions(cgcArgs) @@ -571,57 +605,3 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { service } } - - -class ConsumerThread(broker: String, id: Int, groupId: String, topic: String) extends Runnable { - val props = new Properties - props.put("bootstrap.servers", broker) - props.put("group.id", groupId) - props.put("key.deserializer", classOf[StringDeserializer].getName) - props.put("value.deserializer", classOf[StringDeserializer].getName) - val consumer = new KafkaConsumer(props) - - def run() { - try { - consumer.subscribe(Collections.singleton(topic)) - while (true) - consumer.poll(Long.MaxValue) - } catch { - case _: WakeupException => // OK - } finally { - consumer.close() - } - } - - def shutdown() { - consumer.wakeup() - } -} - -class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String) { - val executor: ExecutorService = Executors.newFixedThreadPool(numConsumers) - var consumers: List[ConsumerThread] = List[ConsumerThread]() - - for (i <- 1 to numConsumers) { - val consumer = new ConsumerThread(broker, i, groupId, topic) - consumers ++= List(consumer) - executor.submit(consumer) - } - - Runtime.getRuntime.addShutdownHook(new Thread() { - override def run() { - shutdown() - } - }) - - def shutdown() { - consumers.foreach(_.shutdown()) - executor.shutdown() - try { - executor.awaitTermination(5000, TimeUnit.MILLISECONDS) - } catch { - case e: InterruptedException => - e.printStackTrace() - } - } -}