Repository: beam
Updated Branches:
  refs/heads/master b44056fc6 -> ec5e72403
[BEAM-3027] Correctly set output type on SourceId-stripper

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ca20e69c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ca20e69c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ca20e69c

Branch: refs/heads/master
Commit: ca20e69ce1a817f63600e38a9d4450c2ac3bf949
Parents: b44056f
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Fri Oct 6 14:55:10 2017 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Mon Oct 9 14:57:21 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/flink/FlinkStreamingTransformTranslators.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/ca20e69c/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 7cedb56..4d2166c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -188,9 +188,9 @@ class FlinkStreamingTransformTranslators {
         if (transform.getSource().requiresDeduping()) {
           source = nonDedupSource.keyBy(
               new ValueWithRecordIdKeySelector<T>())
-              .transform("debuping", outputTypeInfo, new DedupingOperator<T>());
+              .transform("deduping", outputTypeInfo, new DedupingOperator<T>());
         } else {
-          source = nonDedupSource.flatMap(new StripIdsMap<T>());
+          source = nonDedupSource.flatMap(new StripIdsMap<T>()).returns(outputTypeInfo);
         }
       } catch (Exception e) {
         throw new RuntimeException(