This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 14d97aa Moved 'expired-window-' and 'late-' record-drop to StreamsMetricsImpl (#6355) 14d97aa is described below commit 14d97aabad1d5aab4b9407e24ab523157832934b Author: A. Sophie Blee-Goldman <ableegold...@gmail.com> AuthorDate: Wed Mar 6 09:17:58 2019 -0800 Moved 'expired-window-' and 'late-' record-drop to StreamsMetricsImpl (#6355) Moved hard-coded 'expired-window-record-drop' and 'late-record-drop' to static Strings in StreamsMetricsImpl Reviewers: Guozhang Wang <wangg...@gmail.com>, Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bbej...@gmail.com> --- .../org/apache/kafka/streams/kstream/internals/metrics/Sensors.java | 5 +++-- .../streams/processor/internals/metrics/StreamsMetricsImpl.java | 3 +++ .../apache/kafka/streams/state/internals/InMemoryWindowStore.java | 5 +++-- .../kafka/streams/state/internals/RocksDBSegmentedBytesStore.java | 5 +++-- 4 files changed, 12 insertions(+), 6 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java index 12c4813..4ecaeb4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java @@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import java.util.Map; import java.util.concurrent.TimeUnit; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATE_RECORD_DROP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP; @@ -40,14 +41,14 @@ public class Sensors { final Sensor sensor = metrics.nodeLevelSensor( context.taskId().toString(), context.currentNode().name(), - "late-record-drop", + LATE_RECORD_DROP, Sensor.RecordingLevel.INFO ); StreamsMetricsImpl.addInvocationRateAndCount( sensor, PROCESSOR_NODE_METRICS_GROUP, metrics.tagMap("task-id", context.taskId().toString(), PROCESSOR_NODE_ID_TAG, context.currentNode().name()), - "late-record-drop" + LATE_RECORD_DROP ); return sensor; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index ec35157..0a47fce 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -55,6 +55,9 @@ public class StreamsMetricsImpl implements StreamsMetrics { public static final String PROCESSOR_NODE_METRICS_GROUP = "stream-processor-node-metrics"; public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id"; + public static final String EXPIRED_WINDOW_RECORD_DROP = "expired-window-record-drop"; + public static final String LATE_RECORD_DROP = "late-record-drop"; + public StreamsMetricsImpl(final Metrics metrics, final String threadName) { Objects.requireNonNull(metrics, "Metrics cannot be null"); this.threadName = threadName; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index 77820c5..67eec0d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -42,6 +42,7 @@ import java.util.NavigableMap; import java.util.NoSuchElementException; import java.util.TreeMap; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount; import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreKeyBytes; import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp; @@ -95,14 +96,14 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> { expiredRecordSensor = metrics.storeLevelSensor( taskName, name(), - "expired-window-record-drop", + EXPIRED_WINDOW_RECORD_DROP, Sensor.RecordingLevel.INFO ); addInvocationRateAndCount( expiredRecordSensor, "stream-" + metricScope + "-metrics", metrics.tagMap("task-id", taskName, metricScope + "-id", name()), - "expired-window-record-drop" + EXPIRED_WINDOW_RECORD_DROP ); if (root != null) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java index 0ed4e9d..f733c80 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount; public class RocksDBSegmentedBytesStore implements SegmentedBytesStore { @@ -157,14 +158,14 @@ public class RocksDBSegmentedBytesStore implements SegmentedBytesStore { expiredRecordSensor = metrics.storeLevelSensor( taskName, name(), - "expired-window-record-drop", + EXPIRED_WINDOW_RECORD_DROP, Sensor.RecordingLevel.INFO ); addInvocationRateAndCount( expiredRecordSensor, "stream-" + metricScope + "-metrics", metrics.tagMap("task-id", taskName, metricScope + "-id", name()), - "expired-window-record-drop" + EXPIRED_WINDOW_RECORD_DROP ); segments.openExisting(this.context, observedStreamTime);