On Thu, 21 Feb 2019 at 13:36, Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> It is definitely a solution ;)
>
> You should be aware of the downsides though:
>
>    - you might get different results in case of reprocessing
>    - you might drop some data as late, due to some delays in processing,
>    if the events arrive later than the "ProcessingTime" threshold
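
Both downsides in the list above can be illustrated with a small, self-contained sketch. It is a simplified model and not Flink code: it assumes tumbling windows aligned to the epoch, non-negative timestamps, zero allowed lateness, and the blended watermark formula quoted further down in this thread. The names LatenessCheck, blended and isLate are hypothetical.

```java
// Simplified model, not Flink code: shows how a watermark that depends on the
// wall clock can route the same event differently when it is reprocessed later.
final class LatenessCheck {

    // Hypothetical helper: max(lastEventTimestamp - 1min, now - 5min).
    static long blended(long lastEventTimestamp, long nowMillis) {
        return Math.max(lastEventTimestamp - 60_000L, nowMillis - 300_000L);
    }

    // Assumption: epoch-aligned tumbling windows and zero allowed lateness.
    // An event counts as late once the watermark has reached the last
    // timestamp that belongs to the event's window.
    static boolean isLate(long eventTimestamp, long windowSizeMs, long watermark) {
        long windowStart = eventTimestamp - (eventTimestamp % windowSizeMs);
        long windowMaxTimestamp = windowStart + windowSizeMs - 1;
        return windowMaxTimestamp <= watermark;
    }

    public static void main(String[] args) {
        long eventTs = 1_000_000L;   // event time of the record, in ms
        long window = 60_000L;       // one-minute tumbling window

        // Same record, same event time, two different wall-clock readings.
        long liveWatermark = blended(eventTs, 1_100_000L);
        long replayWatermark = blended(eventTs, 2_000_000L);

        System.out.println("live run late?   " + isLate(eventTs, window, liveWatermark));
        System.out.println("replay run late? " + isLate(eventTs, window, replayWatermark));
    }
}
```

In this model the live run reports false and the replay reports true. The record only changes path because the wall clock moved on between the two runs.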
>

So I have a separate stream processor, fed from the "late" side of my window,
that works out what the update is.

But I guess the question I have is around what happens with reprocessing.

1. Event 1 goes into the window aggregation because it is before the
watermark

2. State gets checkpointed

3. Crash

4. Recover

Will Event 1 now go to the late stream, or will it be tagged as having been
included in the state in the checkpoint?

I don't mind if Event 1 gets included in the window's "create event count
for timebox" output or the "update event count for timebox from late
events" output as long as it is always one and only one of those paths.


>
>
> Best,
>
> Dawid
> On 21/02/2019 14:18, Stephen Connolly wrote:
>
> Yes, it was the "watermarks for event time when no events for that shard"
> problem.
>
> I am now investigating whether we can use a blended watermark of
> max(lastEventTimestamp - 1min, System.currentTimeMillis() - 5min) to ensure
> idle shards do not cause excessive data retention.
>
> Is that the best solution?
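
A minimal sketch of the blended watermark described above, written as plain Java so that it does not depend on any Flink API. The class name BlendedWatermarkTracker and its methods are hypothetical. Two details are assumptions added for illustration and are not stated in the thread: the tracker keeps the maximum timestamp seen rather than the most recent one, and it never lets the returned watermark move backwards.

```java
// Sketch only: blended watermark = max(lastEventTimestamp - 1min, now - 5min).
final class BlendedWatermarkTracker {
    private static final long EVENT_LAG_MS = 60_000L;   // 1 minute behind events
    private static final long IDLE_LAG_MS = 300_000L;   // 5 minutes behind wall clock

    private long maxEventTimestamp = Long.MIN_VALUE;    // no event seen yet
    private long lastReturned = Long.MIN_VALUE;

    // Record the event time of an incoming element.
    void onEvent(long eventTimestamp) {
        maxEventTimestamp = Math.max(maxEventTimestamp, eventTimestamp);
    }

    // Compute the watermark for the given wall-clock time. The clock is passed
    // in rather than read from System.currentTimeMillis() so it can be tested.
    long currentWatermark(long nowMillis) {
        // Guard against underflow while no event has been seen.
        long eventBound = maxEventTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxEventTimestamp - EVENT_LAG_MS;
        long idleBound = nowMillis - IDLE_LAG_MS;
        // Assumption: keep the watermark monotonic.
        lastReturned = Math.max(lastReturned, Math.max(eventBound, idleBound));
        return lastReturned;
    }

    public static void main(String[] args) {
        BlendedWatermarkTracker tracker = new BlendedWatermarkTracker();
        // An idle shard still makes progress, driven by the wall clock alone.
        System.out.println(tracker.currentWatermark(System.currentTimeMillis()));
    }
}
```

On an idle shard only the wall-clock term contributes, which is what lets downstream windows close. It is also the term that makes results depend on when the job runs.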
>
> On Thu, 21 Feb 2019 at 08:30, Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hi Stephen,
>>
>> The watermark of a single operator is the minimum of the watermarks
>> received from all of its inputs. Therefore, if one of your shards/operators
>> has no incoming data, it will not produce watermarks, and the watermark of
>> the WindowOperator will not progress. So this is sort of expected behavior.
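
The rule described above can be sketched in plain Java. This is an illustrative model and not Flink's implementation; the class InputWatermarks and its methods are hypothetical names. Each input starts at Long.MIN_VALUE, standing in for "no watermark received yet".

```java
import java.util.Arrays;

// Illustrative model: an operator's watermark is the minimum over its inputs.
final class InputWatermarks {
    private final long[] perInput;

    InputWatermarks(int inputs) {
        if (inputs < 1) {
            throw new IllegalArgumentException("need at least one input");
        }
        perInput = new long[inputs];
        // Long.MIN_VALUE stands for "this input has not sent a watermark yet".
        Arrays.fill(perInput, Long.MIN_VALUE);
    }

    // Record a watermark from one input; watermarks never move backwards.
    void advance(int input, long watermark) {
        perInput[input] = Math.max(perInput[input], watermark);
    }

    // The operator can only be as far along as its slowest input.
    long combined() {
        long min = Long.MAX_VALUE;
        for (long w : perInput) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        InputWatermarks operator = new InputWatermarks(2);
        operator.advance(1, 5_000L);   // shard-1 has data, shard-0 is silent
        System.out.println(operator.combined() == Long.MIN_VALUE); // still stalled
        operator.advance(0, 3_000L);   // shard-0 finally reports
        System.out.println(operator.combined());
    }
}
```

With one input that never reports, the combined value stays at its initial minimum however far the other input advances, which matches the "no output until shard-0 receives data" symptom in this thread.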
>>
>> I recommend reading the docs linked by Congxian, especially this
>> section[1].
>>
>> Best,
>>
>> Dawid
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#watermarks-in-parallel-streams
>> On 19/02/2019 14:31, Stephen Connolly wrote:
>>
>> Hmmm, my suspicions are now quite high. I created a file source that just
>> replays the events straight through, and with it I get more results...
>>
>> On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <
>> stephen.alan.conno...@gmail.com> wrote:
>>
>>> Hmmm after expanding the dataset such that there was additional data
>>> that ended up on shard-0 (everything in my original dataset was
>>> coincidentally landing on shard-1) I am now getting output... should I
>>> expect this kind of behaviour if no data arrives at shard-0 ever?
>>>
>>> On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <
>>> stephen.alan.conno...@gmail.com> wrote:
>>>
>>>> Hi, I’m having a strange situation and I would like to know where I
>>>> should start trying to debug.
>>>>
>>>> I have set up a configurable swap-in source, with three implementations:
>>>>
>>>> 1. A mock implementation
>>>> 2. A Kafka consumer implementation
>>>> 3. A Kinesis consumer implementation
>>>>
>>>> From injecting a log and no-op map function I can see that all three
>>>> sources pass through the events correctly.
>>>>
>>>> I then have a window based on event timestamps, and from inspecting
>>>> the aggregation function I can see that the data is getting aggregated.
>>>> I’m using the `.aggregate(AggregateFunction, WindowFunction)` variant so
>>>> that I can retrieve the key.
>>>>
>>>> Here’s the strange thing: I only change the source (and each source
>>>> uses the same deserialization function), but:
>>>>
>>>>
>>>>    - When I use either Kafka or my Mock source, the WindowFunction
>>>>    gets called as events pass the end of the window
>>>>    - When I use the Kinesis source, however, the window function never
>>>>    gets called. I have even tried injecting events into Kinesis with really
>>>>    high timestamps to flush the watermarks in my
>>>>    BoundedOutOfOrdernessTimestampExtractor... but nothing
>>>>
>>>> I cannot see how this source switching could result in such a different
>>>> behaviour:
>>>>
>>>>         Properties sourceProperties = new Properties();
>>>>         ConsumerFactory sourceFactory;
>>>>         String sourceName = configParams.getRequired("source");
>>>>         switch (sourceName.toLowerCase(Locale.ENGLISH)) {
>>>>             case "kinesis":
>>>>                 sourceFactory = FlinkKinesisConsumer::new;
>>>>                 copyOptionalArg(configParams, "aws-region",
>>>>                         sourceProperties, AWSConfigConstants.AWS_REGION);
>>>>                 copyOptionalArg(configParams, "aws-endpoint",
>>>>                         sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
>>>>                 copyOptionalArg(configParams, "aws-access-key",
>>>>                         sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
>>>>                 copyOptionalArg(configParams, "aws-secret-key",
>>>>                         sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
>>>>                 copyOptionalArg(configParams, "aws-profile",
>>>>                         sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
>>>>                 break;
>>>>             case "kafka":
>>>>                 sourceFactory = FlinkKafkaConsumer010::new;
>>>>                 copyRequiredArg(configParams, "bootstrap-server",
>>>>                         sourceProperties, "bootstrap.servers");
>>>>                 copyOptionalArg(configParams, "group-id",
>>>>                         sourceProperties, "group.id");
>>>>                 break;
>>>>             case "mock":
>>>>                 sourceFactory = MockSourceFunction::new;
>>>>                 break;
>>>>             default:
>>>>                 throw new RuntimeException(
>>>>                         "Unknown source '" + sourceName + '\'');
>>>>         }
>>>>
>>>>         // set up the streaming execution environment
>>>>         final StreamExecutionEnvironment env =
>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>>         // poll watermark every second because using
>>>>         // BoundedOutOfOrdernessTimestampExtractor
>>>>         env.getConfig().setAutoWatermarkInterval(1000L);
>>>>         env.enableCheckpointing(5000);
>>>>
>>>>         SplitStream<JsonNode> eventsByType =
>>>>                 env.addSource(sourceFactory.create(
>>>>                         configParams.getRequired("topic"),
>>>>                         new ObjectNodeDeserializationSchema(),
>>>>                         sourceProperties
>>>>                 ))
>>>>                 // the use of ConsumerFactory erases the type info so add it back
>>>>                 .returns(ObjectNode.class)
>>>>                 .name("raw-events")
>>>>                 .assignTimestampsAndWatermarks(
>>>>                         new ObjectNodeBoundedOutOfOrdernessTimestampExtractor(
>>>>                                 "timestamp", Time.seconds(5))
>>>>                 )
>>>>                 .split(new JsonNodeOutputSelector("eventType"));
>>>> ...
>>>>         eventsByType.select(...)
>>>>                 .keyBy(new JsonNodeStringKeySelector("_key"))
>>>>                 .window(TumblingEventOffsetPerKeyEventTimeWindows.of(
>>>>                         Time.seconds(windowDuration),
>>>>                         (KeySelector<JsonNode, Time>) TasksMain::offsetPerMaster))
>>>>                 .trigger(EventTimeTrigger.create())
>>>>                 // <==== The CountsAggregator is seeing the data
>>>>                 .aggregate(new CountsAggregator<>(), new KeyTagger<>())
>>>>                 // <==== HERE is where we get no output from Kinesis...
>>>>                 // but Kafka and my Mock are just fine!
>>>>                 .print();
>>>>
>>>>
>>>>
