http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java deleted file mode 100644 index 3a08088..0000000 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.streaming; - -import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.DelegateCoder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.values.KV; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * An unbounded source for testing the unbounded sources framework code. - * - * <p>Each split of this source produces records of the form KV(split_id, i), - * where i counts up from 0. Each record has a timestamp of i, and the watermark - * accurately tracks these timestamps. The reader will occasionally return false - * from {@code advance}, in order to simulate a source where not all the data is - * available immediately. - */ -public class TestCountingSource - extends UnboundedSource<KV<Integer, Integer>, TestCountingSource.CounterMark> { - private static final Logger LOG = LoggerFactory.getLogger(TestCountingSource.class); - - private static List<Integer> finalizeTracker; - private final int numMessagesPerShard; - private final int shardNumber; - private final boolean dedup; - private final boolean throwOnFirstSnapshot; - private final boolean allowSplitting; - - /** - * We only allow an exception to be thrown from getCheckpointMark - * at most once. This must be static since the entire TestCountingSource - * instance may be re-serialized when the pipeline recovers and retries.
- */ - private static boolean thrown = false; - - public static void setFinalizeTracker(List<Integer> finalizeTracker) { - TestCountingSource.finalizeTracker = finalizeTracker; - } - - public TestCountingSource(int numMessagesPerShard) { - this(numMessagesPerShard, 0, false, false, true); - } - - public TestCountingSource withDedup() { - return new TestCountingSource( - numMessagesPerShard, shardNumber, true, throwOnFirstSnapshot, true); - } - - private TestCountingSource withShardNumber(int shardNumber) { - return new TestCountingSource( - numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, true); - } - - public TestCountingSource withThrowOnFirstSnapshot(boolean throwOnFirstSnapshot) { - return new TestCountingSource( - numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, true); - } - - public TestCountingSource withoutSplitting() { - return new TestCountingSource( - numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, false); - } - - private TestCountingSource(int numMessagesPerShard, int shardNumber, boolean dedup, - boolean throwOnFirstSnapshot, boolean allowSplitting) { - this.numMessagesPerShard = numMessagesPerShard; - this.shardNumber = shardNumber; - this.dedup = dedup; - this.throwOnFirstSnapshot = throwOnFirstSnapshot; - this.allowSplitting = allowSplitting; - } - - public int getShardNumber() { - return shardNumber; - } - - @Override - public List<TestCountingSource> split( - int desiredNumSplits, PipelineOptions options) { - List<TestCountingSource> splits = new ArrayList<>(); - int numSplits = allowSplitting ? desiredNumSplits : 1; - for (int i = 0; i < numSplits; i++) { - splits.add(withShardNumber(i)); - } - return splits; - } - - class CounterMark implements UnboundedSource.CheckpointMark { - int current; - - public CounterMark(int current) { - this.current = current; - } - - @Override - public void finalizeCheckpoint() { - if (finalizeTracker != null) { - finalizeTracker.add(current); - } - } - } - - @Override - public Coder<CounterMark> getCheckpointMarkCoder() { - return DelegateCoder.of( - VarIntCoder.of(), - new DelegateCoder.CodingFunction<CounterMark, Integer>() { - @Override - public Integer apply(CounterMark input) { - return input.current; - } - }, - new DelegateCoder.CodingFunction<Integer, CounterMark>() { - @Override - public CounterMark apply(Integer input) { - return new CounterMark(input); - } - }); - } - - @Override - public boolean requiresDeduping() { - return dedup; - } - - /** - * Public only so that the checkpoint can be conveyed from {@link #getCheckpointMark()} to - * {@link TestCountingSource#createReader(PipelineOptions, CounterMark)} without a cast.
- */ - public class CountingSourceReader extends UnboundedReader<KV<Integer, Integer>> { - private int current; - - public CountingSourceReader(int startingPoint) { - this.current = startingPoint; - } - - @Override - public boolean start() { - return advance(); - } - - @Override - public boolean advance() { - if (current >= numMessagesPerShard - 1) { - return false; - } - // If testing dedup, occasionally insert a duplicate value. - if (current >= 0 && dedup && ThreadLocalRandom.current().nextInt(5) == 0) { - return true; - } - current++; - return true; - } - - @Override - public KV<Integer, Integer> getCurrent() { - return KV.of(shardNumber, current); - } - - @Override - public Instant getCurrentTimestamp() { - return new Instant(current); - } - - @Override - public byte[] getCurrentRecordId() { - try { - return encodeToByteArray(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()), getCurrent()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void close() {} - - @Override - public TestCountingSource getCurrentSource() { - return TestCountingSource.this; - } - - @Override - public Instant getWatermark() { - // The watermark is a promise about future elements, and the timestamps of elements are - // strictly increasing for this source. - return new Instant(current + 1); - } - - @Override - public CounterMark getCheckpointMark() { - if (throwOnFirstSnapshot && !thrown) { - thrown = true; - LOG.error("Throwing exception while checkpointing counter"); - throw new RuntimeException("failed during checkpoint"); - } - // The checkpoint can assume all records read, including the current, have - // been committed. - return new CounterMark(current); - } - - @Override - public long getSplitBacklogBytes() { - return 7L; - } - } - - @Override - public CountingSourceReader createReader( - PipelineOptions options, @Nullable CounterMark checkpointMark) { - if (checkpointMark == null) { - LOG.debug("creating reader"); - } else { - LOG.debug("restoring reader from checkpoint with current = {}", checkpointMark.current); - } - return new CountingSourceReader(checkpointMark != null ? checkpointMark.current : -1); - } - - @Override - public void validate() {} - - @Override - public Coder<KV<Integer, Integer>> getDefaultOutputCoder() { - return KvCoder.of(VarIntCoder.of(), VarIntCoder.of()); - } -}
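For orientation, a minimal sketch of how the deleted test source above can be driven by hand. This is not part of the commit, and the class name TestCountingSourceDemo is illustrative; it only exercises the reader contract described in the javadoc: start()/advance() report whether a record is currently available, getCurrent() returns KV(shardNumber, counter), and a CounterMark restores a reader at the checkpointed position.

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class TestCountingSourceDemo {
  public static void main(String[] args) throws Exception {
    PipelineOptions options = PipelineOptionsFactory.create();
    // A single shard that yields KV(0, 0) .. KV(0, 4) and then reports no more data.
    TestCountingSource source = new TestCountingSource(5).withoutSplitting();
    TestCountingSource.CountingSourceReader reader = source.createReader(options, null);
    for (boolean available = reader.start(); available; available = reader.advance()) {
      System.out.println(reader.getCurrent()); // KV(shardNumber, counter)
    }
    // Snapshot the position; a reader created from the mark resumes where this one stopped.
    TestCountingSource.CounterMark mark = reader.getCheckpointMark();
    TestCountingSource.CountingSourceReader restored = source.createReader(options, mark);
    reader.close();
    restored.close();
  }
}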
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java deleted file mode 100644 index 9e6bba8..0000000 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.streaming; - -import com.google.api.services.bigquery.model.TableRow; - import com.google.common.base.Joiner; -import java.io.Serializable; -import java.util.Arrays; -import org.apache.beam.runners.flink.FlinkTestPipeline; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.Sessions; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.flink.streaming.util.StreamingProgramTestBase; -import org.joda.time.Duration; -import org.joda.time.Instant; - - -/** - * Session window test.
- */ -public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable { - protected String resultPath; - - public TopWikipediaSessionsITCase(){ - } - - static final String[] EXPECTED_RESULT = new String[] { - "user: user1 value:3", - "user: user1 value:1", - "user: user2 value:4", - "user: user2 value:6", - "user: user3 value:7", - "user: user3 value:2" - }; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); - } - - @Override - protected void testProgram() throws Exception { - - Pipeline p = FlinkTestPipeline.createForStreaming(); - - Long now = (System.currentTimeMillis() + 10000) / 1000; - - PCollection<KV<String, Long>> output = - p.apply(Create.of(Arrays.asList(new TableRow().set("timestamp", now).set - ("contributor_username", "user1"), new TableRow().set("timestamp", now + 10).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now).set - ("contributor_username", "user1"), new TableRow().set("timestamp", now + 2).set - ("contributor_username", "user1"), new TableRow().set("timestamp", now).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 1).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 5).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 7).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 8).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 200).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 230).set - ("contributor_username", "user1"), new TableRow().set("timestamp", now + 230).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 240).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 245).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now + 235).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now + 236).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now + 237).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now + 238).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now + 239).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now + 240).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now + 241).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now) - .set("contributor_username", "user3")))) - - - - .apply(ParDo.of(new DoFn<TableRow, String>() { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - TableRow row = c.element(); - long timestamp = (Integer) row.get("timestamp"); - String userName = (String) row.get("contributor_username"); - if (userName != null) { - // Sets the timestamp field to be used in windowing.
- c.outputWithTimestamp(userName, new Instant(timestamp * 1000L)); - } - } - })) - - .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1)))) - - .apply(Count.<String>perElement()); - - PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - KV<String, Long> el = c.element(); - String out = "user: " + el.getKey() + " value:" + el.getValue(); - c.output(out); - } - })); - - format.apply(TextIO.Write.to(resultPath)); - - p.run(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java deleted file mode 100644 index 90f95d6..0000000 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java +++ /dev/null @@ -1,464 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.beam.runners.flink.streaming; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.OperatorStateStore; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.operators.testutils.DummyEnvironment; -import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.api.operators.StreamSource; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; -import org.apache.flink.util.InstantiationUtil; -import org.junit.Test; -import org.junit.experimental.runners.Enclosed; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.mockito.Matchers; - -/** - * Tests for {@link UnboundedSourceWrapper}. - */ -@RunWith(Enclosed.class) -public class UnboundedSourceWrapperTest { - - /** - * Parameterized tests. - */ - @RunWith(Parameterized.class) - public static class UnboundedSourceWrapperTestWithParams { - private final int numTasks; - private final int numSplits; - - public UnboundedSourceWrapperTestWithParams(int numTasks, int numSplits) { - this.numTasks = numTasks; - this.numSplits = numSplits; - } - - @Parameterized.Parameters - public static Collection<Object[]> data() { - /* - * Parameters for initializing the tests: - * {numTasks, numSplits} - * The test currently assumes powers of two for some assertions. - */ - return Arrays.asList(new Object[][]{ - {1, 1}, {1, 2}, {1, 4}, - {2, 1}, {2, 2}, {2, 4}, - {4, 1}, {4, 2}, {4, 4} - }); - } - - /** - * Creates a {@link UnboundedSourceWrapper} that has one or multiple readers per source. - * If numSplits > numTasks, one source will manage multiple readers.
- */ - @Test - public void testReaders() throws Exception { - final int numElements = 20; - final Object checkpointLock = new Object(); - PipelineOptions options = PipelineOptionsFactory.create(); - - // this source will emit exactly NUM_ELEMENTS across all parallel readers, - // afterwards it will stall. We check whether we also receive NUM_ELEMENTS - // elements later. - TestCountingSource source = new TestCountingSource(numElements); - UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper = - new UnboundedSourceWrapper<>(options, source, numSplits); - - assertEquals(numSplits, flinkWrapper.getSplitSources().size()); - - StreamSource<WindowedValue< - KV<Integer, Integer>>, - UnboundedSourceWrapper< - KV<Integer, Integer>, - TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper); - - setupSourceOperator(sourceOperator, numTasks); - - try { - sourceOperator.open(); - sourceOperator.run(checkpointLock, - new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() { - private int count = 0; - - @Override - public void emitWatermark(Watermark watermark) { - } - - @Override - public void emitLatencyMarker(LatencyMarker latencyMarker) { - } - - @Override - public void collect( - StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) { - - count++; - if (count >= numElements) { - throw new SuccessException(); - } - } - - @Override - public void close() { - - } - }); - } catch (SuccessException e) { - - assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size()); - - // success - return; - } - fail("Read terminated without producing expected number of outputs"); - } - - /** - * Verify that snapshot/restore work as expected. We bring up a source and cancel - * after seeing a certain number of elements. Then we snapshot that source, - * bring up a completely new source that we restore from the snapshot and verify - * that we see all expected elements in the end. - */ - @Test - public void testRestore() throws Exception { - final int numElements = 20; - final Object checkpointLock = new Object(); - PipelineOptions options = PipelineOptionsFactory.create(); - - // this source will emit exactly NUM_ELEMENTS across all parallel readers, - // afterwards it will stall. We check whether we also receive NUM_ELEMENTS - // elements later.
- TestCountingSource source = new TestCountingSource(numElements); - UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper = - new UnboundedSourceWrapper<>(options, source, numSplits); - - assertEquals(numSplits, flinkWrapper.getSplitSources().size()); - - StreamSource< - WindowedValue<KV<Integer, Integer>>, - UnboundedSourceWrapper< - KV<Integer, Integer>, - TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper); - - - OperatorStateStore backend = mock(OperatorStateStore.class); - - TestingListState<KV<UnboundedSource, TestCountingSource.CounterMark>> - listState = new TestingListState<>(); - - when(backend.getOperatorState(Matchers.any(ListStateDescriptor.class))) - .thenReturn(listState); - - StateInitializationContext initializationContext = mock(StateInitializationContext.class); - - when(initializationContext.getOperatorStateStore()).thenReturn(backend); - when(initializationContext.isRestored()).thenReturn(false, true); - - flinkWrapper.initializeState(initializationContext); - - setupSourceOperator(sourceOperator, numTasks); - - final Set<KV<Integer, Integer>> emittedElements = new HashSet<>(); - - boolean readFirstBatchOfElements = false; - - try { - sourceOperator.open(); - sourceOperator.run(checkpointLock, - new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() { - private int count = 0; - - @Override - public void emitWatermark(Watermark watermark) { - } - - @Override - public void emitLatencyMarker(LatencyMarker latencyMarker) { - } - - @Override - public void collect( - StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) { - - emittedElements.add(windowedValueStreamRecord.getValue().getValue()); - count++; - if (count >= numElements / 2) { - throw new SuccessException(); - } - } - - @Override - public void close() { - - } - }); - } catch (SuccessException e) { - // success - readFirstBatchOfElements = true; - } - - assertTrue("Did not successfully read first batch of elements.", readFirstBatchOfElements); - - // draw a snapshot - flinkWrapper.snapshotState(new StateSnapshotContextSynchronousImpl(0, 0)); - - // test snapshot offsets - assertEquals(flinkWrapper.getLocalSplitSources().size(), - listState.getList().size()); - int totalEmit = 0; - for (KV<UnboundedSource, TestCountingSource.CounterMark> kv : listState.get()) { - totalEmit += kv.getValue().current + 1; - } - assertEquals(numElements / 2, totalEmit); - - // test that finalizeCheckpoint on CheckpointMark is called - final ArrayList<Integer> finalizeList = new ArrayList<>(); - TestCountingSource.setFinalizeTracker(finalizeList); - flinkWrapper.notifyCheckpointComplete(0); - assertEquals(flinkWrapper.getLocalSplitSources().size(), finalizeList.size()); - - // create a completely new source but restore from the snapshot - TestCountingSource restoredSource = new TestCountingSource(numElements); - UnboundedSourceWrapper< - KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper = - new UnboundedSourceWrapper<>(options, restoredSource, numSplits); - - assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size()); - - StreamSource< - WindowedValue<KV<Integer, Integer>>, - UnboundedSourceWrapper< - KV<Integer, Integer>, - TestCountingSource.CounterMark>> restoredSourceOperator = - new StreamSource<>(restoredFlinkWrapper); - - setupSourceOperator(restoredSourceOperator, numTasks); - - // restore snapshot - restoredFlinkWrapper.initializeState(initializationContext); - - boolean readSecondBatchOfElements = false; - - // run again and verify that we see the other elements - try { - restoredSourceOperator.open(); - restoredSourceOperator.run(checkpointLock, - new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() { - private int count = 0; - - @Override - public void emitWatermark(Watermark watermark) { - } - - @Override - public void emitLatencyMarker(LatencyMarker latencyMarker) { - } - - @Override - public void collect( - StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) { - emittedElements.add(windowedValueStreamRecord.getValue().getValue()); - count++; - if (count >= numElements / 2) { - throw new SuccessException(); - } - } - - @Override - public void close() { - - } - }); - } catch (SuccessException e) { - // success - readSecondBatchOfElements = true; - } - - assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size()); - - assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements); - - // verify that we saw all NUM_ELEMENTS elements - assertTrue(emittedElements.size() == numElements); - } - - @Test - public void testNullCheckpoint() throws Exception { - final int numElements = 20; - PipelineOptions options = PipelineOptionsFactory.create(); - - TestCountingSource source = new TestCountingSource(numElements) { - @Override - public Coder<CounterMark> getCheckpointMarkCoder() { - return null; - } - }; - UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper = - new UnboundedSourceWrapper<>(options, source, numSplits); - - OperatorStateStore backend = mock(OperatorStateStore.class); - - TestingListState<KV<UnboundedSource, TestCountingSource.CounterMark>> - listState = new TestingListState<>(); - - when(backend.getOperatorState(Matchers.any(ListStateDescriptor.class))) - .thenReturn(listState); - - StateInitializationContext initializationContext = mock(StateInitializationContext.class); - - when(initializationContext.getOperatorStateStore()).thenReturn(backend); - when(initializationContext.isRestored()).thenReturn(false, true); - - flinkWrapper.initializeState(initializationContext); - - StreamSource sourceOperator = new StreamSource<>(flinkWrapper); - setupSourceOperator(sourceOperator, numTasks); - sourceOperator.open(); - - flinkWrapper.snapshotState(new StateSnapshotContextSynchronousImpl(0, 0)); - - assertEquals(0, listState.getList().size()); - - UnboundedSourceWrapper< - KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper = - new UnboundedSourceWrapper<>(options, new TestCountingSource(numElements), - numSplits); - - StreamSource restoredSourceOperator = new StreamSource<>(flinkWrapper); - setupSourceOperator(restoredSourceOperator, numTasks); - sourceOperator.open(); - - restoredFlinkWrapper.initializeState(initializationContext); - - assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size()); - - } - - @SuppressWarnings("unchecked") - private static <T> void setupSourceOperator(StreamSource<T, ?> operator, int numSubTasks) { - ExecutionConfig executionConfig = new ExecutionConfig(); - StreamConfig cfg = new StreamConfig(new Configuration()); - - cfg.setTimeCharacteristic(TimeCharacteristic.EventTime); - - Environment env = new DummyEnvironment("MockTwoInputTask", numSubTasks, 0); - - StreamTask<?, ?> mockTask = mock(StreamTask.class); - when(mockTask.getName()).thenReturn("Mock Task"); - when(mockTask.getCheckpointLock()).thenReturn(new Object()); -
when(mockTask.getConfiguration()).thenReturn(cfg); - when(mockTask.getEnvironment()).thenReturn(env); - when(mockTask.getExecutionConfig()).thenReturn(executionConfig); - when(mockTask.getAccumulatorMap()) - .thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap()); - TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService(); - when(mockTask.getProcessingTimeService()).thenReturn(testProcessingTimeService); - - operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class)); - } - - /** - * A special {@link RuntimeException} that we throw to signal that the test was successful. - */ - private static class SuccessException extends RuntimeException { - } - } - - /** - * Non-parameterized tests. - */ - public static class BasicTest { - - /** - * Check serialization of a {@link UnboundedSourceWrapper}. - */ - @Test - public void testSerialization() throws Exception { - final int parallelism = 1; - final int numElements = 20; - PipelineOptions options = PipelineOptionsFactory.create(); - - TestCountingSource source = new TestCountingSource(numElements); - UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper = - new UnboundedSourceWrapper<>(options, source, parallelism); - - InstantiationUtil.serializeObject(flinkWrapper); - } - - } - - private static final class TestingListState<T> implements ListState<T> { - - private final List<T> list = new ArrayList<>(); - - @Override - public void clear() { - list.clear(); - } - - @Override - public Iterable<T> get() throws Exception { - return list; - } - - @Override - public void add(T value) throws Exception { - list.add(value); - } - - public List<T> getList() { - return list; - } - - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java deleted file mode 100644 index 08a1e03..0000000 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Internal implementation of the Beam runner for Apache Flink.
- */ -package org.apache.beam.runners.flink.streaming; http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/resources/log4j-test.properties b/runners/flink/runner/src/test/resources/log4j-test.properties deleted file mode 100644 index 4c74d85..0000000 --- a/runners/flink/runner/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,27 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -# Set root logger level to OFF to not flood build logs -# set manually to INFO for debugging purposes -log4j.rootLogger=OFF, testlogger - -# A1 is set to be a ConsoleAppender. -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java new file mode 100644 index 0000000..b745f0b --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.runners.flink; + +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.GlobalConfiguration; + +/** + * {@link DefaultValueFactory} for getting a default value for the parallelism option + * on {@link FlinkPipelineOptions}. + * + * <p>This will return either the default value from {@link GlobalConfiguration} or {@code 1}. + * A valid {@link GlobalConfiguration} is only available if the program is executed by the Flink + * run scripts. + */ +public class DefaultParallelismFactory implements DefaultValueFactory<Integer> { + @Override + public Integer create(PipelineOptions options) { + return GlobalConfiguration.loadConfiguration() + .getInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, 1); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java new file mode 100644 index 0000000..854b674 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link Pipeline.PipelineVisitor} for executing a {@link Pipeline} as a + * Flink batch job. + */ +class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkBatchPipelineTranslator.class); + + /** + * The necessary context in the case of a batch job.
+ */ + private final FlinkBatchTranslationContext batchContext; + + private int depth = 0; + + public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) { + this.batchContext = new FlinkBatchTranslationContext(env, options); + } + + @Override + @SuppressWarnings("rawtypes, unchecked") + public void translate(Pipeline pipeline) { + super.translate(pipeline); + + // terminate dangling DataSets + for (DataSet<?> dataSet: batchContext.getDanglingDataSets().values()) { + dataSet.output(new DiscardingOutputFormat()); + } + } + + // -------------------------------------------------------------------------------------------- + // Pipeline Visitor Methods + // -------------------------------------------------------------------------------------------- + + @Override + public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { + LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName()); + this.depth++; + + BatchTransformTranslator<?> translator = getTranslator(node); + + if (translator != null) { + applyBatchTransform(node.getTransform(), node, translator); + LOG.info("{} translated- {}", genSpaces(this.depth), node.getFullName()); + return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; + } else { + return CompositeBehavior.ENTER_TRANSFORM; + } + } + + @Override + public void leaveCompositeTransform(TransformHierarchy.Node node) { + this.depth--; + LOG.info("{} leaveCompositeTransform- {}", genSpaces(this.depth), node.getFullName()); + } + + @Override + public void visitPrimitiveTransform(TransformHierarchy.Node node) { + LOG.info("{} visitPrimitiveTransform- {}", genSpaces(this.depth), node.getFullName()); + + // get the transformation corresponding to the node we are + // currently visiting and translate it into its Flink alternative. + PTransform<?, ?> transform = node.getTransform(); + BatchTransformTranslator<?> translator = + FlinkBatchTransformTranslators.getTranslator(transform); + if (translator == null) { + LOG.info(node.getTransform().getClass().toString()); + throw new UnsupportedOperationException("The transform " + transform + + " is currently not supported."); + } + applyBatchTransform(transform, node, translator); + } + + private <T extends PTransform<?, ?>> void applyBatchTransform( + PTransform<?, ?> transform, + TransformHierarchy.Node node, + BatchTransformTranslator<?> translator) { + + @SuppressWarnings("unchecked") + T typedTransform = (T) transform; + + @SuppressWarnings("unchecked") + BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator; + + // create the applied PTransform on the batchContext + batchContext.setCurrentTransform(node.toAppliedPTransform()); + typedTranslator.translateNode(typedTransform, batchContext); + } + + /** + * A translator of a {@link PTransform}. + */ + public interface BatchTransformTranslator<TransformT extends PTransform> { + void translateNode(TransformT transform, FlinkBatchTranslationContext context); + } + + /** + * Returns a translator for the given node if one exists, otherwise null.
+ */ + private static BatchTransformTranslator<?> getTranslator(TransformHierarchy.Node node) { + PTransform<?, ?> transform = node.getTransform(); + + // Root of the graph is null + if (transform == null) { + return null; + } + + return FlinkBatchTransformTranslators.getTranslator(transform); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java new file mode 100644 index 0000000..ff9521c --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java @@ -0,0 +1,723 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; +import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkMergingPartialReduceFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkMergingReduceFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkStatefulDoFnFunction; +import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; +import org.apache.beam.runners.flink.translation.types.KvKeySelector; +import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.CombineFnBase; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.join.RawUnionValue; +import org.apache.beam.sdk.transforms.join.UnionCoder; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.Reshuffle; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.api.java.operators.FlatMapOperator; +import org.apache.flink.api.java.operators.GroupCombineOperator; +import org.apache.flink.api.java.operators.GroupReduceOperator; +import org.apache.flink.api.java.operators.Grouping; +import org.apache.flink.api.java.operators.MapPartitionOperator; +import org.apache.flink.api.java.operators.SingleInputUdfOperator; +import org.apache.flink.util.Collector; + +/** + * Translators for transforming {@link PTransform PTransforms} to + * Flink {@link DataSet DataSets}. + */ +class FlinkBatchTransformTranslators { + + // -------------------------------------------------------------------------------------------- + // Transform Translator Registry + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("rawtypes") + private static final Map< + Class<? extends PTransform>, + FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>(); + + static { + TRANSLATORS.put(View.CreatePCollectionView.class, new CreatePCollectionViewTranslatorBatch()); + + TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch()); + TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch()); + TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorBatch()); + + TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslatorBatch()); + + TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslatorBatch()); + + TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoTranslatorBatch()); + + TRANSLATORS.put(Read.Bounded.class, new ReadSourceTranslatorBatch()); + } + + + static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator( + PTransform<?, ?> transform) { + return TRANSLATORS.get(transform.getClass()); + } + + private static class ReadSourceTranslatorBatch<T> + implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Read.Bounded<T>> { + + @Override + public void translateNode(Read.Bounded<T> transform, FlinkBatchTranslationContext context) { + String name = transform.getName(); + BoundedSource<T> source = transform.getSource(); + PCollection<T> output = context.getOutput(transform); + + TypeInformation<WindowedValue<T>> typeInformation = context.getTypeInfo(output); + + DataSource<WindowedValue<T>> dataSource = new DataSource<>( + context.getExecutionEnvironment(), + new SourceInputFormat<>(source, context.getPipelineOptions()), + typeInformation, + name); + + context.setOutputDataSet(output, dataSource); + } + } + + private static class WindowAssignTranslatorBatch<T> + implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Window.Assign<T>> { + + @Override + public void translateNode(Window.Assign<T> transform, FlinkBatchTranslationContext context) { + PValue input = context.getInput(transform); + + TypeInformation<WindowedValue<T>> resultTypeInfo = + context.getTypeInfo(context.getOutput(transform)); + + DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(input); + + @SuppressWarnings("unchecked") + final WindowingStrategy<T, ? extends BoundedWindow> windowingStrategy = + (WindowingStrategy<T, ? extends BoundedWindow>) + context.getOutput(transform).getWindowingStrategy(); + + WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn(); + + FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction = + new FlinkAssignWindows<>(windowFn); + + DataSet<WindowedValue<T>> resultDataSet = inputDataSet + .flatMap(assignWindowsFunction) + .name(context.getOutput(transform).getName()) + .returns(resultTypeInfo); + + context.setOutputDataSet(context.getOutput(transform), resultDataSet); + } + } + + private static class GroupByKeyTranslatorBatch<K, InputT> + implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, InputT>> { + + @Override + public void translateNode( + GroupByKey<K, InputT> transform, + FlinkBatchTranslationContext context) { + + // for now, this is copied from the Combine.PerKey translator.
Once we have the new runner API + // we can replace GroupByKey by a Combine.PerKey with the Concatenate CombineFn + + DataSet<WindowedValue<KV<K, InputT>>> inputDataSet = + context.getInputDataSet(context.getInput(transform)); + + Combine.KeyedCombineFn<K, InputT, List<InputT>, List<InputT>> combineFn = + new Concatenate<InputT>().asKeyedFn(); + + KvCoder<K, InputT> inputCoder = + (KvCoder<K, InputT>) context.getInput(transform).getCoder(); + + Coder<List<InputT>> accumulatorCoder; + + try { + accumulatorCoder = + combineFn.getAccumulatorCoder( + context.getInput(transform).getPipeline().getCoderRegistry(), + inputCoder.getKeyCoder(), + inputCoder.getValueCoder()); + } catch (CannotProvideCoderException e) { + throw new RuntimeException(e); + } + + WindowingStrategy<?, ?> windowingStrategy = + context.getInput(transform).getWindowingStrategy(); + + TypeInformation<WindowedValue<KV<K, List<InputT>>>> partialReduceTypeInfo = + new CoderTypeInformation<>( + WindowedValue.getFullCoder( + KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder), + windowingStrategy.getWindowFn().windowCoder())); + + + Grouping<WindowedValue<KV<K, InputT>>> inputGrouping = + inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder())); + + FlinkPartialReduceFunction<K, InputT, List<InputT>, ?> partialReduceFunction; + FlinkReduceFunction<K, List<InputT>, List<InputT>, ?> reduceFunction; + + if (windowingStrategy.getWindowFn().isNonMerging()) { + @SuppressWarnings("unchecked") + WindowingStrategy<?, BoundedWindow> boundedStrategy = + (WindowingStrategy<?, BoundedWindow>) windowingStrategy; + + partialReduceFunction = new FlinkPartialReduceFunction<>( + combineFn, + boundedStrategy, + Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(), + context.getPipelineOptions()); + + reduceFunction = new FlinkReduceFunction<>( + combineFn, + boundedStrategy, + Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(), + context.getPipelineOptions()); + + } else { + if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) { + throw new UnsupportedOperationException( + "Merging WindowFn with windows other than IntervalWindow are not supported."); + } + + @SuppressWarnings("unchecked") + WindowingStrategy<?, IntervalWindow> intervalStrategy = + (WindowingStrategy<?, IntervalWindow>) windowingStrategy; + + partialReduceFunction = new FlinkMergingPartialReduceFunction<>( + combineFn, + intervalStrategy, + Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(), + context.getPipelineOptions()); + + reduceFunction = new FlinkMergingReduceFunction<>( + combineFn, + intervalStrategy, + Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(), + context.getPipelineOptions()); + } + + // Partially GroupReduce the values into the intermediate format AccumT (combine) + GroupCombineOperator< + WindowedValue<KV<K, InputT>>, + WindowedValue<KV<K, List<InputT>>>> groupCombine = + new GroupCombineOperator<>( + inputGrouping, + partialReduceTypeInfo, + partialReduceFunction, + "GroupCombine: " + transform.getName()); + + Grouping<WindowedValue<KV<K, List<InputT>>>> intermediateGrouping = + groupCombine.groupBy(new KvKeySelector<List<InputT>, K>(inputCoder.getKeyCoder())); + + // Fully reduce the values and create output format VO + GroupReduceOperator< + WindowedValue<KV<K, List<InputT>>>, WindowedValue<KV<K, List<InputT>>>> outputDataSet = + new GroupReduceOperator<>( + intermediateGrouping, partialReduceTypeInfo, reduceFunction, transform.getName());
+ + context.setOutputDataSet(context.getOutput(transform), outputDataSet); + + } + + } + + private static class ReshuffleTranslatorBatch<K, InputT> + implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Reshuffle<K, InputT>> { + + @Override + public void translateNode( + Reshuffle<K, InputT> transform, + FlinkBatchTranslationContext context) { + + DataSet<WindowedValue<KV<K, InputT>>> inputDataSet = + context.getInputDataSet(context.getInput(transform)); + + context.setOutputDataSet(context.getOutput(transform), inputDataSet.rebalance()); + + } + + } + + /** + * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs. + * + * <p>For internal use to translate {@link GroupByKey}. For a large {@link PCollection} this + * is expected to crash! + * + * <p>This is copied from the dataflow runner code. + * + * @param <T> the type of elements to concatenate. + */ + private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> { + @Override + public List<T> createAccumulator() { + return new ArrayList<>(); + } + + @Override + public List<T> addInput(List<T> accumulator, T input) { + accumulator.add(input); + return accumulator; + } + + @Override + public List<T> mergeAccumulators(Iterable<List<T>> accumulators) { + List<T> result = createAccumulator(); + for (List<T> accumulator : accumulators) { + result.addAll(accumulator); + } + return result; + } + + @Override + public List<T> extractOutput(List<T> accumulator) { + return accumulator; + } + + @Override + public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) { + return ListCoder.of(inputCoder); + } + + @Override + public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) { + return ListCoder.of(inputCoder); + } + } + + + private static class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT> + implements FlinkBatchPipelineTranslator.BatchTransformTranslator< + Combine.PerKey<K, InputT, OutputT>> { + + @Override + @SuppressWarnings("unchecked") + public void translateNode( + Combine.PerKey<K, InputT, OutputT> transform, + FlinkBatchTranslationContext context) { + DataSet<WindowedValue<KV<K, InputT>>> inputDataSet = + context.getInputDataSet(context.getInput(transform)); + + CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn = + (CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT>) transform.getFn(); + + KvCoder<K, InputT> inputCoder = + (KvCoder<K, InputT>) context.getInput(transform).getCoder(); + + Coder<AccumT> accumulatorCoder; + + try { + accumulatorCoder = + combineFn.getAccumulatorCoder( + context.getInput(transform).getPipeline().getCoderRegistry(), + inputCoder.getKeyCoder(), + inputCoder.getValueCoder()); + } catch (CannotProvideCoderException e) { + throw new RuntimeException(e); + } + + WindowingStrategy<?, ?> windowingStrategy = + context.getInput(transform).getWindowingStrategy(); + + TypeInformation<WindowedValue<KV<K, AccumT>>> partialReduceTypeInfo = + context.getTypeInfo( + KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder), + windowingStrategy); + + Grouping<WindowedValue<KV<K, InputT>>> inputGrouping = + inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder())); + + // construct a map from side input to WindowingStrategy so that + // the DoFn runner can map main-input windows to side input windows + Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>(); + for (PCollectionView<?> sideInput: transform.getSideInputs()) {
+ sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
+ }
+
+ if (windowingStrategy.getWindowFn().isNonMerging()) {
+ WindowingStrategy<?, BoundedWindow> boundedStrategy =
+ (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
+
+ FlinkPartialReduceFunction<K, InputT, AccumT, ?> partialReduceFunction =
+ new FlinkPartialReduceFunction<>(
+ combineFn,
+ boundedStrategy,
+ sideInputStrategies,
+ context.getPipelineOptions());
+
+ FlinkReduceFunction<K, AccumT, OutputT, ?> reduceFunction =
+ new FlinkReduceFunction<>(
+ combineFn,
+ boundedStrategy,
+ sideInputStrategies,
+ context.getPipelineOptions());
+
+ // Partially GroupReduce the values into the intermediate format AccumT (combine)
+ GroupCombineOperator<
+ WindowedValue<KV<K, InputT>>,
+ WindowedValue<KV<K, AccumT>>> groupCombine =
+ new GroupCombineOperator<>(
+ inputGrouping,
+ partialReduceTypeInfo,
+ partialReduceFunction,
+ "GroupCombine: " + transform.getName());
+
+ transformSideInputs(transform.getSideInputs(), groupCombine, context);
+
+ TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
+ context.getTypeInfo(context.getOutput(transform));
+
+ Grouping<WindowedValue<KV<K, AccumT>>> intermediateGrouping =
+ groupCombine.groupBy(new KvKeySelector<AccumT, K>(inputCoder.getKeyCoder()));
+
+ // Fully reduce the values and create output format OutputT
+ GroupReduceOperator<
+ WindowedValue<KV<K, AccumT>>, WindowedValue<KV<K, OutputT>>> outputDataSet =
+ new GroupReduceOperator<>(
+ intermediateGrouping, reduceTypeInfo, reduceFunction, transform.getName());
+
+ transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+
+ context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+
+ } else {
+ if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
+ throw new UnsupportedOperationException(
+ "Merging WindowFns with windows other than IntervalWindow are not supported.");
+ }
+
+ // for merging windows we can't do a pre-shuffle combine step since
+ // elements would not be in their correct windows for side-input access
+
+ WindowingStrategy<?, IntervalWindow> intervalStrategy =
+ (WindowingStrategy<?, IntervalWindow>) windowingStrategy;
+
+ FlinkMergingNonShuffleReduceFunction<K, InputT, AccumT, OutputT, ?> reduceFunction =
+ new FlinkMergingNonShuffleReduceFunction<>(
+ combineFn,
+ intervalStrategy,
+ sideInputStrategies,
+ context.getPipelineOptions());
+
+ TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
+ context.getTypeInfo(context.getOutput(transform));
+
+ Grouping<WindowedValue<KV<K, InputT>>> grouping =
+ inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
+
+ // Fully reduce the values and create output format OutputT
+ GroupReduceOperator<
+ WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> outputDataSet =
+ new GroupReduceOperator<>(
+ grouping, reduceTypeInfo, reduceFunction, transform.getName());
+
+ transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+
+ context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+ }
+
+
+ }
+ }
+
+ private static void rejectSplittable(DoFn<?, ?> doFn) {
+ DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+ if (signature.processElement().isSplittable()) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "%s does not currently support splittable DoFn: %s",
+ FlinkRunner.class.getSimpleName(), doFn));
+ }
+ }
+
+ private static class ParDoTranslatorBatch<InputT, OutputT>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+ ParDo.MultiOutput<InputT, OutputT>> {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void translateNode(
+ ParDo.MultiOutput<InputT, OutputT> transform,
+ FlinkBatchTranslationContext context) {
+ DoFn<InputT, OutputT> doFn = transform.getFn();
+ rejectSplittable(doFn);
+ DataSet<WindowedValue<InputT>> inputDataSet =
+ context.getInputDataSet(context.getInput(transform));
+
+ Map<TupleTag<?>, PValue> outputs = context.getOutputs(transform);
+
+ Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
+ // put the main output at index 0; FlinkMultiOutputDoFnFunction expects this
+ outputMap.put(transform.getMainOutputTag(), 0);
+ int count = 1;
+ for (TupleTag<?> tag : outputs.keySet()) {
+ if (!outputMap.containsKey(tag)) {
+ outputMap.put(tag, count++);
+ }
+ }
+
+ // assume that the windowing strategy is the same for all outputs
+ WindowingStrategy<?, ?> windowingStrategy = null;
+
+ // collect all output Coders and create a UnionCoder for our tagged outputs
+ List<Coder<?>> outputCoders = Lists.newArrayList();
+ for (PValue taggedValue : outputs.values()) {
+ checkState(
+ taggedValue instanceof PCollection,
+ "Within ParDo, got a non-PCollection output %s of type %s",
+ taggedValue,
+ taggedValue.getClass().getSimpleName());
+ PCollection<?> coll = (PCollection<?>) taggedValue;
+ outputCoders.add(coll.getCoder());
+ windowingStrategy = coll.getWindowingStrategy();
+ }
+
+ if (windowingStrategy == null) {
+ throw new IllegalStateException("No outputs defined.");
+ }
+
+ UnionCoder unionCoder = UnionCoder.of(outputCoders);
+
+ TypeInformation<WindowedValue<RawUnionValue>> typeInformation =
+ new CoderTypeInformation<>(
+ WindowedValue.getFullCoder(
+ unionCoder,
+ windowingStrategy.getWindowFn().windowCoder()));
+
+ List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+
+ // construct a map from side input to WindowingStrategy so that
+ // the DoFn runner can map main-input windows to side input windows
+ Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
+ for (PCollectionView<?> sideInput : sideInputs) {
+ sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
+ }
+
+ SingleInputUdfOperator<WindowedValue<InputT>, WindowedValue<RawUnionValue>, ?> outputDataSet;
+ DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+ if (signature.stateDeclarations().size() > 0
+ || signature.timerDeclarations().size() > 0) {
+
+ // DoFnSignatures ensures that a stateful DoFn is also keyed, so the
+ // input coder must be a KvCoder
+ KvCoder<?, InputT> inputCoder =
+ (KvCoder<?, InputT>) context.getInput(transform).getCoder();
+
+ FlinkStatefulDoFnFunction<?, ?, OutputT> doFnWrapper = new FlinkStatefulDoFnFunction<>(
+ (DoFn) doFn, windowingStrategy, sideInputStrategies, context.getPipelineOptions(),
+ outputMap, transform.getMainOutputTag()
+ );
+
+ Grouping<WindowedValue<InputT>> grouping =
+ inputDataSet.groupBy(new KvKeySelector(inputCoder.getKeyCoder()));
+
+ outputDataSet =
+ new GroupReduceOperator(grouping, typeInformation, doFnWrapper, transform.getName());
+
+ } else {
+ FlinkDoFnFunction<InputT, RawUnionValue> doFnWrapper =
+ new FlinkDoFnFunction(
+ doFn,
+ windowingStrategy,
+ sideInputStrategies,
+ context.getPipelineOptions(),
+ outputMap,
+ transform.getMainOutputTag());
+
+ outputDataSet = new MapPartitionOperator<>(
+ inputDataSet, typeInformation,
+ doFnWrapper, transform.getName());
+
+ }
+
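+ // Attach each side input to the generated operator as a Flink broadcast set.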
+ transformSideInputs(sideInputs, outputDataSet, context);
+
+ for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
+ pruneOutput(
+ outputDataSet,
+ context,
+ outputMap.get(output.getKey()),
+ (PCollection) output.getValue());
+ }
+
+ }
+
+ private <T> void pruneOutput(
+ DataSet<WindowedValue<RawUnionValue>> taggedDataSet,
+ FlinkBatchTranslationContext context,
+ int integerTag,
+ PCollection<T> collection) {
+ TypeInformation<WindowedValue<T>> outputType = context.getTypeInfo(collection);
+
+ FlinkMultiOutputPruningFunction<T> pruningFunction =
+ new FlinkMultiOutputPruningFunction<>(integerTag);
+
+ FlatMapOperator<WindowedValue<RawUnionValue>, WindowedValue<T>> pruningOperator =
+ new FlatMapOperator<>(
+ taggedDataSet,
+ outputType,
+ pruningFunction,
+ collection.getName());
+
+ context.setOutputDataSet(collection, pruningOperator);
+ }
+ }
+
+ private static class FlattenPCollectionTranslatorBatch<T>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+ Flatten.PCollections<T>> {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void translateNode(
+ Flatten.PCollections<T> transform,
+ FlinkBatchTranslationContext context) {
+
+ Map<TupleTag<?>, PValue> allInputs = context.getInputs(transform);
+ DataSet<WindowedValue<T>> result = null;
+
+ if (allInputs.isEmpty()) {
+
+ // Create an empty dummy source to satisfy downstream operations. We cannot
+ // create an empty source in Flink, so we add a flatMap that simply never
+ // forwards its single element.
+ DataSource<String> dummySource =
+ context.getExecutionEnvironment().fromElements("dummy");
+ result = dummySource.flatMap(new FlatMapFunction<String, WindowedValue<T>>() {
+ @Override
+ public void flatMap(String s, Collector<WindowedValue<T>> collector) throws Exception {
+ // never emit anything
+ }
+ }).returns(
+ new CoderTypeInformation<>(
+ WindowedValue.getFullCoder(
+ (Coder<T>) VoidCoder.of(),
+ GlobalWindow.Coder.INSTANCE)));
+ } else {
+ for (PValue taggedPc : allInputs.values()) {
+ checkArgument(
+ taggedPc instanceof PCollection,
+ "Got non-PCollection input to flatten: %s of type %s",
+ taggedPc,
+ taggedPc.getClass().getSimpleName());
+ PCollection<T> collection = (PCollection<T>) taggedPc;
+ DataSet<WindowedValue<T>> current = context.getInputDataSet(collection);
+ if (result == null) {
+ result = current;
+ } else {
+ result = result.union(current);
+ }
+ }
+ }
+
+ // Insert a dummy filter: without it, Flink seems to have a bug that produces
+ // duplicate elements after the union in some cases.
+ result = result.filter(new FilterFunction<WindowedValue<T>>() {
+ @Override
+ public boolean filter(WindowedValue<T> tWindowedValue) throws Exception {
+ return true;
+ }
+ }).name("UnionFixFilter");
+ context.setOutputDataSet(context.getOutput(transform), result);
+ }
+ }
+
+ private static class CreatePCollectionViewTranslatorBatch<ElemT, ViewT>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+ View.CreatePCollectionView<ElemT, ViewT>> {
+
+ @Override
+ public void translateNode(
+ View.CreatePCollectionView<ElemT, ViewT> transform,
+ FlinkBatchTranslationContext context) {
+ DataSet<WindowedValue<ElemT>> inputDataSet =
+ context.getInputDataSet(context.getInput(transform));
+
+ PCollectionView<ViewT> input = transform.getView();
+
+ context.setSideInputDataSet(input, inputDataSet);
+ }
+ }
+
+ private static void transformSideInputs(
+ List<PCollectionView<?>> sideInputs,
+ SingleInputUdfOperator<?, ?, ?> outputDataSet,
+ FlinkBatchTranslationContext context) {
+ // get corresponding Flink broadcast DataSets
+ for (PCollectionView<?> input : sideInputs) {
+ DataSet<?> broadcastSet = context.getSideInputDataSet(input);
+ outputDataSet.withBroadcastSet(broadcastSet, input.getTagInternal().getId());
+ }
+ }
+
+ private FlinkBatchTransformTranslators() {}
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
new file mode 100644
index 0000000..98dd0fb
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.collect.Iterables;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Helper for {@link FlinkBatchPipelineTranslator} and translators in
+ * {@link FlinkBatchTransformTranslators}.
+ */
+class FlinkBatchTranslationContext {
+
+ private final Map<PValue, DataSet<?>> dataSets;
+ private final Map<PCollectionView<?>, DataSet<?>> broadcastDataSets;
+
+ /**
+ * For keeping track of which DataSets don't have a successor. We
+ * need to terminate these with a discarding sink because the Beam
+ * model allows dangling operations.
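+ * {@link #getDanglingDataSets()} exposes these so that the pipeline
+ * translator can attach those sinks once translation is complete.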
+ */
+ private final Map<PValue, DataSet<?>> danglingDataSets;
+
+ private final ExecutionEnvironment env;
+ private final PipelineOptions options;
+
+ private AppliedPTransform<?, ?, ?> currentTransform;
+
+ // ------------------------------------------------------------------------
+
+ public FlinkBatchTranslationContext(ExecutionEnvironment env, PipelineOptions options) {
+ this.env = env;
+ this.options = options;
+ this.dataSets = new HashMap<>();
+ this.broadcastDataSets = new HashMap<>();
+
+ this.danglingDataSets = new HashMap<>();
+ }
+
+ // ------------------------------------------------------------------------
+
+ public Map<PValue, DataSet<?>> getDanglingDataSets() {
+ return danglingDataSets;
+ }
+
+ public ExecutionEnvironment getExecutionEnvironment() {
+ return env;
+ }
+
+ public PipelineOptions getPipelineOptions() {
+ return options;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> DataSet<WindowedValue<T>> getInputDataSet(PValue value) {
+ // assume that the DataSet is used as an input if retrieved here
+ danglingDataSets.remove(value);
+ return (DataSet<WindowedValue<T>>) dataSets.get(value);
+ }
+
+ public <T> void setOutputDataSet(PValue value, DataSet<WindowedValue<T>> set) {
+ if (!dataSets.containsKey(value)) {
+ dataSets.put(value, set);
+ danglingDataSets.put(value, set);
+ }
+ }
+
+ /**
+ * Sets the {@link AppliedPTransform} that carries the inputs and outputs of the
+ * transform currently being translated.
+ *
+ * @param currentTransform the applied transform currently being translated
+ */
+ public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
+ this.currentTransform = currentTransform;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> DataSet<T> getSideInputDataSet(PCollectionView<?> value) {
+ return (DataSet<T>) broadcastDataSets.get(value);
+ }
+
+ public <ViewT, ElemT> void setSideInputDataSet(
+ PCollectionView<ViewT> value,
+ DataSet<WindowedValue<ElemT>> set) {
+ if (!broadcastDataSets.containsKey(value)) {
+ broadcastDataSets.put(value, set);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) {
+ return getTypeInfo(collection.getCoder(), collection.getWindowingStrategy());
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> TypeInformation<WindowedValue<T>> getTypeInfo(
+ Coder<T> coder,
+ WindowingStrategy<?, ?> windowingStrategy) {
+ WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
+ WindowedValue.getFullCoder(
+ coder,
+ windowingStrategy.getWindowFn().windowCoder());
+
+ return new CoderTypeInformation<>(windowedValueCoder);
+ }
+
+ Map<TupleTag<?>, PValue> getInputs(PTransform<?, ?> transform) {
+ return currentTransform.getInputs();
+ }
+
+ @SuppressWarnings("unchecked")
+ <T extends PValue> T getInput(PTransform<T, ?> transform) {
+ return (T) Iterables.getOnlyElement(currentTransform.getInputs().values());
+ }
+
+ Map<TupleTag<?>, PValue> getOutputs(PTransform<?, ?> transform) {
+ return currentTransform.getOutputs();
+ }
+
+ @SuppressWarnings("unchecked")
+ <T extends PValue> T getOutput(PTransform<?, T> transform) {
+ return (T) Iterables.getOnlyElement(currentTransform.getOutputs().values());
+ }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
new file mode 100644
index 0000000..bf4395f
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.io.IOException;
+
+import org.apache.beam.sdk.AggregatorRetrievalException;
+import org.apache.beam.sdk.AggregatorValues;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.joda.time.Duration;
+
+
+/**
+ * Result of a detached execution of a {@link org.apache.beam.sdk.Pipeline} with Flink.
+ * In detached execution, the job's results and execution state are currently unavailable.
+ */
+public class FlinkDetachedRunnerResult implements PipelineResult {
+
+ FlinkDetachedRunnerResult() {}
+
+ @Override
+ public State getState() {
+ return State.UNKNOWN;
+ }
+
+ @Override
+ public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator)
+ throws AggregatorRetrievalException {
+ throw new AggregatorRetrievalException(
+ "Accumulators can't be retrieved for detached job executions.",
+ new UnsupportedOperationException());
+ }
+
+ @Override
+ public MetricResults metrics() {
+ throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
+ }
+
+ @Override
+ public State cancel() throws IOException {
+ throw new UnsupportedOperationException("Cancelling is not yet supported.");
+ }
+
+ @Override
+ public State waitUntilFinish() {
+ return State.UNKNOWN;
+ }
+
+ @Override
+ public State waitUntilFinish(Duration duration) {
+ return State.UNKNOWN;
+ }
+
+ @Override
+ public String toString() {
+ return "FlinkDetachedRunnerResult{}";
+ }
+}
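
A note for readers following this patch: every translator above implements the same contract, so a minimal sketch may help. The following hypothetical translator (MyTransform is a placeholder, and the imports of FlinkBatchTransformTranslators are assumed) mirrors the ReshuffleTranslatorBatch from this change: look up the input DataSet in the context, apply a Flink operation, and register the result as the output.

  private static class MyTransformTranslatorBatch<T>
      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<MyTransform<T>> {

    @Override
    public void translateNode(MyTransform<T> transform, FlinkBatchTranslationContext context) {
      // Fetch the Flink DataSet that a previous translator registered for our input.
      DataSet<WindowedValue<T>> input = context.getInputDataSet(context.getInput(transform));

      // Apply any Flink operation; rebalance() is what the Reshuffle translator uses.
      // setOutputDataSet also tracks the result as dangling until a downstream
      // translator consumes it via getInputDataSet.
      context.setOutputDataSet(context.getOutput(transform), input.rebalance());
    }
  }

Such a translator would still need to be registered with the translator lookup that FlinkBatchTransformTranslators maintains (not shown in this hunk) before the pipeline visitor can dispatch to it.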