m-trieu commented on code in PR #33013:
URL: https://github.com/apache/beam/pull/33013#discussion_r1841048796
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java:
##########
@@ -163,4 +178,90 @@ public static double decodeDoubleCounter(ByteString
payload) {
throw new RuntimeException(e);
}
}
+
+ /** Encodes to {@link
MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
+ public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
+ try {
+ int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets();
+
+ DataflowHistogramValue outputHistogram2 = new DataflowHistogramValue();
+
+ if (inputHistogram.getBucketType() instanceof
HistogramData.LinearBuckets) {
+ HistogramData.LinearBuckets buckets =
+ (HistogramData.LinearBuckets) inputHistogram.getBucketType();
+ Linear linear = new Linear();
+ linear.setNumberOfBuckets(numberOfBuckets);
+ linear.setWidth(buckets.getWidth());
+ linear.setStart(buckets.getStart());
+ outputHistogram2.setBucketOptions(new
BucketOptions().setLinear(linear));
+ } else if (inputHistogram.getBucketType() instanceof
HistogramData.ExponentialBuckets) {
+ HistogramData.ExponentialBuckets buckets =
+ (HistogramData.ExponentialBuckets) inputHistogram.getBucketType();
+ Base2Exponent base2Exp = new Base2Exponent();
+ base2Exp.setNumberOfBuckets(numberOfBuckets);
+ base2Exp.setScale(buckets.getScale());
+ outputHistogram2.setBucketOptions(new
BucketOptions().setExponential(base2Exp));
+ } else {
+ throw new RuntimeException("Unable to parse histogram, bucket is not
recognized");
+ }
+
+ outputHistogram2.setCount(inputHistogram.getTotalCount());
+
+ List<Long> bucketCounts = new ArrayList<>();
+
+ Arrays.stream(inputHistogram.getBucketCount())
+ .forEach(
+ val -> {
+ bucketCounts.add(val);
+ });
+
+ outputHistogram2.setBucketCounts(bucketCounts);
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ String jsonString = objectMapper.writeValueAsString(outputHistogram2);
+
+ return ByteString.copyFromUtf8(jsonString);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** Decodes to {@link
MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
+ public static HistogramData decodeInt64Histogram(ByteString payload) {
+ try {
+ ObjectMapper objectMapper = new ObjectMapper();
+
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
+ JsonNode jsonNode = objectMapper.readTree(payload.toStringUtf8()); //
parse afterwards
+ DataflowHistogramValue newHist = new DataflowHistogramValue();
+ newHist.setCount(jsonNode.get("count").asLong());
+
+ List<Long> bucketCounts = new ArrayList<>();
+ Iterator<JsonNode> itr = jsonNode.get("bucketCounts").iterator();
+ while (itr.hasNext()) {
+ Long item = itr.next().asLong();
+ bucketCounts.add(item);
+ }
+ newHist.setBucketCounts(bucketCounts);
+
+ if (jsonNode.get("bucketOptions").has("linear")) {
+ Linear linear = new Linear();
+ JsonNode linearNode = jsonNode.get("bucketOptions").get("linear");
+ linear.setNumberOfBuckets(linearNode.get("numberOfBuckets").asInt());
+ linear.setWidth(linearNode.get("width").asDouble());
+ linear.setStart(linearNode.get("start").asDouble());
+ newHist.setBucketOptions(new BucketOptions().setLinear(linear));
+ } else if (jsonNode.get("bucketOptions").has("exponential")) {
+ Base2Exponent base2Exp = new Base2Exponent();
+ JsonNode expNode = jsonNode.get("bucketOptions").get("exponential");
+ base2Exp.setNumberOfBuckets(expNode.get("numberOfBuckets").asInt());
+ base2Exp.setScale(expNode.get("scale").asInt());
+ newHist.setBucketOptions(new BucketOptions().setExponential(base2Exp));
+ } else {
+ throw new RuntimeException("Unable to parse histogram, bucket is not
recognized");
Review Comment:
Ditto above.
Can we throw a more specific exception here?
It can extend RuntimeException, but a dedicated type will help with debugging.
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java:
##########
@@ -74,6 +74,42 @@ public HistogramData(BucketType bucketType) {
this.sumOfSquaredDeviations = 0;
}
+ /**
+ * Create a histogram from DataflowHistogramValue proto.
+ *
+ * @param histogramProto DataflowHistogramValue proto used to populate stats
for the histogram.
+ */
+ public HistogramData(
+ com.google.api.services.dataflow.model.DataflowHistogramValue
histogramProto) {
Review Comment:
Is this fully qualified class name needed, or could `DataflowHistogramValue` be imported instead?
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java:
##########
@@ -163,4 +178,90 @@ public static double decodeDoubleCounter(ByteString
payload) {
throw new RuntimeException(e);
}
}
+
+ /** Encodes to {@link
MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
+ public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
+ try {
+ int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets();
+
+ DataflowHistogramValue outputHistogram2 = new DataflowHistogramValue();
+
+ if (inputHistogram.getBucketType() instanceof
HistogramData.LinearBuckets) {
+ HistogramData.LinearBuckets buckets =
+ (HistogramData.LinearBuckets) inputHistogram.getBucketType();
+ Linear linear = new Linear();
+ linear.setNumberOfBuckets(numberOfBuckets);
+ linear.setWidth(buckets.getWidth());
+ linear.setStart(buckets.getStart());
+ outputHistogram2.setBucketOptions(new
BucketOptions().setLinear(linear));
+ } else if (inputHistogram.getBucketType() instanceof
HistogramData.ExponentialBuckets) {
+ HistogramData.ExponentialBuckets buckets =
+ (HistogramData.ExponentialBuckets) inputHistogram.getBucketType();
+ Base2Exponent base2Exp = new Base2Exponent();
+ base2Exp.setNumberOfBuckets(numberOfBuckets);
+ base2Exp.setScale(buckets.getScale());
+ outputHistogram2.setBucketOptions(new
BucketOptions().setExponential(base2Exp));
+ } else {
+ throw new RuntimeException("Unable to parse histogram, bucket is not
recognized");
+ }
+
+ outputHistogram2.setCount(inputHistogram.getTotalCount());
+
+ List<Long> bucketCounts = new ArrayList<>();
+
+ Arrays.stream(inputHistogram.getBucketCount())
+ .forEach(
+ val -> {
+ bucketCounts.add(val);
+ });
+
+ outputHistogram2.setBucketCounts(bucketCounts);
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ String jsonString = objectMapper.writeValueAsString(outputHistogram2);
+
+ return ByteString.copyFromUtf8(jsonString);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
Review Comment:
Ditto above.
Can we throw a more specific exception here?
It can extend RuntimeException, but a dedicated type will help with debugging.
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java:
##########
@@ -573,6 +613,40 @@ public double getRangeTo() {
// Note: equals() and hashCode() are implemented by the AutoValue.
}
+ // Used for testing unsupported Bucket formats
+ @AutoValue
+ public abstract static class UnsupportedBuckets implements BucketType {
Review Comment:
nit: format the comment as Javadoc:
`/** Used for testing unsupported Bucket formats. */`
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java:
##########
@@ -573,6 +613,40 @@ public double getRangeTo() {
// Note: equals() and hashCode() are implemented by the AutoValue.
}
+ // Used for testing unsupported Bucket formats
+ @AutoValue
Review Comment:
If this is just for testing, add the `@VisibleForTesting` annotation, and since it's public,
also add Beam's `@Internal` annotation.
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java:
##########
@@ -163,4 +178,90 @@ public static double decodeDoubleCounter(ByteString
payload) {
throw new RuntimeException(e);
}
}
+
+ /** Encodes to {@link
MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
+ public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
+ try {
+ int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets();
+
+ DataflowHistogramValue outputHistogram2 = new DataflowHistogramValue();
+
+ if (inputHistogram.getBucketType() instanceof
HistogramData.LinearBuckets) {
+ HistogramData.LinearBuckets buckets =
+ (HistogramData.LinearBuckets) inputHistogram.getBucketType();
+ Linear linear = new Linear();
+ linear.setNumberOfBuckets(numberOfBuckets);
+ linear.setWidth(buckets.getWidth());
+ linear.setStart(buckets.getStart());
+ outputHistogram2.setBucketOptions(new
BucketOptions().setLinear(linear));
+ } else if (inputHistogram.getBucketType() instanceof
HistogramData.ExponentialBuckets) {
+ HistogramData.ExponentialBuckets buckets =
+ (HistogramData.ExponentialBuckets) inputHistogram.getBucketType();
+ Base2Exponent base2Exp = new Base2Exponent();
+ base2Exp.setNumberOfBuckets(numberOfBuckets);
+ base2Exp.setScale(buckets.getScale());
+ outputHistogram2.setBucketOptions(new
BucketOptions().setExponential(base2Exp));
+ } else {
+ throw new RuntimeException("Unable to parse histogram, bucket is not
recognized");
+ }
+
+ outputHistogram2.setCount(inputHistogram.getTotalCount());
+
+ List<Long> bucketCounts = new ArrayList<>();
+
+ Arrays.stream(inputHistogram.getBucketCount())
+ .forEach(
+ val -> {
+ bucketCounts.add(val);
+ });
+
+ outputHistogram2.setBucketCounts(bucketCounts);
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ String jsonString = objectMapper.writeValueAsString(outputHistogram2);
+
+ return ByteString.copyFromUtf8(jsonString);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** Decodes to {@link
MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
+ public static HistogramData decodeInt64Histogram(ByteString payload) {
+ try {
+ ObjectMapper objectMapper = new ObjectMapper();
+
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
+ JsonNode jsonNode = objectMapper.readTree(payload.toStringUtf8()); //
parse afterwards
+ DataflowHistogramValue newHist = new DataflowHistogramValue();
+ newHist.setCount(jsonNode.get("count").asLong());
+
+ List<Long> bucketCounts = new ArrayList<>();
+ Iterator<JsonNode> itr = jsonNode.get("bucketCounts").iterator();
+ while (itr.hasNext()) {
+ Long item = itr.next().asLong();
+ bucketCounts.add(item);
+ }
+ newHist.setBucketCounts(bucketCounts);
+
+ if (jsonNode.get("bucketOptions").has("linear")) {
+ Linear linear = new Linear();
+ JsonNode linearNode = jsonNode.get("bucketOptions").get("linear");
+ linear.setNumberOfBuckets(linearNode.get("numberOfBuckets").asInt());
+ linear.setWidth(linearNode.get("width").asDouble());
+ linear.setStart(linearNode.get("start").asDouble());
+ newHist.setBucketOptions(new BucketOptions().setLinear(linear));
+ } else if (jsonNode.get("bucketOptions").has("exponential")) {
+ Base2Exponent base2Exp = new Base2Exponent();
+ JsonNode expNode = jsonNode.get("bucketOptions").get("exponential");
+ base2Exp.setNumberOfBuckets(expNode.get("numberOfBuckets").asInt());
+ base2Exp.setScale(expNode.get("scale").asInt());
+ newHist.setBucketOptions(new BucketOptions().setExponential(base2Exp));
+ } else {
+ throw new RuntimeException("Unable to parse histogram, bucket is not
recognized");
Review Comment:
Also, what does the caller do if an exception is thrown?
If we don't care about the exception, should we LOG the error and return
Optional<HistogramData> instead?
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java:
##########
@@ -163,4 +178,90 @@ public static double decodeDoubleCounter(ByteString
payload) {
throw new RuntimeException(e);
}
}
+
+ /** Encodes to {@link
MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
+ public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
+ try {
+ int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets();
+
+ DataflowHistogramValue outputHistogram2 = new DataflowHistogramValue();
+
+ if (inputHistogram.getBucketType() instanceof
HistogramData.LinearBuckets) {
+ HistogramData.LinearBuckets buckets =
+ (HistogramData.LinearBuckets) inputHistogram.getBucketType();
+ Linear linear = new Linear();
+ linear.setNumberOfBuckets(numberOfBuckets);
+ linear.setWidth(buckets.getWidth());
+ linear.setStart(buckets.getStart());
+ outputHistogram2.setBucketOptions(new
BucketOptions().setLinear(linear));
+ } else if (inputHistogram.getBucketType() instanceof
HistogramData.ExponentialBuckets) {
+ HistogramData.ExponentialBuckets buckets =
+ (HistogramData.ExponentialBuckets) inputHistogram.getBucketType();
+ Base2Exponent base2Exp = new Base2Exponent();
+ base2Exp.setNumberOfBuckets(numberOfBuckets);
+ base2Exp.setScale(buckets.getScale());
+ outputHistogram2.setBucketOptions(new
BucketOptions().setExponential(base2Exp));
+ } else {
+ throw new RuntimeException("Unable to parse histogram, bucket is not
recognized");
Review Comment:
Also, what does the caller do if an exception is thrown?
If we don't care about the exception, should we LOG the error and return
Optional<ByteString> instead?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]