Repository: kafka
Updated Branches:
  refs/heads/trunk 2656659e0 -> dfd625daa
MINOR: Remove unused SecurityProtocol.TRACE

It adds complexity for no benefit since we don't use it anywhere. Also removed a few unused imports, variables and default parameters.

Author: Ismael Juma <ism...@juma.me.uk>

Reviewers: Manikumar Reddy <manikumar.re...@gmail.com>, Rajini Sivaram <rajinisiva...@googlemail.com>, Jason Gustafson <ja...@confluent.io>

Closes #3856 from ijuma/remove-security-protocol-trace

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dfd625da
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dfd625da
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dfd625da

Branch: refs/heads/trunk
Commit: dfd625daa36de2e34e6c596967775394c55bc605
Parents: 2656659
Author: Ismael Juma <ism...@juma.me.uk>
Authored: Thu Sep 14 13:14:06 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Sep 14 13:39:30 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/ClientUtils.java | 2 -
 .../kafka/clients/CommonClientConfigs.java | 13 +------
 .../kafka/common/network/ChannelBuilders.java | 1 -
 .../kafka/common/protocol/SecurityProtocol.java | 39 ++++----------------
 .../scala/kafka/consumer/BaseConsumer.scala | 1 -
 .../TransactionMarkerChannelManager.scala | 2 +-
 ...nsactionMarkerRequestCompletionHandler.scala | 2 +-
 .../consumer/ZookeeperConsumerConnector.scala | 1 -
 .../main/scala/kafka/server/KafkaConfig.scala | 6 +--
 .../unit/kafka/network/SocketServerTest.scala | 23 ++----------
 10 files changed, 17 insertions(+), 73 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index 7d19ea4..4612322 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -82,8 +82,6 @@ public class ClientUtils { */ public static ChannelBuilder createChannelBuilder(AbstractConfig config) { SecurityProtocol securityProtocol = SecurityProtocol.forName(config.getString(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); - if (!SecurityProtocol.nonTestingValues().contains(securityProtocol)) - throw new ConfigException("Invalid SecurityProtocol " + securityProtocol); String clientSaslMechanism = config.getString(SaslConfigs.SASL_MECHANISM); return ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT, config, null, clientSaslMechanism, true); http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index f51c36a..380564b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -22,9 +22,7 @@ import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; /** @@ -77,7 +75,8 @@ public class CommonClientConfigs { public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows plugging in classes that will be notified of new metric creation. 
The JmxReporter is always included to register JMX statistics."; public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol"; - public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Valid values are: " + Utils.join(nonTestingSecurityProtocolNames(), ", ") + "."; + public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Valid values are: " + + Utils.join(SecurityProtocol.names(), ", ") + "."; public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT"; public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms"; @@ -89,14 +88,6 @@ public class CommonClientConfigs { + "elapses the client will resend the request if necessary or fail the request if " + "retries are exhausted."; - private static List<String> nonTestingSecurityProtocolNames() { - List<String> names = new ArrayList<>(); - for (SecurityProtocol protocol : SecurityProtocol.nonTestingValues()) - names.add(protocol.name); - return names; - } - - /** * Postprocess the configuration so that exponential backoff is disabled when reconnect backoff * is explicitly configured but the maximum reconnect backoff is not explicitly configured. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java index 6dd3ddd..785c671 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java @@ -107,7 +107,6 @@ public class ChannelBuilders { clientSaslMechanism, saslHandshakeRequestEnable, credentialCache); break; case PLAINTEXT: - case TRACE: channelBuilder = new PlaintextChannelBuilder(); break; default: http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java index 99d3b3d..c155481 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java @@ -18,43 +18,34 @@ package org.apache.kafka.common.protocol; import java.util.ArrayList; import java.util.Collections; -import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Set; public enum SecurityProtocol { /** Un-authenticated, non-encrypted channel */ - PLAINTEXT(0, "PLAINTEXT", false), + PLAINTEXT(0, "PLAINTEXT"), /** SSL channel */ - SSL(1, "SSL", false), + SSL(1, "SSL"), /** SASL authenticated, non-encrypted channel */ - SASL_PLAINTEXT(2, "SASL_PLAINTEXT", false), + SASL_PLAINTEXT(2, "SASL_PLAINTEXT"), /** SASL authenticated, SSL channel */ - 
SASL_SSL(3, "SASL_SSL", false), - /** Currently identical to PLAINTEXT and used for testing only. We may implement extra instrumentation when testing channel code. */ - TRACE(Short.MAX_VALUE, "TRACE", true); + SASL_SSL(3, "SASL_SSL"); private static final Map<Short, SecurityProtocol> CODE_TO_SECURITY_PROTOCOL; private static final List<String> NAMES; - private static final Set<SecurityProtocol> NON_TESTING_VALUES; static { SecurityProtocol[] protocols = SecurityProtocol.values(); List<String> names = new ArrayList<>(protocols.length); Map<Short, SecurityProtocol> codeToSecurityProtocol = new HashMap<>(protocols.length); - Set<SecurityProtocol> nonTestingValues = EnumSet.noneOf(SecurityProtocol.class); for (SecurityProtocol proto : protocols) { codeToSecurityProtocol.put(proto.id, proto); names.add(proto.name); - if (!proto.isTesting) - nonTestingValues.add(proto); } CODE_TO_SECURITY_PROTOCOL = Collections.unmodifiableMap(codeToSecurityProtocol); NAMES = Collections.unmodifiableList(names); - NON_TESTING_VALUES = Collections.unmodifiableSet(nonTestingValues); } /** The permanent and immutable id of a security protocol -- this can't change, and must match kafka.cluster.SecurityProtocol */ @@ -63,24 +54,16 @@ public enum SecurityProtocol { /** Name of the security protocol. This may be used by client configuration. 
*/ public final String name; - /* Whether this security protocol is for testing/debugging */ - private final boolean isTesting; - - SecurityProtocol(int id, String name, boolean isTesting) { + SecurityProtocol(int id, String name) { this.id = (short) id; this.name = name; - this.isTesting = isTesting; - } - - public static String getName(int id) { - return CODE_TO_SECURITY_PROTOCOL.get((short) id).name; } - public static List<String> getNames() { + public static List<String> names() { return NAMES; } - public static SecurityProtocol forId(Short id) { + public static SecurityProtocol forId(short id) { return CODE_TO_SECURITY_PROTOCOL.get(id); } @@ -89,12 +72,4 @@ public enum SecurityProtocol { return SecurityProtocol.valueOf(name.toUpperCase(Locale.ROOT)); } - /** - * Returns the set of non-testing SecurityProtocol instances, that is, SecurityProtocol instances that are suitable - * for production usage. - */ - public static Set<SecurityProtocol> nonTestingValues() { - return NON_TESTING_VALUES; - } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/core/src/main/scala/kafka/consumer/BaseConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala index 2c53258..04ac2d9 100644 --- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala +++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala @@ -23,7 +23,6 @@ import java.util.regex.Pattern import kafka.api.OffsetRequest import kafka.common.StreamEndException import kafka.message.Message -import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.header.Headers http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala index 2e666ed..6c13de4 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala @@ -35,7 +35,7 @@ import java.util import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQueue} import collection.JavaConverters._ -import scala.collection.{concurrent, immutable, mutable} +import scala.collection.{concurrent, immutable} object TransactionMarkerChannelManager { def apply(config: KafkaConfig, http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala index 150b444..bfa25be 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala @@ -20,7 +20,7 @@ package kafka.coordinator.transaction import kafka.utils.Logging import org.apache.kafka.clients.{ClientResponse, RequestCompletionHandler} import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.WriteTxnMarkersResponse import scala.collection.mutable 
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala index 467d0a6..d646938 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala @@ -19,7 +19,6 @@ package kafka.javaapi.consumer import kafka.serializer._ import kafka.consumer._ import kafka.common.{OffsetAndMetadata, TopicAndPartition, MessageStreamsExistException} -import scala.collection.mutable import java.util.concurrent.atomic.AtomicBoolean import scala.collection.JavaConverters._ http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 33eaf48..ea0c124 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -571,8 +571,8 @@ object KafkaConfig { val LeaderImbalanceCheckIntervalSecondsDoc = "The frequency with which the partition rebalance check is triggered by the controller" val UncleanLeaderElectionEnableDoc = "Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss" val InterBrokerSecurityProtocolDoc = "Security protocol used to communicate between brokers. Valid values are: " + - s"${SecurityProtocol.nonTestingValues.asScala.toSeq.map(_.name).mkString(", ")}. It is an error to set this and " + - s"$InterBrokerListenerNameProp properties at the same time." 
+ s"${SecurityProtocol.names.asScala.mkString(", ")}. It is an error to set this and $InterBrokerListenerNameProp " + + "properties at the same time." val InterBrokerProtocolVersionDoc = "Specify which version of the inter-broker protocol will be used.\n" + " This is typically bumped after all brokers were upgraded to a new version.\n" + " Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1 Check ApiVersion for the full list." @@ -1151,7 +1151,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra private def getSecurityProtocol(protocolName: String, configName: String): SecurityProtocol = { try SecurityProtocol.forName(protocolName) catch { - case e: IllegalArgumentException => + case _: IllegalArgumentException => throw new ConfigException(s"Invalid security protocol `$protocolName` defined in $configName") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 2df37b7..d623374 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -50,7 +50,7 @@ import scala.util.control.ControlThrowable class SocketServerTest extends JUnitSuite { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0) - props.put("listeners", "PLAINTEXT://localhost:0,TRACE://localhost:0") + props.put("listeners", "PLAINTEXT://localhost:0") props.put("num.network.threads", "1") props.put("socket.send.buffer.bytes", "300000") props.put("socket.receive.buffer.bytes", "300000") @@ -161,18 +161,12 @@ class SocketServerTest extends JUnitSuite { @Test def simpleRequest() { val plainSocket = connect(protocol = 
SecurityProtocol.PLAINTEXT) - val traceSocket = connect(protocol = SecurityProtocol.TRACE) val serializedBytes = producerRequestBytes // Test PLAINTEXT socket sendRequest(plainSocket, serializedBytes) processRequest(server.requestChannel) assertEquals(serializedBytes.toSeq, receiveResponse(plainSocket).toSeq) - - // Test TRACE socket - sendRequest(traceSocket, serializedBytes) - processRequest(server.requestChannel) - assertEquals(serializedBytes.toSeq, receiveResponse(traceSocket).toSeq) } @Test @@ -377,12 +371,9 @@ class SocketServerTest extends JUnitSuite { // open a connection val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) plainSocket.setTcpNoDelay(true) - val traceSocket = connect(protocol = SecurityProtocol.TRACE) - traceSocket.setTcpNoDelay(true) val bytes = new Array[Byte](40) // send a request first to make sure the connection has been picked up by the socket server sendRequest(plainSocket, bytes, Some(0)) - sendRequest(traceSocket, bytes, Some(0)) processRequest(server.requestChannel) // the following sleep is necessary to reliably detect the connection close when we send data below Thread.sleep(200L) @@ -400,13 +391,6 @@ class SocketServerTest extends JUnitSuite { } catch { case _: IOException => // expected } - - try { - sendRequest(traceSocket, largeChunkOfBytes, Some(0)) - fail("expected exception when writing to closed trace socket") - } catch { - case _: IOException => // expected - } } @Test @@ -841,7 +825,7 @@ class SocketServerTest extends JUnitSuite { @Test def controlThrowable(): Unit = { withTestableServer { testableServer => - val (socket, _) = connectAndProcessRequest(testableServer) + connectAndProcessRequest(testableServer) val testableSelector = testableServer.testableSelector testableSelector.operationCounts.clear() @@ -990,8 +974,7 @@ class SocketServerTest extends JUnitSuite { exception.getOrElse(new IllegalStateException(s"Test exception during $operation")) } - private def onOperation(operation: SelectorOperation, - 
connectionId: Option[String] = None, onFailure: => Unit = {}): Unit = { + private def onOperation(operation: SelectorOperation, connectionId: Option[String], onFailure: => Unit): Unit = { operationCounts(operation) += 1 failures.remove(operation).foreach { e => connectionId.foreach(allFailedChannels.add)