Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r165398176 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -753,6 +763,182 @@ public void onTimer( assertTrue(getOperatorForDataStream(processed) instanceof ProcessOperator); } + @Test + public void testConnectWithBroadcastTranslation() throws Exception { + + final Map<Long, String> expected = new HashMap<>(); + expected.put(0L, "test:0"); + expected.put(1L, "test:1"); + expected.put(2L, "test:2"); + expected.put(3L, "test:3"); + expected.put(4L, "test:4"); + expected.put(5L, "test:5"); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + final DataStream<Long> srcOne = env.generateSequence(0L, 5L) + .assignTimestampsAndWatermarks(new CustomWmEmitter<Long>() { + + @Override + public long extractTimestamp(Long element, long previousElementTimestamp) { + return element; + } + }).keyBy((KeySelector<Long, Long>) value -> value); + + final DataStream<String> srcTwo = env.fromCollection(expected.values()) + .assignTimestampsAndWatermarks(new CustomWmEmitter<String>() { + @Override + public long extractTimestamp(String element, long previousElementTimestamp) { + return Long.parseLong(element.split(":")[1]); + } + }); + + final BroadcastStream<String> broadcast = srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR); + + // the timestamp should be high enough to trigger the timer after all the elements arrive. 
+ final DataStream<String> output = srcOne.connect(broadcast).process( + new TestBroadcastProcessFunction(100000L, expected)); + + output.addSink(new DiscardingSink<>()); + env.execute(); + } + + private abstract static class CustomWmEmitter<T> implements AssignerWithPunctuatedWatermarks<T> { + + @Nullable + @Override + public Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp) { + return new Watermark(extractedTimestamp); + } + } + + private static class TestBroadcastProcessFunction extends KeyedBroadcastProcessFunction<Long, Long, String, String> { + + private final Map<Long, String> expectedState; + + private final long timerTimestamp; + + static final MapStateDescriptor<Long, String> DESCRIPTOR = new MapStateDescriptor<>( + "broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO + ); + + TestBroadcastProcessFunction( + final long timerTS, + final Map<Long, String> expectedBroadcastState + ) { + expectedState = expectedBroadcastState; + timerTimestamp = timerTS; + } + + @Override + public void processElement(Long value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception { + ctx.timerService().registerEventTimeTimer(timerTimestamp); + } + + @Override + public void processBroadcastElement(String value, KeyedReadWriteContext ctx, Collector<String> out) throws Exception { + long key = Long.parseLong(value.split(":")[1]); + ctx.getBroadcastState(DESCRIPTOR).put(key, value); + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { + Map<Long, String> map = new HashMap<>(); + for (Map.Entry<Long, String> entry : ctx.getBroadcastState(DESCRIPTOR).immutableEntries()) { + map.put(entry.getKey(), entry.getValue()); + } + Assert.assertEquals(expectedState, map); + } + } + + /** + * Tests that with a {@link KeyedStream} we have to provide a {@link KeyedBroadcastProcessFunction}. 
+ */ + @Test(expected = IllegalArgumentException.class) --- End diff -- The verification of the exception should happen right at the call site where we expect the exception. As it is now, the exception can be thrown anywhere in the test method body and the test would still pass. Other tests use the following JUnit rule for that:
```
@Rule
public ExpectedException expectedException = ExpectedException.none();
```
---