Hi,
I have run into some trouble while developing Flink applications.
1.
I want to compare the performance of Storm and the flink-storm-compatibility
layer using this test program:
https://github.com/project-flink/flink-perf/blob/master/storm-jobs/src/jvm/experiments/Throughput.java.
Below is my slightly modified version of Throughput.java:
public static void main(String[] args) throws Exception {
    ParameterTool pt = ParameterTool.fromArgs(args);
    int par = pt.getInt("para");

    final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
    builder.setSpout("source0", new Generator(pt), pt.getInt("sourceParallelism"));

    int i = 0;
    for (; i < pt.getInt("repartitions", 1) - 1; i++) {
        System.out.println("adding source" + i + " --> source" + (i + 1));
        builder.setBolt("source" + (i + 1), new RepartPassThroughBolt(pt), pt.getInt("sinkParallelism"))
                .fieldsGrouping("source" + i, new Fields("id"));
    }
    System.out.println("adding final source" + i + " --> sink");
    builder.setBolt("sink", new Sink(pt), pt.getInt("sinkParallelism"))
            .fieldsGrouping("source" + i, new Fields("id"));

    Config conf = new Config();
    conf.setDebug(false);
    //System.exit(1);

    // execute program locally
    final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
    cluster.submitTopology("throughput", null, builder.createTopology());

    Utils.sleep(10 * 1000);
    // TODO kill does not do anything so far
    cluster.killTopology("throughput");
    cluster.shutdown();
}
This program runs fine in IDEA with flink-storm-compatibility.
However, when I package it into a jar file and run it on Flink 0.10-SNAPSHOT,
the flink-client log file shows the following problem:
java.lang.Exception: Call to registerInputOutput() of invokable failed
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:521)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:187)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:90)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:518)
    ... 1 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:185)
    ... 3 more
However, this class (StormSpoutWrapper) does exist in my packaged jar file,
as you can see from this part of my pom.xml:
<!-- Throughput -->
<execution>
    <id>Throughput</id>
    <phase>package</phase>
    <goals>
        <goal>jar</goal>
    </goals>
    <configuration>
        <classifier>Throughput</classifier>
        <archive>
            <manifestEntries>
                <program-class>org.apache.flink.stormcompatibility.experiments.Throughput</program-class>
            </manifestEntries>
        </archive>
        <includes>
            <!-- from storm-core -->
            <include>defaults.yaml</include>
            <include>backtype/storm/*.class</include>
            <include>backtype/storm/serialization/*.class</include>
            <include>backtype/storm/topology/*.class</include>
            <include>backtype/storm/topology/base/*.class</include>
            <include>backtype/storm/utils/*.class</include>
            <include>backtype/storm/spout/*.class</include>
            <include>backtype/storm/task/*.class</include>
            <include>backtype/storm/tuple/*.class</include>
            <include>backtype/storm/generated/*.class</include>
            <include>backtype/storm/metric/**/*.class</include>
            <include>org/apache/storm/curator/*.class</include>
            <include>org/apache/thrift7/**/*.class</include>
            <!--<include>org/yaml/snakeyaml/constructor/*.class</include>-->
            <include>org/yaml/snakeyaml/**/*.class</include>
            <!-- Storm's recursive dependencies -->
            <include>org/json/simple/**/*.class</include>
            <!-- compatibility layer -->
            <include>org/apache/flink/stormcompatibility/api/*.class</include>
            <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
            <!-- the Throughput job itself -->
            <include>org/apache/flink/stormcompatibility/experiments/Throughput.class</include>
            <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.class</include>
        </includes>
    </configuration>
</execution>
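For reference, I confirmed the wrapper class entry is present with a check along
these lines (a self-contained sketch: it builds a dummy jar here so it can run
standalone, but I ran the same lookup against my actual build artifact):

```java
import java.io.*;
import java.util.jar.*;

public class JarCheck {

    // Returns true if any entry name in the jar contains the given class name.
    static boolean containsClass(File jar, String className) throws IOException {
        try (JarFile jf = new JarFile(jar)) {
            return jf.stream().anyMatch(e -> e.getName().contains(className));
        }
    }

    public static void main(String[] args) throws IOException {
        // Build a tiny jar with the wrapper class entry so the sketch is
        // self-contained; in practice, point containsClass at the real jar.
        File jar = File.createTempFile("demo", ".jar");
        try (JarOutputStream out = new JarOutputStream(new FileOutputStream(jar))) {
            out.putNextEntry(new JarEntry(
                    "org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.class"));
            out.closeEntry();
        }
        System.out.println(containsClass(jar, "StormSpoutWrapper")); // prints true
        jar.delete();
    }
}
```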
So how can I fix it?
2.
I have the following use case for the join operator:
DataStream<Tuple3<String, Integer, Long>> user = env.addSource(new sourceUserFunction());
DataStream<Tuple2<String, Integer>> area = env.addSource(new sourceAreaFunction());

DataStream<Tuple2<String, Integer>> sink = user
        .join(area)
        .onWindow(15, TimeUnit.MINUTES)
        .where(0)
        .equalTo(0)
        .with(new JoinFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> join(Tuple3<String, Integer, Long> first,
                    Tuple2<String, Integer> second) throws Exception {
                if (first.f1 + second.f1 > 10) {
                    return new Tuple2<String, Integer>(first.f0, first.f1 + second.f1);
                }
                return null;
            }
        });
As you can see, I do not want to return null when the condition is not satisfied,
but there is no JoinFunction variant that provides a Collector.
I did find FlatJoinFunction, which does take a Collector, but it seems it can
only be used with DataSet, not DataStream.
Is there a better way to handle this case?
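The workaround I am considering is to keep returning null as a "no match"
marker and drop the nulls right after the join with a filter on the joined
stream. Stripped of the Flink API so it runs standalone (the keys and counts
below are made up, but the threshold mirrors my snippet above), the idea is:

```java
import java.util.*;
import java.util.stream.*;

public class JoinFilterSketch {

    // Mirrors the JoinFunction body: emit a (key, sum) pair only when the
    // summed counts exceed 10, otherwise return null as a "no result" marker.
    static Map.Entry<String, Integer> join(String key, int firstCount, int secondCount) {
        int sum = firstCount + secondCount;
        return sum > 10 ? new AbstractMap.SimpleEntry<>(key, sum) : null;
    }

    public static void main(String[] args) {
        // Simulated join results; in Flink this would be the output of .with(...).
        List<Map.Entry<String, Integer>> joined = Arrays.asList(
                join("a", 8, 5),   // 13 > 10 -> kept
                join("b", 2, 3),   //  5 <= 10 -> null marker
                join("c", 6, 7));  // 13 > 10 -> kept

        // The downstream step that drops the markers; in Flink this would be a
        // FilterFunction on the joined DataStream checking value != null.
        List<Map.Entry<String, Integer>> result = joined.stream()
                .filter(Objects::nonNull)
                .collect(Collectors.toList());

        System.out.println(result); // prints [a=13, c=13]
    }
}
```

This works, but it feels clumsy to push dummy values through the stream, which
is why I am asking whether there is a cleaner way.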
PS: I am sorry about the long email; feel free to ignore it over the weekend.
Greetings,
Huang Wei
华为技术有限公司 Huawei Technologies Co., Ltd.
Tel: +86 18106512602
Email: [email protected]