Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5230#discussion_r165397632

    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---
    @@ -753,6 +763,182 @@ public void onTimer(
             assertTrue(getOperatorForDataStream(processed) instanceof ProcessOperator);
         }
    +    @Test
    +    public void testConnectWithBroadcastTranslation() throws Exception {
    +
    +        final Map<Long, String> expected = new HashMap<>();
    +        expected.put(0L, "test:0");
    +        expected.put(1L, "test:1");
    +        expected.put(2L, "test:2");
    +        expected.put(3L, "test:3");
    +        expected.put(4L, "test:4");
    +        expected.put(5L, "test:5");
    +
    +        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +
    +        final DataStream<Long> srcOne = env.generateSequence(0L, 5L)
    +                .assignTimestampsAndWatermarks(new CustomWmEmitter<Long>() {
    +
    +                    @Override
    +                    public long extractTimestamp(Long element, long previousElementTimestamp) {
    +                        return element;
    +                    }
    +                }).keyBy((KeySelector<Long, Long>) value -> value);
    +
    +        final DataStream<String> srcTwo = env.fromCollection(expected.values())
    +                .assignTimestampsAndWatermarks(new CustomWmEmitter<String>() {
    +                    @Override
    +                    public long extractTimestamp(String element, long previousElementTimestamp) {
    +                        return Long.parseLong(element.split(":")[1]);
    +                    }
    +                });
    +
    +        final BroadcastStream<String> broadcast = srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR);
    +
    +        // the timestamp should be high enough to trigger the timer after all the elements arrive.
    +        final DataStream<String> output = srcOne.connect(broadcast).process(
    +                new TestBroadcastProcessFunction(100000L, expected));
    +
    +        output.addSink(new DiscardingSink<>());
    +        env.execute();
    --- End diff --

    So far, all tests in this class are purely translation tests. As I mentioned in another comment, it would be good to have an ITCase that actually verifies that keyed state and the other features work in a complete program. Have a look at `SideOutputITCase`, for example.
---