mxm commented on code in PR #16155:
URL: https://github.com/apache/iceberg/pull/16155#discussion_r3179460123


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java:
##########
@@ -97,4 +108,72 @@ public Counter getFlushedDataFiles() {
   public Counter getFlushedDeleteFiles() {
     return flushedDeleteFiles;
   }
+
+  @VisibleForTesting
+  @Nullable
+  Histogram dataFilesSizeHistogram() {
+    return dataFilesSizeHistogram;
+  }
+
+  @VisibleForTesting
+  @Nullable
+  Histogram deleteFilesSizeHistogram() {
+    return deleteFilesSizeHistogram;
+  }
+
+  /**
+   * Checks whether the Dropwizard-based histogram wrapper provided through 
Flink's optional
+   * flink-metrics-dropwizard dependency is available.
+   */
+  @SuppressWarnings("CatchBlockLogException")
+  private static Histogram loadHistogramIfAvailable(
+      MetricGroup group, String name, int reservoirSize, ClassLoader 
classLoader) {

Review Comment:
   This is cleaner. I agree on the possible side effects. Thanks!



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java:
##########
@@ -97,4 +108,72 @@ public Counter getFlushedDataFiles() {
   public Counter getFlushedDeleteFiles() {
     return flushedDeleteFiles;
   }
+
+  @VisibleForTesting
+  @Nullable
+  Histogram dataFilesSizeHistogram() {
+    return dataFilesSizeHistogram;
+  }
+
+  @VisibleForTesting
+  @Nullable
+  Histogram deleteFilesSizeHistogram() {
+    return deleteFilesSizeHistogram;
+  }
+
+  /**
+   * Checks whether the Dropwizard-based histogram wrapper provided through 
Flink's optional
+   * flink-metrics-dropwizard dependency is available.
+   */
+  @SuppressWarnings("CatchBlockLogException")
+  private static Histogram loadHistogramIfAvailable(
+      MetricGroup group, String name, int reservoirSize, ClassLoader 
classLoader) {
+
+    try {
+      Class<?> reservoirInterface =
+          DynClasses.builder()
+              .loader(classLoader)
+              .impl("com.codahale.metrics.Reservoir")
+              .buildChecked();
+      Class<?> slidingWindowReservoirClass =
+          DynClasses.builder()
+              .loader(classLoader)
+              .impl("com.codahale.metrics.SlidingWindowReservoir")
+              .buildChecked();
+      Class<?> codahaleHistogramClass =
+          DynClasses.builder()
+              .loader(classLoader)
+              .impl("com.codahale.metrics.Histogram")
+              .buildChecked();
+      Class<?> wrapperClass =
+          DynClasses.builder()
+              .loader(classLoader)
+              
.impl("org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper")
+              .buildChecked();
+
+      Object reservoir =
+          DynConstructors.builder()
+              .impl(slidingWindowReservoirClass, int.class)
+              .buildChecked()
+              .newInstance(reservoirSize);
+      Object codahaleHistogram =
+          DynConstructors.builder()
+              .impl(codahaleHistogramClass, reservoirInterface)
+              .buildChecked()
+              .newInstance(reservoir);
+      Histogram wrapper =
+          (Histogram)
+              DynConstructors.builder()
+                  .impl(wrapperClass, codahaleHistogramClass)
+                  .buildChecked()
+                  .newInstance(codahaleHistogram);
+
+      return group.histogram(name, wrapper);
+    } catch (ClassNotFoundException | NoSuchMethodException e) {
+      LOG.warn(
+          "flink-metrics-dropwizard is not on the classpath. '{}' histogram 
metrics will be disabled. Add org.apache.flink:flink-metrics-dropwizard to 
enable them.",
+          name);

Review Comment:
   Thanks! I've changed the code to print just once, on loading the class. 
Otherwise this would print on creating the writer, which is once per job, but 
on recovery it would print again, which is redundant.



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java:
##########
@@ -97,4 +108,72 @@ public Counter getFlushedDataFiles() {
   public Counter getFlushedDeleteFiles() {
     return flushedDeleteFiles;
   }
+
+  @VisibleForTesting
+  @Nullable
+  Histogram dataFilesSizeHistogram() {
+    return dataFilesSizeHistogram;
+  }
+
+  @VisibleForTesting
+  @Nullable
+  Histogram deleteFilesSizeHistogram() {
+    return deleteFilesSizeHistogram;
+  }
+
+  /**
+   * Checks whether the Dropwizard-based histogram wrapper provided through 
Flink's optional
+   * flink-metrics-dropwizard dependency is available.
+   */
+  @SuppressWarnings("CatchBlockLogException")
+  private static Histogram loadHistogramIfAvailable(
+      MetricGroup group, String name, int reservoirSize, ClassLoader 
classLoader) {
+
+    try {
+      Class<?> reservoirInterface =
+          DynClasses.builder()
+              .loader(classLoader)
+              .impl("com.codahale.metrics.Reservoir")
+              .buildChecked();
+      Class<?> slidingWindowReservoirClass =
+          DynClasses.builder()
+              .loader(classLoader)
+              .impl("com.codahale.metrics.SlidingWindowReservoir")
+              .buildChecked();
+      Class<?> codahaleHistogramClass =
+          DynClasses.builder()
+              .loader(classLoader)
+              .impl("com.codahale.metrics.Histogram")
+              .buildChecked();
+      Class<?> wrapperClass =
+          DynClasses.builder()
+              .loader(classLoader)
+              
.impl("org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper")
+              .buildChecked();
+
+      Object reservoir =
+          DynConstructors.builder()
+              .impl(slidingWindowReservoirClass, int.class)
+              .buildChecked()
+              .newInstance(reservoirSize);
+      Object codahaleHistogram =
+          DynConstructors.builder()
+              .impl(codahaleHistogramClass, reservoirInterface)
+              .buildChecked()
+              .newInstance(reservoir);
+      Histogram wrapper =
+          (Histogram)
+              DynConstructors.builder()

Review Comment:
   That's nicer. Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to