Hmmm, my suspicions are now quite high. I created a file source that just
replays the events straight through, and with it I get more results....

On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

> Hmmm, after expanding the dataset so that some additional data ended up on
> shard-0 (everything in my original dataset was coincidentally landing on
> shard-1), I am now getting output... Should I expect this kind of
> behaviour if no data ever arrives at shard-0?
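One way to picture the shard-0 question is a plain-Java model (not Flink code) of how an operator downstream of a parallel source tracks event time. Flink's documentation describes an operator's current event time as the minimum of the watermarks arriving on its input channels. Assuming the source subtask that reads shard-0 never sees a record, its channel stays at `Long.MIN_VALUE` and the minimum never moves, however far shard-1 advances. The class and method names below are hypothetical.

```java
import java.util.Arrays;

/** Hypothetical, simplified model of an operator combining watermarks from parallel inputs. */
class WatermarkMinimum {
    // One slot per upstream subtask; every channel starts with no watermark at all.
    private final long[] channelWatermarks;

    WatermarkMinimum(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel; a channel's watermark never moves backwards. */
    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    /** The operator's event-time clock: the minimum over all input channels. */
    long combined() {
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        WatermarkMinimum op = new WatermarkMinimum(2);
        // Subtask 1 reads shard-1 and keeps advancing; subtask 0 reads the empty shard-0.
        op.onWatermark(1, 60_000L);
        op.onWatermark(1, 10_000_000L);
        System.out.println(op.combined() == Long.MIN_VALUE); // true: shard-0 holds event time back
        // Once shard-0 produces data, event time can progress.
        op.onWatermark(0, 9_000_000L);
        System.out.println(op.combined()); // 9000000
    }
}
```

If this model matches what is happening, the window operator after `keyBy` never sees event time pass the end of any window. That would explain why the aggregator receives data while the WindowFunction is never called.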
>
> On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
>> Hi, I’m seeing some strange behaviour and would like to know where I
>> should start debugging.
>>
>> I have set up a configurable swap-in source with three implementations:
>>
>> 1. A mock implementation
>> 2. A Kafka consumer implementation
>> 3. A Kinesis consumer implementation
>>
>> By injecting a logging, no-op map function, I can see that all three
>> sources pass the events through correctly.
>>
>> I then have a window based on event timestamps, and from inspecting the
>> aggregation function I can see that the data is getting aggregated. I’m
>> using the `.aggregate(AggregateFunction, WindowFunction)` variant so that I
>> can retrieve the key.
>>
>> Here’s the strange thing: I only change the source (and each source uses
>> the same deserialization schema), but:
>>
>>
>>    - When I use either Kafka or my Mock source, the WindowFunction gets
>>    called as events pass the end of the window.
>>    - When I use the Kinesis source, however, the WindowFunction never
>>    gets called. I have even tried injecting events into Kinesis with really
>>    high timestamps to flush the watermarks in my
>>    BoundedOutOfOrdernessTimestampExtractor... but nothing happens.
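The watermark rule behind a bounded-out-of-orderness extractor can be modeled in a few lines: remember the largest timestamp seen so far and report that value minus the allowed lateness (`Time.seconds(5)` in the original code). This is a simplified, hypothetical model rather than Flink's own class. One consequence worth checking is that each parallel instance of the extractor only sees the records of its own source subtask. Injected high-timestamp events therefore only advance the watermark of the subtask that reads the shard they land on.

```java
/** Hypothetical, simplified model of a bounded-out-of-orderness periodic watermark generator. */
class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrdernessMillis;
    // Start just far enough above Long.MIN_VALUE that the subtraction below cannot overflow.
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessModel(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Called for every record seen by this parallel instance only. */
    long extractTimestamp(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        return eventTimestamp;
    }

    /** Called periodically (cf. setAutoWatermarkInterval); the result never moves backwards. */
    long getCurrentWatermark() {
        long candidate = currentMaxTimestamp - maxOutOfOrdernessMillis;
        if (candidate > lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel busy = new BoundedOutOfOrdernessModel(5_000L);
        BoundedOutOfOrdernessModel idle = new BoundedOutOfOrdernessModel(5_000L);
        busy.extractTimestamp(100_000L);
        busy.extractTimestamp(97_000L); // out of order but within bounds: max stays 100000
        System.out.println(busy.getCurrentWatermark()); // 95000
        System.out.println(idle.getCurrentWatermark() == Long.MIN_VALUE); // true: no records seen
    }
}
```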
>>
>> I cannot see how switching the source could result in such different
>> behaviour:
>>
>>         Properties sourceProperties = new Properties();
>>         ConsumerFactory sourceFactory;
>>         String sourceName = configParams.getRequired("source");
>>         switch (sourceName.toLowerCase(Locale.ENGLISH)) {
>>             case "kinesis":
>>                 sourceFactory = FlinkKinesisConsumer::new;
>>                 copyOptionalArg(configParams, "aws-region",
>>                         sourceProperties, AWSConfigConstants.AWS_REGION);
>>                 copyOptionalArg(configParams, "aws-endpoint",
>>                         sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
>>                 copyOptionalArg(configParams, "aws-access-key",
>>                         sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
>>                 copyOptionalArg(configParams, "aws-secret-key",
>>                         sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
>>                 copyOptionalArg(configParams, "aws-profile",
>>                         sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
>>                 break;
>>             case "kafka":
>>                 sourceFactory = FlinkKafkaConsumer010::new;
>>                 copyRequiredArg(configParams, "bootstrap-server",
>>                         sourceProperties, "bootstrap.servers");
>>                 copyOptionalArg(configParams, "group-id",
>>                         sourceProperties, "group.id");
>>                 break;
>>             case "mock":
>>                 sourceFactory = MockSourceFunction::new;
>>                 break;
>>             default:
>>                 throw new RuntimeException("Unknown source '" +
>>                         sourceName + '\'');
>>         }
>>
>>         // set up the streaming execution environment
>>         final StreamExecutionEnvironment env =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>         // poll the watermark every second because we use a
>>         // BoundedOutOfOrdernessTimestampExtractor
>>         env.getConfig().setAutoWatermarkInterval(1000L);
>>         env.enableCheckpointing(5000);
>>
>>         SplitStream<JsonNode> eventsByType = env
>>                 .addSource(sourceFactory.create(
>>                         configParams.getRequired("topic"),
>>                         new ObjectNodeDeserializationSchema(),
>>                         sourceProperties
>>                 ))
>>                 // the use of ConsumerFactory erases the type info, so add it back
>>                 .returns(ObjectNode.class)
>>                 .name("raw-events")
>>                 .assignTimestampsAndWatermarks(
>>                         new ObjectNodeBoundedOutOfOrdernessTimestampExtractor(
>>                                 "timestamp", Time.seconds(5))
>>                 )
>>                 .split(new JsonNodeOutputSelector("eventType"));
>> ...
>>         eventsByType.select(...)
>>                 .keyBy(new JsonNodeStringKeySelector("_key"))
>>                 .window(TumblingEventOffsetPerKeyEventTimeWindows.of(
>>                         Time.seconds(windowDuration),
>>                         (KeySelector<JsonNode, Time>) TasksMain::offsetPerMaster))
>>                 .trigger(EventTimeTrigger.create())
>>                 .aggregate(new CountsAggregator<>(), new KeyTagger<>()) // <==== The CountsAggregator is seeing the data
>>                 .print(); // <==== HERE is where we get no output from Kinesis... but Kafka and my Mock are just fine!
>>
>>
>>
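`TumblingEventOffsetPerKeyEventTimeWindows` and `offsetPerMaster` are the author's own code and are not shown, so the following is only a guess at their shape. It is a plain-Java sketch that computes a tumbling window's bounds with an offset, using the same arithmetic as Flink's `TimeWindow.getWindowStartWithOffset`. It then checks the condition under which an event-time trigger fires: the operator's watermark must reach the window's last millisecond (`end - 1`). The class and method names, and the one-minute size and 15-second offset, are hypothetical.

```java
/** Hypothetical model of a tumbling event-time window whose alignment is shifted per key. */
class OffsetTumblingWindow {
    final long start; // inclusive
    final long end;   // exclusive

    private OffsetTumblingWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /** Same arithmetic as Flink's TimeWindow.getWindowStartWithOffset. */
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    /** Assigns an element to its window, given the offset chosen for its key. */
    static OffsetTumblingWindow assign(long timestamp, long keyOffset, long size) {
        long start = windowStart(timestamp, keyOffset, size);
        return new OffsetTumblingWindow(start, start + size);
    }

    /** An event-time trigger fires once the watermark reaches the window's last millisecond. */
    boolean shouldFire(long watermark) {
        return watermark >= end - 1;
    }

    public static void main(String[] args) {
        long size = 60_000L;      // one-minute windows
        long keyOffset = 15_000L; // e.g. an offset derived from the key
        OffsetTumblingWindow w = assign(130_000L, keyOffset, size);
        System.out.println(w.start + ".." + w.end);       // 75000..135000
        System.out.println(w.shouldFire(95_000L));        // false
        System.out.println(w.shouldFire(Long.MIN_VALUE)); // false: an empty shard keeps it here
        System.out.println(w.shouldFire(134_999L));       // true
    }
}
```

Whatever offset each key gets, no window can fire while the combined watermark is still `Long.MIN_VALUE`.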
