[ https://issues.apache.org/jira/browse/KAFKA-5523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499343#comment-16499343 ]
ASF GitHub Bot commented on KAFKA-5523: --------------------------------------- ijuma closed pull request #5092: KAFKA-5523: Remove ReplayLogProducer tool URL: https://github.com/apache/kafka/pull/5092 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/bin/kafka-replay-log-producer.sh b/bin/kafka-replay-log-producer.sh deleted file mode 100755 index bba3241d75f..00000000000 --- a/bin/kafka-replay-log-producer.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/bin/bash -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -exec $(dirname $0)/kafka-run-class.sh kafka.tools.ReplayLogProducer "$@" diff --git a/bin/windows/kafka-replay-log-producer.bat b/bin/windows/kafka-replay-log-producer.bat deleted file mode 100644 index 7b51302a005..00000000000 --- a/bin/windows/kafka-replay-log-producer.bat +++ /dev/null @@ -1,17 +0,0 @@ -@echo off -rem Licensed to the Apache Software Foundation (ASF) under one or more -rem contributor license agreements. 
See the NOTICE file distributed with -rem this work for additional information regarding copyright ownership. -rem The ASF licenses this file to You under the Apache License, Version 2.0 -rem (the "License"); you may not use this file except in compliance with -rem the License. You may obtain a copy of the License at -rem -rem http://www.apache.org/licenses/LICENSE-2.0 -rem -rem Unless required by applicable law or agreed to in writing, software -rem distributed under the License is distributed on an "AS IS" BASIS, -rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -rem See the License for the specific language governing permissions and -rem limitations under the License. - -"%~dp0kafka-run-class.bat" kafka.tools.ReplayLogProducer %* diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala deleted file mode 100644 index ca9c111163a..00000000000 --- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.tools - -import joptsimple.OptionParser -import java.util.concurrent.CountDownLatch -import java.util.Properties -import kafka.consumer._ -import kafka.utils.{ToolsUtils, CommandLineUtils, Logging, ZkUtils} -import kafka.api.OffsetRequest -import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig} -import scala.collection.JavaConverters._ - -object ReplayLogProducer extends Logging { - - private val GroupId: String = "replay-log-producer" - - def main(args: Array[String]) { - val config = new Config(args) - - // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack - ZkUtils.maybeDeletePath(config.zkConnect, "/consumers/" + GroupId) - Thread.sleep(500) - - // consumer properties - val consumerProps = new Properties - consumerProps.put("group.id", GroupId) - consumerProps.put("zookeeper.connect", config.zkConnect) - consumerProps.put("consumer.timeout.ms", "10000") - consumerProps.put("auto.offset.reset", OffsetRequest.SmallestTimeString) - consumerProps.put("fetch.message.max.bytes", (1024*1024).toString) - consumerProps.put("socket.receive.buffer.bytes", (2 * 1024 * 1024).toString) - val consumerConfig = new ConsumerConfig(consumerProps) - val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig) - val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.inputTopic -> config.numThreads)) - var threadList = List[ZKConsumerThread]() - for (streamList <- topicMessageStreams.values) - for (stream <- streamList) - threadList ::= new ZKConsumerThread(config, stream) - - for (thread <- threadList) - thread.start - - threadList.foreach(_.shutdown) - consumerConnector.shutdown - } - - class Config(args: Array[String]) { - val parser = new OptionParser(false) - val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. 
" + - "Multiple URLS can be given to allow fail-over.") - .withRequiredArg - .describedAs("zookeeper url") - .ofType(classOf[String]) - .defaultsTo("127.0.0.1:2181") - val brokerListOpt = parser.accepts("broker-list", "REQUIRED: the broker list must be specified.") - .withRequiredArg - .describedAs("hostname:port") - .ofType(classOf[String]) - val inputTopicOpt = parser.accepts("inputtopic", "REQUIRED: The topic to consume from.") - .withRequiredArg - .describedAs("input-topic") - .ofType(classOf[String]) - val outputTopicOpt = parser.accepts("outputtopic", "REQUIRED: The topic to produce to") - .withRequiredArg - .describedAs("output-topic") - .ofType(classOf[String]) - val numMessagesOpt = parser.accepts("messages", "The number of messages to send.") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(-1) - val numThreadsOpt = parser.accepts("threads", "Number of sending threads.") - .withRequiredArg - .describedAs("threads") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) - val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(5000) - val propertyOpt = parser.accepts("property", "A mechanism to pass properties in the form key=value to the producer. 
" + - "This allows the user to override producer properties that are not exposed by the existing command line arguments") - .withRequiredArg - .describedAs("producer properties") - .ofType(classOf[String]) - val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.") - - val options = parser.parse(args : _*) - - CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, inputTopicOpt) - - val zkConnect = options.valueOf(zkConnectOpt) - val brokerList = options.valueOf(brokerListOpt) - ToolsUtils.validatePortOrDie(parser,brokerList) - val numMessages = options.valueOf(numMessagesOpt).intValue - val numThreads = options.valueOf(numThreadsOpt).intValue - val inputTopic = options.valueOf(inputTopicOpt) - val outputTopic = options.valueOf(outputTopicOpt) - val reportingInterval = options.valueOf(reportingIntervalOpt).intValue - val isSync = options.has(syncOpt) - val producerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt).asScala) - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") - producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") - } - - class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging { - val shutdownLatch = new CountDownLatch(1) - val producer = new KafkaProducer[Array[Byte],Array[Byte]](config.producerProps) - - override def run() { - info("Starting consumer thread..") - var messageCount: Int = 0 - try { - val iter = - if(config.numMessages >= 0) - stream.slice(0, config.numMessages) - else - stream - for (messageAndMetadata <- iter) { - try { - val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](config.outputTopic, null, - messageAndMetadata.timestamp, 
messageAndMetadata.key(), messageAndMetadata.message())) - if(config.isSync) { - response.get() - } - messageCount += 1 - }catch { - case ie: Exception => error("Skipping this message", ie) - } - } - }catch { - case e: ConsumerTimeoutException => error("consumer thread timing out", e) - } - info("Sent " + messageCount + " messages") - shutdownLatch.countDown - info("thread finished execution !" ) - } - - def shutdown() { - shutdownLatch.await - producer.close - } - - } -} diff --git a/docs/upgrade.html b/docs/upgrade.html index 03d1feb2c18..00b73143c6f 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -96,6 +96,7 @@ <h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in 2 does not block for dynamic partition assignment. The old <code>poll(long)</code> API has been deprecated and will be removed in a future version.</li> <li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li> + <li>The tool kafka.tools.ReplayLogProducer has been removed.</li> </ul> <h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol Versions</a></h5> ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > ReplayLogProducer not using the new Kafka consumer > -------------------------------------------------- > > Key: KAFKA-5523 > URL: https://issues.apache.org/jira/browse/KAFKA-5523 > Project: Kafka > Issue Type: Improvement > Components: tools > Reporter: Paolo Patierno > Priority: Minor > > Hi, > the ReplayLogProducer is using the latest Kafka producer but not the latest > Kafka consumer. Is this tool today deprecated ? 
I see that something like > that could be done using the MirrorMaker. [~ijuma] Does it make sense to > update the ReplayLogProducer to the latest Kafka consumer? > Thanks, > Paolo -- This message was sent by Atlassian JIRA (v7.6.3#76005)