Hey Wayang people,
I have been working on the Apache Spark/Apache Kafka/Apache Wayang integration.
So far, it is making good progress.
My current blockers are related to serialization of the Spark tasks.
I solved many such NotSerializableExceptions by adding the Serializable
interface to many of the classes involved in the processing. But now I have
run into this lambda-related problem; I assume this is where Scala land
begins.
I found two independent but comparable situations, which I describe in this
email.
Any hint that helps me solve these issues (probably of the same kind)
is welcome.
Cheers,
Mirko
*Observations:*
*(1)* The formatter UDF:
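import java.io.Serializable;
public class Util implements Serializable {
    // f1 is a String and f2 an Integer, so "%s" must come first and "%d" second;
    // "%d" applied to a String would throw an IllegalFormatConversionException
    public static String formatData(String f1, Integer f2) {
        return String.format("%s, %d", f1, f2);
    }
}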
And in my WayangPlan, the call
d -> Util.formatData(d.getField0(), d.getField1())
causes this NotSerializableException:
Caused by: java.io.NotSerializableException: KafkaTopicWordCountSpark$$Lambda$503/0x00000008005ea040
    - field (class "org.apache.wayang.api.package$$anon$1", name: "scalaFunc$1", type: "interface scala.Function1")
    - object (class "org.apache.wayang.api.package$$anon$1", org.apache.wayang.api.package$$anon$1@5aede88b)
    - field (class "org.apache.wayang.core.function.TransformationDescriptor", name: "javaImplementation", type: "interface org.apache.wayang.core.function.FunctionDescriptor$SerializableFunction")
    - object (class "org.apache.wayang.core.function.TransformationDescriptor", TransformationDescriptor[org.apache.wayang.api.package$$anon$1@5aede88b])
    - field (class "org.apache.wayang.basic.operators.KafkaTopicSink", name: "formattingDescriptor", type: "class org.apache.wayang.core.function.TransformationDescriptor")
    - object (class "org.apache.wayang.basic.operators.KafkaTopicSink", KafkaTopicSink[Write to KafkaTopic test_23456])
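The trace suggests that the Java lambda ended up in the `scalaFunc$1` field (type `scala.Function1`) of a Scala API wrapper, and that the lambda object itself is not serializable. In plain Java, whether a lambda is serializable is decided only by its target type. The following minimal sketch uses the plain JDK and no Wayang classes; `SerializableFunction` here is a local stand-in that only mimics the name seen in the trace, and `LambdaSerializationDemo` is a hypothetical class name. It shows that a lambda typed as `java.util.function.Function` fails Java serialization, while the same lambda typed as a `Serializable` sub-interface, or written with an intersection cast, succeeds:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

public class LambdaSerializationDemo {

    // Local stand-in that only mimics the name seen in the stack trace;
    // it is NOT org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction.
    interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}

    // Returns true if plain Java serialization of obj succeeds.
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Same UDF as Util.formatData: "%s" for the String, "%d" for the Integer.
    static String formatData(String f1, Integer f2) {
        return String.format("%s, %d", f1, f2);
    }

    public static void main(String[] args) {
        // Target type is java.util.function.Function: the lambda class is not Serializable.
        Function<String, String> plain = s -> formatData(s, s.length());

        // Target type extends Serializable: javac generates a serializable lambda.
        SerializableFunction<String, String> viaInterface = s -> formatData(s, s.length());

        // An intersection cast has the same effect without a named interface.
        Function<String, String> viaCast =
                (Function<String, String> & Serializable) s -> formatData(s, s.length());

        System.out.println("plain: " + isSerializable(plain));               // plain: false
        System.out.println("viaInterface: " + isSerializable(viaInterface)); // viaInterface: true
        System.out.println("viaCast: " + isSerializable(viaCast));           // viaCast: true
    }
}
```

If this is the cause, the fix would be to keep the UDF in a serializable functional type all the way into the descriptor, rather than through a conversion that stores it in a plain `scala.Function1`; which Wayang API entry point does that is an assumption not confirmed here. Note that a serializable lambda still fails if anything it captures is not serializable.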
*(2)* When using null as the formatter UDF, I would expect a NullPointerException
later, after the job has started. Instead, I get another
NotSerializableException, this time from DefaultCardinalityEstimator:
Caused by: java.io.NotSerializableException: org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator$$Lambda$502/0x00000008005e9440
    - field (class "org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator", name: "singlePointEstimator", type: "interface java.util.function.ToLongBiFunction")
    - object (class "org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator", org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator@285b63c2)
    - element of array (index: 0)
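In case (2), the trace shows `DefaultCardinalityEstimator` holding a lambda in a field of type `java.util.function.ToLongBiFunction`, which does not extend `Serializable`. One plausible reading (an assumption, not verified against the Wayang code) is that the Spark closure captures the whole operator, including optimizer-side objects such as cardinality estimators. The minimal sketch below uses the hypothetical stand-in classes `SinkOperator` and `Estimator` (not the real Wayang classes) to show that a lambda reading an instance field captures `this` and thereby drags the non-serializable estimator along, while copying the needed value into a local variable first keeps the closure serializable:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;
import java.util.function.ToLongBiFunction;

public class ClosureCaptureDemo {

    interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}

    // Hypothetical stand-in for DefaultCardinalityEstimator: the class is Serializable,
    // but its ToLongBiFunction field holds a lambda that is not.
    static class Estimator implements Serializable {
        final ToLongBiFunction<Object, Object> singlePointEstimator;

        Estimator(ToLongBiFunction<Object, Object> singlePointEstimator) {
            this.singlePointEstimator = singlePointEstimator;
        }
    }

    // Hypothetical stand-in for an operator such as KafkaTopicSink: it carries
    // optimizer-side state (the estimator) next to the runtime data (the topic).
    static class SinkOperator implements Serializable {
        final String topic;
        final Estimator estimator = new Estimator((a, b) -> 42L);

        SinkOperator(String topic) {
            this.topic = topic;
        }

        // Reads the field this.topic, so the lambda captures `this` (the whole operator).
        SerializableFunction<String, String> capturingThis() {
            return record -> topic + ": " + record;
        }

        // Copies the value into a local first, so the lambda captures only a String.
        SerializableFunction<String, String> capturingLocal() {
            final String t = topic;
            return record -> t + ": " + record;
        }
    }

    // Returns true if plain Java serialization of obj succeeds.
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SinkOperator op = new SinkOperator("test_23456");
        System.out.println("capturingThis: " + isSerializable(op.capturingThis()));   // capturingThis: false
        System.out.println("capturingLocal: " + isSerializable(op.capturingLocal())); // capturingLocal: true
    }
}
```

Declaring such optimizer-only fields `transient` would be another option, with the caveat that they are `null` after deserialization on the executors.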
--
Dr. rer. nat. Mirko Kämpf
Müchelner Str. 23
06259 Frankleben