Hi, I have run into some trouble developing Flink applications.

1. I want to compare the performance of Storm against flink-storm-compatibility using this test program: https://github.com/project-flink/flink-perf/blob/master/storm-jobs/src/jvm/experiments/Throughput.java. Here is Throughput.java with my changes:
    public static void main(String[] args) throws Exception {
        ParameterTool pt = ParameterTool.fromArgs(args);
        int par = pt.getInt("para");

        final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
        builder.setSpout("source0", new Generator(pt), pt.getInt("sourceParallelism"));

        int i = 0;
        for (; i < pt.getInt("repartitions", 1) - 1; i++) {
            System.out.println("adding source" + i + " --> source" + (i + 1));
            builder.setBolt("source" + (i + 1), new RepartPassThroughBolt(pt), pt.getInt("sinkParallelism"))
                    .fieldsGrouping("source" + i, new Fields("id"));
        }
        System.out.println("adding final source" + i + " --> sink");
        builder.setBolt("sink", new Sink(pt), pt.getInt("sinkParallelism"))
                .fieldsGrouping("source" + i, new Fields("id"));

        Config conf = new Config();
        conf.setDebug(false);
        //System.exit(1);

        // execute program locally
        final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
        cluster.submitTopology("throughput", null, builder.createTopology());

        Utils.sleep(10 * 1000);
        // TODO kill does not do anything so far
        cluster.killTopology("throughput");
        cluster.shutdown();
    }

This program runs fine in IDEA with flink-storm-compatibility. However, when I package it into a jar file and run it on flink-0.10-SNAPSHOT, the flink-client log file shows this problem:

    java.lang.Exception: Call to registerInputOutput() of invokable failed
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:521)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:187)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:90)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:518)
        ... 1 more
    Caused by: java.lang.ClassNotFoundException: org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:185)
        ... 3 more

Yet this class (StormSpoutWrapper) does exist in my packaged jar file.
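(For reference, this is easy to verify programmatically. Below is a small self-contained snippet showing the check; since the real artifact name is build-specific, it builds a synthetic demo.jar first. The class name JarCheck and the file name demo.jar are illustrative, not part of my build.)

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;

public class JarCheck {
    public static void main(String[] args) throws IOException {
        // Build a tiny synthetic jar containing the class entry in question.
        // With a real build you would open target/<your-jar>.jar directly.
        File jar = new File("demo.jar");
        try (JarOutputStream out = new JarOutputStream(new FileOutputStream(jar))) {
            out.putNextEntry(new ZipEntry(
                    "org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.class"));
            out.closeEntry();
        }
        // Scan the archive for the wrapper class, like `jar tf demo.jar | grep StormSpoutWrapper`.
        boolean found;
        try (JarFile jf = new JarFile(jar)) {
            found = jf.stream().anyMatch(e -> e.getName().endsWith("StormSpoutWrapper.class"));
        }
        System.out.println(found ? "FOUND" : "MISSING");
    }
}
```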
As you can see from this part of my pom.xml:

    <!-- Throughput -->
    <execution>
        <id>Throughput</id>
        <phase>package</phase>
        <goals>
            <goal>jar</goal>
        </goals>
        <configuration>
            <classifier>Throughput</classifier>
            <archive>
                <manifestEntries>
                    <program-class>org.apache.flink.stormcompatibility.experiments.Throughput</program-class>
                </manifestEntries>
            </archive>
            <includes>
                <!-- from storm-core -->
                <include>defaults.yaml</include>
                <include>backtype/storm/*.class</include>
                <include>backtype/storm/serialization/*.class</include>
                <include>backtype/storm/topology/*.class</include>
                <include>backtype/storm/topology/base/*.class</include>
                <include>backtype/storm/utils/*.class</include>
                <include>backtype/storm/spout/*.class</include>
                <include>backtype/storm/task/*.class</include>
                <include>backtype/storm/tuple/*.class</include>
                <include>backtype/storm/generated/*.class</include>
                <include>backtype/storm/metric/**/*.class</include>
                <include>org/apache/storm/curator/*.class</include>
                <include>org/apache/thrift7/**/*.class</include>
                <!--<include>org/yaml/snakeyaml/constructor/*.class</include>-->
                <include>org/yaml/snakeyaml/**/*.class</include>
                <!-- Storm's recursive dependencies -->
                <include>org/json/simple/**/*.class</include>
                <!-- compatibility layer -->
                <include>org/apache/flink/stormcompatibility/api/*.class</include>
                <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
                <!-- Throughput -->
                <include>org/apache/flink/stormcompatibility/experiments/Throughput.class</include>
                <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.class</include>
            </includes>
        </configuration>
    </execution>

So how can I fix it?
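(One thing I have been wondering: the hand-maintained <includes> list above is easy to get out of sync with what the classloader needs at runtime. Would it be better to build the fat jar with the maven-shade-plugin instead, so that the compatibility-layer classes are bundled transitively? A rough sketch of what I mean, with the configuration details being my assumption rather than something I have verified against this build:)

```xml
<!-- Hypothetical sketch: let maven-shade-plugin bundle the dependencies
     (including org.apache.flink.stormcompatibility.wrappers.*) instead of
     listing every class pattern by hand. -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>org.apache.flink.stormcompatibility.experiments.Throughput</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
```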
2. Here is a case using the join operator:

    DataStream<Tuple3<String, Integer, Long>> user = env.addSource(new sourceUserFunction());
    DataStream<Tuple2<String, Integer>> area = env.addSource(new sourceAreaFunction());

    DataStream<Tuple2<String, Integer>> sink = user
            .join(area)
            .onWindow(15, TimeUnit.MINUTES)
            .where(0)
            .equalTo(0)
            .with(new JoinFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> join(
                        Tuple3<String, Integer, Long> first,
                        Tuple2<String, Integer> second) throws Exception {
                    if (first.f1 + second.f1 > 10) {
                        return new Tuple2<String, Integer>(first.f0, first.f1 + second.f1);
                    }
                    return null;
                }
            });

As you can see, I don't want to return null when the condition is not satisfied, but there is no JoinFunction variant that takes a Collector. I found FlatJoinFunction, which does take a Collector; however, it seems it can only be used with DataSet, not DataStream. Is there any other way to improve this case?

PS. I'm sorry about the timing of this email. Feel free to ignore it over the weekend.

Greetings,
Huang Wei

Huawei Technologies Co., Ltd.
Tel: +86 18106512602
Email: huangwei...@huawei.com
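P.S. One workaround I have been considering (just a sketch against the same DataStream API as in the question, not verified end to end; chaining filter() after the join is my assumption, not something I found in the docs): always emit the joined tuple from join() and push the condition into a downstream filter, so null never has to be returned:

```java
DataStream<Tuple2<String, Integer>> sink = user
        .join(area)
        .onWindow(15, TimeUnit.MINUTES)
        .where(0)
        .equalTo(0)
        .with(new JoinFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> join(
                    Tuple3<String, Integer, Long> first,
                    Tuple2<String, Integer> second) {
                // Always emit the joined value instead of returning null...
                return new Tuple2<String, Integer>(first.f0, first.f1 + second.f1);
            }
        })
        // ...and drop the pairs that do not satisfy the condition here.
        .filter(new FilterFunction<Tuple2<String, Integer>>() {
            @Override
            public boolean filter(Tuple2<String, Integer> value) {
                return value.f1 > 10;
            }
        });
```

Would that be considered idiomatic, or is there a better way?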