Unsubscribe

2020-07-21 Thread Harshvardhan Agrawal
-- 
Regards,
Harshvardhan


Re: Writing a custom Rocksdb statistics collector

2019-01-31 Thread Harshvardhan Agrawal
It looks like the DBOptions that are created by the OptionsFactory class
are used for opening RocksDB.

And yes I missed the fact that DBOptions is not serializable. Thanks for
pointing that out. I will go through the metrics exposed via Flink. But
does this mean that there is no good way of getting native RocksDB metrics
in Flink?
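
One direction I am considering is to have the OptionsFactory itself install a
RocksDB Statistics object on the DBOptions it hands back to Flink, and poll
that handle from inside the task instead of shipping DBOptions through a
source. A rough sketch, assuming a RocksJava version that exposes
DBOptions#setStatistics and Statistics#getTickerCount (the class name and the
polling method are illustrative, not something I have verified):

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.Statistics;
import org.rocksdb.TickerType;

public class StatisticsEnablingOptionsFactory implements OptionsFactory {

    // Task-local handle; only meaningful on the task manager that opened RocksDB.
    private transient Statistics statistics;

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        // attach native statistics to the DBOptions Flink actually uses to open RocksDB
        statistics = new Statistics();
        currentOptions.setStatistics(statistics);
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions;
    }

    // Example read; could be called periodically from a daemon thread started here.
    public long blockCacheMisses() {
        return statistics == null ? 0L : statistics.getTickerCount(TickerType.BLOCK_CACHE_MISS);
    }
}

Since the factory may be serialized and copied per operator instance, the
Statistics handle is only useful on the task manager that actually opened the
database, so I see this as a starting point rather than a finished collector.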

On Wed, Jan 30, 2019 at 23:07 Yun Tang  wrote:

> Hi Harshvardhan
>
> First of all, 'DBOptions' is not serializable, I think you cannot include
> it in the source constructor.
>
> I also wondering whether the given `DBOptions` could query RocksDB's
> statistics since they are not the actual options to open RocksDB.
>
> We have tried to report RocksDB's statistics each time when RocksDB
> state-backend snapshots, but this solution means you have to modify RocksDB
> state-backend's source code. By the way, Flink supports to report some
> native metrics[1], hope this could be helpful.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#rocksdb-native-metrics
> <https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#rocksdb-native-metrics>
>
> Best
> Yun Tang
> --
> *From:* Harshvardhan Agrawal 
> *Sent:* Thursday, January 31, 2019 0:23
> *To:* user
> *Subject:* Writing a custom Rocksdb statistics collector
>
>
> Hi,
>
> I am currently trying to integrate RocksDB statistics in my pipeline.
>
> The basic idea is that we want to pass RocksDB stats through the same 
> pipeline that is doing our processing and write them to Elasticsearch so that 
> we can visualize them in Kibana.
>
> I have written a custom source function that takes in the DBOptions object 
> from the stream environment and supply it to the source function which then 
> uses this dboptions object to continuously query Rocksdb for metrics. Here's 
> the code:
>
> public class RocksDBStatsStreamRunner {
>
>
> public static void main(String[] args) throws IOException {
>
> final StreamExecutionEnvironment streamEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> RocksDBStateBackend rocksDBStateBackend = new 
> RocksDBStateBackend("/tmp",true);
> rocksDBStateBackend.setOptions(new MyOptionsFactory());
> streamEnv.setStateBackend(rocksDBStateBackend);
>
> DBOptions dbOptions = 
> ((RocksDBStateBackend)streamEnv.getStateBackend()).getDbOptions();
> streamEnv.addSource(new RocksDBStatisticsSource(dbOptions));
> }
> }
>
>
> public RocksDBStatisticsSource(DBOptions dbOptions) {
> this(dbOptions, DEFAULT_SLEEP_TIME_MS);
> }
>
> public RocksDBStatisticsSource(DBOptions dbOptions, long waitTimeMs) {
> this.dbOptions = dbOptions;
> this.waitTimeMs = waitTimeMs;
> }
>
>
> @Override
> public void stop() {
> this.isRunning = false;
> }
>
> @Override
> public void run(SourceContext sourceContext) throws Exception {
> while(isRunning) {
> //create rocksdb statistics object
> //query rocksdb for statistics using the options field
> //sourceContext.collect(rocksdbStats object)
> //sleep
> }
> }
>
> @Override
> public void cancel() {
> this.isRunning = false;
> }
>
> I am assuming that we will get a separate RocksDB options object for each
> of the slots. Is this a good way to approach this problem? Do you think
> this will work?
>
> Thanks in advance! :)
> --
>
> *Regards, Harshvardhan Agrawal*
>
-- 
Regards,
Harshvardhan


Writing a custom Rocksdb statistics collector

2019-01-30 Thread Harshvardhan Agrawal
Hi,

I am currently trying to integrate RocksDB statistics in my pipeline.

The basic idea is that we want to pass RocksDB stats through the same
pipeline that is doing our processing and write them to Elasticsearch
so that we can visualize them in Kibana.

I have written a custom source function that takes in the DBOptions object
from the stream environment's state backend; the source function then uses
this DBOptions object to continuously query RocksDB for metrics. Here's the
code:

public class RocksDBStatsStreamRunner {

    public static void main(String[] args) throws IOException {

        final StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend("/tmp", true);
        rocksDBStateBackend.setOptions(new MyOptionsFactory());
        streamEnv.setStateBackend(rocksDBStateBackend);

        DBOptions dbOptions =
                ((RocksDBStateBackend) streamEnv.getStateBackend()).getDbOptions();
        streamEnv.addSource(new RocksDBStatisticsSource(dbOptions));
    }
}


// Excerpt of the source function; the class skeleton and fields below are
// reconstructed for readability (the DEFAULT_SLEEP_TIME_MS value is assumed):
public class RocksDBStatisticsSource implements SourceFunction<Object> {

    private static final long DEFAULT_SLEEP_TIME_MS = 10_000L;

    private final DBOptions dbOptions;   // note: DBOptions is not serializable, as the replies point out
    private final long waitTimeMs;
    private volatile boolean isRunning = true;

    public RocksDBStatisticsSource(DBOptions dbOptions) {
        this(dbOptions, DEFAULT_SLEEP_TIME_MS);
    }

    public RocksDBStatisticsSource(DBOptions dbOptions, long waitTimeMs) {
        this.dbOptions = dbOptions;
        this.waitTimeMs = waitTimeMs;
    }

    public void stop() {
        this.isRunning = false;
    }

    @Override
    public void run(SourceContext<Object> sourceContext) throws Exception {
        while (isRunning) {
            // create a RocksDB statistics object
            // query RocksDB for statistics using the options field
            // sourceContext.collect(<rocksdb stats object>)
            // sleep for waitTimeMs
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }
}

I am assuming that we will get a separate RocksDB options object for each
of the slots. Is this a good way to approach this problem? Do you think
this will work?

Thanks in advance! :)
-- 

*Regards, Harshvardhan Agrawal*


Take RocksDB state dump

2018-10-17 Thread Harshvardhan Agrawal
Hello,

We are currently using a RocksDBStateBackend for our Flink pipeline. We
want to analyze the data that is stored in RocksDB state. Is there a
recommended process to do that? The sst_dump tool available from RocksDB
isn’t working for us and we keep on getting errors like “Snappy not
supported or corrupted Snappy compressed block contents”. My thought was
that it might be happening since I am trying to take a dump while the Flink
pipeline is running. Upon cancelling the pipeline all the state was removed
and I didn’t have any sst files to look at. I was wondering how have people
approached this problem.

Regards,
Harsh


-- 
Regards,
Harshvardhan


Re: Between Checkpoints in Kafka 11

2018-09-23 Thread Harshvardhan Agrawal
Hi,

Can someone please help me understand how the exactly-once semantics work
with Kafka 11 in Flink?

Thanks,
Harsh

On Tue, Sep 11, 2018 at 10:54 AM Harshvardhan Agrawal <
harshvardhan.ag...@gmail.com> wrote:

> Hi,
>
> I was going through the blog post on how TwoPhaseCommitSink function works
> with Kafka 11. One of the things I don’t understand is: What is the
> behavior of the Kafka 11 Producer between two checkpoints? Say that the
> time interval between two checkpoints is set to 15 minutes. Will Flink
> buffer all records in memory in that case and start writing to Kafka when
> the next checkpoint starts?
>
> Thanks!
> --
> Regards,
> Harshvardhan
>


-- 

*Regards, Harshvardhan Agrawal*
*267.991.6618 | LinkedIn <https://www.linkedin.com/in/harshvardhanagr/>*


Between Checkpoints in Kafka 11

2018-09-11 Thread Harshvardhan Agrawal
Hi,

I was going through the blog post on how the TwoPhaseCommitSinkFunction works
with Kafka 11. One of the things I don’t understand is: What is the
behavior of the Kafka 11 Producer between two checkpoints? Say that the
time interval between two checkpoints is set to 15 minutes. Will Flink
buffer all records in memory in that case and start writing to Kafka when
the next checkpoint starts?
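
For context, the producer in question is created with the exactly-once
semantic roughly like this (a sketch; the topic, schema and broker address
are placeholders):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceProducerExample {

    public static FlinkKafkaProducer011<String> createProducer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");   // placeholder
        // The broker-side transaction.max.timeout.ms (15 minutes by default) usually
        // has to be raised above the checkpoint interval before EXACTLY_ONCE is usable.
        return new FlinkKafkaProducer011<>(
                "output-topic",                                   // placeholder topic
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                props,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
    }
}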

Thanks!
-- 
Regards,
Harshvardhan


Re: Behaviour of Process Window Function

2018-09-10 Thread Harshvardhan Agrawal
Hi,

Our application is financial data enrichment. What we want to do is that we
want to first key the positions by Account Number and then window them.
Within a window I want to get all the unique products across all the
accounts and make an external service call to hydrate the cache for that
window. We also want to be able to handle cases where it is possible that a
product could be owned by multiple accounts in a window in which case we
don't want to be making the external call multiple times. In my case, all
the incremental aggregation really does is that it collects all the unique
product keys in a set and then it supplies the set to the process window
function which takes care of hydrating and clearing the Guava cache.

Is there any way we could share some managed state across multiple keys? If
not, I will have to use Guava cache and I won't be able to take the benefit
of async checkpointing.
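
For what it's worth, the incremental aggregation I am describing is just an
AggregateFunction that keeps the set of unique product keys per key and
window, handed to the window function via aggregate(). A sketch (Position and
the wiring names are placeholders for our actual types):

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.functions.AggregateFunction;

public class CollectUniqueProductKeys
        implements AggregateFunction<Position, Set<String>, Set<String>> {

    @Override
    public Set<String> createAccumulator() {
        return new HashSet<>();
    }

    @Override
    public Set<String> add(Position position, Set<String> accumulator) {
        // only the distinct product keys are kept as window state
        accumulator.add(position.getProductKey());
        return accumulator;
    }

    @Override
    public Set<String> getResult(Set<String> accumulator) {
        return accumulator;
    }

    @Override
    public Set<String> merge(Set<String> a, Set<String> b) {
        a.addAll(b);
        return a;
    }
}

// Placeholder for our actual position type:
interface Position {
    String getAccountNumber();
    String getProductKey();
}

// Wiring sketch (HydrateAndEnrichWindow is the process window function that
// loads and clears the cache):
// positions.keyBy(Position::getAccountNumber)
//          .timeWindow(Time.seconds(30))
//          .aggregate(new CollectUniqueProductKeys(), new HydrateAndEnrichWindow());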

On Fri, Sep 7, 2018 at 9:49 AM Hequn Cheng  wrote:

> Hi Harshvardhan,
>
> *> 1) Does the state in the process window function qualify as KeyedState
> or OperatorState? *
> KeyedState
>
> *> We want to be able to rehydrate the guava cache at the beginning of
> each window by making an external rest call and clear the cache at the end
> of that respective window. How can we enforce this behaviour in Flink?*
> Why do you want to clear cache after window if the cache is shared across
> all keys. Do you want to load cache per key?
> If you want to aggregate elements incrementally, I think it is hard to get
> start and end in `ProcessWindowFunction` or in `IncrementalAggregation`
> function. However, I think we can get start and end in the trigger
> function, i.e., do cache load and clear in the trigger function.
>
> Best, Hequn
>
>
> On Fri, Sep 7, 2018 at 11:28 AM vino yang  wrote:
>
>> Hi Harshvardhan,
>>
>> 1) Yes, ProcessWindowFunction extends AbstractRichFunction, through
>> getRuntimeContext,you can access keyed state API.
>> 2) ProcessWindowFunction has given you considerable flexibility, you can
>> based on processing time / event time / timer / it's clear method /
>> customized implementation, the specific design depends on your business
>> logic, how long you need to save the cache.
>>
>> Thanks, vino.
>>
>> Harshvardhan Agrawal  wrote on Thursday, September 6, 2018 at 10:10 PM:
>>
>>> Hello,
>>>
>>> We have a Flink pipeline where we are windowing our data after a keyBy.
>>> i.e.
>>> myStream.keyBy().window().process(MyIncrementalAggregation(),
>>> MyProcessFunction()).
>>>
>>> I have two questions about the above line of code:
>>> 1) Does the state in the process window function qualify as KeyedState
>>> or OperatorState? If operator state, can we access KeyedState from the
>>> Process Window function?
>>> 2) We also have certain reference information that we want to share
>>> across all keys in the process window function. We are currently storing
>>> all that info in a Guava cache. We want to be able to rehydrate the guava
>>> cache at the beginning of each window by making an external rest call and
>>> clear the cache at the end of that respective window. How can we enforce
>>> this behaviour in Flink? Do I need to use a timerservice for this where the
>>> callback will be a window.maxtimestamp() or just clearing the cache in the
>>> clear method will do the trick?
>>>
>>> --
>>>
>>> *Regards,Harshvardhan Agrawal*
>>>
>>

-- 

*Regards, Harshvardhan Agrawal*
*267.991.6618 | LinkedIn <https://www.linkedin.com/in/harshvardhanagr/>*


Behaviour of Process Window Function

2018-09-06 Thread Harshvardhan Agrawal
Hello,

We have a Flink pipeline where we are windowing our data after a keyBy. i.e.
myStream.keyBy().window().process(MyIncrementalAggregation(),
MyProcessFunction()).

I have two questions about the above line of code:
1) Does the state in the process window function qualify as KeyedState or
OperatorState? If operator state, can we access KeyedState from the Process
Window function?
2) We also have certain reference information that we want to share across
all keys in the process window function. We are currently storing all that
info in a Guava cache. We want to be able to rehydrate the guava cache at
the beginning of each window by making an external rest call and clear the
cache at the end of that respective window. How can we enforce this
behaviour in Flink? Do I need to use a timerservice for this where the
callback will be a window.maxtimestamp() or just clearing the cache in the
clear method will do the trick?
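
To make question 2 concrete, the process window function I have in mind looks
roughly like this (a sketch only; the external lookup and the value types are
placeholders):

import java.util.Set;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class HydrateAndEnrichWindow
        extends ProcessWindowFunction<Set<String>, String, String, TimeWindow> {

    // Shared by all keys that land on this parallel instance; not part of checkpointed state.
    private transient Cache<String, String> referenceCache;

    @Override
    public void process(String accountNumber,
                        Context context,
                        Iterable<Set<String>> uniqueProductKeys,   // pre-aggregated set(s) for this firing
                        Collector<String> out) throws Exception {
        if (referenceCache == null) {
            referenceCache = CacheBuilder.newBuilder().build();
        }
        for (Set<String> keys : uniqueProductKeys) {
            // placeholder: one external/REST lookup for all keys of this window,
            // then referenceCache.put(...) each result and emit enriched records via out.collect(...)
        }
    }

    @Override
    public void clear(Context context) {
        // Called when the window is purged. Clearing the instance-wide cache here is
        // what I am unsure about: clear() runs per key and window, so other keys that
        // share this operator instance would lose their cached entries too.
        if (referenceCache != null) {
            referenceCache.invalidateAll();
        }
    }
}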

-- 

*Regards, Harshvardhan Agrawal*


Re: Implement Joins with Lookup Data

2018-08-22 Thread Harshvardhan Agrawal
Hi Hequn,

We considered that, but unfortunately we have a lot of reference data and we
would need an enormous amount of memory to hold it. As a proof of concept I
had added a Guava cache and that did improve performance, but it can't hold
all of our reference data. We have a lot of use cases where we want to join
position data with Account, Product, Exchange Rate, etc. The joins can easily
be across several datasets in order to obtain the final enriched information.
Now if I were to keep an external cache, say something like Ignite, I would
need some service that constantly hits the cache for every position, which
makes the pipeline super chatty. Hence we thought going with the windowing
approach would help us control that chattiness.

I like Till's solution of connecting streams and using CoFlatMap. I can also
see an example on the data Artisans training website (
http://training.data-artisans.com/exercises/eventTimeJoin.html#). What I
don't get is how this would work when more than two datasets are involved.
In my case, say I wanted to enrich Positions using the Account, Product and
Exchange Rate datasets.
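
From what I can tell, the same connect/CoFlatMap pattern would just be chained
once per reference dataset: key by account and connect with the account
stream, then key the result by product and connect with the product stream,
and so on. A sketch of one stage (the domain classes are placeholders, and
buffering of positions that arrive before their reference data is left out):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class EnrichWithAccount
        extends RichCoFlatMapFunction<Position, Account, EnrichedPosition> {

    private transient ValueState<Account> latestAccount;

    @Override
    public void open(Configuration parameters) {
        latestAccount = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latest-account", Account.class));
    }

    @Override
    public void flatMap1(Position position, Collector<EnrichedPosition> out) throws Exception {
        Account account = latestAccount.value();
        if (account != null) {
            out.collect(new EnrichedPosition(position, account));
        }
        // else: reference data not seen yet -- drop, side-output, or buffer in another state handle
    }

    @Override
    public void flatMap2(Account account, Collector<EnrichedPosition> out) throws Exception {
        latestAccount.update(account);   // keep only the latest version per account id
    }
}

// Chaining the stages (same shape per reference dataset, re-keyed on the relevant id each time):
// positions.keyBy(p -> p.accountId)
//          .connect(accounts.keyBy(a -> a.accountId))
//          .flatMap(new EnrichWithAccount())
//          .keyBy(e -> e.position.productId)
//          .connect(products.keyBy(pr -> pr.productId))
//          .flatMap(new EnrichWithProduct());   // same pattern as EnrichWithAccount

// Placeholder domain types standing in for our actual classes:
class Position {
    String accountId;
    String productId;
}

class Account {
    String accountId;
}

class EnrichedPosition {
    final Position position;
    final Account account;

    EnrichedPosition(Position position, Account account) {
        this.position = position;
        this.account = account;
    }
}

I am not sure whether chaining keyed CoFlatMaps like this is the intended way
to handle three or four reference datasets, which is really the question
above.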

Regards,
Harsh

On Sun, Aug 19, 2018 at 10:22 PM Hequn Cheng  wrote:

> Hi Harshvardhan,
>
> Have you ever consider adding a cache when lookup from the database, so
> that we don't have to add so many pipelines, also don't have to do window
> distinct.
> The cache can be a LRU cache with size and expire time specified.
> If your data is limited it can also be an All data cache. The All data
> cache can be updated, say each 2h, according to our requirement.
>
> Adding a cache can not only simplify your pipeline but also improve the
> job performance.
>
> Best, Hequn
>
>
> On Mon, Aug 20, 2018 at 5:42 AM, Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
>> Hello Everyone,
>>
>> Sorry for the delayed response.
>> This is what I am thinking of doing. We are thinking of creating 2
>> pipelines. The first one only enriches the position data with product
>> information. The second pipeline will use the enriched position and get all
>> the account information for performing aggregations.
>>
>> *First Pipeline*:
>> 1) Get the positions from Kafka and window data into tumbling windows of
>> 30 seconds.
>> 2) We perform a rolling aggregation that basically collects all the
>> unique product keys in a set.
>> 3) At the end of the window, we have a process function that queries an
>> external service that performs a single lookup for all the unique products
>> we have seen in the window.
>> 4) Persist the enriched positions to Kafka topic T1. There is a sink
>> process that reads from this Kafka topic (T1), writes to an underlying DB
>> and persist to another Kafka topic (T2)  for the pipeline to read from.
>>
>> *Second Pipeline*
>> 1) Reads from topic T2 that contains enriched position.
>> 2) For each position, we get the account information and lookup all the
>> parent and child accounts associated with that account.
>> 3) Once we have all the accounts, we lookup all the enriched positions
>> that were created from the first pipeline for those accounts.
>> 4) We perform the final aggregation to say calculate the Net Asset Value
>> for the account.
>> 5) Persist the output to the DB.
>>
>> Regards,
>> Harsh
>>
>> On Wed, Jul 25, 2018 at 6:52 PM ashish pok  wrote:
>>
>>> Hi Michael,
>>>
>>> We are currently using 15 TMs with 4 cores and 4 slots each, 10GB of
>>> memory on each TM. We have 15 partitions on Kafka for stream and 6 for
>>> context/smaller stream. Heap is around 50%, GC is about 150ms and CPU loads
>>> are low. We may be able to reduce resources on this if need be.
>>>
>>> Thanks,
>>>
>>>
>>> - Ashish
>>>
>>> On Wednesday, July 25, 2018, 4:07 AM, Michael Gendelman <
>>> gen...@gmail.com> wrote:
>>>
>>> Hi Ashish,
>>>
>>> We are planning for a similar use case and I was wondering if you can
>>> share the amount of resources you have allocated for this flow?
>>>
>>> Thanks,
>>> Michael
>>>
>>> On Tue, Jul 24, 2018, 18:57 ashish pok  wrote:
>>>
>>> BTW,
>>>
>>> We got around bootstrap problem for similar use case using a “nohup”
>>> topic as input stream. Our CICD pipeline currently passes an initialize
>>> option to app IF there is a need to bootstrap and waits for X minutes
>>> before taking a savepoint and restart app normally listening to right
>>> topic(s). I believe there is work underway to ha

Re: Implement Joins with Lookup Data

2018-08-19 Thread Harshvardhan Agrawal
Hello Everyone,

Sorry for the delayed response.
This is what I am thinking of doing. We are thinking of creating 2
pipelines. The first one only enriches the position data with product
information. The second pipeline will use the enriched position and get all
the account information for performing aggregations.

*First Pipeline*:
1) Get the positions from Kafka and window data into tumbling windows of 30
seconds.
2) We perform a rolling aggregation that basically collects all the unique
product keys in a set.
3) At the end of the window, we have a process function that queries an
external service that performs a single lookup for all the unique products
we have seen in the window.
4) Persist the enriched positions to Kafka topic T1. There is a sink
process that reads from this Kafka topic (T1), writes to an underlying DB
and persists to another Kafka topic (T2) for the second pipeline to read from.

*Second Pipeline*
1) Reads from topic T2, which contains the enriched positions.
2) For each position, we get the account information and lookup all the
parent and child accounts associated with that account.
3) Once we have all the accounts, we lookup all the enriched positions that
were created from the first pipeline for those accounts.
4) We perform the final aggregation to, say, calculate the Net Asset Value
for the account.
5) Persist the output to the DB.

Regards,
Harsh

On Wed, Jul 25, 2018 at 6:52 PM ashish pok  wrote:

> Hi Michael,
>
> We are currently using 15 TMs with 4 cores and 4 slots each, 10GB of
> memory on each TM. We have 15 partitions on Kafka for stream and 6 for
> context/smaller stream. Heap is around 50%, GC is about 150ms and CPU loads
> are low. We may be able to reduce resources on this if need be.
>
> Thanks,
>
>
> - Ashish
>
> On Wednesday, July 25, 2018, 4:07 AM, Michael Gendelman 
> wrote:
>
> Hi Ashish,
>
> We are planning for a similar use case and I was wondering if you can
> share the amount of resources you have allocated for this flow?
>
> Thanks,
> Michael
>
> On Tue, Jul 24, 2018, 18:57 ashish pok  wrote:
>
> BTW,
>
> We got around bootstrap problem for similar use case using a “nohup” topic
> as input stream. Our CICD pipeline currently passes an initialize option to
> app IF there is a need to bootstrap and waits for X minutes before taking a
> savepoint and restart app normally listening to right topic(s). I believe
> there is work underway to handle this gracefully using Side Input as well.
> Other than determining X minutes for initialization to complete, we havent
> had any issue with this solution - we have over 40 million states refreshes
> daily and close to 200Mbps input streams being joined to states.
>
> Hope this helps!
>
>
>
> - Ashish
>
> On Tuesday, July 24, 2018, 11:37 AM, Elias Levy <
> fearsome.lucid...@gmail.com> wrote:
>
> Alas, this suffer from the bootstrap problem.  At the moment Flink does
> not allow you to pause a source (the positions), so you can't fully consume
> the and preload the accounts or products to perform the join before the
> positions start flowing.  Additionally, Flink SQL does not support
> materializing an upset table for the accounts or products to perform the
> join, so yo have to develop your own KeyedProcessFunction, maintain the
> state, and perform the join on your own if you only want to join against
> the latest value for each key.
>
> On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann 
> wrote:
>
> Yes, using Kafka which you initialize with the initial values and then
> feed changes to the Kafka topic from which you consume could be a solution.
>
> On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
> Hi Till,
>
> How would we do the initial hydration of the Product and Account data
> since it’s currently in a relational DB? Do we have to copy over data to
> Kafka and then use them?
>
> Regards,
> Harsh
>
> On Tue, Jul 24, 2018 at 09:22 Till Rohrmann  wrote:
>
> Hi Harshvardhan,
>
> I agree with Ankit that this problem could actually be solved quite
> elegantly with Flink's state. If you can ingest the product/account
> information changes as a stream, you can keep the latest version of it in
> Flink state by using a co-map function [1, 2]. One input of the co-map
> function would be the product/account update stream which updates the
> respective entries in Flink's state and the other input stream is the one
> to be enriched. When receiving input from this stream one would lookup the
> latest information contained in the operator's state and join it with the
> incoming event.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
> [2]
> https://ci.apache.org/pr

Behavior of time based operators

2018-08-02 Thread Harshvardhan Agrawal
Hello,

I have recently started reading Stream Processing with Apache Flink by
Fabian and Vasiliki. In Chapter 3 of the book there is a statement that
says: None of the functions expose an API to set time stamps of emitted
records, manipulate the event-time clock of a task, or emit watermarks.
Instead, time-based DataStream operator tasks internally set the time
stamps of emitted records to ensure that they are properly aligned with the
emitted watermarks. For instance, a time-window operator task attaches the
end time of a window as the time stamp to all records emitted by the window
computation before it emits the watermark with the time stamp that
triggered the computation of the window.

Does this mean that time stamps in the records are overwritten by these
time-based operators when using Event Time?
-- 
Regards,
Harshvardhan


Re: Order of events in a Keyed Stream

2018-07-30 Thread Harshvardhan Agrawal
Thanks for the response guys.

Based on Niels' response, it seems like a keyBy immediately after reading
from the source should map all messages with the same account number to the
same slot.
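
In code, the wiring I have in mind is roughly this (the deserialization
schema and the process function are stand-ins):

// Producer side: the account number is used as the Kafka message key, so Kafka
// preserves per-account order within a partition.
// Flink side: key by the same field straight off the source and stay keyed
// from then on.
DataStream<Position> positions = env.addSource(
        new FlinkKafkaConsumer011<>("positions", new PositionDeserializationSchema(), kafkaProps));

positions
        .keyBy(p -> p.getAccountNumber())   // same account -> same subtask, in arrival order
        .process(new EnrichPositionFunction());
// Any later re-keying or rebalance can break the per-account ordering again.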

On Sun, Jul 29, 2018 at 05:33 Renjie Liu  wrote:

> Hi,
> Another way to ensure order is by adding a logical version number for each
> message so that earlier version will not override later version. Timestamp
> depends on your ntp server works correctly.
>
> On Sun, Jul 29, 2018 at 3:52 PM Niels Basjes  wrote:
>
>> Hi,
>>
>> The basic thing is that you will only get the messages in a guaranteed
>> order if the order is maintained in all steps from creation to use.
>> In Kafka order is only guaranteed for messages in the same partition.
>> So if you need them in order by account then the producing system must
>> use the accountid as the key used to force a specific account into a
>> specific kafka partition.
>> Then the Flink Kafka source will read them sequentially in the right
>> order, but in order to KEEP them in that order you should really to a keyby
>> immediately after reading and used only keyedstreams from that point
>> onwards.
>> As soon as you do shuffle or key by a different key then the ordering
>> within an account is no longer guaranteed.
>>
>> In general I always put a very accurate timestamp in all of my events
>> (epoch milliseconds, in some cases even epoch microseconds) so I can always
>> check if an order problem occurred.
>>
>> Niels Basjes
>>
>> On Sun, Jul 29, 2018 at 9:25 AM, Congxian Qiu 
>> wrote:
>>
>>> Hi,
>>> Maybe the messages of the same key should be in the *same partition* of
>>> Kafka topic
>>>
>>> 2018-07-29 11:01 GMT+08:00 Hequn Cheng :
>>>
>>>> Hi harshvardhan,
>>>> If 1.the messages exist on the same topic and 2.there are no rebalance
>>>> and 3.keyby on the same field with same value, the answer is yes.
>>>>
>>>> Best, Hequn
>>>>
>>>> On Sun, Jul 29, 2018 at 3:56 AM, Harshvardhan Agrawal <
>>>> harshvardhan.ag...@gmail.com> wrote:
>>>>
>>>>> Hey,
>>>>>
>>>>> The messages will exist on the same topic. I intend to keyby on the
>>>>> same field. The question is that will the two messages be mapped to the
>>>>> same task manager and on the same slot. Also will they be processed in
>>>>> correct order given they have the same keys?
>>>>>
>>>>> On Fri, Jul 27, 2018 at 21:28 Hequn Cheng 
>>>>> wrote:
>>>>>
>>>>>> Hi Harshvardhan,
>>>>>>
>>>>>> There are a number of factors to consider.
>>>>>> 1. the consecutive Kafka messages must exist in a same topic of
>>>>>> kafka.
>>>>>> 2. the data should not been rebalanced. For example, operators should
>>>>>> be chained in order to avoid rebalancing.
>>>>>> 3. if you perform keyBy(), you should keyBy on a field the consecutive
>>>>>> two messages share the same value.
>>>>>>
>>>>>> Best, Hequn
>>>>>>
>>>>>> On Sat, Jul 28, 2018 at 12:11 AM, Harshvardhan Agrawal <
>>>>>> harshvardhan.ag...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>>
>>>>>>> We are currently using Flink to process financial data. We are
>>>>>>> getting position data from Kafka and we enrich the positions with 
>>>>>>> account
>>>>>>> and product information. We are using Ingestion time while processing
>>>>>>> events. The question I have is: say I key the position datasream by 
>>>>>>> account
>>>>>>> number. If I have two consecutive Kafka messages with the same account 
>>>>>>> and
>>>>>>> product info where the second one is an updated position of the first 
>>>>>>> one,
>>>>>>> does Flink guarantee that the messages will be processed on the same 
>>>>>>> slot
>>>>>>> in the same worker? We want to ensure that we don’t process them out of
>>>>>>> order.
>>>>>>>
>>>>>>> Thank you!
>>>>>>> --
>>>>>>> Regards,
>>>>>>> Harshvardhan
>>>>>>>
>>>>>>
>>>>>> --
>>>>> Regards,
>>>>> Harshvardhan
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Blog:http://www.klion26.com
>>> GTalk:qcx978132955
>>> 一切随心
>>>
>>
>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
> --
> Liu, Renjie
> Software Engineer, MVAD
>
-- 
Regards,
Harshvardhan


Re: Order of events in a Keyed Stream

2018-07-28 Thread Harshvardhan Agrawal
Hey,

The messages will exist on the same topic. I intend to keyBy on the same
field. The question is: will the two messages be mapped to the same task
manager and the same slot? Also, will they be processed in the correct
order given that they have the same keys?

On Fri, Jul 27, 2018 at 21:28 Hequn Cheng  wrote:

> Hi Harshvardhan,
>
> There are a number of factors to consider.
> 1. the consecutive Kafka messages must exist in a same topic of kafka.
> 2. the data should not been rebalanced. For example, operators should be
> chained in order to avoid rebalancing.
> 3. if you perform keyBy(), you should keyBy on a field the consecutive
> two messages share the same value.
>
> Best, Hequn
>
> On Sat, Jul 28, 2018 at 12:11 AM, Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
>> Hi,
>>
>>
>> We are currently using Flink to process financial data. We are getting
>> position data from Kafka and we enrich the positions with account and
>> product information. We are using Ingestion time while processing events.
>> The question I have is: say I key the position datasream by account number.
>> If I have two consecutive Kafka messages with the same account and product
>> info where the second one is an updated position of the first one, does
>> Flink guarantee that the messages will be processed on the same slot in the
>> same worker? We want to ensure that we don’t process them out of order.
>>
>> Thank you!
>> --
>> Regards,
>> Harshvardhan
>>
>
> --
Regards,
Harshvardhan


Order of events in a Keyed Stream

2018-07-27 Thread Harshvardhan Agrawal
Hi,


We are currently using Flink to process financial data. We are getting
position data from Kafka and we enrich the positions with account and
product information. We are using Ingestion time while processing events.
The question I have is: say I key the position datastream by account number.
If I have two consecutive Kafka messages with the same account and product
info where the second one is an updated position of the first one, does
Flink guarantee that the messages will be processed on the same slot in the
same worker? We want to ensure that we don’t process them out of order.

Thank you!
-- 
Regards,
Harshvardhan


Re: Implement Joins with Lookup Data

2018-07-24 Thread Harshvardhan Agrawal
What happens when one of your workers dies? Say the machine is dead and not
recoverable. How do you recover from that situation? Will the pipeline die,
and will you have to go over the entire bootstrap process again?

On Tue, Jul 24, 2018 at 11:56 ashish pok  wrote:

> BTW,
>
> We got around bootstrap problem for similar use case using a “nohup” topic
> as input stream. Our CICD pipeline currently passes an initialize option to
> app IF there is a need to bootstrap and waits for X minutes before taking a
> savepoint and restart app normally listening to right topic(s). I believe
> there is work underway to handle this gracefully using Side Input as well.
> Other than determining X minutes for initialization to complete, we havent
> had any issue with this solution - we have over 40 million states refreshes
> daily and close to 200Mbps input streams being joined to states.
>
> Hope this helps!
>
>
>
> - Ashish
>
> On Tuesday, July 24, 2018, 11:37 AM, Elias Levy <
> fearsome.lucid...@gmail.com> wrote:
>
> Alas, this suffer from the bootstrap problem.  At the moment Flink does
> not allow you to pause a source (the positions), so you can't fully consume
> the and preload the accounts or products to perform the join before the
> positions start flowing.  Additionally, Flink SQL does not support
> materializing an upset table for the accounts or products to perform the
> join, so yo have to develop your own KeyedProcessFunction, maintain the
> state, and perform the join on your own if you only want to join against
> the latest value for each key.
>
> On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann 
> wrote:
>
> Yes, using Kafka which you initialize with the initial values and then
> feed changes to the Kafka topic from which you consume could be a solution.
>
> On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
> Hi Till,
>
> How would we do the initial hydration of the Product and Account data
> since it’s currently in a relational DB? Do we have to copy over data to
> Kafka and then use them?
>
> Regards,
> Harsh
>
> On Tue, Jul 24, 2018 at 09:22 Till Rohrmann  wrote:
>
> Hi Harshvardhan,
>
> I agree with Ankit that this problem could actually be solved quite
> elegantly with Flink's state. If you can ingest the product/account
> information changes as a stream, you can keep the latest version of it in
> Flink state by using a co-map function [1, 2]. One input of the co-map
> function would be the product/account update stream which updates the
> respective entries in Flink's state and the other input stream is the one
> to be enriched. When receiving input from this stream one would lookup the
> latest information contained in the operator's state and join it with the
> incoming event.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html
>
> Cheers,
> Till
>
> On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
> Hi,
>
> Thanks for your responses.
>
> There is no fixed interval for the data being updated. It’s more like
> whenever you onboard a new product or there are any mandates that change
> will trigger the reference data to change.
>
> It’s not just the enrichment we are doing here. Once we have enriched the
> data we will be performing a bunch of aggregations using the enriched data.
>
> Which approach would you recommend?
>
> Regards,
> Harshvardhan
>
> On Tue, Jul 24, 2018 at 04:04 Jain, Ankit  wrote:
>
> How often is the product db updated? Based on that you can store product
> metadata as state in Flink, maybe setup the state on cluster startup and
> then update daily etc.
>
>
>
> Also, just based on this feature, flink doesn’t seem to add a lot of value
> on top of Kafka. As Jorn said below, you can very well store all the events
> in an external store and then periodically run a cron to enrich later since
> your processing doesn’t seem to require absolute real time.
>
>
>
> Thanks
>
> Ankit
>
>
>
> *From: *Jörn Franke 
> *Date: *Monday, July 23, 2018 at 10:10 PM
> *To: *Harshvardhan Agrawal 
> *Cc: *
> *Subject: *Re: Implement Joins with Lookup Data
>
>
>
> For the first one (lookup of single entries) you could use a NoSQL db (eg
> key value store) - a relational database will not scale.
>
>
>
> Depending on when you need to do the enrichment you could also first store
> the data and enrich it later as part of a batch process.
>
>
> On 24. Jul 2018, at

Re: Implement Joins with Lookup Data

2018-07-24 Thread Harshvardhan Agrawal
Hi Till,

How would we do the initial hydration of the Product and Account data, since
it’s currently in a relational DB? Do we have to copy the data over to Kafka
and then use it from there?

Regards,
Harsh

On Tue, Jul 24, 2018 at 09:22 Till Rohrmann  wrote:

> Hi Harshvardhan,
>
> I agree with Ankit that this problem could actually be solved quite
> elegantly with Flink's state. If you can ingest the product/account
> information changes as a stream, you can keep the latest version of it in
> Flink state by using a co-map function [1, 2]. One input of the co-map
> function would be the product/account update stream which updates the
> respective entries in Flink's state and the other input stream is the one
> to be enriched. When receiving input from this stream one would lookup the
> latest information contained in the operator's state and join it with the
> incoming event.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html
>
> Cheers,
> Till
>
> On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
>> Hi,
>>
>> Thanks for your responses.
>>
>> There is no fixed interval for the data being updated. It’s more like
>> whenever you onboard a new product or there are any mandates that change
>> will trigger the reference data to change.
>>
>> It’s not just the enrichment we are doing here. Once we have enriched the
>> data we will be performing a bunch of aggregations using the enriched data.
>>
>> Which approach would you recommend?
>>
>> Regards,
>> Harshvardhan
>>
>> On Tue, Jul 24, 2018 at 04:04 Jain, Ankit  wrote:
>>
>>> How often is the product db updated? Based on that you can store product
>>> metadata as state in Flink, maybe setup the state on cluster startup and
>>> then update daily etc.
>>>
>>>
>>>
>>> Also, just based on this feature, flink doesn’t seem to add a lot of
>>> value on top of Kafka. As Jorn said below, you can very well store all the
>>> events in an external store and then periodically run a cron to enrich
>>> later since your processing doesn’t seem to require absolute real time.
>>>
>>>
>>>
>>> Thanks
>>>
>>> Ankit
>>>
>>>
>>>
>>> *From: *Jörn Franke 
>>> *Date: *Monday, July 23, 2018 at 10:10 PM
>>> *To: *Harshvardhan Agrawal 
>>> *Cc: *
>>> *Subject: *Re: Implement Joins with Lookup Data
>>>
>>>
>>>
>>> For the first one (lookup of single entries) you could use a NoSQL db
>>> (eg key value store) - a relational database will not scale.
>>>
>>>
>>>
>>> Depending on when you need to do the enrichment you could also first
>>> store the data and enrich it later as part of a batch process.
>>>
>>>
>>> On 24. Jul 2018, at 05:25, Harshvardhan Agrawal <
>>> harshvardhan.ag...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>>
>>>
>>> We are using Flink for financial data enrichment and aggregations. We
>>> have Positions data that we are currently receiving from Kafka. We want to
>>> enrich that data with reference data like Product and Account information
>>> that is present in a relational database. From my understanding of Flink so
>>> far I think there are two ways to achieve this. Here are two ways to do it:
>>>
>>>
>>>
>>> 1) First Approach:
>>>
>>> a) Get positions from Kafka and key by product key.
>>>
>>> b) Perform lookup from the database for each key and then obtain
>>> Tuple2
>>>
>>>
>>>
>>> 2) Second Approach:
>>>
>>> a) Get positions from Kafka and key by product key.
>>>
>>> b) Window the keyed stream into say 15 seconds each.
>>>
>>> c) For each window get the unique product keys and perform a single
>>> lookup.
>>>
>>> d) Somehow join Positions and Products
>>>
>>>
>>>
>>> In the first approach we will be making a lot of calls to the DB and the
>>> solution is very chatty. Its hard to scale this cos the database storing
>>> the reference data might not be very responsive.
>>>
>>>
>>>
>>> In the second approach, I wish to join the WindowedStream with the
>>> SingleOutputStream and turns out I can't join a windowed stream. So I am
>>> not quite sure how to do that.
>>>
>>>
>>>
>>> I wanted an opinion for what is the right thing to do. Should I go with
>>> the first approach or the second one. If the second one, how can I
>>> implement the join?
>>>
>>>
>>>
>>> --
>>>
>>>
>>> *Regards, Harshvardhan Agrawal*
>>>
>>> --
>> Regards,
>> Harshvardhan
>>
> --
Regards,
Harshvardhan


Re: Implement Joins with Lookup Data

2018-07-24 Thread Harshvardhan Agrawal
Hi,

Thanks for your responses.

There is no fixed interval for the data being updated. It’s more that
onboarding a new product, or a change in mandates, is what triggers the
reference data to change.

It’s not just the enrichment we are doing here. Once we have enriched the
data we will be performing a bunch of aggregations using the enriched data.

Which approach would you recommend?

Regards,
Harshvardhan

On Tue, Jul 24, 2018 at 04:04 Jain, Ankit  wrote:

> How often is the product db updated? Based on that you can store product
> metadata as state in Flink, maybe setup the state on cluster startup and
> then update daily etc.
>
>
>
> Also, just based on this feature, flink doesn’t seem to add a lot of value
> on top of Kafka. As Jorn said below, you can very well store all the events
> in an external store and then periodically run a cron to enrich later since
> your processing doesn’t seem to require absolute real time.
>
>
>
> Thanks
>
> Ankit
>
>
>
> *From: *Jörn Franke 
> *Date: *Monday, July 23, 2018 at 10:10 PM
> *To: *Harshvardhan Agrawal 
> *Cc: *
> *Subject: *Re: Implement Joins with Lookup Data
>
>
>
> For the first one (lookup of single entries) you could use a NoSQL db (eg
> key value store) - a relational database will not scale.
>
>
>
> Depending on when you need to do the enrichment you could also first store
> the data and enrich it later as part of a batch process.
>
>
> On 24. Jul 2018, at 05:25, Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
> Hi,
>
>
>
> We are using Flink for financial data enrichment and aggregations. We have
> Positions data that we are currently receiving from Kafka. We want to
> enrich that data with reference data like Product and Account information
> that is present in a relational database. From my understanding of Flink so
> far I think there are two ways to achieve this. Here are two ways to do it:
>
>
>
> 1) First Approach:
>
> a) Get positions from Kafka and key by product key.
>
> b) Perform lookup from the database for each key and then obtain
> Tuple2
>
>
>
> 2) Second Approach:
>
> a) Get positions from Kafka and key by product key.
>
> b) Window the keyed stream into say 15 seconds each.
>
> c) For each window get the unique product keys and perform a single lookup.
>
> d) Somehow join Positions and Products
>
>
>
> In the first approach we will be making a lot of calls to the DB and the
> solution is very chatty. Its hard to scale this cos the database storing
> the reference data might not be very responsive.
>
>
>
> In the second approach, I wish to join the WindowedStream with the
> SingleOutputStream and turns out I can't join a windowed stream. So I am
> not quite sure how to do that.
>
>
>
> I wanted an opinion for what is the right thing to do. Should I go with
> the first approach or the second one. If the second one, how can I
> implement the join?
>
>
>
> --
>
>
> *Regards, Harshvardhan Agrawal*
>
> --
Regards,
Harshvardhan


Implement Joins with Lookup Data

2018-07-23 Thread Harshvardhan Agrawal
Hi,

We are using Flink for financial data enrichment and aggregations. We have
Positions data that we are currently receiving from Kafka. We want to
enrich that data with reference data like Product and Account information
that is present in a relational database. From my understanding of Flink so
far, there are two ways to achieve this:

1) First Approach:
a) Get positions from Kafka and key by product key.
b) Perform a lookup from the database for each key and then obtain a
Tuple2<Position, Product>.

2) Second Approach:
a) Get positions from Kafka and key by product key.
b) Window the keyed stream into say 15 seconds each.
c) For each window get the unique product keys and perform a single lookup.
d) Somehow join Positions and Products

In the first approach we will be making a lot of calls to the DB and the
solution is very chatty. It's hard to scale this because the database storing
the reference data might not be very responsive.

In the second approach, I wish to join the WindowedStream with the
SingleOutputStreamOperator, and it turns out I can't join a windowed stream.
So I am not quite sure how to do that.

I wanted an opinion on what the right thing to do is. Should I go with the
first approach or the second one? If the second one, how can I implement
the join?

-- 


*Regards, Harshvardhan Agrawal*


Re: Behaviour of triggers in Flink

2018-07-23 Thread Harshvardhan Agrawal
Thanks for the response Hequn. I also see a weird behavior with the purging
trigger: it skips messages.

Here is the repro:

// (generic type parameters restored for readability; the window output type is unused here)
public class WindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {

            @Override
            public void run(SourceContext<Long> ctx) throws Exception {
                LongStream.range(0, 101).forEach(ctx::collect);
            }

            @Override
            public void cancel() {
            }
        });

        source.timeWindowAll(Time.milliseconds(5))
                .trigger(PurgingTrigger.of(CountTrigger.of(7)))
                .apply(new AllWindowFunction<Long, Object, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow timeWindow, Iterable<Long> values,
                                      Collector<Object> collector) throws Exception {
                        System.out.println("processing a window");
                        System.out.println(Joiner.on(',').join(values));
                    }
                }).print();

        env.execute("test-program");
    }
}



processing a window
0,1,2,3,4,5,6
processing a window
10,11,12,13,14,15,16
processing a window
17,18,19,20,21,22,23
processing a window
24,25,26,27,28,29,30
processing a window
31,32,33,34,35,36,37
processing a window
38,39,40,41,42,43,44
processing a window
45,46,47,48,49,50,51
processing a window
52,53,54,55,56,57,58
processing a window
59,60,61,62,63,64,65
processing a window
66,67,68,69,70,71,72
processing a window
73,74,75,76,77,78,79
processing a window
80,81,82,83,84,85,86
processing a window
87,88,89,90,91,92,93
processing a window
94,95,96,97,98,99,100

It has skipped numbers 7-9. Is this expected behavior?
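
As an aside, since reacting to both a count and the window time seems to need
a custom trigger (as Hequn mentions below), this is roughly the shape I would
expect such a trigger to take (a sketch only, with an illustrative class name;
FIRE_AND_PURGE is chosen to mimic PurgingTrigger):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Fires when maxCount elements arrive OR when the window's time runs out, whichever comes first.
// Usage: source.timeWindowAll(Time.milliseconds(5)).trigger(CountOrTimeTrigger.of(15)).apply(...)
public class CountOrTimeTrigger extends Trigger<Object, TimeWindow> {

    private final long maxCount;
    private final ReducingStateDescriptor<Long> countDescriptor =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private CountOrTimeTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    public static CountOrTimeTrigger of(long maxCount) {
        return new CountOrTimeTrigger(maxCount);
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        // time part: fire when the window's end is reached (event/ingestion time)
        ctx.registerEventTimeTimer(window.maxTimestamp());

        // count part: fire as soon as maxCount elements have been seen in this window
        ReducingState<Long> count = ctx.getPartitionedState(countDescriptor);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(countDescriptor).clear();
    }

    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}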

On Sun, Jul 22, 2018 at 9:43 PM Hequn Cheng  wrote:

> Hi Harshvardhan,
>
> By specifying a trigger using trigger() you are overwriting the default
> trigger of a WindowAssigner. For example, if you specify a CountTrigger for
> TumblingEventTimeWindows you will no longer get window firings based on the
> progress of time but only by count. Right now, you have to write your own
> custom trigger if you want to react based on both time and count.
> More details here[1].
>
> Best, Hequn
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#default-triggers-of-windowassigners
>
> On Sun, Jul 22, 2018 at 11:59 PM, Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
>> Hi,
>>
>> I have been trying to understand how triggers work in Flink. We have a
>> set of data that arrives to us on Kafka. We need to process the data in a
>> window when either one of the two criteria satisfy:
>> 1) Max number of elements has reached in the window.
>> 2) Some max time has elapsed (Say 5 milliseconds in our case).
>>
>> I have written the following code:
>>
>> public class WindowTest {
>> public static void main (String[] args) throws Exception {
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.createLocalEnvironment();
>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>> DataStreamSource source = env.addSource(new
>> SourceFunction() {
>>
>> @Override
>> public void run(SourceContext ctx) throws Exception {
>> LongStream.range(0, 102).forEach(ctx::collect);
>> }
>>
>> @Override
>> public void cancel() {
>>
>> }
>> });
>>
>>
>> source.timeWindowAll(Time.milliseconds(5)).trigger(PurgingTrigger.of(CountTrigger.of(15))).apply(new
>> AllWindowFunction() {
>> @Override
>> public void apply(TimeWindow timeWindow, Iterable
>> values, Collector collector) throws Exception {
>> System.out.println("processing a window");
>> System.out.println(Joiner.on(',').join(values));
>> }
>> }).print();
>>
>> env.execute("test-program");
>>
>> }
>> }
>>
>> Here is the output I get when I run this code:
>>
>> processing a window
>> 0,1,2,3,4,5,6,7,8,9,10,11,12,13,14
>> processing a window
>> 15,16,17,18,19,20,21,22,23,24,25,26,27,28,29
>> processing a window
>> 30,31,32,33,34,35,36,37,38,39,40,41,42,43,44
>> processing a window
>> 45,46,47,48,49,50,51,52,53,54,55,56,57,58,59
>> processing a window
>> 60,61,62,63,64,65,66,67,68,69,70,71,72,73,74
>> processing a window
>> 75,76,77,78,79,80,81,82,83,84,85,86,87,88,89
>>
>> As you can see, the data from 90 to 101 is not processed. Shouldn't it be
>> processed when the window is completed after 5 ms?
>>
>> When I remove the trigger part, all of the data does get processed from 0
>> to 101.
>>
>> Any idea why do we see such a behaviour here?
>> --
>>
>>
>> *Regards,Harshvardhan Agrawal*
>>
>
>

-- 


*Regards, Harshvardhan Agrawal*


Triggers for late elements

2018-07-22 Thread Harshvardhan Agrawal
Hello,

I am trying to understand the behaviour of Triggers in the case where we
receive late elements for a window. Does Flink always fire the window each
time it receives a late element, i.e. if I receive 10 late elements, would it
fire 10 times?
Is there any specific example I could refer to in order to understand how
this works?

-- 


*Regards, Harshvardhan Agrawal*


Behaviour of triggers in Flink

2018-07-22 Thread Harshvardhan Agrawal
Hi,

I have been trying to understand how triggers work in Flink. We have a set
of data that arrives on Kafka. We need to process the data in a window when
either one of the two criteria is satisfied:
1) Max number of elements has reached in the window.
2) Some max time has elapsed (Say 5 milliseconds in our case).

I have written the following code:

// (generic type parameters restored for readability; the window output type is unused here)
public class WindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {

            @Override
            public void run(SourceContext<Long> ctx) throws Exception {
                LongStream.range(0, 102).forEach(ctx::collect);
            }

            @Override
            public void cancel() {
            }
        });

        source.timeWindowAll(Time.milliseconds(5))
                .trigger(PurgingTrigger.of(CountTrigger.of(15)))
                .apply(new AllWindowFunction<Long, Object, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow timeWindow, Iterable<Long> values,
                                      Collector<Object> collector) throws Exception {
                        System.out.println("processing a window");
                        System.out.println(Joiner.on(',').join(values));
                    }
                }).print();

        env.execute("test-program");
    }
}

Here is the output I get when I run this code:

processing a window
0,1,2,3,4,5,6,7,8,9,10,11,12,13,14
processing a window
15,16,17,18,19,20,21,22,23,24,25,26,27,28,29
processing a window
30,31,32,33,34,35,36,37,38,39,40,41,42,43,44
processing a window
45,46,47,48,49,50,51,52,53,54,55,56,57,58,59
processing a window
60,61,62,63,64,65,66,67,68,69,70,71,72,73,74
processing a window
75,76,77,78,79,80,81,82,83,84,85,86,87,88,89

As you can see, the data from 90 to 101 is not processed. Shouldn't it be
processed when the window is completed after 5 ms?

When I remove the trigger part, all of the data does get processed from 0
to 101.

Any idea why we see such a behaviour here?
-- 

*Regards, Harshvardhan Agrawal*
*267.991.6618 | LinkedIn <https://www.linkedin.com/in/harshvardhanagr/>*