This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch post-2-62-0
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6ee5a50ff06cefa7ee31c088632fdf929e328968
Author: Kenneth Knowles <k...@google.com>
AuthorDate: Fri Jan 24 13:55:26 2025 -0500

    Revert "Add ability to encode and decode histogram data to portable runners (#33013)"
    
    This reverts commit 5bfacc935f8510bdf287d8c87b810e08ab9a79ae. The change caused the
    core SDK, the SDK harness, and the shared runners-core library to depend on Dataflow.
---
 runners/core-java/build.gradle                     |   2 -
 .../core/metrics/MonitoringInfoEncodings.java      | 111 +--------------------
 .../core/metrics/MonitoringInfoEncodingsTest.java  |  47 +--------
 sdks/java/core/build.gradle                        |   1 -
 .../org/apache/beam/sdk/util/HistogramData.java    |  78 ---------------
 sdks/java/harness/build.gradle                     |   2 -
 6 files changed, 2 insertions(+), 239 deletions(-)
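
A quick, illustrative way to confirm the dependency edge is gone after this revert
is Gradle's dependency report (the configuration name below is an example, not part
of the commit):

    ./gradlew :runners:core-java:dependencies --configuration compileClasspath | grep -i dataflow

This should print no matches once the revert is applied; the same check applies to
:sdks:java:core and :sdks:java:harness.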

diff --git a/runners/core-java/build.gradle b/runners/core-java/build.gradle
index 0f55c10b97f..b477dde9121 100644
--- a/runners/core-java/build.gradle
+++ b/runners/core-java/build.gradle
@@ -42,7 +42,6 @@ dependencies {
   implementation project(path: ":model:pipeline", configuration: "shadow")
   implementation project(path: ":sdks:java:core", configuration: "shadow")
   implementation project(path: ":model:job-management", configuration: 
"shadow")
-  implementation library.java.google_api_services_dataflow
   implementation library.java.vendored_guava_32_1_2_jre
   implementation library.java.joda_time
   implementation library.java.vendored_grpc_1_60_1
@@ -53,6 +52,5 @@ dependencies {
   testImplementation library.java.junit
   testImplementation library.java.mockito_core
   testImplementation library.java.slf4j_api
-  testImplementation(library.java.google_api_services_dataflow)
   testRuntimeOnly library.java.slf4j_simple
 }
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
index 88136a864d0..44995b979dd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
@@ -17,19 +17,8 @@
  */
 package org.apache.beam.runners.core.metrics;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.api.services.dataflow.model.Base2Exponent;
-import com.google.api.services.dataflow.model.BucketOptions;
-import com.google.api.services.dataflow.model.DataflowHistogramValue;
-import com.google.api.services.dataflow.model.Linear;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Set;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.DoubleCoder;
@@ -37,14 +26,10 @@ import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.util.ByteStringOutputStream;
-import org.apache.beam.sdk.util.HistogramData;
-import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
 import org.joda.time.Instant;
 
-// TODO(#33093): Refactor out DataflowHistogramValue to be runner agnostic, and rename to
-// remove Dataflow reference.
-
 /** A set of functions used to encode and decode common monitoring info types. */
 public class MonitoringInfoEncodings {
   private static final Coder<Long> VARINT_CODER = VarLongCoder.of();
@@ -178,98 +163,4 @@ public class MonitoringInfoEncodings {
       throw new RuntimeException(e);
     }
   }
-
-  /** Encodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
-  public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
-    try {
-      int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets();
-
-      DataflowHistogramValue outputHistogram2 = new DataflowHistogramValue();
-
-      if (inputHistogram.getBucketType() instanceof HistogramData.LinearBuckets) {
-        HistogramData.LinearBuckets buckets =
-            (HistogramData.LinearBuckets) inputHistogram.getBucketType();
-        Linear linear = new Linear();
-        linear.setNumberOfBuckets(numberOfBuckets);
-        linear.setWidth(buckets.getWidth());
-        linear.setStart(buckets.getStart());
-        outputHistogram2.setBucketOptions(new BucketOptions().setLinear(linear));
-      } else if (inputHistogram.getBucketType() instanceof HistogramData.ExponentialBuckets) {
-        HistogramData.ExponentialBuckets buckets =
-            (HistogramData.ExponentialBuckets) inputHistogram.getBucketType();
-        Base2Exponent base2Exp = new Base2Exponent();
-        base2Exp.setNumberOfBuckets(numberOfBuckets);
-        base2Exp.setScale(buckets.getScale());
-        outputHistogram2.setBucketOptions(new BucketOptions().setExponential(base2Exp));
-      } else {
-        throw new HistogramParsingException(
-            "Unable to encode Int64 Histogram, bucket is not recognized");
-      }
-
-      outputHistogram2.setCount(inputHistogram.getTotalCount());
-
-      List<Long> bucketCounts = new ArrayList<>();
-
-      Arrays.stream(inputHistogram.getBucketCount())
-          .forEach(
-              val -> {
-                bucketCounts.add(val);
-              });
-
-      outputHistogram2.setBucketCounts(bucketCounts);
-
-      ObjectMapper objectMapper = new ObjectMapper();
-      String jsonString = objectMapper.writeValueAsString(outputHistogram2);
-
-      return ByteString.copyFromUtf8(jsonString);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  static class HistogramParsingException extends RuntimeException {
-    public HistogramParsingException(String message) {
-      super(message);
-    }
-  }
-
-  /** Decodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
-  public static HistogramData decodeInt64Histogram(ByteString payload) {
-    try {
-      ObjectMapper objectMapper = new ObjectMapper();
-      objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-      JsonNode jsonNode = objectMapper.readTree(payload.toStringUtf8()); // parse afterwards
-      DataflowHistogramValue newHist = new DataflowHistogramValue();
-      newHist.setCount(jsonNode.get("count").asLong());
-
-      List<Long> bucketCounts = new ArrayList<>();
-      Iterator<JsonNode> itr = jsonNode.get("bucketCounts").iterator();
-      while (itr.hasNext()) {
-        Long item = itr.next().asLong();
-        bucketCounts.add(item);
-      }
-      newHist.setBucketCounts(bucketCounts);
-
-      if (jsonNode.get("bucketOptions").has("linear")) {
-        Linear linear = new Linear();
-        JsonNode linearNode = jsonNode.get("bucketOptions").get("linear");
-        linear.setNumberOfBuckets(linearNode.get("numberOfBuckets").asInt());
-        linear.setWidth(linearNode.get("width").asDouble());
-        linear.setStart(linearNode.get("start").asDouble());
-        newHist.setBucketOptions(new BucketOptions().setLinear(linear));
-      } else if (jsonNode.get("bucketOptions").has("exponential")) {
-        Base2Exponent base2Exp = new Base2Exponent();
-        JsonNode expNode = jsonNode.get("bucketOptions").get("exponential");
-        base2Exp.setNumberOfBuckets(expNode.get("numberOfBuckets").asInt());
-        base2Exp.setScale(expNode.get("scale").asInt());
-        newHist.setBucketOptions(new BucketOptions().setExponential(base2Exp));
-      } else {
-        throw new HistogramParsingException(
-            "Unable to parse Int64 Histogram, bucket is not recognized");
-      }
-      return new HistogramData(newHist);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
 }
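
For context, the reverted encodeInt64Histogram serialized the histogram as the JSON
form of a DataflowHistogramValue, and decodeInt64Histogram parsed it back by field
name. For a linear-bucket histogram like the one in the reverted tests, the payload
was roughly the following (a sketch; the exact counts and field order are illustrative):

    {
      "count": 4,
      "bucketCounts": [0, 1, 1, 1, 1],
      "bucketOptions": {
        "linear": {"numberOfBuckets": 5, "width": 5.0, "start": 0.0}
      }
    }

It is the Dataflow model class, not the JSON wire format itself, that created the
unwanted dependency.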
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java
index 2d7ba61dbc9..dde180b150d 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java
@@ -17,43 +17,30 @@
  */
 package org.apache.beam.runners.core.metrics;
 
-import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.HistogramParsingException;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeDoubleCounter;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
-import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Histogram;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeStringSet;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeDoubleCounter;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeDoubleDistribution;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge;
-import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Histogram;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet;
 import static org.junit.Assert.assertEquals;
 
 import java.util.Collections;
-import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.apache.beam.sdk.util.HistogramData;
-import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import org.joda.time.Instant;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /** Tests for {@link MonitoringInfoEncodings}. */
 @RunWith(JUnit4.class)
 public class MonitoringInfoEncodingsTest {
-  @Rule
-  public ExpectedLogs monitoringInfoCodingsExpectedLogs =
-      ExpectedLogs.none(MonitoringInfoEncodings.class);
-
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
   @Test
   public void testInt64DistributionEncoding() {
     DistributionData data = DistributionData.create(1L, 2L, 3L, 4L);
@@ -118,36 +105,4 @@ public class MonitoringInfoEncodingsTest {
     assertEquals(ByteString.copyFrom(new byte[] {0x3f, (byte) 0xf0, 0, 0, 0, 0, 0, 0}), payload);
     assertEquals(1.0, decodeDoubleCounter(payload), 0.001);
   }
-
-  @Test
-  public void testHistgramInt64EncodingLinearHist() {
-    HistogramData.BucketType buckets = HistogramData.LinearBuckets.of(0, 5, 5);
-
-    HistogramData inputHistogram = new HistogramData(buckets);
-    inputHistogram.record(5, 10, 15, 20);
-    ByteString payload = encodeInt64Histogram(inputHistogram);
-
-    assertEquals(inputHistogram, decodeInt64Histogram(payload));
-  }
-
-  @Test
-  public void testHistgramInt64EncodingExpHist() {
-    HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 10);
-    HistogramData inputHistogram = new HistogramData(buckets);
-    inputHistogram.record(2, 4, 8, 16, 32);
-    ByteString payload = encodeInt64Histogram(inputHistogram);
-    assertEquals(inputHistogram, decodeInt64Histogram(payload));
-  }
-
-  @Test
-  public void testHistgramInt64EncodingUnsupportedBucket() {
-    thrown.expect(HistogramParsingException.class);
-    thrown.expectMessage("Unable to encode Int64 Histogram, bucket is not recognized");
-
-    HistogramData.BucketType buckets = HistogramData.UnsupportedBuckets.of();
-
-    HistogramData inputHistogram = new HistogramData(buckets);
-    inputHistogram.record(2, 4, 8, 16, 32);
-    encodeInt64Histogram(inputHistogram);
-  }
 }
diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle
index 7423cb7c6b8..a8dfbf42f97 100644
--- a/sdks/java/core/build.gradle
+++ b/sdks/java/core/build.gradle
@@ -75,7 +75,6 @@ dependencies {
   permitUnusedDeclared library.java.antlr
   permitUsedUndeclared library.java.antlr_runtime
   // Required to load constants from the model, e.g. max timestamp for global window
-  provided library.java.google_api_services_dataflow
   shadow project(path: ":model:pipeline", configuration: "shadow")
   shadow project(path: ":model:fn-execution", configuration: "shadow")
   shadow project(path: ":model:job-management", configuration: "shadow")
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java
index c1ac4bcfba2..65ccda06be6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.util;
 
-import com.google.api.services.dataflow.model.DataflowHistogramValue;
 import com.google.auto.value.AutoValue;
 import com.google.auto.value.extension.memoized.Memoized;
 import java.io.Serializable;
@@ -25,8 +24,6 @@ import java.math.RoundingMode;
 import java.util.Arrays;
 import java.util.Objects;
 import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.DoubleMath;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.IntMath;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -77,41 +74,6 @@ public class HistogramData implements Serializable {
     this.sumOfSquaredDeviations = 0;
   }
 
-  /**
-   * Create a histogram from DataflowHistogramValue proto.
-   *
-   * @param histogramProto DataflowHistogramValue proto used to populate stats for the histogram.
-   */
-  public HistogramData(DataflowHistogramValue histogramProto) {
-    int numBuckets;
-    if (histogramProto.getBucketOptions().getLinear() != null) {
-      double start = histogramProto.getBucketOptions().getLinear().getStart();
-      double width = histogramProto.getBucketOptions().getLinear().getWidth();
-      numBuckets = histogramProto.getBucketOptions().getLinear().getNumberOfBuckets();
-      this.bucketType = LinearBuckets.of(start, width, numBuckets);
-      this.buckets = new long[bucketType.getNumBuckets()];
-
-      int idx = 0;
-      for (long val : histogramProto.getBucketCounts()) {
-        this.buckets[idx] = val;
-        this.numBoundedBucketRecords += val;
-        idx++;
-      }
-    } else {
-      // Assume it's a exponential histogram if its not linear
-      int scale = histogramProto.getBucketOptions().getExponential().getScale();
-      numBuckets = histogramProto.getBucketOptions().getExponential().getNumberOfBuckets();
-      this.bucketType = ExponentialBuckets.of(scale, numBuckets);
-      this.buckets = new long[bucketType.getNumBuckets()];
-      int idx = 0;
-      for (long val : histogramProto.getBucketCounts()) {
-        this.buckets[idx] = val;
-        this.numBoundedBucketRecords += val;
-        idx++;
-      }
-    }
-  }
-
   public BucketType getBucketType() {
     return this.bucketType;
   }
@@ -331,10 +293,6 @@ public class HistogramData implements Serializable {
     return numTopRecords;
   }
 
-  public synchronized long[] getBucketCount() {
-    return buckets;
-  }
-
   public synchronized double getTopBucketMean() {
     return numTopRecords == 0 ? 0 : topRecordsSum / numTopRecords;
   }
@@ -615,42 +573,6 @@ public class HistogramData implements Serializable {
     // Note: equals() and hashCode() are implemented by the AutoValue.
   }
 
-  /** Used for testing unsupported Bucket formats. */
-  @AutoValue
-  @Internal
-  @VisibleForTesting
-  public abstract static class UnsupportedBuckets implements BucketType {
-
-    public static UnsupportedBuckets of() {
-      return new AutoValue_HistogramData_UnsupportedBuckets(0);
-    }
-
-    @Override
-    public int getBucketIndex(double value) {
-      return 0;
-    }
-
-    @Override
-    public double getBucketSize(int index) {
-      return 0;
-    }
-
-    @Override
-    public double getAccumulatedBucketSize(int index) {
-      return 0;
-    }
-
-    @Override
-    public double getRangeFrom() {
-      return 0;
-    }
-
-    @Override
-    public double getRangeTo() {
-      return 0;
-    }
-  }
-
   @Override
   public synchronized boolean equals(@Nullable Object object) {
     if (object instanceof HistogramData) {
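
Note that only the DataflowHistogramValue-based constructor and the getBucketCount()
accessor are removed; histograms are still built through the retained public API. A
minimal sketch mirroring the reverted tests:

    // Linear buckets: start 0, width 5, 5 buckets (API retained after the revert).
    HistogramData histogram = new HistogramData(HistogramData.LinearBuckets.of(0, 5, 5));
    histogram.record(5, 10, 15, 20); // each value is recorded into its linear bucket

Only the proto-based construction path is gone.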
diff --git a/sdks/java/harness/build.gradle b/sdks/java/harness/build.gradle
index ed3034c0861..2de578cb32c 100644
--- a/sdks/java/harness/build.gradle
+++ b/sdks/java/harness/build.gradle
@@ -30,7 +30,6 @@ dependencies {
   provided project(path: ":model:pipeline", configuration: "shadow")
   provided project(path: ":sdks:java:core", configuration: "shadow")
   provided project(path: ":sdks:java:transform-service:launcher")
-  provided library.java.google_api_services_dataflow
   provided library.java.avro
   provided library.java.jackson_databind
   provided library.java.joda_time
@@ -80,5 +79,4 @@ dependencies {
   shadowTest project(path: ":sdks:java:core", configuration: "shadowTest")
   shadowTestRuntimeClasspath library.java.slf4j_jdk14
   permitUnusedDeclared library.java.avro
-  permitUnusedDeclared library.java.google_api_services_dataflow
 }
