But I am explicitly assigning watermarks with DataStream.assignTimestampsAndWatermarks, and I can see all the data flowing through it... so shouldn't that override the watermarks from the original source?
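A small, dependency-free Java model can show why the explicit assigner does not override anything. It assumes that `assignTimestampsAndWatermarks` runs with the same parallelism as the source, which is typical with default chaining, so each assigner instance only ever sees one source subtask's records. It also relies on Flink's rule that a downstream operator such as the window uses the minimum of the latest watermarks across all its input channels. The subtask reading the empty shard-0 never sees an event, so its watermark never advances, and that holds the minimum down. `WatermarkStall`, `BoundedOutOfOrdernessAssigner` and `combinedWatermark` are hypothetical names for illustration, not Flink classes.

```java
import java.util.Arrays;

// Hypothetical, dependency-free model of watermark propagation (not Flink code):
// each parallel source subtask has its own bounded-out-of-orderness assigner,
// and a downstream operator's event time is the minimum over its input channels.
public class WatermarkStall {

    /** Per-subtask assigner: watermark = max timestamp seen - allowed out-of-orderness. */
    static final class BoundedOutOfOrdernessAssigner {
        private final long maxOutOfOrdernessMs;
        private long maxTimestamp = Long.MIN_VALUE;

        BoundedOutOfOrdernessAssigner(long maxOutOfOrdernessMs) {
            this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        }

        void onEvent(long timestampMs) {
            maxTimestamp = Math.max(maxTimestamp, timestampMs);
        }

        /** Stays at Long.MIN_VALUE until at least one event has been seen. */
        long currentWatermark() {
            return maxTimestamp == Long.MIN_VALUE
                    ? Long.MIN_VALUE
                    : maxTimestamp - maxOutOfOrdernessMs;
        }
    }

    /** Downstream event time: the minimum watermark across all input channels. */
    static long combinedWatermark(BoundedOutOfOrdernessAssigner... channels) {
        return Arrays.stream(channels)
                .mapToLong(BoundedOutOfOrdernessAssigner::currentWatermark)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Two source subtasks, one per shard, each with 5 s out-of-orderness.
        BoundedOutOfOrdernessAssigner shard0 = new BoundedOutOfOrdernessAssigner(5_000);
        BoundedOutOfOrdernessAssigner shard1 = new BoundedOutOfOrdernessAssigner(5_000);

        // Every event lands on shard-1; shard-0 never receives anything.
        for (long ts = 0; ts <= 60_000; ts += 1_000) {
            shard1.onEvent(ts);
        }

        System.out.println("shard-1 watermark: " + shard1.currentWatermark());
        System.out.println("combined watermark: " + combinedWatermark(shard0, shard1));
    }
}
```

Running `main` prints `shard-1 watermark: 55000` and `combined watermark: -9223372036854775808` (Long.MIN_VALUE). This matches the symptom in the thread: the aggregator sees data, but the window never fires. It also suggests why injecting events with very high timestamps did not help, assuming those events also landed on shard-1: they advance only that subtask's watermark, not the minimum.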
On Tue, 19 Feb 2019 at 15:59, Martin, Nick <nick.mar...@ngc.com> wrote:

> Yeah, that’s expected/known. Watermarks for the empty partition don’t
> advance, so the window in your window function never closes.
>
> There’s a ticket open to fix it for the Kafka connector
> (https://issues.apache.org/jira/browse/FLINK-5479), but in general, any
> time one parallel instance of a source function isn’t getting data you
> have to watch out for this.
>
> *From:* Stephen Connolly [mailto:stephen.alan.conno...@gmail.com]
> *Sent:* Tuesday, February 19, 2019 6:32 AM
> *To:* user <user@flink.apache.org>
> *Subject:* EXT :Re: How to debug difference between Kinesis and Kafka
>
> Hmmm, my suspicions are now quite high. I created a file source that just
> replays the events straight, and with that I get more results....
>
> On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <stephen.alan.conno...@gmail.com> wrote:
>
> Hmmm, after expanding the dataset so that some additional data ended up
> on shard-0 (everything in my original dataset was coincidentally landing
> on shard-1), I am now getting output... should I expect this kind of
> behaviour if no data ever arrives at shard-0?
>
> On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <stephen.alan.conno...@gmail.com> wrote:
>
> Hi, I’m seeing a strange situation and I would like to know where I should
> start trying to debug.
>
> I have set up a configurable swap-in source, with three implementations:
>
> 1. A mock implementation
> 2. A Kafka consumer implementation
> 3. A Kinesis consumer implementation
>
> By injecting a logging, no-op map function I can see that all three
> sources pass the events through correctly.
>
> I then have a window based on event timestamps… and from inspecting the
> aggregation function I can see that the data is getting aggregated… I’m
> using the `.aggregate(AggregateFunction, WindowFunction)` variant so that I
> can retrieve the key.
>
> Here’s the strange thing: I only change the source (and each source uses
> the same deserialization function), but:
>
> - When I use either Kafka or my Mock source, the WindowFunction gets
>   called as events pass the end of the window.
> - When I use the Kinesis source, however, the window function never
>   gets called. I have even tried injecting events into Kinesis with really
>   high timestamps to flush the watermarks in my
>   BoundedOutOfOrdernessTimestampExtractor... but nothing.
>
> I cannot see how switching the source could result in such different
> behaviour:
>
>     Properties sourceProperties = new Properties();
>     ConsumerFactory sourceFactory;
>     String sourceName = configParams.getRequired("source");
>     switch (sourceName.toLowerCase(Locale.ENGLISH)) {
>         case "kinesis":
>             sourceFactory = FlinkKinesisConsumer::new;
>             copyOptionalArg(configParams, "aws-region",
>                     sourceProperties, AWSConfigConstants.AWS_REGION);
>             copyOptionalArg(configParams, "aws-endpoint",
>                     sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
>             copyOptionalArg(configParams, "aws-access-key",
>                     sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
>             copyOptionalArg(configParams, "aws-secret-key",
>                     sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
>             copyOptionalArg(configParams, "aws-profile",
>                     sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
>             break;
>         case "kafka":
>             sourceFactory = FlinkKafkaConsumer010::new;
>             copyRequiredArg(configParams, "bootstrap-server",
>                     sourceProperties, "bootstrap.servers");
>             copyOptionalArg(configParams, "group-id",
>                     sourceProperties, "group.id");
>             break;
>         case "mock":
>             sourceFactory = MockSourceFunction::new;
>             break;
>         default:
>             throw new RuntimeException("Unknown source '" + sourceName + '\'');
>     }
>
>     // set up the streaming execution environment
>     final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>
>     // emit periodic watermarks every second
>     // (BoundedOutOfOrdernessTimestampExtractor is a periodic assigner)
>     env.getConfig().setAutoWatermarkInterval(1000L);
>     env.enableCheckpointing(5000);
>
>     SplitStream<JsonNode> eventsByType = env.addSource(sourceFactory.create(
>                     configParams.getRequired("topic"),
>                     new ObjectNodeDeserializationSchema(),
>                     sourceProperties
>             ))
>             .returns(ObjectNode.class) // the use of ConsumerFactory erases the type info so add it back
>             .name("raw-events")
>             .assignTimestampsAndWatermarks(
>                     new ObjectNodeBoundedOutOfOrdernessTimestampExtractor("timestamp", Time.seconds(5))
>             )
>             .split(new JsonNodeOutputSelector("eventType"));
>     ...
>     eventsByType.select(...)
>             .keyBy(new JsonNodeStringKeySelector("_key"))
>             .window(TumblingEventOffsetPerKeyEventTimeWindows.of(Time.seconds(windowDuration),
>                     (KeySelector<JsonNode, Time>) TasksMain::offsetPerMaster))
>             .trigger(EventTimeTrigger.create())
>             .aggregate(new CountsAggregator<>(), new KeyTagger<>()) // <==== The CountsAggregator is seeing the data
>             .print() // <==== HERE is where we get no output from Kinesis... but Kafka and my Mock are just fine!
>
> ------------------------------
> Notice: This e-mail is intended solely for use of the individual or entity
> to which it is addressed and may contain information that is proprietary,
> privileged and/or exempt from disclosure under applicable law. If the
> reader is not the intended recipient or agent responsible for delivering
> the message to the intended recipient, you are hereby notified that any
> dissemination, distribution or copying of this communication is strictly
> prohibited. This communication may also contain data subject to U.S.
> export laws. If so, data subject to the International Traffic in Arms Regulation
> cannot be disseminated, distributed, transferred, or copied, whether
> incorporated or in its original form, to foreign nationals residing in the
> U.S. or abroad, absent the express prior approval of the U.S. Department of
> State. Data subject to the Export Administration Act may not be
> disseminated, distributed, transferred or copied contrary to U. S.
> Department of Commerce regulations. If you have received this communication
> in error, please notify the sender by reply e-mail and destroy the e-mail
> message and any physical copies made of the communication.
> Thank you.
> *********************
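Nick's point, that any parallel source instance without data holds back event time, is what idleness detection addresses. In Flink 1.11 and later this is exposed as `WatermarkStrategy#withIdleness`, which lets a source or assigner instance declare itself idle when no records arrive for a configured duration, so downstream operators leave it out of the minimum. The plain-Java sketch below only models that rule; `IdleAwareWatermarks`, `Channel` and the timeout values are hypothetical and this is not Flink's implementation.

```java
import java.util.List;

// Hypothetical, dependency-free model of idleness-aware watermark combining.
// Not Flink code: it only illustrates excluding quiet inputs from the minimum.
public class IdleAwareWatermarks {

    /** One input channel: its latest watermark and when it last received data. */
    static final class Channel {
        final long watermarkMs;
        final long lastActivityMs;

        Channel(long watermarkMs, long lastActivityMs) {
            this.watermarkMs = watermarkMs;
            this.lastActivityMs = lastActivityMs;
        }

        boolean isIdle(long nowMs, long idleTimeoutMs) {
            return nowMs - lastActivityMs > idleTimeoutMs;
        }
    }

    /**
     * Minimum watermark over the channels that are still active.
     * Returns Long.MIN_VALUE (no event-time progress) if every channel is idle.
     */
    static long combinedWatermark(List<Channel> channels, long nowMs, long idleTimeoutMs) {
        return channels.stream()
                .filter(c -> !c.isIdle(nowMs, idleTimeoutMs))
                .mapToLong(c -> c.watermarkMs)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // shard-0 has been silent since the job started at t = 0;
        // shard-1 is busy and its assigner has advanced to 55 s.
        Channel shard0 = new Channel(Long.MIN_VALUE, 0);
        Channel shard1 = new Channel(55_000, 119_000);
        List<Channel> inputs = List.of(shard0, shard1);
        long now = 120_000;

        // Without idleness detection (effectively infinite timeout), event time is stuck.
        System.out.println(combinedWatermark(inputs, now, Long.MAX_VALUE));
        // With a 60 s idle timeout, shard-0 is ignored and event time advances.
        System.out.println(combinedWatermark(inputs, now, 60_000));
    }
}
```

The trade-off is the choice of timeout: if it is shorter than the normal gap between events on a legitimately slow shard, that shard's events may arrive after the window has already fired and be treated as late.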