Repository: beam
Updated Branches:
  refs/heads/master b44056fc6 -> ec5e72403


[BEAM-3027] Correctly set output type on SourceId-stripper


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ca20e69c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ca20e69c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ca20e69c

Branch: refs/heads/master
Commit: ca20e69ce1a817f63600e38a9d4450c2ac3bf949
Parents: b44056f
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Fri Oct 6 14:55:10 2017 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Mon Oct 9 14:57:21 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/flink/FlinkStreamingTransformTranslators.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ca20e69c/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 7cedb56..4d2166c 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -188,9 +188,9 @@ class FlinkStreamingTransformTranslators {
         if (transform.getSource().requiresDeduping()) {
           source = nonDedupSource.keyBy(
               new ValueWithRecordIdKeySelector<T>())
-              .transform("debuping", outputTypeInfo, new 
DedupingOperator<T>());
+              .transform("deduping", outputTypeInfo, new 
DedupingOperator<T>());
         } else {
-          source = nonDedupSource.flatMap(new StripIdsMap<T>());
+          source = nonDedupSource.flatMap(new 
StripIdsMap<T>()).returns(outputTypeInfo);
         }
       } catch (Exception e) {
         throw new RuntimeException(

Reply via email to