We have been seeing these class loader issues a lot lately. It seems that packaging the classes is trickier than anticipated.

Here is a pull request to add some diagnostics info on a "ClassNotFoundException": https://github.com/apache/flink/pull/1008
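To illustrate the idea, here is a minimal sketch of that kind of diagnostics (only a sketch, not the code of the pull request; the class and method names are made up for illustration): when a class fails to load, rethrow the ClassNotFoundException with the failing class loader and its class path in the message.

    import java.net.URL;
    import java.net.URLClassLoader;

    // Hypothetical helper, NOT the actual code of the linked PR: enrich a
    // ClassNotFoundException with information about the class loader that
    // failed, so "class missing from the user jar" cases are easy to spot.
    public class ClassLoadingDiagnostics {

        public static Class<?> loadWithDiagnostics(String className, ClassLoader loader)
                throws ClassNotFoundException {
            try {
                // 'false': do not run static initializers during the lookup
                return Class.forName(className, false, loader);
            } catch (ClassNotFoundException e) {
                StringBuilder msg = new StringBuilder()
                        .append("Could not load class '").append(className)
                        .append("' with class loader ").append(loader);
                if (loader instanceof URLClassLoader) {
                    msg.append("; class path of the loader:");
                    for (URL url : ((URLClassLoader) loader).getURLs()) {
                        msg.append(' ').append(url);
                    }
                }
                throw new ClassNotFoundException(msg.toString(), e);
            }
        }
    }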
On Tue, Aug 11, 2015 at 3:29 PM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

> Three comments:
>
> 1) If StormSpoutWrapper is in your jar, is it located in the correct
> directory (it must match the package name)?
>
> 2) If you are using FlinkTopologyBuilder, you need to package as shown in
> the StormWordCountRemoteBySubmitter example, using an additional assembly
> file. (The first examples are for embedded Spouts/Bolts within a Flink
> streaming program. -- Maybe we should add some comments in the pom file...)
>
> 3) How do you build? The whole source code, or only parts of it? Did you
> run "install"?
>
> -Matthias
>
>
> On 08/11/2015 02:47 PM, huangwei (G) wrote:
> > Hi Stephan and Matthias,
> >
> > Sorry for replying late.
> > I've double-checked that the class StormSpoutWrapper really exists in
> > my jar file.
> > And I ran into the same trouble when I ran the corresponding
> > word-count-storm example from flink-storm-compatibility-examples.
> > The way I built my Throughput application was simply to add it to
> > flink-storm-compatibility-examples and change some configuration in
> > word-count-storm.xml.
> > Here is the entire POM file:
> >
> > <?xml version="1.0" encoding="UTF-8"?>
> > <!--
> > Licensed to the Apache Software Foundation (ASF) under one
> > or more contributor license agreements. See the NOTICE file
> > distributed with this work for additional information
> > regarding copyright ownership. The ASF licenses this file
> > to you under the Apache License, Version 2.0 (the
> > "License"); you may not use this file except in compliance
> > with the License. You may obtain a copy of the License at
> >
> >   http://www.apache.org/licenses/LICENSE-2.0
> >
> > Unless required by applicable law or agreed to in writing,
> > software distributed under the License is distributed on an
> > "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> > KIND, either express or implied. See the License for the
> > specific language governing permissions and limitations
> > under the License.
> > -->
> > <project xmlns="http://maven.apache.org/POM/4.0.0"
> >          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> >          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
> >
> >   <modelVersion>4.0.0</modelVersion>
> >
> >   <parent>
> >     <groupId>org.apache.flink</groupId>
> >     <artifactId>flink-storm-compatibility-parent</artifactId>
> >     <version>0.10-SNAPSHOT</version>
> >     <relativePath>..</relativePath>
> >   </parent>
> >
> >   <artifactId>flink-storm-compatibility-examples</artifactId>
> >   <name>flink-storm-compatibility-examples</name>
> >
> >   <packaging>jar</packaging>
> >
> >   <dependencies>
> >     <dependency>
> >       <groupId>org.apache.flink</groupId>
> >       <artifactId>flink-storm-compatibility-core</artifactId>
> >       <version>${project.version}</version>
> >     </dependency>
> >
> >     <dependency>
> >       <groupId>org.apache.flink</groupId>
> >       <artifactId>flink-java-examples</artifactId>
> >       <version>${project.version}</version>
> >     </dependency>
> >
> >     <dependency>
> >       <groupId>org.apache.flink</groupId>
> >       <artifactId>flink-streaming-core</artifactId>
> >       <version>${project.version}</version>
> >       <scope>test</scope>
> >       <type>test-jar</type>
> >     </dependency>
> >
> >     <dependency>
> >       <groupId>org.apache.flink</groupId>
> >       <artifactId>flink-test-utils</artifactId>
> >       <version>${project.version}</version>
> >       <scope>test</scope>
> >     </dependency>
> >
> >     <dependency>
> >       <groupId>org.apache.storm</groupId>
> >       <artifactId>storm-core</artifactId>
> >       <version>0.9.4</version>
> >       <!-- keep storm out of the jar-with-dependencies -->
> >       <scope>provided</scope>
> >     </dependency>
> >   </dependencies>
> >
> >   <build>
> >     <plugins>
> >       <!-- get default data from the flink-java-examples package -->
> >       <plugin>
> >         <groupId>org.apache.maven.plugins</groupId>
> >         <artifactId>maven-dependency-plugin</artifactId>
> >         <version>2.9</version><!--$NO-MVN-MAN-VER$-->
> >         <executions>
> >           <execution>
> >             <id>unpack</id>
> >             <phase>prepare-package</phase>
> >             <goals>
> >               <goal>unpack</goal>
> >             </goals>
> >             <configuration>
> >               <artifactItems>
> >                 <artifactItem>
> >                   <groupId>org.apache.flink</groupId>
> >                   <artifactId>flink-java-examples</artifactId>
> >                   <version>${project.version}</version>
> >                   <type>jar</type>
> >                   <overWrite>false</overWrite>
> >                   <outputDirectory>${project.build.directory}/classes</outputDirectory>
> >                   <includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
> >                 </artifactItem>
> >                 <artifactItem>
> >                   <groupId>org.apache.flink</groupId>
> >                   <artifactId>flink-storm-compatibility-core</artifactId>
> >                   <version>${project.version}</version>
> >                   <type>jar</type>
> >                   <overWrite>false</overWrite>
> >                   <outputDirectory>${project.build.directory}/classes</outputDirectory>
> >                 </artifactItem>
> >                 <artifactItem>
> >                   <groupId>org.apache.storm</groupId>
> >                   <artifactId>storm-core</artifactId>
> >                   <version>0.9.4</version>
> >                   <type>jar</type>
> >                   <overWrite>false</overWrite>
> >                   <outputDirectory>${project.build.directory}/classes</outputDirectory>
> >                   <!--<excludes>defaults.yaml</excludes>-->
> >                 </artifactItem>
> >                 <artifactItem>
> >                   <groupId>com.googlecode.json-simple</groupId>
> >                   <artifactId>json-simple</artifactId>
> >                   <version>1.1</version>
> >                   <type>jar</type>
> >                   <overWrite>false</overWrite>
> >                   <outputDirectory>${project.build.directory}/classes</outputDirectory>
> >                 </artifactItem>
> >                 <!-- snakeyaml -->
> >                 <artifactItem>
> >                   <groupId>org.yaml</groupId>
> >                   <artifactId>snakeyaml</artifactId>
> >                   <version>1.11</version>
> >                   <type>jar</type>
> >                   <overWrite>false</overWrite>
> >                   <outputDirectory>${project.build.directory}/classes</outputDirectory>
> >                 </artifactItem>
> >               </artifactItems>
> >             </configuration>
> >           </execution>
> >         </executions>
> >       </plugin>
> >
> >       <!-- self-contained jars for each example -->
> >       <plugin>
> >         <groupId>org.apache.maven.plugins</groupId>
> >         <artifactId>maven-jar-plugin</artifactId>
> >
> >         <executions>
> >
> >           <!-- WordCount Spout source -->
> >           <execution>
> >             <id>WordCount-SpoutSource</id>
> >             <phase>package</phase>
> >             <goals>
> >               <goal>jar</goal>
> >             </goals>
> >             <configuration>
> >               <classifier>WordCountSpoutSource</classifier>
> >
> >               <archive>
> >                 <manifestEntries>
> >                   <program-class>org.apache.flink.stormcompatibility.wordcount.SpoutSourceWordCount</program-class>
> >                 </manifestEntries>
> >               </archive>
> >
> >               <includes>
> >                 <!-- from storm-core -->
> >                 <include>backtype/storm/topology/*.class</include>
> >                 <include>backtype/storm/spout/*.class</include>
> >                 <include>backtype/storm/task/*.class</include>
> >                 <include>backtype/storm/tuple/*.class</include>
> >                 <include>backtype/storm/generated/*.class</include>
> >                 <include>backtype/storm/metric/**/*.class</include>
> >                 <include>org/apache/thrift7/**/*.class</include>
> >                 <!-- Storm's recursive dependencies -->
> >                 <include>org/json/simple/**/*.class</include>
> >                 <!-- compatibility layer -->
> >                 <include>org/apache/flink/stormcompatibility/api/*.class</include>
> >                 <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
> >                 <!-- Word Count -->
> >                 <include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.class</include>
> >                 <include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount$*.class</include>
> >                 <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/AbstractStormSpout.class</include>
> >                 <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormFileSpout.class</include>
> >                 <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormInMemorySpout.class</include>
> >                 <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
> >               </includes>
> >             </configuration>
> >           </execution>
> >
> >           <!-- WordCount Bolt tokenizer -->
> >           <execution>
> >             <id>WordCount-BoltTokenizer</id>
> >             <phase>package</phase>
> >             <goals>
> >               <goal>jar</goal>
> >             </goals>
> >             <configuration>
> >               <classifier>WordCountBoltTokenizer</classifier>
> >
> >               <archive>
> >                 <manifestEntries>
> >                   <program-class>org.apache.flink.stormcompatibility.wordcount.BoltTokenizerWordCount</program-class>
> >                 </manifestEntries>
> >               </archive>
> >
> >               <includes>
> >                 <!-- from storm-core -->
> >                 <include>backtype/storm/topology/*.class</include>
> >                 <include>backtype/storm/spout/*.class</include>
> >                 <include>backtype/storm/task/*.class</include>
> >                 <include>backtype/storm/tuple/*.class</include>
> >                 <include>backtype/storm/generated/*.class</include>
> >                 <include>backtype/storm/metric/**/*.class</include>
> >                 <include>org/apache/thrift7/**/*.class</include>
> >                 <!-- Storm's recursive dependencies -->
> >                 <include>org/json/simple/**/*.class</include>
> >                 <!-- compatibility layer -->
> >                 <include>org/apache/flink/stormcompatibility/api/*.class</include>
> >                 <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
> >                 <!-- Word Count -->
> >                 <include>org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.class</include>
> >                 <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.class</include>
> >                 <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
> >               </includes>
> >             </configuration>
> >           </execution>
> >
> >           <!-- Throughput -->
> >           <execution>
> >             <id>Throughput</id>
> >             <phase>package</phase>
> >             <goals>
> >               <goal>jar</goal>
> >             </goals>
> >             <configuration>
> >               <classifier>Throughput</classifier>
> >
> >               <archive>
> >                 <manifestEntries>
> >                   <program-class>org.apache.flink.stormcompatibility.experiments.Throughput</program-class>
> >                 </manifestEntries>
> >               </archive>
> >
> >               <includes>
> >                 <!-- from storm-core -->
> >                 <include>defaults.yaml</include>
> >                 <include>backtype/storm/*.class</include>
> >                 <include>backtype/storm/serialization/*.class</include>
> >                 <include>backtype/storm/topology/*.class</include>
> >                 <include>backtype/storm/topology/base/*.class</include>
> >                 <include>backtype/storm/utils/*.class</include>
> >                 <include>backtype/storm/spout/*.class</include>
> >                 <include>backtype/storm/task/*.class</include>
> >                 <include>backtype/storm/tuple/*.class</include>
> >                 <include>backtype/storm/generated/*.class</include>
> >                 <include>backtype/storm/metric/**/*.class</include>
> >                 <include>org/apache/storm/curator/*.class</include>
> >                 <include>org/apache/thrift7/**/*.class</include>
> >                 <!--<include>org/yaml/snakeyaml/constructor/*.class</include>-->
> >                 <include>org/yaml/snakeyaml/**/*.class</include>
> >                 <!-- Storm's recursive dependencies -->
> >                 <include>org/json/simple/**/*.class</include>
> >                 <!-- compatibility layer -->
> >                 <include>org/apache/flink/stormcompatibility/api/*.class</include>
> >                 <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
> >                 <!-- Throughput -->
> >                 <include>org/apache/flink/stormcompatibility/experiments/Throughput.class</include>
> >                 <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.class</include>
> >               </includes>
> >             </configuration>
> >           </execution>
> >
> >           <execution>
> >             <goals>
> >               <goal>test-jar</goal>
> >             </goals>
> >           </execution>
> >         </executions>
> >       </plugin>
> >
> >       <!-- WordCount Storm topology -->
> >       <!-- Cannot use maven-jar-plugin because 'defaults.yaml' must be included in the jar -->
> >       <plugin>
> >         <artifactId>maven-assembly-plugin</artifactId>
> >         <configuration>
> >           <descriptors>
> >             <descriptor>src/assembly/word-count-storm.xml</descriptor>
> >           </descriptors>
> >           <archive>
> >             <manifestEntries>
> >               <program-class>org.apache.flink.stormcompatibility.wordcount.StormWordCountRemoteBySubmitter</program-class>
> >             </manifestEntries>
> >           </archive>
> >         </configuration>
> >
> >         <executions>
> >           <execution>
> >             <id>WordCountStorm</id>
> >             <phase>package</phase>
> >             <goals>
> >               <goal>single</goal>
> >             </goals>
> >           </execution>
> >         </executions>
> >       </plugin>
> >     </plugins>
> >
> >     <pluginManagement>
> >       <plugins>
> >         <!-- This plugin's configuration is used to store Eclipse m2e settings only.
> >              It has no influence on the Maven build itself. -->
> >         <plugin>
> >           <groupId>org.eclipse.m2e</groupId>
> >           <artifactId>lifecycle-mapping</artifactId>
> >           <version>1.0.0</version>
> >           <configuration>
> >             <lifecycleMappingMetadata>
> >               <pluginExecutions>
> >                 <pluginExecution>
> >                   <pluginExecutionFilter>
> >                     <groupId>org.apache.maven.plugins</groupId>
> >                     <artifactId>maven-dependency-plugin</artifactId>
> >                     <versionRange>[2.9,)</versionRange>
> >                     <goals>
> >                       <goal>unpack</goal>
> >                     </goals>
> >                   </pluginExecutionFilter>
> >                   <action>
> >                     <ignore/>
> >                   </action>
> >                 </pluginExecution>
> >               </pluginExecutions>
> >             </lifecycleMappingMetadata>
> >           </configuration>
> >         </plugin>
> >       </plugins>
> >     </pluginManagement>
> >
> >   </build>
> >
> > </project>
> >
> > -----Original Message-----
> > From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan Ewen
> > Sent: August 11, 2015, 1:55
> > To: dev@flink.apache.org
> > Subject: Re: Some problems about Flink applications
> >
> > It would also help if you could paste the entire POM file of your project.
> > Maybe something is amiss with the dependencies, or the scopes?
> >
> > On Sat, Aug 8, 2015 at 7:27 PM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
> >
> >> Hi Huang,
> >>
> >> about Storm compatibility: did you double-check that the file that is
> >> missing (StormSpoutWrapper) is contained in your jar? Looking at the
> >> pom.xml does not help here, because if you specify a file to include
> >> but Maven cannot find it, it will just not add it to the jar; the
> >> build will still succeed. Thus, you need to check the jar file itself
> >> via the command line:
> >>
> >>   unzip -l myjarfile.jar
> >> or
> >>   unzip -l myjarfile.jar | grep file-I-am-looking-for
> >>
> >> I guess your jar is not built correctly, i.e., the file is not there...
> >>
> >> Did you have a look into the pom.xml of flink-storm-compatibility-examples
> >> and the corresponding word-count-storm.xml? It shows how to build a
> >> jar correctly (it was recently fixed, so make sure you update to the
> >> latest master).
> >>
> >> You can also have a look here at how to package jars correctly (even if
> >> this example is about Flink ML):
> >>
> >> https://stackoverflow.com/questions/31661900/maven-build-has-missing-package-linkages/31662642#31662642
> >>
> >> -Matthias
> >>
> >> On 08/08/2015 11:15 AM, huangwei (G) wrote:
> >>> Hi,
> >>> I am running into some trouble developing Flink applications.
> >>>
> >>> 1.
> >>> I want to compare the performance of Storm and flink-storm-compatibility
> >>> using the test program
> >>> https://github.com/project-flink/flink-perf/blob/master/storm-jobs/src/jvm/experiments/Throughput.java
> >>>
> >>> And here is a bit of my changes to this Throughput.java:
> >>>
> >>> public static void main(String[] args) throws Exception {
> >>>     ParameterTool pt = ParameterTool.fromArgs(args);
> >>>
> >>>     int par = pt.getInt("para");
> >>>
> >>>     final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> >>>
> >>>     builder.setSpout("source0", new Generator(pt), pt.getInt("sourceParallelism"));
> >>>
> >>>     int i = 0;
> >>>     for (; i < pt.getInt("repartitions", 1) - 1; i++) {
> >>>         System.out.println("adding source" + i + " --> source" + (i + 1));
> >>>         builder.setBolt("source" + (i + 1), new RepartPassThroughBolt(pt), pt.getInt("sinkParallelism"))
> >>>                 .fieldsGrouping("source" + i, new Fields("id"));
> >>>     }
> >>>
> >>>     System.out.println("adding final source" + i + " --> sink");
> >>>
> >>>     builder.setBolt("sink", new Sink(pt), pt.getInt("sinkParallelism"))
> >>>             .fieldsGrouping("source" + i, new Fields("id"));
> >>>
> >>>     Config conf = new Config();
> >>>     conf.setDebug(false); //System.exit(1);
> >>>
> >>>     // execute program locally
> >>>     final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
> >>>     cluster.submitTopology("throughput", null, builder.createTopology());
> >>>
> >>>     Utils.sleep(10 * 1000);
> >>>
> >>>     // TODO: kill does not do anything so far
> >>>     cluster.killTopology("throughput");
> >>>     cluster.shutdown();
> >>> }
> >>>
> >>> This program runs well in IDEA with flink-storm-compatibility.
> >>> However, when I package it into a jar file and run it on
> >>> flink-0.10-SNAPSHOT, there is a problem in the flink-client log file:
> >>>
> >>> java.lang.Exception: Call to registerInputOutput() of invokable failed
> >>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:521)
> >>>     at java.lang.Thread.run(Thread.java:745)
> >>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
> >>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:187)
> >>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:90)
> >>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:518)
> >>>     ... 1 more
> >>> Caused by: java.lang.ClassNotFoundException: org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper
> >>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> >>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >>>     at java.lang.Class.forName0(Native Method)
> >>>     at java.lang.Class.forName(Class.java:348)
> >>>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71)
> >>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
> >>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> >>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> >>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> >>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> >>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> >>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> >>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> >>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> >>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
> >>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
> >>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:185)
> >>>     ... 3 more
> >>>
> >>> And this class (StormSpoutWrapper) does exist in my packaged jar file,
> >>> as you can see in this part of my pom.xml:
> >>>
> >>> <!-- Throughput -->
> >>> <execution>
> >>>   <id>Throughput</id>
> >>>   <phase>package</phase>
> >>>   <goals>
> >>>     <goal>jar</goal>
> >>>   </goals>
> >>>   <configuration>
> >>>     <classifier>Throughput</classifier>
> >>>
> >>>     <archive>
> >>>       <manifestEntries>
> >>>         <program-class>org.apache.flink.stormcompatibility.experiments.Throughput</program-class>
> >>>       </manifestEntries>
> >>>     </archive>
> >>>
> >>>     <includes>
> >>>       <!-- from storm-core -->
> >>>       <include>defaults.yaml</include>
> >>>       <include>backtype/storm/*.class</include>
> >>>       <include>backtype/storm/serialization/*.class</include>
> >>>       <include>backtype/storm/topology/*.class</include>
> >>>       <include>backtype/storm/topology/base/*.class</include>
> >>>       <include>backtype/storm/utils/*.class</include>
> >>>       <include>backtype/storm/spout/*.class</include>
> >>>       <include>backtype/storm/task/*.class</include>
> >>>       <include>backtype/storm/tuple/*.class</include>
> >>>       <include>backtype/storm/generated/*.class</include>
> >>>       <include>backtype/storm/metric/**/*.class</include>
> >>>       <include>org/apache/storm/curator/*.class</include>
> >>>       <include>org/apache/thrift7/**/*.class</include>
> >>>       <!--<include>org/yaml/snakeyaml/constructor/*.class</include>-->
> >>>       <include>org/yaml/snakeyaml/**/*.class</include>
> >>>       <!-- Storm's recursive dependencies -->
> >>>       <include>org/json/simple/**/*.class</include>
> >>>       <!-- compatibility layer -->
> >>>       <include>org/apache/flink/stormcompatibility/api/*.class</include>
> >>>       <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
> >>>       <!-- Throughput -->
> >>>       <include>org/apache/flink/stormcompatibility/experiments/Throughput.class</include>
> >>>       <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.class</include>
> >>>     </includes>
> >>>   </configuration>
> >>> </execution>
> >>>
> >>> So how can I fix it?
> >>>
> >>> 2.
> >>> There is a case using the join operator:
> >>>
> >>> DataStream<Tuple3<String, Integer, Long>> user = env.addSource(new sourceUserFunction());
> >>> DataStream<Tuple2<String, Integer>> area = env.addSource(new sourceAreaFunction());
> >>>
> >>> DataStream<Tuple2<String, Integer>> sink = user
> >>>         .join(area)
> >>>         .onWindow(15, TimeUnit.MINUTES)
> >>>         .where(0)
> >>>         .equalTo(0)
> >>>         .with(new JoinFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
> >>>             @Override
> >>>             public Tuple2<String, Integer> join(Tuple3<String, Integer, Long> first,
> >>>                     Tuple2<String, Integer> second) throws Exception {
> >>>                 if (first.f1 + second.f1 > 10) {
> >>>                     return new Tuple2<String, Integer>(first.f0, first.f1 + second.f1);
> >>>                 }
> >>>                 return null;
> >>>             }
> >>>         });
> >>>
> >>> As you see, I don't want to return null when the condition is not satisfied.
> >>> But there is no JoinFunction variant that takes a Collector.
> >>> I found FlatJoinFunction, which allows a Collector, but FlatJoinFunction
> >>> seems usable only with DataSet, not with DataStream.
> >>> Is there any other way to improve this case?
> >>>
> >>> PS. I'm sorry about this email. You may ignore me during the weekend.
> >>>
> >>> Greetings,
> >>> Huang Wei
> >>> Huawei Technologies Co., Ltd.
> >>>
> >>> Tel: +86 18106512602
> >>> Email: huangwei...@huawei.com
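Regarding question 2, one possible workaround, sketched against the streaming join API used in the snippet above, is to let the JoinFunction emit a marker tuple for non-matching pairs and drop it with a filter chained right after the join. This is only a sketch: it reuses the 'user' and 'area' streams from the question, and the sentinel value (-1) is an assumption of this sketch, not part of any Flink API. A sentinel is used instead of null because Flink's Tuple types do not support null fields.

    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;

    // Sketch only: 'user' and 'area' are the streams defined in the question
    // above. The sentinel count of -1 is an assumption of this sketch; it is
    // used instead of null because Flink Tuple fields may not be null.
    DataStream<Tuple2<String, Integer>> sink = user
            .join(area)
            .onWindow(15, TimeUnit.MINUTES)
            .where(0)
            .equalTo(0)
            .with(new JoinFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> join(Tuple3<String, Integer, Long> first,
                        Tuple2<String, Integer> second) {
                    if (first.f1 + second.f1 > 10) {
                        return new Tuple2<String, Integer>(first.f0, first.f1 + second.f1);
                    }
                    return new Tuple2<String, Integer>(first.f0, -1); // sentinel
                }
            })
            .filter(new FilterFunction<Tuple2<String, Integer>>() {
                @Override
                public boolean filter(Tuple2<String, Integer> value) {
                    return value.f1 >= 0; // drop the sentinel tuples
                }
            });

Since DataStream.flatMap also provides a Collector, chaining a flatMap after the join would be another way to approximate the FlatJoinFunction behavior on streams.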