I have implemented your idea of an Unknown type which uses the KryoSerializer. Since I don't have type information, I initialize the serializer with Object.class. Collection execution works fine, but when I execute a simple identity mapper job normally, I get the following exception. Is there a way to get this working?

14/11/13 11:01:04 ERROR operators.DataSinkTask: Error in user code: Channel received an event before completing the current partial record.: DataSink(TextOutputFormat (file:/tmp/org.apache.flink.test.javaApiOperators.TypeHintITCase-result) - UTF-8) (1/1)
java.lang.IllegalStateException: Channel received an event before completing the current partial record.
    at org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:158)
    at org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176)
    at org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:175)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:245)
    at java.lang.Thread.run(Thread.java:701)
14/11/13 11:01:04 INFO taskmanager.Task: DataSink(TextOutputFormat (file:/tmp/org.apache.flink.test.javaApiOperators.TypeHintITCase-result) - UTF-8) (1/1) switched to FAILED : java.lang.IllegalStateException: Channel received an event before completing the current partial record.
    at org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:158)
    at org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176)
    at org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:175)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:245)
    at java.lang.Thread.run(Thread.java:701)


On 05.11.2014 22:34, Stephan Ewen wrote:
I like the idea very much!

In my opinion, the DataSet is not quite the right place to put that
functionality. I think the UnaryUDFOperator or the BinaryUDFOperator would
be better. After all, these hooks are only necessary for UDFs.

One more suggestion:

  - Can the TypeExtractor initially return a special "Unknown" type? The
returns() method can override that type. Then we can also keep a bit more
of the eager initialization.

  - The collection execution, for example, works without specific type
information. It only needs the ability to clone, which is easily possible
with an "unknown" type information, which can create a "defaultserializer"
that simply uses Kryo to clone.

That way, one could also use Java 8 lambdas inside IDE with collection
execution, and on the cluster with the properly compiled code from maven.
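
The "Unknown" placeholder idea above could be sketched roughly as follows. This is only an illustration under stated assumptions: SketchType and SketchOperator are hypothetical names made up for this example and are not Flink classes, and a null argument merely simulates a failed type extraction.

```java
import java.util.Objects;

// Hypothetical stand-in for a type descriptor; NOT Flink's TypeInformation.
class SketchType {
    // Single shared placeholder that marks "type could not be extracted".
    static final SketchType UNKNOWN = new SketchType("<unknown>");

    final String name;

    SketchType(String name) {
        this.name = Objects.requireNonNull(name);
    }

    boolean isUnknown() {
        return this == UNKNOWN;
    }
}

// Hypothetical operator that is still initialized eagerly in its constructor.
class SketchOperator {
    private SketchType resultType;

    // Assumption: null simulates a failed extraction; it yields UNKNOWN
    // instead of throwing, so construction always succeeds.
    SketchOperator(SketchType extracted) {
        this.resultType = (extracted == null) ? SketchType.UNKNOWN : extracted;
    }

    // A user-supplied hint overrides whatever was extracted before.
    SketchOperator returns(String typeName) {
        this.resultType = new SketchType(typeName);
        return this;
    }

    SketchType getResultType() {
        return resultType;
    }
}
```

In this sketch an operator built without type information stays usable, and a later returns("Tuple2<String,String>") call simply replaces the placeholder.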

Stephan





On Mon, Nov 3, 2014 at 12:23 PM, Timo Walther <fl...@twalthr.com> wrote:

Hey,

I have made a small prototype for a map-operator

env.fromElements(1, 2, 3)
  .map((i) -> new Tuple2<String,String>()).returns("Tuple2<String,String>")
  .print();

You can find my solution here:
https://github.com/twalthr/incubator-flink/commit/3ce2d3c86cf2457e02986f7a2d858304bbefea58

Actually, I like this solution most as it looks very easy to the user.
Furthermore, we can move the Type Extraction part into the operator which
makes more sense to me.

What do you think?

Greetings,
Timo






On 02.11.2014 16:22, Stephan Ewen wrote:

An alternative would be to go for

env.fromElements(1, 2, 3)
.flatMap((Integer i, Collector<Integer> o) -> o.collect(i) ,
returns("Integer"))
.print();

"returns" would here be a static method that creates the type info.

That would require adding an additional parameter, but would allow us to
keep the immediate checks. Deferring the checks will make things harder to
understand for users as well...

On 30.10.2014 11:44, "Stephan Ewen" <se...@apache.org> wrote:

I think that would look nice.

How easy is that to implement? With that change, we could not initialize
the type info in the constructor any more, but would have to change
everything to lazy initialization, which makes it complicated and error
prone...

On Wed, Oct 29, 2014 at 4:26 PM, Timo Walther <fl...@twalthr.com> wrote:

What do you think about something like:

env.fromElements(1, 2, 3)
.flatMap((Integer i, Collector<Integer> o) -> o.collect(i)).returns("Integer")
.print();

This looks to me like the most readable and user-friendly solution. We
only need to change the internals of DataSet a little bit, such that a
possible TypeExtractor Exception is stored temporarily and thrown by the
operator that follows if "returns()" was not called.
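
The deferred check described above might look roughly like this. It is a minimal sketch, not the actual DataSet change: DeferredTypeNode and requireResultType are hypothetical names invented for illustration, and a null argument simulates a TypeExtractor failure.

```java
// Hypothetical stand-in for a data set node; NOT Flink's DataSet.
class DeferredTypeNode {
    private String resultType;              // null while the type is unknown
    private RuntimeException pendingError;  // extraction failure kept for later

    // Assumption: a null argument simulates a failed type extraction.
    DeferredTypeNode(String extractedType) {
        if (extractedType == null) {
            // Store the failure instead of throwing it right away.
            pendingError = new IllegalStateException(
                "Return type could not be determined; call returns(...)");
        } else {
            resultType = extractedType;
        }
    }

    // A type hint clears the stored failure.
    DeferredTypeNode returns(String typeName) {
        resultType = typeName;
        pendingError = null;
        return this;
    }

    // Called by whichever operator follows; rethrows the stored failure
    // if no hint was given in the meantime.
    String requireResultType() {
        if (pendingError != null) {
            throw pendingError;
        }
        return resultType;
    }
}
```

The error only surfaces when the following operator asks for the type, which is the trade-off raised in the replies: the failure is reported later than the call that caused it.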

Regards,
Timo



On 28.10.2014 15:34, Stephan Ewen wrote:

Is it possible to use a static method "hint" to create the hinting wrapper
function?

Something like

DataSet.map(hint( (x) -> x.toString() , String.class));

If we go for option (1), I would suggest calling the methods just "from"
and overloading them for String, Class, and TypeInformation.


Stephan


On Tue, Oct 28, 2014 at 3:27 PM, Timo Walther <fl...@twalthr.com>
wrote:

   Hi all,

Until recently, the Eclipse JDT compiler was the only compiler that included
generic signatures for lambda expressions in class files, which is necessary
to use them in a type-safe way in Flink. Unfortunately, this "feature" was
considered a "bug" and was removed in Eclipse 4.4.1. This is why lambdas
do not work properly with the current version of Eclipse. I have opened a
bug for that (see
https://bugs.eclipse.org/bugs/show_bug.cgi?id=449063).
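
To illustrate the general problem, here is a small sketch using only the JDK's reflection API. It compares what can be read from an anonymous class with what can be read from a lambda's runtime class. The class and method names are made up for this example, and it only shows the runtime-class side of the issue, not the compiler-specific signature on the synthetic lambda method discussed above.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

class LambdaSignatureDemo {

    // Returns true if the object's class records type arguments
    // (e.g. Function<Integer, String>) for its first implemented interface.
    static boolean hasGenericSignature(Object function) {
        Type[] interfaces = function.getClass().getGenericInterfaces();
        return interfaces.length > 0 && interfaces[0] instanceof ParameterizedType;
    }

    // Anonymous class: the compiler writes the full generic signature
    // into the class file, so reflection can recover <Integer, String>.
    static Function<Integer, String> anonymous() {
        return new Function<Integer, String>() {
            @Override
            public String apply(Integer i) {
                return i.toString();
            }
        };
    }

    // Lambda: the implementing class is generated at runtime and
    // implements only the raw Function interface.
    static Function<Integer, String> lambda() {
        return i -> i.toString();
    }
}
```

On a javac-compiled build, hasGenericSignature(anonymous()) should report true while hasGenericSignature(lambda()) should report false, which is why the return type has to come from somewhere else.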

The question is: Independent of the decision of the Eclipse JDT team, how
do we want to deal with missing return type information?

Option 1)
Add a separate TypeInformation argument to each Java API operator. Leads
to blown up API...
.map((x)->x + 1, TypeInformation.fromString("Integer"))
.flatMap((in, out)->out.collect(in), TypeInformation.fromClass(Integer.class))

Option 2)
Introduce a wrapper class which implements ResultTypeQueryable. Leads to
complicated syntax...
.map(TypeHint.map((x)->x + 1, "Integer"));
.map(TypeHint.map((x)->x + 1, Integer.class));
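
A wrapper along the lines of Option 2 could be sketched as below. This is an assumption-laden illustration: HasResultType is a hypothetical stand-in for an interface like ResultTypeQueryable, HintedFunction and hint are made-up names, and java.util.function.Function is used in place of Flink's function interfaces.

```java
import java.util.function.Function;

// Hypothetical stand-in for an interface like ResultTypeQueryable.
interface HasResultType {
    Class<?> getResultType();
}

// Wraps a function together with an explicit result type hint.
final class HintedFunction<I, O> implements Function<I, O>, HasResultType {
    private final Function<I, O> delegate;
    private final Class<O> resultType;

    private HintedFunction(Function<I, O> delegate, Class<O> resultType) {
        this.delegate = delegate;
        this.resultType = resultType;
    }

    // Static factory so that call sites can read as hint(lambda, Type.class).
    static <I, O> HintedFunction<I, O> hint(Function<I, O> f, Class<O> resultType) {
        return new HintedFunction<>(f, resultType);
    }

    @Override
    public O apply(I in) {
        return delegate.apply(in);
    }

    // An operator can query the hint instead of inspecting the lambda.
    @Override
    public Class<?> getResultType() {
        return resultType;
    }
}
```

An operator receiving such a wrapper would check for HasResultType first and fall back to type extraction only when the argument does not implement it.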

What are your opinions? Or any other ideas?


Regards,
Timo



