Re: Job Manager HA manual setup
Hi Ufuk,

Thanks, it's working fine now with your suggestion.

Cheers

On Sun, Feb 28, 2016 at 10:10 PM, Ufuk Celebi wrote:
> Hey Welly!
>
> Yes, it is possible to do this manually via the jobmanager.sh and
> taskmanager.sh scripts, like this:
>
>     jobmanager.sh start cluster $HOST $WEB-UI-PORT
>     taskmanager.sh start
>
> The start-cluster.sh script is just a wrapper around these scripts.
>
> From experience, people often forget to sync the configuration files.
> Make sure to have the same configuration file on each host (both job
> and task managers), because it is needed to parse the ZooKeeper
> quorum etc.
>
> The task managers retrieve the currently leading job manager via
> ZooKeeper. If job manager failover works as expected, but the task
> managers don't connect to the new job manager, I would suspect that
> the task manager configuration is out of sync. Could you check this
> please?
>
> Moreover, it would be helpful to have a look at the job manager and
> task manager logs to investigate this further. Can you share these?
> (Privately works as well, of course.)
>
> – Ufuk

--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com <http://www.triplelands.com/blog/>
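The advice above about keeping the configuration file identical on every host can be checked mechanically. The following is only a minimal sketch, assuming the configuration contents have already been collected from each host by some other means (for example copied by hand, since passwordless SSH is not available here). The host names and file contents are hypothetical, and the helper names are made up for illustration; none of this is part of Flink.

```python
import hashlib


def config_digest(config_text: str) -> str:
    """Return the SHA-256 hex digest of a configuration file's contents."""
    return hashlib.sha256(config_text.encode("utf-8")).hexdigest()


def find_out_of_sync_hosts(configs: dict, reference_host: str) -> list:
    """Return the sorted hosts whose config differs from the reference host's."""
    reference = config_digest(configs[reference_host])
    return sorted(
        host
        for host, text in configs.items()
        if config_digest(text) != reference
    )


if __name__ == "__main__":
    # Hypothetical hosts and config contents, for illustration only.
    configs = {
        "jobmanager-1": "quorum: zk1:2181,zk2:2181\n",
        "jobmanager-2": "quorum: zk1:2181,zk2:2181\n",
        "taskmanager-1": "quorum: zk1:2181\n",  # stale copy
    }
    print(find_out_of_sync_hosts(configs, "jobmanager-1"))
```

Any host reported by this check is a candidate for the "task managers don't connect to the new job manager" symptom described above.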
Re: Job Manager HA manual setup
typos

We have tried this: the job manager can fail over, but the task manager CAN'T be relocated to the new job manager. Is there some setting for this? Can the task manager also be relocated to the new job manager?

Cheers

--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com <http://www.triplelands.com/blog/>
Job Manager HA manual setup
Hi All,

We have already tried to set up Job Manager HA based on the documentation, using the scripts and the provided ZooKeeper. It works.

However, currently everything is done using the start-cluster script, which I believe requires passwordless SSH between nodes. We are restricted by our environment, so this is not possible.

Is it possible to set up Job Manager HA manually, by starting each job manager and task manager on its own node? We already have our own ZooKeeper and HDFS clusters.

We have tried this: the job manager can fail over, but the task manager can be relocated to the new task manager. Is there some setting for this? Can the task manager also be relocated to the new job manager?

Are there any more details on the mechanism used for Job Manager HA and its interaction with ZooKeeper?

Are task managers also registered in ZooKeeper? How do they find the right job manager master?

Thanks for your help.

Cheers
--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Optimal Configuration for Cluster
Hi Ufuk,

Thanks for this. Really appreciated.

Cheers

On Tue, Feb 23, 2016 at 8:04 PM, Ufuk Celebi wrote:
> I would go with one task manager with 48 slots per machine. This
> reduces the communication overhead between task managers.
>
> Regarding memory configuration: Given that the machines have plenty of
> memory, I would configure a bigger heap than the 4 GB you had
> previously. Furthermore, you can also consider adding more network
> buffers, which should improve job throughput.
>
> – Ufuk
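The suggestion in this message (one task manager per machine with 48 slots, a heap larger than 4 GB, and more network buffers) touches three configuration keys that appear in this thread. As a rough sketch only, the snippet below renders them as configuration lines. The heap size and buffer count passed in are hypothetical placeholder values, not recommendations from the thread, and the function name is invented for illustration.

```python
def render_taskmanager_settings(slots: int, heap_mb: int, network_buffers: int) -> str:
    """Render the three task manager settings discussed in the thread as 'key: value' lines."""
    settings = {
        "taskmanager.numberOfTaskSlots": slots,
        "taskmanager.heap.mb": heap_mb,
        "taskmanager.network.numberOfBuffers": network_buffers,
    }
    return "\n".join(f"{key}: {value}" for key, value in settings.items())


if __name__ == "__main__":
    # 48 slots comes from the thread; the heap size and buffer count
    # below are assumed values chosen only to make the example concrete.
    print(render_taskmanager_settings(slots=48, heap_mb=65536, network_buffers=147456))
```

The rendered lines would go into the configuration file that, per the HA discussion in this archive, should be kept identical on every host.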
Re: Optimal Configuration for Cluster
Hi Ufuk and Fabian,

Is it better to start 48 task managers (one slot each) on one machine than to have a single task manager with 48 slots? Are there any trade-offs that we should know about?

Cheers
Re: Optimal Configuration for Cluster
Hi Ufuk,

Thanks for the explanation.

Yes. Our jobs are all streaming jobs.

Cheers

On Tue, Feb 23, 2016 at 2:48 PM, Ufuk Celebi wrote:
> The new default is equivalent to the previous "streaming mode". The
> community decided to get rid of this distinction, because it was
> confusing to users.
>
> The difference between "streaming mode" and "batch mode" was how
> Flink's managed memory was allocated, either lazily when required
> ("streaming mode") or eagerly on task manager start-up ("batch mode").
> Now it's lazy by default.
>
> This is not something you need to worry about, but if you are mostly
> using the DataSet API, where pre-allocation has benefits, you can get
> the "batch mode" behaviour by using the following configuration key:
>
>     taskmanager.memory.preallocate: true
>
> But you are using the DataStream API anyway, right?
>
> – Ufuk

--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Optimal Configuration for Cluster
Hi Fabian,

Previously, with Flink 0.9 and 0.10, we started the cluster in either streaming mode or batch mode. I see that this option is gone in the Flink 1.0 snapshot. Is this now taken care of by Flink and optimized by the runtime?

--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Optimal Configuration for Cluster
Hi Fabian,

Thanks a lot for your response.

> - How many task managers do you start? I assume more than one TM per
> machine given that you assign only 4GB of memory out of 128GB to each TM.

Currently we start one TM per machine, with 48 task slots.

> - What is the maximum parallelism of your program?

Parallelism is between 30 and 40.

> - How many processing slots do you configure for each TM?

We configure 48 (# of cores) for each TM. One TM for each machine.

But I would like to ask another question. Is it better to start 48 task managers on one machine, with one task slot each? Are there any trade-offs that we should know about?

On Mon, Feb 22, 2016 at 5:26 PM, Fabian Hueske wrote:
> Hi Welly,
>
> sorry for the late response.
>
> The number of network buffers primarily depends on the maximum
> parallelism of your job.
> The given formula assumes a specific cluster configuration (1 task
> manager per machine, one parallel task per CPU).
> The formula can be translated to:
>
>     taskmanager.network.numberOfBuffers: p ^ 2 * t * 4
>
> where p is the maximum parallelism of the job and t is the number of
> task managers.
> You can process more than one parallel task per TM if you configure
> more than one processing slot per machine
> (taskmanager.numberOfTaskSlots). The TM will divide its memory among
> all its slots. So it would be possible to start one TM for each
> machine with 100GB+ memory and 48 slots each.
>
> We can compute the number of network buffers if you give a few more
> details about your setup:
> - How many task managers do you start? I assume more than one TM per
>   machine given that you assign only 4GB of memory out of 128GB to
>   each TM.
> - What is the maximum parallelism of your program?
> - How many processing slots do you configure for each TM?
>
> In general, pipelined shuffles with a high parallelism require a lot
> of memory.
> If you configure batch instead of pipelined transfer, the memory
> requirement goes down
> (ExecutionConfig.setExecutionMode(ExecutionMode.BATCH)).
>
> Eventually, we want to merge the network buffer and the managed memory
> pools. So the "taskmanager.network.numberOfBuffers" configuration will
> hopefully disappear at some point in the future.
>
> Best, Fabian

--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com <http://www.triplelands.com/blog/>
Optimal Configuration for Cluster
Hi All,

We are trying to run our job on a cluster with the following specification:

1. # of machines: 16
2. memory: 128 GB
3. # of cores: 48

However, when we try to run the job we get an exception:

"insufficient number of network buffers. 48 required but only 10 available. the total number of network buffers is currently set to 2048"

After looking at the documentation, we set the configuration based on the docs:

taskmanager.network.numberOfBuffers: # core ^ 2 * # machine * 4

However, we then faced another error from the JVM:

java.io.IOException: Cannot allocate network buffer pool: Could not allocate enough memory segments for NetworkBufferPool (required (Mb): 2304, allocated (Mb): 698, missing (Mb): 1606). Cause: Java heap space

We then adjusted the heap setting to taskmanager.heap.mb: 4096

Finally the cluster is running.

However, I'm still not sure about the configuration, or whether fiddling with the task manager heap is really the right way to fine-tune it. So my questions are:

1. Am I doing it right for numberOfBuffers?
2. How much should we allocate for taskmanager.heap.mb given the information above?
3. Any suggestions on which configuration we need to set to make it optimal for the cluster?
4. Is there any chance that this will be resolved automatically by the memory/network buffer manager?

Thanks a lot for the help

Cheers

--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com <http://www.triplelands.com/blog/>
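To make the formula quoted in this message concrete (# core ^ 2 * # machine * 4, read elsewhere in this thread as p ^ 2 * t * 4 with p the maximum parallelism and t the number of task managers), here is a small worked calculation. It is only a sketch of the arithmetic: the function name is made up, and the parallelism values of 30 and 40 are included as illustrative inputs alongside the 48 cores and 16 machines given above.

```python
def network_buffers(max_parallelism: int, task_managers: int) -> int:
    """Evaluate p^2 * t * 4, the rule of thumb quoted in the thread."""
    if max_parallelism < 1 or task_managers < 1:
        raise ValueError("both arguments must be positive integers")
    return max_parallelism ** 2 * task_managers * 4


if __name__ == "__main__":
    # 16 machines with one task manager each, as described in the thread.
    for parallelism in (30, 40, 48):
        count = network_buffers(parallelism, task_managers=16)
        print(f"p={parallelism}, t=16 -> {count} buffers")
```

With p = 48 and t = 16 this gives 48 * 48 * 16 * 4 = 147456 buffers; a lower maximum parallelism reduces the count quadratically.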
Re: Flink v0.10.2
Hi Robert,

We are on a deadline for the demo stage for management right now, before going to production, so it would be great to have 0.10.2 as a stable version within this week, if possible.

Cheers

On Wed, Jan 13, 2016 at 4:13 PM, Robert Metzger wrote:
> Hi,
>
> there are currently no planned releases. I would actually like to start
> preparing for the 1.0 release soon, but the community needs to discuss
> that first.
>
> How urgently do you need a 0.10.2 release? If this is the last blocker
> for using Flink in production at your company, I can push for the
> bugfix release.

--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com <http://www.triplelands.com/blog/>
Flink v0.10.2
Hi All,

We are currently using the snapshot version for development because we ran into a DataStream union error. For deployment we may need to build Flink from master.

I want to ask when this version will be released. Is there any roadmap or plan I can look at for this release?

Thanks a lot

Cheers
--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Security in Flink
Hi Stephan,

Thanks a lot for the explanation. Is there any timeline for when this will be released?

I guess this will be important for our case if we want Flink to be deployed in production.

Cheers

On Tue, Jan 12, 2016 at 6:19 PM, Stephan Ewen wrote:
> Hi Sourav!
>
> If you want to use Flink in a cluster where neither Hadoop/YARN (nor,
> soon, Mesos) is available, then I assume you have already installed
> Flink in standalone mode on the cluster.
>
> There is currently no support in Flink for managing user
> authentication. A few thoughts on how that may evolve:
>
> 1) It should not be too hard to add authentication to the web
> dashboard. That way, if the cluster is otherwise blocked off (the
> master's RPC ports are firewalled), one would have restricted job
> starts.
>
> 2) We plan to add authenticated / encrypted connections soon. With
> that, the client that submits the program would need to have access to
> the keystore or key and the corresponding password to connect.
>
> Greetings,
> Stephan
>
> On Mon, Jan 11, 2016 at 3:46 PM, Sourav Mazumder <sourav.mazumde...@gmail.com> wrote:
>> Thanks Stephan for your detailed response. Things are clearer to me
>> now.
>>
>> A follow-up question:
>> It looks like most of the security support depends on Hadoop. What
>> happens if anyone wants to use Flink without Hadoop (in a cluster
>> where Hadoop is not there)?
>>
>> Regards,
>> Sourav
>>
>> On Sun, Jan 10, 2016 at 12:41 PM, Stephan Ewen wrote:
>>> Hi Sourav!
>>>
>>> There is user-authentication support in Flink via the Hadoop /
>>> Kerberos infrastructure. If you run Flink on YARN, it should work
>>> seamlessly: Flink acquires the Kerberos tokens of the user that
>>> submits programs, and authenticates itself at YARN, HDFS, and HBase
>>> with them.
>>>
>>> If you run Flink standalone, Flink can still authenticate at
>>> HDFS/HBase via Kerberos, with a bit of manual help by the user
>>> (running kinit on the workers).
>>>
>>> With Kafka 0.9 and Flink's upcoming connector
>>> (https://github.com/apache/flink/pull/1489), streaming programs can
>>> authenticate themselves at the stream brokers via SSL (and read via
>>> encrypted connections).
>>>
>>> What we have on the roadmap for the coming months is the following:
>>> - Encrypt in-flight data streams that are exchanged between worker
>>>   nodes (TaskManagers).
>>> - Encrypt the coordination messages between client/master/workers.
>>> Note that these refer to encryption between Flink's own components
>>> only, which would use transient keys generated just for a specific
>>> job or session (hence would not need any user involvement).
>>>
>>> Let us know if that answers your questions, and if that meets your
>>> requirements.
>>>
>>> Greetings,
>>> Stephan
>>>
>>> On Fri, Jan 8, 2016 at 3:23 PM, Sourav Mazumder <sourav.mazumde...@gmail.com> wrote:
>>>> Hi,
>>>>
>>>> Can anyone point me to any documentation on support for security in
>>>> Flink?
>>>>
>>>> The type of information I'm looking for is:
>>>>
>>>> 1. How do I do user-level authentication to ensure that a job is
>>>> submitted/deleted/modified by the right user? Is it possible
>>>> through the web client?
>>>> 2. Authentication across multiple slave nodes (where the task
>>>> managers are running) and the driver program so that they can
>>>> communicate with each other
>>>> 3. Support for SSL/encryption for data exchanged across the slave
>>>> nodes
>>>> 4. Support for pluggable authentication with existing solutions
>>>> like LDAP
>>>>
>>>> If not there today, is there a roadmap for these security features?
>>>>
>>>> Regards,
>>>> Sourav

--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Security in Flink
Hi Stephen, Do you have any plan on which encryption method and mechanism will be used on Flink ? Could you share about the detail on this ? We have very strict requirement from client that every communication need to be encryption. So any detail would be really appreciated for answering their security concern. Cheers On Mon, Jan 11, 2016 at 9:46 PM, Sourav Mazumder < sourav.mazumde...@gmail.com> wrote: > Thanks Steven for your details response. Things are more clear to me now. > > A follow up Qs - > Looks like most of the security support depends on Hadoop ? What happens > if anyone wants to use Flink with Hadoop (in a cluster where Hadoop is not > there) ? > > Regards, > Sourav > > On Sun, Jan 10, 2016 at 12:41 PM, Stephan Ewen wrote: > >> Hi Sourav! >> >> There is user-authentication support in Flink via the Hadoop / Kerberos >> infrastructure. If you run Flink on YARN, it should seamlessly work that >> Flink acquires the Kerberos tokens of the user that submits programs, and >> authenticate itself at YARN, HDFS, and HBase with that. >> >> If you run Flink standalone, Flink can still authenticate at HDFS/HBase >> via Kerberos, with a bit of manual help by the user (running kinit on the >> workers). >> >> With Kafka 0.9 and Flink's upcoming connector ( >> https://github.com/apache/flink/pull/1489), streaming programs can >> authenticate themselves as stream brokers via SSL (and read via encrypted >> connections). >> >> >> What we have on the roadmap for the coming months it the following: >> - Encrypt in-flight data streams that are exchanged between worker >> nodes (TaskManagers). >> - Encrypt the coordination messages between client/master/workers. >> Note that these refer to encryption between Flink's own components only, >> which would use transient keys generated just for a specific job or session >> (hence would not need any user involvement). >> >> >> Let us know if that answers your questions, and if that meets your >> requirements. 
>> >> Greetings, >> Stephan >> >> >> On Fri, Jan 8, 2016 at 3:23 PM, Sourav Mazumder < >> sourav.mazumde...@gmail.com> wrote: >> >>> Hi, >>> >>> Can anyone point me to ant documentation on support for Security in >>> Flink ? >>> >>> The type of information I'm looking for are - >>> >>> 1. How do I do user level authentication to ensure that a job is >>> submitted/deleted/modified by the right user ? Is it possible though the >>> web client ? >>> 2. Authentication across multiple slave nodes (where the task managers >>> are running) and driver program so that they can communicate with each other >>> 3. Support for SSL/encryption for data exchanged happening across the >>> slave nodes >>> 4. Support for pluggable authentication with existing solution like LDAP >>> >>> If not there today is there a roadmap for these security features ? >>> >>> Regards, >>> Sourav >>> >> >> > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Material on Apache flink internals
Hi Madhu, You can also check this page for the details on internals https://cwiki.apache.org/confluence/display/FLINK/Flink+Internals http://www.slideshare.net/KostasTzoumas/flink-internals Cheers On Fri, Dec 4, 2015 at 10:14 AM, madhu phatak wrote: > Hi, > Thanks a lot for the resources. > On Dec 1, 2015 9:11 PM, "Fabian Hueske" wrote: > >> Hi Madhu, >> >> checkout the following resources: >> >> - Apache Flink Blog: http://flink.apache.org/blog/index.html >> - Data Artisans Blog: http://data-artisans.com/blog/ >> - Flink Forward Conference website (Talk slides & recordings): >> http://flink-forward.org/?post_type=session >> - Flink Meetup talk recordings: >> https://www.youtube.com/channel/UCY8_lgiZLZErZPF47a2hXMA >> - Slim's Flink Knowledge base: >> http://sparkbigdata.com/component/tags/tag/27-flink >> >> Best, Fabian >> >> 2015-12-01 16:23 GMT+01:00 madhu phatak : >> >>> Hi everyone, >>> >>> I am fascinated with flink core engine way of streaming of operators >>> rather than typical map/reduce way that followed by hadoop or spark. Is any >>> good documentation/blog/video avalable which talks about this internal. I >>> am ok from a batch or streaming point of view. >>> >>> It will be great if some one can share this info. Thank you for your >>> excellent work. >>> >>> -- >>> Regards, >>> Madhukara Phatak >>> http://datamantra.io/ >>> >> >> -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Documentation for Fold
Hi All, I'm currently going through the DataStream documentation and noticed a minor error in the docs, so I thought I should let you know. I think fold only works on a keyed data stream. [image: Inline image 1] Cheers -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Data Stream union error after upgrading from 0.9 to 0.10.1
Hi Robert, Thanks for the update. We will update to this version. Cheers On Thu, Dec 3, 2015 at 3:49 PM, Robert Metzger wrote: > Hi Welly, > > the fix has been merged and should be available in 0.10-SNAPSHOT. > > On Wed, Dec 2, 2015 at 10:12 AM, Maximilian Michels > wrote: > >> Hi Welly, >> >> We still have to decide on the next release date but I would expect >> Flink 0.10.2 within the next weeks. If you can't work around the union >> limitation, you may build your own Flink either from the master or the >> release-0.10 branch which will eventually be Flink 0.10.2. >> >> Cheers, >> Max >> >> On Tue, Dec 1, 2015 at 12:04 PM, Welly Tambunan >> wrote: >> > Thanks a lot Aljoscha. >> > >> > When it will be released ? >> > >> > Cheers >> > >> > On Tue, Dec 1, 2015 at 5:48 PM, Aljoscha Krettek >> > wrote: >> >> >> >> Hi, >> >> I relaxed the restrictions on union. This should make it into an >> upcoming >> >> 0.10.2 bugfix release. >> >> >> >> Cheers, >> >> Aljoscha >> >> > On 01 Dec 2015, at 11:23, Welly Tambunan wrote: >> >> > >> >> > Hi All, >> >> > >> >> > After upgrading our system to the latest version from 0.9 to 0.10.1 >> we >> >> > have this following error. >> >> > >> >> > Exception in thread "main" java.lang.UnsupportedOperationException: A >> >> > DataStream cannot be unioned with itself >> >> > >> >> > Then i find the relevant JIRA for this one. >> >> > https://issues.apache.org/jira/browse/FLINK-3080 >> >> > >> >> > Is there any plan which release this will be ? >> >> > >> >> > >> >> > Another issue i have after upgrading is can't union with different >> level >> >> > of parallelism. >> >> > >> >> > I think we will need to fall back to 0.9 again for the time being. 
>> >> > >> >> > Cheers >> >> > >> >> > -- >> >> > Welly Tambunan >> >> > Triplelands >> >> > >> >> > http://weltam.wordpress.com >> >> > http://www.triplelands.com >> >> >> > >> > >> > >> > -- >> > Welly Tambunan >> > Triplelands >> > >> > http://weltam.wordpress.com >> > http://www.triplelands.com >> > > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Running WebClient from Windows
Hi Fabian, I have already created JIRA for this one. https://issues.apache.org/jira/browse/FLINK-3099 Thanks a lot for this. Cheers On Wed, Dec 2, 2015 at 6:02 PM, Fabian Hueske wrote: > Hi Welly, > > at the moment we only provide native Windows .bat scripts for start-local > and the CLI client. > However, we check that the Unix scripts (including start-webclient.sh) > work in a Windows Cygwin environment. > I have to admit, I am not familiar with MinGW, so not sure what is > happening there. > > It would be nice to have a Windows start script for the webclient though. > Would you mind and open a JIRA for that? > > Thanks, > Fabian > > 2015-12-02 3:00 GMT+01:00 Welly Tambunan : > >> Hi All, >> >> Is there any way to run WebClient for uploading the job from windows ? >> >> I try to run that from mingw but has these error >> >> >> $ bin/start-webclient.sh >> /c/flink-0.10.0/bin/config.sh: line 261: conditional binary operator >> expected >> /c/flink-0.10.0/bin/config.sh: line 261: syntax error near `=~' >> /c/flink-0.10.0/bin/config.sh: line 261: `if [[ "$SLAVE" =~ >> ^.*/([0-9a-zA-Z. >> -]+)$ ]]; then' >> /c/flink-0.10.0/bin/config.sh: line 261: conditional binary operator >> expected >> /c/flink-0.10.0/bin/config.sh: line 261: syntax error near `=~' >> /c/flink-0.10.0/bin/config.sh: line 261: `if [[ "$SLAVE" =~ >> ^.*/([0-9a-zA-Z. >> -]+)$ ]]; then' >> Starting Flink webclient >> >> [Terminate] >> >> Cheers >> >> -- >> Welly Tambunan >> Triplelands >> >> http://weltam.wordpress.com >> http://www.triplelands.com <http://www.triplelands.com/blog/> >> > > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Running WebClient from Windows
Hi All, Is there any way to run the WebClient for uploading jobs from Windows? I tried to run it from MinGW but got these errors:

$ bin/start-webclient.sh
/c/flink-0.10.0/bin/config.sh: line 261: conditional binary operator expected
/c/flink-0.10.0/bin/config.sh: line 261: syntax error near `=~'
/c/flink-0.10.0/bin/config.sh: line 261: `if [[ "$SLAVE" =~ ^.*/([0-9a-zA-Z.-]+)$ ]]; then'
/c/flink-0.10.0/bin/config.sh: line 261: conditional binary operator expected
/c/flink-0.10.0/bin/config.sh: line 261: syntax error near `=~'
/c/flink-0.10.0/bin/config.sh: line 261: `if [[ "$SLAVE" =~ ^.*/([0-9a-zA-Z.-]+)$ ]]; then'
Starting Flink webclient
[Terminate]

Cheers -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
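A note on the error above: "conditional binary operator expected" near `=~` usually indicates that the bash in use does not understand the `[[ ... =~ ... ]]` regex operator. Whether that is the cause on this particular MinGW install is an assumption, not something confirmed in the thread. A minimal sketch to probe the shell; the function name `supports_regex_match` and the sample path are made up for illustration:

```shell
#!/bin/sh
# Hypothetical helper: reports whether the bash found on PATH understands
# the [[ string =~ regex ]] operator that config.sh relies on.
supports_regex_match() {
  # Run the check in a child bash so a syntax error there cannot abort this script.
  bash -c 're="^.*/([0-9a-zA-Z.-]+)\$"; [[ "/path/to/host-1.example" =~ $re ]]' 2>/dev/null
}

if supports_regex_match; then
  echo "regex operator supported"
else
  echo "regex operator NOT supported"
fi
```

If the probe reports no support, running the scripts under Cygwin may be the easier route, since Fabian's reply in this thread says the Unix scripts are checked there.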
Re: Data Stream union error after upgrading from 0.9 to 0.10.1
Ok Robert, Thanks a lot. Looking forward to it. Cheers On Wed, Dec 2, 2015 at 5:50 AM, Robert Metzger wrote: > No, its not yet merged into the source repo of Flink. > > You can find the code here: https://github.com/apache/flink/pull/1425 > You can also check out the code of the PR or download the PR contents as a > patch and apply it to the Flink source. > > I think the change will be merged tomorrow and then you'll have it in > 0.10-SNAPSHOT. > > > For the 0.10.2 release: There are no concrete plans yet, but I think it'll > happen in the next 2-3 weeks. > > > On Tue, Dec 1, 2015 at 11:48 PM, Welly Tambunan wrote: > >> Hi Aljoscha, >> >> Is this fix has already been available on 0.10-SNAPSHOT ? >> >> >> Cheers >> >> On Tue, Dec 1, 2015 at 6:04 PM, Welly Tambunan wrote: >> >>> Thanks a lot Aljoscha. >>> >>> When it will be released ? >>> >>> Cheers >>> >>> On Tue, Dec 1, 2015 at 5:48 PM, Aljoscha Krettek >>> wrote: >>> >>>> Hi, >>>> I relaxed the restrictions on union. This should make it into an >>>> upcoming 0.10.2 bugfix release. >>>> >>>> Cheers, >>>> Aljoscha >>>> > On 01 Dec 2015, at 11:23, Welly Tambunan wrote: >>>> > >>>> > Hi All, >>>> > >>>> > After upgrading our system to the latest version from 0.9 to 0.10.1 >>>> we have this following error. >>>> > >>>> > Exception in thread "main" java.lang.UnsupportedOperationException: A >>>> DataStream cannot be unioned with itself >>>> > >>>> > Then i find the relevant JIRA for this one. >>>> > https://issues.apache.org/jira/browse/FLINK-3080 >>>> > >>>> > Is there any plan which release this will be ? >>>> > >>>> > >>>> > Another issue i have after upgrading is can't union with different >>>> level of parallelism. >>>> > >>>> > I think we will need to fall back to 0.9 again for the time being. 
>>>> > >>>> > Cheers >>>> > >>>> > -- >>>> > Welly Tambunan >>>> > Triplelands >>>> > >>>> > http://weltam.wordpress.com >>>> > http://www.triplelands.com >>>> >>>> >>> >>> >>> -- >>> Welly Tambunan >>> Triplelands >>> >>> http://weltam.wordpress.com >>> http://www.triplelands.com <http://www.triplelands.com/blog/> >>> >> >> >> >> -- >> Welly Tambunan >> Triplelands >> >> http://weltam.wordpress.com >> http://www.triplelands.com <http://www.triplelands.com/blog/> >> > > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Data Stream union error after upgrading from 0.9 to 0.10.1
Hi Aljoscha, Is this fix has already been available on 0.10-SNAPSHOT ? Cheers On Tue, Dec 1, 2015 at 6:04 PM, Welly Tambunan wrote: > Thanks a lot Aljoscha. > > When it will be released ? > > Cheers > > On Tue, Dec 1, 2015 at 5:48 PM, Aljoscha Krettek > wrote: > >> Hi, >> I relaxed the restrictions on union. This should make it into an upcoming >> 0.10.2 bugfix release. >> >> Cheers, >> Aljoscha >> > On 01 Dec 2015, at 11:23, Welly Tambunan wrote: >> > >> > Hi All, >> > >> > After upgrading our system to the latest version from 0.9 to 0.10.1 we >> have this following error. >> > >> > Exception in thread "main" java.lang.UnsupportedOperationException: A >> DataStream cannot be unioned with itself >> > >> > Then i find the relevant JIRA for this one. >> > https://issues.apache.org/jira/browse/FLINK-3080 >> > >> > Is there any plan which release this will be ? >> > >> > >> > Another issue i have after upgrading is can't union with different >> level of parallelism. >> > >> > I think we will need to fall back to 0.9 again for the time being. >> > >> > Cheers >> > >> > -- >> > Welly Tambunan >> > Triplelands >> > >> > http://weltam.wordpress.com >> > http://www.triplelands.com >> >> > > > -- > Welly Tambunan > Triplelands > > http://weltam.wordpress.com > http://www.triplelands.com <http://www.triplelands.com/blog/> > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Data Stream union error after upgrading from 0.9 to 0.10.1
Thanks a lot, Aljoscha. When will it be released? Cheers On Tue, Dec 1, 2015 at 5:48 PM, Aljoscha Krettek wrote: > Hi, > I relaxed the restrictions on union. This should make it into an upcoming > 0.10.2 bugfix release. > > Cheers, > Aljoscha > > On 01 Dec 2015, at 11:23, Welly Tambunan wrote: > > > > Hi All, > > > > After upgrading our system to the latest version from 0.9 to 0.10.1 we > have this following error. > > > > Exception in thread "main" java.lang.UnsupportedOperationException: A > DataStream cannot be unioned with itself > > > > Then i find the relevant JIRA for this one. > > https://issues.apache.org/jira/browse/FLINK-3080 > > > > Is there any plan which release this will be ? > > > > > > Another issue i have after upgrading is can't union with different level > of parallelism. > > > > I think we will need to fall back to 0.9 again for the time being. > > > > Cheers > > > > -- > > Welly Tambunan > > Triplelands > > > > http://weltam.wordpress.com > > http://www.triplelands.com > > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Data Stream union error after upgrading from 0.9 to 0.10.1
Hi All, After upgrading our system from 0.9 to 0.10.1, we get the following error: Exception in thread "main" java.lang.UnsupportedOperationException: A DataStream cannot be unioned with itself I then found the relevant JIRA issue: https://issues.apache.org/jira/browse/FLINK-3080 Is there a plan for which release will include the fix? Another issue I have after upgrading is that I can't union streams with different levels of parallelism. I think we will need to fall back to 0.9 for the time being. Cheers -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Standalone Cluster vs YARN
Hi Andreas, Yes, seems I can't avoid Zookeeper right now. It would be really nice if we can achieve HA via gossip protocol like Cassandra/Spark DSE does ? Is this possible ? Cheers On Wed, Nov 25, 2015 at 4:12 PM, Andreas Fritzler < andreas.fritz...@gmail.com> wrote: > Hi Welly, > > you will need Zookeeper if you want to setup the standalone cluster in HA > mode. > http://spark.apache.org/docs/latest/spark-standalone.html#high-availability > > In the YARN case you probably have already Zookeeper in place if you are > running YARN in HA mode. > > Regards, > Andreas > > On Wed, Nov 25, 2015 at 10:02 AM, Welly Tambunan > wrote: > >> Hi Ufuk >> >> >In failure cases I find YARN more convenient, because it takes care of >> restarting failed task manager processes/containers for you. >> >> So this mean that we don't need zookeeper ? >> >> >> Cheers >> >> On Wed, Nov 25, 2015 at 3:46 PM, Ufuk Celebi wrote: >> >>> > On 25 Nov 2015, at 02:35, Welly Tambunan wrote: >>> > >>> > Hi All, >>> > >>> > I would like to know if there any feature differences between using >>> Standalone Cluster vs YARN ? >>> > >>> > Until now we are using Standalone cluster for our jobs. >>> > Is there any added value for using YARN ? >>> > >>> > We don't have any hadoop infrastructure in place right now but we can >>> provide that if there's some value to that. >>> >>> There are no features, which only work on YARN or in standalone >>> clusters. YARN mode is essentially starting a standalone cluster in YARN >>> containers. >>> >>> In failure cases I find YARN more convenient, because it takes care of >>> restarting failed task manager processes/containers for you. >>> >>> – Ufuk >>> >>> >> >> >> -- >> Welly Tambunan >> Triplelands >> >> http://weltam.wordpress.com >> http://www.triplelands.com <http://www.triplelands.com/blog/> >> > > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Standalone Cluster vs YARN
Hi Fabian, This make sense now. I would like to avoid zookeeper if possible. Is there any way to avoid this to achieve HA ? I see that DataStax Enterprise achieve this availability for Spark Master without using Zookeeper. https://academy.datastax.com/demos/how-spark-master-high-availability-works-dse Is this possible to achieve in Flink also ? Cheers On Wed, Nov 25, 2015 at 4:11 PM, Fabian Hueske wrote: > YARN is not a replacement for Zookeeper. Zookeeper is mandatory to run > Flink in high-availability mode and takes care of leader (JobManager) > election and meta-data persistance. > > With YARN, Flink can automatically start new Taskmanagers (and > JobManagers) to compensate for failures. In cluster mode, you need stand-by > TMs and JMs and manually take care that these are "filled-up" again in case > of a failure. > > 2015-11-25 10:06 GMT+01:00 Welly Tambunan : > >> Hi Fabian, >> >> Interesting ! >> >> However YARN is still tightly couple to HDFS, is that seems wasteful to >> use only YARN without Hadoop ? >> >> Currently we are using Cassandra and CFS ( cass file system ) >> >> >> Cheers >> >> On Wed, Nov 25, 2015 at 3:51 PM, Fabian Hueske wrote: >> >>> A strong argument for YARN mode can be the isolation of multiple users >>> and jobs. You can easily start a new Flink cluster for each job or user. >>> However, this comes at the price of resource (memory) fragmentation. YARN >>> mode does not use memory as effective as cluster mode. >>> >>> 2015-11-25 9:46 GMT+01:00 Ufuk Celebi : >>> >>>> > On 25 Nov 2015, at 02:35, Welly Tambunan wrote: >>>> > >>>> > Hi All, >>>> > >>>> > I would like to know if there any feature differences between using >>>> Standalone Cluster vs YARN ? >>>> > >>>> > Until now we are using Standalone cluster for our jobs. >>>> > Is there any added value for using YARN ? >>>> > >>>> > We don't have any hadoop infrastructure in place right now but we can >>>> provide that if there's some value to that. 
>>>> >>>> There are no features, which only work on YARN or in standalone >>>> clusters. YARN mode is essentially starting a standalone cluster in YARN >>>> containers. >>>> >>>> In failure cases I find YARN more convenient, because it takes care of >>>> restarting failed task manager processes/containers for you. >>>> >>>> – Ufuk >>>> >>>> >>> >> >> >> -- >> Welly Tambunan >> Triplelands >> >> http://weltam.wordpress.com >> http://www.triplelands.com <http://www.triplelands.com/blog/> >> > > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Standalone Cluster vs YARN
Hi Fabian, Interesting ! However YARN is still tightly couple to HDFS, is that seems wasteful to use only YARN without Hadoop ? Currently we are using Cassandra and CFS ( cass file system ) Cheers On Wed, Nov 25, 2015 at 3:51 PM, Fabian Hueske wrote: > A strong argument for YARN mode can be the isolation of multiple users and > jobs. You can easily start a new Flink cluster for each job or user. > However, this comes at the price of resource (memory) fragmentation. YARN > mode does not use memory as effective as cluster mode. > > 2015-11-25 9:46 GMT+01:00 Ufuk Celebi : > >> > On 25 Nov 2015, at 02:35, Welly Tambunan wrote: >> > >> > Hi All, >> > >> > I would like to know if there any feature differences between using >> Standalone Cluster vs YARN ? >> > >> > Until now we are using Standalone cluster for our jobs. >> > Is there any added value for using YARN ? >> > >> > We don't have any hadoop infrastructure in place right now but we can >> provide that if there's some value to that. >> >> There are no features, which only work on YARN or in standalone clusters. >> YARN mode is essentially starting a standalone cluster in YARN containers. >> >> In failure cases I find YARN more convenient, because it takes care of >> restarting failed task manager processes/containers for you. >> >> – Ufuk >> >> > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Standalone Cluster vs YARN
Hi Ufuk, > In failure cases I find YARN more convenient, because it takes care of restarting failed task manager processes/containers for you. So does this mean that we don't need ZooKeeper? Cheers On Wed, Nov 25, 2015 at 3:46 PM, Ufuk Celebi wrote: > > On 25 Nov 2015, at 02:35, Welly Tambunan wrote: > > > > Hi All, > > > > I would like to know if there any feature differences between using > Standalone Cluster vs YARN ? > > > > Until now we are using Standalone cluster for our jobs. > > Is there any added value for using YARN ? > > > > We don't have any hadoop infrastructure in place right now but we can > provide that if there's some value to that. > > There are no features, which only work on YARN or in standalone clusters. > YARN mode is essentially starting a standalone cluster in YARN containers. > > In failure cases I find YARN more convenient, because it takes care of > restarting failed task manager processes/containers for you. > > – Ufuk > > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Standalone Cluster vs YARN
Hi All, I would like to know if there are any feature differences between using a standalone cluster and YARN. Until now we have been using a standalone cluster for our jobs. Is there any added value in using YARN? We don't have any Hadoop infrastructure in place right now, but we can provide it if there's some value in that. Cheers -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Adding TaskManager on Cluster
Thanks a lot Ufuk. That will be really useful. Cheers On Tue, Nov 24, 2015 at 7:36 PM, Ufuk Celebi wrote: > I’ve added a section about this to the standalone cluster setup guide. > > The webpage should be updated tonight by the automatic build bot. > > – Ufuk > > > On 24 Nov 2015, at 10:39, Welly Tambunan wrote: > > > > Hi Till, > > > > I've just tried that. It's works like a charm. Thanks a lot. > > > > Is there any documentation on taskmanager.sh and other script and the > parameters ? I try to look at the docs but can't find it > > > > Thanks again > > > > > > > > Cheers > > > > On Tue, Nov 24, 2015 at 4:29 PM, Till Rohrmann > wrote: > > Hi Welly, > > > > you can always start a new TaskManager by simply calling taskmanager.sh > start [streaming|batch], depending whether you are running a streaming > cluster or a batch cluster. You can find the script in /bin. > > > > Cheers, > > Till > > > > > > On Tue, Nov 24, 2015 at 10:27 AM, Welly Tambunan > wrote: > > What i'm looking for here is the ability to add a node to the cluster > (scale out) when there's no task slot left for use. > > > > > > On Tue, Nov 24, 2015 at 4:24 PM, Welly Tambunan > wrote: > > Hi All, > > > > Currently we are running flink using standalone mode. > > > > Is there any way to add one node ( task manager ) to the cluster without > bringing the cluster down ? > > > > > > Cheers > > > > -- > > Welly Tambunan > > Triplelands > > > > http://weltam.wordpress.com > > http://www.triplelands.com > > > > > > > > -- > > Welly Tambunan > > Triplelands > > > > http://weltam.wordpress.com > > http://www.triplelands.com > > > > > > > > > > -- > > Welly Tambunan > > Triplelands > > > > http://weltam.wordpress.com > > http://www.triplelands.com > > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Adding TaskManager on Cluster
Hi Till, I've just tried that. It's works like a charm. Thanks a lot. Is there any documentation on taskmanager.sh and other script and the parameters ? I try to look at the docs but can't find it Thanks again Cheers On Tue, Nov 24, 2015 at 4:29 PM, Till Rohrmann wrote: > Hi Welly, > > you can always start a new TaskManager by simply calling taskmanager.sh > start [streaming|batch], depending whether you are running a streaming > cluster or a batch cluster. You can find the script in /bin. > > Cheers, > Till > > > On Tue, Nov 24, 2015 at 10:27 AM, Welly Tambunan > wrote: > >> What i'm looking for here is the ability to add a node to the cluster >> (scale out) when there's no task slot left for use. >> >> >> On Tue, Nov 24, 2015 at 4:24 PM, Welly Tambunan >> wrote: >> >>> Hi All, >>> >>> Currently we are running flink using standalone mode. >>> >>> Is there any way to add one node ( task manager ) to the cluster without >>> bringing the cluster down ? >>> >>> >>> Cheers >>> >>> -- >>> Welly Tambunan >>> Triplelands >>> >>> http://weltam.wordpress.com >>> http://www.triplelands.com <http://www.triplelands.com/blog/> >>> >> >> >> >> -- >> Welly Tambunan >> Triplelands >> >> http://weltam.wordpress.com >> http://www.triplelands.com <http://www.triplelands.com/blog/> >> > > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
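Till's suggestion can be wrapped in a small script to run on the node being added. This is only a sketch: the `FLINK_HOME` default path and the function name `add_taskmanager` are assumptions for illustration, and the `streaming|batch` argument is taken from Till's reply, so it applies to the script version discussed in this thread. The new node should carry the same Flink configuration as the rest of the cluster so that the TaskManager can find the JobManager.

```shell
#!/bin/sh
# Assumption: Flink is unpacked under FLINK_HOME on the node being added.
FLINK_HOME="${FLINK_HOME:-/opt/flink}"

# Hypothetical wrapper around the call from Till's reply:
#   taskmanager.sh start [streaming|batch]
add_taskmanager() {
  mode="${1:-streaming}"
  case "$mode" in
    streaming|batch) ;;
    *) echo "usage: add_taskmanager [streaming|batch]" >&2; return 2 ;;
  esac
  # Start one more TaskManager; the running cluster is not restarted.
  "$FLINK_HOME/bin/taskmanager.sh" start "$mode"
}
```

After sourcing the file on the new machine, `add_taskmanager streaming` starts a single additional TaskManager for a streaming cluster.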
Re: Adding TaskManager on Cluster
What I'm looking for here is the ability to add a node to the cluster (scale out) when there are no task slots left. On Tue, Nov 24, 2015 at 4:24 PM, Welly Tambunan wrote: > Hi All, > > Currently we are running flink using standalone mode. > > Is there any way to add one node ( task manager ) to the cluster without > bringing the cluster down ? > > > Cheers > > -- > Welly Tambunan > Triplelands > > http://weltam.wordpress.com > http://www.triplelands.com <http://www.triplelands.com/blog/> > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Adding TaskManager on Cluster
Hi All, Currently we are running Flink in standalone mode. Is there any way to add a node (task manager) to the cluster without bringing the cluster down? Cheers -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Cancel Streaming Job
Hi Gyula and Ufuk, Thanks, I will give it a try. Cheers On Tue, Nov 24, 2015 at 3:42 PM, Ufuk Celebi wrote: > You can use the current release candidate if you like to try it out: > > Binaries are here: > > http://people.apache.org/~rmetzger/flink-0.10.1-rc1/ > > The dependency with version 0.10.1 is found in the staging repositories: > > https://repository.apache.org/content/repositories/orgapacheflink-1058 > > If you can wait a few more days, the official release vote will be over in > <= 72 hours if no issues are found. > > – Ufuk > > > On 24 Nov 2015, at 08:26, Gyula Fóra wrote: > > > > Hi! > > > > This issue has been fixed very recently and the fix will go into the > upcoming bugfix release. (0.10.1) > > > > Should be out in the next few days :) > > > > Cheers > > Gyula > > On Tue, Nov 24, 2015 at 4:49 AM Welly Tambunan > wrote: > > Hi All, > > > > Finally i've found the solution for killing the job manager. > > > > > https://flink.apache.org/faq.html#i-cant-stop-flink-with-the-provided-stop-scripts-what-can-i-do > > > > > > But i do really hope that we have that cancel button for restarting job. > > > > > > Cheers > > > > On Tue, Nov 24, 2015 at 8:30 AM, Welly Tambunan > wrote: > > Hi All, > > > > Is there any way to stop/cancel the job that's restarting ? > > > > I have already stop the cluster and start it again but seems it's still > restarting in dashboard. > > I also try to cancel the job via CLI by running bin/flink cancel > but it's not working. > > > > > > > > Cheers > > > > > > -- > > Welly Tambunan > > Triplelands > > > > http://weltam.wordpress.com > > http://www.triplelands.com > > > > > > > > -- > > Welly Tambunan > > Triplelands > > > > http://weltam.wordpress.com > > http://www.triplelands.com > > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Cancel Streaming Job
Hi All, I've finally found the solution for killing the job manager: https://flink.apache.org/faq.html#i-cant-stop-flink-with-the-provided-stop-scripts-what-can-i-do But I really do hope that we get a cancel button for jobs that are restarting. Cheers On Tue, Nov 24, 2015 at 8:30 AM, Welly Tambunan wrote: > Hi All, > > Is there any way to stop/cancel the job that's restarting ? > > I have already stop the cluster and start it again but seems it's still > restarting in dashboard. > I also try to cancel the job via CLI by running bin/flink cancel > but it's not working. > > > > Cheers > > > -- > Welly Tambunan > Triplelands > > http://weltam.wordpress.com > http://www.triplelands.com <http://www.triplelands.com/blog/> > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Cancel Streaming Job
Hi All, Is there any way to stop/cancel a job that's restarting? I have already stopped the cluster and started it again, but the job still seems to be restarting in the dashboard. I also tried to cancel the job via the CLI by running bin/flink cancel, but it's not working. Cheers -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
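For reference, cancelling from the CLI takes the ID of the job, which `bin/flink list` prints. The sketch below wraps both calls; the function name `cancel_job` and the `FLINK_HOME` default are assumptions for illustration. Going by the replies in this thread, cancelling a job stuck in the restarting state did not work on 0.10.0 and was fixed for 0.10.1, so this is not expected to help on the affected version.

```shell
#!/bin/sh
# Assumption: run from the Flink installation directory, or set FLINK_HOME.
FLINK_HOME="${FLINK_HOME:-.}"

# Hypothetical helper: show the running jobs, then cancel one by its ID.
cancel_job() {
  job_id="$1"
  if [ -z "$job_id" ]; then
    echo "usage: cancel_job <jobID>" >&2
    return 2
  fi
  "$FLINK_HOME/bin/flink" list -r          # running jobs and their IDs
  "$FLINK_HOME/bin/flink" cancel "$job_id" # request cancellation of that job
}
```

The job ID passed to `cancel_job` is the hexadecimal identifier shown next to the job name in the `list` output or in the dashboard.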
Re: Specially introduced Flink to chinese users in CNCC(China National Computer Congress)
Agreed, and stateful streaming operator instances in Flink look more natural compared to Apache Spark. On Thu, Nov 19, 2015 at 10:54 AM, Liang Chen wrote: > Two aspects are attracting them: > 1.Flink is using java, it is easy for most of them to start Flink, and be > more easy to maintain in comparison to Storm(as Clojure is difficult to > maintain, and less people know it.) > 2.Users really want an unified system supporting streaming and batch > processing. > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Apache Flink 0.10.0 released
Great job, guys! So this is the first production-ready release of the Streaming API! Cool! Cheers On Mon, Nov 16, 2015 at 9:02 PM, Leonard Wolters wrote: > congrats! > > L. > > > On 16-11-15 14:53, Fabian Hueske wrote: > > Hi everybody, > > The Flink community is excited to announce that Apache Flink 0.10.0 has > been released. > Please find the release announcement here: > > --> http://flink.apache.org/news/2015/11/16/release-0.10.0.html > > Best, > Fabian > > > -- > Leonard Wolters > Chief Product Manager > *M*: +31 (0)6 55 53 04 01 | *T*: +31 (0)88 10 44 555 > *E*: leon...@sagent.io | *W*: sagent.io | Disclaimer > <http://sagent.io/email-disclaimer> | Sagent BV > Herengracht 504 | 1017CB Amsterdam | Netherlands > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Apache Flink Operator State as Query Cache
Hi Stephan, So that will be in Flink 1.0 right ? Cheers On Mon, Nov 16, 2015 at 9:06 PM, Stephan Ewen wrote: > Hi Anwar! > > 0.10.0 was feature frozen at that time already and under testing. > Key/value state on connected streams will have to go into the next > release... > > Stephan > > > On Mon, Nov 16, 2015 at 3:00 PM, Anwar Rizal wrote: > >> Stephan, >> >> Having a look at the brand new 0.10 release, I noticed that OperatorState >> is not implemented for ConnectedStream, which is quite the opposite of what >> you said below. >> >> Or maybe I misunderstood your sentence here ? >> >> Thanks, >> Anwar. >> >> >> On Wed, Nov 11, 2015 at 10:49 AM, Stephan Ewen wrote: >> >>> Hi! >>> >>> In general, if you can keep state in Flink, you get better >>> throughput/latency/consistency and have one less system to worry about >>> (external k/v store). State outside means that the Flink processes can be >>> slimmer and need fewer resources and as such recover a bit faster. There >>> are use cases for that as well. >>> >>> Storing the model in OperatorState is a good idea, if you can. On the >>> roadmap is to migrate the operator state to managed memory as well, so that >>> should take care of the GC issues. >>> >>> We are just adding functionality to make the Key/Value operator state >>> usable in CoMap/CoFlatMap as well (currently it only works in windows and >>> in Map/FlatMap/Filter functions over the KeyedStream). >>> Until the, you should be able to use a simple Java HashMap and use the >>> "Checkpointed" interface to get it persistent. >>> >>> Greetings, >>> Stephan >>> >>> >>> On Sun, Nov 8, 2015 at 10:11 AM, Welly Tambunan >>> wrote: >>> >>>> Thanks for the answer. >>>> >>>> Currently the approach that i'm using right now is creating a >>>> base/marker interface to stream different type of message to the same >>>> operator. Not sure about the performance hit about this compare to the >>>> CoFlatMap function. 
>>>> >>>> Basically this one is providing query cache, so i'm thinking instead of >>>> using in memory cache like redis, ignite etc, i can just use operator state >>>> for this one. >>>> >>>> I just want to gauge do i need to use memory cache or operator state >>>> would be just fine. >>>> >>>> However i'm concern about the Gen 2 Garbage Collection for caching our >>>> own state without using operator state. Is there any clarification on that >>>> one ? >>>> >>>> >>>> >>>> On Sat, Nov 7, 2015 at 12:38 AM, Anwar Rizal >>>> wrote: >>>> >>>>> >>>>> Let me understand your case better here. You have a stream of model >>>>> and stream of data. To process the data, you will need a way to access >>>>> your >>>>> model from the subsequent stream operations (map, filter, flatmap, ..). >>>>> I'm not sure in which case Operator State is a good choice, but I >>>>> think you can also live without. >>>>> >>>>> val modelStream = // get the model stream >>>>> val dataStream = >>>>> >>>>> modelStream.broadcast.connect(dataStream). coFlatMap( ) Then you can >>>>> keep the latest model in a CoFlatMapRichFunction, not necessarily as >>>>> Operator State, although maybe OperatorState is a good choice too. >>>>> >>>>> Does it make sense to you ? >>>>> >>>>> Anwar >>>>> >>>>> On Fri, Nov 6, 2015 at 10:21 AM, Welly Tambunan >>>>> wrote: >>>>> >>>>>> Hi All, >>>>>> >>>>>> We have a high density data that required a downsample. However this >>>>>> downsample model is very flexible based on the client device and user >>>>>> interaction. So it will be wasteful to precompute and store to db. >>>>>> >>>>>> So we want to use Apache Flink to do downsampling and cache the >>>>>> result for subsequent query. >>>>>> >>>>>> We are considering using Flink Operator state for that one. >>>>>> >>>>>> Is that the right approach to use that for memory cache ? Or if that >>>>>> preferable using memory cache like redis etc. >>>>>> >>>>>> Any comments will be appreciated. 
>>>>>> >>>>>> >>>>>> Cheers >>>>>> -- >>>>>> Welly Tambunan >>>>>> Triplelands >>>>>> >>>>>> http://weltam.wordpress.com >>>>>> http://www.triplelands.com <http://www.triplelands.com/blog/> >>>>>> >>>>> >>>>> >>>> >>>> >>>> -- >>>> Welly Tambunan >>>> Triplelands >>>> >>>> http://weltam.wordpress.com >>>> http://www.triplelands.com <http://www.triplelands.com/blog/> >>>> >>> >>> >> > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
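A minimal sketch of the pattern discussed in this thread, assuming plain Java with no Flink dependency: a two-input operator keeps the latest downsampling model per key in a simple HashMap, answers data records from it, and exposes snapshot/restore hooks so the map can be persisted. All names here (`ModelCacheSketch`, `onModel`, `onData`, and the "keep every n-th sample" model) are hypothetical; `snapshotState`/`restoreState` only loosely mirror the idea behind the "Checkpointed" interface Stephan mentions and are not its actual signatures.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

// Plain-Java sketch (no Flink dependency) of a two-input operator that caches
// the latest downsampling model per key. All names are hypothetical.
final class ModelCacheSketch {
    // In-memory state: model key -> downsampling factor (keep every n-th sample).
    private HashMap<String, Integer> models = new HashMap<>();

    // Input 1 (model stream): a new model replaces the cached entry for its key.
    void onModel(String key, int factor) {
        if (factor <= 0) {
            throw new IllegalArgumentException("factor must be positive");
        }
        models.put(key, factor);
    }

    // Input 2 (data stream): downsample using the cached model for this key.
    // Keys without a model fall back to factor 1, i.e. no downsampling.
    List<Double> onData(String key, List<Double> samples) {
        int factor = models.getOrDefault(key, 1);
        List<Double> out = new ArrayList<>();
        for (int i = 0; i < samples.size(); i += factor) {
            out.add(samples.get(i));
        }
        return out;
    }

    // Checkpoint hook: return a copy so later updates do not alter the snapshot.
    HashMap<String, Integer> snapshotState() {
        return new HashMap<>(models);
    }

    // Recovery hook: replace the in-memory state with the last snapshot.
    void restoreState(HashMap<String, Integer> snapshot) {
        models = new HashMap<>(snapshot);
    }
}
```

In a real job the two `on...` methods would correspond to the two inputs of the connected stream, and the framework would call the snapshot and restore hooks; here they are called by hand.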
Re: Apache Flink Operator State as Query Cache
Hi Kostas, Yes. Exactly. Thanks a lot for this one. That's really what we need ! Cheers On Sun, Nov 15, 2015 at 8:53 PM, Kostas Tzoumas wrote: > Hi Wally, > > This version adds support for specifying and switching between time > semantics - processing time, ingestion time, or event time. > > When working with event time, you can specify watermarks to track the > progress of event time. So, even if events arrive out of order, windows > will be specified on the event time (not arrival time), and the computation > will be triggered on watermark arrival. > > You can see the API reference and an example here: > https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#working-with-time > > Is this what you are looking for? > > Kostas > > > On Sat, Nov 14, 2015 at 1:54 AM, Welly Tambunan wrote: > >> Hi Robert, >> >> Is this version has already handle the stream perfection or out of order >> event ? >> >> Any resource on how this work and the API reference ? >> >> >> Cheers >> >> On Fri, Nov 13, 2015 at 4:00 PM, Welly Tambunan >> wrote: >> >>> Awesome ! >>> >>> This is really the best weekend gift ever. :) >>> >>> Cheers >>> >>> On Fri, Nov 13, 2015 at 3:54 PM, Robert Metzger >>> wrote: >>> >>>> Hi Welly, >>>> Flink 0.10.0 is out, its just not announced yet. >>>> Its available on maven central and the global mirrors are currently >>>> syncing it. This mirror for example has the update already: >>>> http://apache.mirror.digionline.de/flink/flink-0.10.0/ >>>> >>>> On Fri, Nov 13, 2015 at 9:50 AM, Welly Tambunan >>>> wrote: >>>> >>>>> Hi Aljoscha, >>>>> >>>>> Thanks for this one. Looking forward for 0.10 release version. >>>>> >>>>> Cheers >>>>> >>>>> On Thu, Nov 12, 2015 at 5:34 PM, Aljoscha Krettek >>>> > wrote: >>>>> >>>>>> Hi, >>>>>> I don’t know yet when the operator state will be transitioned to >>>>>> managed memory but it could happen for 1.0 (which will come after 0.10). 
>>>>>> The good thing is that the interfaces won’t change, so state can be used >>>>>> as >>>>>> it is now. >>>>>> >>>>>> For 0.10, the release vote is winding down right now, so you can >>>>>> expect the release to happen today or tomorrow. I think the streaming is >>>>>> production ready now, we expect to mostly to hardening and some >>>>>> infrastructure changes (for example annotations that specify API >>>>>> stability) >>>>>> for the 1.0 release. >>>>>> >>>>>> Let us know if you need more information. >>>>>> >>>>>> Cheers, >>>>>> Aljoscha >>>>>> > On 12 Nov 2015, at 02:42, Welly Tambunan wrote: >>>>>> > >>>>>> > Hi Stephan, >>>>>> > >>>>>> > >Storing the model in OperatorState is a good idea, if you can. On >>>>>> the roadmap is to migrate the operator state to managed memory as well, >>>>>> so >>>>>> that should take care of the GC issues. >>>>>> > Is this using off the heap memory ? Which version we expect this >>>>>> one to be available ? >>>>>> > >>>>>> > Another question is when will the release version of 0.10 will be >>>>>> out ? We would love to upgrade to that one when it's available. That >>>>>> version will be a production ready streaming right ? >>>>>> > >>>>>> > >>>>>> > >>>>>> > >>>>>> > >>>>>> > On Wed, Nov 11, 2015 at 4:49 PM, Stephan Ewen >>>>>> wrote: >>>>>> > Hi! >>>>>> > >>>>>> > In general, if you can keep state in Flink, you get better >>>>>> throughput/latency/consistency and have one less system to worry about >>>>>> (external k/v store). State outside means that the Flink processes can be >>>>>> slimmer and need fewer resources and as such recover a bit faster. There >>>>>> are use cases for that as well. >>>>>> > &
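To illustrate what Kostas describes (windows defined on event time, with the computation triggered on watermark arrival), here is a minimal plain-Java sketch with no Flink dependency. It counts events per tumbling window and fires a window only once a watermark has passed its end. The class and method names are hypothetical, and the sketch assumes a watermark `t` means "no more events with timestamp <= t are expected"; events arriving later than that are not handled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of tumbling event-time windows fired by watermarks.
// Names are hypothetical; this is not Flink's API.
final class EventTimeWindowSketch {
    private final long windowSize;
    // Window start -> number of events assigned to that window so far.
    private final TreeMap<Long, Integer> open = new TreeMap<>();

    EventTimeWindowSketch(long windowSizeMillis) {
        this.windowSize = windowSizeMillis;
    }

    // Assign the event to a window by its own timestamp, not by arrival order.
    void onEvent(long eventTime) {
        long start = eventTime - Math.floorMod(eventTime, windowSize);
        open.merge(start, 1, Integer::sum);
    }

    // Fire every window whose last timestamp (start + size - 1) is covered
    // by the watermark, in ascending window order.
    List<String> onWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        while (!open.isEmpty() && open.firstKey() + windowSize - 1 <= watermark) {
            Map.Entry<Long, Integer> e = open.pollFirstEntry();
            long end = e.getKey() + windowSize;
            fired.add("[" + e.getKey() + "," + end + ") count=" + e.getValue());
        }
        return fired;
    }
}
```

Feeding events with timestamps 12, 3, 15, 7 (out of order) into 10 ms windows and then advancing the watermark shows the effect: nothing fires until the watermark reaches 9, at which point the window `[0,10)` is emitted with both of its events, however late they arrived.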
Re: Apache Flink Operator State as Query Cache
Hi Robert, Is this version has already handle the stream perfection or out of order event ? Any resource on how this work and the API reference ? Cheers On Fri, Nov 13, 2015 at 4:00 PM, Welly Tambunan wrote: > Awesome ! > > This is really the best weekend gift ever. :) > > Cheers > > On Fri, Nov 13, 2015 at 3:54 PM, Robert Metzger > wrote: > >> Hi Welly, >> Flink 0.10.0 is out, its just not announced yet. >> Its available on maven central and the global mirrors are currently >> syncing it. This mirror for example has the update already: >> http://apache.mirror.digionline.de/flink/flink-0.10.0/ >> >> On Fri, Nov 13, 2015 at 9:50 AM, Welly Tambunan >> wrote: >> >>> Hi Aljoscha, >>> >>> Thanks for this one. Looking forward for 0.10 release version. >>> >>> Cheers >>> >>> On Thu, Nov 12, 2015 at 5:34 PM, Aljoscha Krettek >>> wrote: >>> >>>> Hi, >>>> I don’t know yet when the operator state will be transitioned to >>>> managed memory but it could happen for 1.0 (which will come after 0.10). >>>> The good thing is that the interfaces won’t change, so state can be used as >>>> it is now. >>>> >>>> For 0.10, the release vote is winding down right now, so you can expect >>>> the release to happen today or tomorrow. I think the streaming is >>>> production ready now, we expect to mostly to hardening and some >>>> infrastructure changes (for example annotations that specify API stability) >>>> for the 1.0 release. >>>> >>>> Let us know if you need more information. >>>> >>>> Cheers, >>>> Aljoscha >>>> > On 12 Nov 2015, at 02:42, Welly Tambunan wrote: >>>> > >>>> > Hi Stephan, >>>> > >>>> > >Storing the model in OperatorState is a good idea, if you can. On >>>> the roadmap is to migrate the operator state to managed memory as well, so >>>> that should take care of the GC issues. >>>> > Is this using off the heap memory ? Which version we expect this one >>>> to be available ? >>>> > >>>> > Another question is when will the release version of 0.10 will be out >>>> ? 
We would love to upgrade to that one when it's available. That version >>>> will be a production ready streaming right ? >>>> > >>>> > >>>> > >>>> > >>>> > >>>> > On Wed, Nov 11, 2015 at 4:49 PM, Stephan Ewen >>>> wrote: >>>> > Hi! >>>> > >>>> > In general, if you can keep state in Flink, you get better >>>> throughput/latency/consistency and have one less system to worry about >>>> (external k/v store). State outside means that the Flink processes can be >>>> slimmer and need fewer resources and as such recover a bit faster. There >>>> are use cases for that as well. >>>> > >>>> > Storing the model in OperatorState is a good idea, if you can. On the >>>> roadmap is to migrate the operator state to managed memory as well, so that >>>> should take care of the GC issues. >>>> > >>>> > We are just adding functionality to make the Key/Value operator state >>>> usable in CoMap/CoFlatMap as well (currently it only works in windows and >>>> in Map/FlatMap/Filter functions over the KeyedStream). >>>> > Until the, you should be able to use a simple Java HashMap and use >>>> the "Checkpointed" interface to get it persistent. >>>> > >>>> > Greetings, >>>> > Stephan >>>> > >>>> > >>>> > On Sun, Nov 8, 2015 at 10:11 AM, Welly Tambunan >>>> wrote: >>>> > Thanks for the answer. >>>> > >>>> > Currently the approach that i'm using right now is creating a >>>> base/marker interface to stream different type of message to the same >>>> operator. Not sure about the performance hit about this compare to the >>>> CoFlatMap function. >>>> > >>>> > Basically this one is providing query cache, so i'm thinking instead >>>> of using in memory cache like redis, ignite etc, i can just use operator >>>> state for this one. >>>> > >>>> > I just want to gauge do i need to use memory cache or operator state >>>> would be just
Re: Multilang Support on Flink
Hi Max, Do you know where the repo is? I tried searching for it in flink-staging (via Google), but it seems it's not there anymore. Cheers On Fri, Nov 13, 2015 at 5:07 PM, Maximilian Michels wrote: > Hi Welly, > > There is a protocol for communicating with other processes. This is > reflected in the flink-language-binding-generic module. I'm not aware how > Spark or Storm communication protocols work but this protocol is > rather low level. > > Cheers, > Max > > On Fri, Nov 13, 2015 at 9:49 AM, Welly Tambunan wrote: > > Hi All, > > > > I want to ask if there's multilang support (like in Storm and pipeTo in > > Spark) in Flink? > > > > I tried to find it in the docs but can't find it. > > > > Any link or direction would be really appreciated. > > > > > > Cheers > > > > -- > > Welly Tambunan > > Triplelands > > > > http://weltam.wordpress.com > > http://www.triplelands.com > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Flink, Kappa and Lambda
Hi Christian, Valid point. Thanks a lot for your great explanation. In our case, we want to avoid having lots of moving parts. We are working on a greenfield project, so we want to adopt the latest approach while we still have the flexibility to do so. We also assessed Apache Spark before, and it doesn't suit our real-time purposes. But we have had a really great experience with Apache Flink so far. That's why we are trying to do everything as streaming, as that has HUGE business value for us. And fortunately the technology already seems to be there in the streaming world. Cheers On Fri, Nov 13, 2015 at 7:21 PM, Christian Kreutzfeldt wrote: > Hi > > Personally, I find the concepts of the so-called Kappa > architecture intriguing. But I doubt that it is applicable in a generic > setup where different use cases are mapped into the architecture. To be > fair, I think the same applies to Lambda architectures. Therefore I > wouldn't assume that Lambda architectures are obsolete with the advent of > Kappa as new architectural paradigm. > > From my point of view, it all depends on the use case that you want to > solve. For example, I saw a presentation given by Eric Tschetter and > Fangjin Yang of MetaMarkets on how they use Hadoop and Druid to drive their > business. They used Hadoop as long-term storage and Druid on the serving > layer to provide up-to-date data into the business by updating it in > sub-second intervals. Regularly they align both systems to be consistent. > In their case, the Lambda architecture serves their business quite well: > speed achieved through the streaming layer and long-term persistence > through the batch layer.
> > In cases where you - for example - want to create views on customer > sessions by aggregating all events belonging to a single person and use > them to > > * serve recommendation systems while the customer is still on your website > and > * keep them persistent in a long-term archive > > people tend to build typical Lambda architectures with duplicated > sessionizing code on both layers. From my point of view this is unnecessary > and introduces an additional source of errors. As customer sessions are > created as stream of events, simply implement the logic on your streaming > layer and persist the final session after a timeout in those systems where > you need the data to be present: eg. recommender system receives constant > updates on each new event and the batch layer (Hadoop) receives the > finished session after it timed out. > > As Lambda - in most cases - is implemented to do the same thing on both > layers, later merging the results to keep states consistent, the Kappa > architecture introduces an interesting pattern that people often are not > aware of. The idea to persist the stream itself and get rid of other > systems, like RDBMS, NoSQL DBs or any other type of archive software, is > often accepted as cheap way to reduce costs and maintenance efforts. > > But I think Kappa does more and may be expanded to other systems than > streaming as well. You keep the data at that system persistent where it > arrived or received a state you expect in subsequent systems. Why should I > convert a stream of tracking events into a static schema and store the data > inside an RDBMS? What if I rely on its nature that data is coming in as > stream and do not want to have it exported/imported as bulk update but have > the same stream replayed later? What about information loss? Being a stream > of events is part of the information as well like the attributes each event > carries. 
> > So, if Kappa is understood as architectural pattern where data is kept and > processed the way it arrived or is expected by subsequent systems, I do not > think that it will ever replace Lambda but it will complement it. > > Therefore I would like to give you the advice to look at your use case(s) > and design the architecture as you need it. Do not stick with a certain > pattern but deploy those parts that fit with your use-case. This context is > far too young that it provides you with additional value strictly following > a certain pattern, eg to make it more easier to integrate with third-party > software. > > Best > Christian > > > 2015-11-13 9:51 GMT+01:00 Welly Tambunan : > >> Hi rss rss, >> >> Yes. I have already read that book. >> >> However given the state of streaming right now, and Kappa Architecture, I >> don't think we need Lambda Architecture again ? >> >> Any thoughts ? >> >> On Thu, Nov 12, 2015 at 12:29 PM, rss rss wrote: >> >>> Hello, >>> >>> regarding the Lambda architecture there is a following book - >>> https://www.manning.com/books/big-data (Big Dat
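Christian's sessionization example (implement the session logic once on the streaming layer, update the recommender on every event, and hand the finished session to the archive after a timeout) could be sketched roughly as below. This is plain Java with no Flink dependency; `SessionizerSketch`, `Session`, `onEvent` and `expire` are hypothetical names, and the timeout is measured against the timestamp passed to `expire`, which is an assumption of this sketch rather than anything stated in the thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of single-layer sessionization with a timeout.
// All names are hypothetical.
final class SessionizerSketch {
    static final class Session {
        final String user;
        final List<String> events = new ArrayList<>();
        long lastSeen;

        Session(String user) {
            this.user = user;
        }
    }

    private final long timeout;
    // Sessions that are still receiving events, keyed by user.
    private final Map<String, Session> active = new HashMap<>();

    SessionizerSketch(long timeoutMillis) {
        this.timeout = timeoutMillis;
    }

    // Called per event; returns the live session so that a serving system
    // (e.g. a recommender) can be updated while the user is still active.
    Session onEvent(String user, String event, long timestamp) {
        Session s = active.computeIfAbsent(user, Session::new);
        s.events.add(event);
        s.lastSeen = timestamp;
        return s;
    }

    // Removes and returns sessions idle for at least the timeout; these are
    // the finished sessions that would be written to the long-term archive.
    List<Session> expire(long now) {
        List<Session> finished = new ArrayList<>();
        Iterator<Session> it = active.values().iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (now - s.lastSeen >= timeout) {
                finished.add(s);
                it.remove();
            }
        }
        return finished;
    }
}
```

The point of the sketch is that both consumers are fed from one piece of logic, so there is no duplicated sessionizing code to keep consistent between a batch and a streaming layer.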
Re: Apache Flink Forward Videos
Thanks Max, I see that all the videos are already there. The keynote has also been uploaded. Great stuff!! Cheers On Fri, Nov 13, 2015 at 5:12 PM, Maximilian Michels wrote: > Hi Welly, > > Thanks for sharing! The videos are coming. They soon will all be available. > > Cheers, > Max > > On Fri, Nov 13, 2015 at 11:08 AM, Welly Tambunan > wrote: > > Hi All, > > > > I've just noticed that the videos are already available for this one. > > > > http://flink-forward.org/?post_type=session > > > > > > Another weekend gift for all. > > > > Cheers > > -- > > Welly Tambunan > > Triplelands > > > > http://weltam.wordpress.com > > http://www.triplelands.com > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Apache Flink Forward Videos
Hi All, I've just noticed that the videos are already available for this one. http://flink-forward.org/?post_type=session Another weekend gift for all. Cheers -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Apache Flink Operator State as Query Cache
Awesome ! This is really the best weekend gift ever. :) Cheers On Fri, Nov 13, 2015 at 3:54 PM, Robert Metzger wrote: > Hi Welly, > Flink 0.10.0 is out, its just not announced yet. > Its available on maven central and the global mirrors are currently > syncing it. This mirror for example has the update already: > http://apache.mirror.digionline.de/flink/flink-0.10.0/ > > On Fri, Nov 13, 2015 at 9:50 AM, Welly Tambunan wrote: > >> Hi Aljoscha, >> >> Thanks for this one. Looking forward for 0.10 release version. >> >> Cheers >> >> On Thu, Nov 12, 2015 at 5:34 PM, Aljoscha Krettek >> wrote: >> >>> Hi, >>> I don’t know yet when the operator state will be transitioned to managed >>> memory but it could happen for 1.0 (which will come after 0.10). The good >>> thing is that the interfaces won’t change, so state can be used as it is >>> now. >>> >>> For 0.10, the release vote is winding down right now, so you can expect >>> the release to happen today or tomorrow. I think the streaming is >>> production ready now, we expect to mostly to hardening and some >>> infrastructure changes (for example annotations that specify API stability) >>> for the 1.0 release. >>> >>> Let us know if you need more information. >>> >>> Cheers, >>> Aljoscha >>> > On 12 Nov 2015, at 02:42, Welly Tambunan wrote: >>> > >>> > Hi Stephan, >>> > >>> > >Storing the model in OperatorState is a good idea, if you can. On the >>> roadmap is to migrate the operator state to managed memory as well, so that >>> should take care of the GC issues. >>> > Is this using off the heap memory ? Which version we expect this one >>> to be available ? >>> > >>> > Another question is when will the release version of 0.10 will be out >>> ? We would love to upgrade to that one when it's available. That version >>> will be a production ready streaming right ? >>> > >>> > >>> > >>> > >>> > >>> > On Wed, Nov 11, 2015 at 4:49 PM, Stephan Ewen >>> wrote: >>> > Hi! 
>>> > >>> > In general, if you can keep state in Flink, you get better >>> throughput/latency/consistency and have one less system to worry about >>> (external k/v store). State outside means that the Flink processes can be >>> slimmer and need fewer resources and as such recover a bit faster. There >>> are use cases for that as well. >>> > >>> > Storing the model in OperatorState is a good idea, if you can. On the >>> roadmap is to migrate the operator state to managed memory as well, so that >>> should take care of the GC issues. >>> > >>> > We are just adding functionality to make the Key/Value operator state >>> usable in CoMap/CoFlatMap as well (currently it only works in windows and >>> in Map/FlatMap/Filter functions over the KeyedStream). >>> > Until the, you should be able to use a simple Java HashMap and use the >>> "Checkpointed" interface to get it persistent. >>> > >>> > Greetings, >>> > Stephan >>> > >>> > >>> > On Sun, Nov 8, 2015 at 10:11 AM, Welly Tambunan >>> wrote: >>> > Thanks for the answer. >>> > >>> > Currently the approach that i'm using right now is creating a >>> base/marker interface to stream different type of message to the same >>> operator. Not sure about the performance hit about this compare to the >>> CoFlatMap function. >>> > >>> > Basically this one is providing query cache, so i'm thinking instead >>> of using in memory cache like redis, ignite etc, i can just use operator >>> state for this one. >>> > >>> > I just want to gauge do i need to use memory cache or operator state >>> would be just fine. >>> > >>> > However i'm concern about the Gen 2 Garbage Collection for caching our >>> own state without using operator state. Is there any clarification on that >>> one ? >>> > >>> > >>> > >>> > On Sat, Nov 7, 2015 at 12:38 AM, Anwar Rizal >>> wrote: >>> > >>> > Let me understand your case better here. You have a stream of model >>> and stream of data. 
To process the data, you will need a way to access your >>> model from the subsequent stream operations (map, filter,
Re: Flink, Kappa and Lambda
Hi Nick, I totally agree with your point. My concern is Kafka: is the author's concern really valid? Can anyone comment on this one? On Thu, Nov 12, 2015 at 12:33 PM, Nick Dimiduk wrote: > The first and 3rd points here aren't very fair -- they apply to all data > systems. Systems downstream of your database can lose data in the same way; > the database retention policy expires old data, downstream fails, and back > to the tapes you must go. Likewise with 3, a bug in any ETL system can > cause problems. Also not specific to streaming in general or Kafka/Flink > specifically. > > I'm much more curious about the 2nd claim. The whole point of high > availability in these systems is to not lose data during failure. The > post's author is not specific on any of these points, but just like I look > to a distributed database community to prove to me it doesn't lose data in > these corner cases, so too do I expect Kafka to prove it is resilient. In > the absence of software formally proven correct, I look to empirical > evidence in the form of chaos monkey type tests. > > > On Wednesday, November 11, 2015, Welly Tambunan wrote: > >> Hi Stephan, >> >> >> Thanks for your response. >> >> >> We are trying to justify whether it's enough to use Kappa Architecture >> with Flink. This is more about resiliency, message loss issues, etc. >> >> The article worries about message loss even if you are using Kafka. >> >> No matter the message queue or broker you rely on whether it be RabbitMQ, >> JMS, ActiveMQ, Websphere, MSMQ and yes even Kafka you can lose messages in >> any of the following ways: >> >>- A downstream system from the broker can have data loss >>- All message queues today can lose already acknowledged messages >>during failover or leader election. >>- A bug can send the wrong messages to the wrong systems. >> >> Cheers >> >> On Wed, Nov 11, 2015 at 4:13 PM, Stephan Ewen wrote: >> >>> Hi! >>> >>> Can you explain a little more what you want to achieve?
Maybe then we >>> can give a few more comments... >>> >>> I briefly read through some of the articles you linked, but did not >>> quite understand their train of thoughts. >>> For example, letting Tomcat write to Cassandra directly, and to Kafka, >>> might just be redundant. Why not let the streaming job that reads the Kafka >>> queue >>> move the data to Cassandra as one of its results? Further more, durable >>> storing the sequence of events is exactly what Kafka does, but the article >>> suggests to use Cassandra for that, which I find very counter intuitive. >>> It looks a bit like the suggested approach is only adopting streaming for >>> half the task. >>> >>> Greetings, >>> Stephan >>> >>> >>> On Tue, Nov 10, 2015 at 7:49 AM, Welly Tambunan >>> wrote: >>> >>>> Hi All, >>>> >>>> I read a couple of article about Kappa and Lambda Architecture. >>>> >>>> >>>> http://www.confluent.io/blog/real-time-stream-processing-the-next-step-for-apache-flink/ >>>> >>>> I'm convince that Flink will simplify this one with streaming. >>>> >>>> However i also stumble upon this blog post that has valid argument to >>>> have a system of record storage ( event sourcing ) and finally lambda >>>> architecture is appear at the solution. Basically it will write twice to >>>> Queuing system and C* for safety. System of record here is basically >>>> storing the event (delta). >>>> >>>> [image: Inline image 1] >>>> >>>> >>>> https://lostechies.com/ryansvihla/2015/09/17/event-sourcing-and-system-of-record-sane-distributed-development-in-the-modern-era-2/ >>>> >>>> Another approach is about lambda architecture for maintaining the >>>> correctness of the system. >>>> >>>> >>>> https://lostechies.com/ryansvihla/2015/09/17/real-time-analytics-with-spark-streaming-and-cassandra/ >>>> >>>> >>>> Given that he's using Spark for the streaming processor, do we have to >>>> do the same thing with Apache Flink ? 
>>>> >>>> >>>> >>>> Cheers >>>> -- >>>> Welly Tambunan >>>> Triplelands >>>> >>>> http://weltam.wordpress.com >>>> http://www.triplelands.com <http://www.triplelands.com/blog/> >>>> >>> >>> >> >> >> -- >> Welly Tambunan >> Triplelands >> >> http://weltam.wordpress.com >> http://www.triplelands.com <http://www.triplelands.com/blog/> >> > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Flink, Kappa and Lambda
Hi rss rss, Yes. I have already read that book. However given the state of streaming right now, and Kappa Architecture, I don't think we need Lambda Architecture again ? Any thoughts ? On Thu, Nov 12, 2015 at 12:29 PM, rss rss wrote: > Hello, > > regarding the Lambda architecture there is a following book - > https://www.manning.com/books/big-data (Big Data. Principles and best > practices of scalable realtime data systems > Nathan Marz and James Warren). > > Regards, > Roman > > 2015-11-12 4:47 GMT+03:00 Welly Tambunan : > >> Hi Stephan, >> >> >> Thanks for your response. >> >> >> We are trying to justify whether it's enough to use Kappa Architecture >> with Flink. This more about resiliency and message lost issue etc. >> >> The article is worry about message lost even if you are using Kafka. >> >> No matter the message queue or broker you rely on whether it be RabbitMQ, >> JMS, ActiveMQ, Websphere, MSMQ and yes even Kafka you can lose messages in >> any of the following ways: >> >>- A downstream system from the broker can have data loss >>- All message queues today can lose already acknowledged messages >>during failover or leader election. >>- A bug can send the wrong messages to the wrong systems. >> >> Cheers >> >> On Wed, Nov 11, 2015 at 4:13 PM, Stephan Ewen wrote: >> >>> Hi! >>> >>> Can you explain a little more what you want to achieve? Maybe then we >>> can give a few more comments... >>> >>> I briefly read through some of the articles you linked, but did not >>> quite understand their train of thoughts. >>> For example, letting Tomcat write to Cassandra directly, and to Kafka, >>> might just be redundant. Why not let the streaming job that reads the Kafka >>> queue >>> move the data to Cassandra as one of its results? Further more, durable >>> storing the sequence of events is exactly what Kafka does, but the article >>> suggests to use Cassandra for that, which I find very counter intuitive. 
>>> It looks a bit like the suggested approach is only adopting streaming for >>> half the task. >>> >>> Greetings, >>> Stephan >>> >>> >>> On Tue, Nov 10, 2015 at 7:49 AM, Welly Tambunan >>> wrote: >>> >>>> Hi All, >>>> >>>> I read a couple of article about Kappa and Lambda Architecture. >>>> >>>> >>>> http://www.confluent.io/blog/real-time-stream-processing-the-next-step-for-apache-flink/ >>>> >>>> I'm convince that Flink will simplify this one with streaming. >>>> >>>> However i also stumble upon this blog post that has valid argument to >>>> have a system of record storage ( event sourcing ) and finally lambda >>>> architecture is appear at the solution. Basically it will write twice to >>>> Queuing system and C* for safety. System of record here is basically >>>> storing the event (delta). >>>> >>>> [image: Inline image 1] >>>> >>>> >>>> https://lostechies.com/ryansvihla/2015/09/17/event-sourcing-and-system-of-record-sane-distributed-development-in-the-modern-era-2/ >>>> >>>> Another approach is about lambda architecture for maintaining the >>>> correctness of the system. >>>> >>>> >>>> https://lostechies.com/ryansvihla/2015/09/17/real-time-analytics-with-spark-streaming-and-cassandra/ >>>> >>>> >>>> Given that he's using Spark for the streaming processor, do we have to >>>> do the same thing with Apache Flink ? >>>> >>>> >>>> >>>> Cheers >>>> -- >>>> Welly Tambunan >>>> Triplelands >>>> >>>> http://weltam.wordpress.com >>>> http://www.triplelands.com <http://www.triplelands.com/blog/> >>>> >>> >>> >> >> >> -- >> Welly Tambunan >> Triplelands >> >> http://weltam.wordpress.com >> http://www.triplelands.com <http://www.triplelands.com/blog/> >> > > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Apache Flink Operator State as Query Cache
Hi Aljoscha, Thanks for this one. Looking forward for 0.10 release version. Cheers On Thu, Nov 12, 2015 at 5:34 PM, Aljoscha Krettek wrote: > Hi, > I don’t know yet when the operator state will be transitioned to managed > memory but it could happen for 1.0 (which will come after 0.10). The good > thing is that the interfaces won’t change, so state can be used as it is > now. > > For 0.10, the release vote is winding down right now, so you can expect > the release to happen today or tomorrow. I think the streaming is > production ready now, we expect to mostly to hardening and some > infrastructure changes (for example annotations that specify API stability) > for the 1.0 release. > > Let us know if you need more information. > > Cheers, > Aljoscha > > On 12 Nov 2015, at 02:42, Welly Tambunan wrote: > > > > Hi Stephan, > > > > >Storing the model in OperatorState is a good idea, if you can. On the > roadmap is to migrate the operator state to managed memory as well, so that > should take care of the GC issues. > > Is this using off the heap memory ? Which version we expect this one to > be available ? > > > > Another question is when will the release version of 0.10 will be out ? > We would love to upgrade to that one when it's available. That version will > be a production ready streaming right ? > > > > > > > > > > > > On Wed, Nov 11, 2015 at 4:49 PM, Stephan Ewen wrote: > > Hi! > > > > In general, if you can keep state in Flink, you get better > throughput/latency/consistency and have one less system to worry about > (external k/v store). State outside means that the Flink processes can be > slimmer and need fewer resources and as such recover a bit faster. There > are use cases for that as well. > > > > Storing the model in OperatorState is a good idea, if you can. On the > roadmap is to migrate the operator state to managed memory as well, so that > should take care of the GC issues. 
> > > > We are just adding functionality to make the Key/Value operator state > usable in CoMap/CoFlatMap as well (currently it only works in windows and > in Map/FlatMap/Filter functions over the KeyedStream). > > Until the, you should be able to use a simple Java HashMap and use the > "Checkpointed" interface to get it persistent. > > > > Greetings, > > Stephan > > > > > > On Sun, Nov 8, 2015 at 10:11 AM, Welly Tambunan > wrote: > > Thanks for the answer. > > > > Currently the approach that i'm using right now is creating a > base/marker interface to stream different type of message to the same > operator. Not sure about the performance hit about this compare to the > CoFlatMap function. > > > > Basically this one is providing query cache, so i'm thinking instead of > using in memory cache like redis, ignite etc, i can just use operator state > for this one. > > > > I just want to gauge do i need to use memory cache or operator state > would be just fine. > > > > However i'm concern about the Gen 2 Garbage Collection for caching our > own state without using operator state. Is there any clarification on that > one ? > > > > > > > > On Sat, Nov 7, 2015 at 12:38 AM, Anwar Rizal > wrote: > > > > Let me understand your case better here. You have a stream of model and > stream of data. To process the data, you will need a way to access your > model from the subsequent stream operations (map, filter, flatmap, ..). > > I'm not sure in which case Operator State is a good choice, but I think > you can also live without. > > > > val modelStream = // get the model stream > > val dataStream = > > > > modelStream.broadcast.connect(dataStream). coFlatMap( ) Then you can > keep the latest model in a CoFlatMapRichFunction, not necessarily as > Operator State, although maybe OperatorState is a good choice too. > > > > Does it make sense to you ? 
> > > > Anwar > > > > On Fri, Nov 6, 2015 at 10:21 AM, Welly Tambunan > wrote: > > Hi All, > > > > We have a high density data that required a downsample. However this > downsample model is very flexible based on the client device and user > interaction. So it will be wasteful to precompute and store to db. > > > > So we want to use Apache Flink to do downsampling and cache the result > for subsequent query. > > > > We are considering using Flink Operator state for that one. > > > > Is that the right approach to use that for memory cache ? Or if that > preferable using memory cache like redis etc. > > > > Any comments will be appreciated. > > > > > > Cheers > > -- > > Welly Tambunan > > Triplelands > > > > http://weltam.wordpress.com > > http://www.triplelands.com > > > > > > > > > > -- > > Welly Tambunan > > Triplelands > > > > http://weltam.wordpress.com > > http://www.triplelands.com > > > > > > > > > > -- > > Welly Tambunan > > Triplelands > > > > http://weltam.wordpress.com > > http://www.triplelands.com > > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Multilang Support on Flink
Hi All, I want to ask whether Flink has multi-language support (like multilang in Storm and pipeTo in Spark). I looked for it in the docs but couldn't find anything. Any link or pointer would be much appreciated. Cheers -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Flink, Kappa and Lambda
Hi Stephan, Thanks for your response. We are trying to justify whether it's enough to use Kappa Architecture with Flink. This more about resiliency and message lost issue etc. The article is worry about message lost even if you are using Kafka. No matter the message queue or broker you rely on whether it be RabbitMQ, JMS, ActiveMQ, Websphere, MSMQ and yes even Kafka you can lose messages in any of the following ways: - A downstream system from the broker can have data loss - All message queues today can lose already acknowledged messages during failover or leader election. - A bug can send the wrong messages to the wrong systems. Cheers On Wed, Nov 11, 2015 at 4:13 PM, Stephan Ewen wrote: > Hi! > > Can you explain a little more what you want to achieve? Maybe then we can > give a few more comments... > > I briefly read through some of the articles you linked, but did not quite > understand their train of thoughts. > For example, letting Tomcat write to Cassandra directly, and to Kafka, > might just be redundant. Why not let the streaming job that reads the Kafka > queue > move the data to Cassandra as one of its results? Further more, durable > storing the sequence of events is exactly what Kafka does, but the article > suggests to use Cassandra for that, which I find very counter intuitive. > It looks a bit like the suggested approach is only adopting streaming for > half the task. > > Greetings, > Stephan > > > On Tue, Nov 10, 2015 at 7:49 AM, Welly Tambunan wrote: > >> Hi All, >> >> I read a couple of article about Kappa and Lambda Architecture. >> >> >> http://www.confluent.io/blog/real-time-stream-processing-the-next-step-for-apache-flink/ >> >> I'm convince that Flink will simplify this one with streaming. >> >> However i also stumble upon this blog post that has valid argument to >> have a system of record storage ( event sourcing ) and finally lambda >> architecture is appear at the solution. 
Basically it will write twice to >> Queuing system and C* for safety. System of record here is basically >> storing the event (delta). >> >> [image: Inline image 1] >> >> >> https://lostechies.com/ryansvihla/2015/09/17/event-sourcing-and-system-of-record-sane-distributed-development-in-the-modern-era-2/ >> >> Another approach is about lambda architecture for maintaining the >> correctness of the system. >> >> >> https://lostechies.com/ryansvihla/2015/09/17/real-time-analytics-with-spark-streaming-and-cassandra/ >> >> >> Given that he's using Spark for the streaming processor, do we have to do >> the same thing with Apache Flink ? >> >> >> >> Cheers >> -- >> Welly Tambunan >> Triplelands >> >> http://weltam.wordpress.com >> http://www.triplelands.com <http://www.triplelands.com/blog/> >> > > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Apache Flink Operator State as Query Cache
Hi Stephan, >Storing the model in OperatorState is a good idea, if you can. On the roadmap is to migrate the operator state to managed memory as well, so that should take care of the GC issues. Is this using off the heap memory ? Which version we expect this one to be available ? Another question is when will the release version of 0.10 will be out ? We would love to upgrade to that one when it's available. That version will be a production ready streaming right ? On Wed, Nov 11, 2015 at 4:49 PM, Stephan Ewen wrote: > Hi! > > In general, if you can keep state in Flink, you get better > throughput/latency/consistency and have one less system to worry about > (external k/v store). State outside means that the Flink processes can be > slimmer and need fewer resources and as such recover a bit faster. There > are use cases for that as well. > > Storing the model in OperatorState is a good idea, if you can. On the > roadmap is to migrate the operator state to managed memory as well, so that > should take care of the GC issues. > > We are just adding functionality to make the Key/Value operator state > usable in CoMap/CoFlatMap as well (currently it only works in windows and > in Map/FlatMap/Filter functions over the KeyedStream). > Until the, you should be able to use a simple Java HashMap and use the > "Checkpointed" interface to get it persistent. > > Greetings, > Stephan > > > On Sun, Nov 8, 2015 at 10:11 AM, Welly Tambunan wrote: > >> Thanks for the answer. >> >> Currently the approach that i'm using right now is creating a base/marker >> interface to stream different type of message to the same operator. Not >> sure about the performance hit about this compare to the CoFlatMap >> function. >> >> Basically this one is providing query cache, so i'm thinking instead of >> using in memory cache like redis, ignite etc, i can just use operator state >> for this one. >> >> I just want to gauge do i need to use memory cache or operator state >> would be just fine. 
>> >> However i'm concern about the Gen 2 Garbage Collection for caching our >> own state without using operator state. Is there any clarification on that >> one ? >> >> >> >> On Sat, Nov 7, 2015 at 12:38 AM, Anwar Rizal wrote: >> >>> >>> Let me understand your case better here. You have a stream of model and >>> stream of data. To process the data, you will need a way to access your >>> model from the subsequent stream operations (map, filter, flatmap, ..). >>> I'm not sure in which case Operator State is a good choice, but I think >>> you can also live without. >>> >>> val modelStream = // get the model stream >>> val dataStream = >>> >>> modelStream.broadcast.connect(dataStream). coFlatMap( ) Then you can >>> keep the latest model in a CoFlatMapRichFunction, not necessarily as >>> Operator State, although maybe OperatorState is a good choice too. >>> >>> Does it make sense to you ? >>> >>> Anwar >>> >>> On Fri, Nov 6, 2015 at 10:21 AM, Welly Tambunan >>> wrote: >>> >>>> Hi All, >>>> >>>> We have a high density data that required a downsample. However this >>>> downsample model is very flexible based on the client device and user >>>> interaction. So it will be wasteful to precompute and store to db. >>>> >>>> So we want to use Apache Flink to do downsampling and cache the result >>>> for subsequent query. >>>> >>>> We are considering using Flink Operator state for that one. >>>> >>>> Is that the right approach to use that for memory cache ? Or if that >>>> preferable using memory cache like redis etc. >>>> >>>> Any comments will be appreciated. 
>>>> >>>> >>>> Cheers >>>> -- >>>> Welly Tambunan >>>> Triplelands >>>> >>>> http://weltam.wordpress.com >>>> http://www.triplelands.com <http://www.triplelands.com/blog/> >>>> >>> >>> >> >> >> -- >> Welly Tambunan >> Triplelands >> >> http://weltam.wordpress.com >> http://www.triplelands.com <http://www.triplelands.com/blog/> >> > > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
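Stephan's suggestion above (keep the model in a plain Java HashMap and make it persistent through a checkpoint hook) can be sketched roughly as follows. This is only an illustration under stated assumptions: the `Snapshotable` interface, `CachingOperator`, and `valueAfterRecovery` are hypothetical stand-ins written for this sketch and are not Flink's "Checkpointed" interface, whose exact signature should be taken from the Flink documentation.

```java
import java.io.Serializable;
import java.util.HashMap;

public class ModelCache {
    // Hypothetical stand-in for a checkpoint hook; NOT Flink's own interface.
    interface Snapshotable<T extends Serializable> {
        T snapshotState();
        void restoreState(T state);
    }

    // Hypothetical operator that caches model values in a plain HashMap.
    static final class CachingOperator implements Snapshotable<HashMap<String, Double>> {
        private HashMap<String, Double> cache = new HashMap<>();

        void put(String key, double value) {
            cache.put(key, value);
        }

        Double get(String key) {
            return cache.get(key);
        }

        @Override
        public HashMap<String, Double> snapshotState() {
            // Return a copy so that later updates do not alter the snapshot.
            return new HashMap<>(cache);
        }

        @Override
        public void restoreState(HashMap<String, Double> state) {
            cache = new HashMap<>(state);
        }
    }

    // Simulates: update, snapshot, update again, failure, restore from snapshot.
    static Double valueAfterRecovery() {
        CachingOperator op = new CachingOperator();
        op.put("model-a", 1.0);
        HashMap<String, Double> snapshot = op.snapshotState();
        op.put("model-a", 2.0); // made after the snapshot, so it is lost on failure

        CachingOperator recovered = new CachingOperator();
        recovered.restoreState(snapshot);
        return recovered.get("model-a");
    }

    public static void main(String[] args) {
        System.out.println(valueAfterRecovery());
    }
}
```

The point of the copy in `snapshotState` is that the snapshot must not change while the operator keeps processing; in a real job the runtime, not the user code, would decide when to snapshot and restore.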
Flink, Kappa and Lambda
Hi All, I read a couple of articles about the Kappa and Lambda architectures. http://www.confluent.io/blog/real-time-stream-processing-the-next-step-for-apache-flink/ I'm convinced that Flink will simplify this with streaming. However, I also stumbled upon this blog post, which makes a valid argument for having a system-of-record storage (event sourcing), and in the end a lambda architecture appears as the solution. Basically, it writes twice, to the queuing system and to C* (Cassandra), for safety. The system of record here basically stores the events (deltas). https://lostechies.com/ryansvihla/2015/09/17/event-sourcing-and-system-of-record-sane-distributed-development-in-the-modern-era-2/ Another approach uses the lambda architecture to maintain the correctness of the system. https://lostechies.com/ryansvihla/2015/09/17/real-time-analytics-with-spark-streaming-and-cassandra/ Given that he's using Spark as the stream processor, do we have to do the same thing with Apache Flink? Cheers -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Apache Flink Operator State as Query Cache
Thanks for the answer. Currently the approach that i'm using right now is creating a base/marker interface to stream different type of message to the same operator. Not sure about the performance hit about this compare to the CoFlatMap function. Basically this one is providing query cache, so i'm thinking instead of using in memory cache like redis, ignite etc, i can just use operator state for this one. I just want to gauge do i need to use memory cache or operator state would be just fine. However i'm concern about the Gen 2 Garbage Collection for caching our own state without using operator state. Is there any clarification on that one ? On Sat, Nov 7, 2015 at 12:38 AM, Anwar Rizal wrote: > > Let me understand your case better here. You have a stream of model and > stream of data. To process the data, you will need a way to access your > model from the subsequent stream operations (map, filter, flatmap, ..). > I'm not sure in which case Operator State is a good choice, but I think > you can also live without. > > val modelStream = // get the model stream > val dataStream = > > modelStream.broadcast.connect(dataStream). coFlatMap( ) Then you can keep > the latest model in a CoFlatMapRichFunction, not necessarily as Operator > State, although maybe OperatorState is a good choice too. > > Does it make sense to you ? > > Anwar > > On Fri, Nov 6, 2015 at 10:21 AM, Welly Tambunan wrote: > >> Hi All, >> >> We have a high density data that required a downsample. However this >> downsample model is very flexible based on the client device and user >> interaction. So it will be wasteful to precompute and store to db. >> >> So we want to use Apache Flink to do downsampling and cache the result >> for subsequent query. >> >> We are considering using Flink Operator state for that one. >> >> Is that the right approach to use that for memory cache ? Or if that >> preferable using memory cache like redis etc. >> >> Any comments will be appreciated. 
>> >> >> Cheers >> -- >> Welly Tambunan >> Triplelands >> >> http://weltam.wordpress.com >> http://www.triplelands.com <http://www.triplelands.com/blog/> >> > > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Apache Flink Operator State as Query Cache
Hi All, We have high-density data that requires downsampling. However, the downsampling model is very flexible, depending on the client device and user interaction, so it would be wasteful to precompute the results and store them in a database. Instead, we want to use Apache Flink to do the downsampling and cache the results for subsequent queries. We are considering using Flink operator state for that. Is operator state the right approach for an in-memory cache, or is it preferable to use an in-memory cache such as Redis? Any comments will be appreciated. Cheers -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
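To make the question a bit more concrete, here is a minimal sketch in plain Java of "downsample on demand and cache the result for subsequent queries". It is not Flink operator state; the class `DownsampleCache`, the bucket-averaging scheme, and the use of the bucket size as the cache key are all assumptions made only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DownsampleCache {
    // Cache keyed by the requested bucket size (a hypothetical query parameter).
    private final Map<Integer, List<Double>> cache = new HashMap<>();
    private final double[] samples;
    private int computations = 0;

    DownsampleCache(double[] samples) {
        this.samples = samples.clone();
    }

    // Returns the bucket averages for this bucket size, computing them at most once.
    List<Double> query(int bucketSize) {
        return cache.computeIfAbsent(bucketSize, this::downsample);
    }

    // Number of times the downsampling was actually computed.
    int computations() {
        return computations;
    }

    private List<Double> downsample(int bucketSize) {
        if (bucketSize <= 0) {
            throw new IllegalArgumentException("bucketSize must be positive");
        }
        computations++;
        List<Double> out = new ArrayList<>();
        for (int start = 0; start < samples.length; start += bucketSize) {
            int end = Math.min(start + bucketSize, samples.length);
            double sum = 0;
            for (int i = start; i < end; i++) {
                sum += samples[i];
            }
            out.add(sum / (end - start)); // average of one bucket
        }
        return out;
    }

    public static void main(String[] args) {
        DownsampleCache cache = new DownsampleCache(new double[] {1, 2, 3, 4, 5});
        System.out.println(cache.query(2));
        System.out.println(cache.query(2)); // served from the cache
        System.out.println(cache.computations());
    }
}
```

Whether such a map lives in operator state or in an external store like Redis is exactly the trade-off raised in the question; the sketch only shows the caching pattern itself.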
Re: Question on flink and hdfs
Hi Jerry, Yes, that's possible: Flink can run standalone without HDFS. You can download the appropriate version from https://flink.apache.org/downloads.html Cheers On Fri, Sep 4, 2015 at 1:57 AM, Jerry Peng wrote: > Hello, > > Does Flink require HDFS to run? I know you can use HDFS to checkpoint and > process files in a distributed fashion. So can Flink run standalone > without HDFS? > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Efficiency for Filter then Transform ( filter().map() vs flatMap() )
Hi Stephan, Thanks for your clarification. Basically we will have lots of sensor that will push this kind of data to queuing system ( currently we are using RabbitMQ, but will soon move to Kafka). We also will use the same pipeline to process the historical data. I also want to minimize the chaining as in the filter is doing very little work. By minimizing the pipeline we can minimize db/external source hit and cached local data efficiently. Cheers On Fri, Sep 4, 2015 at 2:58 PM, Welly Tambunan wrote: > Hi Stephan, > > Cheers > > On Fri, Sep 4, 2015 at 2:31 PM, Stephan Ewen wrote: > >> We will definitely also try to get the chaining overhead down a bit. >> >> BTW: To reach this kind of throughput, you need sources that can produce >> very fast... >> >> On Fri, Sep 4, 2015 at 12:20 AM, Welly Tambunan >> wrote: >> >>> Hi Stephan, >>> >>> That's good information to know. We will hit that throughput easily. Our >>> computation graph has lot of chaining like this right now. >>> I think it's safe to minimize the chain right now. >>> >>> Thanks a lot for this Stephan. >>> >>> Cheers >>> >>> On Thu, Sep 3, 2015 at 7:20 PM, Stephan Ewen wrote: >>> >>>> In a set of benchmarks a while back, we found that the chaining >>>> mechanism has some overhead right now, because of its abstraction. The >>>> abstraction creates iterators for each element and makes it hard for the >>>> JIT to specialize on the operators in the chain. >>>> >>>> For purely local chains at full speed, this overhead is observable (can >>>> decrease throughput from 25mio elements/core to 15-20mio elements per >>>> core). If your job does not reach that throughput, or is I/O bound, source >>>> bound, etc, it does not matter. >>>> >>>> If you care about super high performance, collapsing the code into one >>>> function helps. >>>> >>>> On Thu, Sep 3, 2015 at 5:59 AM, Welly Tambunan >>>> wrote: >>>> >>>>> Hi Gyula, >>>>> >>>>> Thanks for your response. 
Seems i will use filter and map for now as >>>>> that one is really make the intention clear, and not a big performance >>>>> hit. >>>>> >>>>> Thanks again. >>>>> >>>>> Cheers >>>>> >>>>> On Thu, Sep 3, 2015 at 10:29 AM, Gyula Fóra >>>>> wrote: >>>>> >>>>>> Hey Welly, >>>>>> >>>>>> If you call filter and map one after the other like you mentioned, >>>>>> these operators will be chained and executed as if they were running in >>>>>> the >>>>>> same operator. >>>>>> The only small performance overhead comes from the fact that the >>>>>> output of the filter will be copied before passing it as input to the map >>>>>> to keep immutability guarantees (but no serialization/deserialization >>>>>> will >>>>>> happen). Copying might be practically free depending on your data type, >>>>>> though. >>>>>> >>>>>> If you are using operators that don't make use of the immutability of >>>>>> inputs/outputs (i.e., you don't hold references to those values) then you >>>>>> can >>>>>> disable copying altogether by calling >>>>>> env.getConfig().enableObjectReuse(), >>>>>> in which case they will have exactly the same performance. >>>>>> >>>>>> Cheers, >>>>>> Gyula >>>>>> >>>>>> Welly Tambunan wrote (on Thu, Sep 3, 2015, >>>>>> 4:33): >>>>>> >>>>>>> Hi All, >>>>>>> >>>>>>> I would like to filter some item from the event stream. I think >>>>>>> there are two ways doing this. >>>>>>> >>>>>>> Using the regular pipeline filter(...).map(...). We can also use >>>>>>> flatMap for doing both in the same operator. >>>>>>> >>>>>>> Any performance improvement if we are using flatMap ? As that will >>>>>>> be done in one operator instance. 
>>>>>>> >>>>>>> >>>>>>> Cheers >>>>>>> >>>>>>> >>>>>>> -- >>>>>>> Welly Tambunan >>>>>>> Triplelands >>>>>>> >>>>>>> http://weltam.wordpress.com >>>>>>> http://www.triplelands.com <http://www.triplelands.com/blog/> >>>>>>> >>>>>> >>>>> >>>>> >>>>> -- >>>>> Welly Tambunan >>>>> Triplelands >>>>> >>>>> http://weltam.wordpress.com >>>>> http://www.triplelands.com <http://www.triplelands.com/blog/> >>>>> >>>> >>>> >>> >>> >>> -- >>> Welly Tambunan >>> Triplelands >>> >>> http://weltam.wordpress.com >>> http://www.triplelands.com <http://www.triplelands.com/blog/> >>> >> >> > > > -- > Welly Tambunan > Triplelands > > http://weltam.wordpress.com > http://www.triplelands.com <http://www.triplelands.com/blog/> > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Efficiency for Filter then Transform ( filter().map() vs flatMap() )
Hi Stephan, Cheers On Fri, Sep 4, 2015 at 2:31 PM, Stephan Ewen wrote: > We will definitely also try to get the chaining overhead down a bit. > > BTW: To reach this kind of throughput, you need sources that can produce > very fast... > > On Fri, Sep 4, 2015 at 12:20 AM, Welly Tambunan wrote: > >> Hi Stephan, >> >> That's good information to know. We will hit that throughput easily. Our >> computation graph has lot of chaining like this right now. >> I think it's safe to minimize the chain right now. >> >> Thanks a lot for this Stephan. >> >> Cheers >> >> On Thu, Sep 3, 2015 at 7:20 PM, Stephan Ewen wrote: >> >>> In a set of benchmarks a while back, we found that the chaining >>> mechanism has some overhead right now, because of its abstraction. The >>> abstraction creates iterators for each element and makes it hard for the >>> JIT to specialize on the operators in the chain. >>> >>> For purely local chains at full speed, this overhead is observable (can >>> decrease throughput from 25mio elements/core to 15-20mio elements per >>> core). If your job does not reach that throughput, or is I/O bound, source >>> bound, etc, it does not matter. >>> >>> If you care about super high performance, collapsing the code into one >>> function helps. >>> >>> On Thu, Sep 3, 2015 at 5:59 AM, Welly Tambunan >>> wrote: >>> >>>> Hi Gyula, >>>> >>>> Thanks for your response. Seems i will use filter and map for now as >>>> that one is really make the intention clear, and not a big performance hit. >>>> >>>> Thanks again. >>>> >>>> Cheers >>>> >>>> On Thu, Sep 3, 2015 at 10:29 AM, Gyula Fóra >>>> wrote: >>>> >>>>> Hey Welly, >>>>> >>>>> If you call filter and map one after the other like you mentioned, >>>>> these operators will be chained and executed as if they were running in >>>>> the >>>>> same operator. 
>>>>> The only small performance overhead comes from the fact that the >>>>> output of the filter will be copied before passing it as input to the map >>>>> to keep immutability guarantees (but no serialization/deserialization will >>>>> happen). Copying might be practically free depending on your data type, >>>>> though. >>>>> >>>>> If you are using operators that don't make use of the immutability of >>>>> inputs/outputs (i.e., you don't hold references to those values) then you >>>>> can >>>>> disable copying altogether by calling env.getConfig().enableObjectReuse(), >>>>> in which case they will have exactly the same performance. >>>>> >>>>> Cheers, >>>>> Gyula >>>>> >>>>> Welly Tambunan wrote (on Thu, Sep 3, 2015, >>>>> 4:33): >>>>> >>>>>> Hi All, >>>>>> >>>>>> I would like to filter some item from the event stream. I think there >>>>>> are two ways doing this. >>>>>> >>>>>> Using the regular pipeline filter(...).map(...). We can also use >>>>>> flatMap for doing both in the same operator. >>>>>> >>>>>> Any performance improvement if we are using flatMap ? As that will be >>>>>> done in one operator instance. >>>>>> >>>>>> >>>>>> Cheers >>>>>> >>>>>> >>>>>> -- >>>>>> Welly Tambunan >>>>>> Triplelands >>>>>> >>>>>> http://weltam.wordpress.com >>>>>> http://www.triplelands.com <http://www.triplelands.com/blog/> >>>>>> >>>>> >>>> >>>> >>>> -- >>>> Welly Tambunan >>>> Triplelands >>>> >>>> http://weltam.wordpress.com >>>> http://www.triplelands.com <http://www.triplelands.com/blog/> >>>> >>> >>> >> >> >> -- >> Welly Tambunan >> Triplelands >> >> http://weltam.wordpress.com >> http://www.triplelands.com <http://www.triplelands.com/blog/> >> > > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Efficiency for Filter then Transform ( filter().map() vs flatMap() )
Hi Stephan, That's good information to know. We will hit that throughput easily. Our computation graph has lot of chaining like this right now. I think it's safe to minimize the chain right now. Thanks a lot for this Stephan. Cheers On Thu, Sep 3, 2015 at 7:20 PM, Stephan Ewen wrote: > In a set of benchmarks a while back, we found that the chaining mechanism > has some overhead right now, because of its abstraction. The abstraction > creates iterators for each element and makes it hard for the JIT to > specialize on the operators in the chain. > > For purely local chains at full speed, this overhead is observable (can > decrease throughput from 25mio elements/core to 15-20mio elements per > core). If your job does not reach that throughput, or is I/O bound, source > bound, etc, it does not matter. > > If you care about super high performance, collapsing the code into one > function helps. > > On Thu, Sep 3, 2015 at 5:59 AM, Welly Tambunan wrote: > >> Hi Gyula, >> >> Thanks for your response. Seems i will use filter and map for now as that >> one is really make the intention clear, and not a big performance hit. >> >> Thanks again. >> >> Cheers >> >> On Thu, Sep 3, 2015 at 10:29 AM, Gyula Fóra wrote: >> >>> Hey Welly, >>> >>> If you call filter and map one after the other like you mentioned, these >>> operators will be chained and executed as if they were running in the same >>> operator. >>> The only small performance overhead comes from the fact that the output >>> of the filter will be copied before passing it as input to the map to keep >>> immutability guarantees (but no serialization/deserialization will happen). >>> Copying might be practically free depending on your data type, though. 
>>> >>> If you are using operators that don't make use of the immutability of >>> inputs/outputs (i.e., you don't hold references to those values) then you can >>> disable copying altogether by calling env.getConfig().enableObjectReuse(), >>> in which case they will have exactly the same performance. >>> >>> Cheers, >>> Gyula >>> >>> Welly Tambunan wrote (on Thu, Sep 3, 2015, >>> 4:33): >>> >>>> Hi All, >>>> >>>> I would like to filter some item from the event stream. I think there >>>> are two ways doing this. >>>> >>>> Using the regular pipeline filter(...).map(...). We can also use >>>> flatMap for doing both in the same operator. >>>> >>>> Any performance improvement if we are using flatMap ? As that will be >>>> done in one operator instance. >>>> >>>> >>>> Cheers >>>> >>>> >>>> -- >>>> Welly Tambunan >>>> Triplelands >>>> >>>> http://weltam.wordpress.com >>>> http://www.triplelands.com <http://www.triplelands.com/blog/> >>>> >>> >> >> >> -- >> Welly Tambunan >> Triplelands >> >> http://weltam.wordpress.com >> http://www.triplelands.com <http://www.triplelands.com/blog/> >> > > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Efficiency for Filter then Transform ( filter().map() vs flatMap() )
Hi Gyula, Thanks for your response. It seems I will use filter and map for now, as that really makes the intention clear and is not a big performance hit. Thanks again. Cheers On Thu, Sep 3, 2015 at 10:29 AM, Gyula Fóra wrote: > Hey Welly, > > If you call filter and map one after the other like you mentioned, these > operators will be chained and executed as if they were running in the same > operator. > The only small performance overhead comes from the fact that the output of > the filter will be copied before passing it as input to the map to keep > immutability guarantees (but no serialization/deserialization will happen). > Copying might be practically free depending on your data type, though. > > If you are using operators that don't make use of the immutability of > inputs/outputs (i.e., you don't hold references to those values) then you can > disable copying altogether by calling env.getConfig().enableObjectReuse(), > in which case they will have exactly the same performance. > > Cheers, > Gyula > > Welly Tambunan wrote (on Thu, Sep 3, 2015, > 4:33): > >> Hi All, >> >> I would like to filter some item from the event stream. I think there are >> two ways doing this. >> >> Using the regular pipeline filter(...).map(...). We can also use flatMap >> for doing both in the same operator. >> >> Any performance improvement if we are using flatMap ? As that will be >> done in one operator instance. >> >> >> Cheers >> >> >> -- >> Welly Tambunan >> Triplelands >> >> http://weltam.wordpress.com >> http://www.triplelands.com <http://www.triplelands.com/blog/> >> > -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
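Gyula's caveat about object reuse (it is only safe when user code does not hold references to the records it receives) can be illustrated with a small plain-Java sketch. It does not use Flink at all; the `Event` type and both methods are hypothetical and merely simulate a runtime that either reuses one mutable record or copies it before handing it downstream.

```java
import java.util.ArrayList;
import java.util.List;

public class ObjectReuseDemo {
    // Hypothetical mutable record type.
    static final class Event {
        int value;

        Event(int value) {
            this.value = value;
        }

        Event copy() {
            return new Event(value);
        }
    }

    // Simulates a runtime that reuses a single Event instance for every record,
    // while the downstream function keeps a reference to each record it sees.
    static List<Integer> collectWithReuse(int[] inputs) {
        Event reused = new Event(0);
        List<Event> held = new ArrayList<>();
        for (int v : inputs) {
            reused.value = v; // the same object is overwritten in place
            held.add(reused); // holding the reference is the mistake
        }
        return values(held);
    }

    // Same loop, but each record is copied before it is handed downstream.
    static List<Integer> collectWithCopy(int[] inputs) {
        Event reused = new Event(0);
        List<Event> held = new ArrayList<>();
        for (int v : inputs) {
            reused.value = v;
            held.add(reused.copy());
        }
        return values(held);
    }

    private static List<Integer> values(List<Event> events) {
        List<Integer> out = new ArrayList<>();
        for (Event e : events) {
            out.add(e.value);
        }
        return out;
    }

    public static void main(String[] args) {
        int[] inputs = {1, 2, 3};
        System.out.println(collectWithReuse(inputs)); // every entry shows the last value
        System.out.println(collectWithCopy(inputs));  // each entry keeps its own value
    }
}
```

This is why the copy between chained operators exists by default: it buys safety for functions that buffer their inputs, at a cost that disappears when the functions are known not to hold references.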
Efficiency for Filter then Transform ( filter().map() vs flatMap() )
Hi All, I would like to filter some items from the event stream. I think there are two ways of doing this: using the regular pipeline filter(...).map(...), or using flatMap to do both in the same operator. Is there any performance improvement if we use flatMap, given that it will be done in one operator instance? Cheers -- Welly Tambunan Triplelands http://weltam.wordpress.com http://www.triplelands.com <http://www.triplelands.com/blog/>
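For readers who want to see the two shapes side by side, here is a minimal sketch in plain Java. It uses java.util.stream rather than the Flink DataStream API, so it says nothing about Flink's chaining behavior or performance; the class name, the `isValid` predicate, and the `scale` transform are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FilterMapVsFlatMap {
    // Hypothetical predicate: keep only non-negative readings.
    static boolean isValid(int reading) {
        return reading >= 0;
    }

    // Hypothetical transform: scale a reading by 10.
    static int scale(int reading) {
        return reading * 10;
    }

    // Two-step form: filter first, then map.
    static List<Integer> viaFilterMap(List<Integer> readings) {
        return readings.stream()
                .filter(FilterMapVsFlatMap::isValid)
                .map(FilterMapVsFlatMap::scale)
                .collect(Collectors.toList());
    }

    // One-step form: flatMap emits zero or one element per input.
    static List<Integer> viaFlatMap(List<Integer> readings) {
        return readings.stream()
                .flatMap(r -> isValid(r) ? Stream.of(scale(r)) : Stream.<Integer>empty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> readings = List.of(3, -1, 7, -5, 0);
        System.out.println(viaFilterMap(readings));
        System.out.println(viaFlatMap(readings));
    }
}
```

Both forms produce the same elements; the two-step version states the intent more directly, while the one-step version folds the decision and the transformation into a single function.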
Re: Keep Model in Operator instance up to date
Hi Gyula, Thanks a lot. That's really help a lot ! Have a great vacation Cheers On Fri, Aug 21, 2015 at 7:47 PM, Gyula Fóra wrote: > Hi > > You are right, if all operators need continuous updates than the most > straightforward way is to push all the updates to the operators like you > described. > > If the cached data is the same for all operators and is small enough you > can centralize the updates in a dedicated operator and push the updated > data to the operators every once in a while. > > Cheers > Gyula > > > > On Thu, Aug 20, 2015 at 4:31 PM Welly Tambunan wrote: > >> Hi Gyula, >> >> I have a couple of operator on the pipeline. Filter, mapper, flatMap, and >> each of these operator contains some cache data. >> >> So i think that means for every other operator on the pipeline, i will >> need to add a new stream to update each cache data. >> >> >> Cheers >> >> On Thu, Aug 20, 2015 at 5:33 PM, Gyula Fóra wrote: >> >>> Hi, >>> >>> I don't think I fully understand your question, could you please try to >>> be a little more specific? >>> >>> I assume by caching you mean that you keep the current model as an >>> operator state. Why would you need to add new streams in this case? >>> >>> I might be slow to answer as I am currently on vacation without stable >>> internet connection. >>> >>> Cheers, >>> Gyula >>> >>> On Thu, Aug 20, 2015 at 5:36 AM Welly Tambunan >>> wrote: >>> >>>> Hi Gyula, >>>> >>>> I have another question. So if i cache something on the operator, to >>>> keep it up to date, i will always need to add and connect another stream >>>> of changes to the operator ? >>>> >>>> Is this right for every case ? >>>> >>>> Cheers >>>> >>>> On Wed, Aug 19, 2015 at 3:21 PM, Welly Tambunan >>>> wrote: >>>> >>>>> Hi Gyula, >>>>> >>>>> That's really helpful. The docs is improving so much since the last >>>>> time (0.9). >>>>> >>>>> Thanks a lot ! 
>>>>> >>>>> Cheers >>>>> >>>>> On Wed, Aug 19, 2015 at 3:07 PM, Gyula Fóra >>>>> wrote: >>>>> >>>>>> Hey, >>>>>> >>>>>> If it is always better to check the events against a more up-to-date >>>>>> model (even if the events we are checking arrived before the update) then >>>>>> it is fine to keep the model outside of the system. >>>>>> >>>>>> In this case we need to make sure that we can push the updates to the >>>>>> external system consistently. If you are using the PersistentKafkaSource >>>>>> for instance it can happen that some messages are replayed in case of >>>>>> failure. In this case you need to make sure that you remove duplicate >>>>>> updates or have idempotent updates. >>>>>> >>>>>> You can read about the checkpoint mechanism in the Flink website: >>>>>> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html >>>>>> >>>>>> Cheers, >>>>>> Gyula >>>>>> >>>>>> On Wed, Aug 19, 2015 at 9:56 AM Welly Tambunan >>>>>> wrote: >>>>>> >>>>>>> Thanks Gyula, >>>>>>> >>>>>>> Another question i have.. >>>>>>> >>>>>>> > ... while external model updates would be *tricky *to keep >>>>>>> consistent. >>>>>>> Is that still the case if the Operator treat the external model as >>>>>>> read-only ? We create another stream that will update the external model >>>>>>> separately. >>>>>>> >>>>>>> Could you please elaborate more about this one ? >>>>>>> >>>>>>> Cheers >>>>>>> >>>>>>> On Wed, Aug 19, 2015 at 2:52 PM, Gyula Fóra >>>>>>> wrote: >>>>>>> >>>>>>>> In that case I would apply a map to wrap in some common type, like >>>>>>>> a n Either before the union. >>>>>>>> >>>>>>
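The advice quoted above about replayed messages (remove duplicate updates or make updates idempotent) could look roughly like the following sketch. It assumes, purely for illustration, that every model update carries a version number that increases per key; the class `IdempotentModelStore` and its methods are hypothetical and stand in for whatever external system holds the model.

```java
import java.util.HashMap;
import java.util.Map;

public class IdempotentModelStore {
    // Last version applied per key (the version field is an assumed part of each update).
    private final Map<String, Long> lastAppliedVersion = new HashMap<>();
    private final Map<String, Double> values = new HashMap<>();

    // Applies the update only if its version is newer than the last one applied
    // for this key, so replaying an old update after a failure has no effect.
    boolean apply(String key, long version, double value) {
        Long seen = lastAppliedVersion.get(key);
        if (seen != null && version <= seen) {
            return false; // duplicate or stale replay: ignore
        }
        lastAppliedVersion.put(key, version);
        values.put(key, value);
        return true;
    }

    Double get(String key) {
        return values.get(key);
    }

    public static void main(String[] args) {
        IdempotentModelStore store = new IdempotentModelStore();
        System.out.println(store.apply("model", 1, 10.0));
        System.out.println(store.apply("model", 2, 20.0));
        System.out.println(store.apply("model", 1, 10.0)); // replayed update
        System.out.println(store.get("model"));
    }
}
```

With this kind of guard, a source that replays messages after a failure can resend updates without corrupting the external model.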
Re: Keep Model in Operator instance up to date
Hi Gyula,

I have a couple of operators in the pipeline (filter, mapper, flatMap), and each of these operators holds some cached data.

So I think that means that for every such operator in the pipeline, I will need to add a new stream to update its cached data.

Cheers

On Thu, Aug 20, 2015 at 5:33 PM, Gyula Fóra wrote:
> Hi,
>
> I don't think I fully understand your question, could you please try to be
> a little more specific?
>
> I assume by caching you mean that you keep the current model as an
> operator state. Why would you need to add new streams in this case?
>
> I might be slow to answer as I am currently on vacation without a stable
> internet connection.
>
> Cheers,
> Gyula
Re: Keep Model in Operator instance up to date
Hi Gyula,

I have another question. If I cache something in the operator, will I always need to add and connect another stream of changes to the operator to keep it up to date?

Is this right for every case?

Cheers

On Wed, Aug 19, 2015 at 3:21 PM, Welly Tambunan wrote:
> Hi Gyula,
>
> That's really helpful. The docs have improved so much since the last time
> (0.9).
>
> Thanks a lot!
>
> Cheers
Re: Keep Model in Operator instance up to date
Hi Gyula,

That's really helpful. The docs have improved so much since the last time (0.9).

Thanks a lot!

Cheers

On Wed, Aug 19, 2015 at 3:07 PM, Gyula Fóra wrote:
> Hey,
>
> If it is always better to check the events against a more up-to-date model
> (even if the events we are checking arrived before the update) then it is
> fine to keep the model outside of the system.
>
> In this case we need to make sure that we can push the updates to the
> external system consistently. If you are using the PersistentKafkaSource
> for instance it can happen that some messages are replayed in case of
> failure. In this case you need to make sure that you remove duplicate
> updates or have idempotent updates.
>
> You can read about the checkpoint mechanism on the Flink website:
> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
>
> Cheers,
> Gyula
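
As a rough illustration of the advice quoted above (remove duplicate updates or make updates idempotent), here is a plain-Java sketch. It does not use Flink or Kafka at all. The `ExternalModelStore` class, the `ModelUpdate` message and its `version` field are hypothetical names chosen for this example; the assumption is that every update carries a version that only grows. Under that assumption, applying a replayed message a second time leaves the store unchanged.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an external model store; not a Flink API.
public class ExternalModelStore {

    // Hypothetical update message: a key, an increasing version, and a value.
    public static final class ModelUpdate {
        final String key;
        final long version;
        final String value;

        public ModelUpdate(String key, long version, String value) {
            this.key = key;
            this.version = version;
            this.value = value;
        }
    }

    private final Map<String, ModelUpdate> entries = new HashMap<>();
    private int appliedCount = 0;

    // Applies the update only if it is newer than what is already stored.
    // Returns true if the store changed, false for a duplicate or stale update.
    public boolean apply(ModelUpdate update) {
        ModelUpdate current = entries.get(update.key);
        if (current != null && current.version >= update.version) {
            return false; // replayed or out-of-date message: ignore it
        }
        entries.put(update.key, update);
        appliedCount++;
        return true;
    }

    public String get(String key) {
        ModelUpdate update = entries.get(key);
        return update == null ? null : update.value;
    }

    public int appliedCount() {
        return appliedCount;
    }

    public static void main(String[] args) {
        ExternalModelStore store = new ExternalModelStore();
        ModelUpdate first = new ModelUpdate("threshold", 1, "10");
        ModelUpdate second = new ModelUpdate("threshold", 2, "20");

        store.apply(first);
        store.apply(second);
        store.apply(first);  // replay after a simulated failure
        store.apply(second); // replay after a simulated failure

        System.out.println(store.get("threshold") + " after "
                + store.appliedCount() + " applied updates");
    }
}
```

The version check is only one possible way to get idempotence; deduplicating on a unique message id would serve the same purpose.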
Re: Keep Model in Operator instance up to date
Thanks Gyula,

Another question I have:

> ... while external model updates would be *tricky* to keep consistent.

Is that still the case if the operator treats the external model as read-only? We would create another stream that updates the external model separately.

Could you please elaborate on this?

Cheers

On Wed, Aug 19, 2015 at 2:52 PM, Gyula Fóra wrote:
> In that case I would apply a map to wrap in some common type, like an
> Either before the union.
>
> And then in the coflatmap you can unwrap it.

--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com <http://www.triplelands.com/blog/>
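
The suggestion quoted above (map both update streams into a common type such as an Either, union them, then unwrap) can be sketched in plain Java. This is not Flink code: lists stand in for streams, and the hand-rolled `Either` class as well as the `PriceUpdate` and `RuleUpdate` event types are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java illustration of "wrap in a common type, union, unwrap"; not Flink code.
public class EitherUnionSketch {

    // Minimal hand-rolled Either: holds exactly one of a left or a right value.
    public static final class Either<L, R> {
        private final L left;
        private final R right;
        private final boolean isLeft;

        private Either(L left, R right, boolean isLeft) {
            this.left = left;
            this.right = right;
            this.isLeft = isLeft;
        }

        public static <L, R> Either<L, R> left(L value) {
            return new Either<>(value, null, true);
        }

        public static <L, R> Either<L, R> right(R value) {
            return new Either<>(null, value, false);
        }

        // Unwraps by applying the function that matches the stored side.
        public <T> T fold(Function<L, T> onLeft, Function<R, T> onRight) {
            return isLeft ? onLeft.apply(left) : onRight.apply(right);
        }
    }

    // Two hypothetical, strongly typed update events.
    public static final class PriceUpdate {
        final double price;
        public PriceUpdate(double price) { this.price = price; }
    }

    public static final class RuleUpdate {
        final String rule;
        public RuleUpdate(String rule) { this.rule = rule; }
    }

    // "Map" each input into the common type, then "union" them into one list.
    public static List<Either<PriceUpdate, RuleUpdate>> union(
            List<PriceUpdate> prices, List<RuleUpdate> rules) {
        List<Either<PriceUpdate, RuleUpdate>> merged = new ArrayList<>();
        for (PriceUpdate p : prices) {
            merged.add(Either.<PriceUpdate, RuleUpdate>left(p));
        }
        for (RuleUpdate r : rules) {
            merged.add(Either.<PriceUpdate, RuleUpdate>right(r));
        }
        return merged;
    }

    // Unwraps one merged element, as the update side of the co-flatmap would.
    public static String describe(Either<PriceUpdate, RuleUpdate> update) {
        return update.fold(p -> "price=" + p.price, r -> "rule=" + r.rule);
    }

    public static void main(String[] args) {
        List<Either<PriceUpdate, RuleUpdate>> merged = union(
                Arrays.asList(new PriceUpdate(1.5)),
                Arrays.asList(new RuleUpdate("max")));
        for (Either<PriceUpdate, RuleUpdate> update : merged) {
            System.out.println(describe(update));
        }
    }
}
```

The two event classes stay strongly typed; only the merged stream carries the wrapper, and the wrapper is removed again before the domain logic runs.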
Re: Keep Model in Operator instance up to date
Hi Gyula,

Thanks.

However, update1 and update2 have different types. Based on my understanding, I don't think we can use union. How can we handle this?

We like to keep our events strongly typed so that the domain language is captured.

Cheers

On Wed, Aug 19, 2015 at 2:37 PM, Gyula Fóra wrote:
> Hey,
>
> One input of your co-flatmap would be model updates and the other input
> would be events to check against the model if I understand correctly.
>
> This means that if your model updates come from more than one stream you
> need to union them into a single stream before connecting them with the
> event stream and applying the coflatmap.
>
> DataStream updates1 =
> DataStream updates2 =
> DataStream events =
>
> events.connect(updates1.union(updates2).broadcast()).flatMap(...)
>
> Does this answer your question?
>
> Gyula

--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Keep Model in Operator instance up to date
Hi Gyula,

Thanks for your response.

However, the model can receive multiple kinds of update events. How can we do that with co-flatmap, as I can see the connect API only accepts a single datastream?

> ... while external model updates would be tricky to keep consistent.

Is that still the case if the operator treats the external model as read-only? We would create another stream that updates the external model separately.

Cheers

On Wed, Aug 19, 2015 at 2:05 PM, Gyula Fóra wrote:
> Hey!
>
> I think it is safe to say that the best approach in this case is creating
> a co-flatmap that will receive updates on one input. The events should
> probably be broadcasted in this case so you can check in parallel.
>
> This approach can be used effectively with Flink's checkpoint mechanism,
> while external model updates would be tricky to keep consistent.
>
> Cheers,
> Gyula

--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com <http://www.triplelands.com/blog/>
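
To make the co-flatmap idea quoted above more concrete, here is a plain-Java simulation of an operator with two inputs: one for model updates, one for events to validate. It is not Flink code. The class `ModelValidator`, the methods `onUpdate` and `onEvent`, and the "model" being a simple upper bound are all hypothetical choices for this sketch. Both inputs are handled by a single caller thread here, so the model needs no locking; that is an assumption of the simulation, not a statement about any runtime.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of a two-input operator; this is not Flink code.
public class ModelValidator {

    // The "model": here just an upper bound that events must not exceed.
    private int maxAllowed;

    public ModelValidator(int initialMax) {
        this.maxAllowed = initialMax;
    }

    // Input 1: a model update replaces the current model.
    public void onUpdate(int newMax) {
        this.maxAllowed = newMax;
    }

    // Input 2: an event is checked against the current model and emitted if valid.
    public void onEvent(int value, List<Integer> out) {
        if (value <= maxAllowed) {
            out.add(value);
        }
    }

    // Feeds a small interleaving of events and updates through the operator.
    public static List<Integer> run() {
        ModelValidator operator = new ModelValidator(10);
        List<Integer> out = new ArrayList<>();
        operator.onEvent(5, out);   // valid while the bound is 10
        operator.onEvent(15, out);  // rejected while the bound is 10
        operator.onUpdate(20);      // model update arrives on the other input
        operator.onEvent(15, out);  // valid once the bound is 20
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the update and the event handlers share one piece of state inside one operator instance, the model stays local to the operator and no listener thread inside the operator is needed.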
Keep Model in Operator instance up to date
Hi All,

We have a streaming computation that needs to validate the data stream against a model provided by the user.

Right now I load the model into the Flink operator and then validate against it. However, the model can be updated and changed frequently. Fortunately we always publish this event to RabbitMQ.

I think we can:

1. Create a RabbitMQ listener for the model-changed event inside the operator, then update the model when an event arrives.

   But I think this will create a race condition if not handled correctly, and it seems odd to keep this

2. Move the model into an external in-memory cache store and keep the model up to date using Flink, so the operator retrieves it from the memory cache.

3. Create two streams and use a co-operator to manage the shared state.

What is your suggestion for keeping the state up to date from external events? Is there some kind of best practice for keeping a model up to date in a streaming operator?

Thanks a lot

Cheers

--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Flink Streaming : PartitionBy vs GroupBy differences
Thanks Gyula

Cheers

On Fri, Jul 3, 2015 at 6:19 PM, Gyula Fóra wrote:
> Yes, you can think of it that way. Each Operator has parallel instances
> and each parallel instance receives input from multiple channels (FIFO from
> each) and produces output.

--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Flink Streaming : PartitionBy vs GroupBy differences
Hi Gyula,

Thanks a lot. That's enough for my case.

I really love the Flink Streaming model compared to Spark Streaming.

So is it fair to think of an operator as an actor (as in the actor model) in this system? Is that the right way to put it?

Cheers

On Fri, Jul 3, 2015 at 5:29 PM, Gyula Fóra wrote:
> Hey,
>
> 1.
> Yes, if you use partitionBy the same key will always go to the same
> downstream operator instance.
>
> 2.
> There is only a partial ordering guarantee, meaning that data received from
> one input is FIFO. This means that if the same key is coming from multiple
> inputs then there is no ordering guarantee there, only inside one input.
>
> Gyula

--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Flink Streaming : PartitionBy vs GroupBy differences
Hi Gyula,

Thanks for your response.

So if I use partitionBy, then data points with the same key will be received by exactly the same operator instance?

Another question: if I execute a reduce() operator after partitionBy, will that reduce operator guarantee ordering within the same key?

Cheers

On Fri, Jul 3, 2015 at 4:14 PM, Gyula Fóra wrote:
> Hey!
>
> Both groupBy and partitionBy will trigger a shuffle over the network based
> on some key, assuring that elements with the same keys end up on the same
> downstream processing operator.
>
> The difference between the two is that groupBy in addition to this returns
> a GroupedDataStream which lets you execute some special operations, such as
> key based rolling aggregates.
>
> PartitionBy is useful when you are using simple operators but still want
> to control the messages received by parallel instances (in a mapper for
> example).
>
> Cheers,
> Gyula
>
> On Fri, Jul 3, 2015 at 10:32, tambunanw wrote:
>
>> Hi All,
>>
>> I'm trying to digest the difference between these two. From my
>> experience in Spark, GroupBy causes shuffling over the network. Is that
>> the same in Flink?
>>
>> I've watched videos and read a couple of docs about Flink saying that Flink
>> compiles the user code into its own optimized graph structure, so I
>> think the Flink engine will take care of this?
>>
>> From the docs for Partitioning:
>>
>> http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#partitioning
>>
>> Is it true that GroupBy is more advanced than PartitionBy? Can someone
>> elaborate?
>>
>> This one is really confusing for me, coming from the Spark world. Any
>> help would be really appreciated.
>>
>> Cheers

--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com <http://www.triplelands.com/blog/>
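
The statement quoted above, that elements with the same key end up on the same downstream instance, can be illustrated with a generic hash-partitioning sketch in plain Java. This is only an assumption about how such routing can work in general; it is not taken from Flink's source, and `KeyPartitioner` and `instanceFor` are hypothetical names.

```java
// Generic hash partitioning by key; an illustration, not Flink's actual partitioner.
public class KeyPartitioner {

    // Maps a key to one of `parallelism` instances.
    // The same key always yields the same index for a fixed parallelism.
    public static int instanceFor(String key, int parallelism) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        String[] keys = {"AAPL", "GOOG", "AAPL", "MSFT", "GOOG"};
        for (String key : keys) {
            System.out.println(key + " -> instance " + instanceFor(key, 4));
        }
    }
}
```

Note that routing by key says nothing about ordering across different upstream inputs, which matches the FIFO-per-input caveat discussed in this thread.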
Re: Open method is not called with custom implementation RichWindowMapFunction
Thanks Chiwan

Great job!

Cheers

On Fri, Jul 3, 2015 at 3:32 PM, Chiwan Park wrote:
> I found that the patch had been merged to upstream. [1] :)
>
> Regards,
> Chiwan Park
>
> [1] https://github.com/apache/flink/pull/855

--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Open method is not called with custom implementation RichWindowMapFunction
Thanks Chiwan,

Glad to hear that.

Cheers

On Fri, Jul 3, 2015 at 3:24 PM, Chiwan Park wrote:
> Hi tambunanw,
>
> The issue is already known and we’ll patch it soon. [1]
> In the next release (maybe 0.9.1), the problem will be solved.
>
> Regards,
> Chiwan Park
>
> [1] https://issues.apache.org/jira/browse/FLINK-2257
>
> On Jul 3, 2015, at 4:57 PM, tambunanw wrote:
> >
> > Hi All,
> >
> > I'm trying to run some experiments with rich windowing functions and
> > operator state. I modified the streaming stock prices example from
> >
> > https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
> >
> > I created a simple windowing function like the one below:
> >
> > class MyWindowFunction extends RichWindowMapFunction[StockPricex, StockPricex] {
> >   println("created")
> >   private var counter = 0
> >
> >   override def open(conf: Configuration): Unit = {
> >     println("opened")
> >   }
> >
> >   override def mapWindow(values: Iterable[StockPricex], out: Collector[StockPricex]): Unit = {
> >     // if not initialized ..
> >
> >     println(counter)
> >     println(values)
> >     counter = counter + 1
> >   }
> > }
> >
> > However, the open() method is not invoked when I run this code in my
> > local environment:
> >
> > spx
> >   .groupBy(x => x.symbol)
> >   .window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS))
> >   .mapWindow(new MyWindowFunction)
> >
> > Any thoughts on this one?
> >
> > Cheers
> >
> > --
> > View this message in context:
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Open-method-is-not-called-with-custom-implementation-RichWindowMapFunction-tp1924.html
> > Sent from the Apache Flink User Mailing List archive at Nabble.com.

--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com <http://www.triplelands.com/blog/>
Re: Batch Processing as Streaming
Thanks Stephan,

That's clear!

Cheers

On Thu, Jul 2, 2015 at 6:13 PM, Stephan Ewen wrote:
> Hi!
>
> I am actually working to get some more docs out there; there is a lack
> right now, agreed.
>
> Concerning your questions:
>
> (1) Batch programs basically recover from the data sources right now.
> Checkpointing as in the streaming case does not happen for batch programs.
> We have branches that materialize the intermediate streams and apply
> backtracking logic for batch programs, but they are not merged into the
> master at this point.
>
> (2) Streaming operators and user functions are long lived. They are
> started once and live to the end of the stream, or the machine failure.
>
> Greetings,
> Stephan
>
> On Thu, Jul 2, 2015 at 11:48 AM, tambunanw wrote:
>> Hi All,
>>
>> I see that the way batch processing works in Flink is quite different
>> from Spark. In Flink, it's all about using the streaming engine.
>>
>> I have a couple of questions:
>>
>> 1. Is there any support for checkpointing in batch processing as well,
>> or is that only for streaming?
>>
>> 2. I want to ask about the operator lifecycle. Is it short-lived or
>> long-lived? Are there any docs where I can read more about this?
>>
>> Cheers
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Batch-Processing-as-Streaming-tp1909.html
>> Sent from the Apache Flink User Mailing List archive at Nabble.com.

--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com <http://www.triplelands.com/blog/>
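
Stephan's point (2) in the thread above, that streaming operators and user functions are started once and live until the stream ends, can be illustrated with a small sketch. It does not use Flink at all: `LongLivedFunction`, `CountingFunction`, and `LifecycleSketch.run` are hypothetical names invented for this illustration, and the driver only imitates the "open once, process many windows, close once" pattern described in the reply.

```scala
// Hypothetical stand-in for a rich user function; this is NOT Flink's API.
trait LongLivedFunction[IN, OUT] {
  def open(): Unit = ()                      // one-time setup
  def mapWindow(values: Iterable[IN]): OUT   // called once per window
  def close(): Unit = ()                     // one-time teardown
}

// Keeps state across windows, which only works because the same
// instance stays alive for the whole stream.
class CountingFunction extends LongLivedFunction[Int, Int] {
  var opened = 0
  var closed = 0
  private var windowsSeen = 0

  override def open(): Unit = opened += 1

  override def mapWindow(values: Iterable[Int]): Int = {
    windowsSeen += 1
    windowsSeen
  }

  override def close(): Unit = closed += 1
}

object LifecycleSketch {
  // Hypothetical driver: open once, feed every window to the same
  // instance, then close once when the (finite) stream ends.
  def run[IN, OUT](fn: LongLivedFunction[IN, OUT],
                   windows: Iterator[Iterable[IN]]): List[OUT] = {
    fn.open()
    try windows.map(fn.mapWindow).toList
    finally fn.close()
  }

  def main(args: Array[String]): Unit = {
    val fn = new CountingFunction
    val results = run(fn, Iterator(List(1, 2), List(3), List(4, 5, 6)))
    println(results)
    println(s"open calls: ${fn.opened}, close calls: ${fn.closed}")
  }
}
```

In this sketch the counter grows across the three windows while `open` and `close` each run exactly once, which is the long-lived behaviour the reply describes. How Flink itself schedules these calls is not shown here.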
Re: Data Source from Cassandra
Hi Stephan,

Thanks a lot! I will give it a look.

Cheers

On Thu, Jul 2, 2015 at 6:05 PM, Stephan Ewen wrote:
> Hi!
>
> If there is a CassandraSource for Hadoop, you can also use that with the
> HadoopInputFormatWrapper.
>
> If you want to implement a Flink-specific source, extending InputFormat is
> the right thing to do. A user has started to implement a Cassandra sink in
> this fork (you may be able to reuse some code or testing infrastructure):
> https://github.com/rzvoncek/flink/tree/zvo/cassandraSink
>
> Greetings,
> Stephan
>
> On Thu, Jul 2, 2015 at 11:34 AM, tambunanw wrote:
>> Hi All,
>>
>> I want to know if there's a custom data source available for Cassandra.
>>
>> From my observation, it seems that we need to implement that by extending
>> InputFormat. Is there any guide on how to do this robustly?
>>
>> Cheers
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Data-Source-from-Cassandra-tp1908.html
>> Sent from the Apache Flink User Mailing List archive at Nabble.com.

--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com <http://www.triplelands.com/blog/>