Repository: beam Updated Branches: refs/heads/master f634aecbb -> ba5c4071d
[BEAM-1347] Add a BagUserState implementation over the BeamFnStateClient Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8d36a261 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8d36a261 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8d36a261 Branch: refs/heads/master Commit: 8d36a261d4e8c6569e9036a27d45c00daccd908b Parents: 20d88db Author: Luke Cwik <lc...@google.com> Authored: Thu Aug 24 18:34:47 2017 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Fri Aug 25 08:53:27 2017 -0700 ---------------------------------------------------------------------- .../beam/fn/harness/state/BagUserState.java | 121 +++++++++++++++++++ .../state/LazyCachingIteratorToIterable.java | 72 +++++++++++ .../beam/fn/harness/state/BagUserStateTest.java | 106 ++++++++++++++++ .../fn/harness/state/FakeBeamFnStateClient.java | 110 +++++++++++++++++ .../LazyCachingIteratorToIterableTest.java | 76 ++++++++++++ 5 files changed, 485 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8d36a261/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java new file mode 100644 index 0000000..2d7f0c8 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.fn.harness.state; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Iterables; +import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; +import org.apache.beam.fn.harness.stream.DataStreams; +import org.apache.beam.fn.v1.BeamFnApi.StateAppendRequest; +import org.apache.beam.fn.v1.BeamFnApi.StateClearRequest; +import org.apache.beam.fn.v1.BeamFnApi.StateRequest.Builder; +import org.apache.beam.sdk.coders.Coder; + +/** + * An implementation of a bag user state that utilizes the Beam Fn State API to fetch, clear + * and persist values. + * + * <p>Calling {@link #asyncClose()} schedules any required persistence changes. This object should + * no longer be used after it is closed. + * + * <p>TODO: Move to an async persist model where persistence is signalled based upon cache + * memory pressure and its need to flush. + * + * <p>TODO: Support block level caching and prefetch.
+ */ +public class BagUserState<T> { + private final BeamFnStateClient beamFnStateClient; + private final String stateId; + private final Coder<T> coder; + private final Supplier<Builder> partialRequestSupplier; + private Iterable<T> oldValues; // null once clear() has been called + private final ArrayList<T> newValues; + private final List<T> unmodifiableNewValues; + private boolean isClosed; + + public BagUserState( + BeamFnStateClient beamFnStateClient, + String stateId, + Coder<T> coder, + Supplier<Builder> partialRequestSupplier) { + this.beamFnStateClient = beamFnStateClient; + this.stateId = stateId; + this.coder = coder; + this.partialRequestSupplier = partialRequestSupplier; + this.oldValues = new LazyCachingIteratorToIterable<>( + new DataStreams.DataStreamDecoder<>(coder, + DataStreams.inbound( + StateFetchingIterators.usingPartialRequestWithStateKey( + beamFnStateClient, + partialRequestSupplier)))); + this.newValues = new ArrayList<>(); + this.unmodifiableNewValues = Collections.unmodifiableList(newValues); + } + + public Iterable<T> get() { + checkState(!isClosed, + "Bag user state is no longer usable because it is closed for %s", stateId); + // If we were cleared we should disregard old values.
+ if (oldValues == null) { + return unmodifiableNewValues; + } + return Iterables.concat(oldValues, unmodifiableNewValues); + } + + public void append(T t) { + checkState(!isClosed, + "Bag user state is no longer usable because it is closed for %s", stateId); + newValues.add(t); + } + + public void clear() { + checkState(!isClosed, + "Bag user state is no longer usable because it is closed for %s", stateId); + oldValues = null; + newValues.clear(); + } + + public void asyncClose() throws Exception { + checkState(!isClosed, + "Bag user state is no longer usable because it is closed for %s", stateId); + if (oldValues == null) { // clear() was called: drop the persisted values first + beamFnStateClient.handle( + partialRequestSupplier.get() + .setClear(StateClearRequest.getDefaultInstance()), + new CompletableFuture<>()); // NOTE(review): the response future is never observed + } + if (!newValues.isEmpty()) { // then persist values appended since creation or the last clear() + ByteString.Output out = ByteString.newOutput(); + for (T newValue : newValues) { + // TODO: Replace with chunking output stream + coder.encode(newValue, out); + } + beamFnStateClient.handle( + partialRequestSupplier.get() + .setAppend(StateAppendRequest.newBuilder().setData(out.toByteString())), + new CompletableFuture<>()); + } + isClosed = true; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8d36a261/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterable.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterable.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterable.java new file mode 100644 index 0000000..0a43317 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterable.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.fn.harness.state; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * Converts an iterator to an iterable, lazily loading values from the underlying iterator + * and caching them to support reiteration. + */ +class LazyCachingIteratorToIterable<T> implements Iterable<T> { + private final List<T> cachedElements; + private final Iterator<T> iterator; + + public LazyCachingIteratorToIterable(Iterator<T> iterator) { + this.cachedElements = new ArrayList<>(); + this.iterator = iterator; + } + + @Override + public Iterator<T> iterator() { + return new CachingIterator(); + } + + /** An {@link Iterator} which replays cached elements, then fetches and caches new values. */ + private class CachingIterator implements Iterator<T> { + private int position = 0; + + private CachingIterator() { + } + + @Override + public boolean hasNext() { + // Order matters: consult the cache before the (possibly lazy) underlying iterator.
+ return position < cachedElements.size() || iterator.hasNext(); + } + + @Override + public T next() { + if (position < cachedElements.size()) { + return cachedElements.get(position++); + } + + if (!iterator.hasNext()) { + throw new NoSuchElementException(); + } + + T rval = iterator.next(); + cachedElements.add(rval); + position += 1; + return rval; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8d36a261/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BagUserStateTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BagUserStateTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BagUserStateTest.java new file mode 100644 index 0000000..f3c76ac --- /dev/null +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BagUserStateTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.fn.harness.state; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.protobuf.ByteString; +import java.io.IOException; +import org.apache.beam.fn.v1.BeamFnApi.StateKey; +import org.apache.beam.fn.v1.BeamFnApi.StateRequest; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link BagUserState} backed by a {@link FakeBeamFnStateClient}. */ +@RunWith(JUnit4.class) +public class BagUserStateTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testGet() throws Exception { + FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(ImmutableMap.of( + key("A"), encode("A1", "A2", "A3"))); + BagUserState<String> userState = + new BagUserState<>(fakeClient, "A", StringUtf8Coder.of(), () -> requestForId("A")); + assertArrayEquals(new String[]{ "A1", "A2", "A3" }, + Iterables.toArray(userState.get(), String.class)); + + userState.asyncClose(); + thrown.expect(IllegalStateException.class); + userState.get(); + } + + @Test + public void testAppend() throws Exception { + FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(ImmutableMap.of( + key("A"), encode("A1"))); + BagUserState<String> userState = + new BagUserState<>(fakeClient, "A", StringUtf8Coder.of(), () -> requestForId("A")); + userState.append("A2"); + userState.append("A3"); + userState.asyncClose(); + + assertEquals(encode("A1", "A2", "A3"), fakeClient.getData().get(key("A"))); + thrown.expect(IllegalStateException.class); + userState.append("A4"); + } + + @Test + public void testClear() throws Exception { + FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(ImmutableMap.of( +
key("A"), encode("A1", "A2", "A3"))); + BagUserState<String> userState = + new BagUserState<>(fakeClient, "A", StringUtf8Coder.of(), () -> requestForId("A")); + + userState.clear(); + userState.append("A1"); + userState.clear(); + userState.asyncClose(); + + assertNull(fakeClient.getData().get(key("A"))); // only a clear request was sent + thrown.expect(IllegalStateException.class); + userState.clear(); + } + + private StateRequest.Builder requestForId(String id) { + return StateRequest.newBuilder().setStateKey( + StateKey.newBuilder().setBagUserState( + StateKey.BagUserState.newBuilder().setKey(ByteString.copyFromUtf8(id)))); + } + + private StateKey key(String id) { + return StateKey.newBuilder().setBagUserState( + StateKey.BagUserState.newBuilder().setKey(ByteString.copyFromUtf8(id))).build(); + } + + private ByteString encode(String ... values) throws IOException { + ByteString.Output out = ByteString.newOutput(); + for (String value : values) { + StringUtf8Coder.of().encode(value, out); + } + return out.toByteString(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8d36a261/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java new file mode 100644 index 0000000..d260207 --- /dev/null +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.fn.harness.state; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import com.google.protobuf.ByteString; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.beam.fn.v1.BeamFnApi.StateAppendResponse; +import org.apache.beam.fn.v1.BeamFnApi.StateClearResponse; +import org.apache.beam.fn.v1.BeamFnApi.StateGetResponse; +import org.apache.beam.fn.v1.BeamFnApi.StateKey; +import org.apache.beam.fn.v1.BeamFnApi.StateKey.TypeCase; +import org.apache.beam.fn.v1.BeamFnApi.StateRequest; +import org.apache.beam.fn.v1.BeamFnApi.StateRequest.RequestCase; +import org.apache.beam.fn.v1.BeamFnApi.StateResponse; + +/** A fake implementation of a {@link BeamFnStateClient} to aid with testing. 
*/ +public class FakeBeamFnStateClient implements BeamFnStateClient { + private final Map<StateKey, ByteString> data; + private int currentId; + + public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData) { + this.data = new ConcurrentHashMap<>(initialData); + } + + public Map<StateKey, ByteString> getData() { + return Collections.unmodifiableMap(data); + } + + @Override + public void handle(StateRequest.Builder requestBuilder, + CompletableFuture<StateResponse> responseFuture) { + // The id should never be filled out + assertEquals("", requestBuilder.getId()); + requestBuilder.setId(generateId()); + + StateRequest request = requestBuilder.build(); + StateKey key = request.getStateKey(); + StateResponse.Builder response; + + assertNotEquals(RequestCase.REQUEST_NOT_SET, request.getRequestCase()); + assertNotEquals(TypeCase.TYPE_NOT_SET, key.getTypeCase()); + // multimap side input and runner based state keys only support get requests + if (key.getTypeCase() == TypeCase.MULTIMAP_SIDE_INPUT + || key.getTypeCase() == TypeCase.RUNNER) { + assertEquals(RequestCase.GET, request.getRequestCase()); + } + + switch (request.getRequestCase()) { + case GET: + // Chunk gets into 5 byte return blocks + ByteString byteString = data.get(request.getStateKey()); + int block = 0; + if (request.getGet().getContinuationToken().size() > 0) { + block = Integer.parseInt(request.getGet().getContinuationToken().toStringUtf8()); + } + ByteString returnBlock = byteString.substring( + block * 5, Math.min(byteString.size(), (block + 1) * 5)); + ByteString continuationToken = ByteString.EMPTY; + if (byteString.size() > (block + 1) * 5) { + continuationToken = ByteString.copyFromUtf8(Integer.toString(block + 1)); + } + response = StateResponse.newBuilder().setGet(StateGetResponse.newBuilder() + .setData(returnBlock) + .setContinuationToken(continuationToken)); + break; + + case CLEAR: + data.remove(request.getStateKey()); + response = 
StateResponse.newBuilder().setClear(StateClearResponse.getDefaultInstance()); + break; + + case APPEND: + ByteString previousValue = data.getOrDefault(request.getStateKey(), ByteString.EMPTY); + data.put(request.getStateKey(), previousValue.concat(request.getAppend().getData())); + response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance()); + break; + + default: + throw new IllegalStateException( + String.format("Unknown request type %s", request.getRequestCase())); + } + + responseFuture.complete(response.setId(requestBuilder.getId()).build()); + } + + private String generateId() { + return Integer.toString(++currentId); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8d36a261/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterableTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterableTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterableTest.java new file mode 100644 index 0000000..53eefb4 --- /dev/null +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterableTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.fn.harness.state; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import java.util.Iterator; +import java.util.NoSuchElementException; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link LazyCachingIteratorToIterable}.
*/ +@RunWith(JUnit4.class) +public class LazyCachingIteratorToIterableTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testEmptyIterator() { + Iterable<Object> iterable = new LazyCachingIteratorToIterable<>(Iterators.forArray()); + assertArrayEquals(new Object[0], Iterables.toArray(iterable, Object.class)); + // iterate multiple times to check that reiteration yields the same (empty) result + assertArrayEquals(new Object[0], Iterables.toArray(iterable, Object.class)); + + thrown.expect(NoSuchElementException.class); + iterable.iterator().next(); + } + + @Test + public void testInterleavedIteration() { + Iterable<String> iterable = + new LazyCachingIteratorToIterable<>(Iterators.forArray("A", "B", "C")); + + Iterator<String> iterator1 = iterable.iterator(); + assertTrue(iterator1.hasNext()); + assertEquals("A", iterator1.next()); + Iterator<String> iterator2 = iterable.iterator(); // replays "A" from the cache + assertTrue(iterator2.hasNext()); + assertEquals("A", iterator2.next()); + assertTrue(iterator2.hasNext()); + assertEquals("B", iterator2.next()); + assertTrue(iterator1.hasNext()); + assertEquals("B", iterator1.next()); + assertTrue(iterator1.hasNext()); + assertEquals("C", iterator1.next()); + assertFalse(iterator1.hasNext()); + assertTrue(iterator2.hasNext()); + assertEquals("C", iterator2.next()); + assertFalse(iterator2.hasNext()); + + thrown.expect(NoSuchElementException.class); + iterator1.next(); + } +}