Re: Suggestions for using both broadcast sync and conditional async-io

2020-06-04 Thread orionemail
Thanks for the response. I had not seen the State Processor API; somehow I
missed that.

Regarding your second point, this is basically an ID mapping service, so I need
the IDs persisted in DynamoDB (or indeed any other external store) so that
other applications may also use the 'mapped' ID value (and so that any new
mappings are stored and survive the Flink job being restarted or redeployed).
Maybe I do not need to use AsyncIO at all: this could be implemented as a side
output feeding a sink, provided this data is always keyed on the original ID,
I suppose?
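
Something along these lines is what I have in mind - a rough sketch only,
where Event, withMappedId and the UUID-based ID generation stand in for our
actual record type and ID scheme:

    import java.util.UUID;

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class IdMappingFunction extends KeyedProcessFunction<String, Event, Event> {

        // New mappings are emitted here and routed to an external sink (DynamoDB).
        public static final OutputTag<Tuple2<String, String>> NEW_MAPPINGS =
                new OutputTag<Tuple2<String, String>>("new-mappings") {};

        private transient ValueState<String> mappedId;

        @Override
        public void open(Configuration parameters) {
            mappedId = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("mapped-id", String.class));
        }

        @Override
        public void processElement(Event event, Context ctx, Collector<Event> out)
                throws Exception {
            String mapped = mappedId.value();
            if (mapped == null) {
                // No mapping for this original ID yet: generate one, keep it in
                // keyed state, and emit it on the side output for persistence.
                mapped = UUID.randomUUID().toString();
                mappedId.update(mapped);
                ctx.output(NEW_MAPPINGS, Tuple2.of(event.getOriginalId(), mapped));
            }
            out.collect(event.withMappedId(mapped));
        }
    }

The main stream would be keyed on the original ID before this operator, and
getSideOutput(IdMappingFunction.NEW_MAPPINGS) would feed the DynamoDB sink.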

Thanks for your response; this is certainly food for thought.

O.


Re: Suggestions for using both broadcast sync and conditional async-io

2020-06-04 Thread Tzu-Li (Gordon) Tai
Hi,

For the initial DB fetch and state bootstrapping:
That's exactly what the State Processor API is for; have you looked at it
already?
It currently does support bootstrapping broadcast state [1], so that should
be good news for you.
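
For illustration, the bootstrap job could be roughly along these lines (an
untested sketch using the DataSet-based State Processor API; the CSV source
path, the state descriptor, and the operator uid are placeholders and must
match whatever your streaming job uses):

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.state.api.BootstrapTransformation;
    import org.apache.flink.state.api.OperatorTransformation;
    import org.apache.flink.state.api.Savepoint;
    import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;

    public class BootstrapMappings {

        // Must be identical to the descriptor the streaming job uses for
        // its broadcast state.
        static final MapStateDescriptor<String, String> MAPPINGS =
                new MapStateDescriptor<>("id-mappings", String.class, String.class);

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Placeholder source: the ~5m key:value pairs dumped from DynamoDB.
            DataSet<Tuple2<String, String>> dump =
                    env.readCsvFile("s3://your-bucket/mapping-dump.csv")
                       .types(String.class, String.class);

            BootstrapTransformation<Tuple2<String, String>> bootstrap =
                    OperatorTransformation
                            .bootstrapWith(dump)
                            .transform(new BroadcastStateBootstrapFunction<Tuple2<String, String>>() {
                                @Override
                                public void processElement(
                                        Tuple2<String, String> entry, Context ctx) throws Exception {
                                    ctx.getBroadcastState(MAPPINGS).put(entry.f0, entry.f1);
                                }
                            });

            Savepoint
                    .create(new MemoryStateBackend(), 128)  // max parallelism
                    .withOperator("mapping-operator-uid", bootstrap)
                    .write("s3://your-bucket/savepoints/bootstrap");

            env.execute("bootstrap-id-mappings");
        }
    }

You would then start the streaming job from the written savepoint.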

As a side note (and I may be missing something): is broadcast state really
necessary in your use case?
If, in your current application, each record only looks up DynamoDB with its
own key, then in the new architecture, where you move the DynamoDB table into
the application as Flink state, you can simply co-partition the entries with
the input record stream and keep them in keyed state.
If a record needs to do cross-key lookups, then of course broadcast state is
required.

As for the AsyncIO process:
From my understanding, in the new architecture you should no longer need the
AsyncIO process / lookup to DynamoDB to generate the new mapping, as all the
information is locally available in Flink state after the bootstrap.
So, when a record is processed, you check Flink state for an existing mapping
and proceed, or generate a new mapping and write it to Flink state.
Essentially, in this scenario Flink state replaces DynamoDB and all lookups
are local.
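
As a rough sketch of that keyed (non-broadcast) variant, connecting the record
stream with the DynamoDB change stream on the same key (Event, MappingUpdate
and their accessors are placeholders for your own types):

    import java.util.UUID;

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
    import org.apache.flink.util.Collector;

    public class CoPartitionedMapping {

        public static DataStream<Event> mapIds(
                DataStream<Event> records, DataStream<MappingUpdate> changes) {
            // Keying both streams by the original ID co-partitions the mapping
            // entries with the records that need them.
            return records
                    .keyBy(Event::getOriginalId)
                    .connect(changes.keyBy(MappingUpdate::getOriginalId))
                    .process(new KeyedCoProcessFunction<String, Event, MappingUpdate, Event>() {

                        private transient ValueState<String> mappedId;

                        @Override
                        public void open(Configuration parameters) {
                            mappedId = getRuntimeContext().getState(
                                    new ValueStateDescriptor<>("mapped-id", String.class));
                        }

                        @Override
                        public void processElement1(Event event, Context ctx,
                                Collector<Event> out) throws Exception {
                            String mapped = mappedId.value();
                            if (mapped == null) {
                                // Miss: generate the mapping locally rather than
                                // looking it up in DynamoDB.
                                mapped = UUID.randomUUID().toString();
                                mappedId.update(mapped);
                            }
                            out.collect(event.withMappedId(mapped));
                        }

                        @Override
                        public void processElement2(MappingUpdate update, Context ctx,
                                Collector<Event> out) throws Exception {
                            // Change-stream entry for this key: refresh the local copy.
                            mappedId.update(update.getMappedId());
                        }
                    });
        }
    }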

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html#broadcast-state-1


Suggestions for using both broadcast sync and conditional async-io

2020-06-03 Thread orionemail
Hi,

My current application makes use of a DynamoDB table to map a key to a value.
As each record enters the system, an async-io call queries this DB for the
key's value; if the value doesn't exist, a new value is generated and
inserted.  I have managed to do all of this in a single update operation
against DynamoDB (see the sketch below), so performance isn't too bad.  This
is usable for our current load, but our load will increase considerably in
the near future, and as writes are expensive (each update is classed as a
write, even if it only returns the existing value) this could be a cost
factor going forward.
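
For reference, the current lookup is roughly this shape - a simplified
sketch, where Event, withMappedId, and the table and attribute names are
placeholders:

    import java.util.Collections;
    import java.util.UUID;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
    import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
    import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
    import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

    public class IdLookupFunction extends RichAsyncFunction<Event, Event> {

        private transient DynamoDbAsyncClient client;

        @Override
        public void open(Configuration parameters) {
            client = DynamoDbAsyncClient.create();
        }

        @Override
        public void close() {
            client.close();
        }

        @Override
        public void asyncInvoke(Event event, ResultFuture<Event> resultFuture) {
            // One UpdateItem per record: if_not_exists inserts a freshly
            // generated value only when the key is new, and ALL_NEW returns
            // the stored value either way.  Every call is billed as a write.
            UpdateItemRequest request = UpdateItemRequest.builder()
                    .tableName("id-mappings")
                    .key(Collections.singletonMap("originalId",
                            AttributeValue.builder().s(event.getOriginalId()).build()))
                    .updateExpression("SET mappedId = if_not_exists(mappedId, :new)")
                    .expressionAttributeValues(Collections.singletonMap(":new",
                            AttributeValue.builder().s(UUID.randomUUID().toString()).build()))
                    .returnValues(ReturnValue.ALL_NEW)
                    .build();

            client.updateItem(request).whenComplete((response, err) -> {
                if (err != null) {
                    resultFuture.completeExceptionally(err);
                } else {
                    resultFuture.complete(Collections.singletonList(
                            event.withMappedId(response.attributes().get("mappedId").s())));
                }
            });
        }
    }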

Broadcast state looks like it might be the answer.  DynamoDB allows 'streams'
of table modification events to be output to what is essentially a Kinesis
stream, so it might be possible to avoid the majority of write calls by
storing local copies of the mapping.  I should also point out that these
mappings are essentially capped: the majority of events that come through
will have an existing mapping.

My idea is to try the following:

1. On application startup, request the entire dataset from the DB (this is
~5m key:value pairs)
2. Inject this data into Flink state somehow, possibly via broadcast state?
3. Subscribe to the DynamoDB stream via broadcast state to capture updates to
this table and update the Flink state
4. When a record is processed, check Flink state for an existing mapping and
proceed if found.  If not, run the AsyncIO process as before to generate a
new mapping (a sketch of this operator follows the list)
5. DynamoDB writes the new value to its stream, so all operators get the new
value via broadcast state
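
To make steps 3-5 concrete, the lookup operator might look something like
this (sketch only; Event and MappingUpdate are placeholders, and misses go to
a side output that then feeds the existing AsyncIO path):

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class MappingLookup
            extends KeyedBroadcastProcessFunction<String, Event, MappingUpdate, Event> {

        // The same descriptor is used to broadcast the DynamoDB stream and to
        // read the local copy here.
        public static final MapStateDescriptor<String, String> MAPPINGS =
                new MapStateDescriptor<>("id-mappings", String.class, String.class);

        // Records with no known mapping yet, routed to the AsyncIO lookup.
        public static final OutputTag<Event> MISSES = new OutputTag<Event>("misses") {};

        @Override
        public void processElement(Event event, ReadOnlyContext ctx, Collector<Event> out)
                throws Exception {
            String mapped = ctx.getBroadcastState(MAPPINGS).get(event.getOriginalId());
            if (mapped != null) {
                out.collect(event.withMappedId(mapped));   // step 4, hit
            } else {
                ctx.output(MISSES, event);                 // step 4, miss
            }
        }

        @Override
        public void processBroadcastElement(MappingUpdate update, Context ctx,
                Collector<Event> out) throws Exception {
            // Steps 3 and 5: apply DynamoDB stream changes to the local copy.
            ctx.getBroadcastState(MAPPINGS).put(update.getOriginalId(), update.getMappedId());
        }
    }

The record stream would be keyed on the original ID and connected with
changeStream.broadcast(MappingLookup.MAPPINGS), with getSideOutput(MISSES)
feeding the AsyncIO operator as today.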

Is this idea workable?  I am unsure about the initial DB fetch and the AsyncIO 
process should a new value need to be inserted.

Any thoughts appreciated.

Thanks

O