mxm commented on code in PR #16155:
URL: https://github.com/apache/iceberg/pull/16155#discussion_r3179460123
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java:
##########
@@ -97,4 +108,72 @@ public Counter getFlushedDataFiles() {
public Counter getFlushedDeleteFiles() {
return flushedDeleteFiles;
}
+
+ @VisibleForTesting
+ @Nullable
+ Histogram dataFilesSizeHistogram() {
+ return dataFilesSizeHistogram;
+ }
+
+ @VisibleForTesting
+ @Nullable
+ Histogram deleteFilesSizeHistogram() {
+ return deleteFilesSizeHistogram;
+ }
+
+ /**
+ * Checks whether the Dropwizard-based histogram wrapper provided through Flink's optional
+ * flink-metrics-dropwizard dependency is available.
+ */
+ @SuppressWarnings("CatchBlockLogException")
+ private static Histogram loadHistogramIfAvailable(
+ MetricGroup group, String name, int reservoirSize, ClassLoader classLoader) {
Review Comment:
This is cleaner. I agree on the possible side effects. Thanks!
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java:
##########
@@ -97,4 +108,72 @@ public Counter getFlushedDataFiles() {
public Counter getFlushedDeleteFiles() {
return flushedDeleteFiles;
}
+
+ @VisibleForTesting
+ @Nullable
+ Histogram dataFilesSizeHistogram() {
+ return dataFilesSizeHistogram;
+ }
+
+ @VisibleForTesting
+ @Nullable
+ Histogram deleteFilesSizeHistogram() {
+ return deleteFilesSizeHistogram;
+ }
+
+ /**
+ * Checks whether the Dropwizard-based histogram wrapper provided through Flink's optional
+ * flink-metrics-dropwizard dependency is available.
+ */
+ @SuppressWarnings("CatchBlockLogException")
+ private static Histogram loadHistogramIfAvailable(
+ MetricGroup group, String name, int reservoirSize, ClassLoader classLoader) {
+
+ try {
+ Class<?> reservoirInterface =
+ DynClasses.builder()
+ .loader(classLoader)
+ .impl("com.codahale.metrics.Reservoir")
+ .buildChecked();
+ Class<?> slidingWindowReservoirClass =
+ DynClasses.builder()
+ .loader(classLoader)
+ .impl("com.codahale.metrics.SlidingWindowReservoir")
+ .buildChecked();
+ Class<?> codahaleHistogramClass =
+ DynClasses.builder()
+ .loader(classLoader)
+ .impl("com.codahale.metrics.Histogram")
+ .buildChecked();
+ Class<?> wrapperClass =
+ DynClasses.builder()
+ .loader(classLoader)
+ .impl("org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper")
+ .buildChecked();
+
+ Object reservoir =
+ DynConstructors.builder()
+ .impl(slidingWindowReservoirClass, int.class)
+ .buildChecked()
+ .newInstance(reservoirSize);
+ Object codahaleHistogram =
+ DynConstructors.builder()
+ .impl(codahaleHistogramClass, reservoirInterface)
+ .buildChecked()
+ .newInstance(reservoir);
+ Histogram wrapper =
+ (Histogram)
+ DynConstructors.builder()
+ .impl(wrapperClass, codahaleHistogramClass)
+ .buildChecked()
+ .newInstance(codahaleHistogram);
+
+ return group.histogram(name, wrapper);
+ } catch (ClassNotFoundException | NoSuchMethodException e) {
+ LOG.warn(
+ "flink-metrics-dropwizard is not on the classpath. '{}' histogram metrics will be disabled. Add org.apache.flink:flink-metrics-dropwizard to enable them.",
+ name);
Review Comment:
Thanks! I've changed the code to log the warning only once, when the class is
loaded. Otherwise it would be logged each time the writer is created, which
happens once per job but again on every recovery, which is redundant.
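For illustration, a minimal sketch of the "warn once at class load" idea, not
the code in this PR. The class name `OptionalHistogramSupport`, the helper
`probe`, and the `WARNINGS` counter are hypothetical, and `java.util.logging`
stands in for the project's logger so the snippet is self-contained. It relies
only on the JVM guarantee that static initializers run once per class.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

// Hypothetical helper: probes for an optional dependency once, at class initialization.
final class OptionalHistogramSupport {
  private static final Logger LOG =
      Logger.getLogger(OptionalHistogramSupport.class.getName());

  // Hypothetical counter, present only so the once-only behavior can be observed.
  private static final AtomicInteger WARNINGS = new AtomicInteger();

  // Evaluated exactly once, when the JVM initializes this class.
  // Must be declared after LOG and WARNINGS, since static fields initialize in textual order.
  private static final boolean DROPWIZARD_AVAILABLE =
      probe("org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper");

  private OptionalHistogramSupport() {}

  private static boolean probe(String className) {
    try {
      // initialize=false: only check that the class can be found, do not run its initializers.
      Class.forName(className, false, OptionalHistogramSupport.class.getClassLoader());
      return true;
    } catch (ClassNotFoundException | LinkageError e) {
      WARNINGS.incrementAndGet();
      LOG.warning(
          "flink-metrics-dropwizard is not on the classpath. "
              + "Histogram metrics will be disabled.");
      return false;
    }
  }

  // Cheap to call from every writer instance, including after recovery: no repeated warning.
  static boolean isAvailable() {
    return DROPWIZARD_AVAILABLE;
  }

  static int warningCount() {
    return WARNINGS.get();
  }
}
```

With this shape, each writer would only consult `isAvailable()` before
registering a histogram, so restarts within the same JVM do not repeat the
message. A fresh JVM, or a different class loader loading the class again,
would still log it once more.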
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java:
##########
@@ -97,4 +108,72 @@ public Counter getFlushedDataFiles() {
public Counter getFlushedDeleteFiles() {
return flushedDeleteFiles;
}
+
+ @VisibleForTesting
+ @Nullable
+ Histogram dataFilesSizeHistogram() {
+ return dataFilesSizeHistogram;
+ }
+
+ @VisibleForTesting
+ @Nullable
+ Histogram deleteFilesSizeHistogram() {
+ return deleteFilesSizeHistogram;
+ }
+
+ /**
+ * Checks whether the Dropwizard-based histogram wrapper provided through Flink's optional
+ * flink-metrics-dropwizard dependency is available.
+ */
+ @SuppressWarnings("CatchBlockLogException")
+ private static Histogram loadHistogramIfAvailable(
+ MetricGroup group, String name, int reservoirSize, ClassLoader classLoader) {
+
+ try {
+ Class<?> reservoirInterface =
+ DynClasses.builder()
+ .loader(classLoader)
+ .impl("com.codahale.metrics.Reservoir")
+ .buildChecked();
+ Class<?> slidingWindowReservoirClass =
+ DynClasses.builder()
+ .loader(classLoader)
+ .impl("com.codahale.metrics.SlidingWindowReservoir")
+ .buildChecked();
+ Class<?> codahaleHistogramClass =
+ DynClasses.builder()
+ .loader(classLoader)
+ .impl("com.codahale.metrics.Histogram")
+ .buildChecked();
+ Class<?> wrapperClass =
+ DynClasses.builder()
+ .loader(classLoader)
+ .impl("org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper")
+ .buildChecked();
+
+ Object reservoir =
+ DynConstructors.builder()
+ .impl(slidingWindowReservoirClass, int.class)
+ .buildChecked()
+ .newInstance(reservoirSize);
+ Object codahaleHistogram =
+ DynConstructors.builder()
+ .impl(codahaleHistogramClass, reservoirInterface)
+ .buildChecked()
+ .newInstance(reservoir);
+ Histogram wrapper =
+ (Histogram)
+ DynConstructors.builder()
Review Comment:
That's nicer. Updated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.