[
https://issues.apache.org/jira/browse/KAFKA-807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dragos Manolescu updated KAFKA-807:
-----------------------------------
Labels: patch producer (was: )
Status: Patch Available (was: Open)
Index: core/build.sbt
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/build.sbt (date 1364252653000)
+++ core/build.sbt (date 1364254450000)
@@ -18,8 +18,9 @@
libraryDependencies <<= (scalaVersion, libraryDependencies) { (sv, deps) =>
deps :+ (sv match {
- case "2.8.0" => "org.scalatest" % "scalatest" % "1.2" % "test"
+ case "2.8.0" => "org.scalatest" % "scalatest" % "1.2" % "test"
+ case "2.9.2" => "org.scalatest" %% "scalatest" % "1.9.1" % "test"
- case _ => "org.scalatest" %% "scalatest" % "1.8" % "test"
+ case _ => "org.scalatest" %% "scalatest" % "1.8" % "test"
})
}
Index: core/src/main/scala/kafka/producer/ConsoleProducer.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/producer/ConsoleProducer.scala (date
1364252653000)
+++ core/src/main/scala/kafka/producer/ConsoleProducer.scala (date
1364254450000)
@@ -196,7 +196,7 @@
topic = props.getProperty("topic")
if(props.containsKey("parse.key"))
parseKey =
props.getProperty("parse.key").trim.toLowerCase.equals("true")
- if(props.containsKey("key.seperator"))
+ if(props.containsKey("key.separator"))
keySeparator = props.getProperty("key.separator")
if(props.containsKey("ignore.error"))
ignoreError =
props.getProperty("ignore.error").trim.toLowerCase.equals("true")
> LineMessageReader doesn't correctly parse the key separator
> -----------------------------------------------------------
>
> Key: KAFKA-807
> URL: https://issues.apache.org/jira/browse/KAFKA-807
> Project: Kafka
> Issue Type: Bug
> Components: tools
> Affects Versions: 0.8
> Reporter: Dragos Manolescu
> Priority: Trivial
> Labels: producer, patch
> Fix For: 0.8
>
>
> Typo in key name prevents extracting the key separator. The patch follows;
> what's the recommended way to submit patches?
> Index: core/src/main/scala/kafka/producer/ConsoleProducer.scala
> IDEA additional info:
> Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
> <+>UTF-8
> Subsystem: com.intellij.openapi.diff.impl.patch.BaseRevisionTextPatchEP
> <+>/**\n * Licensed to the Apache Software Foundation (ASF) under one or
> more\n * contributor license agreements. See the NOTICE file distributed
> with\n * this work for additional information regarding copyright
> ownership.\n * The ASF licenses this file to You under the Apache License,
> Version 2.0\n * (the \"License\"); you may not use this file except in
> compliance with\n * the License. You may obtain a copy of the License at\n *
> \n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by
> applicable law or agreed to in writing, software\n * distributed under the
> License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR
> CONDITIONS OF ANY KIND, either express or implied.\n * See the License for
> the specific language governing permissions and\n * limitations under the
> License.\n */\n\npackage kafka.producer\n\nimport
> scala.collection.JavaConversions._\nimport joptsimple._\nimport
> java.util.Properties\nimport java.io._\nimport kafka.common._\nimport
> kafka.message._\nimport kafka.serializer._\n\nobject ConsoleProducer { \n\n
> def main(args: Array[String]) { \n val parser = new OptionParser\n val
> topicOpt = parser.accepts(\"topic\", \"REQUIRED: The topic id to produce
> messages to.\")\n .withRequiredArg\n
> .describedAs(\"topic\")\n
> .ofType(classOf[String])\n val brokerListOpt =
> parser.accepts(\"broker-list\", \"REQUIRED: The broker list string in the
> form HOST1:PORT1,HOST2:PORT2.\")\n
> .withRequiredArg\n .describedAs(\"broker-list\")\n
> .ofType(classOf[String])\n val syncOpt =
> parser.accepts(\"sync\", \"If set message send requests to the brokers are
> synchronously, one at a time as they arrive.\")\n val compressOpt =
> parser.accepts(\"compress\", \"If set, messages batches are sent
> compressed\")\n val batchSizeOpt = parser.accepts(\"batch-size\", \"Number
> of messages to send in a single batch if they are not being sent
> synchronously.\")\n .withRequiredArg\n
> .describedAs(\"size\")\n
> .ofType(classOf[java.lang.Integer])\n
> .defaultsTo(200)\n val sendTimeoutOpt = parser.accepts(\"timeout\", \"If
> set and the producer is running in asynchronous mode, this gives the maximum
> amount of time\" + \n \" a
> message will queue awaiting suffient batch size. The value is given in
> ms.\")\n .withRequiredArg\n
> .describedAs(\"timeout_ms\")\n
> .ofType(classOf[java.lang.Long])\n
> .defaultsTo(1000)\n val queueSizeOpt = parser.accepts(\"queue-size\", \"If
> set and the producer is running in asynchronous mode, this gives the maximum
> amount of \" + \n \"
> messages will queue awaiting suffient batch size.\")\n
> .withRequiredArg\n
> .describedAs(\"queue_size\")\n
> .ofType(classOf[java.lang.Long])\n
> .defaultsTo(10000)\n val queueEnqueueTimeoutMsOpt =
> parser.accepts(\"queue-enqueuetimeout-ms\", \"Timeout for event enqueue\")\n
> .withRequiredArg\n
> .describedAs(\"queue enqueuetimeout ms\")\n
> .ofType(classOf[java.lang.Long])\n
> .defaultsTo(0)\n val requestRequiredAcksOpt =
> parser.accepts(\"request-required-acks\", \"The required acks of the producer
> requests\")\n .withRequiredArg\n
> .describedAs(\"request required acks\")\n
> .ofType(classOf[java.lang.Integer])\n
> .defaultsTo(0)\n val requestTimeoutMsOpt =
> parser.accepts(\"request-timeout-ms\", \"The ack timeout of the producer
> requests. Value must be non-negative and non-zero\")\n
> .withRequiredArg\n
> .describedAs(\"request timeout ms\")\n
> .ofType(classOf[java.lang.Integer])\n
> .defaultsTo(1500)\n val valueEncoderOpt =
> parser.accepts(\"value-serializer\", \"The class name of the message encoder
> implementation to use for serializing values.\")\n
> .withRequiredArg\n
> .describedAs(\"encoder_class\")\n
> .ofType(classOf[java.lang.String])\n
> .defaultsTo(classOf[StringEncoder].getName)\n val keyEncoderOpt =
> parser.accepts(\"key-serializer\", \"The class name of the message encoder
> implementation to use for serializing keys.\")\n
> .withRequiredArg\n
> .describedAs(\"encoder_class\")\n
> .ofType(classOf[java.lang.String])\n
> .defaultsTo(classOf[StringEncoder].getName)\n val messageReaderOpt =
> parser.accepts(\"line-reader\", \"The class name of the class to use for
> reading lines from standard in. \" + \n
> \"By default each line is read as a separate message.\")\n
> .withRequiredArg\n
> .describedAs(\"reader_class\")\n
> .ofType(classOf[java.lang.String])\n
> .defaultsTo(classOf[LineMessageReader].getName)\n val socketBufferSizeOpt
> = parser.accepts(\"socket-buffer-size\", \"The size of the tcp RECV
> size.\")\n .withRequiredArg\n
> .describedAs(\"size\")\n
> .ofType(classOf[java.lang.Integer])\n
> .defaultsTo(1024*100)\n val propertyOpt = parser.accepts(\"property\", \"A
> mechanism to pass user-defined properties in the form key=value to the
> message reader. \" +\n \"This
> allows custom configuration for a user-defined message reader.\")\n
> .withRequiredArg\n
> .describedAs(\"prop\")\n
> .ofType(classOf[String])\n\n\n val options = parser.parse(args : _*)\n
> for(arg <- List(topicOpt, brokerListOpt)) {\n if(!options.has(arg)) {\n
> System.err.println(\"Missing required argument \\\"\" + arg +
> \"\\\"\")\n parser.printHelpOn(System.err)\n System.exit(1)\n
> }\n }\n\n val topic = options.valueOf(topicOpt)\n val brokerList
> = options.valueOf(brokerListOpt)\n val sync = options.has(syncOpt)\n
> val compress = options.has(compressOpt)\n val batchSize =
> options.valueOf(batchSizeOpt)\n val sendTimeout =
> options.valueOf(sendTimeoutOpt)\n val queueSize =
> options.valueOf(queueSizeOpt)\n val queueEnqueueTimeoutMs =
> options.valueOf(queueEnqueueTimeoutMsOpt)\n val requestRequiredAcks =
> options.valueOf(requestRequiredAcksOpt)\n val requestTimeoutMs =
> options.valueOf(requestTimeoutMsOpt)\n val keyEncoderClass =
> options.valueOf(keyEncoderOpt)\n val valueEncoderClass =
> options.valueOf(valueEncoderOpt)\n val readerClass =
> options.valueOf(messageReaderOpt)\n val socketBuffer =
> options.valueOf(socketBufferSizeOpt)\n val cmdLineProps =
> parseLineReaderArgs(options.valuesOf(propertyOpt))\n
> cmdLineProps.put(\"topic\", topic)\n\n val props = new Properties()\n
> props.put(\"broker.list\", brokerList)\n val codec = if(compress)
> DefaultCompressionCodec.codec else NoCompressionCodec.codec\n
> props.put(\"compression.codec\", codec.toString)\n
> props.put(\"producer.type\", if(sync) \"sync\" else \"async\")\n
> if(options.has(batchSizeOpt))\n props.put(\"batch.num.messages\",
> batchSize.toString)\n props.put(\"queue.buffering.max.ms\",
> sendTimeout.toString)\n props.put(\"queue.buffering.max.messages\",
> queueSize.toString)\n props.put(\"queue.enqueue.timeout.ms\",
> queueEnqueueTimeoutMs.toString)\n props.put(\"request.required.acks\",
> requestRequiredAcks.toString)\n props.put(\"request.timeout.ms\",
> requestTimeoutMs.toString)\n props.put(\"key.serializer.class\",
> keyEncoderClass)\n props.put(\"serializer.class\", valueEncoderClass)\n
> props.put(\"send.buffer.bytes\", socketBuffer.toString)\n val reader =
> Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef,
> AnyRef]]\n reader.init(System.in, cmdLineProps)\n\n try {\n val
> producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))\n\n
> Runtime.getRuntime.addShutdownHook(new Thread() {\n override def
> run() {\n producer.close()\n }\n })\n\n var
> message: KeyedMessage[AnyRef, AnyRef] = null\n do {\n message
> = reader.readMessage()\n if(message != null)\n
> producer.send(message)\n } while(message != null)\n } catch {\n
> case e: Exception =>\n e.printStackTrace\n System.exit(1)\n
> }\n System.exit(0)\n }\n\n def parseLineReaderArgs(args:
> Iterable[String]): Properties = {\n val splits = args.map(_ split
> \"=\").filterNot(_ == null).filterNot(_.length == 0)\n
> if(!splits.forall(_.length == 2)) {\n System.err.println(\"Invalid line
> reader properties: \" + args.mkString(\" \"))\n System.exit(1)\n }\n
> val props = new Properties\n for(a <- splits)\n props.put(a(0),
> a(1))\n props\n }\n\n trait MessageReader[K,V] { \n def
> init(inputStream: InputStream, props: Properties) {}\n def readMessage():
> KeyedMessage[K,V]\n def close() {}\n }\n\n class LineMessageReader
> extends MessageReader[String, String] {\n var topic: String = null\n
> var reader: BufferedReader = null\n var parseKey = false\n var
> keySeparator = \"\\t\"\n var ignoreError = false\n var lineNumber =
> 0\n\n override def init(inputStream: InputStream, props: Properties) {\n
> topic = props.getProperty(\"topic\")\n
> if(props.containsKey(\"parse.key\"))\n parseKey =
> props.getProperty(\"parse.key\").trim.toLowerCase.equals(\"true\")\n
> if(props.containsKey(\"key.seperator\"))\n keySeparator =
> props.getProperty(\"key.separator\")\n
> if(props.containsKey(\"ignore.error\"))\n ignoreError =
> props.getProperty(\"ignore.error\").trim.toLowerCase.equals(\"true\")\n
> reader = new BufferedReader(new InputStreamReader(inputStream))\n }\n\n
> override def readMessage() = {\n lineNumber += 1\n
> (reader.readLine(), parseKey) match {\n case (null, _) => null\n
> case (line, true) =>\n line.indexOf(keySeparator) match {\n
> case -1 =>\n if(ignoreError)\n new
> KeyedMessage(topic, line)\n else\n throw new
> KafkaException(\"No key found on line \" + lineNumber + \": \" + line)\n
> case n =>\n new KeyedMessage(topic,\n
> line.substring(0, n), \n if(n +
> keySeparator.size > line.size) \"\" else line.substring(n +
> keySeparator.size))\n }\n case (line, false) =>\n
> new KeyedMessage(topic, line)\n }\n }\n }\n}\n
> ===================================================================
> --- core/src/main/scala/kafka/producer/ConsoleProducer.scala (revision
> 290d5e0eac38e9917c64353a131154821b899f26)
> +++ core/src/main/scala/kafka/producer/ConsoleProducer.scala (revision )
> @@ -196,7 +196,7 @@
> topic = props.getProperty("topic")
> if(props.containsKey("parse.key"))
> parseKey =
> props.getProperty("parse.key").trim.toLowerCase.equals("true")
> - if(props.containsKey("key.seperator"))
> + if(props.containsKey("key.separator"))
> keySeparator = props.getProperty("key.separator")
> if(props.containsKey("ignore.error"))
> ignoreError =
> props.getProperty("ignore.error").trim.toLowerCase.equals("true")
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira