// http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
// ----------------------------------------------------------------------
// diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
// deleted file mode 100644
// index 24b340e..0000000
// --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
// +++ /dev/null
// @@ -1,487 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.flink.translation.wrappers.streaming.state;

import static org.apache.flink.util.Preconditions.checkArgument;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.SetState;
import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.util.state.StateContext;
import org.apache.beam.sdk.util.state.StateContexts;
import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.util.state.WatermarkHoldState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.KeyGroupsList;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.operators.HeapInternalTimerService;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;

/**
 * {@link StateInternals} that uses {@link KeyGroupCheckpointedOperator}
 * to checkpoint state.
 *
 * <p>Note:
 * State is stored per key group, not per key: the current key only selects the key group
 * that an element is written to, and reads combine the values of all local key groups.
 * Only {@link BagState} is implemented; every other state kind throws
 * {@link UnsupportedOperationException}.
 *
 * <p>The handling of the local key-group range follows {@link HeapInternalTimerService}.
 */
public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {

  private final Coder<K> keyCoder;
  private final KeyGroupsList localKeyGroupRange;
  private final KeyedStateBackend keyedStateBackend;
  private final int localKeyGroupRangeStartIdx;

  // One table per local key group, indexed by (keyGroupIdx - localKeyGroupRangeStartIdx).
  // Each table maps: stateName -> (valueCoder, namespace -> value).
  private final Map<String, Tuple2<Coder<?>, Map<String, ?>>>[] stateTables;

  /**
   * Creates state internals covering the key-group range of the given backend.
   *
   * @param keyCoder coder used by {@link #getKey()} to decode the backend's current key
   * @param keyedStateBackend backend that supplies the local key-group range, the current key
   *     and the current key-group index
   */
  @SuppressWarnings("unchecked") // generic array creation: the array only ever holds HashMaps
  public FlinkKeyGroupStateInternals(
      Coder<K> keyCoder,
      KeyedStateBackend keyedStateBackend) {
    this.keyCoder = keyCoder;
    this.keyedStateBackend = keyedStateBackend;
    this.localKeyGroupRange = keyedStateBackend.getKeyGroupRange();
    // find the starting index of the local key-group range
    int startIdx = Integer.MAX_VALUE;
    for (Integer keyGroupIdx : localKeyGroupRange) {
      startIdx = Math.min(keyGroupIdx, startIdx);
    }
    this.localKeyGroupRangeStartIdx = startIdx;
    stateTables = (Map<String, Tuple2<Coder<?>, Map<String, ?>>>[])
        new Map[localKeyGroupRange.getNumberOfKeyGroups()];
    for (int i = 0; i < stateTables.length; i++) {
      stateTables[i] = new HashMap<>();
    }
  }

  /**
   * Decodes the backend's current key (held as a {@link ByteBuffer}) with the key coder.
   *
   * @throws RuntimeException if the key bytes cannot be decoded
   */
  @Override
  public K getKey() {
    ByteBuffer keyBytes = (ByteBuffer) keyedStateBackend.getCurrentKey();
    try {
      return CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array());
    } catch (CoderException e) {
      throw new RuntimeException("Error decoding key.", e);
    }
  }

  @Override
  public <T extends State> T state(
      final StateNamespace namespace,
      StateTag<? super K, T> address) {

    return state(namespace, address, StateContexts.nullContext());
  }

  /**
   * Binds the given tag to a state cell. Only bag state can be bound; all other binder
   * methods throw {@link UnsupportedOperationException}.
   */
  @Override
  public <T extends State> T state(
      final StateNamespace namespace,
      StateTag<? super K, T> address,
      final StateContext<?> context) {

    return address.bind(new StateTag.StateBinder<K>() {

      @Override
      public <T> ValueState<T> bindValue(
          StateTag<? super K, ValueState<T>> address,
          Coder<T> coder) {
        throw new UnsupportedOperationException(
            String.format("%s is not supported", ValueState.class.getSimpleName()));
      }

      @Override
      public <T> BagState<T> bindBag(
          StateTag<? super K, BagState<T>> address,
          Coder<T> elemCoder) {

        return new FlinkKeyGroupBagState<>(address, namespace, elemCoder);
      }

      @Override
      public <T> SetState<T> bindSet(
          StateTag<? super K, SetState<T>> address,
          Coder<T> elemCoder) {
        throw new UnsupportedOperationException(
            String.format("%s is not supported", SetState.class.getSimpleName()));
      }

      @Override
      public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
          StateTag<? super K, MapState<KeyT, ValueT>> spec,
          Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
        throw new UnsupportedOperationException(
            String.format("%s is not supported", MapState.class.getSimpleName()));
      }

      @Override
      public <InputT, AccumT, OutputT>
      CombiningState<InputT, AccumT, OutputT>
      bindCombiningValue(
          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
          Coder<AccumT> accumCoder,
          Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
        throw new UnsupportedOperationException("bindCombiningValue is not supported.");
      }

      @Override
      public <InputT, AccumT, OutputT>
      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
          Coder<AccumT> accumCoder,
          final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
        throw new UnsupportedOperationException("bindKeyedCombiningValue is not supported.");
      }

      @Override
      public <InputT, AccumT, OutputT>
      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
          Coder<AccumT> accumCoder,
          CombineWithContext.KeyedCombineFnWithContext<
              ? super K, InputT, AccumT, OutputT> combineFn) {
        throw new UnsupportedOperationException(
            "bindKeyedCombiningValueWithContext is not supported.");
      }

      @Override
      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
          StateTag<? super K, WatermarkHoldState<W>> address,
          OutputTimeFn<? super W> outputTimeFn) {
        // Name the state kind that was actually requested (previously reported CombiningState).
        throw new UnsupportedOperationException(
            String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
      }
    });
  }

  /**
   * Reference from {@link Combine.CombineFn}.
   *
   * <p>Accumulators are stored in each KeyGroup, call addInput() when an element comes,
   * call extractOutput() to produce the desired value when the data needs to be read.
   */
  interface KeyGroupCombiner<InputT, AccumT, OutputT> {

    /**
     * Returns a new, mutable accumulator value, representing the accumulation
     * of zero input values.
     */
    AccumT createAccumulator();

    /**
     * Adds the given input value to the given accumulator, returning the
     * new accumulator value.
     */
    AccumT addInput(AccumT accumulator, InputT input);

    /**
     * Returns the output value that is the result of all accumulators from KeyGroups
     * that are assigned to this operator.
     */
    OutputT extractOutput(Iterable<AccumT> accumulators);
  }

  /**
   * Base class for a state cell identified by (stateName, namespace) whose accumulators
   * are spread over the per-key-group state tables of the enclosing instance.
   */
  private abstract class AbstractKeyGroupState<InputT, AccumT, OutputT> {

    private final String stateName;
    private final String namespace;
    private final Coder<AccumT> coder;
    private final KeyGroupCombiner<InputT, AccumT, OutputT> keyGroupCombiner;

    AbstractKeyGroupState(
        String stateName,
        String namespace,
        Coder<AccumT> coder,
        KeyGroupCombiner<InputT, AccumT, OutputT> keyGroupCombiner) {
      this.stateName = stateName;
      this.namespace = namespace;
      this.coder = coder;
      this.keyGroupCombiner = keyGroupCombiner;
    }

    /**
     * Choose keyGroup of input and addInput to accumulator.
     *
     * <p>The key group is the backend's current key group; the state entry and the
     * accumulator for this namespace are created on first use.
     */
    @SuppressWarnings("unchecked") // values stored under stateName are always AccumT
    void addInput(InputT input) {
      int keyGroupIdx = keyedStateBackend.getCurrentKeyGroupIndex();
      int localIdx = getIndexForKeyGroup(keyGroupIdx);
      Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable = stateTables[localIdx];
      Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
      if (tuple2 == null) {
        tuple2 = new Tuple2<>();
        tuple2.f0 = coder;
        tuple2.f1 = new HashMap<>();
        stateTable.put(stateName, tuple2);
      }
      Map<String, AccumT> map = (Map<String, AccumT>) tuple2.f1;
      AccumT accumulator = map.get(namespace);
      if (accumulator == null) {
        accumulator = keyGroupCombiner.createAccumulator();
      }
      accumulator = keyGroupCombiner.addInput(accumulator, input);
      map.put(namespace, accumulator);
    }

    /**
     * Get all accumulators and invoke extractOutput().
     *
     * <p>Accumulators are collected from every local key group, not only the current one.
     */
    @SuppressWarnings("unchecked") // values stored under stateName are always AccumT
    OutputT extractOutput() {
      List<AccumT> accumulators = new ArrayList<>(stateTables.length);
      for (Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable : stateTables) {
        Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
        if (tuple2 != null) {
          AccumT accumulator = (AccumT) tuple2.f1.get(namespace);
          if (accumulator != null) {
            accumulators.add(accumulator);
          }
        }
      }
      return keyGroupCombiner.extractOutput(accumulators);
    }

    /**
     * Returns {@code false} as soon as any local key group holds an accumulator for this
     * state and namespace, {@code true} if none does.
     */
    boolean isEmptyInternal() {
      for (Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable : stateTables) {
        Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
        if (tuple2 != null && tuple2.f1.get(namespace) != null) {
          return false;
        }
      }
      return true;
    }

    /**
     * Clear accumulators and clean empty map.
     *
     * <p>Removes this namespace from every local key group and drops the whole state entry
     * once no namespace is left in it.
     */
    void clearInternal() {
      for (Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable : stateTables) {
        Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
        if (tuple2 != null) {
          tuple2.f1.remove(namespace);
          if (tuple2.f1.isEmpty()) {
            stateTable.remove(stateName);
          }
        }
      }
    }

  }

  /**
   * Translates a global key-group index into an index into {@code stateTables}.
   *
   * @throws IllegalArgumentException if the key group is not part of the local range
   */
  private int getIndexForKeyGroup(int keyGroupIdx) {
    checkArgument(localKeyGroupRange.contains(keyGroupIdx),
        "Key Group " + keyGroupIdx + " does not belong to the local range.");
    return keyGroupIdx - this.localKeyGroupRangeStartIdx;
  }

  /**
   * Combiner for bag state: each accumulator is a list of elements, the output is the
   * concatenation of all accumulator lists. Static because it uses no enclosing state.
   */
  private static class KeyGroupBagCombiner<T>
      implements KeyGroupCombiner<T, List<T>, Iterable<T>> {

    @Override
    public List<T> createAccumulator() {
      return new ArrayList<>();
    }

    @Override
    public List<T> addInput(List<T> accumulator, T input) {
      accumulator.add(input);
      return accumulator;
    }

    @Override
    public Iterable<T> extractOutput(Iterable<List<T>> accumulators) {
      List<T> result = new ArrayList<>();
      // maybe can return an unmodifiable view.
      for (List<T> list : accumulators) {
        result.addAll(list);
      }
      return result;
    }
  }

  /**
   * {@link BagState} whose elements are kept in the per-key-group state tables.
   * Equality is defined by namespace and state tag.
   */
  private class FlinkKeyGroupBagState<T> extends AbstractKeyGroupState<T, List<T>, Iterable<T>>
      implements BagState<T> {

    private final StateNamespace namespace;
    private final StateTag<? super K, BagState<T>> address;

    FlinkKeyGroupBagState(
        StateTag<? super K, BagState<T>> address,
        StateNamespace namespace,
        Coder<T> coder) {
      super(address.getId(), namespace.stringKey(), ListCoder.of(coder),
          new KeyGroupBagCombiner<T>());
      this.namespace = namespace;
      this.address = address;
    }

    @Override
    public void add(T input) {
      addInput(input);
    }

    @Override
    public BagState<T> readLater() {
      return this;
    }

    @Override
    public Iterable<T> read() {
      Iterable<T> result = extractOutput();
      return result != null ? result : Collections.<T>emptyList();
    }

    @Override
    public ReadableState<Boolean> isEmpty() {
      return new ReadableState<Boolean>() {
        @Override
        public Boolean read() {
          try {
            return isEmptyInternal();
          } catch (Exception e) {
            throw new RuntimeException("Error reading state.", e);
          }
        }

        @Override
        public ReadableState<Boolean> readLater() {
          return this;
        }
      };
    }

    @Override
    public void clear() {
      clearInternal();
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }

      FlinkKeyGroupBagState<?> that = (FlinkKeyGroupBagState<?>) o;

      return namespace.equals(that.namespace) && address.equals(that.address);
    }

    @Override
    public int hashCode() {
      int result = namespace.hashCode();
      result = 31 * result + address.hashCode();
      return result;
    }
  }

  /**
   * Snapshots the state {@code (stateName -> (valueCoder && (namespace -> value)))} for a given
   * {@code keyGroupIdx}.
   *
   * <p>Layout written: number of states (short); then per state its name (UTF), the
   * Java-serialized value coder, the number of namespaces (int) and, per namespace, the
   * namespace string followed by the value, both encoded in the nested context.
   *
   * @param keyGroupIdx the id of the key-group to be put in the snapshot.
   * @param out the stream to write to.
   * @throws IllegalArgumentException if the key group is not part of the local range
   * @throws IllegalStateException if the key group holds more than {@link Short#MAX_VALUE} states
   */
  @SuppressWarnings({"unchecked", "rawtypes"}) // the stored coder matches the stored values
  public void snapshotKeyGroupState(int keyGroupIdx, DataOutputStream out) throws Exception {
    int localIdx = getIndexForKeyGroup(keyGroupIdx);
    Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable = stateTables[localIdx];
    Preconditions.checkState(stateTable.size() <= Short.MAX_VALUE,
        "Too many States: " + stateTable.size() + ". Currently at most "
            + Short.MAX_VALUE + " states are supported");
    out.writeShort(stateTable.size());
    for (Map.Entry<String, Tuple2<Coder<?>, Map<String, ?>>> entry : stateTable.entrySet()) {
      out.writeUTF(entry.getKey());
      Coder coder = entry.getValue().f0;
      InstantiationUtil.serializeObject(out, coder);
      Map<String, ?> map = entry.getValue().f1;
      out.writeInt(map.size());
      for (Map.Entry<String, ?> entry1 : map.entrySet()) {
        StringUtf8Coder.of().encode(entry1.getKey(), out, Context.NESTED);
        coder.encode(entry1.getValue(), out, Context.NESTED);
      }
    }
  }

  /**
   * Restore the state {@code (stateName -> (valueCoder && (namespace -> value)))}
   * for a given {@code keyGroupIdx}.
   *
   * <p>Reads the layout written by {@link #snapshotKeyGroupState}. Restored values are put
   * into the existing table, replacing any value already present for the same state name
   * and namespace; the coder of an already existing state entry is kept.
   *
   * @param keyGroupIdx the id of the key-group to restore.
   * @param in the stream to read from.
   * @param userCodeClassLoader the class loader that will be used to deserialize
   *                            the valueCoder.
   * @throws IllegalArgumentException if the key group is not part of the local range
   */
  @SuppressWarnings({"unchecked", "rawtypes"}) // the deserialized coder matches the values
  public void restoreKeyGroupState(int keyGroupIdx, DataInputStream in,
                                   ClassLoader userCodeClassLoader) throws Exception {
    int localIdx = getIndexForKeyGroup(keyGroupIdx);
    Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable = stateTables[localIdx];
    int numStates = in.readShort();
    for (int i = 0; i < numStates; ++i) {
      String stateName = in.readUTF();
      Coder coder = InstantiationUtil.deserializeObject(in, userCodeClassLoader);
      Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
      if (tuple2 == null) {
        tuple2 = new Tuple2<>();
        tuple2.f0 = coder;
        tuple2.f1 = new HashMap<>();
        stateTable.put(stateName, tuple2);
      }
      Map<String, Object> map = (Map<String, Object>) tuple2.f1;
      int mapSize = in.readInt();
      for (int j = 0; j < mapSize; j++) {
        String namespace = StringUtf8Coder.of().decode(in, Context.NESTED);
        Object value = coder.decode(in, Context.NESTED);
        map.put(namespace, value);
      }
    }
  }

}
// http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
// ----------------------------------------------------------------------
// diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
// deleted file mode 100644
// index 2bf0bf1..0000000
// --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
// +++ /dev/null
// @@ -1,260 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.flink.translation.wrappers.streaming.state;

import com.google.common.collect.Iterators;
import java.util.Collections;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.SetState;
import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.util.state.StateContext;
import org.apache.beam.sdk.util.state.StateContexts;
import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.util.state.WatermarkHoldState;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.OperatorStateBackend;

/**
 * {@link StateInternals} that uses a Flink {@link OperatorStateBackend}
 * to manage the split-distribute state.
 *
 * <p>Elements in ListState will be redistributed in round robin fashion
 * to operators when restarting with a different parallelism.
 *
 * <p>Note:
 * Key and namespace are ignored when storing: {@link #getKey()} returns {@code null} and
 * the operator list state is looked up by the state tag id only.
 * Only {@link BagState} is implemented; every other state kind throws
 * {@link UnsupportedOperationException}.
 */
public class FlinkSplitStateInternals<K> implements StateInternals<K> {

  private final OperatorStateBackend stateBackend;

  public FlinkSplitStateInternals(OperatorStateBackend stateBackend) {
    this.stateBackend = stateBackend;
  }

  /** Always returns {@code null}: this state is not keyed. */
  @Override
  public K getKey() {
    return null;
  }

  @Override
  public <T extends State> T state(
      final StateNamespace namespace,
      StateTag<? super K, T> address) {

    return state(namespace, address, StateContexts.nullContext());
  }

  /**
   * Binds the given tag to a state cell. Only bag state can be bound; all other binder
   * methods throw {@link UnsupportedOperationException}.
   */
  @Override
  public <T extends State> T state(
      final StateNamespace namespace,
      StateTag<? super K, T> address,
      final StateContext<?> context) {

    return address.bind(new StateTag.StateBinder<K>() {

      @Override
      public <T> ValueState<T> bindValue(
          StateTag<? super K, ValueState<T>> address,
          Coder<T> coder) {
        throw new UnsupportedOperationException(
            String.format("%s is not supported", ValueState.class.getSimpleName()));
      }

      @Override
      public <T> BagState<T> bindBag(
          StateTag<? super K, BagState<T>> address,
          Coder<T> elemCoder) {

        return new FlinkSplitBagState<>(stateBackend, address, namespace, elemCoder);
      }

      @Override
      public <T> SetState<T> bindSet(
          StateTag<? super K, SetState<T>> address,
          Coder<T> elemCoder) {
        throw new UnsupportedOperationException(
            String.format("%s is not supported", SetState.class.getSimpleName()));
      }

      @Override
      public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
          StateTag<? super K, MapState<KeyT, ValueT>> spec,
          Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
        throw new UnsupportedOperationException(
            String.format("%s is not supported", MapState.class.getSimpleName()));
      }

      @Override
      public <InputT, AccumT, OutputT>
      CombiningState<InputT, AccumT, OutputT>
      bindCombiningValue(
          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
          Coder<AccumT> accumCoder,
          Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
        throw new UnsupportedOperationException("bindCombiningValue is not supported.");
      }

      @Override
      public <InputT, AccumT, OutputT>
      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
          Coder<AccumT> accumCoder,
          final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
        throw new UnsupportedOperationException("bindKeyedCombiningValue is not supported.");
      }

      @Override
      public <InputT, AccumT, OutputT>
      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
          Coder<AccumT> accumCoder,
          CombineWithContext.KeyedCombineFnWithContext<
              ? super K, InputT, AccumT, OutputT> combineFn) {
        throw new UnsupportedOperationException(
            "bindKeyedCombiningValueWithContext is not supported.");
      }

      @Override
      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
          StateTag<? super K, WatermarkHoldState<W>> address,
          OutputTimeFn<? super W> outputTimeFn) {
        // Name the state kind that was actually requested (previously reported CombiningState).
        throw new UnsupportedOperationException(
            String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
      }
    });
  }

  /**
   * {@link BagState} backed by the operator list state registered under the state tag id.
   * The namespace takes part in equality only; it is not used to address the stored list.
   */
  private static class FlinkSplitBagState<K, T> implements BagState<T> {

    private final ListStateDescriptor<T> descriptor;
    private final OperatorStateBackend flinkStateBackend;
    private final StateNamespace namespace;
    private final StateTag<? super K, BagState<T>> address;

    FlinkSplitBagState(
        OperatorStateBackend flinkStateBackend,
        StateTag<? super K, BagState<T>> address,
        StateNamespace namespace,
        Coder<T> coder) {
      this.flinkStateBackend = flinkStateBackend;
      this.namespace = namespace;
      this.address = address;

      CoderTypeInformation<T> typeInfo =
          new CoderTypeInformation<>(coder);

      descriptor = new ListStateDescriptor<>(address.getId(),
          typeInfo.createSerializer(new ExecutionConfig()));
    }

    @Override
    public void add(T input) {
      try {
        flinkStateBackend.getOperatorState(descriptor).add(input);
      } catch (Exception e) {
        throw new RuntimeException("Error updating state.", e);
      }
    }

    @Override
    public BagState<T> readLater() {
      return this;
    }

    @Override
    public Iterable<T> read() {
      try {
        Iterable<T> result = flinkStateBackend.getOperatorState(descriptor).get();
        return result != null ? result : Collections.<T>emptyList();
      } catch (Exception e) {
        // This is a read failure (message previously said "updating").
        throw new RuntimeException("Error reading state.", e);
      }
    }

    @Override
    public ReadableState<Boolean> isEmpty() {
      return new ReadableState<Boolean>() {
        @Override
        public Boolean read() {
          try {
            Iterable<T> result = flinkStateBackend.getOperatorState(descriptor).get();
            // PartitionableListState.get() returns an empty collection when there is no
            // element, unlike keyed list state, which returns null. Handle both.
            return result == null || Iterators.size(result.iterator()) == 0;
          } catch (Exception e) {
            throw new RuntimeException("Error reading state.", e);
          }
        }

        @Override
        public ReadableState<Boolean> readLater() {
          return this;
        }
      };
    }

    @Override
    public void clear() {
      try {
        flinkStateBackend.getOperatorState(descriptor).clear();
      } catch (Exception e) {
        // This is a clear failure (message previously said "reading").
        throw new RuntimeException("Error clearing state.", e);
      }
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }

      FlinkSplitBagState<?, ?> that = (FlinkSplitBagState<?, ?>) o;

      return namespace.equals(that.namespace) && address.equals(that.address);
    }

    @Override
    public int hashCode() {
      int result = namespace.hashCode();
      result = 31 * result + address.hashCode();
      return result;
    }
  }

}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java deleted file mode 100644 index 4f961e5..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java +++ /dev/null @@ -1,1053 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.state; - -import com.google.common.collect.Lists; -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import org.apache.beam.runners.core.StateInternals; -import org.apache.beam.runners.core.StateNamespace; -import org.apache.beam.runners.core.StateTag; -import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.InstantCoder; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.CombineWithContext; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.CombineContextFactory; -import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.CombiningState; -import org.apache.beam.sdk.util.state.MapState; -import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.SetState; -import org.apache.beam.sdk.util.state.State; -import org.apache.beam.sdk.util.state.StateContext; -import org.apache.beam.sdk.util.state.StateContexts; -import org.apache.beam.sdk.util.state.ValueState; -import 
org.apache.beam.sdk.util.state.WatermarkHoldState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.runtime.state.KeyedStateBackend; -import org.joda.time.Instant; - -/** - * {@link StateInternals} that uses a Flink {@link KeyedStateBackend} to manage state. - * - * <p>Note: In the Flink streaming runner the key is always encoded - * using an {@link Coder} and stored in a {@link ByteBuffer}. - */ -public class FlinkStateInternals<K> implements StateInternals<K> { - - private final KeyedStateBackend<ByteBuffer> flinkStateBackend; - private Coder<K> keyCoder; - - // on recovery, these will no be properly set because we don't - // know which watermark hold states there are in the Flink State Backend - private final Map<String, Instant> watermarkHolds = new HashMap<>(); - - public FlinkStateInternals(KeyedStateBackend<ByteBuffer> flinkStateBackend, Coder<K> keyCoder) { - this.flinkStateBackend = flinkStateBackend; - this.keyCoder = keyCoder; - } - - /** - * Returns the minimum over all watermark holds. - */ - public Instant watermarkHold() { - long min = Long.MAX_VALUE; - for (Instant hold: watermarkHolds.values()) { - min = Math.min(min, hold.getMillis()); - } - return new Instant(min); - } - - @Override - public K getKey() { - ByteBuffer keyBytes = flinkStateBackend.getCurrentKey(); - try { - return CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array()); - } catch (CoderException e) { - throw new RuntimeException("Error decoding key.", e); - } - } - - @Override - public <T extends State> T state( - final StateNamespace namespace, - StateTag<? super K, T> address) { - - return state(namespace, address, StateContexts.nullContext()); - } - - @Override - public <T extends State> T state( - final StateNamespace namespace, - StateTag<? 
super K, T> address, - final StateContext<?> context) { - - return address.bind(new StateTag.StateBinder<K>() { - - @Override - public <T> ValueState<T> bindValue( - StateTag<? super K, ValueState<T>> address, - Coder<T> coder) { - - return new FlinkValueState<>(flinkStateBackend, address, namespace, coder); - } - - @Override - public <T> BagState<T> bindBag( - StateTag<? super K, BagState<T>> address, - Coder<T> elemCoder) { - - return new FlinkBagState<>(flinkStateBackend, address, namespace, elemCoder); - } - - @Override - public <T> SetState<T> bindSet( - StateTag<? super K, SetState<T>> address, - Coder<T> elemCoder) { - throw new UnsupportedOperationException( - String.format("%s is not supported", SetState.class.getSimpleName())); - } - - @Override - public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( - StateTag<? super K, MapState<KeyT, ValueT>> spec, - Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) { - throw new UnsupportedOperationException( - String.format("%s is not supported", MapState.class.getSimpleName())); - } - - @Override - public <InputT, AccumT, OutputT> - CombiningState<InputT, AccumT, OutputT> - bindCombiningValue( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, - Coder<AccumT> accumCoder, - Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { - - return new FlinkCombiningState<>( - flinkStateBackend, address, combineFn, namespace, accumCoder); - } - - @Override - public <InputT, AccumT, OutputT> - CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, - Coder<AccumT> accumCoder, - final Combine.KeyedCombineFn<? 
super K, InputT, AccumT, OutputT> combineFn) { - return new FlinkKeyedCombiningState<>( - flinkStateBackend, - address, - combineFn, - namespace, - accumCoder, - FlinkStateInternals.this); - } - - @Override - public <InputT, AccumT, OutputT> - CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, - Coder<AccumT> accumCoder, - CombineWithContext.KeyedCombineFnWithContext< - ? super K, InputT, AccumT, OutputT> combineFn) { - return new FlinkCombiningStateWithContext<>( - flinkStateBackend, - address, - combineFn, - namespace, - accumCoder, - FlinkStateInternals.this, - CombineContextFactory.createFromStateContext(context)); - } - - @Override - public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( - StateTag<? super K, WatermarkHoldState<W>> address, - OutputTimeFn<? super W> outputTimeFn) { - - return new FlinkWatermarkHoldState<>( - flinkStateBackend, FlinkStateInternals.this, address, namespace, outputTimeFn); - } - }); - } - - private static class FlinkValueState<K, T> implements ValueState<T> { - - private final StateNamespace namespace; - private final StateTag<? super K, ValueState<T>> address; - private final ValueStateDescriptor<T> flinkStateDescriptor; - private final KeyedStateBackend<ByteBuffer> flinkStateBackend; - - FlinkValueState( - KeyedStateBackend<ByteBuffer> flinkStateBackend, - StateTag<? 
super K, ValueState<T>> address, - StateNamespace namespace, - Coder<T> coder) { - - this.namespace = namespace; - this.address = address; - this.flinkStateBackend = flinkStateBackend; - - CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(coder); - - flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null); - } - - @Override - public void write(T input) { - try { - flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor).update(input); - } catch (Exception e) { - throw new RuntimeException("Error updating state.", e); - } - } - - @Override - public ValueState<T> readLater() { - return this; - } - - @Override - public T read() { - try { - return flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor).value(); - } catch (Exception e) { - throw new RuntimeException("Error reading state.", e); - } - } - - @Override - public void clear() { - try { - flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor).clear(); - } catch (Exception e) { - throw new RuntimeException("Error clearing state.", e); - } - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - FlinkValueState<?, ?> that = (FlinkValueState<?, ?>) o; - - return namespace.equals(that.namespace) && address.equals(that.address); - - } - - @Override - public int hashCode() { - int result = namespace.hashCode(); - result = 31 * result + address.hashCode(); - return result; - } - } - - private static class FlinkBagState<K, T> implements BagState<T> { - - private final StateNamespace namespace; - private final StateTag<? 
super K, BagState<T>> address; - private final ListStateDescriptor<T> flinkStateDescriptor; - private final KeyedStateBackend<ByteBuffer> flinkStateBackend; - - FlinkBagState( - KeyedStateBackend<ByteBuffer> flinkStateBackend, - StateTag<? super K, BagState<T>> address, - StateNamespace namespace, - Coder<T> coder) { - - this.namespace = namespace; - this.address = address; - this.flinkStateBackend = flinkStateBackend; - - CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(coder); - - flinkStateDescriptor = new ListStateDescriptor<>(address.getId(), typeInfo); - } - - @Override - public void add(T input) { - try { - flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor).add(input); - } catch (Exception e) { - throw new RuntimeException("Error adding to bag state.", e); - } - } - - @Override - public BagState<T> readLater() { - return this; - } - - @Override - public Iterable<T> read() { - try { - Iterable<T> result = flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor).get(); - - return result != null ? 
result : Collections.<T>emptyList(); - } catch (Exception e) { - throw new RuntimeException("Error reading state.", e); - } - } - - @Override - public ReadableState<Boolean> isEmpty() { - return new ReadableState<Boolean>() { - @Override - public Boolean read() { - try { - Iterable<T> result = flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor).get(); - return result == null; - } catch (Exception e) { - throw new RuntimeException("Error reading state.", e); - } - - } - - @Override - public ReadableState<Boolean> readLater() { - return this; - } - }; - } - - @Override - public void clear() { - try { - flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor).clear(); - } catch (Exception e) { - throw new RuntimeException("Error clearing state.", e); - } - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - FlinkBagState<?, ?> that = (FlinkBagState<?, ?>) o; - - return namespace.equals(that.namespace) && address.equals(that.address); - - } - - @Override - public int hashCode() { - int result = namespace.hashCode(); - result = 31 * result + address.hashCode(); - return result; - } - } - - private static class FlinkCombiningState<K, InputT, AccumT, OutputT> - implements CombiningState<InputT, AccumT, OutputT> { - - private final StateNamespace namespace; - private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address; - private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn; - private final ValueStateDescriptor<AccumT> flinkStateDescriptor; - private final KeyedStateBackend<ByteBuffer> flinkStateBackend; - - FlinkCombiningState( - KeyedStateBackend<ByteBuffer> flinkStateBackend, - StateTag<? 
super K, CombiningState<InputT, AccumT, OutputT>> address, - Combine.CombineFn<InputT, AccumT, OutputT> combineFn, - StateNamespace namespace, - Coder<AccumT> accumCoder) { - - this.namespace = namespace; - this.address = address; - this.combineFn = combineFn; - this.flinkStateBackend = flinkStateBackend; - - CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder); - - flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null); - } - - @Override - public CombiningState<InputT, AccumT, OutputT> readLater() { - return this; - } - - @Override - public void add(InputT value) { - try { - org.apache.flink.api.common.state.ValueState<AccumT> state = - flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor); - - AccumT current = state.value(); - if (current == null) { - current = combineFn.createAccumulator(); - } - current = combineFn.addInput(current, value); - state.update(current); - } catch (Exception e) { - throw new RuntimeException("Error adding to state." 
, e); - } - } - - @Override - public void addAccum(AccumT accum) { - try { - org.apache.flink.api.common.state.ValueState<AccumT> state = - flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor); - - AccumT current = state.value(); - if (current == null) { - state.update(accum); - } else { - current = combineFn.mergeAccumulators(Lists.newArrayList(current, accum)); - state.update(current); - } - } catch (Exception e) { - throw new RuntimeException("Error adding to state.", e); - } - } - - @Override - public AccumT getAccum() { - try { - return flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor).value(); - } catch (Exception e) { - throw new RuntimeException("Error reading state.", e); - } - } - - @Override - public AccumT mergeAccumulators(Iterable<AccumT> accumulators) { - return combineFn.mergeAccumulators(accumulators); - } - - @Override - public OutputT read() { - try { - org.apache.flink.api.common.state.ValueState<AccumT> state = - flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor); - - AccumT accum = state.value(); - if (accum != null) { - return combineFn.extractOutput(accum); - } else { - return combineFn.extractOutput(combineFn.createAccumulator()); - } - } catch (Exception e) { - throw new RuntimeException("Error reading state.", e); - } - } - - @Override - public ReadableState<Boolean> isEmpty() { - return new ReadableState<Boolean>() { - @Override - public Boolean read() { - try { - return flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor).value() == null; - } catch (Exception e) { - throw new RuntimeException("Error reading state.", e); - } - - } - - @Override - public ReadableState<Boolean> readLater() { - return this; - } - }; - } - - @Override - public void clear() { - try { - 
flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor).clear(); - } catch (Exception e) { - throw new RuntimeException("Error clearing state.", e); - } - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - FlinkCombiningState<?, ?, ?, ?> that = - (FlinkCombiningState<?, ?, ?, ?>) o; - - return namespace.equals(that.namespace) && address.equals(that.address); - - } - - @Override - public int hashCode() { - int result = namespace.hashCode(); - result = 31 * result + address.hashCode(); - return result; - } - } - - private static class FlinkKeyedCombiningState<K, InputT, AccumT, OutputT> - implements CombiningState<InputT, AccumT, OutputT> { - - private final StateNamespace namespace; - private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address; - private final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn; - private final ValueStateDescriptor<AccumT> flinkStateDescriptor; - private final KeyedStateBackend<ByteBuffer> flinkStateBackend; - private final FlinkStateInternals<K> flinkStateInternals; - - FlinkKeyedCombiningState( - KeyedStateBackend<ByteBuffer> flinkStateBackend, - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, - Combine.KeyedCombineFn<? 
super K, InputT, AccumT, OutputT> combineFn, - StateNamespace namespace, - Coder<AccumT> accumCoder, - FlinkStateInternals<K> flinkStateInternals) { - - this.namespace = namespace; - this.address = address; - this.combineFn = combineFn; - this.flinkStateBackend = flinkStateBackend; - this.flinkStateInternals = flinkStateInternals; - - CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder); - - flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null); - } - - @Override - public CombiningState<InputT, AccumT, OutputT> readLater() { - return this; - } - - @Override - public void add(InputT value) { - try { - org.apache.flink.api.common.state.ValueState<AccumT> state = - flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor); - - AccumT current = state.value(); - if (current == null) { - current = combineFn.createAccumulator(flinkStateInternals.getKey()); - } - current = combineFn.addInput(flinkStateInternals.getKey(), current, value); - state.update(current); - } catch (Exception e) { - throw new RuntimeException("Error adding to state." 
, e); - } - } - - @Override - public void addAccum(AccumT accum) { - try { - org.apache.flink.api.common.state.ValueState<AccumT> state = - flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor); - - AccumT current = state.value(); - if (current == null) { - state.update(accum); - } else { - current = combineFn.mergeAccumulators( - flinkStateInternals.getKey(), - Lists.newArrayList(current, accum)); - state.update(current); - } - } catch (Exception e) { - throw new RuntimeException("Error adding to state.", e); - } - } - - @Override - public AccumT getAccum() { - try { - return flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor).value(); - } catch (Exception e) { - throw new RuntimeException("Error reading state.", e); - } - } - - @Override - public AccumT mergeAccumulators(Iterable<AccumT> accumulators) { - return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators); - } - - @Override - public OutputT read() { - try { - org.apache.flink.api.common.state.ValueState<AccumT> state = - flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor); - - AccumT accum = state.value(); - if (accum != null) { - return combineFn.extractOutput(flinkStateInternals.getKey(), accum); - } else { - return combineFn.extractOutput( - flinkStateInternals.getKey(), - combineFn.createAccumulator(flinkStateInternals.getKey())); - } - } catch (Exception e) { - throw new RuntimeException("Error reading state.", e); - } - } - - @Override - public ReadableState<Boolean> isEmpty() { - return new ReadableState<Boolean>() { - @Override - public Boolean read() { - try { - return flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor).value() == null; - } catch (Exception e) { - throw new RuntimeException("Error reading state.", e); - } - 
- } - - @Override - public ReadableState<Boolean> readLater() { - return this; - } - }; - } - - @Override - public void clear() { - try { - flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor).clear(); - } catch (Exception e) { - throw new RuntimeException("Error clearing state.", e); - } - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - FlinkKeyedCombiningState<?, ?, ?, ?> that = - (FlinkKeyedCombiningState<?, ?, ?, ?>) o; - - return namespace.equals(that.namespace) && address.equals(that.address); - - } - - @Override - public int hashCode() { - int result = namespace.hashCode(); - result = 31 * result + address.hashCode(); - return result; - } - } - - private static class FlinkCombiningStateWithContext<K, InputT, AccumT, OutputT> - implements CombiningState<InputT, AccumT, OutputT> { - - private final StateNamespace namespace; - private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address; - private final CombineWithContext.KeyedCombineFnWithContext< - ? super K, InputT, AccumT, OutputT> combineFn; - private final ValueStateDescriptor<AccumT> flinkStateDescriptor; - private final KeyedStateBackend<ByteBuffer> flinkStateBackend; - private final FlinkStateInternals<K> flinkStateInternals; - private final CombineWithContext.Context context; - - FlinkCombiningStateWithContext( - KeyedStateBackend<ByteBuffer> flinkStateBackend, - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, - CombineWithContext.KeyedCombineFnWithContext< - ? 
super K, InputT, AccumT, OutputT> combineFn, - StateNamespace namespace, - Coder<AccumT> accumCoder, - FlinkStateInternals<K> flinkStateInternals, - CombineWithContext.Context context) { - - this.namespace = namespace; - this.address = address; - this.combineFn = combineFn; - this.flinkStateBackend = flinkStateBackend; - this.flinkStateInternals = flinkStateInternals; - this.context = context; - - CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder); - - flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null); - } - - @Override - public CombiningState<InputT, AccumT, OutputT> readLater() { - return this; - } - - @Override - public void add(InputT value) { - try { - org.apache.flink.api.common.state.ValueState<AccumT> state = - flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor); - - AccumT current = state.value(); - if (current == null) { - current = combineFn.createAccumulator(flinkStateInternals.getKey(), context); - } - current = combineFn.addInput(flinkStateInternals.getKey(), current, value, context); - state.update(current); - } catch (Exception e) { - throw new RuntimeException("Error adding to state." 
, e); - } - } - - @Override - public void addAccum(AccumT accum) { - try { - org.apache.flink.api.common.state.ValueState<AccumT> state = - flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor); - - AccumT current = state.value(); - if (current == null) { - state.update(accum); - } else { - current = combineFn.mergeAccumulators( - flinkStateInternals.getKey(), - Lists.newArrayList(current, accum), - context); - state.update(current); - } - } catch (Exception e) { - throw new RuntimeException("Error adding to state.", e); - } - } - - @Override - public AccumT getAccum() { - try { - return flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor).value(); - } catch (Exception e) { - throw new RuntimeException("Error reading state.", e); - } - } - - @Override - public AccumT mergeAccumulators(Iterable<AccumT> accumulators) { - return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators, context); - } - - @Override - public OutputT read() { - try { - org.apache.flink.api.common.state.ValueState<AccumT> state = - flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor); - - AccumT accum = state.value(); - return combineFn.extractOutput(flinkStateInternals.getKey(), accum, context); - } catch (Exception e) { - throw new RuntimeException("Error reading state.", e); - } - } - - @Override - public ReadableState<Boolean> isEmpty() { - return new ReadableState<Boolean>() { - @Override - public Boolean read() { - try { - return flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor).value() == null; - } catch (Exception e) { - throw new RuntimeException("Error reading state.", e); - } - - } - - @Override - public ReadableState<Boolean> readLater() { - return this; - } - }; - } - - @Override - public void clear() { - try 
{ - flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor).clear(); - } catch (Exception e) { - throw new RuntimeException("Error clearing state.", e); - } - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - FlinkCombiningStateWithContext<?, ?, ?, ?> that = - (FlinkCombiningStateWithContext<?, ?, ?, ?>) o; - - return namespace.equals(that.namespace) && address.equals(that.address); - - } - - @Override - public int hashCode() { - int result = namespace.hashCode(); - result = 31 * result + address.hashCode(); - return result; - } - } - - private static class FlinkWatermarkHoldState<K, W extends BoundedWindow> - implements WatermarkHoldState<W> { - private final StateTag<? super K, WatermarkHoldState<W>> address; - private final OutputTimeFn<? super W> outputTimeFn; - private final StateNamespace namespace; - private final KeyedStateBackend<ByteBuffer> flinkStateBackend; - private final FlinkStateInternals<K> flinkStateInternals; - private final ValueStateDescriptor<Instant> flinkStateDescriptor; - - public FlinkWatermarkHoldState( - KeyedStateBackend<ByteBuffer> flinkStateBackend, - FlinkStateInternals<K> flinkStateInternals, - StateTag<? super K, WatermarkHoldState<W>> address, - StateNamespace namespace, - OutputTimeFn<? super W> outputTimeFn) { - this.address = address; - this.outputTimeFn = outputTimeFn; - this.namespace = namespace; - this.flinkStateBackend = flinkStateBackend; - this.flinkStateInternals = flinkStateInternals; - - CoderTypeInformation<Instant> typeInfo = new CoderTypeInformation<>(InstantCoder.of()); - flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null); - } - - @Override - public OutputTimeFn<? 
super W> getOutputTimeFn() { - return outputTimeFn; - } - - @Override - public WatermarkHoldState<W> readLater() { - return this; - } - - @Override - public ReadableState<Boolean> isEmpty() { - return new ReadableState<Boolean>() { - @Override - public Boolean read() { - try { - return flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor).value() == null; - } catch (Exception e) { - throw new RuntimeException("Error reading state.", e); - } - } - - @Override - public ReadableState<Boolean> readLater() { - return this; - } - }; - - } - - @Override - public void add(Instant value) { - try { - org.apache.flink.api.common.state.ValueState<Instant> state = - flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor); - - Instant current = state.value(); - if (current == null) { - state.update(value); - flinkStateInternals.watermarkHolds.put(namespace.stringKey(), value); - } else { - Instant combined = outputTimeFn.combine(current, value); - state.update(combined); - flinkStateInternals.watermarkHolds.put(namespace.stringKey(), combined); - } - } catch (Exception e) { - throw new RuntimeException("Error updating state.", e); - } - } - - @Override - public Instant read() { - try { - org.apache.flink.api.common.state.ValueState<Instant> state = - flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor); - return state.value(); - } catch (Exception e) { - throw new RuntimeException("Error reading state.", e); - } - } - - @Override - public void clear() { - flinkStateInternals.watermarkHolds.remove(namespace.stringKey()); - try { - org.apache.flink.api.common.state.ValueState<Instant> state = - flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor); - state.clear(); - } catch (Exception e) { - throw new RuntimeException("Error reading 
state.", e); - } - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - FlinkWatermarkHoldState<?, ?> that = (FlinkWatermarkHoldState<?, ?>) o; - - if (!address.equals(that.address)) { - return false; - } - if (!outputTimeFn.equals(that.outputTimeFn)) { - return false; - } - return namespace.equals(that.namespace); - - } - - @Override - public int hashCode() { - int result = address.hashCode(); - result = 31 * result + outputTimeFn.hashCode(); - result = 31 * result + namespace.hashCode(); - return result; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java deleted file mode 100644 index b38a520..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.state; - -import java.io.DataOutputStream; - -/** - * This interface is used to checkpoint key-group state. - * - * <p>It extends {@link KeyGroupRestoringOperator}, so implementors must provide the matching - * restore hook as well. - */ -public interface KeyGroupCheckpointedOperator extends KeyGroupRestoringOperator{ - /** - * Snapshots the state for a given {@code keyGroupIndex}. - * - * <p>AbstractStreamOperator would call this hook in - * AbstractStreamOperator.snapshotState() while iterating over the key groups. - * @param keyGroupIndex the id of the key-group to be put in the snapshot. - * @param out the stream to write to. - */ - void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception; -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java deleted file mode 100644 index 2bdfc6e..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.state; - -import java.io.DataInputStream; - -/** - * This interface is used to restore key-group state. - */ -public interface KeyGroupRestoringOperator { - /** - * Restores the state for a given {@code keyGroupIndex}. - * @param keyGroupIndex the id of the key-group whose state is being restored. - * @param in the stream to read from. 
- */ - void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception; -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java deleted file mode 100644 index 0004e9e..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Internal state implementation of the Beam runner for Apache Flink. 
- */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.state; http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/resources/log4j.properties b/runners/flink/runner/src/main/resources/log4j.properties deleted file mode 100644 index 4b6a708..0000000 --- a/runners/flink/runner/src/main/resources/log4j.properties +++ /dev/null @@ -1,23 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-################################################################################ - -log4j.rootLogger=OFF,console -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.err -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java deleted file mode 100644 index 10d6d9d..0000000 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink; - -import org.apache.beam.runners.flink.translation.types.EncodedValueComparator; -import org.apache.beam.runners.flink.translation.types.EncodedValueTypeInformation; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.ComparatorTestBase; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.junit.Assert; - -/** - * Test for {@link EncodedValueComparator}. - * - * <p>Supplies the comparator, serializer, equality check and test data that - * {@link ComparatorTestBase} asks for; both the comparator and the serializer are obtained - * from {@link EncodedValueTypeInformation}. - */ -public class EncodedValueComparatorTest extends ComparatorTestBase<byte[]> { - - /** Creates the comparator under test with a fresh {@link ExecutionConfig}. */ - @Override - protected TypeComparator<byte[]> createComparator(boolean ascending) { - return new EncodedValueTypeInformation().createComparator(ascending, new ExecutionConfig()); - } - - /** Creates the matching serializer with a fresh {@link ExecutionConfig}. */ - @Override - protected TypeSerializer<byte[]> createSerializer() { - return new EncodedValueTypeInformation().createSerializer(new ExecutionConfig()); - } - - /** Compares the expected and actual byte arrays with {@code Assert.assertArrayEquals}. */ - @Override - protected void deepEquals(String message, byte[] should, byte[] is) { - Assert.assertArrayEquals(message, should, is); - } - - /** - * Returns strings encoded with {@link StringUtf8Coder}. - * - * <p>NOTE(review): presumably listed in the ascending order the comparator is expected to - * produce over the encoded bytes (the capitalised entry precedes the lower-case ones) - - * confirm against ComparatorTestBase. - */ - @Override - protected byte[][] getSortedTestData() { - StringUtf8Coder coder = StringUtf8Coder.of(); - - try { - return new byte[][]{ - CoderUtils.encodeToByteArray(coder, ""), - CoderUtils.encodeToByteArray(coder, "Lorem Ipsum Dolor Omit Longer"), - CoderUtils.encodeToByteArray(coder, "aaaa"), - CoderUtils.encodeToByteArray(coder, "abcd"), - CoderUtils.encodeToByteArray(coder, "abce"), - CoderUtils.encodeToByteArray(coder, "abdd"), - CoderUtils.encodeToByteArray(coder, "accd"), - CoderUtils.encodeToByteArray(coder, "bbcd") - }; - } catch (CoderException e) { - throw new RuntimeException("Could not encode values.", e); - } - } -} 
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java deleted file mode 100644 index d9d174c..0000000 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.flink; - -import static org.junit.Assert.assertEquals; - -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.junit.Test; - -/** - * Tests the proper registration of the Flink runner. 
- */ -public class FlinkRunnerRegistrarTest { - - @Test - public void testFullName() { - String[] args = - new String[] {String.format("--runner=%s", FlinkRunner.class.getName())}; - PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create(); - assertEquals(opts.getRunner(), FlinkRunner.class); - } - - @Test - public void testClassName() { - String[] args = - new String[] {String.format("--runner=%s", FlinkRunner.class.getSimpleName())}; - PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create(); - assertEquals(opts.getRunner(), FlinkRunner.class); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java deleted file mode 100644 index d6240c4..0000000 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.runners.PipelineRunner; - -/** - * {@link org.apache.beam.sdk.Pipeline} for testing Dataflow programs on the - * {@link FlinkRunner}. - */ -public class FlinkTestPipeline extends Pipeline { - - /** - * Creates and returns a new test pipeline for batch execution. - * - * <p>Use {@link org.apache.beam.sdk.testing.PAssert} to add tests, then call - * {@link Pipeline#run} to execute the pipeline and check the tests. - */ - public static FlinkTestPipeline createForBatch() { - return create(false); - } - - /** - * Creates and returns a new test pipeline for streaming execution. - * - * <p>Use {@link org.apache.beam.sdk.testing.PAssert} to add tests, then call - * {@link Pipeline#run} to execute the pipeline and check the tests. - * - * @return The Test Pipeline - */ - public static FlinkTestPipeline createForStreaming() { - return create(true); - } - - /** - * Creates and returns a new test pipeline for streaming or batch execution. - * - * <p>Use {@link org.apache.beam.sdk.testing.PAssert} to add tests, then call - * {@link Pipeline#run} to execute the pipeline and check the tests. - * - * @param streaming <code>True</code> for streaming mode, <code>False</code> for batch. - * @return The Test Pipeline. - */ - private static FlinkTestPipeline create(boolean streaming) { - TestFlinkRunner flinkRunner = TestFlinkRunner.create(streaming); - return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions()); - } - - private FlinkTestPipeline(PipelineRunner<? 
extends PipelineResult> runner, - PipelineOptions options) { - super(runner, options); - } -} - http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java deleted file mode 100644 index 06187f6..0000000 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.util.Collections; -import java.util.HashMap; -import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; -import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.commons.lang3.SerializationUtils; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.joda.time.Instant; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * Tests for serialization and deserialization of {@link PipelineOptions} in {@link DoFnOperator}. - */ -public class PipelineOptionsTest { - - /** - * Pipeline options. 
- */ - public interface MyOptions extends FlinkPipelineOptions { - @Description("Bla bla bla") - @Default.String("Hello") - String getTestOption(); - void setTestOption(String value); - } - - private static MyOptions options; - private static SerializedPipelineOptions serializedOptions; - - private static final String[] args = new String[]{"--testOption=nothing"}; - - @BeforeClass - public static void beforeTest() { - options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class); - serializedOptions = new SerializedPipelineOptions(options); - } - - @Test - public void testDeserialization() { - MyOptions deserializedOptions = serializedOptions.getPipelineOptions().as(MyOptions.class); - assertEquals("nothing", deserializedOptions.getTestOption()); - } - - @Test - public void testIgnoredFieldSerialization() { - FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); - options.setStateBackend(new MemoryStateBackend()); - - FlinkPipelineOptions deserialized = - new SerializedPipelineOptions(options).getPipelineOptions().as(FlinkPipelineOptions.class); - - assertNull(deserialized.getStateBackend()); - } - - @Test - public void testCaching() { - PipelineOptions deserializedOptions = - serializedOptions.getPipelineOptions().as(PipelineOptions.class); - - assertNotNull(deserializedOptions); - assertTrue(deserializedOptions == serializedOptions.getPipelineOptions()); - assertTrue(deserializedOptions == serializedOptions.getPipelineOptions()); - assertTrue(deserializedOptions == serializedOptions.getPipelineOptions()); - } - - @Test(expected = Exception.class) - public void testNonNull() { - new SerializedPipelineOptions(null); - } - - @Test(expected = Exception.class) - public void parDoBaseClassPipelineOptionsNullTest() { - DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>( - new TestDoFn(), - WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()), - new TupleTag<String>("main-output"), - 
Collections.<TupleTag<?>>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory<String>(), - WindowingStrategy.globalDefault(), - new HashMap<Integer, PCollectionView<?>>(), - Collections.<PCollectionView<?>>emptyList(), - null, - null); - - } - - /** - * Tests that PipelineOptions are present after serialization. - */ - @Test - public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception { - - DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>( - new TestDoFn(), - WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()), - new TupleTag<String>("main-output"), - Collections.<TupleTag<?>>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory<String>(), - WindowingStrategy.globalDefault(), - new HashMap<Integer, PCollectionView<?>>(), - Collections.<PCollectionView<?>>emptyList(), - options, - null); - - final byte[] serialized = SerializationUtils.serialize(doFnOperator); - - @SuppressWarnings("unchecked") - DoFnOperator<Object, Object, Object> deserialized = - (DoFnOperator<Object, Object, Object>) SerializationUtils.deserialize(serialized); - - TypeInformation<WindowedValue<Object>> typeInformation = TypeInformation.of( - new TypeHint<WindowedValue<Object>>() {}); - - OneInputStreamOperatorTestHarness<WindowedValue<Object>, Object> testHarness = - new OneInputStreamOperatorTestHarness<>(deserialized, - typeInformation.createSerializer(new ExecutionConfig())); - - testHarness.open(); - - // execute once to access options - testHarness.processElement(new StreamRecord<>( - WindowedValue.of( - new Object(), - Instant.now(), - GlobalWindow.INSTANCE, - PaneInfo.NO_FIRING))); - - testHarness.close(); - - } - - - private static class TestDoFn extends DoFn<String, String> { - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - Assert.assertNotNull(c.getPipelineOptions()); - Assert.assertEquals( - options.getTestOption(), - c.getPipelineOptions().as(MyOptions.class).getTestOption()); - } - } -}