Repository: kafka Updated Branches: refs/heads/trunk 495184916 -> 1c2bbaa50
MINOR: Fix consumer and producer to actually support metrics recording level Also add tests and a few clean-ups. Author: Ismael Juma <ism...@juma.me.uk> Reviewers: Eno Thereska <eno.there...@gmail.com>, Jason Gustafson <ja...@confluent.io> Closes #2937 from ijuma/metrics-recording-level-producer Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1c2bbaa5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1c2bbaa5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1c2bbaa5 Branch: refs/heads/trunk Commit: 1c2bbaa501c2d0fd4db9c2dacacc3ff7f5236d3d Parents: 4951849 Author: Ismael Juma <ism...@juma.me.uk> Authored: Fri May 12 10:36:44 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Fri May 12 10:36:44 2017 -0700 ---------------------------------------------------------------------- .../kafka/clients/consumer/KafkaConsumer.java | 10 +++-- .../kafka/clients/producer/KafkaProducer.java | 8 ++-- .../kafka/clients/producer/ProducerConfig.java | 18 +++++++- .../apache/kafka/common/network/Selector.java | 20 +++++---- .../clients/consumer/KafkaConsumerTest.java | 15 +++++++ .../clients/producer/KafkaProducerTest.java | 46 +++++++++++++------- 6 files changed, 84 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1c2bbaa5/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 9df674d..aad4453 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -41,6 +41,7 @@ import 
org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.requests.IsolationLevel; @@ -58,7 +59,6 @@ import java.util.Collections; import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -519,6 +519,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { private static final String JMX_PREFIX = "kafka.consumer"; static final long DEFAULT_CLOSE_TIMEOUT_MS = 30 * 1000; + // Visible for testing + final Metrics metrics; + private final String clientId; private final ConsumerCoordinator coordinator; private final Deserializer<K> keyDeserializer; @@ -528,7 +531,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { private final Time time; private final ConsumerNetworkClient client; - private final Metrics metrics; private final SubscriptionState subscriptions; private final Metadata metadata; private final long retryBackoffMs; @@ -622,10 +624,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { if (clientId.length() <= 0) clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement(); this.clientId = clientId; - Map<String, String> metricsTags = new LinkedHashMap<>(); - metricsTags.put("client-id", clientId); + Map<String, String> metricsTags = Collections.singletonMap("client-id", clientId); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) + 
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG))) .tags(metricsTags); List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); http://git-wip-us.apache.org/repos/asf/kafka/blob/1c2bbaa5/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index f812389..b1f405a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -64,7 +64,6 @@ import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.util.Collections; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -149,13 +148,14 @@ public class KafkaProducer<K, V> implements Producer<K, V> { private static final String JMX_PREFIX = "kafka.producer"; private String clientId; + // Visible for testing + final Metrics metrics; private final Partitioner partitioner; private final int maxRequestSize; private final long totalMemorySize; private final Metadata metadata; private final RecordAccumulator accumulator; private final Sender sender; - private final Metrics metrics; private final Thread ioThread; private final CompressionType compressionType; private final Sensor errors; @@ -230,10 +230,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> { clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); if (clientId.length() <= 0) clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement(); - Map<String, String> metricTags = new LinkedHashMap<String, String>(); - metricTags.put("client-id", clientId); + Map<String, 
String> metricTags = Collections.singletonMap("client-id", clientId); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) + .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG))) .tags(metricTags); List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); http://git-wip-us.apache.org/repos/asf/kafka/blob/1c2bbaa5/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 12e8c64..4208a90 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serializer; import java.util.HashMap; @@ -145,6 +146,11 @@ public class ProducerConfig extends AbstractConfig { /** <code>metrics.num.samples</code> */ public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG; + /** + * <code>metrics.recording.level</code> + */ + public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG; + /** <code>metric.reporters</code> */ public static final String METRIC_REPORTER_CLASSES_CONFIG = 
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG; @@ -232,7 +238,6 @@ public class ProducerConfig extends AbstractConfig { Importance.MEDIUM, MAX_REQUEST_SIZE_DOC) .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC) - .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC) .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC) .define(MAX_BLOCK_MS_CONFIG, Type.LONG, @@ -254,6 +259,17 @@ public class ProducerConfig extends AbstractConfig { Importance.LOW, CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC) .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, CommonClientConfigs.METRICS_NUM_SAMPLES_DOC) + .define(METRICS_RECORDING_LEVEL_CONFIG, + Type.STRING, + Sensor.RecordingLevel.INFO.toString(), + in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()), + Importance.LOW, + CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC) + .define(METRIC_REPORTER_CLASSES_CONFIG, + Type.LIST, + "", + Importance.LOW, + CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC) .define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, Type.INT, 5, http://git-wip-us.apache.org/repos/asf/kafka/blob/1c2bbaa5/clients/src/main/java/org/apache/kafka/common/network/Selector.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 312e1f5..a74a584 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -26,6 +26,7 @@ import java.nio.channels.SocketChannel; import java.nio.channels.UnresolvedAddressException; import java.util.ArrayDeque; import java.util.ArrayList; +import 
java.util.Collections; import java.util.Deque; import java.util.HashMap; import java.util.HashSet; @@ -96,11 +97,8 @@ public class Selector implements Selectable, AutoCloseable { private final List<String> failedSends; private final Time time; private final SelectorMetrics sensors; - private final String metricGrpPrefix; - private final Map<String, String> metricTags; private final ChannelBuilder channelBuilder; private final int maxReceiveSize; - private final boolean metricsPerConnection; private final boolean recordTimePerConnection; private final IdleExpiryManager idleExpiryManager; @@ -132,8 +130,6 @@ public class Selector implements Selectable, AutoCloseable { } this.maxReceiveSize = maxReceiveSize; this.time = time; - this.metricGrpPrefix = metricGrpPrefix; - this.metricTags = metricTags; this.channels = new HashMap<>(); this.completedSends = new ArrayList<>(); this.completedReceives = new ArrayList<>(); @@ -143,9 +139,8 @@ public class Selector implements Selectable, AutoCloseable { this.connected = new ArrayList<>(); this.disconnected = new ArrayList<>(); this.failedSends = new ArrayList<>(); - this.sensors = new SelectorMetrics(metrics); + this.sensors = new SelectorMetrics(metrics, metricGrpPrefix, metricTags, metricsPerConnection); this.channelBuilder = channelBuilder; - this.metricsPerConnection = metricsPerConnection; this.recordTimePerConnection = recordTimePerConnection; this.idleExpiryManager = connectionMaxIdleMs < 0 ? 
null : new IdleExpiryManager(time, connectionMaxIdleMs); } @@ -162,7 +157,7 @@ public class Selector implements Selectable, AutoCloseable { } public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) { - this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, new HashMap<String, String>(), true, channelBuilder); + this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, Collections.<String, String>emptyMap(), true, channelBuilder); } /** @@ -679,6 +674,10 @@ public class Selector implements Selectable, AutoCloseable { private class SelectorMetrics { private final Metrics metrics; + private final String metricGrpPrefix; + private final Map<String, String> metricTags; + private final boolean metricsPerConnection; + public final Sensor connectionClosed; public final Sensor connectionCreated; public final Sensor bytesTransferred; @@ -691,8 +690,11 @@ public class Selector implements Selectable, AutoCloseable { private final List<MetricName> topLevelMetricNames = new ArrayList<>(); private final List<Sensor> sensors = new ArrayList<>(); - public SelectorMetrics(Metrics metrics) { + public SelectorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) { this.metrics = metrics; + this.metricGrpPrefix = metricGrpPrefix; + this.metricTags = metricTags; + this.metricsPerConnection = metricsPerConnection; String metricGrpName = metricGrpPrefix + "-metrics"; StringBuilder tagsSuffix = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/kafka/blob/1c2bbaa5/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index a598b5d..5928a28 
100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -1234,6 +1235,20 @@ public class KafkaConsumerTest { consumer.close(); } + @Test + public void testMetricConfigRecordingLevel() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + try (KafkaConsumer consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + assertEquals(Sensor.RecordingLevel.INFO, consumer.metrics.config().recordLevel()); + } + + props.put(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG"); + try (KafkaConsumer consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + assertEquals(Sensor.RecordingLevel.DEBUG, consumer.metrics.config().recordLevel()); + } + } + private void consumerCloseTest(final long closeTimeoutMs, List<? 
extends AbstractResponse> responses, long waitMs, http://git-wip-us.apache.org/repos/asf/kafka/blob/1c2bbaa5/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index ea493d2..3a6426a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.ExtendedSerializer; @@ -53,6 +54,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -87,9 +89,9 @@ public class KafkaProducerTest { KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>( props, new ByteArraySerializer(), new ByteArraySerializer()); } catch (KafkaException e) { - Assert.assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get()); - Assert.assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get()); - Assert.assertEquals("Failed to construct kafka producer", e.getMessage()); + assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get()); + assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get()); + assertEquals("Failed to construct kafka producer", 
e.getMessage()); return; } fail("should have caught an exception and returned"); @@ -107,12 +109,12 @@ public class KafkaProducerTest { KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>( configs, new MockSerializer(), new MockSerializer()); - Assert.assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get()); - Assert.assertEquals(oldCloseCount, MockSerializer.CLOSE_COUNT.get()); + assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get()); + assertEquals(oldCloseCount, MockSerializer.CLOSE_COUNT.get()); producer.close(); - Assert.assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get()); - Assert.assertEquals(oldCloseCount + 2, MockSerializer.CLOSE_COUNT.get()); + assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get()); + assertEquals(oldCloseCount + 2, MockSerializer.CLOSE_COUNT.get()); } @Test @@ -126,15 +128,15 @@ public class KafkaProducerTest { KafkaProducer<String, String> producer = new KafkaProducer<String, String>( props, new StringSerializer(), new StringSerializer()); - Assert.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get()); - Assert.assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get()); + assertEquals(1, MockProducerInterceptor.INIT_COUNT.get()); + assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get()); // Cluster metadata will only be updated on calling onSend. 
Assert.assertNull(MockProducerInterceptor.CLUSTER_META.get()); producer.close(); - Assert.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get()); - Assert.assertEquals(1, MockProducerInterceptor.CLOSE_COUNT.get()); + assertEquals(1, MockProducerInterceptor.INIT_COUNT.get()); + assertEquals(1, MockProducerInterceptor.CLOSE_COUNT.get()); } finally { // cleanup since we are using mutable static variables in MockProducerInterceptor MockProducerInterceptor.resetCounters(); @@ -150,12 +152,12 @@ public class KafkaProducerTest { KafkaProducer<String, String> producer = new KafkaProducer<String, String>( props, new StringSerializer(), new StringSerializer()); - Assert.assertEquals(1, MockPartitioner.INIT_COUNT.get()); - Assert.assertEquals(0, MockPartitioner.CLOSE_COUNT.get()); + assertEquals(1, MockPartitioner.INIT_COUNT.get()); + assertEquals(0, MockPartitioner.CLOSE_COUNT.get()); producer.close(); - Assert.assertEquals(1, MockPartitioner.INIT_COUNT.get()); - Assert.assertEquals(1, MockPartitioner.CLOSE_COUNT.get()); + assertEquals(1, MockPartitioner.INIT_COUNT.get()); + assertEquals(1, MockPartitioner.CLOSE_COUNT.get()); } finally { // cleanup since we are using mutable static variables in MockPartitioner MockPartitioner.resetCounters(); @@ -418,4 +420,18 @@ public class KafkaProducerTest { producer.close(); } + @Test + public void testMetricConfigRecordingLevel() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + try (KafkaProducer producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) { + assertEquals(Sensor.RecordingLevel.INFO, producer.metrics.config().recordLevel()); + } + + props.put(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG"); + try (KafkaProducer producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) { + assertEquals(Sensor.RecordingLevel.DEBUG, producer.metrics.config().recordLevel()); + } + } + }