Hi Stephan,

Thank you for the information about the 
https://github.com/apache/flink/pull/1008.
I`ll try again.
Otherwise, is there any suggestion about the original second problem:

> >>> 2.
> >>> There is a case following using operator join:
> >>>
> >>> DataStream<Tuple3<String, Integer, Long>> user = env.addSource(new
> >> sourceUserFunction());
> >>> DataStream<Tuple2<String, Integer>> area = env.addSource(new
> >> sourceAreaFunction());
> >>>
> >>> DataStream<Tuple2<String, Integer>> sink = user
> >>>       .join(area)
> >>>       .onWindow(15, TimeUnit.MINUTES)
> >>>       .where(0)
> >>>       .equalTo(0)
> >>>       .with(new JoinFunction<Tuple3<String, Integer, Long>,
> >> Tuple2<String, Integer>, Tuple2<String, Integer>>() {
> >>>          @Override
> >>>          public Tuple2<String, Integer> join(Tuple3<String, 
> >>> Integer,
> >> Long> first, Tuple2<String, Integer> second) throws Exception {
> >>>             if (first.f1 + second.f1 > 10){
> >>>                return new Tuple2<String, Integer>(first.f0, 
> >>> first.f1
> >>> +
> >> second.f1);
> >>>             }
> >>>             return null;
> >>>          }
> >>>       });
> >>>
> >>> As you see, I don`t want to return null when the condition is not
> >> satisfied.
> >>> But there is not any JoinFunction with Collector.
> >>> I found a FlatJoinFunction which allows the Collector.
> >>> However, the FlatJoinFunction seem only can be used in DataSet 
> >>> instead
> >> DataStream.
> >>> Is there any other way to improve this case?


Greetings,
Huang Wei

----------------------
early emails(Stephan wrote):

We are seeing these class loader issues a lot as of late.

Seems that packaging the classes is trickier than anticipated.

Here is a pull request to add some diagnostics info on a
"ClassNotFoundException": https://github.com/apache/flink/pull/1008

On Tue, Aug 11, 2015 at 3:29 PM, Matthias J. Sax < 
mj...@informatik.hu-berlin.de> wrote:

> Three comments
>
> 1) If StormSpoutWrapper is in your jar, is it located in the correct 
> directory (must be same as package name)?
>
> 2) If you are using FlinkTopologyBuilder, you need to package as shown 
> in StormWordCountRemoteBySubmitter example, using an additional 
> assembly file.
> (The first examples are for embedded Spout/Bolts with Flink streaming 
> program. -- Maybe we should add some comments in the pom file...)
>
> 3) How do you build? The whole source code? Or only parts of it? Did 
> you run "install"?
>
> -Matthias
>
>
> On 08/11/2015 02:47 PM, huangwei (G) wrote:
> > Hi Stephan and Matthias,
> >
> > Sorry for replying late.
> > I`ve double checked that this class StormSpoutWrapper is really 
> > exist in
> my jar file.
> > And it got the same trouble when I ran the
> flink-storm-compatibililty-example- corresponding word-count-storm.
> > The way I built my Throughput application was just adding it into 
> > the
> flink-storm-compatibililty-example and change some configurations in 
> the word-count-storm.xml.
> > Here is the entire POM file.
> >
> >
> > <?xml version="1.0" encoding="UTF-8"?>
> > <!--
> > Licensed to the Apache Software Foundation (ASF) under one or more 
> > contributor license agreements.  See the NOTICE file distributed 
> > with this work for additional information regarding copyright 
> > ownership.  The ASF licenses this file to you under the Apache 
> > License, Version 2.0 (the "License"); you may not use this file 
> > except in compliance with the License.  You may obtain a copy of the 
> > License at
> >
> >   http://www.apache.org/licenses/LICENSE-2.0
> >
> > Unless required by applicable law or agreed to in writing, software 
> > distributed under the License is distributed on an "AS IS" BASIS, 
> > WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
> > implied.  See the License for the specific language governing 
> > permissions and limitations under the License.
> > -->
> > <project xmlns="http://maven.apache.org/POM/4.0.0"; xmlns:xsi="
> http://www.w3.org/2001/XMLSchema-instance";
> >       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> http://maven.apache.org/maven-v4_0_0.xsd";>
> >
> >       <modelVersion>4.0.0</modelVersion>
> >
> >       <parent>
> >               <groupId>org.apache.flink</groupId>
> >               <artifactId>flink-storm-compatibility-parent</artifactId>
> >               <version>0.10-SNAPSHOT</version>
> >               <relativePath>..</relativePath>
> >       </parent>
> >
> >       <artifactId>flink-storm-compatibility-examples</artifactId>
> >       <name>flink-storm-compatibility-examples</name>
> >
> >       <packaging>jar</packaging>
> >
> >       <dependencies>
> >               <dependency>
> >                       <groupId>org.apache.flink</groupId>
> >
>  <artifactId>flink-storm-compatibility-core</artifactId>
> >                       <version>${project.version}</version>
> >               </dependency>
> >
> >               <dependency>
> >                       <groupId>org.apache.flink</groupId>
> >                       <artifactId>flink-java-examples</artifactId>
> >                       <version>${project.version}</version>
> >               </dependency>
> >
> >               <dependency>
> >                       <groupId>org.apache.flink</groupId>
> >                       <artifactId>flink-streaming-core</artifactId>
> >                       <version>${project.version}</version>
> >                       <scope>test</scope>
> >                       <type>test-jar</type>
> >               </dependency>
> >
> >               <dependency>
> >                       <groupId>org.apache.flink</groupId>
> >                       <artifactId>flink-test-utils</artifactId>
> >                       <version>${project.version}</version>
> >                       <scope>test</scope>
> >               </dependency>
> >
> >         <dependency>
> >             <groupId>org.apache.storm</groupId>
> >             <artifactId>storm-core</artifactId>
> >             <version>0.9.4</version>
> >             <!-- keep storm out of the jar-with-dependencies -->
> >             <scope>provided</scope>
> >         </dependency>
> >       </dependencies>
> >
> >       <build>
> >               <plugins>
> >                       <!-- get default data from flink-java-examples
> package -->
> >                       <plugin>
> >                               
> > <groupId>org.apache.maven.plugins</groupId>
> >
>  <artifactId>maven-dependency-plugin</artifactId>
> >
>  <version>2.9</version><!--$NO-MVN-MAN-VER$-->
> >                               <executions>
> >                                       <execution>
> >                                               <id>unpack</id>
> >
>  <phase>prepare-package</phase>
> >                                               <goals>
> >                                                       <goal>unpack</goal>
> >                                               </goals>
> >                                               <configuration>
> >                                                       
> > <artifactItems>
> >
>  <artifactItem>
> >
>  <groupId>org.apache.flink</groupId>
> >
>  <artifactId>flink-java-examples</artifactId>
> >
>  <version>${project.version}</version>
> >
>  <type>jar</type>
> >
>  <overWrite>false</overWrite>
> >
>  <outputDirectory>${project.build.directory}/classes</outputDirectory>
> >
>  
> <includes>org/apache/flink/examples/java/wordcount/util/WordCountData.
> class</includes>
> >
>  </artifactItem>
> >
>  <artifactItem>
> >
>  <groupId>org.apache.flink</groupId>
> >
>  <artifactId>flink-storm-compatibility-core</artifactId>
> >
>  <version>${project.version}</version>
> >
>  <type>jar</type>
> >
>  <overWrite>false</overWrite>
> >
>  <outputDirectory>${project.build.directory}/classes</outputDirectory>
> >
>  </artifactItem>
> >
>  <artifactItem>
> >
>  <groupId>org.apache.storm</groupId>
> >
>  <artifactId>storm-core</artifactId>
> >
>  <version>0.9.4</version>
> >
>  <type>jar</type>
> >
>  <overWrite>false</overWrite>
> >
>  <outputDirectory>${project.build.directory}/classes</outputDirectory>
> >
>  <!--<excludes>defaults.yaml</excludes>-->
> >
>  </artifactItem>
> >
>  <artifactItem>
> >
>  <groupId>com.googlecode.json-simple</groupId>
> >
>  <artifactId>json-simple</artifactId>
> >
>  <version>1.1</version>
> >
>  <type>jar</type>
> >
>  <overWrite>false</overWrite>
> >
>  <outputDirectory>${project.build.directory}/classes</outputDirectory>
> >
>  </artifactItem>snakeyaml
> >
>  <artifactItem>
> >
>  <groupId>org.yaml</groupId>
> >
>  <artifactId>snakeyaml</artifactId>
> >
>  <version>1.11</version>
> >
>  <type>jar</type>
> >
>  <overWrite>false</overWrite>
> >
>  <outputDirectory>${project.build.directory}/classes</outputDirectory>
> >
>  </artifactItem>
> >                                                       </artifactItems>
> >                                               </configuration>
> >                                       </execution>
> >                               </executions>
> >                       </plugin>
> >
> >                       <!-- self-contained jars for each example -->
> >                       <plugin>
> >                               <groupId>org.apache.maven.plugins</groupId>
> >                               
> > <artifactId>maven-jar-plugin</artifactId>
> >
> >                               <executions>
> >
> >                                       <!-- WordCount Spout source-->
> >                                       <execution>
> >
>  <id>WordCount-SpoutSource</id>
> >                                               <phase>package</phase>
> >                                               <goals>
> >                                                       <goal>jar</goal>
> >                                               </goals>
> >                                               <configuration>
> >
>  <classifier>WordCountSpoutSource</classifier>
> >
> >                                                       <archive>
> >
>  <manifestEntries>
> >
>  
> <program-class>org.apache.flink.stormcompatibility.wordcount.SpoutSour
> ceWordCount</program-class>
> >
>  </manifestEntries>
> >                                                       </archive>
> >
> >                                                       <includes>
> >                                                               <!-- 
> > from
> storm-core -->
> >
>  <include>backtype/storm/topology/*.class</include>
> >
>  <include>backtype/storm/spout/*.class</include>
> >
>  <include>backtype/storm/task/*.class</include>
> >
>  <include>backtype/storm/tuple/*.class</include>
> >
>  <include>backtype/storm/generated/*.class</include>
> >
>  <include>backtype/storm/metric/**/*.class</include>
> >
>  <include>org/apache/thrift7/**/*.class</include>
> >                                                               <!--
> Storm's recursive dependencies -->
> >
>  <include>org/json/simple/**/*.class</include>
> >                                                               <!--
> compatibility layer -->
> >
>  <include>org/apache/flink/stormcompatibility/api/*.class</include>
> >
>  
> <include>org/apache/flink/stormcompatibility/wrappers/*.class</include
> >
> >                                                               <!-- 
> > Word
> Count -->
> >
>  
> <include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWord
> Count.class</include>
> >
>  
> <include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWord
> Count$*.class</include>
> >
>  
> <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/
> AbstractStormSpout.class</include>
> >
>  
> <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/
> StormFileSpout.class</include>
> >
>  
> <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/
> StormInMemorySpout.class</include>
> >
>  
> <include>org/apache/flink/examples/java/wordcount/util/WordCountData.c
> lass</include>
> >                                                       </includes>
> >                                               </configuration>
> >                                       </execution>
> >
> >                                       <!-- WordCount Bolt tokenizer-->
> >                                       <execution>
> >
>  <id>WordCount-BoltTokenizer</id>
> >                                               <phase>package</phase>
> >                                               <goals>
> >                                                       <goal>jar</goal>
> >                                               </goals>
> >                                               <configuration>
> >
>  <classifier>WordCountBoltTokenizer</classifier>
> >
> >                                                       <archive>
> >
>  <manifestEntries>
> >
>  
> <program-class>org.apache.flink.stormcompatibility.wordcount.BoltToken
> izerWordCount</program-class>
> >
>  </manifestEntries>
> >                                                       </archive>
> >
> >                                                       <includes>
> >                                                               <!-- 
> > from
> storm-core -->
> >
>  <include>backtype/storm/topology/*.class</include>
> >
>  <include>backtype/storm/spout/*.class</include>
> >
>  <include>backtype/storm/task/*.class</include>
> >
>  <include>backtype/storm/tuple/*.class</include>
> >
>  <include>backtype/storm/generated/*.class</include>
> >
>  <include>backtype/storm/metric/**/*.class</include>
> >
>  <include>org/apache/thrift7/**/*.class</include>
> >                                                               <!--
> Storm's recursive dependencies -->
> >
>  <include>org/json/simple/**/*.class</include>
> >                                                               <!--
> compatibility layer -->
> >
>  <include>org/apache/flink/stormcompatibility/api/*.class</include>
> >
>  
> <include>org/apache/flink/stormcompatibility/wrappers/*.class</include
> >
> >                                                               <!-- 
> > Word
> Count -->
> >
>  
> <include>org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWo
> rdCount.class</include>
> >
>  
> <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/
> StormBoltTokenizer.class</include>
> >
>  
> <include>org/apache/flink/examples/java/wordcount/util/WordCountData.c
> lass</include>
> >                                                       </includes>
> >                                               </configuration>
> >                                       </execution>
> >
> >                     <!-- Throughput -->
> >                     <execution>
> >                         <id>Throughput</id>
> >                         <phase>package</phase>
> >                         <goals>
> >                             <goal>jar</goal>
> >                         </goals>
> >                         <configuration>
> >                             <classifier>Throughput</classifier>
> >
> >                             <archive>
> >                                 <manifestEntries>
> >
>  
> <program-class>org.apache.flink.stormcompatibility.experiments.Through
> put</program-class>
> >                                 </manifestEntries>
> >                             </archive>
> >
> >                             <includes>
> >                                 <!-- from storm-core -->
> >                                 <include>defaults.yaml</include>
> >                                 
> > <include>backtype/storm/*.class</include>
> >
>  <include>backtype/storm/serialization/*.class</include>
> >
>  <include>backtype/storm/topology/*.class</include>
> >
>  <include>backtype/storm/topology/base/*.class</include>
> >
>  <include>backtype/storm/utils/*.class</include>
> >
>  <include>backtype/storm/spout/*.class</include>
> >
>  <include>backtype/storm/task/*.class</include>
> >
>  <include>backtype/storm/tuple/*.class</include>
> >
>  <include>backtype/storm/generated/*.class</include>
> >
>  <include>backtype/storm/metric/**/*.class</include>
> >
>  <include>org/apache/storm/curator/*.class</include>
> >
>  <include>org/apache/thrift7/**/*.class</include>
> >
>  <!--<include>org/yaml/snakeyaml/constructor/*.class</include>-->
> >
>  <include>org/yaml/snakeyaml/**/*.class</include>
> >                                 <!-- Storm's recursive dependencies 
> > -->
> >
>  <include>org/json/simple/**/*.class</include>
> >                                 <!-- compatibility layer -->
> >
>  <include>org/apache/flink/stormcompatibility/api/*.class</include>
> >
>  
> <include>org/apache/flink/stormcompatibility/wrappers/*.class</include
> >
> >                                 <!-- Word Count -->
> >
>  
> <include>org/apache/flink/stormcompatibility/experiments/Throughput.cl
> ass</include>
> >
>  
> <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.
> class</include>
> >                             </includes>
> >                         </configuration>
> >                     </execution>
> >
> >                                       <execution>
> >                                               <goals>
> >
>  <goal>test-jar</goal>
> >                                               </goals>
> >                                       </execution>
> >                               </executions>
> >                       </plugin>
> >
> >                       <!-- WordCount Storm topology-->
> >                       <!-- Cannot use maven-jar-plugin because
> 'defaults.yaml' must be included in jar -->
> >                       <plugin>
> >
>  <artifactId>maven-assembly-plugin</artifactId>
> >                               <configuration>
> >                                       <descriptors>
> >
>  <descriptor>src/assembly/word-count-storm.xml</descriptor>
> >                                       </descriptors>
> >                                       <archive>
> >                                               <manifestEntries>
> >
>  
> <program-class>org.apache.flink.stormcompatibility.wordcount.StormWord
> CountRemoteBySubmitter</program-class>
> >                                               </manifestEntries>
> >                                       </archive>
> >                               </configuration>
> >
> >                               <executions>
> >                                       <execution>
> >                                               <id>WordCountStorm</id>
> >                                               <phase>package</phase>
> >                                               <goals>
> >                                                       <goal>single</goal>
> >                                               </goals>
> >                                       </execution>
> >                               </executions>
> >                       </plugin>
> >               </plugins>
> >
> >               <pluginManagement>
> >                       <plugins>
> >                               <!--This plugin's configuration is 
> > used to
> store Eclipse m2e settings only. It has no influence on the Maven 
> build itself.-->
> >                               <plugin>
> >                                       
> > <groupId>org.eclipse.m2e</groupId>
> >
>  <artifactId>lifecycle-mapping</artifactId>
> >                                       <version>1.0.0</version>
> >                                       <configuration>
> >                                               <lifecycleMappingMetadata>
> >                                                       
> > <pluginExecutions>
> >
>  <pluginExecution>
> >
>  <pluginExecutionFilter>
> >
>      <groupId>org.apache.maven.plugins</groupId>
> >
>      <artifactId>maven-dependency-plugin</artifactId>
> >
>      <versionRange>[2.9,)</versionRange>
> >
>      <goals>
> >
>              <goal>unpack</goal>
> >
>      </goals>
> >
>  </pluginExecutionFilter>
> >
>  <action>
> >
>      <ignore/>
> >
>  </action>
> >
>  </pluginExecution>
> >                                                       </pluginExecutions>
> >                                               </lifecycleMappingMetadata>
> >                                       </configuration>
> >                               </plugin>
> >                       </plugins>
> >               </pluginManagement>
> >
> >       </build>
> >
> > </project>
> >
> > -----邮件原件-----
> > 发件人: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] 代表 Stephan
> Ewen
> > 发送时间: 2015年8月11日 1:55
> > 收件人: dev@flink.apache.org
> > 主题: Re: Some problems about Flink applications
> >
> > It would also help if you could paste the entire POM file of your
> project.
> > Maybe something is amiss with the dependencies, or the scopes?
> >
> > On Sat, Aug 8, 2015 at 7:27 PM, Matthias J. Sax <
> mj...@informatik.hu-berlin.de> wrote:
> >
> >> Hi Huang,
> >>
> >> about Storm compatibility. Did you double check, that the file that 
> >> is missing (StormSpoutWrapper) is contained in your jar. Looking at 
> >> pom.xml does not help here, because if you specify to include a 
> >> file, but maven cannot find it, it will just not add it to the jar, 
> >> but build will succeed. Thus, you need to check the jar file itself 
> >> via
> command line:
> >>     unzip -l myjarfile.jar
> >> or
> >>     unzip -l myjarfile.jar | grep file-I-am-looking-for
> >>
> >> I guess your jar is not build correctly, ie, the file is not there...
> >>
> >> Did you have a look into pom.xml for 
> >> flink-storm-compatibililty-example
> >> and the corresponding word-count-storm.xml? This shows how to build 
> >> a jar correctly (it was recently fixed, so make sure you update to 
> >> the latest master)
> >>
> >> You can also have a look here how to package jars correctly (even 
> >> if this example is about Flink ML):
> >>
> >> https://stackoverflow.com/questions/31661900/maven-build-has-missin
> >> g-p
> >> ackage-linkages/31662642#31662642
> >>
> >> -Matthias
> >>
> >> On 08/08/2015 11:15 AM, huangwei (G) wrote:
> >>> Hi,
> >>> I get some trouble in developing Flink applications.
> >>>
> >>> 1.
> >>> I want to test the performance between Storm and
> >> flink-storm-compatibility using the test program:
> >> https://github.com/project-flink/flink-perf/blob/master/storm-jobs/
> >> src
> >> /jvm/experiments/Throughput.java
> >> .
> >>> And there is a bit of my changes with this Throughput.java below:
> >>>
> >>>
> >>>
> >>> public static void main(String[] args) throws Exception {
> >>>                    ParameterTool pt = 
> >>> ParameterTool.fromArgs(args);
> >>>
> >>>                    int par = pt.getInt("para");
> >>>
> >>>                    final FlinkTopologyBuilder builder = new
> >> FlinkTopologyBuilder();
> >>>
> >>>                    builder.setSpout("source0", new Generator(pt),
> >> pt.getInt("sourceParallelism"));
> >>>
> >>>                    int i = 0;
> >>>                    for (; i < pt.getInt("repartitions", 1) - 1; i++) {
> >>>                             System.out.println("adding source" + i + "
> >> --> source" + (i + 1));
> >>>                             builder.setBolt("source" + (i + 1), 
> >>> new
> >> RepartPassThroughBolt(pt), pt.getInt("sinkParallelism"))
> >>>                                                .fieldsGrouping("source"
> >> + i, new Fields("id"));
> >>>                    }
> >>>
> >>>                    System.out.println("adding final source" + i + "
> >>> -->
> >> sink");
> >>>
> >>>                    builder.setBolt("sink", new Sink(pt),
> >> pt.getInt("sinkParallelism")).fieldsGrouping("source" + i, new 
> >> Fields("id"));
> >>>
> >>>                    Config conf = new Config();
> >>>                    conf.setDebug(false); //System.exit(1);
> >>>
> >>>                    // execute program locally
> >>>                    final FlinkLocalCluster cluster =
> >> FlinkLocalCluster.getLocalCluster();
> >>>                    cluster.submitTopology("throughput", null,
> >> builder.createTopology());
> >>>
> >>>                    Utils.sleep(10 * 1000);
> >>>
> >>>                    // TODO kill does no do anything so far
> >>>                    cluster.killTopology("throughput");
> >>>                    cluster.shutdown();
> >>>          }
> >>>
> >>>
> >>> This program will run well in IDEA with flink-storm-compatibility.
> >>> However, when I packaged it into a jar file and run on the
> >> flink-0.10SNAPSHOT there is a problem in flink-client log file:
> >>>
> >>> java.lang.Exception: Call to registerInputOutput() of invokable failed
> >>>          at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:521)
> >>>          at java.lang.Thread.run(Thread.java:745)
> >>> Caused by:
> org.apache.flink.streaming.runtime.tasks.StreamTaskException:
> >> Cannot instantiate user function.
> >>>          at
> >> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator
> >> (St
> >> reamConfig.java:187)
> >>>          at
> >> org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOu
> >> tpu
> >> t(StreamTask.java:90)
> >>>          at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:518)
> >>>          ... 1 more
> >>> Caused by: java.lang.ClassNotFoundException:
> >> org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper
> >>>          at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> >>>          at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >>>          at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >>>          at java.lang.Class.forName0(Native Method)
> >>>          at java.lang.Class.forName(Class.java:348)
> >>>          at
> >> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStrea
> >> m.r
> >> esolveClass(InstantiationUtil.java:71)
> >>>          at
> >> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1
> >> 613
> >> )
> >>>          at
> >> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518
> >> )
> >>>          at
> >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java
> >> :17
> >> 74)
> >>>          at
> >> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> >>>          at
> >> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> >> 199
> >> 3)
> >>>          at
> >> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:191
> >> 8)
> >>>          at
> >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java
> >> :18
> >> 01)
> >>>          at
> >> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> >>>          at
> >> java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> >>>          at
> >> org.apache.flink.util.InstantiationUtil.deserializeObject(Instantia
> >> tio
> >> nUtil.java:302)
> >>>          at
> >> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(Instan
> >> tia
> >> tionUtil.java:264)
> >>>          at
> >> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator
> >> (St
> >> reamConfig.java:185)
> >>>          ... 3 more
> >>>
> >>>
> >>> And this class(StormSpoutWrapper) was exist in my packaged jar file.
> >>> As you can see part of my pom.xml:
> >>>
> >>> <!-- Throughput -->
> >>>                     <execution>
> >>>                         <id>Throughput</id>
> >>>                         <phase>package</phase>
> >>>                         <goals>
> >>>                             <goal>jar</goal>
> >>>                         </goals>
> >>>                         <configuration>
> >>>                             <classifier>Throughput</classifier>
> >>>
> >>>                             <archive>
> >>>                                 <manifestEntries>
> >>>
> >>
> >> <program-class>org.apache.flink.stormcompatibility.experiments.Thro
> >> ugh
> >> put</program-class>
> >>>                                 </manifestEntries>
> >>>                             </archive>
> >>>
> >>>                             <includes>
> >>>                                 <!-- from storm-core -->
> >>>                                 <include>defaults.yaml</include>
> >>>
> >>> <include>backtype/storm/*.class</include>
> >>>
> >>  <include>backtype/storm/serialization/*.class</include>
> >>>
> >>  <include>backtype/storm/topology/*.class</include>
> >>>
> >>  <include>backtype/storm/topology/base/*.class</include>
> >>>
> >>  <include>backtype/storm/utils/*.class</include>
> >>>
> >>  <include>backtype/storm/spout/*.class</include>
> >>>
> >>  <include>backtype/storm/task/*.class</include>
> >>>
> >>  <include>backtype/storm/tuple/*.class</include>
> >>>
> >>  <include>backtype/storm/generated/*.class</include>
> >>>
> >>  <include>backtype/storm/metric/**/*.class</include>
> >>>
> >>  <include>org/apache/storm/curator/*.class</include>
> >>>
> >>  <include>org/apache/thrift7/**/*.class</include>
> >>>
> >>  <!--<include>org/yaml/snakeyaml/constructor/*.class</include>-->
> >>>
> >>  <include>org/yaml/snakeyaml/**/*.class</include>
> >>>                                 <!-- Storm's recursive 
> >>> dependencies
> >>> -->
> >>>
> >>  <include>org/json/simple/**/*.class</include>
> >>>                                 <!-- compatibility layer -->
> >>>
> >>  <include>org/apache/flink/stormcompatibility/api/*.class</include>
> >>>
> >>
> >> <include>org/apache/flink/stormcompatibility/wrappers/*.class</incl
> >> ude
> >>>
> >>>                                 <!-- Word Count -->
> >>>
> >>
> >> <include>org/apache/flink/stormcompatibility/experiments/Throughput
> >> .cl
> >> ass</include>
> >>>
> >>
> >> <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.
> >> class</include>
> >>>                             </includes>
> >>>                         </configuration>
> >>>                     </execution>
> >>>
> >>> So how can I fix it?
> >>>
> >>>
> >>> 2.
> >>> There is a case following using operator join:
> >>>
> >>> DataStream<Tuple3<String, Integer, Long>> user = env.addSource(new
> >> sourceUserFunction());
> >>> DataStream<Tuple2<String, Integer>> area = env.addSource(new
> >> sourceAreaFunction());
> >>>
> >>> DataStream<Tuple2<String, Integer>> sink = user
> >>>       .join(area)
> >>>       .onWindow(15, TimeUnit.MINUTES)
> >>>       .where(0)
> >>>       .equalTo(0)
> >>>       .with(new JoinFunction<Tuple3<String, Integer, Long>,
> >> Tuple2<String, Integer>, Tuple2<String, Integer>>() {
> >>>          @Override
> >>>          public Tuple2<String, Integer> join(Tuple3<String, 
> >>> Integer,
> >> Long> first, Tuple2<String, Integer> second) throws Exception {
> >>>             if (first.f1 + second.f1 > 10){
> >>>                return new Tuple2<String, Integer>(first.f0, 
> >>> first.f1
> >>> +
> >> second.f1);
> >>>             }
> >>>             return null;
> >>>          }
> >>>       });
> >>>
> >>> As you see, I don`t want to return null when the condition is not
> >> satisfied.
> >>> But there is not any JoinFunction with Collector.
> >>> I found a FlatJoinFunction which allows the Collector.
> >>> However, the FlatJoinFunction seem only can be used in DataSet 
> >>> instead
> >> DataStream.
> >>> Is there any other way to improve this case?
> >>>
> >>> PS. I`m sorry about this email. You may ignore me during the weekend.
> >>>
> >>> Greetings,
> >>> Huang Wei
> >>> 华为技术有限公司 Huawei Technologies Co., Ltd.
> >>>
> >>>
> >>> Tel:+86 18106512602
> >>> Email:huangwei...@huawei.com
> >>>
> >>
> >>
> >
>
>

Reply via email to