This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c02880d5f Emit metrics for distribution of number of rows per segment (#12730)
8c02880d5f is described below

commit 8c02880d5f3e48d3e8189a240eae391eb6107862
Author: TSFenwick <tom.fenw...@imply.io>
AuthorDate: Tue Jul 12 07:04:42 2022 -0700

    Emit metrics for distribution of number of rows per segment (#12730)
    
    * initial commit of bucket dimensions for metrics
    
    return counts of segments that have rowcount in a bucket size for a datasource
    return average value of rowcount per segment in a datasource
    added unit test
    naming could use a lot of work
    buckets right now are not finalized
    added javadocs
    altered metrics.md
    
    * fix checkstyle issues
    
    * addressed review comments
    
    add monitor test
    move added functionality to new monitor
    update docs
    
    * address comments
    
    renamed monitor
    handle tombstones better
    update docs
    added javadocs
    
    * Add support for tombstones in the segment distribution
    
    * undo changes to tombstone segmentizer factory
    
    * fix accidental whitespacing changes
    
    * address comments regarding metrics documentation
    
    and rename variable to be more accurate
    
    * fix tests
    
    * fix checkstyle issues
    
    * fix broken test
    
    * undo removal of timeout
---
 docs/configuration/index.md                        |   1 +
 docs/operations/metrics.md                         |   2 +
 .../org/apache/druid/server/SegmentManager.java    |  51 ++++-
 .../coordination/SegmentLoadDropHandler.java       |  11 +
 .../metrics/SegmentRowCountDistribution.java       | 161 ++++++++++++++
 .../druid/server/metrics/SegmentStatsMonitor.java  | 105 ++++++++++
 .../segment/loading/CacheTestSegmentLoader.java    | 102 ++++++++-
 .../apache/druid/server/SegmentManagerTest.java    |   6 +-
 .../server/SegmentManagerThreadSafetyTest.java     |   6 +-
 .../metrics/SegmentRowCountDistributionTest.java   | 233 +++++++++++++++++++++
 .../server/metrics/SegmentStatsMonitorTest.java    | 214 +++++++++++++++++++
 11 files changed, 885 insertions(+), 7 deletions(-)
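
For context before the diffs: the practical effect of this commit is a new opt-in Historical monitor. Enabling it would presumably follow the usual monitor configuration pattern, along these lines (a sketch only; per the docs change below, the monitor must not be combined with lazy loading, i.e. `druid.segmentCache.lazyLoadOnStart=true`):

    # Historical runtime.properties (illustrative)
    druid.monitoring.monitors=["org.apache.druid.server.metrics.SegmentStatsMonitor"]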

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index a38f79ebf4..32cedc5e7f 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -380,6 +380,7 @@ Metric monitoring is an essential part of Druid operations. The following monit
 |`org.apache.druid.java.util.metrics.CgroupMemoryMonitor`|Reports memory statistic as per the memory cgroup.|
 |`org.apache.druid.server.metrics.EventReceiverFirehoseMonitor`|Reports how many events have been queued in the EventReceiverFirehose.|
 |`org.apache.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical processes. Available only on Historical processes.|
+|`org.apache.druid.server.metrics.SegmentStatsMonitor` | **EXPERIMENTAL** Reports statistics about segments on Historical processes. Available only on Historical processes. Not to be used when lazy loading is configured.|
 |`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.|
 |`org.apache.druid.server.emitter.HttpEmittingMonitor`|Reports internal metrics of `http` or `parametrized` emitter (see below). Must not be used with another emitter type. See the description of the metrics here: https://github.com/apache/druid/pull/4973.|
 |`org.apache.druid.server.metrics.TaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting and also the number of successful/failed tasks per emission period.|
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 59fbb9f908..e427532cc7 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -323,6 +323,8 @@ decisions.
 |`segment/usedPercent`|Percentage of space used by served segments.|dataSource, tier, priority.|< 100%|
 |`segment/count`|Number of served segments.|dataSource, tier, priority.|Varies.|
 |`segment/pendingDelete`|On-disk size in bytes of segments that are waiting to be cleared out|Varies.|
+|`segment/rowCount/avg`| The average number of rows per segment on a historical. `SegmentStatsMonitor` must be enabled.| dataSource, tier, priority.|Varies. See [segment optimization](../operations/segment-optimization.md) for guidance on optimal segment sizes. |
+|`segment/rowCount/range/count`| The number of segments in a bucket. `SegmentStatsMonitor` must be enabled.| dataSource, tier, priority, range.|Varies.|
 
 ### JVM
 
diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java
index 329fc16248..ff509272a8 100644
--- a/server/src/main/java/org/apache/druid/server/SegmentManager.java
+++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java
@@ -31,10 +31,12 @@ import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.SegmentLazyLoadFailCallback;
+import org.apache.druid.segment.StorageAdapter;
 import org.apache.druid.segment.join.table.IndexedTable;
 import org.apache.druid.segment.join.table.ReferenceCountingIndexedTable;
 import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.server.metrics.SegmentRowCountDistribution;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
@@ -74,17 +76,31 @@ public class SegmentManager
     private final ConcurrentHashMap<SegmentId, ReferenceCountingIndexedTable> tablesLookup = new ConcurrentHashMap<>();
     private long totalSegmentSize;
     private long numSegments;
+    private long rowCount;
+    private final SegmentRowCountDistribution segmentRowCountDistribution = new SegmentRowCountDistribution();
 
-    private void addSegment(DataSegment segment)
+    private void addSegment(DataSegment segment, long numOfRows)
     {
       totalSegmentSize += segment.getSize();
       numSegments++;
+      rowCount += numOfRows;
+      if (segment.isTombstone()) {
+        segmentRowCountDistribution.addTombstoneToDistribution();
+      } else {
+        segmentRowCountDistribution.addRowCountToDistribution(numOfRows);
+      }
     }
 
-    private void removeSegment(DataSegment segment)
+    private void removeSegment(DataSegment segment, long numOfRows)
     {
       totalSegmentSize -= segment.getSize();
       numSegments--;
+      rowCount -= numOfRows;
+      if (segment.isTombstone()) {
+        segmentRowCountDistribution.removeTombstoneFromDistribution();
+      } else {
+        segmentRowCountDistribution.removeRowCountFromDistribution(numOfRows);
+      }
     }
 
     public VersionedIntervalTimeline<String, ReferenceCountingSegment> getTimeline()
@@ -97,6 +113,11 @@ public class SegmentManager
       return tablesLookup;
     }
 
+    public long getAverageRowCount()
+    {
+      return numSegments == 0 ? 0 : rowCount / numSegments;
+    }
+
     public long getTotalSegmentSize()
     {
       return totalSegmentSize;
@@ -111,8 +132,14 @@ public class SegmentManager
     {
       return numSegments == 0;
     }
+
+    private SegmentRowCountDistribution getSegmentRowCountDistribution()
+    {
+      return segmentRowCountDistribution;
+    }
   }
 
+
   @Inject
   public SegmentManager(
       SegmentLoader segmentLoader
@@ -138,6 +165,16 @@ public class SegmentManager
     return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getTotalSegmentSize);
   }
 
+  public Map<String, Long> getAverageRowCountForDatasource()
+  {
+    return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getAverageRowCount);
+  }
+
+  public Map<String, SegmentRowCountDistribution> getRowCountDistribution()
+  {
+    return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getSegmentRowCountDistribution);
+  }
+
   public Set<String> getDataSourceNames()
   {
     return dataSources.keySet();
@@ -265,7 +302,9 @@ public class SegmentManager
                 segment.getVersion(),
                 segment.getShardSpec().createChunk(adapter)
             );
-            dataSourceState.addSegment(segment);
+            StorageAdapter storageAdapter = adapter.asStorageAdapter();
+            long numOfRows = (segment.isTombstone() || storageAdapter == null) ? 0 : storageAdapter.getNumRows();
+            dataSourceState.addSegment(segment, numOfRows);
             // Asyncly load segment index files into page cache in a thread pool
             segmentLoader.loadSegmentIntoPageCache(segment, loadSegmentIntoPageCacheExec);
             resultSupplier.set(true);
@@ -321,9 +360,13 @@ public class SegmentManager
             );
             final ReferenceCountingSegment oldQueryable = (removed == null) ? null : removed.getObject();
 
+
             if (oldQueryable != null) {
               try (final Closer closer = Closer.create()) {
-                dataSourceState.removeSegment(segment);
+                StorageAdapter storageAdapter = oldQueryable.asStorageAdapter();
+                long numOfRows = (segment.isTombstone() || storageAdapter == null) ? 0 : storageAdapter.getNumRows();
+                dataSourceState.removeSegment(segment, numOfRows);
+
                 closer.register(oldQueryable);
                 log.info("Attempting to close segment %s", segment.getId());
                 final ReferenceCountingIndexedTable oldTable = dataSourceState.tablesLookup.remove(segment.getId());
diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index 5d4bf76018..ba918d8fbd 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -45,6 +45,7 @@ import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.metrics.SegmentRowCountDistribution;
 import org.apache.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
@@ -306,6 +307,16 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
     }
   }
 
+  public Map<String, Long> getAverageNumOfRowsPerSegmentForDatasource()
+  {
+    return segmentManager.getAverageRowCountForDatasource();
+  }
+
+  public Map<String, SegmentRowCountDistribution> getRowCountDistributionPerDatasource()
+  {
+    return segmentManager.getRowCountDistribution();
+  }
+
   @Override
   public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback)
   {
diff --git a/server/src/main/java/org/apache/druid/server/metrics/SegmentRowCountDistribution.java b/server/src/main/java/org/apache/druid/server/metrics/SegmentRowCountDistribution.java
new file mode 100644
index 0000000000..337df6384d
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/metrics/SegmentRowCountDistribution.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import java.util.function.ObjIntConsumer;
+
+/**
+ * Class that creates a count of segments that have row counts in certain buckets
+ */
+public class SegmentRowCountDistribution
+{
+  private static final EmittingLogger log = new EmittingLogger(SegmentRowCountDistribution.class);
+
+  private final int[] buckets = new int[9];
+  private static final int TOMBSTONE_BUCKET_INDEX = 0;
+
+  /**
+   * Increments the count for a particular bucket held in this class
+   *
+   * @param rowCount the row count used to determine which bucket to increment
+   */
+  public void addRowCountToDistribution(long rowCount)
+  {
+    int bucketIndex = determineBucketFromRowCount(rowCount);
+    buckets[bucketIndex]++;
+  }
+
+  /**
+   * Decrements the count for a particular bucket held in this class
+   *
+   * @param rowCount the count which determines which bucket to decrement
+   */
+  public void removeRowCountFromDistribution(long rowCount)
+  {
+    int bucketIndex = determineBucketFromRowCount(rowCount);
+    buckets[bucketIndex]--;
+    if (buckets[bucketIndex] < 0) {
+      // defensive guard: a bucket count should never go negative, but reset it to 0 if it somehow does
+      log.error("somehow got a count of less than 0, resetting value to 0");
+      buckets[bucketIndex] = 0;
+    }
+  }
+
+  /**
+   * Increments the count for number of tombstones in the distribution
+   */
+  public void addTombstoneToDistribution()
+  {
+    buckets[TOMBSTONE_BUCKET_INDEX]++;
+  }
+
+  /**
+   * Decrements the count for the number of tombstones in the distribution.
+   */
+  public void removeTombstoneFromDistribution()
+  {
+    buckets[TOMBSTONE_BUCKET_INDEX]--;
+  }
+
+  /**
+   * Determines the name of the dimension used for a bucket. Should never return NA, since this method is
+   * private to this class and is only called with valid bucket indexes.
+   *
+   * @param index the index of the bucket
+   * @return the dimension which the bucket index refers to
+   */
+  private static String getBucketDimensionFromIndex(int index)
+  {
+    switch (index) {
+      case 0:
+        return "Tombstone";
+      case 1:
+        return "0";
+      case 2:
+        return "1-10k";
+      case 3:
+        return "10k-2M";
+      case 4:
+        return "2M-4M";
+      case 5:
+        return "4M-6M";
+      case 6:
+        return "6M-8M";
+      case 7:
+        return "8M-10M";
+      case 8:
+        return "10M+";
+      // should never get to default
+      default:
+        return "NA";
+    }
+  }
+
+  /**
+   * Figures out which bucket the specified rowCount belongs to
+   *
+   * @param rowCount the number of rows in a segment
+   * @return the bucket index
+   */
+  private static int determineBucketFromRowCount(long rowCount)
+  {
+    // 0 indexed bucket is reserved for tombstones
+    if (rowCount <= 0L) {
+      return 1;
+    }
+    if (rowCount <= 10_000L) {
+      return 2;
+    }
+    if (rowCount <= 2_000_000L) {
+      return 3;
+    }
+    if (rowCount <= 4_000_000L) {
+      return 4;
+    }
+    if (rowCount <= 6_000_000L) {
+      return 5;
+    }
+    if (rowCount <= 8_000_000L) {
+      return 6;
+    }
+    if (rowCount <= 10_000_000L) {
+      return 7;
+    }
+    return 8;
+  }
+
+  /**
+   * Gives the consumer each range dimension and the associated count. The tombstone and zero-row buckets
+   * are only reported when they have a nonzero count.
+   *
+   * @param consumer the consumer that receives each range dimension name and its segment count
+   */
+  public void forEachDimension(final ObjIntConsumer<String> consumer)
+  {
+    for (int ii = 0; ii < buckets.length; ii++) {
+      // only report the tombstone and 0 buckets if they have a nonzero value
+      if (ii > 1 || buckets[ii] != 0) {
+        consumer.accept(getBucketDimensionFromIndex(ii), buckets[ii]);
+      }
+    }
+  }
+
+}
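
For orientation, a minimal usage sketch of the class added above (the API is exactly what the diff shows; the printing consumer is illustrative only):

    SegmentRowCountDistribution distribution = new SegmentRowCountDistribution();
    distribution.addRowCountToDistribution(5_000L);     // lands in the "1-10k" bucket
    distribution.addRowCountToDistribution(3_000_000L); // lands in the "2M-4M" bucket
    distribution.addTombstoneToDistribution();          // counted under "Tombstone"
    // prints one line per reported bucket, e.g. "1-10k -> 1"; the "Tombstone" and "0"
    // buckets are skipped while their counts are zero
    distribution.forEachDimension((range, count) -> System.out.println(range + " -> " + count));
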
diff --git a/server/src/main/java/org/apache/druid/server/metrics/SegmentStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/SegmentStatsMonitor.java
new file mode 100644
index 0000000000..c72aba2957
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/metrics/SegmentStatsMonitor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.druid.server.metrics;
+
+
+import com.google.inject.Inject;
+import org.apache.druid.client.DruidServerConfig;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.AbstractMonitor;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.server.coordination.SegmentLoadDropHandler;
+
+import java.util.Map;
+
+/**
+ * An experimental monitor used to keep track of segment stats. Can only be used on a historical and cannot be used with lazy loading.
+ *
+ * It keeps track of the average number of rows in a segment and the distribution of segments according to rowCount.
+ */
+public class SegmentStatsMonitor extends AbstractMonitor
+{
+  private final DruidServerConfig serverConfig;
+  private final SegmentLoadDropHandler segmentLoadDropHandler;
+
+  private static final Logger log = new Logger(SegmentStatsMonitor.class);
+
+  /**
+   * Constructor for this monitor. Will throw IllegalStateException if lazy load on start is set to true.
+   *
+   * @param serverConfig
+   * @param segmentLoadDropHandler
+   * @param segmentLoaderConfig
+   */
+  @Inject
+  public SegmentStatsMonitor(
+      DruidServerConfig serverConfig,
+      SegmentLoadDropHandler segmentLoadDropHandler,
+      SegmentLoaderConfig segmentLoaderConfig
+  )
+  {
+    if (segmentLoaderConfig.isLazyLoadOnStart()) {
+      // log message ensures there is an error displayed at startup if this fails, as the exception isn't logged.
+      log.error("Monitor doesn't support working with lazy loading on start");
+      // throwing this exception kills the process at startup
+      throw new IllegalStateException("Monitor doesn't support working with lazy loading on start");
+    }
+    this.serverConfig = serverConfig;
+    this.segmentLoadDropHandler = segmentLoadDropHandler;
+
+  }
+
+  @Override
+  public boolean doMonitor(ServiceEmitter emitter)
+  {
+    for (Map.Entry<String, Long> entry : segmentLoadDropHandler.getAverageNumOfRowsPerSegmentForDatasource().entrySet()) {
+      String dataSource = entry.getKey();
+      long averageSize = entry.getValue();
+      final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder()
+          .setDimension(DruidMetrics.DATASOURCE, dataSource)
+          .setDimension("tier", serverConfig.getTier())
+          .setDimension("priority", String.valueOf(serverConfig.getPriority()));
+      emitter.emit(builder.build("segment/rowCount/avg", averageSize));
+    }
+
+    for (Map.Entry<String, SegmentRowCountDistribution> entry : segmentLoadDropHandler.getRowCountDistributionPerDatasource().entrySet()) {
+      String dataSource = entry.getKey();
+      SegmentRowCountDistribution rowCountBucket = entry.getValue();
+
+      rowCountBucket.forEachDimension((final String bucketDimension, final int count) -> {
+        final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder()
+            .setDimension(DruidMetrics.DATASOURCE, dataSource)
+            .setDimension("tier", serverConfig.getTier())
+            .setDimension("priority", String.valueOf(serverConfig.getPriority()));
+        builder.setDimension("range", bucketDimension);
+        ServiceEventBuilder<ServiceMetricEvent> output = builder.build("segment/rowCount/range/count", count);
+        emitter.emit(output);
+      });
+    }
+
+    return true;
+  }
+}
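
To make the emission loop above concrete: per datasource, the monitor emits one `segment/rowCount/avg` event plus one `segment/rowCount/range/count` event for each reported bucket; every event carries the dataSource, tier, and priority dimensions, and the bucket counts additionally carry range. A hypothetical rendering of two such events (datasource name and values invented for illustration):

    metric=segment/rowCount/avg          dataSource=wikipedia tier=_default_tier priority=0 value=250000
    metric=segment/rowCount/range/count  dataSource=wikipedia tier=_default_tier priority=0 range=10k-2M value=3
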
diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
index 941b459c73..f80688276c 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
@@ -19,15 +19,26 @@
 
 package org.apache.druid.segment.loading;
 
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.Metadata;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.SegmentLazyLoadFailCallback;
 import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
 import java.util.concurrent.ExecutorService;
 
 /**
@@ -61,7 +72,96 @@ public class CacheTestSegmentLoader implements SegmentLoader
       @Override
       public StorageAdapter asStorageAdapter()
       {
-        throw new UnsupportedOperationException();
+        return new StorageAdapter()
+        {
+          @Override
+          public Interval getInterval()
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public Indexed<String> getAvailableDimensions()
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public Iterable<String> getAvailableMetrics()
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public int getDimensionCardinality(String column)
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public DateTime getMinTime()
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public DateTime getMaxTime()
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Nullable
+          @Override
+          public Comparable getMinValue(String column)
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Nullable
+          @Override
+          public Comparable getMaxValue(String column)
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Nullable
+          @Override
+          public ColumnCapabilities getColumnCapabilities(String column)
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public int getNumRows()
+          {
+            return 1;
+          }
+
+          @Override
+          public DateTime getMaxIngestedEventTime()
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public Metadata getMetadata()
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public Sequence<Cursor> makeCursors(
+              @Nullable Filter filter,
+              Interval interval,
+              VirtualColumns virtualColumns,
+              Granularity gran,
+              boolean descending,
+              @Nullable QueryMetrics<?> queryMetrics
+          )
+          {
+            throw new UnsupportedOperationException();
+          }
+        };
       }
 
       @Override
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
index a64078b4d7..9d1ac4c4e2 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
@@ -45,6 +45,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -89,11 +90,14 @@ public class SegmentManagerTest
   {
     private final String version;
     private final Interval interval;
+    private final StorageAdapter storageAdapter;
 
     SegmentForTesting(String version, Interval interval)
     {
       this.version = version;
       this.interval = interval;
+      storageAdapter = Mockito.mock(StorageAdapter.class);
+      Mockito.when(storageAdapter.getNumRows()).thenReturn(1);
     }
 
     public String getVersion()
@@ -127,7 +131,7 @@ public class SegmentManagerTest
     @Override
     public StorageAdapter asStorageAdapter()
     {
-      throw new UnsupportedOperationException();
+      return storageAdapter;
     }
 
     @Override
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
index b1de3a22ee..27abe09161 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
@@ -56,6 +56,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
 
 import javax.annotation.Nullable;
 import java.io.File;
@@ -227,6 +228,8 @@ public class SegmentManagerThreadSafetyTest
     {
       return new Segment()
       {
+        StorageAdapter storageAdapter = Mockito.mock(StorageAdapter.class);
+
         @Override
         public SegmentId getId()
         {
@@ -249,7 +252,8 @@ public class SegmentManagerThreadSafetyTest
         @Override
         public StorageAdapter asStorageAdapter()
         {
-          throw new UnsupportedOperationException();
+          Mockito.when(storageAdapter.getNumRows()).thenReturn(1);
+          return storageAdapter;
         }
 
         @Override
diff --git a/server/src/test/java/org/apache/druid/server/metrics/SegmentRowCountDistributionTest.java b/server/src/test/java/org/apache/druid/server/metrics/SegmentRowCountDistributionTest.java
new file mode 100644
index 0000000000..b5ebfe0174
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/metrics/SegmentRowCountDistributionTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.function.ObjIntConsumer;
+
+
+public class SegmentRowCountDistributionTest
+{
+
+  private SegmentRowCountDistribution rowCountBucket;
+
+  @Before
+  public void setUp()
+  {
+    rowCountBucket = new SegmentRowCountDistribution();
+  }
+
+  @Test
+  public void test_bucketCountSanity()
+  {
+    // test base case
+    rowCountBucket.forEachDimension((final String dimension, final int count) -> {
+      Assert.assertEquals(0, count);
+    });
+
+    // tombstones
+    // add tombstones
+    rowCountBucket.addTombstoneToDistribution();
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 0));
+    rowCountBucket.addTombstoneToDistribution();
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 0));
+    // remove tombstones
+    rowCountBucket.removeTombstoneFromDistribution();
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 0));
+    rowCountBucket.removeTombstoneFromDistribution();
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 0));
+
+    // test bounds of 1st bucket
+    // with addition
+    rowCountBucket.addRowCountToDistribution(0);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 1));
+    rowCountBucket.addRowCountToDistribution(0);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 1));
+    // with removal
+    rowCountBucket.removeRowCountFromDistribution(0);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 1));
+    rowCountBucket.removeRowCountFromDistribution(0);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 1));
+
+    // test bounds of 2nd bucket
+    // with addition
+    rowCountBucket.addRowCountToDistribution(1);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 2));
+    rowCountBucket.addRowCountToDistribution(10_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 2));
+    // with removal
+    rowCountBucket.removeRowCountFromDistribution(1);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 2));
+    rowCountBucket.removeRowCountFromDistribution(10_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 2));
+
+    // test bounds of 3rd bucket
+    // with addition
+    rowCountBucket.addRowCountToDistribution(10_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 3));
+    rowCountBucket.addRowCountToDistribution(2_000_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 3));
+    // with removal
+    rowCountBucket.removeRowCountFromDistribution(10_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 3));
+    rowCountBucket.removeRowCountFromDistribution(2_000_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 3));
+
+    // test bounds of 4th bucket
+    // with addition
+    rowCountBucket.addRowCountToDistribution(2_000_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 4));
+    rowCountBucket.addRowCountToDistribution(4_000_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 4));
+    // with removal
+    rowCountBucket.removeRowCountFromDistribution(2_000_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 4));
+
+    rowCountBucket.removeRowCountFromDistribution(4_000_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 4));
+
+
+    // test bounds of 5th bucket
+    // with addition
+    rowCountBucket.addRowCountToDistribution(4_000_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 5));
+    rowCountBucket.addRowCountToDistribution(6_000_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 5));
+    // with removal
+    rowCountBucket.removeRowCountFromDistribution(4_000_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 5));
+    rowCountBucket.removeRowCountFromDistribution(6_000_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 5));
+
+    // test bounds of 6th bucket
+    // with addition
+    rowCountBucket.addRowCountToDistribution(6_000_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 6));
+    rowCountBucket.addRowCountToDistribution(8_000_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 6));
+    // with removal
+    rowCountBucket.removeRowCountFromDistribution(6_000_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 6));
+    rowCountBucket.removeRowCountFromDistribution(8_000_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 6));
+
+    // test bounds of 7th bucket
+    // with addition
+    rowCountBucket.addRowCountToDistribution(8_000_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 7));
+    rowCountBucket.addRowCountToDistribution(10_000_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 7));
+    // with removal
+    rowCountBucket.removeRowCountFromDistribution(8_000_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 7));
+    rowCountBucket.removeRowCountFromDistribution(10_000_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 7));
+
+    // test bounds of 8th bucket
+    // with addition
+    rowCountBucket.addRowCountToDistribution(10_000_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 8));
+    rowCountBucket.addRowCountToDistribution(Long.MAX_VALUE);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 8));
+    // with removal
+    rowCountBucket.removeRowCountFromDistribution(10_000_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 8));
+    rowCountBucket.removeRowCountFromDistribution(Long.MAX_VALUE);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 8));
+  }
+
+  // this is used to test part of the functionality in AssertBucketHasValue.assertExpected
+  @Test
+  public void test_bucketDimensionFromIndex()
+  {
+    Assert.assertEquals("Tombstone", getBucketDimensionFromIndex(0));
+    Assert.assertEquals("0", getBucketDimensionFromIndex(1));
+    Assert.assertEquals("1-10k", getBucketDimensionFromIndex(2));
+    Assert.assertEquals("10k-2M", getBucketDimensionFromIndex(3));
+    Assert.assertEquals("2M-4M", getBucketDimensionFromIndex(4));
+    Assert.assertEquals("4M-6M", getBucketDimensionFromIndex(5));
+    Assert.assertEquals("6M-8M", getBucketDimensionFromIndex(6));
+    Assert.assertEquals("8M-10M", getBucketDimensionFromIndex(7));
+    Assert.assertEquals("10M+", getBucketDimensionFromIndex(8));
+    Assert.assertEquals("NA", getBucketDimensionFromIndex(9));
+  }
+
+  private static class AssertBucketHasValue implements ObjIntConsumer<String>
+  {
+
+    private final int expectedBucket;
+    private final int expectedValue;
+
+    private AssertBucketHasValue(int expectedBucket, int expectedValue)
+    {
+      this.expectedBucket = expectedBucket;
+      this.expectedValue = expectedValue;
+    }
+
+    static AssertBucketHasValue assertExpected(int expectedValue, int expectedBucket)
+    {
+      return new AssertBucketHasValue(expectedBucket, expectedValue);
+    }
+
+    @Override
+    public void accept(String s, int value)
+    {
+      if (s.equals(getBucketDimensionFromIndex(expectedBucket))) {
+        Assert.assertEquals(expectedValue, value);
+      } else {
+        // assert all other values are empty
+        Assert.assertEquals(0, value);
+      }
+    }
+  }
+
+  // this is here because we didn't want to expose the internals of the buckets for segment rowCount distributions
+  private static String getBucketDimensionFromIndex(int index)
+  {
+    switch (index) {
+      case 0:
+        return "Tombstone";
+      case 1:
+        return "0";
+      case 2:
+        return "1-10k";
+      case 3:
+        return "10k-2M";
+      case 4:
+        return "2M-4M";
+      case 5:
+        return "4M-6M";
+      case 6:
+        return "6M-8M";
+      case 7:
+        return "8M-10M";
+      case 8:
+        return "10M+";
+      // should never get to default
+      default:
+        return "NA";
+    }
+  }
+
+
+}
diff --git a/server/src/test/java/org/apache/druid/server/metrics/SegmentStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/SegmentStatsMonitorTest.java
new file mode 100644
index 0000000000..cb08a39fc3
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/metrics/SegmentStatsMonitorTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.druid.server.metrics;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.client.DruidServerConfig;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.server.coordination.SegmentLoadDropHandler;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class SegmentStatsMonitorTest
+{
+  private static final String DATA_SOURCE = "dataSource";
+  private static final int PRIORITY = 111;
+  private static final String TIER = "tier";
+
+  private DruidServerConfig druidServerConfig;
+  private SegmentLoadDropHandler segmentLoadDropMgr;
+  private ServiceEmitter serviceEmitter;
+  private SegmentStatsMonitor monitor;
+  private final SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig();
+
+  @Before
+  public void setUp()
+  {
+    druidServerConfig = Mockito.mock(DruidServerConfig.class);
+    segmentLoadDropMgr = Mockito.mock(SegmentLoadDropHandler.class);
+    serviceEmitter = Mockito.mock(ServiceEmitter.class);
+    monitor = new SegmentStatsMonitor(
+        druidServerConfig,
+        segmentLoadDropMgr,
+        segmentLoaderConfig
+    );
+    Mockito.when(druidServerConfig.getTier()).thenReturn(TIER);
+    Mockito.when(druidServerConfig.getPriority()).thenReturn(PRIORITY);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testLazyLoadOnStartThrowsException()
+  {
+    SegmentLoaderConfig segmentLoaderConfig = Mockito.mock(SegmentLoaderConfig.class);
+    Mockito.when(segmentLoaderConfig.isLazyLoadOnStart()).thenReturn(true);
+
+    //should throw an exception here
+    new SegmentStatsMonitor(druidServerConfig, segmentLoadDropMgr, segmentLoaderConfig);
+  }
+
+  @Test
+  public void testSimple()
+  {
+    final SegmentRowCountDistribution segmentRowCountDistribution = new SegmentRowCountDistribution();
+    segmentRowCountDistribution.addRowCountToDistribution(100_000L);
+
+    Mockito.when(segmentLoadDropMgr.getAverageNumOfRowsPerSegmentForDatasource())
+           .thenReturn(ImmutableMap.of(DATA_SOURCE, 100_000L));
+    Mockito.when(segmentLoadDropMgr.getRowCountDistributionPerDatasource())
+           .thenReturn(ImmutableMap.of(DATA_SOURCE, segmentRowCountDistribution));
+
+    ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>> eventArgumentCaptor = ArgumentCaptor.forClass(
+        ServiceEventBuilder.class);
+    monitor.doMonitor(serviceEmitter);
+    Mockito.verify(serviceEmitter, Mockito.atLeastOnce()).emit(eventArgumentCaptor.capture());
+
+    List<Map<String, Object>> eventsAsMaps = getEventMaps(eventArgumentCaptor.getAllValues());
+    Map<String, Map<String, Object>> actual = metricKeyedMap(eventsAsMaps);
+
+    List<ServiceEventBuilder<ServiceMetricEvent>> expectedEvents = new ArrayList<>();
+    expectedEvents.add(averageRowCountEvent(100_000L));
+    expectedEvents.add(rowCountRangeEvent("1-10k", 0));
+    expectedEvents.add(rowCountRangeEvent("10k-2M", 1));
+    expectedEvents.add(rowCountRangeEvent("2M-4M", 0));
+    expectedEvents.add(rowCountRangeEvent("4M-6M", 0));
+    expectedEvents.add(rowCountRangeEvent("6M-8M", 0));
+    expectedEvents.add(rowCountRangeEvent("8M-10M", 0));
+    expectedEvents.add(rowCountRangeEvent("10M+", 0));
+
+    List<Map<String, Object>> expectedEventsAsMap = getEventMaps(expectedEvents);
+    Map<String, Map<String, Object>> expected = metricKeyedMap(expectedEventsAsMap);
+
+    Assert.assertEquals("different number of metrics were returned", expected.size(), actual.size());
+    for (Map.Entry<String, Map<String, Object>> expectedKeyedEntry : expected.entrySet()) {
+      Map<String, Object> actualValue = actual.get(expectedKeyedEntry.getKey());
+      assertMetricMapsEqual(expectedKeyedEntry.getKey(), expectedKeyedEntry.getValue(), actualValue);
+    }
+  }
+
+  @Test
+  public void testZeroAndTombstoneDistribution()
+  {
+    final SegmentRowCountDistribution segmentRowCountDistribution = new SegmentRowCountDistribution();
+    segmentRowCountDistribution.addRowCountToDistribution(100_000L);
+    segmentRowCountDistribution.addRowCountToDistribution(0L);
+    segmentRowCountDistribution.addTombstoneToDistribution();
+    segmentRowCountDistribution.addTombstoneToDistribution();
+
+    Mockito.when(segmentLoadDropMgr.getAverageNumOfRowsPerSegmentForDatasource())
+           .thenReturn(ImmutableMap.of(DATA_SOURCE, 50_000L));
+    Mockito.when(segmentLoadDropMgr.getRowCountDistributionPerDatasource())
+           .thenReturn(ImmutableMap.of(DATA_SOURCE, segmentRowCountDistribution));
+
+    ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>> eventArgumentCaptor = ArgumentCaptor.forClass(
+        ServiceEventBuilder.class);
+    monitor.doMonitor(serviceEmitter);
+    Mockito.verify(serviceEmitter, Mockito.atLeastOnce()).emit(eventArgumentCaptor.capture());
+
+    List<Map<String, Object>> eventsAsMaps = getEventMaps(eventArgumentCaptor.getAllValues());
+    Map<String, Map<String, Object>> actual = metricKeyedMap(eventsAsMaps);
+
+    List<ServiceEventBuilder<ServiceMetricEvent>> expectedEvents = new ArrayList<>();
+    expectedEvents.add(averageRowCountEvent(50_000L));
+    expectedEvents.add(rowCountRangeEvent("0", 1));
+    expectedEvents.add(rowCountRangeEvent("Tombstone", 2));
+    expectedEvents.add(rowCountRangeEvent("1-10k", 0));
+    expectedEvents.add(rowCountRangeEvent("10k-2M", 1));
+    expectedEvents.add(rowCountRangeEvent("2M-4M", 0));
+    expectedEvents.add(rowCountRangeEvent("4M-6M", 0));
+    expectedEvents.add(rowCountRangeEvent("6M-8M", 0));
+    expectedEvents.add(rowCountRangeEvent("8M-10M", 0));
+    expectedEvents.add(rowCountRangeEvent("10M+", 0));
+
+    List<Map<String, Object>> expectedEventsAsMap = getEventMaps(expectedEvents);
+    Map<String, Map<String, Object>> expected = metricKeyedMap(expectedEventsAsMap);
+
+    Assert.assertEquals("different number of metrics were returned", expected.size(), actual.size());
+    for (Map.Entry<String, Map<String, Object>> expectedKeyedEntry : expected.entrySet()) {
+      Map<String, Object> actualValue = actual.get(expectedKeyedEntry.getKey());
+      assertMetricMapsEqual(expectedKeyedEntry.getKey(), expectedKeyedEntry.getValue(), actualValue);
+    }
+  }
+
+  private void assertMetricMapsEqual(String messagePrefix, Map<String, Object> expected, Map<String, Object> actual)
+  {
+    Assert.assertEquals("different number of expected values for metrics", expected.size(), actual.size());
+    for (Map.Entry<String, Object> expectedMetricEntry : expected.entrySet()) {
+      Assert.assertEquals(
+          messagePrefix + " " + expectedMetricEntry.getKey(),
+          expectedMetricEntry.getValue(),
+          actual.get(expectedMetricEntry.getKey())
+      );
+    }
+  }
+
+  @Nonnull
+  private List<Map<String, Object>> getEventMaps(List<ServiceEventBuilder<ServiceMetricEvent>> eventBuilders)
+  {
+    return eventBuilders.stream()
+                        .map(eventBuilder -> new HashMap<>(eventBuilder.build(ImmutableMap.of()).toMap()))
+                        .peek(mappedValues -> mappedValues.remove("timestamp"))
+                        .collect(Collectors.toList());
+  }
+
+  private Map<String, Map<String, Object>> metricKeyedMap(List<Map<String, Object>> eventsAsMaps)
+  {
+    return eventsAsMaps.stream()
+                       .collect(
+                           Collectors.toMap(eventMap -> {
+                             String metricName = eventMap.get("metric").toString();
+                             String range = eventMap.getOrDefault("range", "").toString();
+                             return metricName + range;
+                           }, Function.identity())
+                       );
+  }
+
+  private ServiceEventBuilder<ServiceMetricEvent> averageRowCountEvent(Number value)
+  {
+    return new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, DATA_SOURCE)
+                                           .setDimension("tier", TIER)
+                                           .setDimension("priority", String.valueOf(PRIORITY))
+                                           .build("segment/rowCount/avg", value);
+  }
+
+  private ServiceEventBuilder<ServiceMetricEvent> rowCountRangeEvent(String range, Number value)
+  {
+    return new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, DATA_SOURCE)
+                                           .setDimension("tier", TIER)
+                                           .setDimension("priority", String.valueOf(PRIORITY))
+                                           .setDimension("range", range)
+                                           .build("segment/rowCount/range/count", value);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
