Hi Huang,

About Storm compatibility: did you double-check that the missing class
(StormSpoutWrapper) is actually contained in your jar? Looking at pom.xml
does not help here, because if you specify a file to include but Maven
cannot find it, Maven will silently omit it from the jar and the build
will still succeed. Thus, you need to check the jar file itself on the
command line:
    unzip -l myjarfile.jar
or
    unzip -l myjarfile.jar | grep file-I-am-looking-for

I guess your jar is not built correctly, i.e., the class is not in there...

Did you have a look at the pom.xml of flink-storm-compatibility-example
and the corresponding word-count-storm.xml? They show how to build such a
jar correctly (this was fixed recently, so make sure you update to the
latest master).

You can also have a look here to see how to package jars correctly (even
though this example is about Flink ML):
https://stackoverflow.com/questions/31661900/maven-build-has-missing-package-linkages/31662642#31662642

-Matthias

On 08/08/2015 11:15 AM, huangwei (G) wrote:
> Hi,
> I ran into some trouble while developing Flink applications.
> 
> 1.
> I want to compare the performance of Storm and flink-storm-compatibility
> using the test program:
> https://github.com/project-flink/flink-perf/blob/master/storm-jobs/src/jvm/experiments/Throughput.java.
> Below is my slightly modified version of this Throughput.java:
> 
> 
> 
> public static void main(String[] args) throws Exception {
>     ParameterTool pt = ParameterTool.fromArgs(args);
> 
>     int par = pt.getInt("para");
> 
>     final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> 
>     builder.setSpout("source0", new Generator(pt), pt.getInt("sourceParallelism"));
> 
>     int i = 0;
>     for (; i < pt.getInt("repartitions", 1) - 1; i++) {
>         System.out.println("adding source" + i + " --> source" + (i + 1));
>         builder.setBolt("source" + (i + 1), new RepartPassThroughBolt(pt), pt.getInt("sinkParallelism"))
>                 .fieldsGrouping("source" + i, new Fields("id"));
>     }
> 
>     System.out.println("adding final source" + i + " --> sink");
> 
>     builder.setBolt("sink", new Sink(pt), pt.getInt("sinkParallelism"))
>             .fieldsGrouping("source" + i, new Fields("id"));
> 
>     Config conf = new Config();
>     conf.setDebug(false);
>     //System.exit(1);
> 
>     // execute program locally
>     final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
>     cluster.submitTopology("throughput", null, builder.createTopology());
> 
>     Utils.sleep(10 * 1000);
> 
>     // TODO kill does not do anything so far
>     cluster.killTopology("throughput");
>     cluster.shutdown();
> }
> 
> 
> This program runs fine in IDEA with flink-storm-compatibility.
> However, when I package it into a jar file and run it on flink-0.10-SNAPSHOT,
> the following problem appears in the flink-client log file:
> 
> java.lang.Exception: Call to registerInputOutput() of invokable failed
>          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:521)
>          at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
>          at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:187)
>          at org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:90)
>          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:518)
>          ... 1 more
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper
>          at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>          at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>          at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>          at java.lang.Class.forName0(Native Method)
>          at java.lang.Class.forName(Class.java:348)
>          at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71)
>          at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>          at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>          at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>          at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>          at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>          at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>          at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>          at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>          at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>          at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
>          at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
>          at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:185)
>          ... 3 more
> 
> 
> But this class (StormSpoutWrapper) does exist in my packaged jar file.
> Here is the relevant part of my pom.xml:
> 
> <!-- Throughput -->
> <execution>
>     <id>Throughput</id>
>     <phase>package</phase>
>     <goals>
>         <goal>jar</goal>
>     </goals>
>     <configuration>
>         <classifier>Throughput</classifier>
> 
>         <archive>
>             <manifestEntries>
>                 <program-class>org.apache.flink.stormcompatibility.experiments.Throughput</program-class>
>             </manifestEntries>
>         </archive>
> 
>         <includes>
>             <!-- from storm-core -->
>             <include>defaults.yaml</include>
>             <include>backtype/storm/*.class</include>
>             <include>backtype/storm/serialization/*.class</include>
>             <include>backtype/storm/topology/*.class</include>
>             <include>backtype/storm/topology/base/*.class</include>
>             <include>backtype/storm/utils/*.class</include>
>             <include>backtype/storm/spout/*.class</include>
>             <include>backtype/storm/task/*.class</include>
>             <include>backtype/storm/tuple/*.class</include>
>             <include>backtype/storm/generated/*.class</include>
>             <include>backtype/storm/metric/**/*.class</include>
>             <include>org/apache/storm/curator/*.class</include>
>             <include>org/apache/thrift7/**/*.class</include>
>             <!--<include>org/yaml/snakeyaml/constructor/*.class</include>-->
>             <include>org/yaml/snakeyaml/**/*.class</include>
>             <!-- Storm's recursive dependencies -->
>             <include>org/json/simple/**/*.class</include>
>             <!-- compatibility layer -->
>             <include>org/apache/flink/stormcompatibility/api/*.class</include>
>             <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
>             <!-- Word Count -->
>             <include>org/apache/flink/stormcompatibility/experiments/Throughput.class</include>
>             <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.class</include>
>         </includes>
>     </configuration>
> </execution>
> 
> So how can I fix it?
> 
> 
> 2.
> Here is a case using the join operator:
> 
> DataStream<Tuple3<String, Integer, Long>> user = env.addSource(new sourceUserFunction());
> DataStream<Tuple2<String, Integer>> area = env.addSource(new sourceAreaFunction());
> 
> DataStream<Tuple2<String, Integer>> sink = user
>       .join(area)
>       .onWindow(15, TimeUnit.MINUTES)
>       .where(0)
>       .equalTo(0)
>       .with(new JoinFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
>          @Override
>          public Tuple2<String, Integer> join(Tuple3<String, Integer, Long> first, Tuple2<String, Integer> second) throws Exception {
>             if (first.f1 + second.f1 > 10) {
>                return new Tuple2<String, Integer>(first.f0, first.f1 + second.f1);
>             }
>             return null;
>          }
>       });
> 
> As you can see, I don't want to return null when the condition is not
> satisfied, but there is no JoinFunction variant that provides a Collector.
> I found FlatJoinFunction, which does take a Collector; however,
> FlatJoinFunction seems to be usable only on DataSet, not on DataStream.
> Is there any other way to improve this case?
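> 
> For illustration, here is a minimal sketch of the Collector-based pattern I
> mean, written against the DataSet API (the ExecutionEnvironment, the sample
> elements, and the userSet/areaSet names are just hypothetical stand-ins for
> the two streams above):
> 
> import org.apache.flink.api.common.functions.FlatJoinFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.util.Collector;
> 
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> 
> // hypothetical batch inputs standing in for the user and area streams
> DataSet<Tuple3<String, Integer, Long>> userSet =
>       env.fromElements(new Tuple3<String, Integer, Long>("a", 7, 1L));
> DataSet<Tuple2<String, Integer>> areaSet =
>       env.fromElements(new Tuple2<String, Integer>("a", 5));
> 
> DataSet<Tuple2<String, Integer>> result = userSet
>       .join(areaSet)
>       .where(0)
>       .equalTo(0)
>       .with(new FlatJoinFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
>          @Override
>          public void join(Tuple3<String, Integer, Long> first, Tuple2<String, Integer> second, Collector<Tuple2<String, Integer>> out) throws Exception {
>             // emit a result only when the condition holds; nothing is
>             // collected otherwise, so no null value is needed
>             if (first.f1 + second.f1 > 10) {
>                out.collect(new Tuple2<String, Integer>(first.f0, first.f1 + second.f1));
>             }
>          }
>       });
> 
> This is the pattern I would like to be able to use on DataStream as well.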
> 
> PS: I'm sorry about this email; feel free to ignore it during the weekend.
> 
> Greetings,
> Huang Wei
> 华为技术有限公司 Huawei Technologies Co., Ltd.
> 
> 
> Tel:+86 18106512602
> Email:huangwei...@huawei.com
> 
