We have been seeing these class loader issues a lot lately.

It seems that packaging the classes is trickier than anticipated.

Here is a pull request that adds some diagnostic information to a
"ClassNotFoundException": https://github.com/apache/flink/pull/1008

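For anyone who wants to run the jar check Matthias describes below from
Java rather than with unzip, here is a minimal sketch. It only uses
java.util.jar from the JDK. The class name JarClassCheck and its methods
are made up for illustration; they are not part of Flink and not what the
pull request above adds. It also covers his first point: the entry path
inside the jar has to match the package name.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.jar.JarFile;

/** Hypothetical helper: checks whether a jar physically contains the .class entry of a class. */
final class JarClassCheck {

    /** Maps a fully qualified (binary) class name to the entry path it must have inside the jar. */
    static String entryNameFor(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns true if the jar holds an entry at the path derived from the class name. */
    static boolean containsClass(Path jar, String className) throws IOException {
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            return jarFile.getJarEntry(entryNameFor(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: java JarClassCheck.java <jar> <fully.qualified.ClassName>");
            return;
        }
        boolean found = containsClass(Paths.get(args[0]), args[1]);
        System.out.println(entryNameFor(args[1]) + (found ? " is in " : " is MISSING from ") + args[0]);
    }
}
```

It can be run as, for example,
java JarClassCheck.java myjarfile.jar org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper
(launching a single source file directly needs Java 11 or newer; with
older JDKs, compile it with javac first).
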
On Tue, Aug 11, 2015 at 3:29 PM, Matthias J. Sax <
mj...@informatik.hu-berlin.de> wrote:

> Three comments:
>
> 1) If StormSpoutWrapper is in your jar, is it located in the correct
> directory (the path must match the package name)?
>
> 2) If you are using FlinkTopologyBuilder, you need to package the jar as
> shown in the StormWordCountRemoteBySubmitter example, using an additional
> assembly file.
> (The first examples are for embedding Spouts/Bolts in a Flink streaming
> program. Maybe we should add some comments to the pom file...)
>
> 3) How do you build? The whole source code, or only parts of it? Did you
> run "install"?
>
> -Matthias
>
>
> On 08/11/2015 02:47 PM, huangwei (G) wrote:
> > Hi Stephan and Matthias,
> >
> > Sorry for the late reply.
> > I've double-checked that the class StormSpoutWrapper really exists in
> > my jar file.
> > I get the same error when I run the word-count-storm example from
> > flink-storm-compatibility-examples.
> > I built my Throughput application by simply adding it to
> > flink-storm-compatibility-examples and changing some configuration in
> > word-count-storm.xml.
> > Here is the entire POM file.
> >
> >
> > <?xml version="1.0" encoding="UTF-8"?>
> > <!--
> > Licensed to the Apache Software Foundation (ASF) under one
> > or more contributor license agreements.  See the NOTICE file
> > distributed with this work for additional information
> > regarding copyright ownership.  The ASF licenses this file
> > to you under the Apache License, Version 2.0 (the
> > "License"); you may not use this file except in compliance
> > with the License.  You may obtain a copy of the License at
> >
> >   http://www.apache.org/licenses/LICENSE-2.0
> >
> > Unless required by applicable law or agreed to in writing,
> > software distributed under the License is distributed on an
> > "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> > KIND, either express or implied.  See the License for the
> > specific language governing permissions and limitations
> > under the License.
> > -->
> > <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> >       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
> >
> >       <modelVersion>4.0.0</modelVersion>
> >
> >       <parent>
> >               <groupId>org.apache.flink</groupId>
> >               <artifactId>flink-storm-compatibility-parent</artifactId>
> >               <version>0.10-SNAPSHOT</version>
> >               <relativePath>..</relativePath>
> >       </parent>
> >
> >       <artifactId>flink-storm-compatibility-examples</artifactId>
> >       <name>flink-storm-compatibility-examples</name>
> >
> >       <packaging>jar</packaging>
> >
> >       <dependencies>
> >               <dependency>
> >                       <groupId>org.apache.flink</groupId>
> >                       <artifactId>flink-storm-compatibility-core</artifactId>
> >                       <version>${project.version}</version>
> >               </dependency>
> >
> >               <dependency>
> >                       <groupId>org.apache.flink</groupId>
> >                       <artifactId>flink-java-examples</artifactId>
> >                       <version>${project.version}</version>
> >               </dependency>
> >
> >               <dependency>
> >                       <groupId>org.apache.flink</groupId>
> >                       <artifactId>flink-streaming-core</artifactId>
> >                       <version>${project.version}</version>
> >                       <scope>test</scope>
> >                       <type>test-jar</type>
> >               </dependency>
> >
> >               <dependency>
> >                       <groupId>org.apache.flink</groupId>
> >                       <artifactId>flink-test-utils</artifactId>
> >                       <version>${project.version}</version>
> >                       <scope>test</scope>
> >               </dependency>
> >
> >         <dependency>
> >             <groupId>org.apache.storm</groupId>
> >             <artifactId>storm-core</artifactId>
> >             <version>0.9.4</version>
> >             <!-- keep storm out of the jar-with-dependencies -->
> >             <scope>provided</scope>
> >         </dependency>
> >       </dependencies>
> >
> >       <build>
> >               <plugins>
> >                       <!-- get default data from flink-java-examples package -->
> >                       <plugin>
> >                               <groupId>org.apache.maven.plugins</groupId>
> >                               <artifactId>maven-dependency-plugin</artifactId>
> >                               <version>2.9</version><!--$NO-MVN-MAN-VER$-->
> >                               <executions>
> >                                       <execution>
> >                                               <id>unpack</id>
> >                                               <phase>prepare-package</phase>
> >                                               <goals>
> >                                                       <goal>unpack</goal>
> >                                               </goals>
> >                                               <configuration>
> >                                                       <artifactItems>
> >                                                               <artifactItem>
> >                                                                       <groupId>org.apache.flink</groupId>
> >                                                                       <artifactId>flink-java-examples</artifactId>
> >                                                                       <version>${project.version}</version>
> >                                                                       <type>jar</type>
> >                                                                       <overWrite>false</overWrite>
> >                                                                       <outputDirectory>${project.build.directory}/classes</outputDirectory>
> >                                                                       <includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
> >                                                               </artifactItem>
> >                                                               <artifactItem>
> >                                                                       <groupId>org.apache.flink</groupId>
> >                                                                       <artifactId>flink-storm-compatibility-core</artifactId>
> >                                                                       <version>${project.version}</version>
> >                                                                       <type>jar</type>
> >                                                                       <overWrite>false</overWrite>
> >                                                                       <outputDirectory>${project.build.directory}/classes</outputDirectory>
> >                                                               </artifactItem>
> >                                                               <artifactItem>
> >                                                                       <groupId>org.apache.storm</groupId>
> >                                                                       <artifactId>storm-core</artifactId>
> >                                                                       <version>0.9.4</version>
> >                                                                       <type>jar</type>
> >                                                                       <overWrite>false</overWrite>
> >                                                                       <outputDirectory>${project.build.directory}/classes</outputDirectory>
> >                                                                       <!--<excludes>defaults.yaml</excludes>-->
> >                                                               </artifactItem>
> >                                                               <artifactItem>
> >                                                                       <groupId>com.googlecode.json-simple</groupId>
> >                                                                       <artifactId>json-simple</artifactId>
> >                                                                       <version>1.1</version>
> >                                                                       <type>jar</type>
> >                                                                       <overWrite>false</overWrite>
> >                                                                       <outputDirectory>${project.build.directory}/classes</outputDirectory>
> >                                                               </artifactItem>snakeyaml
> >                                                               <artifactItem>
> >                                                                       <groupId>org.yaml</groupId>
> >                                                                       <artifactId>snakeyaml</artifactId>
> >                                                                       <version>1.11</version>
> >                                                                       <type>jar</type>
> >                                                                       <overWrite>false</overWrite>
> >                                                                       <outputDirectory>${project.build.directory}/classes</outputDirectory>
> >                                                               </artifactItem>
> >                                                       </artifactItems>
> >                                               </configuration>
> >                                       </execution>
> >                               </executions>
> >                       </plugin>
> >
> >                       <!-- self-contained jars for each example -->
> >                       <plugin>
> >                               <groupId>org.apache.maven.plugins</groupId>
> >                               <artifactId>maven-jar-plugin</artifactId>
> >
> >                               <executions>
> >
> >                                       <!-- WordCount Spout source-->
> >                                       <execution>
> >                                               <id>WordCount-SpoutSource</id>
> >                                               <phase>package</phase>
> >                                               <goals>
> >                                                       <goal>jar</goal>
> >                                               </goals>
> >                                               <configuration>
> >                                                       <classifier>WordCountSpoutSource</classifier>
> >
> >                                                       <archive>
> >                                                               <manifestEntries>
> >                                                                       <program-class>org.apache.flink.stormcompatibility.wordcount.SpoutSourceWordCount</program-class>
> >                                                               </manifestEntries>
> >                                                       </archive>
> >
> >                                                       <includes>
> >                                                               <!-- from storm-core -->
> >                                                               <include>backtype/storm/topology/*.class</include>
> >                                                               <include>backtype/storm/spout/*.class</include>
> >                                                               <include>backtype/storm/task/*.class</include>
> >                                                               <include>backtype/storm/tuple/*.class</include>
> >                                                               <include>backtype/storm/generated/*.class</include>
> >                                                               <include>backtype/storm/metric/**/*.class</include>
> >                                                               <include>org/apache/thrift7/**/*.class</include>
> >                                                               <!-- Storm's recursive dependencies -->
> >                                                               <include>org/json/simple/**/*.class</include>
> >                                                               <!-- compatibility layer -->
> >                                                               <include>org/apache/flink/stormcompatibility/api/*.class</include>
> >                                                               <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
> >                                                               <!-- Word Count -->
> >                                                               <include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.class</include>
> >                                                               <include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount$*.class</include>
> >                                                               <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/AbstractStormSpout.class</include>
> >                                                               <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormFileSpout.class</include>
> >                                                               <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormInMemorySpout.class</include>
> >                                                               <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
> >                                                       </includes>
> >                                               </configuration>
> >                                       </execution>
> >
> >                                       <!-- WordCount Bolt tokenizer-->
> >                                       <execution>
> >                                               <id>WordCount-BoltTokenizer</id>
> >                                               <phase>package</phase>
> >                                               <goals>
> >                                                       <goal>jar</goal>
> >                                               </goals>
> >                                               <configuration>
> >                                                       <classifier>WordCountBoltTokenizer</classifier>
> >
> >                                                       <archive>
> >                                                               <manifestEntries>
> >                                                                       <program-class>org.apache.flink.stormcompatibility.wordcount.BoltTokenizerWordCount</program-class>
> >                                                               </manifestEntries>
> >                                                       </archive>
> >
> >                                                       <includes>
> >                                                               <!-- from storm-core -->
> >                                                               <include>backtype/storm/topology/*.class</include>
> >                                                               <include>backtype/storm/spout/*.class</include>
> >                                                               <include>backtype/storm/task/*.class</include>
> >                                                               <include>backtype/storm/tuple/*.class</include>
> >                                                               <include>backtype/storm/generated/*.class</include>
> >                                                               <include>backtype/storm/metric/**/*.class</include>
> >                                                               <include>org/apache/thrift7/**/*.class</include>
> >                                                               <!-- Storm's recursive dependencies -->
> >                                                               <include>org/json/simple/**/*.class</include>
> >                                                               <!-- compatibility layer -->
> >                                                               <include>org/apache/flink/stormcompatibility/api/*.class</include>
> >                                                               <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
> >                                                               <!-- Word Count -->
> >                                                               <include>org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.class</include>
> >                                                               <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.class</include>
> >                                                               <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
> >                                                       </includes>
> >                                               </configuration>
> >                                       </execution>
> >
> >                     <!-- Throughput -->
> >                     <execution>
> >                         <id>Throughput</id>
> >                         <phase>package</phase>
> >                         <goals>
> >                             <goal>jar</goal>
> >                         </goals>
> >                         <configuration>
> >                             <classifier>Throughput</classifier>
> >
> >                             <archive>
> >                                 <manifestEntries>
> >                                     <program-class>org.apache.flink.stormcompatibility.experiments.Throughput</program-class>
> >                                 </manifestEntries>
> >                             </archive>
> >
> >                             <includes>
> >                                 <!-- from storm-core -->
> >                                 <include>defaults.yaml</include>
> >                                 <include>backtype/storm/*.class</include>
> >                                 <include>backtype/storm/serialization/*.class</include>
> >                                 <include>backtype/storm/topology/*.class</include>
> >                                 <include>backtype/storm/topology/base/*.class</include>
> >                                 <include>backtype/storm/utils/*.class</include>
> >                                 <include>backtype/storm/spout/*.class</include>
> >                                 <include>backtype/storm/task/*.class</include>
> >                                 <include>backtype/storm/tuple/*.class</include>
> >                                 <include>backtype/storm/generated/*.class</include>
> >                                 <include>backtype/storm/metric/**/*.class</include>
> >                                 <include>org/apache/storm/curator/*.class</include>
> >                                 <include>org/apache/thrift7/**/*.class</include>
> >                                 <!--<include>org/yaml/snakeyaml/constructor/*.class</include>-->
> >                                 <include>org/yaml/snakeyaml/**/*.class</include>
> >                                 <!-- Storm's recursive dependencies -->
> >                                 <include>org/json/simple/**/*.class</include>
> >                                 <!-- compatibility layer -->
> >                                 <include>org/apache/flink/stormcompatibility/api/*.class</include>
> >                                 <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
> >                                 <!-- Word Count -->
> >                                 <include>org/apache/flink/stormcompatibility/experiments/Throughput.class</include>
> >                                 <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.class</include>
> >                             </includes>
> >                         </configuration>
> >                     </execution>
> >
> >                                       <execution>
> >                                               <goals>
> >                                                       <goal>test-jar</goal>
> >                                               </goals>
> >                                       </execution>
> >                               </executions>
> >                       </plugin>
> >
> >                       <!-- WordCount Storm topology-->
> >                       <!-- Cannot use maven-jar-plugin because 'defaults.yaml' must be included in jar -->
> >                       <plugin>
> >                               <artifactId>maven-assembly-plugin</artifactId>
> >                               <configuration>
> >                                       <descriptors>
> >                                               <descriptor>src/assembly/word-count-storm.xml</descriptor>
> >                                       </descriptors>
> >                                       <archive>
> >                                               <manifestEntries>
> >                                                       <program-class>org.apache.flink.stormcompatibility.wordcount.StormWordCountRemoteBySubmitter</program-class>
> >                                               </manifestEntries>
> >                                       </archive>
> >                               </configuration>
> >
> >                               <executions>
> >                                       <execution>
> >                                               <id>WordCountStorm</id>
> >                                               <phase>package</phase>
> >                                               <goals>
> >                                                       <goal>single</goal>
> >                                               </goals>
> >                                       </execution>
> >                               </executions>
> >                       </plugin>
> >               </plugins>
> >
> >               <pluginManagement>
> >                       <plugins>
> >                               <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
> >                               <plugin>
> >                                       <groupId>org.eclipse.m2e</groupId>
> >                                       <artifactId>lifecycle-mapping</artifactId>
> >                                       <version>1.0.0</version>
> >                                       <configuration>
> >                                               <lifecycleMappingMetadata>
> >                                                       <pluginExecutions>
> >                                                               <pluginExecution>
> >                                                                       <pluginExecutionFilter>
> >                                                                               <groupId>org.apache.maven.plugins</groupId>
> >                                                                               <artifactId>maven-dependency-plugin</artifactId>
> >                                                                               <versionRange>[2.9,)</versionRange>
> >                                                                               <goals>
> >                                                                                       <goal>unpack</goal>
> >                                                                               </goals>
> >                                                                       </pluginExecutionFilter>
> >                                                                       <action>
> >                                                                               <ignore/>
> >                                                                       </action>
> >                                                               </pluginExecution>
> >                                                       </pluginExecutions>
> >                                               </lifecycleMappingMetadata>
> >                                       </configuration>
> >                               </plugin>
> >                       </plugins>
> >               </pluginManagement>
> >
> >       </build>
> >
> > </project>
> >
> > -----Original Message-----
> > From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan Ewen
> > Sent: August 11, 2015, 1:55
> > To: dev@flink.apache.org
> > Subject: Re: Some problems about Flink applications
> >
> > It would also help if you could paste the entire POM file of your
> project.
> > Maybe something is amiss with the dependencies, or the scopes?
> >
> > On Sat, Aug 8, 2015 at 7:27 PM, Matthias J. Sax <
> mj...@informatik.hu-berlin.de> wrote:
> >
> >> Hi Huang,
> >>
> >> About Storm compatibility: did you double-check that the missing class
> >> (StormSpoutWrapper) is actually contained in your jar? Looking at
> >> pom.xml does not help here, because if you specify a file to include
> >> but Maven cannot find it, Maven simply leaves it out of the jar and the
> >> build still succeeds. Thus, you need to check the jar file itself on
> >> the command line:
> >>     unzip -l myjarfile.jar
> >> or
> >>     unzip -l myjarfile.jar | grep file-I-am-looking-for
> >>
> >> I guess your jar is not built correctly, i.e., the file is not there...
> >>
> >> Did you have a look at the pom.xml of
> >> flink-storm-compatibility-examples and the corresponding
> >> word-count-storm.xml? They show how to build a jar correctly (this was
> >> recently fixed, so make sure you update to the latest master).
> >>
> >> You can also have a look here at how to package jars correctly (even
> >> though this example is about Flink ML):
> >>
> >> https://stackoverflow.com/questions/31661900/maven-build-has-missing-package-linkages/31662642#31662642
> >>
> >> -Matthias
> >>
> >> On 08/08/2015 11:15 AM, huangwei (G) wrote:
> >>> Hi,
> >>> I have run into some trouble developing Flink applications.
> >>>
> >>> 1.
> >>> I want to compare the performance of Storm and
> >>> flink-storm-compatibility using this test program:
> >>> https://github.com/project-flink/flink-perf/blob/master/storm-jobs/src/jvm/experiments/Throughput.java
> >>> Below is Throughput.java with a few changes of mine:
> >>>
> >>>
> >>>
> >>> public static void main(String[] args) throws Exception {
> >>>     ParameterTool pt = ParameterTool.fromArgs(args);
> >>>
> >>>     int par = pt.getInt("para");
> >>>
> >>>     final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> >>>
> >>>     builder.setSpout("source0", new Generator(pt), pt.getInt("sourceParallelism"));
> >>>
> >>>     int i = 0;
> >>>     for (; i < pt.getInt("repartitions", 1) - 1; i++) {
> >>>         System.out.println("adding source" + i + " --> source" + (i + 1));
> >>>         builder.setBolt("source" + (i + 1), new RepartPassThroughBolt(pt), pt.getInt("sinkParallelism"))
> >>>                 .fieldsGrouping("source" + i, new Fields("id"));
> >>>     }
> >>>
> >>>     System.out.println("adding final source" + i + " --> sink");
> >>>
> >>>     builder.setBolt("sink", new Sink(pt), pt.getInt("sinkParallelism"))
> >>>             .fieldsGrouping("source" + i, new Fields("id"));
> >>>
> >>>     Config conf = new Config();
> >>>     conf.setDebug(false); //System.exit(1);
> >>>
> >>>     // execute program locally
> >>>     final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
> >>>     cluster.submitTopology("throughput", null, builder.createTopology());
> >>>
> >>>     Utils.sleep(10 * 1000);
> >>>
> >>>     // TODO kill does not do anything so far
> >>>     cluster.killTopology("throughput");
> >>>     cluster.shutdown();
> >>> }
> >>>
> >>>
> >>> This program runs fine in IDEA with flink-storm-compatibility.
> >>> However, when I package it into a jar file and run it on
> >>> flink-0.10-SNAPSHOT, the following error appears in the flink-client log file:
> >>>
> >>> java.lang.Exception: Call to registerInputOutput() of invokable failed
> >>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:521)
> >>>         at java.lang.Thread.run(Thread.java:745)
> >>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
> >>>         at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:187)
> >>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:90)
> >>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:518)
> >>>         ... 1 more
> >>> Caused by: java.lang.ClassNotFoundException: org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper
> >>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> >>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >>>         at java.lang.Class.forName0(Native Method)
> >>>         at java.lang.Class.forName(Class.java:348)
> >>>         at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71)
> >>>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
> >>>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> >>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> >>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> >>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> >>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> >>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> >>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> >>>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> >>>         at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
> >>>         at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
> >>>         at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:185)
> >>>         ... 3 more
> >>>
> >>>
> >>> And this class (StormSpoutWrapper) does exist in my packaged jar file.
> >>> Here is the relevant part of my pom.xml:
> >>>
> >>> <!-- Throughput -->
> >>>                     <execution>
> >>>                         <id>Throughput</id>
> >>>                         <phase>package</phase>
> >>>                         <goals>
> >>>                             <goal>jar</goal>
> >>>                         </goals>
> >>>                         <configuration>
> >>>                             <classifier>Throughput</classifier>
> >>>
> >>>                             <archive>
> >>>                                 <manifestEntries>
> >>>                                     <program-class>org.apache.flink.stormcompatibility.experiments.Throughput</program-class>
> >>>                                 </manifestEntries>
> >>>                             </archive>
> >>>
> >>>                             <includes>
> >>>                                 <!-- from storm-core -->
> >>>                                 <include>defaults.yaml</include>
> >>>                                 <include>backtype/storm/*.class</include>
> >>>                                 <include>backtype/storm/serialization/*.class</include>
> >>>                                 <include>backtype/storm/topology/*.class</include>
> >>>                                 <include>backtype/storm/topology/base/*.class</include>
> >>>                                 <include>backtype/storm/utils/*.class</include>
> >>>                                 <include>backtype/storm/spout/*.class</include>
> >>>                                 <include>backtype/storm/task/*.class</include>
> >>>                                 <include>backtype/storm/tuple/*.class</include>
> >>>                                 <include>backtype/storm/generated/*.class</include>
> >>>                                 <include>backtype/storm/metric/**/*.class</include>
> >>>                                 <include>org/apache/storm/curator/*.class</include>
> >>>                                 <include>org/apache/thrift7/**/*.class</include>
> >>>                                 <!--<include>org/yaml/snakeyaml/constructor/*.class</include>-->
> >>>                                 <include>org/yaml/snakeyaml/**/*.class</include>
> >>>                                 <!-- Storm's recursive dependencies -->
> >>>                                 <include>org/json/simple/**/*.class</include>
> >>>                                 <!-- compatibility layer -->
> >>>                                 <include>org/apache/flink/stormcompatibility/api/*.class</include>
> >>>                                 <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
> >>>                                 <!-- Word Count -->
> >>>                                 <include>org/apache/flink/stormcompatibility/experiments/Throughput.class</include>
> >>>                                 <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.class</include>
> >>>                             </includes>
> >>>                         </configuration>
> >>>                     </execution>
> >>>
> >>> So how can I fix it?
> >>>
> >>>
> >>> 2.
> >>> Here is a case that uses the join operator:
> >>>
> >>> DataStream<Tuple3<String, Integer, Long>> user = env.addSource(new sourceUserFunction());
> >>> DataStream<Tuple2<String, Integer>> area = env.addSource(new sourceAreaFunction());
> >>>
> >>> DataStream<Tuple2<String, Integer>> sink = user
> >>>       .join(area)
> >>>       .onWindow(15, TimeUnit.MINUTES)
> >>>       .where(0)
> >>>       .equalTo(0)
> >>>       .with(new JoinFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
> >>>          @Override
> >>>          public Tuple2<String, Integer> join(Tuple3<String, Integer, Long> first, Tuple2<String, Integer> second) throws Exception {
> >>>             if (first.f1 + second.f1 > 10){
> >>>                return new Tuple2<String, Integer>(first.f0, first.f1 + second.f1);
> >>>             }
> >>>             return null;
> >>>          }
> >>>       });
> >>>
> >>> As you can see, I don't want to return null when the condition is not
> >>> satisfied.
> >>> But there is no JoinFunction that takes a Collector.
> >>> I found FlatJoinFunction, which does take a Collector.
> >>> However, FlatJoinFunction seems to be usable only with DataSet, not
> >>> with DataStream.
> >>> Is there any other way to handle this case?
> >>>
> >>> P.S. Sorry to bother you with this email; feel free to ignore it over the weekend.
> >>>
> >>> Greetings,
> >>> Huang Wei
> >>> 华为技术有限公司 Huawei Technologies Co., Ltd.
> >>>
> >>>
> >>> Tel:+86 18106512602
> >>> Email:huangwei...@huawei.com
> >>>
> >>
> >>
> >
>
>
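
On the second question in the original message (a join that should emit
nothing when the condition fails), here is a Flink-independent sketch of
the "flat" style being asked for: the join function pushes zero or more
results into a collector instead of returning a value, so there is no
need for null. Everything in it (FlatJoinSketch, FlatJoiner, User, Area,
Joined) is hypothetical and written in plain Java; it does not use or
claim anything about the DataStream API in 0.10-SNAPSHOT.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java sketch (no Flink): a join function that emits 0..n results instead of returning null. */
final class FlatJoinSketch {

    /** Hypothetical stand-in for a flat join function: results go to 'out', nothing is returned. */
    interface FlatJoiner<L, R, O> {
        void join(L left, R right, Consumer<O> out);
    }

    /** Hypothetical record mirroring the Tuple3<String, Integer, Long> stream from the question. */
    static final class User {
        final String id; final int value; final long timestamp;
        User(String id, int value, long timestamp) { this.id = id; this.value = value; this.timestamp = timestamp; }
    }

    /** Hypothetical record mirroring the Tuple2<String, Integer> stream from the question. */
    static final class Area {
        final String id; final int value;
        Area(String id, int value) { this.id = id; this.value = value; }
    }

    /** Hypothetical output record: the key and the summed value. */
    static final class Joined {
        final String id; final int sum;
        Joined(String id, int sum) { this.id = id; this.sum = sum; }
    }

    /** Nested-loop equi-join on id; the joiner decides whether a matching pair produces output. */
    static List<Joined> join(List<User> users, List<Area> areas, FlatJoiner<User, Area, Joined> joiner) {
        List<Joined> result = new ArrayList<>();
        for (User u : users) {
            for (Area a : areas) {
                if (u.id.equals(a.id)) {
                    joiner.join(u, a, result::add);
                }
            }
        }
        return result;
    }

    /** Same condition as in the question: emit only when the sum exceeds 10, otherwise emit nothing. */
    static final FlatJoiner<User, Area, Joined> SUM_ABOVE_TEN = (u, a, out) -> {
        int sum = u.value + a.value;
        if (sum > 10) {
            out.accept(new Joined(u.id, sum));
        }
    };

    public static void main(String[] args) {
        List<User> users = Arrays.asList(new User("a", 8, 1L), new User("b", 2, 2L));
        List<Area> areas = Arrays.asList(new Area("a", 5), new Area("b", 3));
        for (Joined j : join(users, areas, SUM_ABOVE_TEN)) {
            System.out.println(j.id + " -> " + j.sum); // prints "a -> 13"; the pair for "b" sums to 5 and is dropped
        }
    }
}
```

The same effect can be had by letting the join always return the summed
pair and dropping the unwanted ones in a separate filter step afterwards;
which of the two fits better depends on what the streaming API offers.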
