Repository: kafka Updated Branches: refs/heads/trunk c197113a9 -> bc9237701
KAFKA-3140: Fix PatternSyntaxException and hang caused by it in Mirro… Fix PatternSyntaxException and hang caused by it in MirrorMaker on passing invalid java regex string as whitelist Author: Ashish Singh <[email protected]> Reviewers: Grant Henke, Gwen Shapira Closes #805 from SinghAsDev/KAFKA-3140 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bc923770 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bc923770 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bc923770 Branch: refs/heads/trunk Commit: bc9237701b06768c119e954ddb4cd2e61c24e305 Parents: c197113 Author: Ashish Singh <[email protected]> Authored: Fri Jan 22 16:24:47 2016 -0800 Committer: Gwen Shapira <[email protected]> Committed: Fri Jan 22 16:24:47 2016 -0800 ---------------------------------------------------------------------- core/src/main/scala/kafka/tools/MirrorMaker.scala | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/bc923770/core/src/main/scala/kafka/tools/MirrorMaker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index f1d56b5..f03623a 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -20,7 +20,7 @@ package kafka.tools import java.util import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.concurrent.{CountDownLatch, TimeUnit} -import java.util.regex.Pattern +import java.util.regex.{PatternSyntaxException, Pattern} import java.util.{Collections, Properties} import com.yammer.metrics.core.Gauge @@ -385,8 +385,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { override def run() { info("Starting mirror maker thread 
" + threadName) - mirrorMakerConsumer.init() try { + mirrorMakerConsumer.init() + // We need the two while loop to make sure when old consumer is used, even there is no message we // still commit offset. When new consumer is used, this is handled by poll(timeout). while (!exitingOnSendFailure && !shuttingDown) { @@ -515,8 +516,15 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { override def init() { debug("Initiating new consumer") val consumerRebalanceListener = new InternalRebalanceListenerForNewConsumer(this, customRebalanceListener) - if (whitelistOpt.isDefined) - consumer.subscribe(Pattern.compile(whitelistOpt.get), consumerRebalanceListener) + if (whitelistOpt.isDefined) { + try { + consumer.subscribe(Pattern.compile(whitelistOpt.get), consumerRebalanceListener) + } catch { + case pse: PatternSyntaxException => + error("Invalid expression syntax: %s".format(whitelistOpt.get)) + throw pse + } + } } // New consumer always hasNext
