Repository: kafka Updated Branches: refs/heads/trunk f15cdbc91 -> 47ee8e954
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index cc061da..cc6a394 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -22,13 +22,22 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; +import java.util.Random; +import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.memory.SimpleMemoryPool; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.utils.MockTime; @@ -50,7 +59,7 @@ public class SelectorTest { protected Time time; protected Selector selector; protected ChannelBuilder channelBuilder; - private Metrics metrics; + protected Metrics metrics; @Before public void setUp() throws Exception { @@ -322,6 +331,87 @@ public class SelectorTest { assertTrue("Unexpected receive", selector.completedReceives().isEmpty()); } + @Test + public void testMuteOnOOM() throws Exception { + //clean up default selector, replace it with one that uses a finite mem pool + selector.close(); + MemoryPool pool = new SimpleMemoryPool(900, 900, false, null); + selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup", + new HashMap<String, String>(), true, false, channelBuilder, pool); + + try (ServerSocketChannel ss = ServerSocketChannel.open()) { + ss.bind(new InetSocketAddress(0)); + + InetSocketAddress serverAddress = (InetSocketAddress) ss.getLocalAddress(); + + Thread sender1 = createSender(serverAddress, randomPayload(900)); + Thread sender2 = createSender(serverAddress, randomPayload(900)); + sender1.start(); + sender2.start(); + + //wait until everything has been flushed out to network (assuming payload size is smaller than OS buffer size) + //this is important because we assume both requests' prefixes (1st 4 bytes) have made it. + sender1.join(5000); + sender2.join(5000); + + SocketChannel channelX = ss.accept(); //not defined if its 1 or 2 + channelX.configureBlocking(false); + SocketChannel channelY = ss.accept(); + channelY.configureBlocking(false); + selector.register("clientX", channelX); + selector.register("clientY", channelY); + + List<NetworkReceive> completed = Collections.emptyList(); + long deadline = System.currentTimeMillis() + 5000; + while (System.currentTimeMillis() < deadline && completed.isEmpty()) { + selector.poll(1000); + completed = selector.completedReceives(); + } + assertEquals("could not read a single request within timeout", 1, completed.size()); + NetworkReceive firstReceive = completed.get(0); + assertEquals(0, pool.availableMemory()); + assertTrue(selector.isOutOfMemory()); + + selector.poll(10); + assertTrue(selector.completedReceives().isEmpty()); + assertEquals(0, pool.availableMemory()); + assertTrue(selector.isOutOfMemory()); + + firstReceive.close(); + assertEquals(900, pool.availableMemory()); //memory has been released back to pool + + completed = Collections.emptyList(); + deadline = System.currentTimeMillis() + 5000; + while (System.currentTimeMillis() < deadline && completed.isEmpty()) { + selector.poll(1000); + completed = selector.completedReceives(); + } + assertEquals("could not read a single request within timeout", 1, selector.completedReceives().size()); + assertEquals(0, pool.availableMemory()); + assertFalse(selector.isOutOfMemory()); + } + } + + private Thread createSender(InetSocketAddress serverAddress, byte[] payload) { + return new PlaintextSender(serverAddress, payload); + } + + protected byte[] randomPayload(int sizeBytes) throws Exception { + Random random = new Random(); + byte[] payload = new byte[sizeBytes + 4]; + random.nextBytes(payload); + ByteArrayOutputStream prefixOs = new ByteArrayOutputStream(); + DataOutputStream prefixDos = new DataOutputStream(prefixOs); + prefixDos.writeInt(sizeBytes); + prefixDos.flush(); + prefixDos.close(); + prefixOs.flush(); + prefixOs.close(); + byte[] prefix = prefixOs.toByteArray(); + System.arraycopy(prefix, 0, payload, 0, prefix.length); + return payload; + } + private String blockingRequest(String node, String s) throws IOException { selector.send(createSend(node, s)); selector.poll(1000L); http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java index e272855..46d3b79 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java @@ -17,17 +17,23 @@ package org.apache.kafka.common.network; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; +import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.memory.SimpleMemoryPool; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.protocol.SecurityProtocol; @@ -35,6 +41,7 @@ import org.apache.kafka.common.security.ssl.SslFactory; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.TestSslUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -43,7 +50,6 @@ import org.junit.Test; */ public class SslSelectorTest extends SelectorTest { - private Metrics metrics; private Map<String, Object> sslClientConfigs; @Before @@ -160,6 +166,90 @@ public class SslSelectorTest extends SelectorTest { } + @Override + public void testMuteOnOOM() throws Exception { + //clean up default selector, replace it with one that uses a finite mem pool + selector.close(); + MemoryPool pool = new SimpleMemoryPool(900, 900, false, null); + //the initial channel builder is for clients, we need a server one + File trustStoreFile = File.createTempFile("truststore", ".jks"); + Map<String, Object> sslServerConfigs = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server"); + sslServerConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); + channelBuilder = new SslChannelBuilder(Mode.SERVER); + channelBuilder.configure(sslServerConfigs); + selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup", + new HashMap<String, String>(), true, false, channelBuilder, pool); + + try (ServerSocketChannel ss = ServerSocketChannel.open()) { + ss.bind(new InetSocketAddress(0)); + + InetSocketAddress serverAddress = (InetSocketAddress) ss.getLocalAddress(); + + SslSender sender1 = createSender(serverAddress, randomPayload(900)); + SslSender sender2 = createSender(serverAddress, randomPayload(900)); + sender1.start(); + sender2.start(); + + SocketChannel channelX = ss.accept(); //not defined if its 1 or 2 + channelX.configureBlocking(false); + SocketChannel channelY = ss.accept(); + channelY.configureBlocking(false); + selector.register("clientX", channelX); + selector.register("clientY", channelY); + + boolean success = false; + NetworkReceive firstReceive = null; + long deadline = System.currentTimeMillis() + 5000; + //keep calling poll until: + //1. both senders have completed the handshakes (so server selector has tried reading both payloads) + //2. a single payload is actually read out completely (the other is too big to fit) + while (System.currentTimeMillis() < deadline) { + selector.poll(10); + + List<NetworkReceive> completed = selector.completedReceives(); + if (firstReceive == null) { + if (!completed.isEmpty()) { + assertEquals("expecting a single request", 1, completed.size()); + firstReceive = completed.get(0); + assertTrue(selector.isMadeReadProgressLastPoll()); + assertEquals(0, pool.availableMemory()); + } + } else { + assertTrue("only expecting single request", completed.isEmpty()); + } + + boolean handshaked = sender1.waitForHandshake(1); + handshaked = handshaked && sender2.waitForHandshake(1); + + if (handshaked && firstReceive != null) { + success = true; + break; + } + } + if (!success) { + Assert.fail("could not initiate connections within timeout"); + } + + selector.poll(10); + assertTrue(selector.completedReceives().isEmpty()); + assertEquals(0, pool.availableMemory()); + assertTrue(selector.isOutOfMemory()); + + firstReceive.close(); + assertEquals(900, pool.availableMemory()); //memory has been released back to pool + + List<NetworkReceive> completed = Collections.emptyList(); + deadline = System.currentTimeMillis() + 5000; + while (System.currentTimeMillis() < deadline && completed.isEmpty()) { + selector.poll(1000); + completed = selector.completedReceives(); + } + assertEquals("could not read remaining request within timeout", 1, completed.size()); + assertEquals(0, pool.availableMemory()); + assertFalse(selector.isOutOfMemory()); + } + } + /** * Connects and waits for handshake to complete. This is required since SslTransportLayer * implementation requires the channel to be ready before send is invoked (unlike plaintext @@ -169,4 +259,7 @@ public class SslSelectorTest extends SelectorTest { blockingConnect(node, serverAddr); } + private SslSender createSender(InetSocketAddress serverAddress, byte[] payload) { + return new SslSender(serverAddress, payload); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/network/SslSender.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSender.java b/clients/src/test/java/org/apache/kafka/common/network/SslSender.java new file mode 100644 index 0000000..cae69cb --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/network/SslSender.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.network; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocket; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class SslSender extends Thread { + + private final InetSocketAddress serverAddress; + private final byte[] payload; + private final CountDownLatch handshaked = new CountDownLatch(1); + + public SslSender(InetSocketAddress serverAddress, byte[] payload) { + this.serverAddress = serverAddress; + this.payload = payload; + setDaemon(true); + setName("SslSender - " + payload.length + " bytes @ " + serverAddress); + } + + @Override + public void run() { + try { + SSLContext sc = SSLContext.getInstance("TLSv1.2"); + sc.init(null, new TrustManager[]{new NaiveTrustManager()}, new java.security.SecureRandom()); + try (SSLSocket connection = (SSLSocket) sc.getSocketFactory().createSocket(serverAddress.getAddress(), serverAddress.getPort())) { + OutputStream os = connection.getOutputStream(); + connection.startHandshake(); + handshaked.countDown(); + os.write(payload); + os.flush(); + } + } catch (Exception e) { + e.printStackTrace(System.err); + } + } + + public boolean waitForHandshake(long timeoutMillis) throws InterruptedException { + return handshaked.await(timeoutMillis, TimeUnit.MILLISECONDS); + } + + /** + * blindly trust any certificate presented to it + */ + private static class NaiveTrustManager implements X509TrustManager { + @Override + public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { + //nop + } + + @Override + public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { + //nop + } + + @Override + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index bb5d2a7..8338ad7 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -39,6 +39,7 @@ import javax.net.ssl.SSLParameters; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.memory.MemoryPool; import org.apache.kafka.common.security.TestSecurityConfig; import org.apache.kafka.common.security.ssl.SslFactory; import org.apache.kafka.common.metrics.Metrics; @@ -580,7 +581,7 @@ public class SslTransportLayerTest { public void testNetworkThreadTimeRecorded() throws Exception { selector.close(); this.selector = new Selector(NetworkReceive.UNLIMITED, 5000, new Metrics(), Time.SYSTEM, - "MetricGroup", new HashMap<String, String>(), false, true, channelBuilder); + "MetricGroup", new HashMap<String, String>(), false, true, channelBuilder, MemoryPool.NONE); String node = "0"; server = createEchoServer(SecurityProtocol.SSL); http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java new file mode 100644 index 0000000..2b2cc91 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.protocol; + +import org.junit.Assert; +import org.junit.Test; + +public class ProtoUtilsTest { + @Test + public void testDelayedAllocationSchemaDetection() throws Exception { + //verifies that schemas known to retain a reference to the underlying byte buffer are correctly detected. + for (ApiKeys key : ApiKeys.values()) { + if (key == ApiKeys.PRODUCE || key == ApiKeys.JOIN_GROUP || key == ApiKeys.SYNC_GROUP) { + Assert.assertTrue(Protocol.requiresDelayedDeallocation(key.id)); + } else { + Assert.assertFalse(Protocol.requiresDelayedDeallocation(key.id)); + } + } + + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java index 2d6d05c..3feeff2 100755 --- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java @@ -36,6 +36,7 @@ import java.util.Collections; import java.util.Random; import static org.apache.kafka.common.utils.Utils.formatAddress; +import static org.apache.kafka.common.utils.Utils.formatBytes; import static org.apache.kafka.common.utils.Utils.getHost; import static org.apache.kafka.common.utils.Utils.getPort; import static org.junit.Assert.assertArrayEquals; @@ -78,6 +79,17 @@ public class UtilsTest { } @Test + public void testFormatBytes() { + assertEquals("-1", formatBytes(-1)); + assertEquals("1023 B", formatBytes(1023)); + assertEquals("1 KB", formatBytes(1024)); + assertEquals("1024 KB", formatBytes((1024 * 1024) - 1)); + assertEquals("1 MB", formatBytes(1024 * 1024)); + assertEquals("1.1 MB", formatBytes((long) (1.1 * 1024 * 1024))); + assertEquals("10 MB", formatBytes(10 * 1024 * 1024)); + } + + @Test public void testJoin() { assertEquals("", Utils.join(Collections.emptyList(), ",")); assertEquals("1", Utils.join(Arrays.asList("1"), ",")); http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/main/scala/kafka/network/RequestChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index bd71340..6b8dbaa 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -29,9 +29,10 @@ import kafka.server.QuotaId import kafka.utils.{Logging, NotNothing} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.InvalidRequestException +import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.network.{ListenerName, Send} import org.apache.kafka.common.protocol.{ApiKeys, Protocol, SecurityProtocol} -import org.apache.kafka.common.record.{RecordBatch, MemoryRecords} +import org.apache.kafka.common.record.{MemoryRecords, RecordBatch} import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.Time @@ -41,7 +42,7 @@ import scala.reflect.ClassTag object RequestChannel extends Logging { val AllDone = Request(processor = 1, connectionId = "2", Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost), - buffer = shutdownReceive, startTimeNanos = 0, listenerName = new ListenerName(""), + buffer = shutdownReceive, memoryPool = MemoryPool.NONE, startTimeNanos = 0, listenerName = new ListenerName(""), securityProtocol = SecurityProtocol.PLAINTEXT) private val requestLogger = Logger.getLogger("kafka.request.logger") @@ -56,10 +57,12 @@ object RequestChannel extends Logging { val sanitizedUser = QuotaId.sanitize(principal.getName) } - case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer, - startTimeNanos: Long, listenerName: ListenerName, securityProtocol: SecurityProtocol) { + case class Request(processor: Int, connectionId: String, session: Session, buffer: ByteBuffer, + private val memoryPool: MemoryPool, startTimeNanos: Long, listenerName: ListenerName, + securityProtocol: SecurityProtocol) { // These need to be volatile because the readers are in the network thread and the writers are in the request // handler threads or the purgatory threads + @volatile var bufferReference = buffer @volatile var requestDequeueTimeNanos = -1L @volatile var apiLocalCompleteTimeNanos = -1L @volatile var responseCompleteTimeNanos = -1L @@ -104,7 +107,12 @@ object RequestChannel extends Logging { else null - buffer = null + //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer. + //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference + //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done. + if (!Protocol.requiresDelayedDeallocation(requestId)) { + dispose() + } def requestDesc(details: Boolean): String = { if (requestObj != null) @@ -194,6 +202,13 @@ object RequestChannel extends Logging { .format(requestDesc(detailsEnabled), connectionId, totalTimeMs, requestQueueTimeMs, apiLocalTimeMs, apiRemoteTimeMs, apiThrottleTimeMs, responseQueueTimeMs, responseSendTimeMs, securityProtocol, session.principal, listenerName.value)) } } + + def dispose(): Unit = { + if (bufferReference != null) { + memoryPool.release(bufferReference) + bufferReference = null + } + } } object Response { http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 2ba5553..e541015 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -32,7 +32,9 @@ import kafka.security.CredentialProvider import kafka.server.KafkaConfig import kafka.utils._ import org.apache.kafka.common.errors.InvalidRequestException +import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool} import org.apache.kafka.common.metrics._ +import org.apache.kafka.common.metrics.stats.Rate import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.protocol.SecurityProtocol @@ -61,6 +63,10 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time this.logIdent = "[Socket Server on Broker " + config.brokerId + "], " + private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization") + private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", "socket-server-metrics") + memoryPoolSensor.add(memoryPoolDepletedPercentMetricName, new Rate(TimeUnit.MILLISECONDS)) + private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests) private val processors = new Array[Processor](totalProcessorThreads) @@ -86,7 +92,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time val processorEndIndex = processorBeginIndex + numProcessorThreads for (i <- processorBeginIndex until processorEndIndex) - processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol) + processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol, memoryPool) val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas) @@ -109,7 +115,16 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time }.sum / totalProcessorThreads } ) - + newGauge("MemoryPoolAvailable", + new Gauge[Long] { + def value = memoryPool.availableMemory() + } + ) + newGauge("MemoryPoolUsed", + new Gauge[Long] { + def value = memoryPool.size() - memoryPool.availableMemory() + } + ) info("Started " + acceptors.size + " acceptor threads") } @@ -138,7 +153,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time /* `protected` for test usage */ protected[network] def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, - securityProtocol: SecurityProtocol): Processor = { + securityProtocol: SecurityProtocol, memoryPool: MemoryPool): Processor = { new Processor(id, time, config.socketRequestMaxBytes, @@ -149,7 +164,8 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time securityProtocol, config, metrics, - credentialProvider + credentialProvider, + memoryPool ) } @@ -378,7 +394,8 @@ private[kafka] class Processor(val id: Int, securityProtocol: SecurityProtocol, config: KafkaConfig, metrics: Metrics, - credentialProvider: CredentialProvider) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup { + credentialProvider: CredentialProvider, + memoryPool: MemoryPool) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup { private object ConnectionId { def fromString(s: String): Option[ConnectionId] = s.split("-") match { @@ -422,7 +439,8 @@ private[kafka] class Processor(val id: Int, metricTags, false, true, - ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache)) + ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache), + memoryPool) override def run() { startupComplete() @@ -517,7 +535,8 @@ private[kafka] class Processor(val id: Int, val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeNanos = time.nanoseconds, - listenerName = listenerName, securityProtocol = securityProtocol) + listenerName = listenerName, securityProtocol = securityProtocol, + memoryPool = memoryPool) requestChannel.sendRequest(req) selector.mute(receive.source) } catch { http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 3941e17..a900e6d 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -52,6 +52,7 @@ object Defaults { val NumIoThreads = 8 val BackgroundThreads = 10 val QueuedMaxRequests = 500 + val QueuedMaxRequestBytes = -1 /************* Authorizer Configuration ***********/ val AuthorizerClassName = "" @@ -236,6 +237,7 @@ object KafkaConfig { val NumIoThreadsProp = "num.io.threads" val BackgroundThreadsProp = "background.threads" val QueuedMaxRequestsProp = "queued.max.requests" + val QueuedMaxBytesProp = "queued.max.request.bytes" val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG /************* Authorizer Configuration ***********/ val AuthorizerClassNameProp = "authorizer.class.name" @@ -420,6 +422,7 @@ object KafkaConfig { val NumIoThreadsDoc = "The number of threads that the server uses for processing requests, which may include disk I/O" val BackgroundThreadsDoc = "The number of threads to use for various background processing tasks" val QueuedMaxRequestsDoc = "The number of queued requests allowed before blocking the network threads" + val QueuedMaxRequestBytesDoc = "The number of queued bytes allowed before no more requests are read" val RequestTimeoutMsDoc = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC /************* Authorizer Configuration ***********/ val AuthorizerClassNameDoc = "The authorizer class that should be used for authorization" @@ -684,6 +687,7 @@ object KafkaConfig { .define(NumIoThreadsProp, INT, Defaults.NumIoThreads, atLeast(1), HIGH, NumIoThreadsDoc) .define(BackgroundThreadsProp, INT, Defaults.BackgroundThreads, atLeast(1), HIGH, BackgroundThreadsDoc) .define(QueuedMaxRequestsProp, INT, Defaults.QueuedMaxRequests, atLeast(1), HIGH, QueuedMaxRequestsDoc) + .define(QueuedMaxBytesProp, LONG, Defaults.QueuedMaxRequestBytes, MEDIUM, QueuedMaxRequestBytesDoc) .define(RequestTimeoutMsProp, INT, Defaults.RequestTimeoutMs, HIGH, RequestTimeoutMsDoc) /************* Authorizer Configuration ***********/ @@ -900,6 +904,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra val numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp) val backgroundThreads = getInt(KafkaConfig.BackgroundThreadsProp) val queuedMaxRequests = getInt(KafkaConfig.QueuedMaxRequestsProp) + val queuedMaxBytes = getLong(KafkaConfig.QueuedMaxBytesProp) val numIoThreads = getInt(KafkaConfig.NumIoThreadsProp) val messageMaxBytes = getInt(KafkaConfig.MessageMaxBytesProp) val requestTimeoutMs = getInt(KafkaConfig.RequestTimeoutMsProp) @@ -1191,5 +1196,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra s"Only GSSAPI mechanism is supported for inter-broker communication with SASL when inter.broker.protocol.version is set to $interBrokerProtocolVersionString") require(!interBrokerUsesSasl || saslEnabledMechanisms.contains(saslMechanismInterBrokerProtocol), s"${KafkaConfig.SaslMechanismInterBrokerProtocolProp} must be included in ${KafkaConfig.SaslEnabledMechanismsProp} when SASL is used for inter-broker communication") + require(queuedMaxBytes <= 0 || queuedMaxBytes >= socketRequestMaxBytes, + s"${KafkaConfig.QueuedMaxBytesProp} must be larger or equal to ${KafkaConfig.SocketRequestMaxBytesProp}") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/main/scala/kafka/server/KafkaRequestHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index feb07b8..512be67 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -40,9 +40,9 @@ class KafkaRequestHandler(id: Int, private val latch = new CountDownLatch(1) def run() { - while (true) { + while(true) { + var req : RequestChannel.Request = null try { - var req : RequestChannel.Request = null while (req == null) { // We use a single meter for aggregate idle percentage for the thread pool. // Since meter is calculated as total_recorded_value / time_window and @@ -69,6 +69,9 @@ class KafkaRequestHandler(id: Int, latch.countDown() Exit.exit(e.statusCode) case e: Throwable => error("Exception when handling request", e) + } finally { + if (req != null) + req.dispose() } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index acf96e8..ed35269 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -29,6 +29,7 @@ import kafka.security.CredentialProvider import kafka.server.KafkaConfig import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.{ListenerName, NetworkSend, Send} import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} @@ -328,9 +329,9 @@ class SocketServerTest extends JUnitSuite { var conn: Socket = null val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) { override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, - protocol: SecurityProtocol): Processor = { + protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = { new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, - config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider) { + config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider, MemoryPool.NONE) { override protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) { conn.close() super.sendResponse(response, responseSend) http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index ae1cfc0..38d4bb3 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -35,6 +35,7 @@ import kafka.server._ import kafka.utils.{MockTime, TestUtils, ZkUtils} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.UnsupportedVersionException +import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} @@ -395,7 +396,7 @@ class KafkaApisTest { val header = new RequestHeader(builder.apiKey.id, request.version, "", 0) val buffer = request.serialize(header) val session = Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost) - (request, RequestChannel.Request(1, "1", session, buffer, 0, new ListenerName(""), SecurityProtocol.PLAINTEXT)) + (request, RequestChannel.Request(1, "1", session, buffer, MemoryPool.NONE, 0, new ListenerName(""), SecurityProtocol.PLAINTEXT)) } private def readResponse(api: ApiKeys, request: AbstractRequest, capturedResponse: Capture[RequestChannel.Response]): AbstractResponse = { http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 1b801de..dee6e87 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -545,6 +545,7 @@ class KafkaConfigTest { case KafkaConfig.NumIoThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") case KafkaConfig.BackgroundThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") case KafkaConfig.QueuedMaxRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.QueuedMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.RequestTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.AuthorizerClassNameProp => //ignore string
