Joe, 0.8 now has a compilation error after this check-in. It seems that you forgot to check in a new file. See my comment in KAFKA-240. Could you fix this?
Thanks, Jun On Fri, Mar 2, 2012 at 9:46 PM, <joest...@apache.org> wrote: > Author: joestein > Date: Sat Mar 3 05:46:43 2012 > New Revision: 1296577 > > URL: http://svn.apache.org/viewvc?rev=1296577&view=rev > Log: > KAFKA-240 ProducerRequest wire format protocol update and related changes > > Removed: > > > incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/MultiProducerRequest.scala > > > incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MultiMessageSetSend.scala > Modified: > > > incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala > > > incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala > > > incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala > > > incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala > > > incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala > > > incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala > > > incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala > > > incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala > > > incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala > > > incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala > > > incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala > > > incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala > > incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala > incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala > > > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala > > > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala > > > 
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala > > > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala > > Modified: > incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala > URL: > http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1296577&r1=1296576&r2=1296577&view=diff > > ============================================================================== > --- > incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala > (original) > +++ > incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala > Sat Mar 3 05:46:43 2012 > @@ -39,6 +39,15 @@ object PartitionData { > > case class PartitionData(partition: Int, error: Int = > ErrorMapping.NoError, initialOffset:Long = 0L, messages: MessageSet) { > val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue() > + > + def this(partition: Int, messages: MessageSet) = this(partition, > ErrorMapping.NoError, 0L, messages) > + > + def getTranslatedPartition(topic: String, randomSelector: String => > Int): Int = { > + if (partition == ProducerRequest.RandomPartition) > + return randomSelector(topic) > + else > + return partition > + } > } > > object TopicData { > @@ -73,6 +82,15 @@ object TopicData { > > case class TopicData(topic: String, partitionData: Array[PartitionData]) { > val sizeInBytes = 2 + topic.length + partitionData.foldLeft(4)(_ + > _.sizeInBytes) > + > + override def equals(other: Any): Boolean = { > + other match { > + case that: TopicData => > + ( topic == that.topic && > + partitionData.toSeq == that.partitionData.toSeq ) > + case _ => false > + } > + } > } > > object FetchResponse { > > Modified: > incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala > URL: > 
http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff > > ============================================================================== > --- > incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala > (original) > +++ > incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala > Sat Mar 3 05:46:43 2012 > @@ -24,60 +24,108 @@ import kafka.utils._ > > object ProducerRequest { > val RandomPartition = -1 > - > + val versionId: Short = 0 > + > def readFrom(buffer: ByteBuffer): ProducerRequest = { > - val topic = Utils.readShortString(buffer, "UTF-8") > - val partition = buffer.getInt > - val messageSetSize = buffer.getInt > - val messageSetBuffer = buffer.slice() > - messageSetBuffer.limit(messageSetSize) > - buffer.position(buffer.position + messageSetSize) > - new ProducerRequest(topic, partition, new > ByteBufferMessageSet(messageSetBuffer)) > + val versionId: Short = buffer.getShort > + val correlationId: Int = buffer.getInt > + val clientId: String = Utils.readShortString(buffer, "UTF-8") > + val requiredAcks: Short = buffer.getShort > + val ackTimeout: Int = buffer.getInt > + //build the topic structure > + val topicCount = buffer.getInt > + val data = new Array[TopicData](topicCount) > + for(i <- 0 until topicCount) { > + val topic = Utils.readShortString(buffer, "UTF-8") > + > + val partitionCount = buffer.getInt > + //build the partition structure within this topic > + val partitionData = new Array[PartitionData](partitionCount) > + for (j <- 0 until partitionCount) { > + val partition = buffer.getInt > + val messageSetSize = buffer.getInt > + val messageSetBuffer = new Array[Byte](messageSetSize) > + buffer.get(messageSetBuffer,0,messageSetSize) > + partitionData(j) = new PartitionData(partition,new > ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer))) > + } > + data(i) = new TopicData(topic,partitionData) > 
+ } > + new ProducerRequest(versionId, correlationId, clientId, requiredAcks, > ackTimeout, data) > } > } > > -class ProducerRequest(val topic: String, > - val partition: Int, > - val messages: ByteBufferMessageSet) extends > Request(RequestKeys.Produce) { > +case class ProducerRequest(val versionId: Short, val correlationId: Int, > + val clientId: String, > + val requiredAcks: Short, > + val ackTimeout: Int, > + val data: Array[TopicData]) extends > Request(RequestKeys.Produce) { > + > + def this(correlationId: Int, clientId: String, requiredAcks: Short, > ackTimeout: Int, data: Array[TopicData]) = this(ProducerRequest.versionId, > correlationId, clientId, requiredAcks, ackTimeout, data) > > def writeTo(buffer: ByteBuffer) { > - Utils.writeShortString(buffer, topic) > - buffer.putInt(partition) > - buffer.putInt(messages.serialized.limit) > - buffer.put(messages.serialized) > - messages.serialized.rewind > + buffer.putShort(versionId) > + buffer.putInt(correlationId) > + Utils.writeShortString(buffer, clientId, "UTF-8") > + buffer.putShort(requiredAcks) > + buffer.putInt(ackTimeout) > + //save the topic structure > + buffer.putInt(data.size) //the number of topics > + data.foreach(d =>{ > + Utils.writeShortString(buffer, d.topic, "UTF-8") //write the topic > + buffer.putInt(d.partitionData.size) //the number of partitions > + d.partitionData.foreach(p => { > + buffer.putInt(p.partition) > + buffer.putInt(p.messages.getSerialized().limit) > + buffer.put(p.messages.getSerialized()) > + p.messages.getSerialized().rewind > + }) > + }) > } > - > - def sizeInBytes(): Int = 2 + topic.length + 4 + 4 + > messages.sizeInBytes.asInstanceOf[Int] > > - def getTranslatedPartition(randomSelector: String => Int): Int = { > - if (partition == ProducerRequest.RandomPartition) > - return randomSelector(topic) > - else > - return partition > + def sizeInBytes(): Int = { > + var size = 0 > + //size, request_type_id, version_id, correlation_id, client_id, > required_acks, ack_timeout, 
data.size > + size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4; > + data.foreach(d =>{ > + size += 2 + d.topic.length + 4 > + d.partitionData.foreach(p => { > + size += 4 + 4 + p.messages.sizeInBytes.asInstanceOf[Int] > + }) > + }) > + size > } > > override def toString: String = { > val builder = new StringBuilder() > builder.append("ProducerRequest(") > - builder.append(topic + ",") > - builder.append(partition + ",") > - builder.append(messages.sizeInBytes) > + builder.append(versionId + ",") > + builder.append(correlationId + ",") > + builder.append(clientId + ",") > + builder.append(requiredAcks + ",") > + builder.append(ackTimeout) > + data.foreach(d =>{ > + builder.append(":[" + d.topic) > + d.partitionData.foreach(p => { > + builder.append(":[") > + builder.append(p.partition + ",") > + builder.append(p.messages.sizeInBytes) > + builder.append("]") > + }) > + builder.append("]") > + }) > builder.append(")") > builder.toString > } > > override def equals(other: Any): Boolean = { > - other match { > + other match { > case that: ProducerRequest => > - (that canEqual this) && topic == that.topic && partition == > that.partition && > - messages.equals(that.messages) > + ( correlationId == that.correlationId && > + clientId == that.clientId && > + requiredAcks == that.requiredAcks && > + ackTimeout == that.ackTimeout && > + data.toSeq == that.data.toSeq) > case _ => false > } > } > - > - def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest] > - > - override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + > messages.hashCode > - > -} > +} > \ No newline at end of file > > Modified: > incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala > URL: > http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala?rev=1296577&r1=1296576&r2=1296577&view=diff > > ============================================================================== > --- > 
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala > (original) > +++ > incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala > Sat Mar 3 05:46:43 2012 > @@ -22,7 +22,7 @@ import kafka.api.TopicData > > class FetchResponse( val versionId: Short, > val correlationId: Int, > - val data: Array[TopicData] ) { > + private val data: Array[TopicData] ) { > > private val underlying = new kafka.api.FetchResponse(versionId, > correlationId, data) > > > Modified: > incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala > URL: > http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff > > ============================================================================== > --- > incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala > (original) > +++ > incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala > Sat Mar 3 05:46:43 2012 > @@ -17,36 +17,29 @@ > package kafka.javaapi > > import kafka.network.Request > -import kafka.api.RequestKeys > +import kafka.api.{RequestKeys, TopicData} > import java.nio.ByteBuffer > > -class ProducerRequest(val topic: String, > - val partition: Int, > - val messages: > kafka.javaapi.message.ByteBufferMessageSet) extends > Request(RequestKeys.Produce) { > +class ProducerRequest(val correlationId: Int, > + val clientId: String, > + val requiredAcks: Short, > + val ackTimeout: Int, > + val data: Array[TopicData]) extends > Request(RequestKeys.Produce) { > + > import Implicits._ > - private val underlying = new kafka.api.ProducerRequest(topic, > partition, messages) > + val underlying = new kafka.api.ProducerRequest(correlationId, clientId, > requiredAcks, ackTimeout, data) > > def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) } > > def sizeInBytes(): Int = underlying.sizeInBytes > > - def 
getTranslatedPartition(randomSelector: String => Int): Int = > - underlying.getTranslatedPartition(randomSelector) > - > override def toString: String = > underlying.toString > > - override def equals(other: Any): Boolean = { > - other match { > - case that: ProducerRequest => > - (that canEqual this) && topic == that.topic && partition == > that.partition && > - messages.equals(that.messages) > - case _ => false > - } > - } > + override def equals(other: Any): Boolean = underlying.equals(other) > > def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest] > > - override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + > messages.hashCode > + override def hashCode: Int = underlying.hashCode > > -} > +} > \ No newline at end of file > > Modified: > incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala > URL: > http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff > > ============================================================================== > --- > incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala > (original) > +++ > incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala > Sat Mar 3 05:46:43 2012 > @@ -39,7 +39,7 @@ class ByteBufferMessageSet(private val b > > def validBytes: Long = underlying.validBytes > > - def serialized():ByteBuffer = underlying.serialized > + def serialized():ByteBuffer = underlying.getSerialized() > > def getInitialOffset = initialOffset > > > Modified: > incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala > URL: > http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala?rev=1296577&r1=1296576&r2=1296577&view=diff > > 
============================================================================== > --- > incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala > (original) > +++ > incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala > Sat Mar 3 05:46:43 2012 > @@ -18,6 +18,8 @@ package kafka.javaapi.producer > > import kafka.producer.SyncProducerConfig > import kafka.javaapi.message.ByteBufferMessageSet > +import kafka.javaapi.ProducerRequest > +import kafka.api.{PartitionData, TopicData} > > class SyncProducer(syncProducer: kafka.producer.SyncProducer) { > > @@ -25,21 +27,17 @@ class SyncProducer(syncProducer: kafka.p > > val underlying = syncProducer > > - def send(topic: String, partition: Int, messages: ByteBufferMessageSet) > { > - import kafka.javaapi.Implicits._ > - underlying.send(topic, partition, messages) > + def send(producerRequest: kafka.javaapi.ProducerRequest) { > + underlying.send(producerRequest.underlying) > } > > - def send(topic: String, messages: ByteBufferMessageSet): Unit = > send(topic, > - > kafka.api.ProducerRequest.RandomPartition, > - > messages) > - > - def multiSend(produces: Array[kafka.javaapi.ProducerRequest]) { > - import kafka.javaapi.Implicits._ > - val produceRequests = new > Array[kafka.api.ProducerRequest](produces.length) > - for(i <- 0 until produces.length) > - produceRequests(i) = new > kafka.api.ProducerRequest(produces(i).topic, produces(i).partition, > produces(i).messages) > - underlying.multiSend(produceRequests) > + def send(topic: String, messages: ByteBufferMessageSet): Unit = { > + var data = new Array[TopicData](1) > + var partition_data = new Array[PartitionData](1) > + partition_data(0) = new PartitionData(-1,messages.underlying) > + data(0) = new TopicData(topic,partition_data) > + val producerRequest = new kafka.api.ProducerRequest(-1, "", 0, 0, > data) > + underlying.send(producerRequest) > } > > def close() { > > Modified: > 
incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala > URL: > http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff > > ============================================================================== > --- > incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala > (original) > +++ > incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala > Sat Mar 3 05:46:43 2012 > @@ -53,7 +53,7 @@ class ByteBufferMessageSet(private val b > > def getErrorCode = errorCode > > - def serialized(): ByteBuffer = buffer > + def getSerialized(): ByteBuffer = buffer > > def validBytes: Long = shallowValidBytes > > > Modified: > incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala > URL: > http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff > > ============================================================================== > --- > incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala > (original) > +++ > incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala > Sat Mar 3 05:46:43 2012 > @@ -40,6 +40,8 @@ class FileMessageSet private[kafka](priv > private val setSize = new AtomicLong() > private val setHighWaterMark = new AtomicLong() > > + def getSerialized(): ByteBuffer = throw new > java.lang.UnsupportedOperationException() > + > if(mutable) { > if(limit < Long.MaxValue || offset > 0) > throw new IllegalArgumentException("Attempt to open a mutable > message set with a view or offset, which is not allowed.") > > Modified: > incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala > URL: > 
http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff > > ============================================================================== > --- > incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala > (original) > +++ > incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala > Sat Mar 3 05:46:43 2012 > @@ -111,4 +111,9 @@ abstract class MessageSet extends Iterab > throw new InvalidMessageException > } > > + /** > + * Used to allow children to have serialization on implementation > + */ > + def getSerialized(): ByteBuffer > + > } > > Modified: > incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala > URL: > http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1296577&r1=1296576&r2=1296577&view=diff > > ============================================================================== > --- > incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala > (original) > +++ > incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala > Sat Mar 3 05:46:43 2012 > @@ -51,29 +51,10 @@ class SyncProducer(val config: SyncProdu > if (logger.isTraceEnabled) { > trace("verifying sendbuffer of size " + buffer.limit) > val requestTypeId = buffer.getShort() > - if (requestTypeId == RequestKeys.MultiProduce) { > - try { > - val request = MultiProducerRequest.readFrom(buffer) > - for (produce <- request.produces) { > - try { > - for (messageAndOffset <- produce.messages) > - if (!messageAndOffset.message.isValid) > - trace("topic " + produce.topic + " is invalid") > - } > - catch { > - case e: Throwable => > - trace("error iterating messages ", e) > - } > - } > - } > - catch { > - case e: Throwable => > - trace("error verifying sendbuffer ", e) > - } > - } > + val request = ProducerRequest.readFrom(buffer) 
> + trace(request.toString) > } > } > - > /** > * Common functionality for the public send methods > */ > @@ -108,21 +89,15 @@ class SyncProducer(val config: SyncProdu > /** > * Send a message > */ > - def send(topic: String, partition: Int, messages: ByteBufferMessageSet) > { > - verifyMessageSize(messages) > - val setSize = messages.sizeInBytes.asInstanceOf[Int] > - trace("Got message set with " + setSize + " bytes to send") > - send(new BoundedByteBufferSend(new ProducerRequest(topic, partition, > messages))) > - } > - > - def send(topic: String, messages: ByteBufferMessageSet): Unit = > send(topic, ProducerRequest.RandomPartition, messages) > - > - def multiSend(produces: Array[ProducerRequest]) { > - for (request <- produces) > - verifyMessageSize(request.messages) > - val setSize = produces.foldLeft(0L)(_ + _.messages.sizeInBytes) > - trace("Got multi message sets with " + setSize + " bytes to send") > - send(new BoundedByteBufferSend(new MultiProducerRequest(produces))) > + def send(producerRequest: ProducerRequest) { > + producerRequest.data.foreach(d => { > + d.partitionData.foreach(p => { > + verifyMessageSize(new > ByteBufferMessageSet(p.messages.getSerialized())) > + val setSize = p.messages.sizeInBytes.asInstanceOf[Int] > + trace("Got message set with " + setSize + " bytes to send") > + }) > + }) > + send(new BoundedByteBufferSend(producerRequest)) > } > > def send(request: TopicMetadataRequest): Seq[TopicMetadata] = { > > Modified: > incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala > URL: > http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1296577&r1=1296576&r2=1296577&view=diff > > ============================================================================== > --- > incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala > (original) > +++ > 
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala > Sat Mar 3 05:46:43 2012 > @@ -41,4 +41,23 @@ trait SyncProducerConfigShared { > val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000) > > val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000) > + > + /* the client application sending the producer requests */ > + val correlationId = > Utils.getInt(props,"producer.request.correlation_id",-1) > + > + /* the client application sending the producer requests */ > + val clientId = Utils.getString(props,"producer.request.client_id","") > + > + /* the required_acks of the producer requests */ > + val requiredAcks = > Utils.getShort(props,"producer.request.required_acks",0) > + > + /* the ack_timeout of the producer requests */ > + val ackTimeout = Utils.getInt(props,"producer.request.ack_timeout",1) > } > + > +object SyncProducerConfig { > + val DefaultCorrelationId = -1 > + val DefaultClientId = "" > + val DefaultRequiredAcks : Short = 0 > + val DefaultAckTimeoutMs = 1 > +} > \ No newline at end of file > > Modified: > incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala > URL: > http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1296577&r1=1296576&r2=1296577&view=diff > > ============================================================================== > --- > incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala > (original) > +++ > incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala > Sat Mar 3 05:46:43 2012 > @@ -17,7 +17,7 @@ > > package kafka.producer.async > > -import kafka.api.ProducerRequest > +import kafka.api.{ProducerRequest, TopicData, PartitionData} > import kafka.serializer.Encoder > import kafka.producer._ > import kafka.cluster.{Partition, Broker} > @@ -147,9 +147,22 @@ class 
DefaultEventHandler[K,V](config: P > > private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), > ByteBufferMessageSet]) { > if(messagesPerTopic.size > 0) { > - val requests = messagesPerTopic.map(f => new > ProducerRequest(f._1._1, f._1._2, f._2)).toArray > + val topics = new HashMap[String, ListBuffer[PartitionData]]() > + val requests = messagesPerTopic.map(f => { > + val topicName = f._1._1 > + val partitionId = f._1._2 > + val messagesSet= f._2 > + val topic = topics.get(topicName) // checking to see if this > topics exists > + topic match { > + case None => topics += topicName -> new > ListBuffer[PartitionData]() //create a new listbuffer for this topic > + case Some(x) => trace("found " + topicName) > + } > + topics(topicName).append(new PartitionData(partitionId, > messagesSet)) > + }) > + val topicData = topics.map(kv => new TopicData(kv._1,kv._2.toArray)) > + val producerRequest = new ProducerRequest(config.correlationId, > config.clientId, config.requiredAcks, config.ackTimeout, topicData.toArray) > //new kafka.javaapi.ProducerRequest(correlation_id, client_id, > required_acks, ack_timeout, topic_data.toArray) > val syncProducer = producerPool.getProducer(brokerId) > - syncProducer.multiSend(requests) > + syncProducer.send(producerRequest) > trace("kafka producer sent messages for topics %s to broker %s:%d" > .format(messagesPerTopic, syncProducer.config.host, > syncProducer.config.port)) > } > > Modified: > incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala > URL: > http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1296577&r1=1296576&r2=1296577&view=diff > > ============================================================================== > --- > incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala > (original) > +++ > incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala > Sat Mar 3 05:46:43 2012 > @@ -41,7 
+41,6 @@ class KafkaApis(val logManager: LogManag > apiId match { > case RequestKeys.Produce => handleProducerRequest(receive) > case RequestKeys.Fetch => handleFetchRequest(receive) > - case RequestKeys.MultiProduce => handleMultiProducerRequest(receive) > case RequestKeys.Offsets => handleOffsetRequest(receive) > case RequestKeys.TopicMetadata => handleTopicMetadataRequest(receive) > case _ => throw new IllegalStateException("No mapping found for > handler id " + apiId) > @@ -59,31 +58,38 @@ class KafkaApis(val logManager: LogManag > None > } > > - def handleMultiProducerRequest(receive: Receive): Option[Send] = { > - val request = MultiProducerRequest.readFrom(receive.buffer) > - if(requestLogger.isTraceEnabled) > - requestLogger.trace("Multiproducer request " + request.toString) > - request.produces.map(handleProducerRequest(_, "MultiProducerRequest")) > - None > - } > - > - private def handleProducerRequest(request: ProducerRequest, > requestHandlerName: String) = { > - val partition = > request.getTranslatedPartition(logManager.chooseRandomPartition) > - try { > - logManager.getOrCreateLog(request.topic, > partition).append(request.messages) > - trace(request.messages.sizeInBytes + " bytes written to logs.") > - } catch { > - case e => > - error("Error processing " + requestHandlerName + " on " + > request.topic + ":" + partition, e) > - e match { > - case _: IOException => > - fatal("Halting due to unrecoverable I/O error while handling > producer request: " + e.getMessage, e) > - System.exit(1) > - case _ => > + private def handleProducerRequest(request: ProducerRequest, > requestHandlerName: String): Option[ProducerResponse] = { > + val requestSize = request.data.size > + val errors = new Array[Int](requestSize) > + val offsets = new Array[Long](requestSize) > + > + request.data.foreach(d => { > + d.partitionData.foreach(p => { > + val partition = p.getTranslatedPartition(d.topic, > logManager.chooseRandomPartition) > + try { > + 
logManager.getOrCreateLog(d.topic, partition).append(p.messages) > + trace(p.messages.sizeInBytes + " bytes written to logs.") > + p.messages.foreach(m => trace("wrote message %s to > disk".format(m.message.checksum))) > } > - throw e > - } > - None > + catch { > + case e => > + //TODO: handle response in ProducerResponse > + error("Error processing " + requestHandlerName + " on " + > d.topic + ":" + partition, e) > + e match { > + case _: IOException => > + fatal("Halting due to unrecoverable I/O error while > handling producer request: " + e.getMessage, e) > + Runtime.getRuntime.halt(1) > + case _ => > + } > + //throw e > + } > + }) > + //None > + }) > + if (request.requiredAcks == 0) > + None > + else > + None //TODO: send when KAFKA-49 can receive this Some(new > ProducerResponse(request.versionId, request.correlationId, errors, offsets)) > } > > def handleFetchRequest(request: Receive): Option[Send] = { > > Modified: > incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala > URL: > http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1296577&r1=1296576&r2=1296577&view=diff > > ============================================================================== > --- > incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala > (original) > +++ > incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala > Sat Mar 3 05:46:43 2012 > @@ -195,6 +195,9 @@ object Utils extends Logging { > def getInt(props: Properties, name: String, default: Int): Int = > getIntInRange(props, name, default, (Int.MinValue, Int.MaxValue)) > > + def getShort(props: Properties, name: String, default: Short): Short = > + getShortInRange(props, name, default, (Short.MinValue, > Short.MaxValue)) > + > /** > * Read an integer from the properties instance. 
Throw an exception > * if the value is not in the given range (inclusive) > @@ -217,6 +220,18 @@ object Utils extends Logging { > v > } > > + def getShortInRange(props: Properties, name: String, default: Short, > range: (Short, Short)): Short = { > + val v = > + if(props.containsKey(name)) > + props.getProperty(name).toShort > + else > + default > + if(v < range._1 || v > range._2) > + throw new IllegalArgumentException(name + " has value " + v + " > which is not in the range " + range + ".") > + else > + v > + } > + > def getIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): > Int = { > val value = buffer.getInt > if(value < range._1 || value > range._2) > @@ -777,4 +792,4 @@ class SnapshotStats(private val monitorD > > def durationMs: Double = (end.get - start) / (1000.0 * 1000.0) > } > -} > +} > \ No newline at end of file > > Modified: > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala > URL: > http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff > > ============================================================================== > --- > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala > (original) > +++ > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala > Sat Mar 3 05:46:43 2012 > @@ -33,7 +33,7 @@ class ByteBufferMessageSetTest extends B > // create a ByteBufferMessageSet that doesn't contain a full message > // iterating it should get an InvalidMessageSizeException > val messages = new ByteBufferMessageSet(NoCompressionCodec, new > Message("01234567890123456789".getBytes())) > - val buffer = messages.serialized.slice > + val buffer = messages.getSerialized().slice > buffer.limit(10) > val messageSetWithNoFullMessage = new ByteBufferMessageSet(buffer = > buffer, initialOffset = 1000) 
> try { > @@ -51,7 +51,7 @@ class ByteBufferMessageSetTest extends B > { > val messages = new ByteBufferMessageSet(NoCompressionCodec, new > Message("hello".getBytes()), new Message("there".getBytes())) > val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2) > - buffer.put(messages.serialized) > + buffer.put(messages.getSerialized()) > buffer.putShort(4) > val messagesPlus = new ByteBufferMessageSet(buffer) > assertEquals("Adding invalid bytes shouldn't change byte count", > messages.validBytes, messagesPlus.validBytes) > @@ -93,7 +93,7 @@ class ByteBufferMessageSetTest extends B > //make sure ByteBufferMessageSet is re-iterable. > TestUtils.checkEquals[Message](messageList.iterator, > TestUtils.getMessageIterator(messageSet.iterator)) > //make sure the last offset after iteration is correct > - assertEquals("offset of last message not expected", > messageSet.last.offset, messageSet.serialized.limit) > + assertEquals("offset of last message not expected", > messageSet.last.offset, messageSet.getSerialized().limit) > } > > // test for compressed regular messages > @@ -103,7 +103,7 @@ class ByteBufferMessageSetTest extends B > //make sure ByteBufferMessageSet is re-iterable. 
> TestUtils.checkEquals[Message](messageList.iterator, > TestUtils.getMessageIterator(messageSet.iterator)) > //make sure the last offset after iteration is correct > - assertEquals("offset of last message not expected", > messageSet.last.offset, messageSet.serialized.limit) > + assertEquals("offset of last message not expected", > messageSet.last.offset, messageSet.getSerialized().limit) > } > > // test for mixed empty and non-empty messagesets uncompressed > @@ -111,16 +111,16 @@ class ByteBufferMessageSetTest extends B > val emptyMessageList : List[Message] = Nil > val emptyMessageSet = new ByteBufferMessageSet(NoCompressionCodec, > emptyMessageList: _*) > val regularMessgeSet = new ByteBufferMessageSet(NoCompressionCodec, > messageList: _*) > - val buffer = ByteBuffer.allocate(emptyMessageSet.serialized.limit + > regularMessgeSet.serialized.limit) > - buffer.put(emptyMessageSet.serialized) > - buffer.put(regularMessgeSet.serialized) > + val buffer = > ByteBuffer.allocate(emptyMessageSet.getSerialized().limit + > regularMessgeSet.getSerialized().limit) > + buffer.put(emptyMessageSet.getSerialized()) > + buffer.put(regularMessgeSet.getSerialized()) > buffer.rewind > val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0) > TestUtils.checkEquals[Message](messageList.iterator, > TestUtils.getMessageIterator(mixedMessageSet.iterator)) > //make sure ByteBufferMessageSet is re-iterable. 
> TestUtils.checkEquals[Message](messageList.iterator, > TestUtils.getMessageIterator(mixedMessageSet.iterator)) > //make sure the last offset after iteration is correct > - assertEquals("offset of last message not expected", > mixedMessageSet.last.offset, mixedMessageSet.serialized.limit) > + assertEquals("offset of last message not expected", > mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit) > } > > // test for mixed empty and non-empty messagesets compressed > @@ -128,16 +128,16 @@ class ByteBufferMessageSetTest extends B > val emptyMessageList : List[Message] = Nil > val emptyMessageSet = new > ByteBufferMessageSet(DefaultCompressionCodec, emptyMessageList: _*) > val regularMessgeSet = new > ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*) > - val buffer = ByteBuffer.allocate(emptyMessageSet.serialized.limit + > regularMessgeSet.serialized.limit) > - buffer.put(emptyMessageSet.serialized) > - buffer.put(regularMessgeSet.serialized) > + val buffer = > ByteBuffer.allocate(emptyMessageSet.getSerialized().limit + > regularMessgeSet.getSerialized().limit) > + buffer.put(emptyMessageSet.getSerialized()) > + buffer.put(regularMessgeSet.getSerialized()) > buffer.rewind > val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0) > TestUtils.checkEquals[Message](messageList.iterator, > TestUtils.getMessageIterator(mixedMessageSet.iterator)) > //make sure ByteBufferMessageSet is re-iterable. 
> TestUtils.checkEquals[Message](messageList.iterator, > TestUtils.getMessageIterator(mixedMessageSet.iterator)) > //make sure the last offset after iteration is correct > - assertEquals("offset of last message not expected", > mixedMessageSet.last.offset, mixedMessageSet.serialized.limit) > + assertEquals("offset of last message not expected", > mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit) > } > } > > > Modified: > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala > URL: > http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff > > ============================================================================== > --- > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala > (original) > +++ > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala > Sat Mar 3 05:46:43 2012 > @@ -381,11 +381,12 @@ class AsyncProducerTest extends JUnit3Su > val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer]) > mockSyncProducer.send(new TopicMetadataRequest(List(topic))) > EasyMock.expectLastCall().andReturn(List(topic1Metadata)) > - mockSyncProducer.multiSend(EasyMock.aryEq(Array(new > ProducerRequest(topic, 0, messagesToSet(msgs.take(5)))))) > + mockSyncProducer.send(TestUtils.produceRequest(topic, 0, > + messagesToSet(msgs.take(5)))) > EasyMock.expectLastCall > - mockSyncProducer.multiSend(EasyMock.aryEq(Array(new > ProducerRequest(topic, 0, messagesToSet(msgs.takeRight(5)))))) > - EasyMock.expectLastCall > - EasyMock.replay(mockSyncProducer) > + mockSyncProducer.send(TestUtils.produceRequest(topic, 0, > + messagesToSet(msgs.takeRight(5)))) > + EasyMock.replay(mockSyncProducer) > > val producerPool = EasyMock.createMock(classOf[ProducerPool]) > producerPool.getZkClient > @@ -495,10 +496,7 @@ class AsyncProducerTest 
extends JUnit3Su > } > > class MockProducer(override val config: SyncProducerConfig) extends > SyncProducer(config) { > - override def send(topic: String, messages: ByteBufferMessageSet): > Unit = { > - Thread.sleep(1000) > - } > - override def multiSend(produces: Array[ProducerRequest]) { > + override def send(produceRequest: ProducerRequest): Unit = { > Thread.sleep(1000) > } > } > > Modified: > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala > URL: > http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff > > ============================================================================== > --- > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala > (original) > +++ > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala > Sat Mar 3 05:46:43 2012 > @@ -44,7 +44,7 @@ class SyncProducerTest extends JUnit3Sui > var failed = false > val firstStart = SystemTime.milliseconds > try { > - producer.send("test", 0, new ByteBufferMessageSet(compressionCodec > = NoCompressionCodec, messages = new Message(messageBytes))) > + producer.send(TestUtils.produceRequest("test", 0, new > ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new > Message(messageBytes)))) > }catch { > case e: Exception => failed=true > } > @@ -54,7 +54,7 @@ class SyncProducerTest extends JUnit3Sui > Assert.assertTrue((firstEnd-firstStart) < 500) > val secondStart = SystemTime.milliseconds > try { > - producer.send("test", 0, new ByteBufferMessageSet(compressionCodec > = NoCompressionCodec, messages = new Message(messageBytes))) > + producer.send(TestUtils.produceRequest("test", 0, new > ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new > Message(messageBytes)))) > }catch { > case e: Exception => failed = true > } > @@ -63,7 +63,7 @@ 
class SyncProducerTest extends JUnit3Sui > Assert.assertTrue((secondEnd-secondStart) < 500) > > try { > - producer.multiSend(Array(new ProducerRequest("test", 0, new > ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new > Message(messageBytes))))) > + producer.send(TestUtils.produceRequest("test", 0, new > ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new > Message(messageBytes)))) > }catch { > case e: Exception => failed=true > } > @@ -83,7 +83,7 @@ class SyncProducerTest extends JUnit3Sui > val bytes = new Array[Byte](101) > var failed = false > try { > - producer.send("test", 0, new ByteBufferMessageSet(compressionCodec > = NoCompressionCodec, messages = new Message(bytes))) > + producer.send(TestUtils.produceRequest("test", 0, new > ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new > Message(bytes)))) > }catch { > case e: MessageSizeTooLargeException => failed = true > } > > Modified: > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala > URL: > http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1296577&r1=1296576&r2=1296577&view=diff > > ============================================================================== > --- > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala > (original) > +++ > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala > Sat Mar 3 05:46:43 2012 > @@ -33,6 +33,7 @@ import collection.mutable.ListBuffer > import kafka.consumer.{KafkaMessageStream, ConsumerConfig} > import scala.collection.Map > import kafka.serializer.Encoder > +import kafka.api.{ProducerRequest, TopicData, PartitionData} > > /** > * Utility functions to help with testing > @@ -336,7 +337,47 @@ object TestUtils { > buffer += ("msg" + i) > buffer > } > + /** > + * Create a wired format request based on simple basic information > + */ > + def 
produceRequest(topic: String, message: ByteBufferMessageSet): > kafka.api.ProducerRequest = { > + > > produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,ProducerRequest.RandomPartition,message) > + } > + def produceRequest(topic: String, partition: Int, message: > ByteBufferMessageSet): kafka.api.ProducerRequest = { > + > > produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,partition,message) > + } > + > + def produceRequest(correlationId: Int, topic: String, partition: Int, > message: ByteBufferMessageSet): kafka.api.ProducerRequest = { > + val clientId = SyncProducerConfig.DefaultClientId > + val requiredAcks: Short = SyncProducerConfig.DefaultRequiredAcks > + val ackTimeout = SyncProducerConfig.DefaultAckTimeoutMs > + var data = new Array[TopicData](1) > + var partitionData = new Array[PartitionData](1) > + partitionData(0) = new PartitionData(partition,message) > + data(0) = new TopicData(topic,partitionData) > + val pr = new kafka.api.ProducerRequest(correlationId, clientId, > requiredAcks, ackTimeout, data) > + pr > + } > > + def produceJavaRequest(topic: String, message: > kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest > = { > + produceJavaRequest(-1,topic,-1,message) > + } > + > + def produceJavaRequest(topic: String, partition: Int, message: > kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest > = { > + produceJavaRequest(-1,topic,partition,message) > + } > + > + def produceJavaRequest(correlationId: Int, topic: String, partition: > Int, message: kafka.javaapi.message.ByteBufferMessageSet): > kafka.javaapi.ProducerRequest = { > + val clientId = "test" > + val requiredAcks: Short = 0 > + val ackTimeout = 0 > + var data = new Array[TopicData](1) > + var partitionData = new Array[PartitionData](1) > + partitionData(0) = new PartitionData(partition,message.underlying) > + data(0) = new TopicData(topic,partitionData) > + val pr = new kafka.javaapi.ProducerRequest(correlationId, clientId, > 
requiredAcks, ackTimeout, data) > + pr > + } > } > > object TestZKUtils { > > >