Repository: kafka
Updated Branches:
  refs/heads/trunk 1f2ee5f0a -> fc1cfe475


http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index b864e5d..a26bc2e 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -26,8 +26,9 @@ import kafka.integration.KafkaServerTestHarness
 import kafka.network.SocketServer
 import kafka.utils._
 import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.types.Struct
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
-import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader, ResponseHeader}
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestResponse, RequestHeader, ResponseHeader}
 
 abstract class BaseRequestTest extends KafkaServerTestHarness {
   private var correlationId = 0
@@ -97,41 +98,64 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
   }
 
   /**
-    *
-    * @param request
-    * @param apiKey
     * @param destination An optional SocketServer to send the request to. If not set, any available server is used.
     * @param protocol An optional SecurityProtocol to use. If not set, PLAINTEXT is used.
-    * @return
+    * @return A ByteBuffer containing the response (without the response header)
     */
-  def send(request: AbstractRequest, apiKey: ApiKeys,
-           destination: SocketServer = anySocketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): ByteBuffer = {
+  def connectAndSend(request: AbstractRequest, apiKey: ApiKeys,
+                     destination: SocketServer = anySocketServer,
+                     apiVersion: Option[Short] = None,
+                     protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): ByteBuffer = {
     val socket = connect(destination, protocol)
-    try {
-      send(request, apiKey, socket)
-    } finally {
-      socket.close()
-    }
+    try send(request, apiKey, socket, apiVersion)
+    finally socket.close()
   }
 
   /**
-    * Serializes and send the request to the given api.
+    * @param destination An optional SocketServer to send the request to. If not set, any available server is used.
+    * @param protocol An optional SecurityProtocol to use. If not set, PLAINTEXT is used.
+    * @return A ByteBuffer containing the response (without the response header).
+    */
+  def connectAndSendStruct(requestStruct: Struct, apiKey: ApiKeys, apiVersion: Short,
+                           destination: SocketServer = anySocketServer,
+                           protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): ByteBuffer = {
+    val socket = connect(destination, protocol)
+    try sendStruct(requestStruct, apiKey, socket, apiVersion)
+    finally socket.close()
+  }
+
+  /**
+    * Serializes and sends the request to the given api.
     * A ByteBuffer containing the response is returned.
     */
-  def send(request: AbstractRequest, apiKey: ApiKeys, socket: Socket): ByteBuffer = {
-    correlationId += 1
-    val serializedBytes = {
-      val header = new RequestHeader(apiKey.id, request.version, "client-id", correlationId)
-      val byteBuffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf)
-      header.writeTo(byteBuffer)
-      request.writeTo(byteBuffer)
-      byteBuffer.array()
-    }
+  def send(request: AbstractRequest, apiKey: ApiKeys, socket: Socket, apiVersion: Option[Short] = None): ByteBuffer = {
+    val header = nextRequestHeader(apiKey, apiVersion.getOrElse(request.version))
+    val serializedBytes = request.serialize(header).array
+    val response = requestAndReceive(socket, serializedBytes)
+    skipResponseHeader(response)
+  }
 
+  /**
+    * Serializes and sends the requestStruct to the given api.
+    * A ByteBuffer containing the response (without the response header) is returned.
+    */
+  def sendStruct(requestStruct: Struct, apiKey: ApiKeys, socket: Socket, apiVersion: Short): ByteBuffer = {
+    val header = nextRequestHeader(apiKey, apiVersion)
+    val serializedBytes = AbstractRequestResponse.serialize(header.toStruct, requestStruct).array
     val response = requestAndReceive(socket, serializedBytes)
+    skipResponseHeader(response)
+  }
 
+  private def skipResponseHeader(response: Array[Byte]): ByteBuffer = {
     val responseBuffer = ByteBuffer.wrap(response)
-    ResponseHeader.parse(responseBuffer) // Parse the header to ensure its valid and move the buffer forward
+    // Parse the header to ensure it's valid and move the buffer forward
+    ResponseHeader.parse(responseBuffer)
     responseBuffer
   }
+
+  def nextRequestHeader(apiKey: ApiKeys, apiVersion: Short): RequestHeader = {
+    correlationId += 1
+    new RequestHeader(apiKey.id, apiVersion, "client-id", correlationId)
+  }
+
 }
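
The refactored helpers split connection management (connectAndSend, connectAndSendStruct) from serialization (send, sendStruct), and nextRequestHeader centralizes correlation ids. A minimal usage sketch against the new signatures; the subclass and the choice of METADATA are illustrative, not part of this commit:

    // Hypothetical test built on the refactored helpers (illustrative only).
    import org.apache.kafka.common.protocol.ApiKeys
    import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}

    class ExampleRequestTest extends BaseRequestTest {
      def fetchAllTopicsMetadata(version: Short): MetadataResponse = {
        val request = MetadataRequest.Builder.allTopics.build(version)
        // connectAndSend opens a connection, frames and writes the request,
        // then returns the response body with the header already consumed.
        val buffer = connectAndSend(request, ApiKeys.METADATA, apiVersion = Some(version))
        MetadataResponse.parse(buffer, version)
      }
    }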

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
index 6efa189..4ab9520 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
@@ -116,30 +116,27 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
     // Duplicate
     val singleRequest = new CreateTopicsRequest.Builder(Map("duplicate-topic" ->
         new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 1000).build()
-    val duplicateRequest = duplicateFirstTopic(singleRequest)
-    assertFalse("Request doesn't have duplicate topics", duplicateRequest.duplicateTopics().isEmpty)
-    validateErrorCreateTopicsRequests(duplicateRequest, Map("duplicate-topic" -> error(Errors.INVALID_REQUEST,
-      Some("""Create topics request from client `client-id` contains multiple entries for the following topics: duplicate-topic"""))))
+    validateErrorCreateTopicsRequests(singleRequest, Map("duplicate-topic" -> error(Errors.INVALID_REQUEST,
+      Some("""Create topics request from client `client-id` contains multiple entries for the following topics: duplicate-topic"""))),
+      requestStruct = Some(toStructWithDuplicateFirstTopic(singleRequest)))
 
     // Duplicate Partial with validateOnly
     val doubleRequestValidateOnly = new CreateTopicsRequest.Builder(Map(
       "duplicate-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort),
       "other-topic" -> new CreateTopicsRequest.TopicDetails(1, 
1.toShort)).asJava, 1000, true).build()
-    val duplicateDoubleRequestValidateOnly = 
duplicateFirstTopic(doubleRequestValidateOnly)
-    assertFalse("Request doesn't have duplicate topics", 
duplicateDoubleRequestValidateOnly.duplicateTopics.isEmpty)
-    validateErrorCreateTopicsRequests(duplicateDoubleRequestValidateOnly, Map(
+    validateErrorCreateTopicsRequests(doubleRequestValidateOnly, Map(
       "duplicate-topic" -> error(Errors.INVALID_REQUEST),
-      "other-topic" -> error(Errors.NONE)), checkErrorMessage = false)
+      "other-topic" -> error(Errors.NONE)), checkErrorMessage = false,
+      requestStruct = Some(toStructWithDuplicateFirstTopic(doubleRequestValidateOnly)))
 
     // Duplicate Partial
     val doubleRequest = new CreateTopicsRequest.Builder(Map(
       "duplicate-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort),
       "other-topic" -> new CreateTopicsRequest.TopicDetails(1, 
1.toShort)).asJava, 1000).build()
-    val duplicateDoubleRequest = duplicateFirstTopic(doubleRequest)
-    assertFalse("Request doesn't have duplicate topics", 
duplicateDoubleRequest.duplicateTopics.isEmpty)
-    validateErrorCreateTopicsRequests(duplicateDoubleRequest, Map(
+    validateErrorCreateTopicsRequests(doubleRequest, Map(
       "duplicate-topic" -> error(Errors.INVALID_REQUEST),
-      "other-topic" -> error(Errors.NONE)), checkErrorMessage = false)
+      "other-topic" -> error(Errors.NONE)), checkErrorMessage = false,
+      requestStruct = Some(toStructWithDuplicateFirstTopic(doubleRequest)))
 
     // Partitions/ReplicationFactor and ReplicaAssignment
     val assignments = replicaAssignmentToJava(Map(0 -> List(0)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
index 9a092d0..9cd53d8 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
@@ -112,12 +112,12 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
   }
 
   private def sendDeleteTopicsRequest(request: DeleteTopicsRequest, socketServer: SocketServer = controllerSocketServer): DeleteTopicsResponse = {
-    val response = send(request, ApiKeys.DELETE_TOPICS, socketServer)
+    val response = connectAndSend(request, ApiKeys.DELETE_TOPICS, socketServer)
     DeleteTopicsResponse.parse(response, request.version)
   }
 
   private def sendMetadataRequest(request: MetadataRequest): MetadataResponse = {
-    val response = send(request, ApiKeys.METADATA)
-    MetadataResponse.parse(response)
+    val response = connectAndSend(request, ApiKeys.METADATA)
+    MetadataResponse.parse(response, request.version)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
index 1f9e18b..2d4a22a 100755
--- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
@@ -114,15 +114,15 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
     val correlationId = -1
     TestUtils.createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
 
+    val version = 2: Short
     val serializedBytes = {
-      val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, 2, null, correlationId)
+      val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, version, null, correlationId)
       val messageBytes = "message".getBytes
       val records = MemoryRecords.readableRecords(ByteBuffer.wrap(messageBytes))
-      val request = new ProduceRequest.Builder(
-          1, 10000, Map(topicPartition -> records).asJava).build()
-      val byteBuffer = ByteBuffer.allocate(headerBytes.length + request.sizeOf)
+      val request = new ProduceRequest.Builder(1, 10000, Map(topicPartition -> records).asJava).build()
+      val byteBuffer = ByteBuffer.allocate(headerBytes.length + request.toStruct.sizeOf)
       byteBuffer.put(headerBytes)
-      request.writeTo(byteBuffer)
+      request.toStruct.writeTo(byteBuffer)
       byteBuffer.array()
     }
 
@@ -130,13 +130,13 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
 
     val responseBuffer = ByteBuffer.wrap(response)
     val responseHeader = ResponseHeader.parse(responseBuffer)
-    val produceResponse = ProduceResponse.parse(responseBuffer)
+    val produceResponse = ProduceResponse.parse(responseBuffer, version)
 
-    assertEquals("The response should parse completely", 0, responseBuffer.remaining())
-    assertEquals("The correlationId should match request", correlationId, responseHeader.correlationId())
-    assertEquals("One partition response should be returned", 1, produceResponse.responses().size())
+    assertEquals("The response should parse completely", 0, responseBuffer.remaining)
+    assertEquals("The correlationId should match request", correlationId, responseHeader.correlationId)
+    assertEquals("One partition response should be returned", 1, produceResponse.responses.size)
 
-    val partitionResponse = produceResponse.responses().get(topicPartition)
+    val partitionResponse = produceResponse.responses.get(topicPartition)
     assertNotNull(partitionResponse)
     assertEquals("There should be no error", Errors.NONE, 
partitionResponse.error)
   }
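
The hunk above stops serializing the request object directly and writes its Struct representation instead. The framing it relies on, sketched in isolation with headerBytes and request as defined in the test:

    // Frame layout: header bytes first, then the body written from its Struct.
    val struct = request.toStruct
    val byteBuffer = ByteBuffer.allocate(headerBytes.length + struct.sizeOf)
    byteBuffer.put(headerBytes)
    struct.writeTo(byteBuffer)
    val serializedBytes = byteBuffer.array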

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 3811360..64be5b3 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -24,7 +24,7 @@ import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
 import org.apache.kafka.common.record.LogEntry
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
 import org.apache.kafka.common.serialization.StringSerializer
@@ -56,7 +56,7 @@ class FetchRequestTest extends BaseRequestTest {
 
   private def createFetchRequest(maxResponseBytes: Int, maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition],
                                  offsetMap: Map[TopicPartition, Long] = Map.empty): FetchRequest =
-    new FetchRequest.Builder(Int.MaxValue, 0, createPartitionMap(maxPartitionBytes, topicPartitions, offsetMap))
+    FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(maxPartitionBytes, topicPartitions, offsetMap))
       .setMaxBytes(maxResponseBytes).build()
 
   private def createPartitionMap(maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition],
@@ -69,8 +69,8 @@ class FetchRequestTest extends BaseRequestTest {
   }
 
   private def sendFetchRequest(leaderId: Int, request: FetchRequest): FetchResponse = {
-    val response = send(request, ApiKeys.FETCH, destination = brokerSocketServer(leaderId))
-    FetchResponse.parse(response)
+    val response = connectAndSend(request, ApiKeys.FETCH, destination = brokerSocketServer(leaderId))
+    FetchResponse.parse(response, ProtoUtils.latestVersion(ApiKeys.FETCH.id))
   }
 
   @Test
@@ -156,10 +156,9 @@ class FetchRequestTest extends BaseRequestTest {
     val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1).head
     producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition,
       "key", new String(new Array[Byte](maxPartitionBytes + 1)))).get
-    val fetchRequestBuilder = new FetchRequest.Builder(
-      Int.MaxValue, 0, createPartitionMap(maxPartitionBytes, Seq(topicPartition))).
-      setVersion(2)
-    val fetchResponse = sendFetchRequest(leaderId, fetchRequestBuilder.build())
+    val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0,
+      createPartitionMap(maxPartitionBytes, Seq(topicPartition))).build(2)
+    val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
     val partitionData = fetchResponse.responseData.get(topicPartition)
     assertEquals(Errors.NONE, partitionData.error)
     assertTrue(partitionData.highWatermark > 0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 99a95ad..1b5007d 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -22,7 +22,7 @@ import util.Arrays.asList
 import kafka.common.BrokerEndPointNotAvailableException
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils, SecurityProtocol}
 import org.apache.kafka.common.requests.{PartitionState, UpdateMetadataRequest}
 import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, EndPoint}
 import org.junit.Test
@@ -69,8 +69,9 @@ class MetadataCacheTest {
       new TopicPartition(topic, 1) -> new PartitionState(controllerEpoch, 1, 1, asList(1), zkVersion, asSet(1)),
       new TopicPartition(topic, 2) -> new PartitionState(controllerEpoch, 2, 2, asList(2), zkVersion, asSet(2)))
 
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(
-      controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava).build()
+    val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)
+    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch,
+      partitionStates.asJava, brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
 
     for (securityProtocol <- Seq(SecurityProtocol.PLAINTEXT, SecurityProtocol.SSL)) {
@@ -120,8 +121,9 @@ class MetadataCacheTest {
     val partitionStates = Map(
       new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, asList(0), zkVersion, asSet(0)))
 
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(
-      controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava).build()
+    val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)
+    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch,
+      partitionStates.asJava, brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
 
     val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName)
@@ -163,8 +165,9 @@ class MetadataCacheTest {
     val partitionStates = Map(
       new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas))
 
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(
-      controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava).build()
+    val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)
+    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch,
+      partitionStates.asJava, brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
 
     // Validate errorUnavailableEndpoints = false
@@ -222,8 +225,9 @@ class MetadataCacheTest {
     val partitionStates = Map(
       new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas))
 
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(
-      controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava).build()
+    val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)
+    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch,
+      partitionStates.asJava, brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
 
     // Validate errorUnavailableEndpoints = false
@@ -273,8 +277,9 @@ class MetadataCacheTest {
     val isr = asList[Integer](0, 1)
     val partitionStates = Map(
       new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, 3, replicas))
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(
-      2, controllerEpoch, partitionStates.asJava, brokers.asJava).build()
+    val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)
+    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, partitionStates.asJava,
+      brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
 
     try {
@@ -305,8 +310,9 @@ class MetadataCacheTest {
       val isr = asList[Integer](0, 1)
       val partitionStates = Map(
         new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, 3, replicas))
-      val updateMetadataRequest = new UpdateMetadataRequest.Builder(
-        2, controllerEpoch, partitionStates.asJava, brokers.asJava).build()
+      val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)
+      val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, partitionStates.asJava,
+        brokers.asJava).build()
       cache.updateCache(15, updateMetadataRequest)
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index f3bb912..ed0e805 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -36,14 +36,14 @@ class MetadataRequestTest extends BaseRequestTest {
 
   @Test
   def testClusterIdWithRequestVersion1() {
-    val v1MetadataResponse = sendMetadataRequest(MetadataRequest.allTopics(1.toShort))
+    val v1MetadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
     val v1ClusterId = v1MetadataResponse.clusterId
     assertNull(s"v1 clusterId should be null", v1ClusterId)
   }
 
   @Test
   def testClusterIdIsValid() {
-    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(2.toShort))
+    val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(2.toShort))
     isValidClusterId(metadataResponse.clusterId)
   }
 
@@ -51,7 +51,7 @@ class MetadataRequestTest extends BaseRequestTest {
   def testControllerId() {
     val controllerServer = servers.find(_.kafkaController.isActive).get
     val controllerId = controllerServer.config.brokerId
-    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(1.toShort))
+    val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
 
     assertEquals("Controller id should match the active controller",
       controllerId, metadataResponse.controller.id)
@@ -64,14 +64,14 @@ class MetadataRequestTest extends BaseRequestTest {
     val controllerId2 = controllerServer2.config.brokerId
     assertNotEquals("Controller id should switch to a new broker", controllerId, controllerId2)
     TestUtils.waitUntilTrue(() => {
-      val metadataResponse2 = sendMetadataRequest(MetadataRequest.allTopics(1.toShort))
+      val metadataResponse2 = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
       metadataResponse2.controller != null && controllerServer2.apis.brokerId == metadataResponse2.controller.id
     }, "Controller id should match the active controller after failover", 5000)
   }
 
   @Test
   def testRack() {
-    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(1.toShort))
+    val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
     // Validate rack matches what's set in generateConfigs() above
     metadataResponse.brokers.asScala.foreach { broker =>
       assertEquals("Rack information should match config", 
s"rack/${broker.id}", broker.rack)
@@ -86,7 +86,7 @@ class MetadataRequestTest extends BaseRequestTest {
     TestUtils.createTopic(zkUtils, internalTopic, 3, 2, servers)
     TestUtils.createTopic(zkUtils, notInternalTopic, 3, 2, servers)
 
-    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(1.toShort))
+    val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
     assertTrue("Response should have no errors", metadataResponse.errors.isEmpty)
 
     val topicMetadata = metadataResponse.topicMetadata.asScala
@@ -124,7 +124,7 @@ class MetadataRequestTest extends BaseRequestTest {
     assertEquals("V0 Response should have 2 (all) topics", 2, 
metadataResponseV0.topicMetadata.size())
 
     // v1, Null represents all topics
-    val metadataResponseV1 = 
sendMetadataRequest(MetadataRequest.allTopics(1.toShort))
+    val metadataResponseV1 = 
sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
     assertTrue("V1 Response should have no errors", 
metadataResponseV1.errors.isEmpty)
     assertEquals("V1 Response should have 2 (all) topics", 2, 
metadataResponseV1.topicMetadata.size())
   }
@@ -177,7 +177,7 @@ class MetadataRequestTest extends BaseRequestTest {
   }
 
   private def sendMetadataRequest(request: MetadataRequest): MetadataResponse = {
-    val response = send(request, ApiKeys.METADATA)
+    val response = connectAndSend(request, ApiKeys.METADATA)
     MetadataResponse.parse(response, request.version)
   }
 }
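
Call sites now pin the metadata version through the builder instead of the removed MetadataRequest.allTopics(version) factory. The pattern in isolation, with the version value illustrative:

    // Build an all-topics request at an explicit version, send it over a
    // fresh connection, and parse the response at that same version.
    val request = MetadataRequest.Builder.allTopics.build(1.toShort)
    val metadata = MetadataResponse.parse(connectAndSend(request, ApiKeys.METADATA), request.version)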

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index b05be9d..81118fa 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -88,8 +88,8 @@ class ProduceRequestTest extends BaseRequestTest {
   }
 
   private def sendProduceRequest(leaderId: Int, request: ProduceRequest): ProduceResponse = {
-    val response = send(request, ApiKeys.PRODUCE, destination = brokerSocketServer(leaderId))
-    ProduceResponse.parse(response)
+    val response = connectAndSend(request, ApiKeys.PRODUCE, destination = brokerSocketServer(leaderId))
+    ProduceResponse.parse(response, request.version)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 927ace9..92a518d 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -20,14 +20,13 @@ import java.io.IOException
 import java.net.Socket
 import java.util.Collections
 
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils, SecurityProtocol}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
 import org.apache.kafka.common.requests.SaslHandshakeRequest
 import org.apache.kafka.common.requests.SaslHandshakeResponse
 import org.junit.Test
 import org.junit.Assert._
 import kafka.api.SaslTestHarness
-import org.apache.kafka.common.protocol.types.Struct
 
 class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness {
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
@@ -42,8 +41,7 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness {
   def testApiVersionsRequestBeforeSaslHandshakeRequest() {
     val plaintextSocket = connect(protocol = securityProtocol)
     try {
-      val apiVersionsResponse = sendApiVersionsRequest(plaintextSocket,
-          new ApiVersionsRequest.Builder().setVersion(0).build())
+      val apiVersionsResponse = sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest.Builder().build(0))
       ApiVersionsRequestTest.validateApiVersionsResponse(apiVersionsResponse)
       sendSaslHandshakeRequestValidateResponse(plaintextSocket)
     } finally {
@@ -57,8 +55,7 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness {
     try {
       sendSaslHandshakeRequestValidateResponse(plaintextSocket)
       try {
-        sendApiVersionsRequest(plaintextSocket,
-            new ApiVersionsRequest.Builder().setVersion(0).build())
+        sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest.Builder().build(0))
         fail("Versions Request during Sasl handshake did not fail")
       } catch {
         case _: IOException => // expected exception
@@ -72,12 +69,10 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness {
   def testApiVersionsRequestWithUnsupportedVersion() {
     val plaintextSocket = connect(protocol = securityProtocol)
     try {
-      val apiVersionsRequest = new ApiVersionsRequest(
-        new Struct(ProtoUtils.requestSchema(ApiKeys.API_VERSIONS.id, 0)), Short.MaxValue);
-      val apiVersionsResponse = sendApiVersionsRequest(plaintextSocket, apiVersionsRequest)
+      val apiVersionsRequest = new ApiVersionsRequest(0)
+      val apiVersionsResponse = sendApiVersionsRequest(plaintextSocket, apiVersionsRequest, Some(Short.MaxValue))
       assertEquals(Errors.UNSUPPORTED_VERSION, apiVersionsResponse.error)
-      val apiVersionsResponse2 = sendApiVersionsRequest(plaintextSocket,
-          new ApiVersionsRequest.Builder().setVersion(0).build())
+      val apiVersionsResponse2 = sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest.Builder().build(0))
       ApiVersionsRequestTest.validateApiVersionsResponse(apiVersionsResponse2)
       sendSaslHandshakeRequestValidateResponse(plaintextSocket)
     } finally {
@@ -85,15 +80,17 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness {
     }
   }
 
-  private def sendApiVersionsRequest(socket: Socket, request: ApiVersionsRequest): ApiVersionsResponse = {
-    val response = send(request, ApiKeys.API_VERSIONS, socket)
-    ApiVersionsResponse.parse(response)
+  private def sendApiVersionsRequest(socket: Socket, request: ApiVersionsRequest,
+                                     apiVersion: Option[Short] = None): ApiVersionsResponse = {
+    val response = send(request, ApiKeys.API_VERSIONS, socket, apiVersion)
+    ApiVersionsResponse.parse(response, request.version)
   }
 
   private def sendSaslHandshakeRequestValidateResponse(socket: Socket) {
-    val response = send(new SaslHandshakeRequest("PLAIN"), ApiKeys.SASL_HANDSHAKE, socket)
-    val handshakeResponse = SaslHandshakeResponse.parse(response)
+    val request = new SaslHandshakeRequest("PLAIN")
+    val response = send(request, ApiKeys.SASL_HANDSHAKE, socket)
+    val handshakeResponse = SaslHandshakeResponse.parse(response, request.version)
     assertEquals(Errors.NONE, handshakeResponse.error)
-    assertEquals(Collections.singletonList("PLAIN"), handshakeResponse.enabledMechanisms())
+    assertEquals(Collections.singletonList("PLAIN"), handshakeResponse.enabledMechanisms)
   }
 }
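
The apiVersion override threaded through send is what lets testApiVersionsRequestWithUnsupportedVersion advertise a header version the broker does not support while the body stays serializable. Sketched in isolation, with socket as in the test:

    // The body is a valid v0 request, but the header claims Short.MaxValue,
    // so the broker is expected to answer with UNSUPPORTED_VERSION.
    val request = new ApiVersionsRequest(0)
    val response = send(request, ApiKeys.API_VERSIONS, socket, Some(Short.MaxValue))
    val parsed = ApiVersionsResponse.parse(response, request.version)
    assertEquals(Errors.UNSUPPORTED_VERSION, parsed.error)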

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 69b3c46..5399653 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -106,8 +106,8 @@ public class InternalTopicManagerTest {
             MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, new ArrayList<Node>(), new ArrayList<Node>());
             MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE, topic, true, Collections.singletonList(partitionMetadata));
             MetadataResponse response = new MetadataResponse(Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID,
-                Collections.singletonList(topicMetadata), 0);
+                Collections.singletonList(topicMetadata));
             return response;
         }
     }
-}
\ No newline at end of file
+}
