http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java new file mode 100644 index 0000000..e24bf31 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.types; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +/** + * Flink {@link TypeInformation} for Beam values that have been encoded to byte data + * by a {@link Coder}. 
+ */ +public class EncodedValueTypeInformation + extends TypeInformation<byte[]> + implements AtomicType<byte[]> { + + private static final long serialVersionUID = 1L; + + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 0; + } + + @Override + public int getTotalFields() { + return 0; + } + + @Override + public Class<byte[]> getTypeClass() { + return byte[].class; + } + + @Override + public boolean isKeyType() { + return true; + } + + @Override + public TypeSerializer<byte[]> createSerializer(ExecutionConfig executionConfig) { + return new EncodedValueSerializer(); + } + + @Override + public boolean equals(Object other) { + return other instanceof EncodedValueTypeInformation; + } + + @Override + public int hashCode() { + return this.getClass().hashCode(); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof EncodedValueTypeInformation; + } + + @Override + public String toString() { + return "EncodedValueTypeInformation"; + } + + @Override + public TypeComparator<byte[]> createComparator( + boolean sortOrderAscending, + ExecutionConfig executionConfig) { + return new EncodedValueComparator(sortOrderAscending); + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java new file mode 100644 index 0000000..36b5ba3 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.types; + +import java.io.ByteArrayOutputStream; +
/**
 * A {@link java.io.ByteArrayOutputStream} whose internal byte[] buffer can be read directly,
 * avoiding the array copy that {@link #toByteArray()} performs.
 */
public class InspectableByteArrayOutputStream extends ByteArrayOutputStream {

  /**
   * Returns the backing array itself rather than a copy.
   *
   * <p>The array can be longer than the data written so far; {@link #size()} gives the number
   * of valid bytes at its start.
   */
  public byte[] getBuffer() {
    return this.buf;
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java new file mode 100644 index 0000000..9df6836 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.runners.flink.translation.types; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +
/**
 * {@link KeySelector} that pulls the key out of a windowed {@link KV} and hands it back
 * encoded as a {@code byte} array, using the key {@link Coder} supplied at construction.
 */
public class KvKeySelector<InputT, K>
    implements KeySelector<WindowedValue<KV<K, InputT>>, byte[]>, ResultTypeQueryable<byte[]> {

  private final Coder<K> keyCoder;

  /**
   * @param keyCoder coder used to turn each key into its byte representation
   */
  public KvKeySelector(Coder<K> keyCoder) {
    this.keyCoder = keyCoder;
  }

  /** Encodes the key of the wrapped {@link KV} with the key coder. */
  @Override
  public byte[] getKey(WindowedValue<KV<K, InputT>> value) throws Exception {
    final KV<K, InputT> element = value.getValue();
    final K key = element.getKey();
    return CoderUtils.encodeToByteArray(keyCoder, key);
  }

  /** The produced keys are described by {@link EncodedValueTypeInformation}. */
  @Override
  public TypeInformation<byte[]> getProducedType() {
    final TypeInformation<byte[]> encodedKeyType = new EncodedValueTypeInformation();
    return encodedKeyType;
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java new file mode 100644 index 0000000..6fb3182 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal implementation of the Beam runner for Apache Flink. + */ +package org.apache.beam.runners.flink.translation.types; http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java new file mode 100644 index 0000000..2256bb1 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.flink.translation.utils; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.IOChannelUtils; +
/**
 * Holds {@link PipelineOptions} as serialized bytes so that they can be shipped to the cluster,
 * and rebuilds them on first access.
 */
public class SerializedPipelineOptions implements Serializable {

  private final byte[] serializedOptions;

  /** Deserialized options; transient, so rebuilt on demand after this object is shipped. */
  private transient PipelineOptions pipelineOptions;

  /**
   * Serializes the given options eagerly.
   *
   * @param options the options to capture; must not be null
   * @throws RuntimeException if the options cannot be serialized
   */
  public SerializedPipelineOptions(PipelineOptions options) {
    checkNotNull(options, "PipelineOptions must not be null.");
    this.serializedOptions = toBytes(options);
  }

  /** Writes the options through a Jackson {@link ObjectMapper} into a byte array. */
  private static byte[] toBytes(PipelineOptions options) {
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
      new ObjectMapper().writeValue(baos, options);
      return baos.toByteArray();
    } catch (Exception e) {
      throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
    }
  }

  /**
   * Returns the deserialized options, rebuilding them from the stored bytes on the first call.
   *
   * <p>On that first call the IO factories and the default file system configuration are also
   * registered with the freshly deserialized options.
   *
   * @throws RuntimeException if the stored bytes cannot be deserialized
   */
  public PipelineOptions getPipelineOptions() {
    if (pipelineOptions != null) {
      return pipelineOptions;
    }

    try {
      pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);

      IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
      FileSystems.setDefaultConfigInWorkers(pipelineOptions);
    } catch (IOException e) {
      throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
    }

    return pipelineOptions;
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java new file mode 100644 index 0000000..5dedd53 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal implementation of the Beam runner for Apache Flink. + */ +package org.apache.beam.runners.flink.translation.utils; http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java new file mode 100644 index 0000000..82a2c4e --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import org.apache.flink.core.memory.DataInputView; +
/**
 * Adapter that exposes a Flink {@link DataInputView} as an {@link java.io.InputStream}.
 * Flink reads data through a {@link org.apache.flink.core.memory.DataInputView}, whereas
 * Beam {@link org.apache.beam.sdk.coders.Coder}s expect an {@link java.io.InputStream}.
 */
public class DataInputViewWrapper extends InputStream {

  private DataInputView inputView;

  public DataInputViewWrapper(DataInputView inputView) {
    this.inputView = inputView;
  }

  /** Points this wrapper at a different view, so one wrapper instance can be reused. */
  public void setInputView(DataInputView inputView) {
    this.inputView = inputView;
  }

  /**
   * Reads one unsigned byte from the view.
   *
   * @return the byte read, or -1 when the view signals end of input
   */
  @Override
  public int read() throws IOException {
    int next;
    try {
      next = inputView.readUnsignedByte();
    } catch (EOFException e) {
      // DataInput reports end of input with an exception, InputStream with -1:
      // translate the former into the latter.
      next = -1;
    }
    return next;
  }

  /** Delegates the bulk read directly to the wrapped view. */
  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    final int bytesRead = inputView.read(b, off, len);
    return bytesRead;
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java new file mode 100644 index 0000000..f2d9db2 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers; + +import java.io.IOException; +import java.io.OutputStream; +import org.apache.flink.core.memory.DataOutputView; +
/**
 * Adapter that exposes a Flink {@link org.apache.flink.core.memory.DataOutputView} as an
 * {@link java.io.OutputStream}. Flink writes data through a
 * {@link org.apache.flink.core.memory.DataOutputView}, whereas
 * Beam {@link org.apache.beam.sdk.coders.Coder}s expect an {@link java.io.OutputStream}.
 */
public class DataOutputViewWrapper extends OutputStream {

  private DataOutputView outputView;

  public DataOutputViewWrapper(DataOutputView outputView) {
    this.outputView = outputView;
  }

  /** Points this wrapper at a different view, so one wrapper instance can be reused. */
  public void setOutputView(DataOutputView outputView) {
    this.outputView = outputView;
  }

  /** Forwards the single-byte write to the wrapped view. */
  @Override
  public void write(int b) throws IOException {
    this.outputView.write(b);
  }

  /** Forwards the bulk write to the wrapped view. */
  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    this.outputView.write(b, off, len);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java new file mode 100644 index 0000000..70d97e3 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import java.io.Serializable; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.flink.api.common.accumulators.Accumulator; +
/**
 * Adapts a {@link org.apache.beam.sdk.transforms.Combine.CombineFn} to a Flink
 * {@link org.apache.flink.api.common.accumulators.Accumulator} so that the function can serve
 * as an aggregator in a {@link org.apache.beam.sdk.transforms.ParDo} operation.
 *
 * <p>The running value is kept as the combiner's output type and is fed back into the combiner
 * as if it were an input; the unchecked casts below rely on that.
 */
public class SerializableFnAggregatorWrapper<InputT, OutputT>
    implements Aggregator<InputT, OutputT>, Accumulator<InputT, Serializable> {

  private OutputT aa;
  private Combine.CombineFn<InputT, ?, OutputT> combiner;

  /** Stores the combiner and initializes the running value via {@link #resetLocal()}. */
  public SerializableFnAggregatorWrapper(Combine.CombineFn<InputT, ?, OutputT> combiner) {
    this.combiner = combiner;
    resetLocal();
  }

  /** Combines the running value with {@code value} and stores the result. */
  @Override
  @SuppressWarnings("unchecked")
  public void add(InputT value) {
    final InputT running = (InputT) aa;
    aa = combiner.apply(ImmutableList.of(running, value));
  }

  /** Returns the running value, cast to {@link Serializable}. */
  @Override
  public Serializable getLocalValue() {
    final Object current = aa;
    return (Serializable) current;
  }

  /** Resets the running value to the combiner's result for an empty input. */
  @Override
  public void resetLocal() {
    aa = combiner.apply(ImmutableList.<InputT>of());
  }

  /** Combines the running value with the local value of {@code other}. */
  @Override
  @SuppressWarnings("unchecked")
  public void merge(Accumulator<InputT, Serializable> other) {
    final InputT mine = (InputT) aa;
    final InputT theirs = (InputT) other.getLocalValue();
    aa = combiner.apply(ImmutableList.of(mine, theirs));
  }

  /** Same as {@link #add}. */
  @Override
  public void addValue(InputT value) {
    add(value);
  }

  @Override
  public String getName() {
    return "Aggregator :" + combiner.toString();
  }

  @Override
  public Combine.CombineFn<InputT, ?, OutputT> getCombineFn() {
    return combiner;
  }

  /**
   * Returns a new wrapper around the same combiner whose running value is the combiner applied
   * to this wrapper's running value.
   */
  @Override
  public Accumulator<InputT, Serializable> clone() {
    try {
      // The result is intentionally unused; the copy is built by hand below.
      super.clone();
    } catch (CloneNotSupportedException e) {
      // Flink Accumulators cannot throw CloneNotSupportedException, work around that.
      throw new RuntimeException(e);
    }

    // Copy the running value by pushing it through the combiner once more.
    final OutputT copiedValue = combiner.apply(Lists.newArrayList((InputT) aa));
    final SerializableFnAggregatorWrapper<InputT, OutputT> copy =
        new SerializableFnAggregatorWrapper<>(combiner);

    copy.aa = copiedValue;
    return copy;
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java new file mode 100644 index 0000000..a87472b --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.runners.flink.translation.wrappers; + +import java.io.IOException; +import java.util.List; +import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplitAssigner; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +
/**
 * Wrapper for executing a {@link Source} as a Flink {@link InputFormat}.
 *
 * <p>The initial {@link BoundedSource} is split into shards, each carried by a
 * {@link SourceInputSplit}. A reader is created per split in {@link #open} and its elements are
 * emitted one at a time by {@link #nextRecord}.
 */
public class SourceInputFormat<T>
    implements InputFormat<WindowedValue<T>, SourceInputSplit<T>> {
  private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class);

  private final BoundedSource<T> initialSource;

  private transient PipelineOptions options;
  private final SerializedPipelineOptions serializedOptions;

  private transient BoundedSource.BoundedReader<T> reader;
  private boolean inputAvailable = false;

  /**
   * @param initialSource the source to read; it is split later in {@link #createInputSplits}
   * @param options pipeline options, kept in serialized form
   */
  public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) {
    this.initialSource = initialSource;
    this.serializedOptions = new SerializedPipelineOptions(options);
  }

  /** Restores the pipeline options from their serialized form; the argument is not used. */
  @Override
  public void configure(Configuration configuration) {
    options = serializedOptions.getPipelineOptions();
  }

  /** Creates and starts a reader for the source carried by the given split. */
  @Override
  public void open(SourceInputSplit<T> sourceInputSplit) throws IOException {
    reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options);
    inputAvailable = reader.start();
  }

  /**
   * Reports the estimated size of the initial source; record count and record width are
   * reported as unknown.
   *
   * @return the statistics, or {@code null} if the size estimate could not be obtained
   */
  @Override
  public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
    try {
      final long estimatedSize = initialSource.getEstimatedSizeBytes(options);

      return new BaseStatistics() {
        @Override
        public long getTotalInputSize() {
          return estimatedSize;
        }

        @Override
        public long getNumberOfRecords() {
          return BaseStatistics.NUM_RECORDS_UNKNOWN;
        }

        @Override
        public float getAverageRecordWidth() {
          return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN;
        }
      };
    } catch (Exception e) {
      // The exception is passed as the trailing throwable so SLF4J logs its stack trace.
      // The message has no "{}" placeholder: with a lone throwable argument it would be
      // left unformatted in the log output.
      LOG.warn("Could not read Source statistics.", e);
    }

    return null;
  }

  /**
   * Splits the initial source into shards of roughly
   * {@code estimatedSize / numSplits} bytes and wraps each shard in a split.
   *
   * @throws IOException if estimating the size or splitting the source fails
   */
  @Override
  @SuppressWarnings("unchecked")
  public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException {
    try {
      long desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits;
      List<? extends Source<T>> shards =
          initialSource.split(desiredSizeBytes, options);
      int numShards = shards.size();
      SourceInputSplit<T>[] sourceInputSplits = new SourceInputSplit[numShards];
      for (int i = 0; i < numShards; i++) {
        sourceInputSplits[i] = new SourceInputSplit<>(shards.get(i), i);
      }
      return sourceInputSplits;
    } catch (Exception e) {
      throw new IOException("Could not create input splits from Source.", e);
    }
  }

  @Override
  public InputSplitAssigner getInputSplitAssigner(final SourceInputSplit[] sourceInputSplits) {
    return new DefaultInputSplitAssigner(sourceInputSplits);
  }


  /** True once the reader has reported that no further element is available. */
  @Override
  public boolean reachedEnd() throws IOException {
    return !inputAvailable;
  }

  /**
   * Emits the reader's current element in the global window with
   * {@link PaneInfo#NO_FIRING}, then advances the reader. The reuse argument is ignored.
   *
   * @return the next element, or {@code null} if no input is available
   */
  @Override
  public WindowedValue<T> nextRecord(WindowedValue<T> t) throws IOException {
    if (inputAvailable) {
      final T current = reader.getCurrent();
      final Instant timestamp = reader.getCurrentTimestamp();
      // advance reader to have a record ready next time
      inputAvailable = reader.advance();
      return WindowedValue.of(
          current,
          timestamp,
          GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
    }

    return null;
  }

  /** Closes the reader if one was opened. */
  @Override
  public void close() throws IOException {
    // TODO null check can be removed once FLINK-3796 is fixed
    if (reader != null) {
      reader.close();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java new file mode 100644 index 0000000..e4a7386 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers; + +import org.apache.beam.sdk.io.Source; +import org.apache.flink.core.io.InputSplit; + +/** + * {@link org.apache.flink.core.io.InputSplit} for + * {@link org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat}. We pass + * the sharded Source around in the input split because Sources simply split up into several + * Sources for sharding.
This is different to how Flink creates a separate InputSplit from + * an InputFormat. + */
public class SourceInputSplit<T> implements InputSplit {

  // The shard of the original Source that this split stands for.
  private Source<T> source;
  // Number returned by getSplitNumber().
  private int splitNumber;

  /** Creates an empty split: no source, split number 0. */
  public SourceInputSplit() {
  }

  /**
   * @param source the shard this split carries
   * @param splitNumber the number reported by {@link #getSplitNumber()}
   */
  public SourceInputSplit(Source<T> source, int splitNumber) {
    this.splitNumber = splitNumber;
    this.source = source;
  }

  @Override
  public int getSplitNumber() {
    return this.splitNumber;
  }

  /** Returns the shard carried by this split. */
  public Source<T> getSource() {
    return this.source;
  }

}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java new file mode 100644 index 0000000..72f7deb --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +/** + * Internal implementation of the Beam runner for Apache Flink. + */ +package org.apache.beam.runners.flink.translation.wrappers; http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java new file mode 100644 index 0000000..8a09286 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -0,0 +1,774 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.translation.wrappers.streaming; + +import static org.apache.flink.util.Preconditions.checkArgument; + +import com.google.common.base.Optional; +import com.google.common.collect.Iterables; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.AggregatorFactory; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.ExecutionContext; +import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn; +import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; +import org.apache.beam.runners.core.SideInputHandler; +import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.StateNamespaces.WindowNamespace; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTags; +import org.apache.beam.runners.core.StatefulDoFnRunner; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; +import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; +import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; +import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals; +import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkKeyGroupStateInternals; +import 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkSplitStateInternals; +import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals; +import org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.join.RawUnionValue; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.NullSideInputReader; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyGroupsList; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.HeapInternalTimerService; +import org.apache.flink.streaming.api.operators.InternalTimer; +import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.joda.time.Instant; + +/** + * Flink operator for executing {@link DoFn DoFns}. + * + * @param <InputT> the input type of the {@link DoFn} + * @param <FnOutputT> the output type of the {@link DoFn} + * @param <OutputT> the output type of the operator, this can be different from the fn output + * type when we have additional tagged outputs + */ +public class DoFnOperator<InputT, FnOutputT, OutputT> + extends AbstractStreamOperator<OutputT> + implements OneInputStreamOperator<WindowedValue<InputT>, OutputT>, + TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, OutputT>, + KeyGroupCheckpointedOperator, Triggerable<Object, TimerData> { + + protected DoFn<InputT, FnOutputT> doFn; + + protected final SerializedPipelineOptions serializedOptions; + + protected final TupleTag<FnOutputT> mainOutputTag; + protected final List<TupleTag<?>> additionalOutputTags; + + protected final Collection<PCollectionView<?>> sideInputs; + protected final Map<Integer, PCollectionView<?>> sideInputTagMapping; + + protected final WindowingStrategy<?, ?> windowingStrategy; + + protected final OutputManagerFactory<OutputT> outputManagerFactory; + + protected transient DoFnRunner<InputT, FnOutputT> doFnRunner; + protected transient PushbackSideInputDoFnRunner<InputT, FnOutputT> pushbackDoFnRunner; + + protected transient SideInputHandler sideInputHandler; + + protected transient SideInputReader sideInputReader; + + protected transient DoFnRunners.OutputManager outputManager; + + private transient DoFnInvoker<InputT, FnOutputT> doFnInvoker; + + protected transient long currentInputWatermark; + + 
protected transient long currentOutputWatermark; + + private transient StateTag<Object, BagState<WindowedValue<InputT>>> pushedBackTag; + + protected transient FlinkStateInternals<?> stateInternals; + + private Coder<WindowedValue<InputT>> inputCoder; + + private final Coder<?> keyCoder; + + private final TimerInternals.TimerDataCoder timerCoder; + + protected transient HeapInternalTimerService<?, TimerInternals.TimerData> timerService; + + protected transient FlinkTimerInternals timerInternals; + + private transient StateInternals<?> pushbackStateInternals; + + private transient Optional<Long> pushedBackWatermark; + + public DoFnOperator( + DoFn<InputT, FnOutputT> doFn, + Coder<WindowedValue<InputT>> inputCoder, + TupleTag<FnOutputT> mainOutputTag, + List<TupleTag<?>> additionalOutputTags, + OutputManagerFactory<OutputT> outputManagerFactory, + WindowingStrategy<?, ?> windowingStrategy, + Map<Integer, PCollectionView<?>> sideInputTagMapping, + Collection<PCollectionView<?>> sideInputs, + PipelineOptions options, + Coder<?> keyCoder) { + this.doFn = doFn; + this.inputCoder = inputCoder; + this.mainOutputTag = mainOutputTag; + this.additionalOutputTags = additionalOutputTags; + this.sideInputTagMapping = sideInputTagMapping; + this.sideInputs = sideInputs; + this.serializedOptions = new SerializedPipelineOptions(options); + this.windowingStrategy = windowingStrategy; + this.outputManagerFactory = outputManagerFactory; + + setChainingStrategy(ChainingStrategy.ALWAYS); + + this.keyCoder = keyCoder; + + this.timerCoder = + TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder()); + } + + private ExecutionContext.StepContext createStepContext() { + return new StepContext(); + } + + // allow overriding this in WindowDoFnOperator because this one dynamically creates + // the DoFn + protected DoFn<InputT, FnOutputT> getDoFn() { + return doFn; + } + + @Override + public void open() throws Exception { + super.open(); + + currentInputWatermark = 
Long.MIN_VALUE; + currentOutputWatermark = Long.MIN_VALUE; + + AggregatorFactory aggregatorFactory = new AggregatorFactory() { + @Override + public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn( + Class<?> fnClass, + ExecutionContext.StepContext stepContext, + String aggregatorName, + Combine.CombineFn<InputT, AccumT, OutputT> combine) { + + @SuppressWarnings("unchecked") + SerializableFnAggregatorWrapper<InputT, OutputT> result = + (SerializableFnAggregatorWrapper<InputT, OutputT>) + getRuntimeContext().getAccumulator(aggregatorName); + + if (result == null) { + result = new SerializableFnAggregatorWrapper<>(combine); + getRuntimeContext().addAccumulator(aggregatorName, result); + } + return result; + } + }; + + sideInputReader = NullSideInputReader.of(sideInputs); + + if (!sideInputs.isEmpty()) { + + pushedBackTag = StateTags.bag("pushed-back-values", inputCoder); + + FlinkBroadcastStateInternals sideInputStateInternals = + new FlinkBroadcastStateInternals<>( + getContainingTask().getIndexInSubtaskGroup(), getOperatorStateBackend()); + + sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals); + sideInputReader = sideInputHandler; + + // maybe init by initializeState + if (pushbackStateInternals == null) { + if (keyCoder != null) { + pushbackStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder, + getKeyedStateBackend()); + } else { + pushbackStateInternals = + new FlinkSplitStateInternals<Object>(getOperatorStateBackend()); + } + } + + pushedBackWatermark = Optional.absent(); + + } + + outputManager = outputManagerFactory.create(output); + + // StatefulPardo or WindowDoFn + if (keyCoder != null) { + stateInternals = new FlinkStateInternals<>((KeyedStateBackend) getKeyedStateBackend(), + keyCoder); + + timerService = (HeapInternalTimerService<?, TimerInternals.TimerData>) + getInternalTimerService("beam-timer", new CoderTypeSerializer<>(timerCoder), this); + + timerInternals = new FlinkTimerInternals(); + 
+ } + + // WindowDoFnOperator needs state and timers to create its DoFn, so getDoFn() + // must only be called once StateInternals and TimerInternals are ready. + this.doFn = getDoFn(); + doFnInvoker = DoFnInvokers.invokerFor(doFn); + + doFnInvoker.invokeSetup(); + + ExecutionContext.StepContext stepContext = createStepContext(); + + doFnRunner = DoFnRunners.simpleRunner( + serializedOptions.getPipelineOptions(), + doFn, + sideInputReader, + outputManager, + mainOutputTag, + additionalOutputTags, + stepContext, + aggregatorFactory, + windowingStrategy); + + if (doFn instanceof GroupAlsoByWindowViaWindowSetNewDoFn) { + // When the doFn is this, we know it came from WindowDoFnOperator and + // InputT = KeyedWorkItem<K, V> + // OutputT = KV<K, V> + // + // for some K, V + + + doFnRunner = DoFnRunners.lateDataDroppingRunner( + (DoFnRunner) doFnRunner, + stepContext, + windowingStrategy, + ((GroupAlsoByWindowViaWindowSetNewDoFn) doFn).getDroppedDueToLatenessAggregator()); + } else if (keyCoder != null) { + // It is a stateful DoFn + + StatefulDoFnRunner.CleanupTimer cleanupTimer = + new StatefulDoFnRunner.TimeInternalsCleanupTimer( + stepContext.timerInternals(), windowingStrategy); + + // we don't know the window type + @SuppressWarnings({"unchecked", "rawtypes"}) + Coder windowCoder = windowingStrategy.getWindowFn().windowCoder(); + + @SuppressWarnings({"unchecked", "rawtypes"}) + StatefulDoFnRunner.StateCleaner<?> stateCleaner = + new StatefulDoFnRunner.StateInternalsStateCleaner<>( + doFn, stepContext.stateInternals(), windowCoder); + + doFnRunner = DoFnRunners.defaultStatefulDoFnRunner( + doFn, + doFnRunner, + stepContext, + aggregatorFactory, + windowingStrategy, + cleanupTimer, + stateCleaner); + } + + pushbackDoFnRunner = + SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler); + } + + @Override + public void close() throws Exception { + super.close(); + doFnInvoker.invokeTeardown(); + } + + protected final long getPushbackWatermarkHold() { + // if we don't have side
inputs we never hold the watermark + if (sideInputs.isEmpty()) { + return BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); + } + + try { + checkInitPushedBackWatermark(); + return pushedBackWatermark.get(); + } catch (Exception e) { + throw new RuntimeException("Error retrieving pushed back watermark state.", e); + } + } + + private void checkInitPushedBackWatermark() { + // init and restore from pushedBack state. + // Not done in initializeState, because OperatorState is not ready. + if (!pushedBackWatermark.isPresent()) { + + BagState<WindowedValue<InputT>> pushedBack = + pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + + long min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); + for (WindowedValue<InputT> value : pushedBack.read()) { + min = Math.min(min, value.getTimestamp().getMillis()); + } + setPushedBackWatermark(min); + } + } + + @Override + public final void processElement( + StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception { + doFnRunner.startBundle(); + doFnRunner.processElement(streamRecord.getValue()); + doFnRunner.finishBundle(); + } + + private void setPushedBackWatermark(long watermark) { + pushedBackWatermark = Optional.fromNullable(watermark); + } + + @Override + public final void processElement1( + StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception { + pushbackDoFnRunner.startBundle(); + Iterable<WindowedValue<InputT>> justPushedBack = + pushbackDoFnRunner.processElementInReadyWindows(streamRecord.getValue()); + + BagState<WindowedValue<InputT>> pushedBack = + pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + + checkInitPushedBackWatermark(); + + long min = pushedBackWatermark.get(); + for (WindowedValue<InputT> pushedBackValue : justPushedBack) { + min = Math.min(min, pushedBackValue.getTimestamp().getMillis()); + pushedBack.add(pushedBackValue); + } + setPushedBackWatermark(min); + pushbackDoFnRunner.finishBundle(); + } + + @Override + public final void 
processElement2( + StreamRecord<RawUnionValue> streamRecord) throws Exception { + pushbackDoFnRunner.startBundle(); + + @SuppressWarnings("unchecked") + WindowedValue<Iterable<?>> value = + (WindowedValue<Iterable<?>>) streamRecord.getValue().getValue(); + + PCollectionView<?> sideInput = sideInputTagMapping.get(streamRecord.getValue().getUnionTag()); + sideInputHandler.addSideInputValue(sideInput, value); + + BagState<WindowedValue<InputT>> pushedBack = + pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + + List<WindowedValue<InputT>> newPushedBack = new ArrayList<>(); + + Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read(); + if (pushedBackContents != null) { + for (WindowedValue<InputT> elem : pushedBackContents) { + + // we need to set the correct key in case the operator is + // a (keyed) window operator + setKeyContextElement1(new StreamRecord<>(elem)); + + Iterable<WindowedValue<InputT>> justPushedBack = + pushbackDoFnRunner.processElementInReadyWindows(elem); + Iterables.addAll(newPushedBack, justPushedBack); + } + } + + pushedBack.clear(); + long min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); + for (WindowedValue<InputT> pushedBackValue : newPushedBack) { + min = Math.min(min, pushedBackValue.getTimestamp().getMillis()); + pushedBack.add(pushedBackValue); + } + setPushedBackWatermark(min); + + pushbackDoFnRunner.finishBundle(); + + // maybe output a new watermark + processWatermark1(new Watermark(currentInputWatermark)); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + processWatermark1(mark); + } + + @Override + public void processWatermark1(Watermark mark) throws Exception { + if (keyCoder == null) { + this.currentInputWatermark = mark.getTimestamp(); + long potentialOutputWatermark = + Math.min(getPushbackWatermarkHold(), currentInputWatermark); + if (potentialOutputWatermark > currentOutputWatermark) { + currentOutputWatermark = potentialOutputWatermark; + 
output.emitWatermark(new Watermark(currentOutputWatermark)); + } + } else { + // fireTimers, so we need startBundle. + pushbackDoFnRunner.startBundle(); + + this.currentInputWatermark = mark.getTimestamp(); + + // hold back by the pushed back values waiting for side inputs + long actualInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp()); + + timerService.advanceWatermark(actualInputWatermark); + + Instant watermarkHold = stateInternals.watermarkHold(); + + long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold()); + + long potentialOutputWatermark = Math.min(currentInputWatermark, combinedWatermarkHold); + + if (potentialOutputWatermark > currentOutputWatermark) { + currentOutputWatermark = potentialOutputWatermark; + output.emitWatermark(new Watermark(currentOutputWatermark)); + } + pushbackDoFnRunner.finishBundle(); + } + } + + @Override + public void processWatermark2(Watermark mark) throws Exception { + // ignore watermarks from the side-input input + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + // copy from AbstractStreamOperator + if (getKeyedStateBackend() != null) { + KeyedStateCheckpointOutputStream out; + + try { + out = context.getRawKeyedOperatorStateOutput(); + } catch (Exception exception) { + throw new Exception("Could not open raw keyed operator state stream for " + + getOperatorName() + '.', exception); + } + + try { + KeyGroupsList allKeyGroups = out.getKeyGroupList(); + for (int keyGroupIdx : allKeyGroups) { + out.startNewKeyGroup(keyGroupIdx); + + DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out); + + // if (this instanceof KeyGroupCheckpointedOperator) + snapshotKeyGroupState(keyGroupIdx, dov); + + // We can't get all timerServices, so we just snapshot our timerService + // Maybe this is a normal DoFn that has no timerService + if (keyCoder != null) { + timerService.snapshotTimersForKeyGroup(dov, keyGroupIdx); + } + + 
} + } catch (Exception exception) { + throw new Exception("Could not write timer service of " + getOperatorName() + + " to checkpoint state stream.", exception); + } finally { + try { + out.close(); + } catch (Exception closeException) { + LOG.warn("Could not close raw keyed operator state stream for {}. This " + + "might have prevented deleting some state data.", getOperatorName(), + closeException); + } + } + } + } + + @Override + public void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception { + if (!sideInputs.isEmpty() && keyCoder != null) { + ((FlinkKeyGroupStateInternals) pushbackStateInternals).snapshotKeyGroupState( + keyGroupIndex, out); + } + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + if (getKeyedStateBackend() != null) { + int totalKeyGroups = getKeyedStateBackend().getNumberOfKeyGroups(); + KeyGroupsList localKeyGroupRange = getKeyedStateBackend().getKeyGroupRange(); + + for (KeyGroupStatePartitionStreamProvider streamProvider : context.getRawKeyedStateInputs()) { + DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(streamProvider.getStream()); + + int keyGroupIdx = streamProvider.getKeyGroupId(); + checkArgument(localKeyGroupRange.contains(keyGroupIdx), + "Key Group " + keyGroupIdx + " does not belong to the local range."); + + // if (this instanceof KeyGroupRestoringOperator) + restoreKeyGroupState(keyGroupIdx, div); + + // We just initialize our timerService + if (keyCoder != null) { + if (timerService == null) { + timerService = new HeapInternalTimerService<>( + totalKeyGroups, + localKeyGroupRange, + this, + getRuntimeContext().getProcessingTimeService()); + } + timerService.restoreTimersForKeyGroup(div, keyGroupIdx, getUserCodeClassloader()); + } + } + } + } + + @Override + public void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception { + if (!sideInputs.isEmpty() && keyCoder != null) { + if (pushbackStateInternals == 
null) { + pushbackStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder, + getKeyedStateBackend()); + } + ((FlinkKeyGroupStateInternals) pushbackStateInternals) + .restoreKeyGroupState(keyGroupIndex, in, getUserCodeClassloader()); + } + } + + @Override + public void onEventTime(InternalTimer<Object, TimerData> timer) throws Exception { + fireTimer(timer); + } + + @Override + public void onProcessingTime(InternalTimer<Object, TimerData> timer) throws Exception { + fireTimer(timer); + } + + // allow overriding this in WindowDoFnOperator + public void fireTimer(InternalTimer<?, TimerData> timer) { + TimerInternals.TimerData timerData = timer.getNamespace(); + StateNamespace namespace = timerData.getNamespace(); + // This is a user timer, so namespace must be WindowNamespace + checkArgument(namespace instanceof WindowNamespace); + BoundedWindow window = ((WindowNamespace) namespace).getWindow(); + pushbackDoFnRunner.onTimer(timerData.getTimerId(), window, + timerData.getTimestamp(), timerData.getDomain()); + } + + /** + * Factory for creating an {@link DoFnRunners.OutputManager} from + * a Flink {@link Output}. + */ + interface OutputManagerFactory<OutputT> extends Serializable { + DoFnRunners.OutputManager create(Output<StreamRecord<OutputT>> output); + } + + /** + * Default implementation of {@link OutputManagerFactory} that creates an + * {@link DoFnRunners.OutputManager} that only writes to + * a single logical output. + */ + public static class DefaultOutputManagerFactory<OutputT> + implements OutputManagerFactory<OutputT> { + @Override + public DoFnRunners.OutputManager create(final Output<StreamRecord<OutputT>> output) { + return new DoFnRunners.OutputManager() { + @Override + public <T> void output(TupleTag<T> tag, WindowedValue<T> value) { + // with tagged outputs we can't get around this because we don't + // know our own output type... 
+ @SuppressWarnings("unchecked") + OutputT castValue = (OutputT) value; + output.collect(new StreamRecord<>(castValue)); + } + }; + } + } + + /** + * Implementation of {@link OutputManagerFactory} that creates an + * {@link DoFnRunners.OutputManager} that can write to multiple logical + * outputs by unioning them in a {@link RawUnionValue}. + */ + public static class MultiOutputOutputManagerFactory + implements OutputManagerFactory<RawUnionValue> { + + Map<TupleTag<?>, Integer> mapping; + + public MultiOutputOutputManagerFactory(Map<TupleTag<?>, Integer> mapping) { + this.mapping = mapping; + } + + @Override + public DoFnRunners.OutputManager create(final Output<StreamRecord<RawUnionValue>> output) { + return new DoFnRunners.OutputManager() { + @Override + public <T> void output(TupleTag<T> tag, WindowedValue<T> value) { + int intTag = mapping.get(tag); + output.collect(new StreamRecord<>(new RawUnionValue(intTag, value))); + } + }; + } + } + + /** + * {@link StepContext} for running {@link DoFn DoFns} on Flink. The state and timer + * internals it exposes are those of the enclosing operator, which are only initialized + * when the operator has a key coder.
+ */ + protected class StepContext implements ExecutionContext.StepContext { + + @Override + public String getStepName() { + return null; + } + + @Override + public String getTransformName() { + return null; + } + + @Override + public void noteOutput(WindowedValue<?> output) {} + + @Override + public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {} + + @Override + public <T, W extends BoundedWindow> void writePCollectionViewData( + TupleTag<?> tag, + Iterable<WindowedValue<T>> data, + Coder<Iterable<WindowedValue<T>>> dataCoder, + W window, + Coder<W> windowCoder) throws IOException { + throw new UnsupportedOperationException("Writing side-input data is not supported."); + } + + @Override + public StateInternals<?> stateInternals() { + return stateInternals; + } + + @Override + public TimerInternals timerInternals() { + return timerInternals; + } + } + + private class FlinkTimerInternals implements TimerInternals { + + @Override + public void setTimer( + StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) { + setTimer(TimerData.of(timerId, namespace, target, timeDomain)); + } + + @Deprecated + @Override + public void setTimer(TimerData timerKey) { + long time = timerKey.getTimestamp().getMillis(); + if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) { + timerService.registerEventTimeTimer(timerKey, time); + } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) { + timerService.registerProcessingTimeTimer(timerKey, time); + } else { + throw new UnsupportedOperationException( + "Unsupported time domain: " + timerKey.getDomain()); + } + } + + @Deprecated + @Override + public void deleteTimer(StateNamespace namespace, String timerId) { + throw new UnsupportedOperationException( + "Canceling of a timer by ID is not yet supported."); + } + + @Override + public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { + throw new UnsupportedOperationException( + "Canceling of a timer by 
ID is not yet supported."); + } + + @Deprecated + @Override + public void deleteTimer(TimerData timerKey) { + long time = timerKey.getTimestamp().getMillis(); + if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) { + timerService.deleteEventTimeTimer(timerKey, time); + } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) { + timerService.deleteProcessingTimeTimer(timerKey, time); + } else { + throw new UnsupportedOperationException( + "Unsupported time domain: " + timerKey.getDomain()); + } + } + + @Override + public Instant currentProcessingTime() { + return new Instant(timerService.currentProcessingTime()); + } + + @Nullable + @Override + public Instant currentSynchronizedProcessingTime() { + return new Instant(timerService.currentProcessingTime()); + } + + @Override + public Instant currentInputWatermarkTime() { + return new Instant(Math.min(currentInputWatermark, getPushbackWatermarkHold())); + } + + @Nullable + @Override + public Instant currentOutputWatermarkTime() { + return new Instant(currentOutputWatermark); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java new file mode 100644 index 0000000..dce2e68 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming; + +import java.nio.ByteBuffer; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; + +/** + * {@link KeySelector} that retrieves a key from a {@link KV}. This will return + * the key as encoded by the provided {@link Coder} in a {@link ByteBuffer}. This ensures + * that all key comparisons/hashing happen on the encoded form. 
+ */ +public class KvToByteBufferKeySelector<K, V> + implements KeySelector<WindowedValue<KV<K, V>>, ByteBuffer>, + ResultTypeQueryable<ByteBuffer> { + + private final Coder<K> keyCoder; + + public KvToByteBufferKeySelector(Coder<K> keyCoder) { + this.keyCoder = keyCoder; + } + + @Override + public ByteBuffer getKey(WindowedValue<KV<K, V>> value) throws Exception { + K key = value.getValue().getKey(); + byte[] keyBytes = CoderUtils.encodeToByteArray(keyCoder, key); + return ByteBuffer.wrap(keyBytes); + } + + @Override + public TypeInformation<ByteBuffer> getProducedType() { + return new GenericTypeInfo<>(ByteBuffer.class); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java new file mode 100644 index 0000000..e843660 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming; + +import java.util.Collections; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.sdk.util.WindowedValue; + +/** + * A {@link KeyedWorkItem} that carries exactly one element and no timers. + */ +public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> { + + final K key; + final WindowedValue<ElemT> value; + + public SingletonKeyedWorkItem(K key, WindowedValue<ElemT> value) { + this.key = key; + this.value = value; + } + + @Override + public K key() { + return key; + } + + public WindowedValue<ElemT> value() { + return value; + } + + @Override + public Iterable<TimerInternals.TimerData> timersIterable() { + return Collections.EMPTY_LIST; + } + + @Override + public Iterable<WindowedValue<ElemT>> elementsIterable() { + return Collections.singletonList(value); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java new file mode 100644 index 0000000..9a52330 --- /dev/null +++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.KeyedWorkItemCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StandardCoder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.PropertyNames; +import org.apache.beam.sdk.util.WindowedValue; + +/** + * Singleton keyed work item coder. 
+ */ +public class SingletonKeyedWorkItemCoder<K, ElemT> + extends StandardCoder<SingletonKeyedWorkItem<K, ElemT>> { + /** + * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window + * coder. + */ + public static <K, ElemT> SingletonKeyedWorkItemCoder<K, ElemT> of( + Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) { + return new SingletonKeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder); + } + + @JsonCreator + public static <K, ElemT> SingletonKeyedWorkItemCoder<K, ElemT> of( + @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) { + checkArgument(components.size() == 3, "Expecting 3 components, got %s", components.size()); + @SuppressWarnings("unchecked") + Coder<K> keyCoder = (Coder<K>) components.get(0); + @SuppressWarnings("unchecked") + Coder<ElemT> elemCoder = (Coder<ElemT>) components.get(1); + @SuppressWarnings("unchecked") + Coder<? extends BoundedWindow> windowCoder = (Coder<? extends BoundedWindow>) components.get(2); + return new SingletonKeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder); + } + + private final Coder<K> keyCoder; + private final Coder<ElemT> elemCoder; + private final Coder<? extends BoundedWindow> windowCoder; + private final WindowedValue.FullWindowedValueCoder<ElemT> valueCoder; + + private SingletonKeyedWorkItemCoder( + Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? 
extends BoundedWindow> windowCoder) { + this.keyCoder = keyCoder; + this.elemCoder = elemCoder; + this.windowCoder = windowCoder; + valueCoder = WindowedValue.FullWindowedValueCoder.of(elemCoder, windowCoder); + } + + public Coder<K> getKeyCoder() { + return keyCoder; + } + + public Coder<ElemT> getElementCoder() { + return elemCoder; + } + + @Override + public void encode(SingletonKeyedWorkItem<K, ElemT> value, + OutputStream outStream, + Context context) + throws CoderException, IOException { + keyCoder.encode(value.key(), outStream, context.nested()); + valueCoder.encode(value.value, outStream, context); + } + + @Override + public SingletonKeyedWorkItem<K, ElemT> decode(InputStream inStream, Context context) + throws CoderException, IOException { + K key = keyCoder.decode(inStream, context.nested()); + WindowedValue<ElemT> value = valueCoder.decode(inStream, context); + return new SingletonKeyedWorkItem<>(key, value); + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return ImmutableList.of(keyCoder, elemCoder, windowCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + keyCoder.verifyDeterministic(); + elemCoder.verifyDeterministic(); + windowCoder.verifyDeterministic(); + } + + /** + * {@inheritDoc}. + * + * {@link KeyedWorkItemCoder} is not consistent with equals as it can return a + * {@link KeyedWorkItem} of a type different from the originally encoded type. 
+ */ + @Override + public boolean consistentWithEquals() { + return false; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java new file mode 100644 index 0000000..40f70e4 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.translation.wrappers.streaming; + +import static com.google.common.base.Preconditions.checkState; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import org.apache.beam.runners.core.ElementAndRestriction; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.KeyedWorkItems; +import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker; +import org.apache.beam.runners.core.OutputWindowedValue; +import org.apache.beam.runners.core.SplittableParDo; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateInternalsFactory; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.core.TimerInternalsFactory; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * Flink operator for executing splittable {@link DoFn DoFns}. Specifically, for executing + * the {@code @ProcessElement} method of a splittable {@link DoFn}. 
+ */ +public class SplittableDoFnOperator< + InputT, FnOutputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> + extends DoFnOperator< + KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT, OutputT> { + + public SplittableDoFnOperator( + DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT> doFn, + Coder< + WindowedValue< + KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>> inputCoder, + TupleTag<FnOutputT> mainOutputTag, + List<TupleTag<?>> additionalOutputTags, + OutputManagerFactory<OutputT> outputManagerFactory, + WindowingStrategy<?, ?> windowingStrategy, + Map<Integer, PCollectionView<?>> sideInputTagMapping, + Collection<PCollectionView<?>> sideInputs, + PipelineOptions options, + Coder<?> keyCoder) { + super( + doFn, + inputCoder, + mainOutputTag, + additionalOutputTags, + outputManagerFactory, + windowingStrategy, + sideInputTagMapping, + sideInputs, + options, + keyCoder); + + } + + @Override + public void open() throws Exception { + super.open(); + + checkState(doFn instanceof SplittableParDo.ProcessFn); + + StateInternalsFactory<String> stateInternalsFactory = new StateInternalsFactory<String>() { + @Override + public StateInternals<String> stateInternalsForKey(String key) { + //this will implicitly be keyed by the key of the incoming + // element or by the key of a firing timer + return (StateInternals<String>) stateInternals; + } + }; + TimerInternalsFactory<String> timerInternalsFactory = new TimerInternalsFactory<String>() { + @Override + public TimerInternals timerInternalsForKey(String key) { + //this will implicitly be keyed like the StateInternalsFactory + return timerInternals; + } + }; + + ((SplittableParDo.ProcessFn) doFn).setStateInternalsFactory(stateInternalsFactory); + ((SplittableParDo.ProcessFn) doFn).setTimerInternalsFactory(timerInternalsFactory); + ((SplittableParDo.ProcessFn) doFn).setProcessElementInvoker( + new 
OutputAndTimeBoundedSplittableProcessElementInvoker<>( + doFn, + serializedOptions.getPipelineOptions(), + new OutputWindowedValue<FnOutputT>() { + @Override + public void outputWindowedValue( + FnOutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + outputManager.output( + mainOutputTag, + WindowedValue.of(output, timestamp, windows, pane)); + } + + @Override + public <AdditionalOutputT> void outputWindowedValue( + TupleTag<AdditionalOutputT> tag, + AdditionalOutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane)); + } + }, + sideInputReader, + Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()), + 10000, + Duration.standardSeconds(10))); + } + + @Override + public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) { + doFnRunner.processElement(WindowedValue.valueInGlobalWindow( + KeyedWorkItems.<String, ElementAndRestriction<InputT, RestrictionT>>timersWorkItem( + (String) stateInternals.getKey(), + Collections.singletonList(timer.getNamespace())))); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java new file mode 100644 index 0000000..9b2136c --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming; + +import static org.apache.beam.runners.core.TimerInternals.TimerData; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.KeyedWorkItems; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateInternalsFactory; +import org.apache.beam.runners.core.SystemReduceFn; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.core.TimerInternalsFactory; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.flink.streaming.api.operators.InternalTimer; + +/** + * Flink operator for executing window {@link DoFn DoFns}. 
+ */ +public class WindowDoFnOperator<K, InputT, OutputT> + extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>> { + + private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn; + + public WindowDoFnOperator( + SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn, + Coder<WindowedValue<KeyedWorkItem<K, InputT>>> inputCoder, + TupleTag<KV<K, OutputT>> mainOutputTag, + List<TupleTag<?>> additionalOutputTags, + OutputManagerFactory<WindowedValue<KV<K, OutputT>>> outputManagerFactory, + WindowingStrategy<?, ?> windowingStrategy, + Map<Integer, PCollectionView<?>> sideInputTagMapping, + Collection<PCollectionView<?>> sideInputs, + PipelineOptions options, + Coder<K> keyCoder) { + super( + null, + inputCoder, + mainOutputTag, + additionalOutputTags, + outputManagerFactory, + windowingStrategy, + sideInputTagMapping, + sideInputs, + options, + keyCoder); + + this.systemReduceFn = systemReduceFn; + + } + + @Override + protected DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() { + StateInternalsFactory<K> stateInternalsFactory = new StateInternalsFactory<K>() { + @Override + public StateInternals<K> stateInternalsForKey(K key) { + //this will implicitly be keyed by the key of the incoming + // element or by the key of a firing timer + return (StateInternals<K>) stateInternals; + } + }; + TimerInternalsFactory<K> timerInternalsFactory = new TimerInternalsFactory<K>() { + @Override + public TimerInternals timerInternalsForKey(K key) { + //this will implicitly be keyed like the StateInternalsFactory + return timerInternals; + } + }; + + // we have to do the unchecked cast because GroupAlsoByWindowViaWindowSetDoFn.create + // has the window type as generic parameter while WindowingStrategy is almost always + // untyped. 
+ @SuppressWarnings("unchecked") + DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFn = + GroupAlsoByWindowViaWindowSetNewDoFn.create( + windowingStrategy, stateInternalsFactory, timerInternalsFactory, sideInputReader, + (SystemReduceFn) systemReduceFn, outputManager, mainOutputTag); + return doFn; + } + + @Override + public void fireTimer(InternalTimer<?, TimerData> timer) { + doFnRunner.processElement(WindowedValue.valueInGlobalWindow( + KeyedWorkItems.<K, InputT>timersWorkItem( + (K) stateInternals.getKey(), + Collections.singletonList(timer.getNamespace())))); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java new file mode 100644 index 0000000..1dff367 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming; + +import java.nio.ByteBuffer; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; + +/** + * {@link KeySelector} that retrieves a key from a {@link KeyedWorkItem}. This will return + * the key as encoded by the provided {@link Coder} in a {@link ByteBuffer}. This ensures + * that all key comparisons/hashing happen on the encoded form. + */ +public class WorkItemKeySelector<K, V> + implements KeySelector<WindowedValue<SingletonKeyedWorkItem<K, V>>, ByteBuffer>, + ResultTypeQueryable<ByteBuffer> { + + private final Coder<K> keyCoder; + + public WorkItemKeySelector(Coder<K> keyCoder) { + this.keyCoder = keyCoder; + } + + @Override + public ByteBuffer getKey(WindowedValue<SingletonKeyedWorkItem<K, V>> value) throws Exception { + K key = value.getValue().key(); + byte[] keyBytes = CoderUtils.encodeToByteArray(keyCoder, key); + return ByteBuffer.wrap(keyBytes); + } + + @Override + public TypeInformation<ByteBuffer> getProducedType() { + return new GenericTypeInfo<>(ByteBuffer.class); + } +}