GitHub user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5230#discussion_r165397632
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---
    @@ -753,6 +763,182 @@ public void onTimer(
                assertTrue(getOperatorForDataStream(processed) instanceof ProcessOperator);
        }
     
    +   @Test
    +   public void testConnectWithBroadcastTranslation() throws Exception {
    +
    +           final Map<Long, String> expected = new HashMap<>();
    +           expected.put(0L, "test:0");
    +           expected.put(1L, "test:1");
    +           expected.put(2L, "test:2");
    +           expected.put(3L, "test:3");
    +           expected.put(4L, "test:4");
    +           expected.put(5L, "test:5");
    +
    +           final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +           env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +
    +           final DataStream<Long> srcOne = env.generateSequence(0L, 5L)
    +                           .assignTimestampsAndWatermarks(new CustomWmEmitter<Long>() {
    +
    +                                   @Override
    +                                   public long extractTimestamp(Long element, long previousElementTimestamp) {
    +                                           return element;
    +                                   }
    +                           }).keyBy((KeySelector<Long, Long>) value -> value);
    +
    +           final DataStream<String> srcTwo = env.fromCollection(expected.values())
    +                           .assignTimestampsAndWatermarks(new CustomWmEmitter<String>() {
    +                                   @Override
    +                                   public long extractTimestamp(String element, long previousElementTimestamp) {
    +                                           return Long.parseLong(element.split(":")[1]);
    +                                   }
    +                           });
    +
    +           final BroadcastStream<String> broadcast = srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR);
    +
    +           // the timestamp should be high enough to trigger the timer after all the elements arrive.
    +           final DataStream<String> output = srcOne.connect(broadcast).process(
    +                           new TestBroadcastProcessFunction(100000L, expected));
    +
    +           output.addSink(new DiscardingSink<>());
    +           env.execute();
    --- End diff --
    
    So far, all tests here are purely translation tests. As I mentioned in another comment, it would be good to have an ITCase that actually verifies that keyed state and the other features work in a complete program. Have a look at `SideOutputITCase`, for example. 👍

