Though I am explicitly assigning watermarks with
DataStream.assignTimestampsAndWatermarks and I see all the data flowing
through that... so shouldn't that override the watermarks from the original
source?
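
A simplified model of why that probably does not help, assuming the
timestamp/watermark assigner runs with the same parallelism as the source
(one instance per source subtask) and that a keyed downstream operator takes
the minimum watermark over all of its inputs. This is plain Java, not Flink
code; the class and method names are made up for illustration, and the
parallelism of 2 simply mirrors the two shards mentioned in this thread:

```java
import java.util.Arrays;

/** Toy model of watermark propagation across parallel assigner instances. */
class WatermarkMinSketch {
    /** Last watermark reported by each assigner instance (none reported yet). */
    private final long[] channelWatermarks;

    WatermarkMinSketch(int parallelism) {
        channelWatermarks = new long[parallelism];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Bounded-out-of-orderness rule: watermark = max timestamp seen - bound. */
    void onEvent(int channel, long timestampMillis, long maxOutOfOrdernessMillis) {
        long candidate = timestampMillis - maxOutOfOrdernessMillis;
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], candidate);
    }

    /** The downstream operator's event-time clock is the minimum over all inputs. */
    long combinedWatermark() {
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }

    /** An event-time window [start, end) can fire once the clock reaches end - 1. */
    boolean windowCanFire(long windowEndMillis) {
        return combinedWatermark() >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        WatermarkMinSketch model = new WatermarkMinSketch(2);

        // All events land on instance 1; instance 0 never sees an element.
        model.onEvent(1, 100_000L, 5_000L);
        System.out.println("window ending at 60s can fire: " + model.windowCanFire(60_000L));

        // Once instance 0 sees data as well, the combined watermark advances.
        model.onEvent(0, 70_000L, 5_000L);
        System.out.println("window ending at 60s can fire: " + model.windowCanFire(60_000L));
    }
}
```

If this model matches the job, events with very high timestamps on the one
active shard only raise that instance's watermark; the minimum is still held
back by the instance that has seen nothing.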

On Tue, 19 Feb 2019 at 15:59, Martin, Nick <nick.mar...@ngc.com> wrote:

> Yeah, that’s expected/known. Watermarks for the empty partition don’t
> advance, so the window in your window function never closes.
>
>
>
> There’s a ticket open to fix it (
> https://issues.apache.org/jira/browse/FLINK-5479) for the kafka
> connector, but in general any time one parallel instance of a source
> function isn’t getting data you have to watch out for this.
>
>
>
> *From:* Stephen Connolly [mailto:stephen.alan.conno...@gmail.com]
> *Sent:* Tuesday, February 19, 2019 6:32 AM
> *To:* user <user@flink.apache.org>
> *Subject:* EXT :Re: How to debug difference between Kinesis and Kafka
>
>
>
> Hmmm, my suspicions are now quite high. I created a file source that just
> replays the events straight through, and with that I get more results....
>
>
>
> On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
> Hmmm after expanding the dataset such that there was additional data that
> ended up on shard-0 (everything in my original dataset was coincidentally
> landing on shard-1) I am now getting output... should I expect this kind of
> behaviour if no data arrives at shard-0 ever?
>
>
>
> On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
> Hi, I’m having a strange situation and I would like to know where I should
> start trying to debug.
>
>
>
> I have set up a configurable, swappable source with three implementations:
>
>
>
> 1. A mock implementation
>
> 2. A Kafka consumer implementation
>
> 3. A Kinesis consumer implementation
>
>
>
> From injecting a log and no-op map function I can see that all three
> sources pass through the events correctly.
>
>
>
> I then have a window based on event timestamps, and from inspecting the
> aggregation function I can see that the data is getting aggregated. I’m
> using the `.aggregate(AggregateFunction, WindowFunction)` variant so that I
> can retrieve the key.
>
>
>
> Here’s the strange thing, I only change the source (and each source uses
> the same deserialization function) but:
>
>
>
>    - When I use either Kafka or my Mock source, the WindowFunction gets
>    called as events pass the end of the window
>    - When I use the Kinesis source, however, the window function never
>    gets called. I have even tried injecting events into kinesis with really
>    high timestamps to flush the watermarks in my
>    BoundedOutOfOrdernessTimestampExtractor... but nothing
>
> I cannot see how this source switching could result in such a different
> behaviour:
>
>
>
>         Properties sourceProperties = new Properties();
>         ConsumerFactory sourceFactory;
>         String sourceName = configParams.getRequired("source");
>         switch (sourceName.toLowerCase(Locale.ENGLISH)) {
>             case "kinesis":
>                 sourceFactory = FlinkKinesisConsumer::new;
>                 copyOptionalArg(configParams, "aws-region",
>                         sourceProperties, AWSConfigConstants.AWS_REGION);
>                 copyOptionalArg(configParams, "aws-endpoint",
>                         sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
>                 copyOptionalArg(configParams, "aws-access-key",
>                         sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
>                 copyOptionalArg(configParams, "aws-secret-key",
>                         sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
>                 copyOptionalArg(configParams, "aws-profile",
>                         sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
>                 break;
>             case "kafka":
>                 sourceFactory = FlinkKafkaConsumer010::new;
>                 copyRequiredArg(configParams, "bootstrap-server",
>                         sourceProperties, "bootstrap.servers");
>                 copyOptionalArg(configParams, "group-id",
>                         sourceProperties, "group.id");
>                 break;
>             case "mock":
>                 sourceFactory = MockSourceFunction::new;
>                 break;
>             default:
>                 throw new RuntimeException("Unknown source '" + sourceName + '\'');
>         }
>
>
>
>         // set up the streaming execution environment
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // poll watermark every second because using
>         // BoundedOutOfOrdernessTimestampExtractor
>         env.getConfig().setAutoWatermarkInterval(1000L);
>         env.enableCheckpointing(5000);
>
>         SplitStream<JsonNode> eventsByType = env.addSource(sourceFactory.create(
>                         configParams.getRequired("topic"),
>                         new ObjectNodeDeserializationSchema(),
>                         sourceProperties
>                 ))
>                 // the use of ConsumerFactory erases the type info so add it back
>                 .returns(ObjectNode.class)
>                 .name("raw-events")
>                 .assignTimestampsAndWatermarks(
>                         new ObjectNodeBoundedOutOfOrdernessTimestampExtractor(
>                                 "timestamp", Time.seconds(5))
>                 )
>                 .split(new JsonNodeOutputSelector("eventType"));
>
> ...
>
>         eventsByType.select(...)
>                 .keyBy(new JsonNodeStringKeySelector("_key"))
>                 .window(TumblingEventOffsetPerKeyEventTimeWindows.of(
>                         Time.seconds(windowDuration),
>                         (KeySelector<JsonNode, Time>) TasksMain::offsetPerMaster))
>                 .trigger(EventTimeTrigger.create())
>                 // <==== The CountsAggregator is seeing the data
>                 .aggregate(new CountsAggregator<>(), new KeyTagger<>())
>                 // <==== HERE is where we get no output from Kinesis...
>                 // but Kafka and my Mock are just fine!
>                 .print();
>
>
>
>
>
>
> ------------------------------
>
> Notice: This e-mail is intended solely for use of the individual or entity
> to which it is addressed and may contain information that is proprietary,
> privileged and/or exempt from disclosure under applicable law. If the
> reader is not the intended recipient or agent responsible for delivering
> the message to the intended recipient, you are hereby notified that any
> dissemination, distribution or copying of this communication is strictly
> prohibited. This communication may also contain data subject to U.S. export
> laws. If so, data subject to the International Traffic in Arms Regulation
> cannot be disseminated, distributed, transferred, or copied, whether
> incorporated or in its original form, to foreign nationals residing in the
> U.S. or abroad, absent the express prior approval of the U.S. Department of
> State. Data subject to the Export Administration Act may not be
> disseminated, distributed, transferred or copied contrary to U. S.
> Department of Commerce regulations. If you have received this communication
> in error, please notify the sender by reply e-mail and destroy the e-mail
> message and any physical copies made of the communication.
>  Thank you.
> *********************
>
