2016-05-09 14:56 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>:

>> - You wrote you'd like to "instantiate a H2O's node in every task
>> manager". This reads a bit like you want to start H2O in the TM's JVM,
>> but I would assume that a H2O node runs as a separate process. So should
>> it be started inside the TM JVM or as an external process next to each
>> TM? Also, do you want to start one H2O node per TM slot or per TM?
>
> My idea is to run it in the same process, but there may be several good
> reasons not to do it; it's just the way I think of it right now. I'm
> thinking about replicating the structure of Sparkling Water and, to my
> understanding, they run their H2O nodes in the same process.

That should be possible by starting a thread from a MapPartition operator.
To make it one H2O node per TM, you would need a synchronized singleton to
avoid that each parallel task starts a new thread.

>> - You wrote you'd like to "handle the lifecycle of the node according to
>> the taskmanager and the execution graph". A TM can execute multiple jobs,
>> each with its own execution graph. Do you want to start the H2O node for
>> each job and shut it down when the job finishes, or start the H2O node
>> when the TM is started and kill it when the TM is brought down?
>
> There are different trade-offs for both choices. I assume that there's
> nothing inside H2O that should be shared between different jobs for most
> use cases, so it should follow the job's lifecycle. In the previous mail
> this was ambiguous, my bad.

The approach with two MapPartition operators at the beginning and end of
the H2O section might work then.

>> - "move the data locally from Flink to H2O", do you mean host local or
>> JVM local? I think it should not be hard to keep the data host local.
>
> JVM local. This is clearly not an issue Flink-side, but may be an issue on
> H2O's side. It's one of the many issues we will tackle as soon as we talk
> with them (I hope soon).
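The synchronized-singleton idea Fabian suggests (one H2O node per TM JVM, no matter how many parallel tasks run in it) could be sketched as below. This is a minimal, Flink-free sketch: `H2ONodeHolder`, `ensureStarted`, and the node thread body are all hypothetical stand-ins, not H2O's or Flink's actual API.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical per-JVM holder: the first parallel task to reach it starts
// the (simulated) H2O node thread; all later tasks in the same JVM reuse it.
class H2ONodeHolder {
    static final AtomicInteger startCount = new AtomicInteger(); // for demonstration only
    private static Thread nodeThread;

    static synchronized void ensureStarted() {
        if (nodeThread == null) {
            startCount.incrementAndGet();
            nodeThread = new Thread(() -> {
                // here the real integration would boot an embedded H2O node
                try { Thread.sleep(Long.MAX_VALUE); } catch (InterruptedException e) { }
            });
            nodeThread.setDaemon(true);
            nodeThread.start();
        }
    }
}

public class SingletonDemo {
    public static void main(String[] args) throws Exception {
        // Simulate several parallel tasks of the same TM racing to start the node.
        Thread[] tasks = new Thread[8];
        for (int i = 0; i < tasks.length; i++) {
            tasks[i] = new Thread(H2ONodeHolder::ensureStarted);
            tasks[i].start();
        }
        for (Thread t : tasks) t.join();
        System.out.println("nodes started: " + H2ONodeHolder.startCount.get());
    }
}
```

Inside a real `MapPartitionFunction`, each parallel instance would simply call `ensureStarted()` in `open()` or before consuming its iterator; the lock guarantees only one node thread per TM JVM.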
>> - The Table API which you are referring to in your example is built on
>> top of the DataSet and DataStream APIs. I think it should be possible to
>> add another API similar to the Table API. You should be aware that the
>> Table API is currently quite actively developed and should not be
>> considered a stable interface. So certain things might change in the next
>> versions. With 1.0 we stabilized the DataSet API and I would rather put a
>> new API on top of it than on the Table API.
>
> We know, but we will work mostly with the data abstractions of the Table
> API and not the operations. We take the risk of reworking it if they
> change in the meantime.
>
> Your reply really helped: many questions helped us clear our mind on a
> few points. H2O's team showed interest in working on this integration, or
> at least in supporting us in the development. We are waiting for them to
> start a discussion, and as soon as we have a clearer idea of how to
> proceed, we will validate it against what you just said. Your confidence
> in Flink's operators gives us hope to achieve a clean solution.
>
> Thanks a lot for your time,
> Simone

Sure :-)

Cheers, Fabian

> 2016-05-09 12:24 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Simone,
>>
>> sorry for the delayed answer. I have a few questions regarding your
>> requirements and some ideas that might be helpful (depending on the
>> requirements).
>>
>> 1) Starting / stopping of H2O nodes from Flink
>> - You wrote you'd like to "instantiate a H2O's node in every task
>> manager". This reads a bit like you want to start H2O in the TM's JVM,
>> but I would assume that a H2O node runs as a separate process. So should
>> it be started inside the TM JVM or as an external process next to each
>> TM? Also, do you want to start one H2O node per TM slot or per TM?
>> - You wrote you'd like to "handle the lifecycle of the node according to
>> the taskmanager and the execution graph". A TM can execute multiple jobs,
>> each with its own execution graph. Do you want to start the H2O node for
>> each job and shut it down when the job finishes, or start the H2O node
>> when the TM is started and kill it when the TM is brought down?
>> - "keep the H2O's nodes alive through multiple tasks": The first option
>> (starting for each job) would allow sharing the H2O node across all tasks
>> of a job. This could be done using two MapPartition operators: the first
>> mapper is put in front of the first task requiring H2O and starts an H2O
>> service before the first record is forwarded; the second mapper is put
>> after the last H2O task and shuts the service down after the last element
>> was forwarded. The mappers themselves do nothing but forward elements and
>> start and stop the service. If you would like to share H2O nodes across
>> jobs, we might need another hook to start the process.
>> - "move the data locally from Flink to H2O", do you mean host local or
>> JVM local? I think it should not be hard to keep the data host local.
>>
>> 2) "Allow the developer to code everything inside Flink"
>> - The Table API which you are referring to in your example is built on
>> top of the DataSet and DataStream APIs. I think it should be possible to
>> add another API similar to the Table API. You should be aware that the
>> Table API is currently quite actively developed and should not be
>> considered a stable interface. So certain things might change in the next
>> versions. With 1.0 we stabilized the DataSet API and I would rather put a
>> new API on top of it than on the Table API.
>> - Regarding the transformation into H2O structures and calling H2O
>> operations, I think this might again be done in MapPartition operators.
>> In general, MapPartition gives you a lot of freedom because it provides
>> an iterator over all elements of a partition. So you can do things before
>> the first and after the last element and group data as you like.
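The "do things before the first and after the last element" pattern Fabian describes can be sketched without any Flink dependency. Here `startService`/`stopService` are hypothetical stand-ins for booting and shutting down an H2O service; in a real job this body would live in a `MapPartitionFunction` and emit through a `Collector`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Minimal sketch of the MapPartition lifecycle pattern: setup before the
// first element, forward every element unchanged, tear down after the last.
public class MapPartitionLifecycle {
    static final List<String> log = new ArrayList<>();

    static void startService() { log.add("service started"); }
    static void stopService()  { log.add("service stopped"); }

    // Mimics MapPartitionFunction.mapPartition(Iterable<T>, Collector<T>)
    static void mapPartition(Iterable<Integer> values, Consumer<Integer> out) {
        startService();        // before the first record is forwarded
        for (Integer v : values) {
            out.accept(v);     // forward elements unchanged
        }
        stopService();         // after the last element was forwarded
    }

    public static void main(String[] args) {
        List<Integer> collected = new ArrayList<>();
        mapPartition(Arrays.asList(1, 2, 3), collected::add);
        System.out.println(log + " " + collected);
    }
}
```

With two such operators, one in front of the first H2O task and one after the last, the service's lifetime brackets exactly the H2O section of the job.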
>> You can use partitionByHash() or rebalance() to shuffle data, and
>> sortPartition() to locally sort the data in a partition. Please note that
>> MapPartition operators do not support chaining and therefore come with a
>> certain serialization overhead. Whenever possible you should use a
>> MapFunction or FlatMapFunction, which are a bit more lightweight.
>>
>> Hope this helps,
>> Fabian
>>
>> 2016-05-03 15:13 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>:
>>
>>> I'm not sure this is the right way to do it, but we were exploring all
>>> the possibilities and this one is the most obvious. We also spent some
>>> time studying how to do it to achieve a better understanding of Flink's
>>> internals.
>>>
>>> What we want to do, though, is to integrate Flink with another
>>> distributed system that builds its own nodes and coordinates through the
>>> network with its own logic. This software is H2O (a Machine Learning
>>> platform) and the integration consists of two big tasks: the first is to
>>> instantiate a H2O node in every task manager and handle the lifecycle of
>>> the node according to the taskmanager and the execution graph. The
>>> second is to allow the developer to code everything inside Flink,
>>> converting from and to H2O's data structures (distributed tabular data)
>>> and triggering the execution of algorithms on H2O with a uniform API.
>>>
>>> Here's a simple example (assuming that we will use the Table API):
>>>
>>> val env = ExecutionEnvironment.getExecutionEnvironment
>>> val h2oEnv = H2OEnviroment.getEnvironment(env)
>>>
>>> val myData: Table = ...
>>> val someOtherData: Table = ...
>>>
>>> val myH2OFrame = myData.select(...).toH2OFrame(h2oEnv)
>>>
>>> val linearRegressionModel = h2oEnv.linearRegression(myH2OFrame)
>>>
>>> val predictions: Table = linearRegressionModel(someOtherData)
>>>
>>> predictions.select(...)
>>>
>>> A good solution should allow the system to keep the H2O nodes alive
>>> through multiple tasks, and give the possibility to move the data
>>> locally from Flink to H2O. The latter is not achieved in H2O's
>>> integration with Spark, but we still hope to do it.
>>>
>>> That said, I'm still not sure whether it is really required to
>>> implement a custom runtime operator, but given the complexity of
>>> integrating two distributed systems, we assumed that more control would
>>> allow more flexibility and more possibilities to achieve an ideal
>>> solution.
>>>
>>> 2016-05-03 13:29 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>
>>>> Hi Simone,
>>>>
>>>> you are right, the interfaces you extend are not considered to be
>>>> public, user-facing API.
>>>> Adding custom operators to the DataSet API touches many parts of the
>>>> system and is not straightforward.
>>>> The DataStream API has better support for custom operators.
>>>>
>>>> Can you explain what kind of operator you would like to add?
>>>> Maybe the functionality can be achieved with the existing operators.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2016-05-03 12:54 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>:
>>>>
>>>>> Hello Fabian,
>>>>>
>>>>> we delved deeper, moving from the input you gave us, but a question
>>>>> arose. We always assumed that runtime operators were open for
>>>>> extension without modifying anything inside Flink, but it looks like
>>>>> this is not the case and the documentation assumes that the developer
>>>>> is working on a contribution to Flink. So I would like to know if our
>>>>> understanding is correct and custom runtime operators are not supposed
>>>>> to be implemented outside of Flink.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Simone
>>>>>
>>>>> 2016-04-29 21:32 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>>>
>>>>>> Hi Simone,
>>>>>>
>>>>>> the GraphCreatingVisitor transforms the common operator plan into a
>>>>>> representation that is translated by the optimizer.
>>>>>> You have to implement an OptimizerNode and OperatorDescriptor to
>>>>>> describe the operator.
>>>>>> Depending on the semantics of the operator, there are a few more
>>>>>> places to make the integration work, like driver strategies, the
>>>>>> cost model, etc.
>>>>>>
>>>>>> I would recommend having a look at previous changes that added an
>>>>>> operator, such as PartitionOperator, SortPartitionOperator,
>>>>>> OuterJoin, etc.
>>>>>> The respective commits should give you an idea which parts of the
>>>>>> code need to be touched. You should find the commit IDs in the JIRA
>>>>>> issues for these extensions.
>>>>>>
>>>>>> Cheers, Fabian
>>>>>>
>>>>>> 2016-04-29 15:32 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I'm trying to create a custom operator to explore the internals of
>>>>>>> Flink. Actually, the one I'm working on is rather similar to Union
>>>>>>> and I'm trying to mimic it for now.
>>>>>>> When I run my job, though, this error arises:
>>>>>>>
>>>>>>> Exception in thread "main" java.lang.IllegalArgumentException:
>>>>>>> Unknown operator type: MyOperator - My Operator
>>>>>>>   at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:237)
>>>>>>>   at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:82)
>>>>>>>   at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:279)
>>>>>>>   at org.apache.flink.api.common.operators.GenericDataSinkBase.accept(GenericDataSinkBase.java:223)
>>>>>>>   at org.apache.flink.api.common.Plan.accept(Plan.java:348)
>>>>>>>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:454)
>>>>>>>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>>>>>>>   at org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:213)
>>>>>>>   at org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:107)
>>>>>>>   at io.radicalbit.flinkh2o.Job$.main(Job.scala:50)
>>>>>>>   at io.radicalbit.flinkh2o.Job.main(Job.scala)
>>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>>>>>>>
>>>>>>> I looked at the location of the error but it's not clear to me how
>>>>>>> to make my operator recognizable by the optimizer.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Simone
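The exception above is thrown because the optimizer's plan visitor dispatches on the concrete operator class and rejects anything it was never taught about, which is why new runtime operators require touching the optimizer itself. A Flink-free illustration of that dispatch pattern (all class names here are illustrative, not Flink's actual code):

```java
// Illustrative only: mimics how a plan visitor that dispatches on concrete
// operator classes rejects an operator type it does not recognize.
abstract class Operator { }
class UnionOperator extends Operator { }
class MyOperator extends Operator { }   // a custom operator the visitor doesn't know

public class VisitorDemo {
    static String preVisit(Operator op) {
        // A chain of type checks: each known operator type gets a dedicated
        // optimizer node; anything else falls through and is rejected.
        if (op instanceof UnionOperator) {
            return "union node";
        }
        throw new IllegalArgumentException(
                "Unknown operator type: " + op.getClass().getSimpleName());
    }

    public static void main(String[] args) {
        System.out.println(preVisit(new UnionOperator()));
        try {
            preVisit(new MyOperator());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

This is why Fabian's earlier advice points at implementing an OptimizerNode and OperatorDescriptor: the visitor has to be extended to map the new operator class to an optimizer representation before the plan can compile.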