rohitsinha54 commented on code in PR #31789:
URL: https://github.com/apache/beam/pull/31789#discussion_r1667758902
##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java:
##########
@@ -182,6 +186,13 @@ private Long getCounterValue(MetricUpdate metricUpdate) {
return ((Number) metricUpdate.getScalar()).longValue();
}
+ private StringSetResult getStringSetValue(MetricUpdate metricUpdate) {
+ if (metricUpdate.getSet() == null) {
+ return StringSetResult.empty();
+ }
+ return StringSetResult.create(new HashSet<>(((ArrayList) metricUpdate.getSet())));
Review Comment:
Oh ya true. Thanks.
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.metrics.StringSetResult;
+
+/**
+ * Data describing the StringSet. This should retain enough detail that it can be combined with
+ * other {@link StringSetData}.
+ */
+@AutoValue
+public abstract class StringSetData implements Serializable {
+
+ public abstract Set<String> stringSet();
Review Comment:
Done.
Also, I see that I had not defined the mutability behavior of StringSetData
and StringSetResult. Fixed now. The result needs to be immutable. The data
could go either way: a mutable class would allow more efficient combining, but
it could lead to a confusing contract, especially for EmptyStringSetData. I am
going to make StringSetData immutable, so instances can only be combined by
copying.
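To illustrate the trade-off, here is a minimal sketch of an immutable value that is combined by copying. `ImmutableStringSet` and its methods are hypothetical stand-ins written for this comment, not the actual StringSetData in the PR, which uses AutoValue.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for StringSetData; names are illustrative only. */
final class ImmutableStringSet {
  // One shared empty instance is safe because instances never change.
  private static final ImmutableStringSet EMPTY =
      new ImmutableStringSet(Collections.emptySet());

  private final Set<String> values;

  private ImmutableStringSet(Set<String> values) {
    // Defensive copy, then an unmodifiable view so callers cannot mutate it.
    this.values = Collections.unmodifiableSet(new HashSet<>(values));
  }

  static ImmutableStringSet empty() {
    return EMPTY;
  }

  static ImmutableStringSet of(Set<String> values) {
    return values.isEmpty() ? EMPTY : new ImmutableStringSet(values);
  }

  Set<String> values() {
    return values;
  }

  /** Returns the union as a new instance; neither operand is modified. */
  ImmutableStringSet combine(ImmutableStringSet other) {
    if (other.values.isEmpty()) {
      return this;
    }
    if (values.isEmpty()) {
      return other;
    }
    Set<String> union = new HashSet<>(values);
    union.addAll(other.values);
    return new ImmutableStringSet(union);
  }
}
```

The cost is one copy per combine. In exchange, a shared empty instance can never be polluted by a caller, which is the confusing contract mentioned above.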
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetCell.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.StringSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Tracks the current value for a {@link StringSet} metric.
+ *
+ * <p>This class generally shouldn't be used directly. The only exception is within a runner where a
+ * counter is being reported for a specific step (rather than the counter in the current context).
+ * In that case retrieving the underlying cell and reporting directly to it avoids a step of
+ * indirection.
+ */
+public class StringSetCell implements StringSet, MetricCell<StringSetData> {
+
+ private final DirtyState dirty = new DirtyState();
+ private final AtomicReference<StringSetData> setValue =
+ new AtomicReference<>(StringSetData.empty());
+ private final MetricName name;
+
+ /**
+ * Generally, runners should construct instances using the methods in {@link
+ * MetricsContainerImpl}, unless they need to define their own version of {@link
+ * MetricsContainer}. These constructors are *only* public so runners can instantiate.
+ */
+ public StringSetCell(MetricName name) {
+ this.name = name;
+ }
+
+ @Override
+ public void reset() {
+ setValue.set(StringSetData.empty());
+ dirty.reset();
+ }
+
+ void update(StringSetData data) {
+ StringSetData original;
+ do {
+ original = setValue.get();
+ } while (!setValue.compareAndSet(original, original.combine(data)));
+ dirty.afterModification();
+ }
+
+ @Override
+ public DirtyState getDirty() {
+ return dirty;
+ }
+
+ @Override
+ public StringSetData getCumulative() {
+ return setValue.get();
+ }
+
+ @Override
+ public MetricName getName() {
+ return name;
+ }
+
+ @Override
+ public boolean equals(@Nullable Object object) {
+ if (object instanceof StringSetCell) {
+ StringSetCell stringSetCell = (StringSetCell) object;
+ return Objects.equals(dirty, stringSetCell.dirty)
+ && Objects.equals(setValue.get(), stringSetCell.setValue.get())
+ && Objects.equals(name, stringSetCell.name);
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(dirty, setValue.get(), name);
+ }
+
+ @Override
+ public void add(String value) {
+ update(StringSetData.create(new HashSet<>(Collections.singletonList(value))));
Review Comment:
Good idea. Thanks.
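For reference, the compare-and-set loop in `update` above can also be written with `AtomicReference.accumulateAndGet`. The sketch below is a simplified stand-in: `AtomicStringSetCell` is a hypothetical name, the dirty-state tracking is left out, and the varargs `add` is an assumption based on the `cell.add("bq", "spanner")` call in the test.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, simplified cell; not Beam's StringSetCell. */
final class AtomicStringSetCell {
  // Holds an immutable snapshot; updates swap in a new snapshot atomically.
  private final AtomicReference<Set<String>> value =
      new AtomicReference<>(Collections.emptySet());

  /** Merges the given strings into the current snapshot without locking. */
  void add(String... values) {
    Set<String> delta = new HashSet<>(Arrays.asList(values));
    // accumulateAndGet re-applies the merge if another thread replaced the
    // reference in the meantime, like a hand-written compareAndSet loop.
    value.accumulateAndGet(delta, AtomicStringSetCell::union);
  }

  Set<String> getCumulative() {
    return value.get();
  }

  // Side-effect free, as accumulateAndGet may call it more than once.
  private static Set<String> union(Set<String> left, Set<String> right) {
    Set<String> merged = new HashSet<>(left);
    merged.addAll(right);
    return Collections.unmodifiableSet(merged);
  }
}
```

Either form relies on the stored value being immutable: a retry must never observe a half-updated set.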
##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java:
##########
@@ -184,6 +189,13 @@ private MetricUpdate makeCounterMetricUpdate(
return setStructuredName(update, name, namespace, step, tentative);
}
+ private MetricUpdate makeStringSetMetricUpdate(
+ String name, String namespace, String step, List<String> setValues, boolean tentative) {
Review Comment:
Ah, you are right, it can be a set. Fixed. Earlier the documentation tripped me
up (https://screenshot.googleplex.com/4npsge3GVUo3zRs); I thought it needed to
be a "list".
##########
runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetCellTest.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Tests for {@link StringSetCell}. */
+public class StringSetCellTest {
+ private final StringSetCell cell = new StringSetCell(MetricName.named("lineage", "sources"));
+
+ @Test
+ public void testDeltaAndCumulative() {
+ cell.add("pubsub");
+ cell.add("bq", "spanner");
+ assertEquals(cell.getCumulative().stringSet(), ImmutableSet.of("spanner", "pubsub", "bq"));
+ assertEquals(
+ "getCumulative is idempotent",
+ cell.getCumulative().stringSet(),
+ ImmutableSet.of("spanner", "pubsub", "bq"));
+
+ assertThat(cell.getDirty().beforeCommit(), equalTo(true));
+ cell.getDirty().afterCommit();
+ assertThat(cell.getDirty().beforeCommit(), equalTo(false));
+
+ cell.add("gcs");
+ assertEquals(
+ cell.getCumulative().stringSet(), ImmutableSet.of("spanner", "pubsub", "bq", "gcs"));
+
+ assertThat(
+ "Adding a new value made the cell dirty", cell.getDirty().beforeCommit(), equalTo(true));
+ }
+
+ @Test
+ public void testEquals() {
+ StringSetCell stringSetCell = new StringSetCell(MetricName.named("namespace", "name"));
+ StringSetCell equal = new StringSetCell(MetricName.named("namespace", "name"));
+ assertEquals(stringSetCell, equal);
+ assertEquals(stringSetCell.hashCode(), equal.hashCode());
+ }
+
+ @Test
+ public void testNotEquals() {
+ StringSetCell stringSetCell = new StringSetCell(MetricName.named("namespace", "name"));
+
+ Assert.assertNotEquals(stringSetCell, new Object());
+
+ StringSetCell differentDirty = new StringSetCell(MetricName.named("namespace", "name"));
+ differentDirty.getDirty().afterModification();
+ Assert.assertNotEquals(stringSetCell, differentDirty);
Review Comment:
This is just by convention. I have not found any direct (or even indirect)
usage in my work so far.
##########
runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/StringSetImpl.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jet.metrics;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import org.apache.beam.runners.core.metrics.StringSetData;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.StringSet;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+
+/** Implementation of {@link StringSet}. */
+public class StringSetImpl extends AbstractMetric<StringSetData> implements StringSet {
+
+ private final StringSetData stringSetData = StringSetData.empty();
+
+ public StringSetImpl(MetricName name) {
+ super(name);
+ }
+
+ @Override
+ StringSetData getValue() {
+ return stringSetData;
+ }
+
+ @Override
+ public void add(String value) {
+ stringSetData.combine(StringSetData.create(ImmutableSet.of("ab")));
Review Comment:
Fixed. Also fixed my mistake of ignoring `value` and hard-coding "ab".
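A minimal sketch of the two pitfalls in the quoted `add`, assuming `combine` returns a new instance rather than mutating in place (as the `result = result.combine(update)` pattern elsewhere in this PR suggests). `StringSetHolder` and its methods are hypothetical and are not the Jet runner's actual classes.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical metric holder used only to illustrate the pitfall. */
final class StringSetHolder {
  // Not final: each add() swaps in a new immutable snapshot.
  private Set<String> current = Collections.emptySet();

  /** Buggy variant: ignores the argument and drops the combined result. */
  void addDiscardingResult(String value) {
    union(current, Collections.singleton("ab")); // return value is lost
  }

  /** Corrected variant: uses the caller's value and stores the result. */
  void add(String value) {
    current = union(current, Collections.singleton(value));
  }

  Set<String> snapshot() {
    return current;
  }

  // Builds a new set instead of mutating either input.
  private static Set<String> union(Set<String> left, Set<String> right) {
    Set<String> merged = new HashSet<>(left);
    merged.addAll(right);
    return Collections.unmodifiableSet(merged);
  }
}
```

This sketch is not thread-safe; a real metric implementation would need synchronization or an atomic reference around `current`.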
##########
runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java:
##########
@@ -217,19 +219,43 @@ public GaugeResult extract(GaugeData data) {
}
};
+ private static final MetricAggregation<StringSetData, StringSetResult> STRING_SET =
+ new MetricAggregation<StringSetData, StringSetResult>() {
+ @Override
+ public StringSetData zero() {
+ return StringSetData.empty();
+ }
+
+ @Override
+ public StringSetData combine(Iterable<StringSetData> updates) {
+ StringSetData result = StringSetData.empty();
+ for (StringSetData update : updates) {
+ result = result.combine(update);
Review Comment:
Ah good point. Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.