Hi,
I have run into some trouble while developing Flink applications.

1.
I want to compare the performance of Storm and the flink-storm-compatibility layer using this test program:
https://github.com/project-flink/flink-perf/blob/master/storm-jobs/src/jvm/experiments/Throughput.java
Below is my slightly modified version of Throughput.java:



public static void main(String[] args) throws Exception {
    ParameterTool pt = ParameterTool.fromArgs(args);

    int par = pt.getInt("para");

    final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();

    builder.setSpout("source0", new Generator(pt), pt.getInt("sourceParallelism"));

    int i = 0;
    for (; i < pt.getInt("repartitions", 1) - 1; i++) {
        System.out.println("adding source" + i + " --> source" + (i + 1));
        builder.setBolt("source" + (i + 1), new RepartPassThroughBolt(pt), pt.getInt("sinkParallelism"))
                .fieldsGrouping("source" + i, new Fields("id"));
    }

    System.out.println("adding final source" + i + " --> sink");

    builder.setBolt("sink", new Sink(pt), pt.getInt("sinkParallelism"))
            .fieldsGrouping("source" + i, new Fields("id"));

    Config conf = new Config();
    conf.setDebug(false);
    // System.exit(1);

    // execute program locally
    final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
    cluster.submitTopology("throughput", null, builder.createTopology());

    Utils.sleep(10 * 1000);

    // TODO kill does not do anything so far
    cluster.killTopology("throughput");
    cluster.shutdown();
}


This program runs fine in IDEA with flink-storm-compatibility. However, when I package it into a jar file and run it on Flink 0.10-SNAPSHOT, the following problem appears in the flink-client log file:

java.lang.Exception: Call to registerInputOutput() of invokable failed
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:521)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:187)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:90)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:518)
	... 1 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:185)
	... 3 more


This class (StormSpoutWrapper) does exist in my packaged jar file, as this part of my pom.xml shows:

<!-- Throughput -->
<execution>
    <id>Throughput</id>
    <phase>package</phase>
    <goals>
        <goal>jar</goal>
    </goals>
    <configuration>
        <classifier>Throughput</classifier>

        <archive>
            <manifestEntries>
                <program-class>org.apache.flink.stormcompatibility.experiments.Throughput</program-class>
            </manifestEntries>
        </archive>

        <includes>
            <!-- from storm-core -->
            <include>defaults.yaml</include>
            <include>backtype/storm/*.class</include>
            <include>backtype/storm/serialization/*.class</include>
            <include>backtype/storm/topology/*.class</include>
            <include>backtype/storm/topology/base/*.class</include>
            <include>backtype/storm/utils/*.class</include>
            <include>backtype/storm/spout/*.class</include>
            <include>backtype/storm/task/*.class</include>
            <include>backtype/storm/tuple/*.class</include>
            <include>backtype/storm/generated/*.class</include>
            <include>backtype/storm/metric/**/*.class</include>
            <include>org/apache/storm/curator/*.class</include>
            <include>org/apache/thrift7/**/*.class</include>
            <!--<include>org/yaml/snakeyaml/constructor/*.class</include>-->
            <include>org/yaml/snakeyaml/**/*.class</include>
            <!-- Storm's recursive dependencies -->
            <include>org/json/simple/**/*.class</include>
            <!-- compatibility layer -->
            <include>org/apache/flink/stormcompatibility/api/*.class</include>
            <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
            <!-- the Throughput job itself -->
            <include>org/apache/flink/stormcompatibility/experiments/Throughput.class</include>
            <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.class</include>
        </includes>
    </configuration>
</execution>

So how can I fix it?
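
To double-check on my side, I could run a small probe like the sketch below from inside the submitted jar (the probe class and its messages are only my own illustration; the looked-up class name is taken from the stack trace). Note that the stack trace points at Task.run(), so the failing lookup seems to happen when the TaskManager deserializes the operator, not in the client:

// Hypothetical debugging sketch: check whether StormSpoutWrapper is visible
// to the classloader that loaded this code. If this prints "found" on the
// client but the job still fails, the class is missing from the jar that is
// actually shipped to the TaskManagers.
public class ClasspathProbe {
    public static void main(String[] args) {
        try {
            Class.forName("org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper");
            System.out.println("StormSpoutWrapper found");
        } catch (ClassNotFoundException e) {
            System.out.println("StormSpoutWrapper NOT found: " + e);
        }
    }
}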


2.
Here is a case that uses the join operator:

DataStream<Tuple3<String, Integer, Long>> user = env.addSource(new sourceUserFunction());
DataStream<Tuple2<String, Integer>> area = env.addSource(new sourceAreaFunction());

DataStream<Tuple2<String, Integer>> sink = user
        .join(area)
        .onWindow(15, TimeUnit.MINUTES)
        .where(0)
        .equalTo(0)
        .with(new JoinFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> join(Tuple3<String, Integer, Long> first, Tuple2<String, Integer> second) throws Exception {
                if (first.f1 + second.f1 > 10) {
                    return new Tuple2<String, Integer>(first.f0, first.f1 + second.f1);
                }
                return null;
            }
        });

As you can see, I am forced to return null when the condition is not satisfied, which I would rather avoid. But there is no variant of JoinFunction that provides a Collector. I found FlatJoinFunction, which does take a Collector, but it seems it can only be used with DataSet, not DataStream.
Is there any other way to improve this case?
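
For example, would a coGroup like the sketch below work instead? (This assumes the DataStream coGroup builder mirrors the join builder above; CoGroupFunction receives Iterables and a Collector, so nothing has to be emitted when the condition fails.)

// Sketch under the assumption above: the same pairing expressed with coGroup.
// Needs org.apache.flink.api.common.functions.CoGroupFunction and
// org.apache.flink.util.Collector.
DataStream<Tuple2<String, Integer>> sink = user
        .coGroup(area)
        .onWindow(15, TimeUnit.MINUTES)
        .where(0)
        .equalTo(0)
        .with(new CoGroupFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void coGroup(Iterable<Tuple3<String, Integer, Long>> first,
                                Iterable<Tuple2<String, Integer>> second,
                                Collector<Tuple2<String, Integer>> out) throws Exception {
                // emit a result only for the pairs that satisfy the condition
                for (Tuple3<String, Integer, Long> u : first) {
                    for (Tuple2<String, Integer> a : second) {
                        if (u.f1 + a.f1 > 10) {
                            out.collect(new Tuple2<String, Integer>(u.f0, u.f1 + a.f1));
                        }
                    }
                }
            }
        });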

PS: I'm sorry about this email; feel free to ignore it during the weekend.

Greetings,
Huang Wei
华为技术有限公司 Huawei Technologies Co., Ltd.


Tel: +86 18106512602
Email: huangwei...@huawei.com
