shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1588300082


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java:
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.fn.harness.Cache;
+import org.apache.beam.fn.harness.Caches;
+import 
org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Range;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeRangeSet;
+import org.joda.time.Instant;
+
+/**
+ * An implementation of a bag user state that utilizes the Beam Fn State API 
to fetch, clear and
+ * persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence 
changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled 
based upon cache memory
+ * pressure and its need to flush.
+ */
+public class OrderedListUserState<T> {
+  private final BeamFnStateClient beamFnStateClient;
+  private final StateRequest request;
+  private final Coder<T> valueCoder;
+  private final TimestampedValueCoder<T> timestampedValueCoder;
+  // Pending updates to persistent storage
+  private NavigableMap<Instant, Collection<T>> pendingAdds = Maps.newTreeMap();
+  private TreeRangeSet<Instant> pendingRemoves = TreeRangeSet.create();
+
+  private boolean isCleared = false;
+  private boolean isClosed = false;
+
+  public OrderedListUserState(
+      Cache<?, ?> cache,
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      StateKey stateKey,
+      Coder<T> valueCoder) {
+    checkArgument(
+        stateKey.hasOrderedListUserState(),
+        "Expected OrderedListUserState StateKey but received %s.",
+        stateKey);
+    this.beamFnStateClient = beamFnStateClient;
+    this.valueCoder = valueCoder;
+    this.timestampedValueCoder = TimestampedValueCoder.of(this.valueCoder);
+    this.request =
+        
StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build();
+  }
+
+  public void add(TimestampedValue<T> value) {
+    checkState(
+        !isClosed,
+        "OrderedList user state is no longer usable because it is closed for 
%s",
+        request.getStateKey());
+    Instant timestamp = value.getTimestamp();
+    pendingAdds.putIfAbsent(timestamp, new ArrayList<>());
+    pendingAdds.get(timestamp).add(value.getValue());
+  }
+
+  public Iterable<TimestampedValue<T>> readRange(Instant minTimestamp, Instant 
limitTimestamp) {
+    checkState(
+        !isClosed,
+        "OrderedList user state is no longer usable because it is closed for 
%s",
+        request.getStateKey());
+
+    // Store pendingAdds whose sort key is in the query range and values are 
truncated by the
+    // current size. The values (collections) of pendingAdds are kept, so that 
they will still be
+    // accessible in pre-existing iterables even after:
+    //   (1) a sort key is added to or removed from pendingAdds, or
+    //   (2) a new value is added to an existing sort key
+    ArrayList<PrefetchableIterable<TimestampedValue<T>>> pendingAddsInRange = 
new ArrayList<>();
+    for (Entry<Instant, Collection<T>> kv :

Review Comment:
   > Does this actually copy the elements, or is it just wrapping the values in 
pendingAdds in an Iterable? 
   
   Iterables in java work like iterables in other languages. Basically, we use 
it to traverse an existing collection of elements without making a copy of 
them. 
   
   > I'm curious about the statements in the comments saying "the values are 
kept" so that we can make modifications to the ordered list and not mess up 
existing Iterables.
   
   The `PrefetchableIterable` interface in Beam is specifically for iterables 
that support prefetching. The function `PrefetchableIterables.limit()` is 
applied on an iterable, resulting in a new iterable, which can be used to 
traverse its backend collection until the number of elements reaches a preset 
limit.
   
   Here is the tricky part. 
   - Note that `pendingAddsInRange` is a list of iterables. For each of these 
iterables, when an iteration is performed at any time after its creation, the 
result will be truncated to the last element we see at the time we initialize 
the iterable with `PrefetchableIterables.limit()`. In other words, appending a 
new element to the end of an existing key of `pendingAdds` would not affect the 
outcome above, because the iterable in `pendingAddsInRange` is designed to be 
truncated to the size when it is created.
   - When we add a new key to `pendingAdds`, it won't change the outcome of 
traversing an pre-existing `pendingAddsInRange` either, because the iterable of 
the new key is not included in `pendingAddsInRange`. 
   - When we delete an existing key from `pendingAdds`, we remove the key-value 
mapping, but not touch its value (i.e. the collection of elements). Therefore, 
the iterables in pre-existing `pendingAddsInRange` will still work as the 
backend collection is accessible. Only when no iterables reference these 
collections will they be garbage collected.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to