Re: Broadcasting to multiple operators

2021-03-05 Thread David Anderson
Glad to hear it! Thanks for letting us know.

David


Re: Broadcasting to multiple operators

2021-03-05 Thread Roger
Confirmed. This worked!
Thanks!
Roger


Re: Broadcasting to multiple operators

2021-03-05 Thread Roger
Hey David.
Thank you very much for your response. This makes sense now. It was
confusing because I was able to use the broadcast stream before adding the
second connection. However, now I realize that that part of the pipeline
comes after the windowing, so it isn't affected in the same way. This is
definitely going to help fix my problem.


Re: Broadcasting to multiple operators

2021-03-05 Thread David Anderson
This is a watermarking issue. Whenever an operator has two or more input
streams, its watermark is the minimum of the watermarks of its incoming
streams. In this case your broadcast stream doesn't have a watermark
generator, so it is preventing the watermarks from advancing, which in turn
is preventing the windows from being triggered.
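
To make that concrete, here is a rough sketch of the rule (illustrative
pseudocode, not actual Flink API; the variable names are made up):

// A multi-input operator's watermark only advances as fast as its
// slowest input.
long operatorWatermark = Math.min(recordsWatermark, broadcastWatermark);
// With no watermark generator on the broadcast stream, broadcastWatermark
// effectively stays at Long.MIN_VALUE, so operatorWatermark never advances
// and downstream event-time windows never fire.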

You should call assignTimestampsAndWatermarks on the broadcast stream. If
time is irrelevant for this stream, you could do something like this:

import javax.annotation.Nullable;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public static class ConfigStreamAssigner<T> implements AssignerWithPeriodicWatermarks<T> {
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        // Always report the largest possible watermark so this stream
        // never holds back the combined (minimum) watermark.
        return Watermark.MAX_WATERMARK;
    }

    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        // Timestamps don't matter for this stream.
        return 0;
    }
}


By setting the watermark for this stream to MAX_WATERMARK, you are
effectively removing this stream's watermarks from consideration.
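
One caveat: assignTimestampsAndWatermarks has to be called on the DataStream
before you turn it into a BroadcastStream, since BroadcastStream itself
doesn't expose that method. A minimal sketch (the names configConsumer,
configDescriptor, and Config are placeholders for whatever your job uses):

// Assign the no-op watermarks first, then broadcast.
BroadcastStream<Config> configurationBroadcastStream =
    env.addSource(configConsumer)
        .assignTimestampsAndWatermarks(new ConfigStreamAssigner<>())
        .broadcast(configDescriptor);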

Regards,
David

On Fri, Mar 5, 2021 at 5:48 PM Roger  wrote:

> Hello.
> I am having an issue with a Flink 1.8 pipeline when trying to consume
> broadcast state across multiple operators.  I currently
> have a working pipeline that looks like the following:
>
> records
> .assignTimestampsAndWatermarks(
> new BoundedOutOfOrdernessGenerator(
>
> Long.parseLong(properties.getProperty(("flinkMaxOutOfOrderness")
> .keyBy(new ApplicationNameKeySelector())
> .window(
> TumblingEventTimeWindows.of(
>
> Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")
> .aggregate(new Aggregator())
> .connect(configurationBroadcastStream)
> .process(excluder)
> .addSink(KinesisProducer.createSinkFromStaticConfig(properties));
>
> * records are a FlinkKafkaConsumer stream
> * configurationBroadcastStream is a FlinkKafkaConsumer
> * aggregator is an AggregateFunction
> * filter is a BroadcastProcessFunction
>
>
> I now have requirements to filter out transactions at the beginning of the
> pipeline using the same broadcast stream I am consuming towards the end of
> the pipeline. I updated the pipeline to look like this:
>
> records
> .assignTimestampsAndWatermarks(
> new BoundedOutOfOrdernessGenerator(
>
> Long.parseLong(properties.getProperty(("flinkMaxOutOfOrderness")
> .connect(configurationBroadcastStream) **new**
> .process(filter) **new**
> .keyBy(new ApplicationNameKeySelector())
> .window(
> TumblingEventTimeWindows.of(
>
> Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")
> .aggregate(new Aggregator())
> .connect(configurationBroadcastStream)
> .process(excluder)
> .addSink(KinesisProducer.createSinkFromStaticConfig(properties));
>
> * records are a FlinkKafkaConsumer stream
> * configurationBroadcastStream is a FlinkKafkaConsumer
> * aggregator is an AggregateFunction
> * excluder is a BroadcastProcessFunction
>
> With this change, the aggregated records are not making it into the
> excluder process.
>
> 1. The aggregator add is working. I can see this in the logs.
> 2. The aggregator getResult is never called. This makes me think this is a
> window issue.
> 3. Both processBroadcastElement methods from the two broadcast functions
> are working and
>  retrieving the broadcasted state. I see this in logging.
> 4. The pipeline definitely worked prior to me adding in the second
> .connect and .process at the beginning of the pipeline.
> 5. I have considered creating a new record object from the new
> process(filter) that contains the config retrieved from the broadcast
> stream along with the transactions and passing that down the pipeline but
> that is really not desirable.
>
> Any ideas on what might be going on here?
>
> Thanks!
> Roger
>
>


Broadcasting to multiple operators

2021-03-05 Thread Roger
Hello.
I am having an issue with a Flink 1.8 pipeline when trying to consume
broadcast state across multiple operators. I currently have a working
pipeline that looks like the following:

records
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessGenerator(
            Long.parseLong(properties.getProperty("flinkMaxOutOfOrderness"))))
    .keyBy(new ApplicationNameKeySelector())
    .window(
        TumblingEventTimeWindows.of(
            Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")))))
    .aggregate(new Aggregator())
    .connect(configurationBroadcastStream)
    .process(excluder)
    .addSink(KinesisProducer.createSinkFromStaticConfig(properties));

* records is a FlinkKafkaConsumer stream
* configurationBroadcastStream is a FlinkKafkaConsumer
* aggregator is an AggregateFunction
* excluder is a BroadcastProcessFunction


I now have requirements to filter out transactions at the beginning of the
pipeline using the same broadcast stream I am consuming towards the end of
the pipeline. I updated the pipeline to look like this:

records
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessGenerator(
            Long.parseLong(properties.getProperty("flinkMaxOutOfOrderness"))))
    .connect(configurationBroadcastStream)  // new
    .process(filter)                        // new
    .keyBy(new ApplicationNameKeySelector())
    .window(
        TumblingEventTimeWindows.of(
            Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")))))
    .aggregate(new Aggregator())
    .connect(configurationBroadcastStream)
    .process(excluder)
    .addSink(KinesisProducer.createSinkFromStaticConfig(properties));

* records is a FlinkKafkaConsumer stream
* configurationBroadcastStream is a FlinkKafkaConsumer
* aggregator is an AggregateFunction
* filter and excluder are BroadcastProcessFunctions

With this change, the aggregated records are not making it into the
excluder process.

1. The aggregator's add() is being called. I can see this in the logs.
2. The aggregator's getResult() is never called. This makes me think this
is a window issue.
3. Both processBroadcastElement methods from the two broadcast functions
are working and retrieving the broadcast state. I see this in the logging.
4. The pipeline definitely worked prior to me adding the second .connect
and .process at the beginning of the pipeline.
5. I have considered creating a new record object from the new
process(filter) that contains the config retrieved from the broadcast
stream along with the transactions, and passing that down the pipeline,
but that is really not desirable.

Any ideas on what might be going on here?

Thanks!
Roger