Repository: beam Updated Branches: refs/heads/master d4db66dd6 -> c4517d04c
[BEAM-1347] Provide an abstraction which creates an Iterator view over the Beam Fn State API Combining this with the DataStreams.DataStreamDecoder converts the Beam Fn State API into an input stream backed by multiple logical chunks. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b3f7e218 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b3f7e218 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b3f7e218 Branch: refs/heads/master Commit: b3f7e2181ef32579646381573f9d147e0220d0d7 Parents: d4db66d Author: Luke Cwik <lc...@google.com> Authored: Thu Aug 17 11:49:35 2017 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Wed Aug 23 17:44:41 2017 -0700 ---------------------------------------------------------------------- .../fn/harness/state/BeamFnStateClient.java | 16 ++- .../harness/state/StateFetchingIterators.java | 126 +++++++++++++++++++ .../beam/fn/harness/stream/DataStreams.java | 2 +- .../state/StateFetchingIteratorsTest.java | 99 +++++++++++++++ 4 files changed, 241 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b3f7e218/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java index 8150530..682adb9 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java @@ -17,9 +17,23 @@ */ package org.apache.beam.fn.harness.state; +import java.util.concurrent.CompletableFuture; +import org.apache.beam.fn.v1.BeamFnApi; +import 
org.apache.beam.fn.v1.BeamFnApi.StateResponse; + /** - * TODO: Define interface required for handling state calls. + * The {@link BeamFnStateClient} is able to forward state requests to a handler which returns + * a corresponding response or error if completed unsuccessfully. */ public interface BeamFnStateClient { + /** + * Consumes a state request populating a unique id returning a future to the response. + * + * @param requestBuilder A partially completed state request. The id will be populated by the client. + * @param response A future containing a corresponding {@link StateResponse} for the supplied + * request. + */ + void handle(BeamFnApi.StateRequest.Builder requestBuilder, + CompletableFuture<StateResponse> response); } http://git-wip-us.apache.org/repos/asf/beam/blob/b3f7e218/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java new file mode 100644 index 0000000..0526183 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java @@ -0,0 +1,126 @@ +package org.apache.beam.fn.harness.state; + +import com.google.common.base.Throwables; +import com.google.protobuf.ByteString; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; +import org.apache.beam.fn.v1.BeamFnApi.StateGetRequest; +import org.apache.beam.fn.v1.BeamFnApi.StateRequest; +import org.apache.beam.fn.v1.BeamFnApi.StateRequest.Builder; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import org.apache.beam.fn.v1.BeamFnApi.StateResponse; + +/** + * Adapters which convert a logical series of chunks using continuation tokens over the Beam + * Fn State API into an {@link Iterator} of {@link ByteString}s. + */ +public class StateFetchingIterators { + + // do not instantiate + private StateFetchingIterators() {} + + /** + * This adapter handles using the continuation token to provide iteration over all the chunks + * returned by the Beam Fn State API using the supplied state client and partially filled + * out state request containing a state key. + * + * @param beamFnStateClient A client for handling state requests. + * @param partialStateRequestBuilder A {@link StateRequest} with the + * {@link StateRequest#getStateKey()} already set. + * @return An {@code Iterator<ByteString>} representing all the requested data. + */ + public static Iterator<ByteString> usingPartialRequestWithStateKey( + BeamFnStateClient beamFnStateClient, + Supplier<StateRequest.Builder> partialStateRequestBuilder) { + return new LazyBlockingStateFetchingIterator(beamFnStateClient, partialStateRequestBuilder); + } + + /** + * An {@link Iterator} which fetches {@link ByteString} chunks using the State API. + * + * <p>This iterator will only request a chunk on first access. 
Also it does not eagerly + * pre-fetch any future chunks and blocks whenever required to fetch the next block. + */ + static class LazyBlockingStateFetchingIterator implements Iterator<ByteString> { + private enum State { READ_REQUIRED, HAS_NEXT, EOF }; + private final BeamFnStateClient beamFnStateClient; + /** Allows for the partially built state request to be memoized across many requests. */ + private final Supplier<Builder> stateRequestSupplier; + private State currentState; + private ByteString continuationToken; + private ByteString next; + + LazyBlockingStateFetchingIterator( + BeamFnStateClient beamFnStateClient, + Supplier<StateRequest.Builder> stateRequestSupplier) { + this.currentState = State.READ_REQUIRED; + this.beamFnStateClient = beamFnStateClient; + this.stateRequestSupplier = stateRequestSupplier; + this.continuationToken = ByteString.EMPTY; + } + + @Override + public boolean hasNext() { + switch (currentState) { + case EOF: + return false; + case READ_REQUIRED: + CompletableFuture<StateResponse> stateResponseFuture = new CompletableFuture<>(); + beamFnStateClient.handle( + stateRequestSupplier.get().setGet( + StateGetRequest.newBuilder().setContinuationToken(continuationToken)), + stateResponseFuture); + StateResponse stateResponse; + try { + stateResponse = stateResponseFuture.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } catch (ExecutionException e) { + if (e.getCause() == null) { + throw new IllegalStateException(e); + } + Throwables.throwIfUnchecked(e.getCause()); + throw new IllegalStateException(e.getCause()); + } + continuationToken = stateResponse.getGet().getContinuationToken(); + next = stateResponse.getGet().getData(); + currentState = State.HAS_NEXT; + return true; + case HAS_NEXT: + return true; + } + throw new IllegalStateException(String.format("Unknown state %s", currentState)); + } + + @Override + public ByteString next() { + if (!hasNext()) { + throw 
new NoSuchElementException(); + } + // If the continuation token is empty, that means we have reached EOF. + currentState = ByteString.EMPTY.equals(continuationToken) ? State.EOF : State.READ_REQUIRED; + return next; + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/b3f7e218/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java index 6967160..3ecd303 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java @@ -153,7 +153,7 @@ public class DataStreams { } catch (IOException e) { throw new IllegalStateException(e); } - // fall through expected + return true; case HAS_NEXT: return true; } http://git-wip-us.apache.org/repos/asf/beam/blob/b3f7e218/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java new file mode 100644 index 0000000..67e36e1 --- /dev/null +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.fn.harness.state; + +import static org.junit.Assert.assertArrayEquals; + +import com.google.common.collect.Iterators; +import com.google.protobuf.ByteString; +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; +import org.apache.beam.fn.harness.state.StateFetchingIterators.LazyBlockingStateFetchingIterator; +import org.apache.beam.fn.v1.BeamFnApi.StateGetResponse; +import org.apache.beam.fn.v1.BeamFnApi.StateRequest; +import org.apache.beam.fn.v1.BeamFnApi.StateResponse; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link StateFetchingIterators}. */ +@RunWith(Enclosed.class) +public class StateFetchingIteratorsTest { + /** Tests for {@link StateFetchingIterators.LazyBlockingStateFetchingIterator}. 
*/ + @RunWith(JUnit4.class) + public static class LazyBlockingStateFetchingIteratorTest { + + @Test + public void testEmpty() throws Exception { + testFetch(ByteString.EMPTY); + } + + @Test + public void testNonEmpty() throws Exception { + testFetch(ByteString.copyFromUtf8("A")); + } + + @Test + public void testWithLastByteStringBeingEmpty() throws Exception { + testFetch(ByteString.copyFromUtf8("A"), ByteString.EMPTY); + } + + @Test + public void testMulti() throws Exception { + testFetch(ByteString.copyFromUtf8("BC"), ByteString.copyFromUtf8("DEF")); + } + + @Test + public void testMultiWithEmptyByteStrings() throws Exception { + testFetch(ByteString.EMPTY, ByteString.copyFromUtf8("BC"), ByteString.EMPTY, + ByteString.EMPTY, ByteString.copyFromUtf8("DEF"), ByteString.EMPTY); + } + + private void testFetch(ByteString... expected) { + BeamFnStateClient fakeStateClient = new BeamFnStateClient() { + @Override + public void handle( + StateRequest.Builder requestBuilder, CompletableFuture<StateResponse> response) { + ByteString continuationToken = requestBuilder.getGet().getContinuationToken(); + StateGetResponse.Builder builder = StateGetResponse.newBuilder(); + + int requestedPosition = 0; // Default position is 0 + if (!ByteString.EMPTY.equals(continuationToken)) { + requestedPosition = Integer.parseInt(continuationToken.toStringUtf8()); + } + + // Compute the new continuation token + ByteString newContinuationToken = ByteString.EMPTY; + if (requestedPosition != expected.length - 1) { + newContinuationToken = ByteString.copyFromUtf8(Integer.toString(requestedPosition + 1)); + } + response.complete(StateResponse.newBuilder() + .setId(requestBuilder.getId()) + .setGet(StateGetResponse.newBuilder() + .setData(expected[requestedPosition]) + .setContinuationToken(newContinuationToken)) + .build()); + } + }; + Iterator<ByteString> byteStrings = + new LazyBlockingStateFetchingIterator(fakeStateClient, StateRequest::newBuilder); + assertArrayEquals(expected, 
Iterators.toArray(byteStrings, Object.class)); + } + } +}