RE: EXT :Re: How to debug difference between Kinesis and Kafka

2019-02-19 Thread Martin, Nick
Yeah, that’s expected/known. Watermarks for the empty partition don’t advance, 
so the window in your window function never closes.

There’s a ticket open to fix it 
(https://issues.apache.org/jira/browse/FLINK-5479) for the Kafka connector, but 
in general, any time one parallel instance of a source function isn’t receiving 
data, you have to watch out for this.
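
To make the mechanism concrete (this sketch is mine, not Flink code, and the names are hypothetical): a downstream operator's event-time clock is the minimum of the latest watermark seen on each input channel, so one channel that never receives data pins the combined watermark at Long.MIN_VALUE and no event-time window can ever close:

```java
import java.util.Arrays;

// Toy model of watermark combination: an operator's event-time clock is
// the minimum of the latest watermark from each parallel input channel.
public class WatermarkMin {
    static long combined(long[] perChannelWatermarks) {
        return Arrays.stream(perChannelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // shard-1 has advanced its watermark to t=10_000, but shard-0 never
        // produced data, so its watermark is still Long.MIN_VALUE.
        long[] channels = {Long.MIN_VALUE, 10_000L};
        // The combined watermark stays at Long.MIN_VALUE, so a window
        // ending at, say, t=5_000 never fires.
        System.out.println(combined(channels) == Long.MIN_VALUE);
    }
}
```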

From: Stephen Connolly [mailto:stephen.alan.conno...@gmail.com]
Sent: Tuesday, February 19, 2019 6:32 AM
To: user 
Subject: EXT :Re: How to debug difference between Kinesis and Kafka

Hmmm, my suspicions are now quite high. When I created a file source that just 
replays the events straight through, I get more results.

On Tue, 19 Feb 2019 at 11:50, Stephen Connolly 
<stephen.alan.conno...@gmail.com> wrote:
Hmmm after expanding the dataset such that there was additional data that ended 
up on shard-0 (everything in my original dataset was coincidentally landing on 
shard-1) I am now getting output... should I expect this kind of behaviour if 
no data arrives at shard-0 ever?

On Tue, 19 Feb 2019 at 11:14, Stephen Connolly 
<stephen.alan.conno...@gmail.com> wrote:
Hi, I’m having a strange situation and I would like to know where I should 
start trying to debug.

I have set up a configurable swap-in source, with three implementations:

1. A mock implementation
2. A Kafka consumer implementation
3. A Kinesis consumer implementation

From injecting a log and a no-op map function, I can see that all three sources 
pass through the events correctly.

I then have a window based on event timestamps… and from inspecting the 
aggregation function I can see that the data is getting aggregated. I’m using 
the `.aggregate(AggregateFunction, WindowFunction)` variant so that I can 
retrieve the key.

Here’s the strange thing: I only change the source (and each source uses the 
same deserialization function), but:


  *   When I use either Kafka or my Mock source, the WindowFunction gets called 
as events pass the end of the window.
  *   When I use the Kinesis source, however, the window function never gets 
called. I have even tried injecting events into Kinesis with really high 
timestamps to flush the watermarks in my 
BoundedOutOfOrdernessTimestampExtractor... but nothing.
I cannot see how this source switching could result in such a different 
behaviour:

Properties sourceProperties = new Properties();
ConsumerFactory sourceFactory;
String sourceName = configParams.getRequired("source");
switch (sourceName.toLowerCase(Locale.ENGLISH)) {
    case "kinesis":
        sourceFactory = FlinkKinesisConsumer::new;
        copyOptionalArg(configParams, "aws-region", sourceProperties, AWSConfigConstants.AWS_REGION);
        copyOptionalArg(configParams, "aws-endpoint", sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
        copyOptionalArg(configParams, "aws-access-key", sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
        copyOptionalArg(configParams, "aws-secret-key", sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
        copyOptionalArg(configParams, "aws-profile", sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
        break;
    case "kafka":
        sourceFactory = FlinkKafkaConsumer010::new;
        copyRequiredArg(configParams, "bootstrap-server", sourceProperties, "bootstrap.servers");
        copyOptionalArg(configParams, "group-id", sourceProperties, "group.id");
        break;
    case "mock":
        sourceFactory = MockSourceFunction::new;
        break;
    default:
        throw new RuntimeException("Unknown source '" + sourceName + '\'');
}

// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// poll watermark every second because we are using BoundedOutOfOrdernessTimestampExtractor
env.getConfig().setAutoWatermarkInterval(1000L);
env.enableCheckpointing(5000);

SplitStream eventsByType = env.addSource(sourceFactory.create(
        configParams.getRequired("topic"),
        new ObjectNodeDeserializationSchema(),
        sourceProperties
))
        .returns(ObjectNode.class) // the use of ConsumerFactory erases the type info, so add it back
        .name("raw-events")
        .assignTimestampsAndWatermarks(
                new ObjectNodeBoundedOutOfOrdernessTimestampExtractor("timestamp", Time.seconds(5))
        )
        .split(new JsonNodeOutputSelector("eventType"));
...

Re: EXT :Re: How to debug difference between Kinesis and Kafka

2019-02-19 Thread Stephen Connolly
Though I am explicitly assigning watermarks with
DataStream.assignTimestampsAndWatermarks and I see all the data flowing
through that... so shouldn't that override the watermarks from the original
source?

On Tue, 19 Feb 2019 at 15:59, Martin, Nick  wrote:


Re: EXT :Re: How to debug difference between Kinesis and Kafka

2019-02-19 Thread Dian Fu
DataStream.assignTimestampsAndWatermarks will add a watermark generator 
operator after each source operator (if their parallelism is the same, which is 
true for the code you showed), so if one instance of the source operator has 
no data, the corresponding watermark generator operator cannot advance its 
watermark.

Regards,
Dian
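
The usual fix is to detect source instances that have gone idle and stop letting them hold back the combined watermark (later Flink releases expose this directly as WatermarkStrategy#withIdleness). A plain-Java sketch of the idea, with all names hypothetical and not Flink API:

```java
// Sketch of an idleness-aware watermark generator (hypothetical names):
// if a source instance has seen no events for idleTimeoutMs of processing
// time, mark it idle so it is excluded from the min-watermark combination.
public class IdleAwareWatermarks {
    private static final long MAX_OUT_OF_ORDERNESS_MS = 5_000L; // matches Time.seconds(5)

    private final long idleTimeoutMs;
    private long lastEventProcTime;
    private long maxTimestamp = Long.MIN_VALUE;

    IdleAwareWatermarks(long idleTimeoutMs, long nowMs) {
        this.idleTimeoutMs = idleTimeoutMs;
        this.lastEventProcTime = nowMs;
    }

    void onEvent(long eventTimestamp, long nowMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastEventProcTime = nowMs;
    }

    // True when this instance should be ignored while computing the
    // operator-wide minimum watermark.
    boolean isIdle(long nowMs) {
        return nowMs - lastEventProcTime > idleTimeoutMs;
    }

    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestamp - MAX_OUT_OF_ORDERNESS_MS;
    }
}
```

With something like this in place, a shard that never receives data (shard-0 in the thread above) would flip to idle after the timeout, and the windows fed by shard-1 could close.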


> On 20 Feb 2019, at 00:56, Stephen Connolly wrote: