mxm commented on code in PR #16155:
URL: https://github.com/apache/iceberg/pull/16155#discussion_r3179458953


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java:
##########
@@ -97,4 +108,72 @@ public Counter getFlushedDataFiles() {
   public Counter getFlushedDeleteFiles() {
     return flushedDeleteFiles;
   }
+
+  @VisibleForTesting
+  @Nullable
+  Histogram dataFilesSizeHistogram() {
+    return dataFilesSizeHistogram;
+  }
+
+  @VisibleForTesting
+  @Nullable
+  Histogram deleteFilesSizeHistogram() {
+    return deleteFilesSizeHistogram;
+  }
+
+  /**
+   * Checks whether the Dropwizard-based histogram wrapper provided through Flink's optional
+   * flink-metrics-dropwizard dependency is available.
+   */
+  @SuppressWarnings("CatchBlockLogException")
+  private static Histogram loadHistogramIfAvailable(
+      MetricGroup group, String name, int reservoirSize, ClassLoader classLoader) {
+
+    try {
+      Class<?> reservoirInterface =
+          DynClasses.builder()
+              .loader(classLoader)
+              .impl("com.codahale.metrics.Reservoir")
+              .buildChecked();
+      Class<?> slidingWindowReservoirClass =
+          DynClasses.builder()
+              .loader(classLoader)
+              .impl("com.codahale.metrics.SlidingWindowReservoir")
+              .buildChecked();
+      Class<?> codahaleHistogramClass =
+          DynClasses.builder()
+              .loader(classLoader)
+              .impl("com.codahale.metrics.Histogram")
+              .buildChecked();
+      Class<?> wrapperClass =
+          DynClasses.builder()
+              .loader(classLoader)
+              .impl("org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper")
+              .buildChecked();

Review Comment:
   I've factored these out into static constants.
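
For illustration, the general idea of keeping optional class names in static constants and probing for them at runtime can be sketched with plain reflection. This is not the PR's code: the class `OptionalClassLookup`, the helpers `loadIfPresent` and `allPresent`, and the constant names are hypothetical, and `Class.forName` stands in here for Iceberg's `DynClasses` builder.

```java
import java.util.Optional;

public class OptionalClassLookup {
  // Hypothetical constant names; the class names themselves are taken from the diff above.
  private static final String RESERVOIR_CLASS = "com.codahale.metrics.SlidingWindowReservoir";
  private static final String HISTOGRAM_CLASS = "com.codahale.metrics.Histogram";
  private static final String WRAPPER_CLASS =
      "org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper";

  // Returns the class if the given loader can resolve it, otherwise an empty Optional.
  // The class is not initialized (second argument is false), only located.
  static Optional<Class<?>> loadIfPresent(String className, ClassLoader loader) {
    try {
      return Optional.of(Class.forName(className, false, loader));
    } catch (ClassNotFoundException | LinkageError e) {
      return Optional.empty();
    }
  }

  // True only if every named class can be resolved by the given loader.
  static boolean allPresent(ClassLoader loader, String... classNames) {
    for (String className : classNames) {
      if (!loadIfPresent(className, loader).isPresent()) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    ClassLoader loader = OptionalClassLookup.class.getClassLoader();
    // The result depends on whether the optional dependency is on the classpath.
    boolean histogramSupport =
        allPresent(loader, RESERVOIR_CLASS, HISTOGRAM_CLASS, WRAPPER_CLASS);
    System.out.println("Histogram support available: " + histogramSupport);
  }
}
```

With the names held in constants, a caller can fall back to a `null` histogram when `allPresent` returns `false` rather than failing at class-load time.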



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java:
##########
@@ -43,6 +50,11 @@ public class IcebergStreamWriterMetrics {
   private final Histogram deleteFilesSizeHistogram;
 
   public IcebergStreamWriterMetrics(MetricGroup metrics, String fullTableName) {
+    this(metrics, fullTableName, IcebergStreamWriterMetrics.class.getClassLoader());
+  }
+
+  @VisibleForTesting
+  IcebergStreamWriterMetrics(MetricGroup metrics, String fullTableName, ClassLoader classLoader) {

Review Comment:
   This change was purely for class-loader injection during testing, to assert
that everything works when the histogram classes are not on the classpath.
With the Dyn* classes, this is no longer needed.



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java:
##########
@@ -18,19 +18,26 @@
  */
 package org.apache.iceberg.flink.sink;
 
-import com.codahale.metrics.SlidingWindowReservoir;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;

Review Comment:
   I find it helpful to indicate to the caller that a `null` value can be
expected. That said, IDEs are pretty good at indicating that as well. Removed.



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java:
##########
@@ -97,4 +108,72 @@ public Counter getFlushedDataFiles() {
   public Counter getFlushedDeleteFiles() {
     return flushedDeleteFiles;
   }
+
+  @VisibleForTesting
+  @Nullable
+  Histogram dataFilesSizeHistogram() {
+    return dataFilesSizeHistogram;
+  }
+
+  @VisibleForTesting
+  @Nullable
+  Histogram deleteFilesSizeHistogram() {
+    return deleteFilesSizeHistogram;
+  }
+
+  /**
+   * Checks whether the Dropwizard-based histogram wrapper provided through Flink's optional
+   * flink-metrics-dropwizard dependency is available.
+   */
+  @SuppressWarnings("CatchBlockLogException")
+  private static Histogram loadHistogramIfAvailable(
+      MetricGroup group, String name, int reservoirSize, ClassLoader classLoader) {
+
+    try {
+      Class<?> reservoirInterface =
+          DynClasses.builder()
+              .loader(classLoader)
+              .impl("com.codahale.metrics.Reservoir")
+              .buildChecked();
+      Class<?> slidingWindowReservoirClass =
+          DynClasses.builder()
+              .loader(classLoader)
+              .impl("com.codahale.metrics.SlidingWindowReservoir")
+              .buildChecked();
+      Class<?> codahaleHistogramClass =
+          DynClasses.builder()
+              .loader(classLoader)
+              .impl("com.codahale.metrics.Histogram")
+              .buildChecked();
+      Class<?> wrapperClass =
+          DynClasses.builder()
+              .loader(classLoader)
+              .impl("org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper")
+              .buildChecked();
+
+      Object reservoir =
+          DynConstructors.builder()
+              .impl(slidingWindowReservoirClass, int.class)

Review Comment:
   Thanks! Changed.




