Hello Fabian,

we dug deeper starting from the input you gave us, but a question arose. We
had always assumed that runtime operators were open for extension without
modifying anything inside Flink, but it looks like this is not the case, and
the documentation assumes that the developer is working on a contribution
to Flink. So I would like to know whether our understanding is correct and
custom runtime operators are not supposed to be implemented outside of
Flink.

Thanks,

Simone

2016-04-29 21:32 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Simone,
>
> the GraphCreatingVisitor transforms the common operator plan into a
> representation that is translated by the optimizer.
> You have to implement an OptimizerNode and OperatorDescriptor to describe
> the operator.
> Depending on the semantics of the operator, there are a few more places to
> touch to make the integration work, such as driver strategies, the cost
> model, etc.
>
> I would recommend having a look at previous changes that added an
> operator, such as PartitionOperator, SortPartitionOperator, OuterJoin, etc.
> The respective commits should give you an idea which parts of the code
> need to be touched. You should find the commit IDs in the JIRA issues for
> these extensions.
>
> Cheers, Fabian
>
>
>
>
>
> 2016-04-29 15:32 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>:
>
>> Hello,
>>
>> I'm trying to create a custom operator to explore the internals of Flink.
>> The one I'm working on is rather similar to Union, and I'm trying to
>> mimic it for now. When I run my job, though, this error arises:
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: Unknown operator type: MyOperator - My Operator
>> at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:237)
>> at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:82)
>> at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:279)
>> at org.apache.flink.api.common.operators.GenericDataSinkBase.accept(GenericDataSinkBase.java:223)
>> at org.apache.flink.api.common.Plan.accept(Plan.java:348)
>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:454)
>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>> at org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:213)
>> at org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:107)
>> at io.radicalbit.flinkh2o.Job$.main(Job.scala:50)
>> at io.radicalbit.flinkh2o.Job.main(Job.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>>
>> I looked at the location of the error, but it's not clear to me how to
>> make my operator recognizable to the optimizer.
>>
>> Thanks,
>>
>> Simone
>>
>
>
