[ https://issues.apache.org/jira/browse/KAFKA-5529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655393#comment-16655393 ]
ASF GitHub Bot commented on KAFKA-5529: --------------------------------------- omkreddy closed pull request #3450: KAFKA-5529 Use only KafkaProducer in ConsoleProducer URL: https://github.com/apache/kafka/pull/3450 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 1b221407b57..0ecebbf5735 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -21,13 +21,13 @@ import kafka.common._ import kafka.message._ import kafka.serializer._ import kafka.utils.{CommandLineUtils, Exit, ToolsUtils} -import kafka.producer.{NewShinyProducer, OldProducer} import java.util.Properties import java.io._ import java.nio.charset.StandardCharsets import joptsimple._ -import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} +import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.utils.Utils import scala.collection.JavaConverters._ @@ -41,12 +41,7 @@ object ConsoleProducer { val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader] reader.init(System.in, getReaderProps(config)) - val producer = - if(config.useOldProducer) { - new OldProducer(getOldProducerProps(config)) - } else { - new NewShinyProducer(getNewProducerProps(config)) - } + val producer = new KafkaProducer[Array[Byte], Array[Byte]](getNewProducerProps(config)) Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { @@ -58,7 +53,7 @@ object ConsoleProducer { do { message = reader.readMessage() if (message != null) - producer.send(message.topic, message.key, message.value) + producer.send(message, new ErrorLoggingCallback(message.topic, message.key, message.value, false)) } while (message != null) } catch { case e: joptsimple.OptionException => @@ -78,29 +73,6 @@ object ConsoleProducer { props } - def getOldProducerProps(config: ProducerConfig): Properties = { - val props = producerProps(config) - - props.put("metadata.broker.list", config.brokerList) - props.put("compression.codec", config.compressionCodec) - props.put("producer.type", if(config.sync) "sync" else "async") - props.put("batch.num.messages", config.batchSize.toString) - props.put("message.send.max.retries", config.messageSendMaxRetries.toString) - props.put("retry.backoff.ms", config.retryBackoffMs.toString) - props.put("queue.buffering.max.ms", config.sendTimeout.toString) - props.put("queue.buffering.max.messages", config.queueSize.toString) - props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString) - props.put("request.required.acks", config.requestRequiredAcks) - props.put("request.timeout.ms", config.requestTimeoutMs.toString) - props.put("key.serializer.class", config.keyEncoderClass) - props.put("serializer.class", config.valueEncoderClass) - props.put("send.buffer.bytes", config.socketBuffer.toString) - props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString) - props.put("client.id", "console-producer") - - props - } - private def producerProps(config: ProducerConfig): Properties = { val props = if (config.options.has(config.producerConfigOpt)) @@ -247,14 +219,12 @@ object ConsoleProducer { .withRequiredArg .describedAs("config file") .ofType(classOf[String]) - val useOldProducerOpt = parser.accepts("old-producer", "Use the old producer implementation.") val options = parser.parse(args : _*) if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "Read data from standard input and publish it to Kafka.") CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, brokerListOpt) - val useOldProducer = options.has(useOldProducerOpt) val topic = options.valueOf(topicOpt) val brokerList = options.valueOf(brokerListOpt) ToolsUtils.validatePortOrDie(parser,brokerList) diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala index da80c0d41e2..93be28778ba 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala @@ -17,7 +17,6 @@ package kafka.tools -import kafka.producer.ProducerConfig import ConsoleProducer.LineMessageReader import org.apache.kafka.clients.producer.KafkaProducer import org.junit.{Assert, Test} @@ -49,13 +48,6 @@ class ConsoleProducerTest { producer.close() } - @Test - @deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0") - def testValidConfigsOldProducer() { - val config = new ConsoleProducer.ProducerConfig(validArgs) - new ProducerConfig(ConsoleProducer.getOldProducerProps(config)) - } - @Test def testInvalidConfigs() { try { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > ConsoleProducer uses deprecated BaseProducer > -------------------------------------------- > > Key: KAFKA-5529 > URL: https://issues.apache.org/jira/browse/KAFKA-5529 > Project: Kafka > Issue Type: Improvement > Components: tools > Reporter: Evgeny Veretennikov > Assignee: Evgeny Veretennikov > Priority: Minor > Fix For: 2.0.0 > > > BaseProducer is deprecated, should use > org.apache.kafka.clients.producer.KafkaProducer instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)