robertwb commented on code in PR #31789:
URL: https://github.com/apache/beam/pull/31789#discussion_r1667613190


##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java:
##########
@@ -193,6 +196,23 @@ public GaugeCell getGauge(MetricName metricName) {
     return gauges.tryGet(metricName);
   }
 
+  /**
+   * Return a {@code StringSetCell} named {@code metricName}. If it doesn't exist, create a {@code
+   * Metric} with the specified name.
+   */
+  @Override
+  public StringSetCell getStringSet(MetricName metricName) {
+    return stringSets.get(metricName);
+  }
+
+  /**
+   * Return a {@code StringSetCell} named {@code metricName}.If it doesn't exist, return {@code

Review Comment:
   Nit: space after period.



##########
runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetCellTest.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Tests for {@link StringSetCell}. */
+public class StringSetCellTest {
+  private final StringSetCell cell = new StringSetCell(MetricName.named("lineage", "sources"));
+
+  @Test
+  public void testDeltaAndCumulative() {
+    cell.add("pubsub");
+    cell.add("bq", "spanner");
+    assertEquals(cell.getCumulative().stringSet(), ImmutableSet.of("spanner", "pubsub", "bq"));
+    assertEquals(
+        "getCumulative is idempotent",
+        cell.getCumulative().stringSet(),
+        ImmutableSet.of("spanner", "pubsub", "bq"));
+
+    assertThat(cell.getDirty().beforeCommit(), equalTo(true));
+    cell.getDirty().afterCommit();
+    assertThat(cell.getDirty().beforeCommit(), equalTo(false));
+
+    cell.add("gcs");
+    assertEquals(
+        cell.getCumulative().stringSet(), ImmutableSet.of("spanner", "pubsub", "bq", "gcs"));
+
+    assertThat(
+        "Adding a new value made the cell dirty", cell.getDirty().beforeCommit(), equalTo(true));
+  }
+
+  @Test
+  public void testEquals() {
+    StringSetCell stringSetCell = new StringSetCell(MetricName.named("namespace", "name"));
+    StringSetCell equal = new StringSetCell(MetricName.named("namespace", "name"));
+    assertEquals(stringSetCell, equal);
+    assertEquals(stringSetCell.hashCode(), equal.hashCode());
+  }
+
+  @Test
+  public void testNotEquals() {
+    StringSetCell stringSetCell = new StringSetCell(MetricName.named("namespace", "name"));
+
+    Assert.assertNotEquals(stringSetCell, new Object());
+
+    StringSetCell differentDirty = new StringSetCell(MetricName.named("namespace", "name"));
+    differentDirty.getDirty().afterModification();
+    Assert.assertNotEquals(stringSetCell, differentDirty);

Review Comment:
   I'll admit this feels a bit odd. But when do cells need to be compared (or 
be hashable)? (If it's just following the convention of what's done elsewhere, 
that's fine.)



##########
runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java:
##########
@@ -217,19 +219,43 @@ public GaugeResult extract(GaugeData data) {
         }
       };
 
+  private static final MetricAggregation<StringSetData, StringSetResult> STRING_SET =
+      new MetricAggregation<StringSetData, StringSetResult>() {
+        @Override
+        public StringSetData zero() {
+          return StringSetData.empty();
+        }
+
+        @Override
+        public StringSetData combine(Iterable<StringSetData> updates) {
+          StringSetData result = StringSetData.empty();
+          for (StringSetData update : updates) {
+            result = result.combine(update);

Review Comment:
   Doing a one-at-a-time binary combine of multiple sets might be inefficient 
(vs. creating a single set and adding everything to it). I don't know how often 
this method is called, though.
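
To illustrate the concern, here is a minimal stdlib-only sketch (not the PR's code). It uses plain `Set<String>` in place of `StringSetData`, and `CombineSketch`, `combinePairwise`, and `combineAll` are hypothetical names. It assumes each binary combine allocates a new set and copies the accumulated elements, which may not match what `StringSetData.combine` actually does.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class CombineSketch {

  /**
   * Pairwise combine: every step builds a fresh set, so the elements
   * accumulated so far are re-copied once per update.
   * ASSUMPTION: this mirrors an immutable-style binary combine.
   */
  static Set<String> combinePairwise(Iterable<Set<String>> updates) {
    Set<String> result = Collections.emptySet();
    for (Set<String> update : updates) {
      Set<String> merged = new HashSet<>(result); // copies everything seen so far
      merged.addAll(update);
      result = merged;
    }
    return result;
  }

  /** Single accumulator: each element is inserted once into one mutable set. */
  static Set<String> combineAll(Iterable<Set<String>> updates) {
    Set<String> result = new HashSet<>();
    for (Set<String> update : updates) {
      result.addAll(update);
    }
    return result;
  }
}
```

Both methods return the same union; they differ only in how much copying happens along the way, so whether the change matters depends on how many updates are typically combined.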



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetCell.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.StringSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Tracks the current value for a {@link StringSet} metric.
+ *
+ * <p>This class generally shouldn't be used directly. The only exception is within a runner where a
+ * counter is being reported for a specific step (rather than the counter in the current context).
+ * In that case retrieving the underlying cell and reporting directly to it avoids a step of
+ * indirection.
+ */
+public class StringSetCell implements StringSet, MetricCell<StringSetData> {
+
+  private final DirtyState dirty = new DirtyState();
+  private final AtomicReference<StringSetData> setValue =
+      new AtomicReference<>(StringSetData.empty());
+  private final MetricName name;
+
+  /**
+   * Generally, runners should construct instances using the methods in {@link
+   * MetricsContainerImpl}, unless they need to define their own version of {@link
+   * MetricsContainer}. These constructors are *only* public so runners can instantiate.
+   */
+  public StringSetCell(MetricName name) {
+    this.name = name;
+  }
+
+  @Override
+  public void reset() {
+    setValue.set(StringSetData.empty());
+    dirty.reset();
+  }
+
+  void update(StringSetData data) {
+    StringSetData original;
+    do {
+      original = setValue.get();
+    } while (!setValue.compareAndSet(original, original.combine(data)));
+    dirty.afterModification();
+  }
+
+  @Override
+  public DirtyState getDirty() {
+    return dirty;
+  }
+
+  @Override
+  public StringSetData getCumulative() {
+    return setValue.get();
+  }
+
+  @Override
+  public MetricName getName() {
+    return name;
+  }
+
+  @Override
+  public boolean equals(@Nullable Object object) {
+    if (object instanceof StringSetCell) {
+      StringSetCell stringSetCell = (StringSetCell) object;
+      return Objects.equals(dirty, stringSetCell.dirty)
+          && Objects.equals(setValue.get(), stringSetCell.setValue.get())
+          && Objects.equals(name, stringSetCell.name);
+    }
+
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(dirty, setValue.get(), name);
+  }
+
+  @Override
+  public void add(String value) {
+    update(StringSetData.create(new HashSet<>(Collections.singletonList(value))));

Review Comment:
   Perhaps optimize for the case where the value is already in the set by 
skipping the update (which avoids creating sets just to do a no-op merge, and 
avoids setting the dirty bit).



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.metrics.StringSetResult;
+
+/**
+ * Data describing the StringSet. This should retain enough detail that it can be combined with
+ * other {@link StringSetData}.
+ */
+@AutoValue
+public abstract class StringSetData implements Serializable {
+
+  public abstract Set<String> stringSet();

Review Comment:
   Perhaps here and below we should document whether the sets are (expected to 
be?) immutable or not. E.g. is one allowed to modify the set after passing it 
to create? Modify the set returned from stringSet()?



##########
runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/StringSetImpl.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jet.metrics;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import org.apache.beam.runners.core.metrics.StringSetData;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.StringSet;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+
+/** Implementation of {@link StringSet}. */
+public class StringSetImpl extends AbstractMetric<StringSetData> implements StringSet {
+
+  private final StringSetData stringSetData = StringSetData.empty();
+
+  public StringSetImpl(MetricName name) {
+    super(name);
+  }
+
+  @Override
+  StringSetData getValue() {
+    return stringSetData;
+  }
+
+  @Override
+  public void add(String value) {
+    stringSetData.combine(StringSetData.create(ImmutableSet.of("ab")));

Review Comment:
   Again, perhaps worth optimizing for the case where value is already present. 



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java:
##########
@@ -98,6 +103,36 @@ public static GaugeData decodeInt64Gauge(ByteString payload) {
     }
   }
 
+  /** Encodes to {@link MonitoringInfoConstants.TypeUrns#SET_STRING_TYPE}. */
+  public static ByteString encodeStringSet(StringSetData data) {
+    try (ByteStringOutputStream output = new ByteStringOutputStream()) {
+      // encode the length of set
+      STRING_CODER.encode(String.valueOf(data.stringSet().size()), output);

Review Comment:
   Use `IterableCoder.of(StringUtf8Coder.of())` rather than encoding the 
length as a string.



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java:
##########
@@ -182,6 +186,13 @@ private Long getCounterValue(MetricUpdate metricUpdate) {
       return ((Number) metricUpdate.getScalar()).longValue();
     }
 
+    private StringSetResult getStringSetValue(MetricUpdate metricUpdate) {
+      if (metricUpdate.getSet() == null) {
+        return StringSetResult.empty();
+      }
+      return StringSetResult.create(new HashSet<>(((ArrayList) metricUpdate.getSet())));

Review Comment:
   ImmutableSet.copyOf(...) is likely faster, especially for sets of {0,1} 
elements. (Might be worth using elsewhere too.)



##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java:
##########
@@ -184,6 +189,13 @@ private MetricUpdate makeCounterMetricUpdate(
     return setStructuredName(update, name, namespace, step, tentative);
   }
 
+  private MetricUpdate makeStringSetMetricUpdate(
+      String name, String namespace, String step, List<String> setValues, boolean tentative) {

Review Comment:
   Why is this a List<String> of values? Should it be a Set<String> or 
Collection<String>?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
