Re: Join with slow changing dimensions/ streams
Hi Hanan,

Broadcast state and a CoMapFunction (or CoProcessFunction) each have advantages and disadvantages.

Broadcast state is better if the broadcast side is small (low data rate). Its records are replicated to every parallel instance, but the other (larger) stream does not need to be repartitioned and stays on its existing partitions.

The CoMapFunction approach is better if both sides are similar in size. Their records are not replicated but repartitioned and sent over the network.

This is the common trade-off between broadcast-forward and repartition-repartition joins that the query optimizers of distributed database systems have to deal with.

Best, Fabian

On Thu, 5 Sept 2019 at 13:37, Hanan Yehudai <hanan.yehu...@radcom.com> wrote:
> Thanks Fabian.
>
> Is there any advantage to using broadcast state vs. just using a CoMap function
> on 2 connected streams?
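To make the broadcast-forward versus repartition-repartition trade-off described in this reply concrete, here is a deliberately simplified cost model in plain Java. It is only a sketch under stated assumptions: the class and method names are hypothetical, it counts records shipped to the join instances and nothing else, and it ignores records that happen to be local already, serialization cost, and state size.

```java
/**
 * Simplified model (hypothetical, not part of Flink): counts how many records
 * have to be shipped to the parallel join instances under each strategy.
 */
final class JoinShippingCost {
    private JoinShippingCost() {}

    /** Broadcast-forward: each dimension record is copied to every instance; events stay where they are. */
    static long broadcastForward(long dimensionRecords, int parallelism) {
        return dimensionRecords * parallelism;
    }

    /** Repartition-repartition: each record of both inputs is shipped once to the instance owning its key. */
    static long repartitionBoth(long dimensionRecords, long eventRecords) {
        return dimensionRecords + eventRecords;
    }

    /** Names the strategy that ships fewer records under this model. */
    static String cheaper(long dimensionRecords, long eventRecords, int parallelism) {
        return broadcastForward(dimensionRecords, parallelism) <= repartitionBoth(dimensionRecords, eventRecords)
                ? "broadcast"
                : "repartition";
    }

    public static void main(String[] args) {
        // Small dimension side, large event side.
        System.out.println(cheaper(1_000, 10_000_000, 16));
        // Both sides of similar size.
        System.out.println(cheaper(5_000_000, 10_000_000, 16));
    }
}
```

Under this model the broadcast side's cost grows with the parallelism, which is why it only pays off when that side is small compared with the stream it enriches.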
RE: Join with slow changing dimensions/ streams
Thanks Fabian.

Is there any advantage to using broadcast state vs. just using a CoMap function on 2 connected streams?

From: Fabian Hueske
Sent: Thursday, September 5, 2019 12:59 PM
To: Hanan Yehudai
Cc: flink-u...@apache.org
Subject: Re: Join with slow changing dimensions/ streams

Hi,

Flink does not yet have good support for mixing bounded and unbounded streams in its DataStream API.

If the dimension table is static (and small enough), I'd use a RichMapFunction and load the table onto the heap in the open() method. In this case, you'd probably need to restart the job (this can be done with a savepoint and restart) to load a new table. You can also use a ProcessFunction and register a timer to periodically load a new table.

If the dimension table is (slowly) changing, you might want to think about broadcast state. With this setup you can propagate updates by sending them to the broadcast channel.

I would not use the join operator because it would also buffer the actual stream in state.

Best, Fabian
Re: Join with slow changing dimensions/ streams
Hi,

Flink does not yet have good support for mixing bounded and unbounded streams in its DataStream API.

If the dimension table is static (and small enough), I'd use a RichMapFunction and load the table onto the heap in the open() method. In this case, you'd probably need to restart the job (this can be done with a savepoint and restart) to load a new table. You can also use a ProcessFunction and register a timer to periodically load a new table.

If the dimension table is (slowly) changing, you might want to think about broadcast state. With this setup you can propagate updates by sending them to the broadcast channel.

I would not use the join operator because it would also buffer the actual stream in state.

Best, Fabian

On Mon, 2 Sept 2019 at 15:38, Hanan Yehudai <hanan.yehu...@radcom.com> wrote:
> I have a very common use case: enriching a stream with some dimension tables.
>
> E.g. the events stream has a SERVER_ID, and another file has the LOCATION
> associated with the SERVER_ID (a dimension table CSV file).
>
> In SQL I would simply join, but when using the Flink stream API, as far as I
> can see, there are several options, and I wondered which would be optimal.
>
> 1. Use the JOIN operator from the documentation
> (https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/joining.html).
> This always has some time aspect to the join, unless I use an interval join
> with a very large upper bound and associate the dimension stream record with
> an old timestamp.
>
> 2. Just write a mapper function that gets the NAME from the dimension
> records, which are preloaded in the map function's loading method.
>
> 3. Use broadcast state. This way I can also listen to changes on the
> dimension tables and do the actual join in the processElement function.
>
> What would be the most efficient way to do this from a memory and CPU
> consumption perspective?
>
> Or is there another, better way?
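The static-table suggestion in this reply (load the table once in open(), then do one heap lookup per event) can be sketched without any Flink dependency as follows. This is a minimal plain-Java model, not Flink code: the class name, the "serverId,location" line format, and the "UNKNOWN" fallback are assumptions made for illustration, and the open() and map() methods only mirror the roles that a RichMapFunction's lifecycle methods would play.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a RichMapFunction that enriches events from a preloaded table. */
final class StaticLocationEnricher {
    private final List<String> csvLines; // stands in for the dimension CSV file
    private Map<String, String> locationByServerId = Map.of();

    StaticLocationEnricher(List<String> csvLines) {
        this.csvLines = csvLines;
    }

    /** Plays the role of open(): parse the whole table onto the heap before events arrive. */
    void open() {
        Map<String, String> table = new HashMap<>();
        for (String line : csvLines) {
            String[] parts = line.split(",", 2);
            if (parts.length == 2) { // skip lines without a separator
                table.put(parts[0].trim(), parts[1].trim());
            }
        }
        locationByServerId = table; // swap in the freshly parsed table
    }

    /** Plays the role of map(): one in-memory lookup per event, no state buffering of the stream. */
    String map(String serverId) {
        return serverId + "@" + locationByServerId.getOrDefault(serverId, "UNKNOWN");
    }

    public static void main(String[] args) {
        StaticLocationEnricher enricher = new StaticLocationEnricher(List.of("s1,Berlin", "s2,Tel Aviv"));
        enricher.open();
        System.out.println(enricher.map("s1"));
        System.out.println(enricher.map("s9"));
    }
}
```

In this model, calling open() again replaces the table in one step, which corresponds loosely to the periodic reload mentioned above; in a real job that reload would be driven by a timer or a restart rather than by a direct call.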
Join with slow changing dimensions/ streams
I have a very common use case: enriching a stream with some dimension tables.

E.g. the events stream has a SERVER_ID, and another file has the LOCATION associated with the SERVER_ID (a dimension table CSV file).

In SQL I would simply join, but when using the Flink stream API, as far as I can see, there are several options, and I wondered which would be optimal.

1. Use the JOIN operator from the documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/joining.html). This always has some time aspect to the join, unless I use an interval join with a very large upper bound and associate the dimension stream record with an old timestamp.

2. Just write a mapper function that gets the NAME from the dimension records, which are preloaded in the map function's loading method.

3. Use broadcast state. This way I can also listen to changes on the dimension tables and do the actual join in the processElement function.

What would be the most efficient way to do this from a memory and CPU consumption perspective?

Or is there another, better way?
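For readers unfamiliar with option 3, the following plain-Java model sketches the idea of broadcast state: every parallel instance receives every dimension update, while each event is handled by only one instance. It is a hedged illustration, not Flink code. The class names and the round-robin event distribution are assumptions; only the callback names processElement and processBroadcastElement are borrowed from Flink's BroadcastProcessFunction, and their signatures are simplified here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of enrichment with a broadcast dimension stream. */
final class BroadcastEnrichmentModel {

    /** One parallel instance holding its own full copy of the dimension table. */
    static final class Instance {
        private final Map<String, String> locationByServerId = new HashMap<>();

        /** Called for every dimension update, on every instance. */
        void processBroadcastElement(String serverId, String location) {
            locationByServerId.put(serverId, location);
        }

        /** Called for an event on the one instance that received it. */
        String processElement(String serverId) {
            return serverId + "@" + locationByServerId.getOrDefault(serverId, "UNKNOWN");
        }
    }

    private final List<Instance> instances = new ArrayList<>();
    private int next = 0;

    BroadcastEnrichmentModel(int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
    }

    /** Replicates a dimension update to all instances. */
    void broadcastUpdate(String serverId, String location) {
        for (Instance instance : instances) {
            instance.processBroadcastElement(serverId, location);
        }
    }

    /** Sends an event to a single instance (round-robin here) without repartitioning by key. */
    String enrich(String serverId) {
        Instance target = instances.get(next);
        next = (next + 1) % instances.size();
        return target.processElement(serverId);
    }

    public static void main(String[] args) {
        BroadcastEnrichmentModel job = new BroadcastEnrichmentModel(3);
        job.broadcastUpdate("s1", "Berlin");
        System.out.println(job.enrich("s1"));
        job.broadcastUpdate("s1", "Paris"); // a slowly changing dimension
        System.out.println(job.enrich("s1"));
    }
}
```

The model keeps only the dimension table in memory and never buffers events, and any instance can answer a lookup because each one holds the complete table.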