Yes, it was the "no watermark progress for event time when a shard receives
no events" problem.

I am now investigating whether we can use a blended watermark of
max(lastEventTimestamp - 1min, System.currentTimeMillis() - 5min) to ensure
idle shards do not cause excessive data retention.
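
Concretely, something along these lines (just a sketch; the class below is
illustrative, not our actual code):

    import com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    public class BlendedWatermarkAssigner
            implements AssignerWithPeriodicWatermarks<ObjectNode> {

        // start just above Long.MIN_VALUE so the subtraction in
        // getCurrentWatermark() cannot underflow
        private long lastEventTimestamp = Long.MIN_VALUE + 60_000L;

        @Override
        public long extractTimestamp(ObjectNode element, long previousElementTimestamp) {
            long ts = element.get("timestamp").asLong();
            lastEventTimestamp = Math.max(lastEventTimestamp, ts);
            return ts;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // event time minus 1 min, but never more than 5 min behind the
            // wall clock, so an idle shard cannot stall the watermark
            return new Watermark(Math.max(
                    lastEventTimestamp - 60_000L,
                    System.currentTimeMillis() - 300_000L));
        }
    }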

Is that the best solution?

On Thu, 21 Feb 2019 at 08:30, Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hi Stephen,
>
> The watermark of a single operator is the minimum of the watermarks
> received from all of its inputs. Therefore, if one of your
> shards/operators has no incoming data, it will not produce watermarks,
> and so the watermark of the WindowOperator will not progress. This is
> sort of expected behavior.
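>
> As a toy illustration of that minimum (made-up numbers):
>
>     // a downstream operator's watermark is the minimum over its inputs
>     long wmFromShard0 = Long.MIN_VALUE;     // shard-0 never sees an event
>     long wmFromShard1 = 1_550_000_000_000L; // shard-1 keeps advancing
>     long operatorWatermark = Math.min(wmFromShard0, wmFromShard1);
>     // stays at Long.MIN_VALUE, so event-time windows never fire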
>
> I recommend reading the docs linked by Congxian, especially this
> section[1].
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#watermarks-in-parallel-streams
> On 19/02/2019 14:31, Stephen Connolly wrote:
>
> Hmmm, my suspicions are now quite high. I created a file source that just
> replays the events straight through, and now I get more results....
>
> On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
>> Hmmm, after expanding the dataset so that additional data ended up on
>> shard-0 (everything in my original dataset was coincidentally landing on
>> shard-1), I am now getting output... should I expect this kind of
>> behaviour if no data ever arrives at shard-0?
>>
>> On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <
>> stephen.alan.conno...@gmail.com> wrote:
>>
>>> Hi, I’m running into a strange situation and I would like to know where
>>> I should start trying to debug it.
>>>
>>> I have set up a configurable swap-in source, with three implementations:
>>>
>>> 1. A mock implementation
>>> 2. A Kafka consumer implementation
>>> 3. A Kinesis consumer implementation
>>>
>>> From injecting a logging no-op map function I can see that all three
>>> sources pass the events through correctly.
>>>
>>> I then have a window based on event timestamps… and from inspecting the
>>> aggregation function I can see that the data is getting aggregated. I’m
>>> using the `.aggregate(AggregateFunction, WindowFunction)` variant so that
>>> I can retrieve the key.
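>>>
>>> (As an aside, the shape I mean is roughly the following; the generics
>>> here are illustrative, not our exact signatures:)
>>>
>>>     // the WindowFunction half of the pair receives the key explicitly,
>>>     // alongside the single pre-aggregated result
>>>     class KeyTagger<T> implements
>>>             WindowFunction<T, Tuple2<String, T>, String, TimeWindow> {
>>>         @Override
>>>         public void apply(String key, TimeWindow window,
>>>                           Iterable<T> aggregated,
>>>                           Collector<Tuple2<String, T>> out) {
>>>             for (T value : aggregated) {
>>>                 out.collect(Tuple2.of(key, value));
>>>             }
>>>         }
>>>     }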
>>>
>>> Here’s the strange thing: I only change the source (and each source uses
>>> the same deserialization function), but:
>>>
>>>
>>>    - When I use either Kafka or my Mock source, the WindowFunction gets
>>>    called as events pass the end of the window.
>>>    - When I use the Kinesis source, however, the WindowFunction never
>>>    gets called. I have even tried injecting events into Kinesis with
>>>    really high timestamps to flush the watermarks in my
>>>    BoundedOutOfOrdernessTimestampExtractor... but nothing.
>>>
>>> I cannot see how switching the source could result in such different
>>> behaviour:
>>>
>>>         Properties sourceProperties = new Properties();
>>>         ConsumerFactory sourceFactory;
>>>         String sourceName = configParams.getRequired("source");
>>>         switch (sourceName.toLowerCase(Locale.ENGLISH)) {
>>>             case "kinesis":
>>>                 sourceFactory = FlinkKinesisConsumer::new;
>>>                 copyOptionalArg(configParams, "aws-region",
>>>                         sourceProperties, AWSConfigConstants.AWS_REGION);
>>>                 copyOptionalArg(configParams, "aws-endpoint",
>>>                         sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
>>>                 copyOptionalArg(configParams, "aws-access-key",
>>>                         sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
>>>                 copyOptionalArg(configParams, "aws-secret-key",
>>>                         sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
>>>                 copyOptionalArg(configParams, "aws-profile",
>>>                         sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
>>>                 break;
>>>             case "kafka":
>>>                 sourceFactory = FlinkKafkaConsumer010::new;
>>>                 copyRequiredArg(configParams, "bootstrap-server",
>>>                         sourceProperties, "bootstrap.servers");
>>>                 copyOptionalArg(configParams, "group-id",
>>>                         sourceProperties, "group.id");
>>>                 break;
>>>             case "mock":
>>>                 sourceFactory = MockSourceFunction::new;
>>>                 break;
>>>             default:
>>>                 throw new RuntimeException("Unknown source '" + sourceName + '\'');
>>>         }
>>>
>>>         // set up the streaming execution environment
>>>         final StreamExecutionEnvironment env =
>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         // poll watermark every second because using BoundedOutOfOrdernessTimestampExtractor
>>>         env.getConfig().setAutoWatermarkInterval(1000L);
>>>         env.enableCheckpointing(5000);
>>>
>>>         SplitStream<JsonNode> eventsByType = env.addSource(sourceFactory.create(
>>>                 configParams.getRequired("topic"),
>>>                 new ObjectNodeDeserializationSchema(),
>>>                 sourceProperties
>>>         ))
>>>                 // the use of ConsumerFactory erases the type info so add it back
>>>                 .returns(ObjectNode.class)
>>>                 .name("raw-events")
>>>                 .assignTimestampsAndWatermarks(
>>>                         new ObjectNodeBoundedOutOfOrdernessTimestampExtractor(
>>>                                 "timestamp", Time.seconds(5)))
>>>                 .split(new JsonNodeOutputSelector("eventType"));
>>> ...
>>>         eventsByType.select(...)
>>>                 .keyBy(new JsonNodeStringKeySelector("_key"))
>>>                 .window(TumblingEventOffsetPerKeyEventTimeWindows.of(
>>>                         Time.seconds(windowDuration),
>>>                         (KeySelector<JsonNode, Time>) TasksMain::offsetPerMaster))
>>>                 .trigger(EventTimeTrigger.create())
>>>                 .aggregate(new CountsAggregator<>(), new KeyTagger<>()) // <==== The CountsAggregator is seeing the data
>>>                 .print() // <==== HERE is where we get no output from Kinesis... but Kafka and my Mock are just fine!
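>>>
>>> (For completeness, ConsumerFactory above is just a functional interface
>>> over the three-argument source constructors; paraphrasing, not the exact
>>> code:)
>>>
>>>     // the generic return type is what erases the element type info,
>>>     // hence the .returns(ObjectNode.class) call above
>>>     @FunctionalInterface
>>>     interface ConsumerFactory {
>>>         <T> SourceFunction<T> create(String topic,
>>>                                      DeserializationSchema<T> schema,
>>>                                      Properties props);
>>>     }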
>>>
>>>
>>>
