Yes, it was the "watermarks for event time when no events for that shard"
problem.

I am now investigating whether we can use a blended watermark of
max(lastEventTimestamp - 1min, System.currentTimeMillis() - 5min) to ensure
idle shards do not cause excessive data retention. Is that the best solution?
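Something like this untested sketch is what I have in mind, assuming Flink
1.7's AssignerWithPeriodicWatermarks (the TimestampExtractor helper interface
here is made up for illustration):

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    /**
     * Blends an event-time watermark with a wall-clock lower bound so that an
     * idle shard can hold the watermark back by at most five minutes.
     */
    public class BlendedWatermarkAssigner<T> implements AssignerWithPeriodicWatermarks<T> {
        private static final long EVENT_LAG_MS = 60_000L;    // 1 minute of out-of-orderness
        private static final long IDLE_BOUND_MS = 300_000L;  // 5 minutes of wall-clock slack

        // track the maximum event time seen so far (initialised to avoid
        // underflow), so the event-time component stays monotonic
        private long maxEventTimestamp = Long.MIN_VALUE + EVENT_LAG_MS;

        private final TimestampExtractor<T> extractor; // hypothetical helper, see below

        public BlendedWatermarkAssigner(TimestampExtractor<T> extractor) {
            this.extractor = extractor;
        }

        @Override
        public long extractTimestamp(T element, long previousElementTimestamp) {
            long timestamp = extractor.extract(element);
            maxEventTimestamp = Math.max(maxEventTimestamp, timestamp);
            return timestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // max(lastEventTimestamp - 1min, System.currentTimeMillis() - 5min)
            long eventTimeBased = maxEventTimestamp - EVENT_LAG_MS;
            long wallClockBased = System.currentTimeMillis() - IDLE_BOUND_MS;
            return new Watermark(Math.max(eventTimeBased, wallClockBased));
        }

        /** Hypothetical extractor for the event-time field of an element. */
        public interface TimestampExtractor<T> extends java.io.Serializable {
            long extract(T element);
        }
    }

The trade-off I can see: because of the wall-clock term, any event that
arrives on a shard more than five minutes after its event time will be behind
the watermark and treated as late, even if the shard was merely idle.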
On Thu, 21 Feb 2019 at 08:30, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hi Stephen,
>
> The Watermark for a single operator is the minimum of the Watermarks
> received from all of its inputs. Therefore, if one of your
> shards/operators has no incoming data, it will not produce Watermarks,
> and the Watermark of the WindowOperator will not progress. So this is
> expected behavior.
>
> I recommend reading the docs linked by Congxian, especially this
> section[1].
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#watermarks-in-parallel-streams
>
> On 19/02/2019 14:31, Stephen Connolly wrote:
>
> Hmmm my suspicions are now quite high. I created a file source that just
> replays the events straight through, and then I get more results...
>
> On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
>> Hmmm after expanding the dataset so that there was additional data
>> ending up on shard-0 (everything in my original dataset was
>> coincidentally landing on shard-1), I am now getting output... should I
>> expect this kind of behaviour if no data ever arrives at shard-0?
>>
>> On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <
>> stephen.alan.conno...@gmail.com> wrote:
>>
>>> Hi, I'm having a strange situation and I would like to know where I
>>> should start trying to debug.
>>>
>>> I have set up a configurable swap-in source, with three
>>> implementations:
>>>
>>> 1. A mock implementation
>>> 2. A Kafka consumer implementation
>>> 3. A Kinesis consumer implementation
>>>
>>> From injecting a log and a no-op map function I can see that all three
>>> sources pass through the events correctly.
>>>
>>> I then have a window based on event timestamps... and from inspecting
>>> the aggregation function I can see that the data is getting
>>> aggregated... I'm using the `.aggregate(AggregateFunction,
>>> WindowFunction)` variant so that I can retrieve the key.
>>>
>>> Here's the strange thing: I only change the source (and each source
>>> uses the same deserialization function), but:
>>>
>>> - When I use either Kafka or my Mock source, the WindowFunction gets
>>> called as events pass the end of the window.
>>> - When I use the Kinesis source, however, the window function never
>>> gets called. I have even tried injecting events into Kinesis with
>>> really high timestamps to flush the watermarks in my
>>> BoundedOutOfOrdernessTimestampExtractor...
>>> but nothing.
>>>
>>> I cannot see how this source switching could result in such different
>>> behaviour:
>>>
>>> Properties sourceProperties = new Properties();
>>> ConsumerFactory sourceFactory;
>>> String sourceName = configParams.getRequired("source");
>>> switch (sourceName.toLowerCase(Locale.ENGLISH)) {
>>>     case "kinesis":
>>>         sourceFactory = FlinkKinesisConsumer::new;
>>>         copyOptionalArg(configParams, "aws-region", sourceProperties, AWSConfigConstants.AWS_REGION);
>>>         copyOptionalArg(configParams, "aws-endpoint", sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
>>>         copyOptionalArg(configParams, "aws-access-key", sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
>>>         copyOptionalArg(configParams, "aws-secret-key", sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
>>>         copyOptionalArg(configParams, "aws-profile", sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
>>>         break;
>>>     case "kafka":
>>>         sourceFactory = FlinkKafkaConsumer010::new;
>>>         copyRequiredArg(configParams, "bootstrap-server", sourceProperties, "bootstrap.servers");
>>>         copyOptionalArg(configParams, "group-id", sourceProperties, "group.id");
>>>         break;
>>>     case "mock":
>>>         sourceFactory = MockSourceFunction::new;
>>>         break;
>>>     default:
>>>         throw new RuntimeException("Unknown source '" + sourceName + '\'');
>>> }
>>>
>>> // set up the streaming execution environment
>>> final StreamExecutionEnvironment env =
>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> // poll the watermark every second, because we are using a
>>> // BoundedOutOfOrdernessTimestampExtractor
>>> env.getConfig().setAutoWatermarkInterval(1000L);
>>> env.enableCheckpointing(5000);
>>>
>>> SplitStream<JsonNode> eventsByType = env.addSource(sourceFactory.create(
>>>                 configParams.getRequired("topic"),
>>>                 new ObjectNodeDeserializationSchema(),
>>>                 sourceProperties))
>>>         // the use of ConsumerFactory erases the type info, so add it back
>>>         .returns(ObjectNode.class)
>>>         .name("raw-events")
>>>         .assignTimestampsAndWatermarks(
>>>                 new ObjectNodeBoundedOutOfOrdernessTimestampExtractor(
>>>                         "timestamp", Time.seconds(5)))
>>>         .split(new JsonNodeOutputSelector("eventType"));
>>> ...
>>> eventsByType.select(...)
>>>         .keyBy(new JsonNodeStringKeySelector("_key"))
>>>         .window(TumblingEventOffsetPerKeyEventTimeWindows.of(
>>>                 Time.seconds(windowDuration),
>>>                 (KeySelector<JsonNode, Time>) TasksMain::offsetPerMaster))
>>>         .trigger(EventTimeTrigger.create())
>>>         // <== the CountsAggregator is seeing the data
>>>         .aggregate(new CountsAggregator<>(), new KeyTagger<>())
>>>         // <== HERE we get no output from Kinesis... but Kafka and my
>>>         // Mock source are just fine!
>>>         .print();
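For completeness, below is a minimal, self-contained job that reproduces the
stalled-watermark behaviour Dawid describes. This is an untested sketch
against Flink 1.7-era APIs and every name in it is illustrative: one source
emits an event per second, the other never emits, so the watermark downstream
of the union (the minimum over both inputs) stays at Long.MIN_VALUE and the
event-time window never fires. Replace `active.union(idle)` with just
`active` and the output appears.

    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class IdleSourceRepro {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.getConfig().setAutoWatermarkInterval(1000L);

            // "shard-1": emits the current time, once per second
            DataStream<Long> active = env
                    .addSource(new SourceFunction<Long>() {
                        private volatile boolean running = true;
                        @Override
                        public void run(SourceContext<Long> ctx) throws Exception {
                            while (running) {
                                ctx.collect(System.currentTimeMillis());
                                Thread.sleep(1000L);
                            }
                        }
                        @Override
                        public void cancel() { running = false; }
                    })
                    .assignTimestampsAndWatermarks(withLag());

            // "shard-0": never emits, so its watermark stays at Long.MIN_VALUE
            DataStream<Long> idle = env
                    .addSource(new SourceFunction<Long>() {
                        private volatile boolean running = true;
                        @Override
                        public void run(SourceContext<Long> ctx) throws Exception {
                            while (running) {
                                Thread.sleep(1000L);
                            }
                        }
                        @Override
                        public void cancel() { running = false; }
                    })
                    .assignTimestampsAndWatermarks(withLag());

            // the window operator's watermark is the minimum over both inputs,
            // so with the idle stream in the union the window never fires
            active.union(idle)
                    .timeWindowAll(Time.seconds(10))
                    .reduce((a, b) -> a + b)
                    .print();

            env.execute("idle-source-repro");
        }

        private static BoundedOutOfOrdernessTimestampExtractor<Long> withLag() {
            return new BoundedOutOfOrdernessTimestampExtractor<Long>(Time.seconds(5)) {
                @Override
                public long extractTimestamp(Long element) {
                    return element;
                }
            };
        }
    }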