[ 
https://issues.apache.org/jira/browse/KAFKA-5523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499343#comment-16499343
 ] 

ASF GitHub Bot commented on KAFKA-5523:
---------------------------------------

ijuma closed pull request #5092: KAFKA-5523: Remove ReplayLogProducer tool
URL: https://github.com/apache/kafka/pull/5092
 
 
   

This PR was merged from a forked repository. Because GitHub does not show
the original diff of a fork's pull request once it has been merged, the
diff is reproduced below for the sake of provenance:

diff --git a/bin/kafka-replay-log-producer.sh b/bin/kafka-replay-log-producer.sh
deleted file mode 100755
index bba3241d75f..00000000000
--- a/bin/kafka-replay-log-producer.sh
+++ /dev/null
@@ -1,17 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-exec $(dirname $0)/kafka-run-class.sh kafka.tools.ReplayLogProducer "$@"
diff --git a/bin/windows/kafka-replay-log-producer.bat b/bin/windows/kafka-replay-log-producer.bat
deleted file mode 100644
index 7b51302a005..00000000000
--- a/bin/windows/kafka-replay-log-producer.bat
+++ /dev/null
@@ -1,17 +0,0 @@
-@echo off
-rem Licensed to the Apache Software Foundation (ASF) under one or more
-rem contributor license agreements.  See the NOTICE file distributed with
-rem this work for additional information regarding copyright ownership.
-rem The ASF licenses this file to You under the Apache License, Version 2.0
-rem (the "License"); you may not use this file except in compliance with
-rem the License.  You may obtain a copy of the License at
-rem
-rem     http://www.apache.org/licenses/LICENSE-2.0
-rem
-rem Unless required by applicable law or agreed to in writing, software
-rem distributed under the License is distributed on an "AS IS" BASIS,
-rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-rem See the License for the specific language governing permissions and
-rem limitations under the License.
-
-"%~dp0kafka-run-class.bat" kafka.tools.ReplayLogProducer %*
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
deleted file mode 100644
index ca9c111163a..00000000000
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import joptsimple.OptionParser
-import java.util.concurrent.CountDownLatch
-import java.util.Properties
-import kafka.consumer._
-import kafka.utils.{ToolsUtils, CommandLineUtils, Logging, ZkUtils}
-import kafka.api.OffsetRequest
-import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, 
ProducerConfig}
-import scala.collection.JavaConverters._
-
-object ReplayLogProducer extends Logging {
-
-  private val GroupId: String = "replay-log-producer"
-
-  def main(args: Array[String]) {
-    val config = new Config(args)
-
-    // if there is no group specified then avoid polluting zookeeper with 
persistent group data, this is a hack
-    ZkUtils.maybeDeletePath(config.zkConnect, "/consumers/" + GroupId)
-    Thread.sleep(500)
-
-    // consumer properties
-    val consumerProps = new Properties
-    consumerProps.put("group.id", GroupId)
-    consumerProps.put("zookeeper.connect", config.zkConnect)
-    consumerProps.put("consumer.timeout.ms", "10000")
-    consumerProps.put("auto.offset.reset", OffsetRequest.SmallestTimeString)
-    consumerProps.put("fetch.message.max.bytes", (1024*1024).toString)
-    consumerProps.put("socket.receive.buffer.bytes", (2 * 1024 * 
1024).toString)
-    val consumerConfig = new ConsumerConfig(consumerProps)
-    val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
-    val topicMessageStreams = 
consumerConnector.createMessageStreams(Predef.Map(config.inputTopic -> 
config.numThreads))
-    var threadList = List[ZKConsumerThread]()
-    for (streamList <- topicMessageStreams.values)
-      for (stream <- streamList)
-        threadList ::= new ZKConsumerThread(config, stream)
-
-    for (thread <- threadList)
-      thread.start
-
-    threadList.foreach(_.shutdown)
-    consumerConnector.shutdown
-  }
-
-  class Config(args: Array[String]) {
-    val parser = new OptionParser(false)
-    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection 
string for the zookeeper connection in the form host:port. " +
-      "Multiple URLS can be given to allow fail-over.")
-      .withRequiredArg
-      .describedAs("zookeeper url")
-      .ofType(classOf[String])
-      .defaultsTo("127.0.0.1:2181")
-    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: the broker 
list must be specified.")
-      .withRequiredArg
-      .describedAs("hostname:port")
-      .ofType(classOf[String])
-    val inputTopicOpt = parser.accepts("inputtopic", "REQUIRED: The topic to 
consume from.")
-      .withRequiredArg
-      .describedAs("input-topic")
-      .ofType(classOf[String])
-    val outputTopicOpt = parser.accepts("outputtopic", "REQUIRED: The topic to 
produce to")
-      .withRequiredArg
-      .describedAs("output-topic")
-      .ofType(classOf[String])
-    val numMessagesOpt = parser.accepts("messages", "The number of messages to 
send.")
-      .withRequiredArg
-      .describedAs("count")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(-1)
-    val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
-      .withRequiredArg
-      .describedAs("threads")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1)
-    val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval 
at which to print progress info.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(5000)
-    val propertyOpt = parser.accepts("property", "A mechanism to pass 
properties in the form key=value to the producer. " +
-      "This allows the user to override producer properties that are not 
exposed by the existing command line arguments")
-      .withRequiredArg
-      .describedAs("producer properties")
-      .ofType(classOf[String])
-    val syncOpt = parser.accepts("sync", "If set message send requests to the 
brokers are synchronously, one at a time as they arrive.")
-
-    val options = parser.parse(args : _*)
-    
-    CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, 
inputTopicOpt)
-
-    val zkConnect = options.valueOf(zkConnectOpt)
-    val brokerList = options.valueOf(brokerListOpt)
-    ToolsUtils.validatePortOrDie(parser,brokerList)
-    val numMessages = options.valueOf(numMessagesOpt).intValue
-    val numThreads = options.valueOf(numThreadsOpt).intValue
-    val inputTopic = options.valueOf(inputTopicOpt)
-    val outputTopic = options.valueOf(outputTopicOpt)
-    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
-    val isSync = options.has(syncOpt)
-    val producerProps = 
CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt).asScala)
-    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer")
-    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer")
-  }
-
-  class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], 
Array[Byte]]) extends Thread with Logging {
-    val shutdownLatch = new CountDownLatch(1)
-    val producer = new 
KafkaProducer[Array[Byte],Array[Byte]](config.producerProps)
-
-    override def run() {
-      info("Starting consumer thread..")
-      var messageCount: Int = 0
-      try {
-        val iter =
-          if(config.numMessages >= 0)
-            stream.slice(0, config.numMessages)
-          else
-            stream
-        for (messageAndMetadata <- iter) {
-          try {
-            val response = producer.send(new 
ProducerRecord[Array[Byte],Array[Byte]](config.outputTopic, null,
-                                            messageAndMetadata.timestamp, 
messageAndMetadata.key(), messageAndMetadata.message()))
-            if(config.isSync) {
-              response.get()
-            }
-            messageCount += 1
-          }catch {
-            case ie: Exception => error("Skipping this message", ie)
-          }
-        }
-      }catch {
-        case e: ConsumerTimeoutException => error("consumer thread timing 
out", e)
-      }
-      info("Sent " + messageCount + " messages")
-      shutdownLatch.countDown
-      info("thread finished execution !" )
-    }
-
-    def shutdown() {
-      shutdownLatch.await
-      producer.close
-    }
-
-  }
-}
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 03d1feb2c18..00b73143c6f 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -96,6 +96,7 @@ <h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in 2
         does not block for dynamic partition assignment. The old <code>poll(long)</code> API has been deprecated and
         will be removed in a future version.</li>
     <li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li>
+    <li>The tool kafka.tools.ReplayLogProducer has been removed.</li>
 </ul>
 
 <h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol Versions</a></h5>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ReplayLogProducer not using the new Kafka consumer
> --------------------------------------------------
>
>                 Key: KAFKA-5523
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5523
>             Project: Kafka
>          Issue Type: Improvement
>          Components: tools
>            Reporter: Paolo Patierno
>            Priority: Minor
>
> Hi,
> the ReplayLogProducer is using the latest Kafka producer but not the latest 
> Kafka consumer. Is this tool deprecated now? I see that something similar 
> could be done using MirrorMaker. [~ijuma] Does it make sense to 
> update the ReplayLogProducer to the latest Kafka consumer?
> Thanks,
> Paolo



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to