Re: Persisting state in RocksDB

2021-06-10 Thread Arvid Heise
Hi Paul,

You can leave operators dangling, so there is no need to add fake sinks.

If you write over HTTP, the best option is actually async I/O [1]. This will
run much faster.
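
A minimal sketch of that async I/O operator, assuming Flink 1.13 (the docs linked below). The names (`FetchArticleFunction`, `fetchOverHttp`) and the plain `String` payloads are illustrative, not from this thread; a real job would use its own event and article types and a non-blocking HTTP client:

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Hypothetical async transform: turns an update event (a URL here) into the full article.
public class FetchArticleFunction extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String url, ResultFuture<String> resultFuture) {
        // Run the request off the main thread and complete the future
        // when the response arrives (client details omitted).
        CompletableFuture
            .supplyAsync(() -> fetchOverHttp(url))
            .thenAccept(body -> resultFuture.complete(Collections.singleton(body)));
    }

    private String fetchOverHttp(String url) {
        return "..."; // placeholder for the actual HTTP call
    }
}

// Wiring (sketch): at most 100 requests in flight, 5 s timeout per request.
// DataStream<String> articles = AsyncDataStream.unorderedWait(
//         urls, new FetchArticleFunction(), 5, TimeUnit.SECONDS, 100);
```

`unorderedWait` gives the best throughput; use `orderedWait` if downstream operators need the input order preserved.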

Async I/O, however, has no state access (we want to change that eventually, but
for now the restriction avoids too many antipatterns).
It's not clear to me exactly how you want to use the state. If it's to avoid
duplicate requests, or if the requests look different, then I'd propose:

Source -> Async (transform) -> keyBy+KeyedProcessFunction (deduplicate or
update) -> Async (submit)

For the transition from DB-style state access, it helps to really just think in
terms of a single key: what would an operator see if it only received records
with that one key? You would probably have a value state which holds
information about the previous articles with the same key.
For deduplication, you just need a boolean: you don't emit anything in the
process function if the state is already set; if it isn't, you set the state
and emit.
For diffs, you'd fetch the old state value, compare it to the new value, update
the state with the new value, and emit the diff.
For deleting the old article and replacing it with a new one, you'd fetch the
old state, emit a record (delete, old), update the state, and emit a record
(put, new).
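
The deduplication variant could look roughly like this (a sketch against the Flink 1.13 API; `String` keys and payloads and the `articleUrl` key selector are assumptions for illustration):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emits each key's article only once; the state lives in the configured
// backend (e.g. RocksDB), scoped automatically to the current key.
public class DeduplicateFunction extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public void processElement(String article, Context ctx, Collector<String> out)
            throws Exception {
        if (seen.value() == null) { // first record for this key
            seen.update(true);
            out.collect(article);
        }
        // otherwise: duplicate for this key, drop it
    }
}

// Usage (sketch): events.keyBy(a -> articleUrl(a)).process(new DeduplicateFunction())
```

The diff and delete-and-replace variants follow the same shape with a `ValueState` holding the previous article instead of a boolean.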

Flink then takes care of ensuring that the different keys are processed in
parallel without interference, and it also manages the persistence to RocksDB.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/


Re: Persisting state in RocksDB

2021-06-10 Thread Paul K Moore
Hi Arvid - thanks for the welcome :)

The SinkFunction is custom (extends RichSinkFunction), and writes to a legacy 
web service over HTTP.

I’ll investigate the keyBy+KeyedProcessFunction further - thanks. Frankly I 
looked at this but I think I was confusing myself between working with KV store 
(database thinking) and the new (to me) world of "stream state".

Additionally, if I move my SinkFunction functionality (HTTP POST, PUT, etc.) into 
the KeyedProcessFunction, I assume I would use a DiscardingSink to terminate the 
flow - or is this an anti-pattern?

Many thanks

Paul





Re: Persisting state in RocksDB

2021-06-09 Thread Arvid Heise
Hi Paul,

Welcome to the club!

What's your SinkFunction? Is it custom? If so, you could also implement
CheckpointedFunction to read and write state.
There you could use the OperatorStateStore and, with it, BroadcastState.
However, broadcast state is quite heavy (it sends all data to all
instances, so it doesn't scale).
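
A sketch of what the CheckpointedFunction route could look like for the URL mapping, against the Flink 1.13 API. The class name, the `String` element type, and the in-memory `HashMap` are assumptions; the HTTP call itself is omitted:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class HttpSink extends RichSinkFunction<String> implements CheckpointedFunction {

    // Working copy of the new-URL -> legacy-URL mapping.
    private final Map<String, String> urlMapping = new HashMap<>();
    private transient ListState<Tuple2<String, String>> checkpointedMapping;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedMapping = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("url-mapping",
                        TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})));
        if (context.isRestored()) {
            for (Tuple2<String, String> entry : checkpointedMapping.get()) {
                urlMapping.put(entry.f0, entry.f1);
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Rewrite the operator state from the working map on every checkpoint.
        checkpointedMapping.clear();
        for (Map.Entry<String, String> e : urlMapping.entrySet()) {
            checkpointedMapping.add(Tuple2.of(e.getKey(), e.getValue()));
        }
    }

    @Override
    public void invoke(String article, Context context) {
        // POST to the legacy service, then record newUrl -> legacyUrl
        // in urlMapping (HTTP details omitted).
    }
}
```

One caveat worth noting: plain operator list state is redistributed across instances on rescaling, so a given URL's entry may not land on the instance that later receives that URL - which is exactly why keyed state (or the heavyweight broadcast state) is the safer fit here.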

A better way would be to have a keyBy+KeyedProcessFunction before the sink
function. You could keyBy your key and use a normal value state [1] to
store the data point. If you configure your state backend to be RocksDB
[2], then you have everything together.
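
Configuring the RocksDB backend programmatically could look like this (a sketch against Flink 1.13, matching the docs linked below; the checkpoint path and interval are placeholder values):

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Keyed state (e.g. the value state) is kept in embedded RocksDB
        // on local disk, so it can grow beyond the heap...
        env.setStateBackend(new EmbeddedRocksDBStateBackend());

        // ...and is checkpointed to durable storage for recovery.
        env.enableCheckpointing(60_000); // every 60 s
        env.getCheckpointConfig()
                .setCheckpointStorage("file:///tmp/flink-checkpoints");

        // ...build the pipeline here, then env.execute(...)
    }
}
```

Equivalently, `state.backend: rocksdb` in flink-conf.yaml sets the same backend cluster-wide without touching the job code.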

Note that you could also have it next to the sink function. There is no reason
not to have a dangling operator (no sink).

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#using-keyed-state
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/



Persisting state in RocksDB

2021-06-08 Thread Paul K Moore
Hi all,

First post here, so please be kind :)

Firstly some context; I have the following high-level job topology:

(1) FlinkPulsarSource -> (2) RichAsyncFunction -> (3) SinkFunction

1. The FlinkPulsarSource reads event notifications about article updates from a 
Pulsar topic
2. The RichAsyncFunction fetches the “full” article from the specified URL 
end-point, and transmutes it into a “legacy” article format
3. The SinkFunction writes the “legacy” article to a (legacy) web platform i.e. 
the sink is effectively another web site

I have this all up and running (despite lots of shading fun).

When the SinkFunction creates an article on the legacy platform it returns an 
'HTTP 201 Created' with a Location header suitably populated.

Now, I need to persist that Location header and, more explicitly, need to 
persist a map between the URLs for the new and legacy platforms.  This is 
needed for later update and delete processing.

The question is how do I store this mapping information?

I’ve spent some time trying to grok state management and the various backends, 
but from what I can see the state management is focused on “operator scoped” 
state.  This seems reasonable given the requirement for barriers etc to ensure 
accurate recovery.

However, I need some persistence between operators (shared state?) and with 
longevity beyond the processing of an operator.

My gut reaction is that I need an external K-V store such as Ignite (or 
similar). Frankly given that Flink ships with embedded RocksDB I was hoping to 
use that, but there seems no obvious way to do this, and lots of advice saying 
don’t :)

Am I missing something obvious here?

Many thanks in advance

Paul