http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java deleted file mode 100644 index 9a52330..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink.translation.wrappers.streaming; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableList; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.List; -import org.apache.beam.runners.core.KeyedWorkItem; -import org.apache.beam.runners.core.KeyedWorkItemCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.StandardCoder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.PropertyNames; -import org.apache.beam.sdk.util.WindowedValue; - -/** - * Singleton keyed work item coder. - */ -public class SingletonKeyedWorkItemCoder<K, ElemT> - extends StandardCoder<SingletonKeyedWorkItem<K, ElemT>> { - /** - * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window - * coder. - */ - public static <K, ElemT> SingletonKeyedWorkItemCoder<K, ElemT> of( - Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) { - return new SingletonKeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder); - } - - @JsonCreator - public static <K, ElemT> SingletonKeyedWorkItemCoder<K, ElemT> of( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) { - checkArgument(components.size() == 3, "Expecting 3 components, got %s", components.size()); - @SuppressWarnings("unchecked") - Coder<K> keyCoder = (Coder<K>) components.get(0); - @SuppressWarnings("unchecked") - Coder<ElemT> elemCoder = (Coder<ElemT>) components.get(1); - @SuppressWarnings("unchecked") - Coder<? extends BoundedWindow> windowCoder = (Coder<? 
extends BoundedWindow>) components.get(2); - return new SingletonKeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder); - } - - private final Coder<K> keyCoder; - private final Coder<ElemT> elemCoder; - private final Coder<? extends BoundedWindow> windowCoder; - private final WindowedValue.FullWindowedValueCoder<ElemT> valueCoder; - - private SingletonKeyedWorkItemCoder( - Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) { - this.keyCoder = keyCoder; - this.elemCoder = elemCoder; - this.windowCoder = windowCoder; - valueCoder = WindowedValue.FullWindowedValueCoder.of(elemCoder, windowCoder); - } - - public Coder<K> getKeyCoder() { - return keyCoder; - } - - public Coder<ElemT> getElementCoder() { - return elemCoder; - } - - @Override - public void encode(SingletonKeyedWorkItem<K, ElemT> value, - OutputStream outStream, - Context context) - throws CoderException, IOException { - keyCoder.encode(value.key(), outStream, context.nested()); - valueCoder.encode(value.value, outStream, context); - } - - @Override - public SingletonKeyedWorkItem<K, ElemT> decode(InputStream inStream, Context context) - throws CoderException, IOException { - K key = keyCoder.decode(inStream, context.nested()); - WindowedValue<ElemT> value = valueCoder.decode(inStream, context); - return new SingletonKeyedWorkItem<>(key, value); - } - - @Override - public List<? extends Coder<?>> getCoderArguments() { - return ImmutableList.of(keyCoder, elemCoder, windowCoder); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - keyCoder.verifyDeterministic(); - elemCoder.verifyDeterministic(); - windowCoder.verifyDeterministic(); - } - - /** - * {@inheritDoc}. - * - * {@link KeyedWorkItemCoder} is not consistent with equals as it can return a - * {@link KeyedWorkItem} of a type different from the originally encoded type. - */ - @Override - public boolean consistentWithEquals() { - return false; - } - -}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java deleted file mode 100644 index 40f70e4..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink.translation.wrappers.streaming; - -import static com.google.common.base.Preconditions.checkState; - -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Executors; -import org.apache.beam.runners.core.ElementAndRestriction; -import org.apache.beam.runners.core.KeyedWorkItem; -import org.apache.beam.runners.core.KeyedWorkItems; -import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker; -import org.apache.beam.runners.core.OutputWindowedValue; -import org.apache.beam.runners.core.SplittableParDo; -import org.apache.beam.runners.core.StateInternals; -import org.apache.beam.runners.core.StateInternalsFactory; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.runners.core.TimerInternalsFactory; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.flink.streaming.api.operators.InternalTimer; -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** - * Flink operator for executing splittable {@link DoFn DoFns}. Specifically, for executing - * the {@code @ProcessElement} method of a splittable {@link DoFn}. 
- */ -public class SplittableDoFnOperator< - InputT, FnOutputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> - extends DoFnOperator< - KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT, OutputT> { - - public SplittableDoFnOperator( - DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT> doFn, - Coder< - WindowedValue< - KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>> inputCoder, - TupleTag<FnOutputT> mainOutputTag, - List<TupleTag<?>> additionalOutputTags, - OutputManagerFactory<OutputT> outputManagerFactory, - WindowingStrategy<?, ?> windowingStrategy, - Map<Integer, PCollectionView<?>> sideInputTagMapping, - Collection<PCollectionView<?>> sideInputs, - PipelineOptions options, - Coder<?> keyCoder) { - super( - doFn, - inputCoder, - mainOutputTag, - additionalOutputTags, - outputManagerFactory, - windowingStrategy, - sideInputTagMapping, - sideInputs, - options, - keyCoder); - - } - - @Override - public void open() throws Exception { - super.open(); - - checkState(doFn instanceof SplittableParDo.ProcessFn); - - StateInternalsFactory<String> stateInternalsFactory = new StateInternalsFactory<String>() { - @Override - public StateInternals<String> stateInternalsForKey(String key) { - //this will implicitly be keyed by the key of the incoming - // element or by the key of a firing timer - return (StateInternals<String>) stateInternals; - } - }; - TimerInternalsFactory<String> timerInternalsFactory = new TimerInternalsFactory<String>() { - @Override - public TimerInternals timerInternalsForKey(String key) { - //this will implicitly be keyed like the StateInternalsFactory - return timerInternals; - } - }; - - ((SplittableParDo.ProcessFn) doFn).setStateInternalsFactory(stateInternalsFactory); - ((SplittableParDo.ProcessFn) doFn).setTimerInternalsFactory(timerInternalsFactory); - ((SplittableParDo.ProcessFn) doFn).setProcessElementInvoker( - new 
OutputAndTimeBoundedSplittableProcessElementInvoker<>( - doFn, - serializedOptions.getPipelineOptions(), - new OutputWindowedValue<FnOutputT>() { - @Override - public void outputWindowedValue( - FnOutputT output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - outputManager.output( - mainOutputTag, - WindowedValue.of(output, timestamp, windows, pane)); - } - - @Override - public <AdditionalOutputT> void outputWindowedValue( - TupleTag<AdditionalOutputT> tag, - AdditionalOutputT output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane)); - } - }, - sideInputReader, - Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()), - 10000, - Duration.standardSeconds(10))); - } - - @Override - public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) { - doFnRunner.processElement(WindowedValue.valueInGlobalWindow( - KeyedWorkItems.<String, ElementAndRestriction<InputT, RestrictionT>>timersWorkItem( - (String) stateInternals.getKey(), - Collections.singletonList(timer.getNamespace())))); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java deleted file mode 100644 index 9b2136c..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming; - -import static org.apache.beam.runners.core.TimerInternals.TimerData; - -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn; -import org.apache.beam.runners.core.KeyedWorkItem; -import org.apache.beam.runners.core.KeyedWorkItems; -import org.apache.beam.runners.core.StateInternals; -import org.apache.beam.runners.core.StateInternalsFactory; -import org.apache.beam.runners.core.SystemReduceFn; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.runners.core.TimerInternalsFactory; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.flink.streaming.api.operators.InternalTimer; - -/** - * Flink operator for executing window {@link DoFn DoFns}. 
- */ -public class WindowDoFnOperator<K, InputT, OutputT> - extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>> { - - private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn; - - public WindowDoFnOperator( - SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn, - Coder<WindowedValue<KeyedWorkItem<K, InputT>>> inputCoder, - TupleTag<KV<K, OutputT>> mainOutputTag, - List<TupleTag<?>> additionalOutputTags, - OutputManagerFactory<WindowedValue<KV<K, OutputT>>> outputManagerFactory, - WindowingStrategy<?, ?> windowingStrategy, - Map<Integer, PCollectionView<?>> sideInputTagMapping, - Collection<PCollectionView<?>> sideInputs, - PipelineOptions options, - Coder<K> keyCoder) { - super( - null, - inputCoder, - mainOutputTag, - additionalOutputTags, - outputManagerFactory, - windowingStrategy, - sideInputTagMapping, - sideInputs, - options, - keyCoder); - - this.systemReduceFn = systemReduceFn; - - } - - @Override - protected DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() { - StateInternalsFactory<K> stateInternalsFactory = new StateInternalsFactory<K>() { - @Override - public StateInternals<K> stateInternalsForKey(K key) { - //this will implicitly be keyed by the key of the incoming - // element or by the key of a firing timer - return (StateInternals<K>) stateInternals; - } - }; - TimerInternalsFactory<K> timerInternalsFactory = new TimerInternalsFactory<K>() { - @Override - public TimerInternals timerInternalsForKey(K key) { - //this will implicitly be keyed like the StateInternalsFactory - return timerInternals; - } - }; - - // we have to do the unchecked cast because GroupAlsoByWindowViaWindowSetDoFn.create - // has the window type as generic parameter while WindowingStrategy is almost always - // untyped. 
- @SuppressWarnings("unchecked") - DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFn = - GroupAlsoByWindowViaWindowSetNewDoFn.create( - windowingStrategy, stateInternalsFactory, timerInternalsFactory, sideInputReader, - (SystemReduceFn) systemReduceFn, outputManager, mainOutputTag); - return doFn; - } - - @Override - public void fireTimer(InternalTimer<?, TimerData> timer) { - doFnRunner.processElement(WindowedValue.valueInGlobalWindow( - KeyedWorkItems.<K, InputT>timersWorkItem( - (K) stateInternals.getKey(), - Collections.singletonList(timer.getNamespace())))); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java deleted file mode 100644 index 1dff367..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming; - -import java.nio.ByteBuffer; -import org.apache.beam.runners.core.KeyedWorkItem; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; - -/** - * {@link KeySelector} that retrieves a key from a {@link KeyedWorkItem}. This will return - * the key as encoded by the provided {@link Coder} in a {@link ByteBuffer}. This ensures - * that all key comparisons/hashing happen on the encoded form. - */ -public class WorkItemKeySelector<K, V> - implements KeySelector<WindowedValue<SingletonKeyedWorkItem<K, V>>, ByteBuffer>, - ResultTypeQueryable<ByteBuffer> { - - private final Coder<K> keyCoder; - - public WorkItemKeySelector(Coder<K> keyCoder) { - this.keyCoder = keyCoder; - } - - @Override - public ByteBuffer getKey(WindowedValue<SingletonKeyedWorkItem<K, V>> value) throws Exception { - K key = value.getValue().key(); - byte[] keyBytes = CoderUtils.encodeToByteArray(keyCoder, key); - return ByteBuffer.wrap(keyBytes); - } - - @Override - public TypeInformation<ByteBuffer> getProducedType() { - return new GenericTypeInfo<>(ByteBuffer.class); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java deleted file mode 100644 index 2ed5024..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.io; - -import com.google.common.annotations.VisibleForTesting; -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.flink.api.common.functions.StoppableFunction; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Wrapper for executing {@link BoundedSource BoundedSources} as a Flink Source. - */ -public class BoundedSourceWrapper<OutputT> - extends RichParallelSourceFunction<WindowedValue<OutputT>> - implements StoppableFunction { - - private static final Logger LOG = LoggerFactory.getLogger(BoundedSourceWrapper.class); - - /** - * Keep the options so that we can initialize the readers. - */ - private final SerializedPipelineOptions serializedOptions; - - /** - * The split sources. We split them in the constructor to ensure that all parallel - * sources are consistent about the split sources. - */ - private List<? extends BoundedSource<OutputT>> splitSources; - - /** - * Make it a field so that we can access it in {@link #close()}. - */ - private transient List<BoundedSource.BoundedReader<OutputT>> readers; - - /** - * Initialize here and not in run() to prevent races where we cancel a job before run() is - * ever called or run() is called after cancel(). 
- */ - private volatile boolean isRunning = true; - - @SuppressWarnings("unchecked") - public BoundedSourceWrapper( - PipelineOptions pipelineOptions, - BoundedSource<OutputT> source, - int parallelism) throws Exception { - this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); - - long desiredBundleSize = source.getEstimatedSizeBytes(pipelineOptions) / parallelism; - - // get the splits early. we assume that the generated splits are stable, - // this is necessary so that the mapping of state to source is correct - // when restoring - splitSources = source.split(desiredBundleSize, pipelineOptions); - } - - @Override - public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception { - - // figure out which split sources we're responsible for - int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); - int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks(); - - List<BoundedSource<OutputT>> localSources = new ArrayList<>(); - - for (int i = 0; i < splitSources.size(); i++) { - if (i % numSubtasks == subtaskIndex) { - localSources.add(splitSources.get(i)); - } - } - - LOG.info("Bounded Flink Source {}/{} is reading from sources: {}", - subtaskIndex, - numSubtasks, - localSources); - - readers = new ArrayList<>(); - // initialize readers from scratch - for (BoundedSource<OutputT> source : localSources) { - readers.add(source.createReader(serializedOptions.getPipelineOptions())); - } - - if (readers.size() == 1) { - // the easy case, we just read from one reader - BoundedSource.BoundedReader<OutputT> reader = readers.get(0); - - boolean dataAvailable = reader.start(); - if (dataAvailable) { - emitElement(ctx, reader); - } - - while (isRunning) { - dataAvailable = reader.advance(); - - if (dataAvailable) { - emitElement(ctx, reader); - } else { - break; - } - } - } else { - // a bit more complicated, we are responsible for several readers - // loop through them and sleep if none of them had any data - - int currentReader 
= 0; - - // start each reader and emit data if immediately available - for (BoundedSource.BoundedReader<OutputT> reader : readers) { - boolean dataAvailable = reader.start(); - if (dataAvailable) { - emitElement(ctx, reader); - } - } - - // a flag telling us whether any of the readers had data - // if no reader had data, sleep for bit - boolean hadData = false; - while (isRunning && !readers.isEmpty()) { - BoundedSource.BoundedReader<OutputT> reader = readers.get(currentReader); - boolean dataAvailable = reader.advance(); - - if (dataAvailable) { - emitElement(ctx, reader); - hadData = true; - } else { - readers.remove(currentReader); - currentReader--; - if (readers.isEmpty()) { - break; - } - } - - currentReader = (currentReader + 1) % readers.size(); - if (currentReader == 0 && !hadData) { - Thread.sleep(50); - } else if (currentReader == 0) { - hadData = false; - } - } - - } - - // emit final Long.MAX_VALUE watermark, just to be sure - ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); - } - - /** - * Emit the current element from the given Reader. The reader is guaranteed to have data. 
- */ - private void emitElement( - SourceContext<WindowedValue<OutputT>> ctx, - BoundedSource.BoundedReader<OutputT> reader) { - // make sure that reader state update and element emission are atomic - // with respect to snapshots - synchronized (ctx.getCheckpointLock()) { - - OutputT item = reader.getCurrent(); - Instant timestamp = reader.getCurrentTimestamp(); - - WindowedValue<OutputT> windowedValue = - WindowedValue.of(item, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING); - ctx.collectWithTimestamp(windowedValue, timestamp.getMillis()); - } - } - - @Override - public void close() throws Exception { - super.close(); - if (readers != null) { - for (BoundedSource.BoundedReader<OutputT> reader: readers) { - reader.close(); - } - } - } - - @Override - public void cancel() { - isRunning = false; - } - - @Override - public void stop() { - this.isRunning = false; - } - - /** - * Visible so that we can check this in tests. Must not be used for anything else. - */ - @VisibleForTesting - public List<? extends BoundedSource<OutputT>> getSplitSources() { - return splitSources; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java deleted file mode 100644 index 910a33f..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java +++ /dev/null @@ -1,249 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.io; - -import static com.google.common.base.Preconditions.checkArgument; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.Collections; -import java.util.List; -import java.util.NoSuchElementException; -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * An example unbounded Beam source that reads input from a socket. - * This is used mainly for testing and debugging. 
- * */ -public class UnboundedSocketSource<CheckpointMarkT extends UnboundedSource.CheckpointMark> - extends UnboundedSource<String, CheckpointMarkT> { - - private static final Coder<String> DEFAULT_SOCKET_CODER = StringUtf8Coder.of(); - - private static final long serialVersionUID = 1L; - - private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 500; - - private static final int CONNECTION_TIMEOUT_TIME = 0; - - private final String hostname; - private final int port; - private final char delimiter; - private final long maxNumRetries; - private final long delayBetweenRetries; - - public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries) { - this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP); - } - - public UnboundedSocketSource(String hostname, - int port, - char delimiter, - long maxNumRetries, - long delayBetweenRetries) { - this.hostname = hostname; - this.port = port; - this.delimiter = delimiter; - this.maxNumRetries = maxNumRetries; - this.delayBetweenRetries = delayBetweenRetries; - } - - public String getHostname() { - return this.hostname; - } - - public int getPort() { - return this.port; - } - - public char getDelimiter() { - return this.delimiter; - } - - public long getMaxNumRetries() { - return this.maxNumRetries; - } - - public long getDelayBetweenRetries() { - return this.delayBetweenRetries; - } - - @Override - public List<? extends UnboundedSource<String, CheckpointMarkT>> split( - int desiredNumSplits, - PipelineOptions options) throws Exception { - return Collections.<UnboundedSource<String, CheckpointMarkT>>singletonList(this); - } - - @Override - public UnboundedReader<String> createReader(PipelineOptions options, - @Nullable CheckpointMarkT checkpointMark) { - return new UnboundedSocketReader(this); - } - - @Nullable - @Override - public Coder getCheckpointMarkCoder() { - // Flink and Dataflow have different checkpointing mechanisms. - // In our case we do not need a coder. 
- return null; - } - - @Override - public void validate() { - checkArgument(port > 0 && port < 65536, "port is out of range"); - checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), " - + "or -1 (infinite retries)"); - checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive"); - } - - @Override - public Coder getDefaultOutputCoder() { - return DEFAULT_SOCKET_CODER; - } - - /** - * Unbounded socket reader. - */ - public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String> { - - private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class); - - private final UnboundedSocketSource source; - - private Socket socket; - private BufferedReader reader; - - private boolean isRunning; - - private String currentRecord; - - public UnboundedSocketReader(UnboundedSocketSource source) { - this.source = source; - } - - private void openConnection() throws IOException { - this.socket = new Socket(); - this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()), - CONNECTION_TIMEOUT_TIME); - this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); - this.isRunning = true; - } - - @Override - public boolean start() throws IOException { - int attempt = 0; - while (!isRunning) { - try { - openConnection(); - LOG.info("Connected to server socket " + this.source.getHostname() + ':' - + this.source.getPort()); - - return advance(); - } catch (IOException e) { - LOG.info("Lost connection to server socket " + this.source.getHostname() + ':' - + this.source.getPort() + ". 
Retrying in " - + this.source.getDelayBetweenRetries() + " msecs..."); - - if (this.source.getMaxNumRetries() == -1 || attempt++ < this.source.getMaxNumRetries()) { - try { - Thread.sleep(this.source.getDelayBetweenRetries()); - } catch (InterruptedException e1) { - e1.printStackTrace(); - } - } else { - this.isRunning = false; - break; - } - } - } - LOG.error("Unable to connect to host " + this.source.getHostname() - + " : " + this.source.getPort()); - return false; - } - - @Override - public boolean advance() throws IOException { - final StringBuilder buffer = new StringBuilder(); - int data; - while (isRunning && (data = reader.read()) != -1) { - // check if the string is complete - if (data != this.source.getDelimiter()) { - buffer.append((char) data); - } else { - if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\r') { - buffer.setLength(buffer.length() - 1); - } - this.currentRecord = buffer.toString(); - buffer.setLength(0); - return true; - } - } - return false; - } - - @Override - public byte[] getCurrentRecordId() throws NoSuchElementException { - return new byte[0]; - } - - @Override - public String getCurrent() throws NoSuchElementException { - return this.currentRecord; - } - - @Override - public Instant getCurrentTimestamp() throws NoSuchElementException { - return Instant.now(); - } - - @Override - public void close() throws IOException { - this.reader.close(); - this.socket.close(); - this.isRunning = false; - LOG.info("Closed connection to server socket at " + this.source.getHostname() + ":" - + this.source.getPort() + "."); - } - - @Override - public Instant getWatermark() { - return Instant.now(); - } - - @Override - public CheckpointMark getCheckpointMark() { - return null; - } - - @Override - public UnboundedSource<String, ?> getCurrentSource() { - return this.source; - } - } -} 
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java deleted file mode 100644 index bb9b58a..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ /dev/null @@ -1,476 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.io; - -import com.google.common.annotations.VisibleForTesting; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; -import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.StoppableFunction; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.OperatorStateStore; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.runtime.state.DefaultOperatorStateBackend; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Wrapper 
for executing {@link UnboundedSource UnboundedSources} as a Flink Source. - */ -public class UnboundedSourceWrapper< - OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark> - extends RichParallelSourceFunction<WindowedValue<OutputT>> - implements ProcessingTimeCallback, StoppableFunction, - CheckpointListener, CheckpointedFunction { - - private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class); - - /** - * Keep the options so that we can initialize the localReaders. - */ - private final SerializedPipelineOptions serializedOptions; - - /** - * For snapshot and restore. - */ - private final KvCoder< - ? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> checkpointCoder; - - /** - * The split sources. We split them in the constructor to ensure that all parallel - * sources are consistent about the split sources. - */ - private final List<? extends UnboundedSource<OutputT, CheckpointMarkT>> splitSources; - - /** - * The local split sources. Assigned at runtime when the wrapper is executed in parallel. - */ - private transient List<UnboundedSource<OutputT, CheckpointMarkT>> localSplitSources; - - /** - * The local split readers. Assigned at runtime when the wrapper is executed in parallel. - * Make it a field so that we can access it in {@link #onProcessingTime(long)} for - * emitting watermarks. - */ - private transient List<UnboundedSource.UnboundedReader<OutputT>> localReaders; - - /** - * Flag to indicate whether the source is running. - * Initialize here and not in run() to prevent races where we cancel a job before run() is - * ever called or run() is called after cancel(). - */ - private volatile boolean isRunning = true; - - /** - * Make it a field so that we can access it in {@link #onProcessingTime(long)} for registering new - * triggers. 
- */ - private transient StreamingRuntimeContext runtimeContext; - - /** - * Make it a field so that we can access it in {@link #onProcessingTime(long)} for emitting - * watermarks. - */ - private transient SourceContext<WindowedValue<OutputT>> context; - - /** - * Pending checkpoints which have not been acknowledged yet. - */ - private transient LinkedHashMap<Long, List<CheckpointMarkT>> pendingCheckpoints; - /** - * Keep a maximum of 32 checkpoints for {@code CheckpointMark.finalizeCheckpoint()}. - */ - private static final int MAX_NUMBER_PENDING_CHECKPOINTS = 32; - - private transient ListState<KV<? extends - UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>> stateForCheckpoint; - - /** - * false if checkpointCoder is null or no restore state by starting first. - */ - private transient boolean isRestored = false; - - @SuppressWarnings("unchecked") - public UnboundedSourceWrapper( - PipelineOptions pipelineOptions, - UnboundedSource<OutputT, CheckpointMarkT> source, - int parallelism) throws Exception { - this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); - - if (source.requiresDeduping()) { - LOG.warn("Source {} requires deduping but Flink runner doesn't support this yet.", source); - } - - Coder<CheckpointMarkT> checkpointMarkCoder = source.getCheckpointMarkCoder(); - if (checkpointMarkCoder == null) { - LOG.info("No CheckpointMarkCoder specified for this source. Won't create snapshots."); - checkpointCoder = null; - } else { - - Coder<? extends UnboundedSource<OutputT, CheckpointMarkT>> sourceCoder = - (Coder) SerializableCoder.of(new TypeDescriptor<UnboundedSource>() { - }); - - checkpointCoder = KvCoder.of(sourceCoder, checkpointMarkCoder); - } - - // get the splits early. 
we assume that the generated splits are stable, - // this is necessary so that the mapping of state to source is correct - // when restoring - splitSources = source.split(parallelism, pipelineOptions); - } - - - /** - * Initialize and restore state before starting execution of the source. - */ - @Override - public void open(Configuration parameters) throws Exception { - runtimeContext = (StreamingRuntimeContext) getRuntimeContext(); - - // figure out which split sources we're responsible for - int subtaskIndex = runtimeContext.getIndexOfThisSubtask(); - int numSubtasks = runtimeContext.getNumberOfParallelSubtasks(); - - localSplitSources = new ArrayList<>(); - localReaders = new ArrayList<>(); - - pendingCheckpoints = new LinkedHashMap<>(); - - if (isRestored) { - // restore the splitSources from the checkpoint to ensure consistent ordering - for (KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> restored: - stateForCheckpoint.get()) { - localSplitSources.add(restored.getKey()); - localReaders.add(restored.getKey().createReader( - serializedOptions.getPipelineOptions(), restored.getValue())); - } - } else { - // initialize localReaders and localSources from scratch - for (int i = 0; i < splitSources.size(); i++) { - if (i % numSubtasks == subtaskIndex) { - UnboundedSource<OutputT, CheckpointMarkT> source = - splitSources.get(i); - UnboundedSource.UnboundedReader<OutputT> reader = - source.createReader(serializedOptions.getPipelineOptions(), null); - localSplitSources.add(source); - localReaders.add(reader); - } - } - } - - LOG.info("Unbounded Flink Source {}/{} is reading from sources: {}", - subtaskIndex, - numSubtasks, - localSplitSources); - } - - @Override - public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception { - - context = ctx; - - if (localReaders.size() == 0) { - // do nothing, but still look busy ... 
- // also, output a Long.MAX_VALUE watermark since we know that we're not - // going to emit anything - // we can't return here since Flink requires that all operators stay up, - // otherwise checkpointing would not work correctly anymore - ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); - - // wait until this is canceled - final Object waitLock = new Object(); - while (isRunning) { - try { - // Flink will interrupt us at some point - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (waitLock) { - // don't wait indefinitely, in case something goes horribly wrong - waitLock.wait(1000); - } - } catch (InterruptedException e) { - if (!isRunning) { - // restore the interrupted state, and fall through the loop - Thread.currentThread().interrupt(); - } - } - } - } else if (localReaders.size() == 1) { - // the easy case, we just read from one reader - UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(0); - - boolean dataAvailable = reader.start(); - if (dataAvailable) { - emitElement(ctx, reader); - } - - setNextWatermarkTimer(this.runtimeContext); - - while (isRunning) { - dataAvailable = reader.advance(); - - if (dataAvailable) { - emitElement(ctx, reader); - } else { - Thread.sleep(50); - } - } - } else { - // a bit more complicated, we are responsible for several localReaders - // loop through them and sleep if none of them had any data - - int numReaders = localReaders.size(); - int currentReader = 0; - - // start each reader and emit data if immediately available - for (UnboundedSource.UnboundedReader<OutputT> reader : localReaders) { - boolean dataAvailable = reader.start(); - if (dataAvailable) { - emitElement(ctx, reader); - } - } - - // a flag telling us whether any of the localReaders had data - // if no reader had data, sleep for bit - boolean hadData = false; - while (isRunning) { - UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(currentReader); - boolean dataAvailable = reader.advance(); - - 
if (dataAvailable) { - emitElement(ctx, reader); - hadData = true; - } - - currentReader = (currentReader + 1) % numReaders; - if (currentReader == 0 && !hadData) { - Thread.sleep(50); - } else if (currentReader == 0) { - hadData = false; - } - } - - } - } - - /** - * Emit the current element from the given Reader. The reader is guaranteed to have data. - */ - private void emitElement( - SourceContext<WindowedValue<OutputT>> ctx, - UnboundedSource.UnboundedReader<OutputT> reader) { - // make sure that reader state update and element emission are atomic - // with respect to snapshots - synchronized (ctx.getCheckpointLock()) { - - OutputT item = reader.getCurrent(); - Instant timestamp = reader.getCurrentTimestamp(); - - WindowedValue<OutputT> windowedValue = - WindowedValue.of(item, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING); - ctx.collectWithTimestamp(windowedValue, timestamp.getMillis()); - } - } - - @Override - public void close() throws Exception { - super.close(); - if (localReaders != null) { - for (UnboundedSource.UnboundedReader<OutputT> reader: localReaders) { - reader.close(); - } - } - } - - @Override - public void cancel() { - isRunning = false; - } - - @Override - public void stop() { - isRunning = false; - } - - // ------------------------------------------------------------------------ - // Checkpoint and restore - // ------------------------------------------------------------------------ - - @Override - public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { - if (!isRunning) { - LOG.debug("snapshotState() called on closed source"); - } else { - - if (checkpointCoder == null) { - // no checkpoint coder available in this source - return; - } - - stateForCheckpoint.clear(); - - long checkpointId = functionSnapshotContext.getCheckpointId(); - - // we checkpoint the sources along with the CheckpointMarkT to ensure - // than we have a correct mapping of checkpoints to sources when - // restoring - 
List<CheckpointMarkT> checkpointMarks = new ArrayList<>(localSplitSources.size()); - - for (int i = 0; i < localSplitSources.size(); i++) { - UnboundedSource<OutputT, CheckpointMarkT> source = localSplitSources.get(i); - UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(i); - - @SuppressWarnings("unchecked") - CheckpointMarkT mark = (CheckpointMarkT) reader.getCheckpointMark(); - checkpointMarks.add(mark); - KV<UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> kv = - KV.of(source, mark); - stateForCheckpoint.add(kv); - } - - // cleanup old pending checkpoints and add new checkpoint - int diff = pendingCheckpoints.size() - MAX_NUMBER_PENDING_CHECKPOINTS; - if (diff >= 0) { - for (Iterator<Long> iterator = pendingCheckpoints.keySet().iterator(); - diff >= 0; - diff--) { - iterator.next(); - iterator.remove(); - } - } - pendingCheckpoints.put(checkpointId, checkpointMarks); - - } - } - - @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - if (checkpointCoder == null) { - // no checkpoint coder available in this source - return; - } - - OperatorStateStore stateStore = context.getOperatorStateStore(); - CoderTypeInformation< - KV<? 
extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>> - typeInformation = (CoderTypeInformation) new CoderTypeInformation<>(checkpointCoder); - stateForCheckpoint = stateStore.getOperatorState( - new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, - typeInformation.createSerializer(new ExecutionConfig()))); - - if (context.isRestored()) { - isRestored = true; - LOG.info("Having restore state in the UnbounedSourceWrapper."); - } else { - LOG.info("No restore state for UnbounedSourceWrapper."); - } - } - - @Override - public void onProcessingTime(long timestamp) throws Exception { - if (this.isRunning) { - synchronized (context.getCheckpointLock()) { - // find minimum watermark over all localReaders - long watermarkMillis = Long.MAX_VALUE; - for (UnboundedSource.UnboundedReader<OutputT> reader: localReaders) { - Instant watermark = reader.getWatermark(); - if (watermark != null) { - watermarkMillis = Math.min(watermark.getMillis(), watermarkMillis); - } - } - context.emitWatermark(new Watermark(watermarkMillis)); - } - setNextWatermarkTimer(this.runtimeContext); - } - } - - private void setNextWatermarkTimer(StreamingRuntimeContext runtime) { - if (this.isRunning) { - long watermarkInterval = runtime.getExecutionConfig().getAutoWatermarkInterval(); - long timeToNextWatermark = getTimeToNextWatermark(watermarkInterval); - runtime.getProcessingTimeService().registerTimer(timeToNextWatermark, this); - } - } - - private long getTimeToNextWatermark(long watermarkInterval) { - return System.currentTimeMillis() + watermarkInterval; - } - - /** - * Visible so that we can check this in tests. Must not be used for anything else. - */ - @VisibleForTesting - public List<? extends UnboundedSource<OutputT, CheckpointMarkT>> getSplitSources() { - return splitSources; - } - - /** - * Visible so that we can check this in tests. Must not be used for anything else. - */ - @VisibleForTesting - public List<? 
extends UnboundedSource<OutputT, CheckpointMarkT>> getLocalSplitSources() { - return localSplitSources; - } - - @Override - public void notifyCheckpointComplete(long checkpointId) throws Exception { - - List<CheckpointMarkT> checkpointMarks = pendingCheckpoints.get(checkpointId); - - if (checkpointMarks != null) { - - // remove old checkpoints including the current one - Iterator<Long> iterator = pendingCheckpoints.keySet().iterator(); - long currentId; - do { - currentId = iterator.next(); - iterator.remove(); - } while (currentId != checkpointId); - - // confirm all marks - for (CheckpointMarkT mark : checkpointMarks) { - mark.finalizeCheckpoint(); - } - - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java deleted file mode 100644 index b431ce7..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Internal implementation of the Beam runner for Apache Flink. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.io; http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java deleted file mode 100644 index 0674871..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Internal implementation of the Beam runner for Apache Flink. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming; http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java deleted file mode 100644 index 3203446..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java +++ /dev/null @@ -1,865 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.state; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.core.StateInternals; -import org.apache.beam.runners.core.StateNamespace; -import org.apache.beam.runners.core.StateTag; -import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.coders.MapCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.CombineWithContext; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.util.CombineContextFactory; -import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.CombiningState; -import org.apache.beam.sdk.util.state.MapState; -import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.SetState; -import org.apache.beam.sdk.util.state.State; -import org.apache.beam.sdk.util.state.StateContext; -import org.apache.beam.sdk.util.state.StateContexts; -import org.apache.beam.sdk.util.state.ValueState; -import org.apache.beam.sdk.util.state.WatermarkHoldState; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.runtime.state.DefaultOperatorStateBackend; -import org.apache.flink.runtime.state.OperatorStateBackend; - -/** - * {@link StateInternals} that uses a Flink {@link DefaultOperatorStateBackend} - * to manage the broadcast state. - * The state is the same on all parallel instances of the operator. 
- * So we just need store state of operator-0 in OperatorStateBackend. - * - * <p>Note: Ignore index of key. - * Mainly for SideInputs. - */ -public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { - - private int indexInSubtaskGroup; - private final DefaultOperatorStateBackend stateBackend; - // stateName -> <namespace, state> - private Map<String, Map<String, ?>> stateForNonZeroOperator; - - public FlinkBroadcastStateInternals(int indexInSubtaskGroup, OperatorStateBackend stateBackend) { - //TODO flink do not yet expose through public API - this.stateBackend = (DefaultOperatorStateBackend) stateBackend; - this.indexInSubtaskGroup = indexInSubtaskGroup; - if (indexInSubtaskGroup != 0) { - stateForNonZeroOperator = new HashMap<>(); - } - } - - @Override - public K getKey() { - return null; - } - - @Override - public <T extends State> T state( - final StateNamespace namespace, - StateTag<? super K, T> address) { - - return state(namespace, address, StateContexts.nullContext()); - } - - @Override - public <T extends State> T state( - final StateNamespace namespace, - StateTag<? super K, T> address, - final StateContext<?> context) { - - return address.bind(new StateTag.StateBinder<K>() { - - @Override - public <T> ValueState<T> bindValue( - StateTag<? super K, ValueState<T>> address, - Coder<T> coder) { - - return new FlinkBroadcastValueState<>(stateBackend, address, namespace, coder); - } - - @Override - public <T> BagState<T> bindBag( - StateTag<? super K, BagState<T>> address, - Coder<T> elemCoder) { - - return new FlinkBroadcastBagState<>(stateBackend, address, namespace, elemCoder); - } - - @Override - public <T> SetState<T> bindSet( - StateTag<? super K, SetState<T>> address, - Coder<T> elemCoder) { - throw new UnsupportedOperationException( - String.format("%s is not supported", SetState.class.getSimpleName())); - } - - @Override - public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( - StateTag<? 
super K, MapState<KeyT, ValueT>> spec, - Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) { - throw new UnsupportedOperationException( - String.format("%s is not supported", MapState.class.getSimpleName())); - } - - @Override - public <InputT, AccumT, OutputT> - CombiningState<InputT, AccumT, OutputT> - bindCombiningValue( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, - Coder<AccumT> accumCoder, - Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { - - return new FlinkCombiningState<>( - stateBackend, address, combineFn, namespace, accumCoder); - } - - @Override - public <InputT, AccumT, OutputT> - CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, - Coder<AccumT> accumCoder, - final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { - return new FlinkKeyedCombiningState<>( - stateBackend, - address, - combineFn, - namespace, - accumCoder, - FlinkBroadcastStateInternals.this); - } - - @Override - public <InputT, AccumT, OutputT> - CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, - Coder<AccumT> accumCoder, - CombineWithContext.KeyedCombineFnWithContext< - ? super K, InputT, AccumT, OutputT> combineFn) { - return new FlinkCombiningStateWithContext<>( - stateBackend, - address, - combineFn, - namespace, - accumCoder, - FlinkBroadcastStateInternals.this, - CombineContextFactory.createFromStateContext(context)); - } - - @Override - public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( - StateTag<? super K, WatermarkHoldState<W>> address, - OutputTimeFn<? super W> outputTimeFn) { - throw new UnsupportedOperationException( - String.format("%s is not supported", WatermarkHoldState.class.getSimpleName())); - } - }); - } - - /** - * 1. 
The way we would use it is to only checkpoint anything from the operator
 * with subtask index 0 because we assume that the state is the same on all
 * parallel instances of the operator.
 *
 * <p>2. Use map to support namespace.
 */
  private abstract class AbstractBroadcastState<T> {

    private final String name;
    private final StateNamespace namespace;
    private final ListStateDescriptor<Map<String, T>> flinkStateDescriptor;
    private final DefaultOperatorStateBackend flinkStateBackend;

    AbstractBroadcastState(
        DefaultOperatorStateBackend flinkStateBackend,
        String name,
        StateNamespace namespace,
        Coder<T> coder) {
      this.name = name;
      this.namespace = namespace;
      this.flinkStateBackend = flinkStateBackend;

      // One Flink state handle per state name; the stored value is a map from the
      // namespace's string key to the state value, so all namespaces of this tag
      // share a single broadcast-state entry.
      CoderTypeInformation<Map<String, T>> typeInfo =
          new CoderTypeInformation<>(MapCoder.of(StringUtf8Coder.of(), coder));

      flinkStateDescriptor = new ListStateDescriptor<>(name,
          typeInfo.createSerializer(new ExecutionConfig()));
    }

    /**
     * Get map(namespace->T). Subtask 0 always reads from the broadcast operator
     * state; other subtasks serve from their local cache, falling back once to the
     * (restored) broadcast state and then clearing it so only subtask 0's copy is
     * ever checkpointed.
     *
     * @return the namespace map, or {@code null} if no state has been written
     * @throws Exception if the Flink state backend access fails
     */
    Map<String, T> getMap() throws Exception {
      if (indexInSubtaskGroup == 0) {
        return getMapFromBroadcastState();
      } else {
        @SuppressWarnings("unchecked") // cache is keyed by state name; values were put by this class
        Map<String, T> result = (Map<String, T>) stateForNonZeroOperator.get(name);
        // maybe restore from BroadcastState of Operator-0
        if (result == null) {
          result = getMapFromBroadcastState();
          if (result != null) {
            stateForNonZeroOperator.put(name, result);
            // we don't need it anymore, must clear it.
            flinkStateBackend.getBroadcastOperatorState(
                flinkStateDescriptor).clear();
          }
        }
        return result;
      }
    }

    /**
     * Reads the namespace map from the Flink broadcast operator state, taking the
     * first (index 0) entry of the list state if present.
     */
    Map<String, T> getMapFromBroadcastState() throws Exception {
      ListState<Map<String, T>> state = flinkStateBackend.getBroadcastOperatorState(
          flinkStateDescriptor);
      Iterable<Map<String, T>> iterable = state.get();
      Map<String, T> ret = null;
      if (iterable != null) {
        // just use index 0
        Iterator<Map<String, T>> iterator = iterable.iterator();
        if (iterator.hasNext()) {
          ret = iterator.next();
        }
      }
      return ret;
    }

    /**
     * Update map(namespace->T). Only subtask 0 writes through to the broadcast
     * operator state; other subtasks update their local cache only.
     */
    void updateMap(Map<String, T> map) throws Exception {
      if (indexInSubtaskGroup == 0) {
        ListState<Map<String, T>> state = flinkStateBackend.getBroadcastOperatorState(
            flinkStateDescriptor);
        state.clear();
        if (map.size() > 0) {
          state.add(map);
        }
      } else {
        if (map.size() == 0) {
          stateForNonZeroOperator.remove(name);
          // updateMap is always behind getMap,
          // getMap will clear map in BroadcastOperatorState,
          // we don't need clear here.
        } else {
          stateForNonZeroOperator.put(name, map);
        }
      }
    }

    /** Stores {@code input} under this state's namespace key. */
    void writeInternal(T input) {
      try {
        Map<String, T> map = getMap();
        if (map == null) {
          map = new HashMap<>();
        }
        map.put(namespace.stringKey(), input);
        updateMap(map);
      } catch (Exception e) {
        throw new RuntimeException("Error updating state.", e);
      }
    }

    /** Reads the value for this state's namespace key, or {@code null} if absent. */
    T readInternal() {
      try {
        Map<String, T> map = getMap();
        if (map == null) {
          return null;
        } else {
          return map.get(namespace.stringKey());
        }
      } catch (Exception e) {
        throw new RuntimeException("Error reading state.", e);
      }
    }

    /** Removes this state's namespace key from the map and writes the map back. */
    void clearInternal() {
      try {
        Map<String, T> map = getMap();
        if (map != null) {
          map.remove(namespace.stringKey());
          updateMap(map);
        }
      } catch (Exception e) {
        throw new RuntimeException("Error clearing state.", e);
      }
    }
  }

  /** {@link ValueState} backed by Flink broadcast operator state. */
  private class FlinkBroadcastValueState<K, T>
      extends AbstractBroadcastState<T> implements ValueState<T> {

    private final StateNamespace namespace;
    private final StateTag<? super K, ValueState<T>> address;

    FlinkBroadcastValueState(
        DefaultOperatorStateBackend flinkStateBackend,
        StateTag<? super K, ValueState<T>> address,
        StateNamespace namespace,
        Coder<T> coder) {
      super(flinkStateBackend, address.getId(), namespace, coder);

      this.namespace = namespace;
      this.address = address;
    }

    @Override
    public void write(T input) {
      writeInternal(input);
    }

    @Override
    public ValueState<T> readLater() {
      // Reads are synchronous here, so there is nothing to prefetch.
      return this;
    }

    @Override
    public T read() {
      return readInternal();
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }

      FlinkBroadcastValueState<?, ?> that = (FlinkBroadcastValueState<?, ?>) o;

      return namespace.equals(that.namespace) && address.equals(that.address);
    }

    @Override
    public int hashCode() {
      int result = namespace.hashCode();
      result = 31 * result + address.hashCode();
      return result;
    }

    @Override
    public void clear() {
      clearInternal();
    }
  }

  /** {@link BagState} backed by Flink broadcast operator state; stores the bag as a list. */
  private class FlinkBroadcastBagState<K, T> extends AbstractBroadcastState<List<T>>
      implements BagState<T> {

    private final StateNamespace namespace;
    private final StateTag<? super K, BagState<T>> address;

    FlinkBroadcastBagState(
        DefaultOperatorStateBackend flinkStateBackend,
        StateTag<? super K, BagState<T>> address,
        StateNamespace namespace,
        Coder<T> coder) {
      super(flinkStateBackend, address.getId(), namespace, ListCoder.of(coder));

      this.namespace = namespace;
      this.address = address;
    }

    @Override
    public void add(T input) {
      List<T> list = readInternal();
      if (list == null) {
        list = new ArrayList<>();
      }
      list.add(input);
      writeInternal(list);
    }

    @Override
    public BagState<T> readLater() {
      return this;
    }

    @Override
    public Iterable<T> read() {
      List<T> result = readInternal();
      return result != null ? result : Collections.<T>emptyList();
    }

    @Override
    public ReadableState<Boolean> isEmpty() {
      return new ReadableState<Boolean>() {
        @Override
        public Boolean read() {
          try {
            // add() never stores an empty list and clear() removes the entry, so a
            // null read is equivalent to "empty".
            List<T> result = readInternal();
            return result == null;
          } catch (Exception e) {
            throw new RuntimeException("Error reading state.", e);
          }
        }

        @Override
        public ReadableState<Boolean> readLater() {
          return this;
        }
      };
    }

    @Override
    public void clear() {
      clearInternal();
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }

      FlinkBroadcastBagState<?, ?> that = (FlinkBroadcastBagState<?, ?>) o;

      return namespace.equals(that.namespace) && address.equals(that.address);
    }

    @Override
    public int hashCode() {
      int result = namespace.hashCode();
      result = 31 * result + address.hashCode();
      return result;
    }
  }

  /**
   * {@link CombiningState} backed by Flink broadcast operator state; stores only the
   * accumulator and combines with a key-agnostic {@link Combine.CombineFn}.
   */
  private class FlinkCombiningState<K, InputT, AccumT, OutputT>
      extends AbstractBroadcastState<AccumT>
      implements CombiningState<InputT, AccumT, OutputT> {

    private final StateNamespace namespace;
    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
    private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;

    FlinkCombiningState(
        DefaultOperatorStateBackend flinkStateBackend,
        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
        Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
        StateNamespace namespace,
        Coder<AccumT> accumCoder) {
      super(flinkStateBackend, address.getId(), namespace, accumCoder);

      this.namespace = namespace;
      this.address = address;
      this.combineFn = combineFn;
    }

    @Override
    public CombiningState<InputT, AccumT, OutputT> readLater() {
      return this;
    }

    @Override
    public void add(InputT value) {
      AccumT current = readInternal();
      if (current == null) {
        current = combineFn.createAccumulator();
      }
      current = combineFn.addInput(current, value);
      writeInternal(current);
    }

    @Override
    public void addAccum(AccumT accum) {
      AccumT current = readInternal();

      if (current == null) {
        writeInternal(accum);
      } else {
        current = combineFn.mergeAccumulators(Arrays.asList(current, accum));
        writeInternal(current);
      }
    }

    @Override
    public AccumT getAccum() {
      return readInternal();
    }

    @Override
    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
      return combineFn.mergeAccumulators(accumulators);
    }

    @Override
    public OutputT read() {
      AccumT accum = readInternal();
      if (accum != null) {
        return combineFn.extractOutput(accum);
      } else {
        // Empty state reads as the output of an empty accumulator.
        return combineFn.extractOutput(combineFn.createAccumulator());
      }
    }

    @Override
    public ReadableState<Boolean> isEmpty() {
      return new ReadableState<Boolean>() {
        @Override
        public Boolean read() {
          try {
            return readInternal() == null;
          } catch (Exception e) {
            throw new RuntimeException("Error reading state.", e);
          }
        }

        @Override
        public ReadableState<Boolean> readLater() {
          return this;
        }
      };
    }

    @Override
    public void clear() {
      clearInternal();
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }

      FlinkCombiningState<?, ?, ?, ?> that =
          (FlinkCombiningState<?, ?, ?, ?>) o;

      return namespace.equals(that.namespace) && address.equals(that.address);
    }

    @Override
    public int hashCode() {
      int result = namespace.hashCode();
      result = 31 * result + address.hashCode();
      return result;
    }
  }

  /**
   * {@link CombiningState} variant whose {@link Combine.KeyedCombineFn} needs the
   * current element key, obtained from the enclosing state internals.
   */
  private class FlinkKeyedCombiningState<K, InputT, AccumT, OutputT>
      extends AbstractBroadcastState<AccumT>
      implements CombiningState<InputT, AccumT, OutputT> {

    private final StateNamespace namespace;
    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
    private final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
    private final FlinkBroadcastStateInternals<K> flinkStateInternals;

    FlinkKeyedCombiningState(
        DefaultOperatorStateBackend flinkStateBackend,
        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
        Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
        StateNamespace namespace,
        Coder<AccumT> accumCoder,
        FlinkBroadcastStateInternals<K> flinkStateInternals) {
      super(flinkStateBackend, address.getId(), namespace, accumCoder);

      this.namespace = namespace;
      this.address = address;
      this.combineFn = combineFn;
      this.flinkStateInternals = flinkStateInternals;
    }

    @Override
    public CombiningState<InputT, AccumT, OutputT> readLater() {
      return this;
    }

    @Override
    public void add(InputT value) {
      try {
        AccumT current = readInternal();
        if (current == null) {
          current = combineFn.createAccumulator(flinkStateInternals.getKey());
        }
        current = combineFn.addInput(flinkStateInternals.getKey(), current, value);
        writeInternal(current);
      } catch (Exception e) {
        throw new RuntimeException("Error adding to state.", e);
      }
    }

    @Override
    public void addAccum(AccumT accum) {
      try {
        AccumT current = readInternal();
        if (current == null) {
          writeInternal(accum);
        } else {
          current = combineFn.mergeAccumulators(
              flinkStateInternals.getKey(),
              Arrays.asList(current, accum));
          writeInternal(current);
        }
      } catch (Exception e) {
        throw new RuntimeException("Error adding to state.", e);
      }
    }

    @Override
    public AccumT getAccum() {
      try {
        return readInternal();
      } catch (Exception e) {
        throw new RuntimeException("Error reading state.", e);
      }
    }

    @Override
    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
      return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators);
    }

    @Override
    public OutputT read() {
      try {
        AccumT accum = readInternal();
        if (accum != null) {
          return combineFn.extractOutput(flinkStateInternals.getKey(), accum);
        } else {
          // Empty state reads as the output of an empty accumulator.
          return combineFn.extractOutput(
              flinkStateInternals.getKey(),
              combineFn.createAccumulator(flinkStateInternals.getKey()));
        }
      } catch (Exception e) {
        throw new RuntimeException("Error reading state.", e);
      }
    }

    @Override
    public ReadableState<Boolean> isEmpty() {
      return new ReadableState<Boolean>() {
        @Override
        public Boolean read() {
          try {
            return readInternal() == null;
          } catch (Exception e) {
            throw new RuntimeException("Error reading state.", e);
          }
        }

        @Override
        public ReadableState<Boolean> readLater() {
          return this;
        }
      };
    }

    @Override
    public void clear() {
      clearInternal();
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }

      FlinkKeyedCombiningState<?, ?, ?, ?> that =
          (FlinkKeyedCombiningState<?, ?, ?, ?>) o;

      return namespace.equals(that.namespace) && address.equals(that.address);
    }

    @Override
    public int hashCode() {
      int result = namespace.hashCode();
      result = 31 * result + address.hashCode();
      return result;
    }
  }

  /**
   * {@link CombiningState} variant whose {@link CombineWithContext.KeyedCombineFnWithContext}
   * needs both the current key and a {@link CombineWithContext.Context}.
   */
  private class FlinkCombiningStateWithContext<K, InputT, AccumT, OutputT>
      extends AbstractBroadcastState<AccumT>
      implements CombiningState<InputT, AccumT, OutputT> {

    private final StateNamespace namespace;
    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
    private final CombineWithContext.KeyedCombineFnWithContext<
        ? super K, InputT, AccumT, OutputT> combineFn;
    private final FlinkBroadcastStateInternals<K> flinkStateInternals;
    private final CombineWithContext.Context context;

    FlinkCombiningStateWithContext(
        DefaultOperatorStateBackend flinkStateBackend,
        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
        CombineWithContext.KeyedCombineFnWithContext<
            ? super K, InputT, AccumT, OutputT> combineFn,
        StateNamespace namespace,
        Coder<AccumT> accumCoder,
        FlinkBroadcastStateInternals<K> flinkStateInternals,
        CombineWithContext.Context context) {
      super(flinkStateBackend, address.getId(), namespace, accumCoder);

      this.namespace = namespace;
      this.address = address;
      this.combineFn = combineFn;
      this.flinkStateInternals = flinkStateInternals;
      this.context = context;
    }

    @Override
    public CombiningState<InputT, AccumT, OutputT> readLater() {
      return this;
    }

    @Override
    public void add(InputT value) {
      try {
        AccumT current = readInternal();
        if (current == null) {
          current = combineFn.createAccumulator(flinkStateInternals.getKey(), context);
        }
        current = combineFn.addInput(flinkStateInternals.getKey(), current, value, context);
        writeInternal(current);
      } catch (Exception e) {
        throw new RuntimeException("Error adding to state.", e);
      }
    }

    @Override
    public void addAccum(AccumT accum) {
      try {
        AccumT current = readInternal();
        if (current == null) {
          writeInternal(accum);
        } else {
          current = combineFn.mergeAccumulators(
              flinkStateInternals.getKey(),
              Arrays.asList(current, accum),
              context);
          writeInternal(current);
        }
      } catch (Exception e) {
        throw new RuntimeException("Error adding to state.", e);
      }
    }

    @Override
    public AccumT getAccum() {
      try {
        return readInternal();
      } catch (Exception e) {
        throw new RuntimeException("Error reading state.", e);
      }
    }

    @Override
    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
      return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators, context);
    }

    @Override
    public OutputT read() {
      try {
        AccumT accum = readInternal();
        if (accum == null) {
          // Fix: previously a null accumulator was passed straight to
          // extractOutput. Empty state now reads as the output of an empty
          // accumulator, consistent with the other combining state classes.
          accum = combineFn.createAccumulator(flinkStateInternals.getKey(), context);
        }
        return combineFn.extractOutput(flinkStateInternals.getKey(), accum, context);
      } catch (Exception e) {
        throw new RuntimeException("Error reading state.", e);
      }
    }

    @Override
    public ReadableState<Boolean> isEmpty() {
      return new ReadableState<Boolean>() {
        @Override
        public Boolean read() {
          try {
            return readInternal() == null;
          } catch (Exception e) {
            throw new RuntimeException("Error reading state.", e);
          }
        }

        @Override
        public ReadableState<Boolean> readLater() {
          return this;
        }
      };
    }

    @Override
    public void clear() {
      clearInternal();
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }

      FlinkCombiningStateWithContext<?, ?, ?, ?> that =
          (FlinkCombiningStateWithContext<?, ?, ?, ?>) o;

      return namespace.equals(that.namespace) && address.equals(that.address);
    }

    @Override
    public int hashCode() {
      int result = namespace.hashCode();
      result = 31 * result + address.hashCode();
      return result;
    }
  }

}