Hi Kurt,

What you said is the 1st reason.
The second reason is this query need to scan the whole table. I think we
can do better :-)

Best,
Jiawei

On Wed, Mar 11, 2020 at 10:52 AM Kurt Young <ykt...@gmail.com> wrote:

> Hi Jiawai,
>
> Sorry I still didn't fully get your question. What's wrong with your
> proposed SQL?
>
> > select vendorId, sum(inventory units)
> > from dynamodb
> > where today's time - inbound time > 15
> > group by vendorId
>
> My guess is that such query would only trigger calculations by new event.
> So if a very old
> inventory like inbounded 17 days ago, and there is no new events coming
> about that inventory,
> then the calculation would not be triggered and you can't sum it, right?
>
> Best,
> Kurt
>
>
> On Wed, Mar 11, 2020 at 10:06 AM Jiawei Wu <wujiawei5837...@gmail.com>
> wrote:
>
>> Hi Robert,
>>
>> Your answer really helps.
>>
>> About the problem, we have 2 choices. The first one is using Flink as
>> described in this email thread. The second one is using AWS Lambda
>> triggered by CDC stream and compute the latest 15 days record, which is a
>> walk-around solution and looks not as elegant as Flink to me.
>>
>> Currently we decided to choose AWS Lambda because we are familiar with
>> it, and the most important, it lead to nearly no operational burden. But we
>> are actively looking for the comparison between Lambda and Flink and want
>> to know in which situation we prefer Flink over Lambda. Several teams in
>> our company are already in a hot debate about the comparison, and the
>> biggest concern is the non-function requirements about Flink, such as fault
>> tolerance, recovery, etc.
>>
>> I also searched the internet but found there are nearly no comparisons
>> between Lambda and Flink except for their market share :-( I'm wondering
>> what do you think of this? Or any comments from flink community is
>> appreciated.
>>
>> Thanks,
>> J
>>
>>
>> On Mon, Mar 9, 2020 at 8:22 PM Robert Metzger <rmetz...@apache.org>
>> wrote:
>>
>>> Hey Jiawei,
>>>
>>> I'm sorry that you haven't received an answer yet.
>>>
>>> So you basically have a stream of dynamodb table updates (let's call id
>>> CDC stream), and you would like to maintain the inventory of the last 15
>>> days for each vendor.
>>> Whenever there's an update in the inventory data (a new event arrives in
>>> the CDC stream), you want to produce a new event with the inventory count.
>>>
>>> If I'm not mistaken, you will need to keep all the inventory in Flink's
>>> state to have an accurate count and to drop old records when they are
>>> expired.
>>> There are two options for maintaining the state:
>>> - in memory (using the FsStateBackend)
>>> - on disk (using the embedded RocksDBStatebackend)
>>>
>>> I would recommend starting with the RocksDBStateBackend. It will work as
>>> long as your state fits on all your machines hard disks (we'll probably not
>>> have an issue there :) )
>>> If you run into performance issues, you can consider switching to a
>>> memory based backend (by then, you should have some knowledge about your
>>> state size)
>>>
>>> For tracking the events, I would recommend you to look into Flink's
>>> windowing API:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
>>>  / https://flink.apache.org/news/2015/12/04/Introducing-windows.html
>>> Or alternatively doing an implementation with ProcessFunction:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html
>>> I personally would give it a try with ProcessFunction first.
>>>
>>> For reading the data from DynamoDB, there's an undocumented feature for
>>> it in Flink. This is an example for reading from a DynamoDB stream:
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromDynamoDBStreams.java
>>> Here's also some info: https://issues.apache.org/jira/browse/FLINK-4582
>>>
>>> For writing to DynamoDB there is currently no official sink in Flink. It
>>> should be fairly straightforward to implement a Sink using the SinkFunction
>>> interface of Flink.
>>>
>>> I hope this answers your question.
>>>
>>> Best,
>>> Robert
>>>
>>>
>>>
>>>
>>> On Tue, Mar 3, 2020 at 3:25 AM Jiawei Wu <wujiawei5837...@gmail.com>
>>> wrote:
>>>
>>>> Hi flink users,
>>>>
>>>> We have a problem and think flink may be a good solution for that. But
>>>> I'm new to flink and hope can get some insights from flink community :)
>>>>
>>>> Here is the problem. Suppose we have a DynamoDB table which store the
>>>> inventory data, the schema is like:
>>>>
>>>> * vendorId (primary key)
>>>> * inventory name
>>>> * inventory units
>>>> * inbound time
>>>> ...
>>>>
>>>> This DDB table keeps changing, since we have inventory coming and
>>>> removal. *Every change will trigger a DynamoDB stream. *
>>>> We need to calculate *all the inventory units that > 15 days for a
>>>> specific vendor* like this:
>>>> > select vendorId, sum(inventory units)
>>>> > from dynamodb
>>>> > where today's time - inbound time > 15
>>>> > group by vendorId
>>>> We don't want to schedule a daily batch job, so we are trying to work
>>>> on a micro-batch solution in Flink, and publish this data to another
>>>> DynamoDB table.
>>>>
>>>> A draft idea is to use the total units minus <15 days units, since both
>>>> of then have event trigger. But no detailed solutions yet.
>>>>
>>>> Could anyone help provide some insights here?
>>>>
>>>> Thanks,
>>>> J.
>>>>
>>>

Reply via email to