>- You wrote you'd like to "instantiate a H2O's node in every task
manager". This reads a bit like you want to start H2O in the TM's JVM, but
I would assume that a H2O node runs as a separate process. So should it be
started inside the TM JVM or as an external process next to each TM? Also,
do you want to start one H2O node per TM slot or per TM?

My idea is to run it in the same process, but there may be several good
reasons not to do so; it's just the way I think of it right now. I'm
thinking about replicating the structure of Sparkling Water and, as far as
I understand, they run their H2O nodes in the same process.

>- You wrote you'd like to "handle the lifecycle of the node according to
the taskmanager and the execution graph". A TM can execute multiple jobs
each with its own execution graph. Do you want to start the H2O node for
each job and shut it down when the job finishes or start the H2O when the
TM is started and kill it when the TM is brought down?

There are different trade-offs for both choices. I assume that, for most
use cases, nothing inside H2O needs to be shared between different jobs,
so the node should follow the job's lifecycle. In the previous mail
this was ambiguous, my bad.

>- "move the data locally from Flink to H2O", do you mean host local or JVM
local? I think it should not be hard to keep the data host local.

JVM local. This is clearly not an issue on Flink's side but may be an issue
on H2O's side. It's one of the many issues we will tackle as soon as we
talk with them (I hope soon).

>- The Table API which you are referring to in your example is built on top
of the DataSet and DataStream APIs. I think it should be possible to add
another API similar to the Table API. You should be aware that the Table
API is currently quite actively developed and should not be considered to
be a stable interface. So certain things might change in the next versions.
With 1.0 we stabilized the DataSet API and I would rather put a new API on
top of it than on the Table API.

We know, but we will work mostly with the data abstractions of the Table API
and not its operations. We accept the risk of having to rework it if they
change in the meantime.

Your reply really helped: your questions helped us clear our minds on a few
points. H2O's team showed interest in working on this integration, or at
least in supporting us in the development. We are waiting for them to start
a discussion and, as soon as we have a clearer idea of how to proceed,
we will validate it against the points you raised. Your confidence in
Flink's operators gives us hope of achieving a clean solution.

Thanks a lot for your time,

Simone

2016-05-09 12:24 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Simone,
>
> sorry for the delayed answer. I have a few questions regarding your
> requirements and some ideas that might be helpful (depending on the
> requirements).
>
> 1) Starting / stopping of H2O nodes from Flink
> - You wrote you'd like to "instantiate a H2O's node in every task
> manager". This reads a bit like you want to start H2O in the TM's JVM, but
> I would assume that a H2O node runs as a separate process. So should it be
> started inside the TM JVM or as an external process next to each TM? Also,
> do you want to start one H2O node per TM slot or per TM?
> - You wrote you'd like to "handle the lifecycle of the node according to
> the taskmanager and the execution graph". A TM can execute multiple jobs
> each with its own execution graph. Do you want to start the H2O node for
> each job and shut it down when the job finishes or start the H2O when the
> TM is started and kill it when the TM is brought down?
> - "keep the H2O's nodes alive through multiple tasks" The first option
> (starting for each job) would allow sharing the H2O node among all tasks of
> a job. This could be done using two MapPartition operators: the first
> mapper is placed in front of the first task that requires H2O and starts an
> H2O service before the first record is forwarded; the second mapper is
> placed after the last H2O task and shuts the service down after the last
> element has been forwarded. The mappers themselves do nothing other than
> forward elements and start or stop the H2O service. If you would like to
> share H2O nodes across jobs, we might need another hook to start the process.
> - "move the data locally from Flink to H2O", do you mean host local or JVM
> local? I think it should not be hard to keep the data host local.
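
A minimal sketch of the two pass-through mappers described above, written against plain Scala iterators rather than Flink itself. `NodeService` is a hypothetical stand-in for whatever handle would start and stop an H2O node; it is not part of H2O's or Flink's API. Functions of this shape (`Iterator[T] => Iterator[T]`) should fit the Scala DataSet API's `mapPartition`, but that wiring is an assumption and is not shown here.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for an H2O node handle; not a real H2O API.
trait NodeService {
  def start(): Unit
  def stop(): Unit
}

object LifecycleMappers {
  // First mapper: starts the service, then forwards records unchanged.
  def startBefore[T](service: NodeService)(records: Iterator[T]): Iterator[T] = {
    service.start()
    records
  }

  // Second mapper: forwards records unchanged and stops the service
  // once the input is exhausted (at most once).
  def stopAfter[T](service: NodeService)(records: Iterator[T]): Iterator[T] =
    new Iterator[T] {
      private var stopped = false
      def hasNext: Boolean = {
        val more = records.hasNext
        if (!more && !stopped) { stopped = true; service.stop() }
        more
      }
      def next(): T = records.next()
    }
}

object LifecycleDemo {
  // Runs start -> work -> stop over one in-memory "partition"
  // and returns the order in which the events happened.
  def run(): List[String] = {
    val log = ListBuffer.empty[String]
    val service = new NodeService {
      def start(): Unit = { log += "start"; () }
      def stop(): Unit = { log += "stop"; () }
    }
    val started = LifecycleMappers.startBefore(service)(Iterator(1, 2, 3))
    // Placeholder for the tasks that would actually call into H2O.
    val worked = started.map { x => log += s"work $x"; x * 2 }
    LifecycleMappers.stopAfter(service)(worked).toList
    log.toList
  }
}
```

With a parallelism greater than one, each parallel instance would run these hooks once per partition, so a real integration would probably need the start and stop calls to be safe to repeat within one JVM. Failure handling is ignored in this sketch.
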
>
> 2) "Allow the developer to code everything inside Flink".
> - The Table API which you are referring to in your example is built on top
> of the DataSet and DataStream APIs. I think it should be possible to add
> another API similar to the Table API. You should be aware that the Table
> API is currently quite actively developed and should not be considered to
> be a stable interface. So certain things might change in the next versions.
> With 1.0 we stabilized the DataSet API and I would rather put a new API on
> top of it than on the Table API.
> - Regarding the transformation into H2O structures and calling H2O
> operations, I think this might again be done in MapPartition operators. In
> general, MapPartition gives you a lot of freedom because it provides an
> iterator over all elements of a partition. So you can do things before the
> first and after the last element and group data as you like. You can use
> partitionByHash() or rebalance() to shuffle data and sortPartition() to
> locally sort the data in a partition. Please note that MapPartition
> operators do not support chaining and therefore come with a certain
> serialization overhead. Whenever possible you should use a MapFunction or
> FlatMapFunction, which are a bit more lightweight.
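
As a rough illustration of "group data as you like" inside a partition, the sketch below batches a partition's iterator into fixed-size chunks using only the Scala standard library. `Chunk` is a hypothetical placeholder for an H2O-side structure, and the element type and batch size are arbitrary choices for the example, not anything prescribed by Flink or H2O.

```scala
object PartitionBatching {
  // Hypothetical chunk type standing in for an H2O-side data structure.
  final case class Chunk(rows: Vector[Double])

  // Groups the records of one partition into chunks of at most `batchSize`
  // elements; the last chunk may be smaller.
  def toChunks(batchSize: Int)(records: Iterator[Double]): Iterator[Chunk] =
    records.grouped(batchSize).map(batch => Chunk(batch.toVector))
}
```

Because the function only consumes an iterator and produces an iterator, the whole partition never has to be held in memory at once; only one batch is materialized at a time.
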
>
> Hope this helps,
> Fabian
>
>
> 2016-05-03 15:13 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>:
>
>> I'm not sure this is the right way to do it but we were exploring all the
>> possibilities and this one is the most obvious. We also spent some time
>> studying how to do it, to gain a better understanding of Flink's internals.
>>
>> What we want to do though is to integrate Flink with another distributed
>> system that builds its own nodes and coordinates through the network with
>> its own logic. This software is H2O (a Machine Learning platform) and the
>> integration consists of two big tasks: the first is to instantiate a H2O's
>> node in every task manager and handle the lifecycle of the node according
>> to the taskmanager and the execution graph. The second is to allow the
>> developer to code everything inside Flink, converting from and to H2O's
>> data structures (distributed tabular data) and triggering the execution of
>> algorithms on H2O with a uniform API.
>>
>> Here's a simple example (assuming that we will use the TableAPI):
>>
>> val env = ExecutionEnvironment.getExecutionEnvironment
>> val h2oEnv = H2OEnviroment.getEnvironment(env)
>>
>> val myData: Table = ...
>> val someOtherData: Table = ...
>>
>> val myH2OFrame = myData.select(...).toH2OFrame(h2oEnv)
>>
>> val linearRegressionModel = h2oEnv.linearRegression(myH2OFrame)
>>
>> val predictions: Table = linearRegressionModel(someOtherData)
>>
>> predictions.select(...)
>>
>>
>> A good solution should allow the system to keep the H2O's nodes alive
>> through multiple tasks and the possibility to move the data locally from
>> Flink to H2O. The latter is not achieved in H2O's integration with Spark
>> but we still hope to do it.
>>
>> That said, I'm still not sure if it is really required to implement a
>> custom runtime operator but, given the complexity of integrating two
>> distributed systems, we assumed that more control would allow more
>> flexibility and more possibilities to achieve an ideal solution.
>>
>>
>>
>>
>>
>> 2016-05-03 13:29 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>
>>> Hi Simone,
>>>
>>> you are right, the interfaces you extend are not considered to be
>>> public, user-facing API.
>>> Adding custom operators to the DataSet API touches many parts of the
>>> system and is not straightforward.
>>> The DataStream API has better support for custom operators.
>>>
>>> Can you explain what kind of operator you would like to add?
>>> Maybe the functionality can be achieved with the existing operators.
>>>
>>> Best, Fabian
>>>
>>> 2016-05-03 12:54 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>:
>>>
>>>> Hello Fabian,
>>>>
>>>> we dug deeper starting from the input you gave us, but a question arose.
>>>> We always assumed that runtime operators were open for extension without
>>>> modifying anything inside Flink, but it looks like this is not the case and
>>>> the documentation assumes that the developer is working on a contribution
>>>> to Flink. So I would like to know if our understanding is correct and
>>>> custom runtime operators are not supposed to be implemented outside of
>>>> Flink.
>>>>
>>>> Thanks,
>>>>
>>>> Simone
>>>>
>>>> 2016-04-29 21:32 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>>
>>>>> Hi Simone,
>>>>>
>>>>> the GraphCreatingVisitor transforms the common operator plan into a
>>>>> representation that is translated by the optimizer.
>>>>> You have to implement an OptimizerNode and OperatorDescriptor to
>>>>> describe the operator.
>>>>> Depending on the semantics of the operator, there are a few more
>>>>> places to touch to make the integration work, such as driver strategies,
>>>>> the cost model, etc.
>>>>>
>>>>> I would recommend having a look at previous changes that added an
>>>>> operator such as PartitionOperator, SortPartitionOperator, OuterJoin, etc.
>>>>> The respective commits should give you an idea which parts of the code
>>>>> need to be touched. You should find the commit IDs in the JIRA issues for
>>>>> these extensions.
>>>>>
>>>>> Cheers, Fabian
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> 2016-04-29 15:32 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I'm trying to create a custom operator to explore the internals of
>>>>>> Flink. Actually the one I'm working on is rather similar to Union and I'm
>>>>>> trying to mimic it for now. When I run my job though, this error arises:
>>>>>>
>>>>>> Exception in thread "main" java.lang.IllegalArgumentException: Unknown operator type: MyOperator - My Operator
>>>>>> at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:237)
>>>>>> at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:82)
>>>>>> at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:279)
>>>>>> at org.apache.flink.api.common.operators.GenericDataSinkBase.accept(GenericDataSinkBase.java:223)
>>>>>> at org.apache.flink.api.common.Plan.accept(Plan.java:348)
>>>>>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:454)
>>>>>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>>>>>> at org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:213)
>>>>>> at org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:107)
>>>>>> at io.radicalbit.flinkh2o.Job$.main(Job.scala:50)
>>>>>> at io.radicalbit.flinkh2o.Job.main(Job.scala)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>>>>>>
>>>>>> I looked at the location of the error but it's not clear to me how to
>>>>>> make my operator recognizable by the optimizer.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Simone
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
