What exactly would you prefer? Without the stream name and shard id you'd end up with name clashes all over the place.

Why can you not aggregate them? Surely Datadog supports some way to define a wildcard when defining the tags to aggregate.
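
To illustrate the name-clash concern, here is a minimal, self-contained sketch. It does not use the Flink API: the class `MetricScopeSketch` and its helpers are hypothetical, and it assumes that a metric identifier is simply the scope components joined with dots and that the group keys are `stream` and `shardId`. It only shows that dropping the stream name and shard id makes every shard's gauge collapse onto one identifier.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of metric identifiers; not Flink code.
class MetricScopeSketch {

    // Assumption: an identifier is the scope components joined with dots.
    static String identifier(String... parts) {
        return String.join(".", parts);
    }

    // Identifier that keeps the stream name and shard id in the scope.
    static String perShard(String stream, String shardId, String metric) {
        return identifier("stream", stream, "shardId", shardId, metric);
    }

    // Identifier with the stream name and shard id dropped.
    static String withoutShardScope(String metric) {
        return identifier(metric);
    }

    // Number of distinct identifiers produced for the given shards.
    static int distinctCount(List<String> shardIds, boolean keepShardScope) {
        Set<String> ids = new LinkedHashSet<>();
        for (String shardId : shardIds) {
            ids.add(keepShardScope
                ? perShard("my-stream", shardId, "millisBehindLatest")
                : withoutShardScope("millisBehindLatest"));
        }
        return ids.size();
    }

    public static void main(String[] args) {
        List<String> shards = Arrays.asList("shardId-000000000000", "shardId-000000000001");
        System.out.println("with shard scope: " + distinctCount(shards, true));
        System.out.println("without shard scope: " + distinctCount(shards, false));
    }
}
```

With the shard scope kept, each shard gets its own identifier; without it, both shards would register the same name, which is the clash mentioned above.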


On 03/10/2019 09:09, Yitzchak Lieberman wrote:
Hi.

I would like to be able to control the metric group of the Flink Kinesis consumer. As shown below, it creates a metric identifier for each stream name and shard ID (in our case more than 1000 metric identifiers), so the metrics cannot be aggregated in a Datadog graph:

private static ShardMetricsReporter registerShardMetrics(MetricGroup metricGroup, KinesisStreamShardState shardState) {
    ShardMetricsReporter shardMetrics = new ShardMetricsReporter();

    MetricGroup streamShardMetricGroup = metricGroup
        .addGroup(
            KinesisConsumerMetricConstants.STREAM_METRICS_GROUP,
            shardState.getStreamShardHandle().getStreamName())
        .addGroup(
            KinesisConsumerMetricConstants.SHARD_METRICS_GROUP,
            shardState.getStreamShardHandle().getShard().getShardId());

    streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.MILLIS_BEHIND_LATEST_GAUGE, shardMetrics::getMillisBehindLatest);
    streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.MAX_RECORDS_PER_FETCH, shardMetrics::getMaxNumberOfRecordsPerFetch);
    streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.NUM_AGGREGATED_RECORDS_PER_FETCH, shardMetrics::getNumberOfAggregatedRecords);
    streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.NUM_DEAGGREGATED_RECORDS_PER_FETCH, shardMetrics::getNumberOfDeaggregatedRecords);
    streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.AVG_RECORD_SIZE_BYTES, shardMetrics::getAverageRecordSizeBytes);
    streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.BYTES_PER_READ, shardMetrics::getBytesPerRead);
    streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.RUNTIME_LOOP_NANOS, shardMetrics::getRunLoopTimeNanos);
    streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.LOOP_FREQUENCY_HZ, shardMetrics::getLoopFrequencyHz);
    streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.SLEEP_TIME_MILLIS, shardMetrics::getSleepTimeMillis);

    return shardMetrics;
}

Would be happy for your advice.
Thanks,
Yitzchak.

