mxm commented on code in PR #16155:
URL: https://github.com/apache/iceberg/pull/16155#discussion_r3179458414


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java:
##########
@@ -51,18 +63,12 @@ public IcebergStreamWriterMetrics(MetricGroup metrics, 
String fullTableName) {
     this.lastFlushDurationMs = new AtomicLong();
     writerMetrics.gauge("lastFlushDurationMs", lastFlushDurationMs::get);
 
-    com.codahale.metrics.Histogram dropwizardDataFilesSizeHistogram =
-        new com.codahale.metrics.Histogram(new 
SlidingWindowReservoir(HISTOGRAM_RESERVOIR_SIZE));
     this.dataFilesSizeHistogram =
-        writerMetrics.histogram(
-            "dataFilesSizeHistogram",
-            new DropwizardHistogramWrapper(dropwizardDataFilesSizeHistogram));

Review Comment:
   According to the Flink docs, using Dropwizard metrics is still the official 
way to use Histogram metrics. `DescriptiveStatisticsHistogram` was added to 
flink-runtime for internal metrics around the same time of the initial 
implementation here. The semantics (e.g. bucketing) of the internal histogram 
are different. I would defer this to a follow-up, as the behavior of the 
implementation might change.



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java:
##########
@@ -97,4 +108,72 @@ public Counter getFlushedDataFiles() {
   public Counter getFlushedDeleteFiles() {
     return flushedDeleteFiles;
   }
+
+  @VisibleForTesting
+  @Nullable
+  Histogram dataFilesSizeHistogram() {
+    return dataFilesSizeHistogram;
+  }
+
+  @VisibleForTesting
+  @Nullable
+  Histogram deleteFilesSizeHistogram() {
+    return deleteFilesSizeHistogram;
+  }
+
+  /**
+   * Checks whether the Dropwizard-based histogram wrapper provided through 
Flink's optional
+   * flink-metrics-dropwizard dependency is available.
+   */
+  @SuppressWarnings("CatchBlockLogException")
+  private static Histogram loadHistogramIfAvailable(
+      MetricGroup group, String name, int reservoirSize, ClassLoader 
classLoader) {
+
+    try {
+      Class<?> reservoirInterface =
+          DynClasses.builder()
+              .loader(classLoader)
+              .impl("com.codahale.metrics.Reservoir")
+              .buildChecked();
+      Class<?> slidingWindowReservoirClass =
+          DynClasses.builder()
+              .loader(classLoader)
+              .impl("com.codahale.metrics.SlidingWindowReservoir")
+              .buildChecked();
+      Class<?> codahaleHistogramClass =
+          DynClasses.builder()
+              .loader(classLoader)
+              .impl("com.codahale.metrics.Histogram")
+              .buildChecked();
+      Class<?> wrapperClass =
+          DynClasses.builder()
+              .loader(classLoader)
+              
.impl("org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper")
+              .buildChecked();
+
+      Object reservoir =
+          DynConstructors.builder()
+              .impl(slidingWindowReservoirClass, int.class)
+              .buildChecked()
+              .newInstance(reservoirSize);
+      Object codahaleHistogram =
+          DynConstructors.builder()
+              .impl(codahaleHistogramClass, reservoirInterface)
+              .buildChecked()
+              .newInstance(reservoir);
+      Histogram wrapper =
+          (Histogram)
+              DynConstructors.builder()
+                  .impl(wrapperClass, codahaleHistogramClass)
+                  .buildChecked()
+                  .newInstance(codahaleHistogram);

Review Comment:
   Good suggestion. I've separated those.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to