Hi Chesnay,

Sorry for the late reply.
It works!
Thank you!

Greetings, Huangwei

=========================================================

How about something like this:

DataStream<Tuple2<String, Integer>> sink = user
        .join(area)
        .onWindow(15, TimeUnit.MINUTES)
        .where(0)
        .equalTo(0)
        .flatMap(new FlatMapFunction<Tuple2<Tuple3<String, Integer, Long>, Tuple2<String, Integer>>, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(Tuple2<Tuple3<String, Integer, Long>, Tuple2<String, Integer>> value, Collector<Tuple2<String, Integer>> c) throws Exception {
                        if (value.f0.f1 + value.f1.f1 > 10) {
                                c.collect(new Tuple2<String, Integer>(value.f0.f0, value.f0.f1 + value.f1.f1));
                        }
                }
        });
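
If you would rather keep the predicate separate from the projection, the same thing should also be expressible as a filter followed by a map. This is an untested sketch; like the flatMap variant above, it assumes the join emits Tuple2<first, second> pairs when no JoinFunction is given:

DataStream<Tuple2<String, Integer>> sink = user
        .join(area)
        .onWindow(15, TimeUnit.MINUTES)
        .where(0)
        .equalTo(0)
        .filter(new FilterFunction<Tuple2<Tuple3<String, Integer, Long>, Tuple2<String, Integer>>>() {
                @Override
                public boolean filter(Tuple2<Tuple3<String, Integer, Long>, Tuple2<String, Integer>> value) {
                        // keep only pairs whose counts exceed the threshold
                        return value.f0.f1 + value.f1.f1 > 10;
                }
        })
        .map(new MapFunction<Tuple2<Tuple3<String, Integer, Long>, Tuple2<String, Integer>>, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(Tuple2<Tuple3<String, Integer, Long>, Tuple2<String, Integer>> value) {
                        // project to (key, summed count)
                        return new Tuple2<String, Integer>(value.f0.f0, value.f0.f1 + value.f1.f1);
                }
        });

The flatMap variant is a single operator, though, so it is probably the simpler choice.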


On 13.08.2015 15:24, huangwei (G) wrote:
> Hi Stephan,
>
> Thank you for the information about 
> https://github.com/apache/flink/pull/1008.
> I'll try again.
> Also, is there any suggestion for the original second problem:
>
>>>>> 2.
>>>>> Here is a case using the join operator:
>>>>>
>>>>> DataStream<Tuple3<String, Integer, Long>> user = env.addSource(new sourceUserFunction());
>>>>> DataStream<Tuple2<String, Integer>> area = env.addSource(new sourceAreaFunction());
>>>>> DataStream<Tuple2<String, Integer>> sink = user
>>>>>        .join(area)
>>>>>        .onWindow(15, TimeUnit.MINUTES)
>>>>>        .where(0)
>>>>>        .equalTo(0)
>>>>>        .with(new JoinFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
>>>>>           @Override
>>>>>           public Tuple2<String, Integer> join(Tuple3<String, Integer, Long> first, Tuple2<String, Integer> second) throws Exception {
>>>>>              if (first.f1 + second.f1 > 10) {
>>>>>                 return new Tuple2<String, Integer>(first.f0, first.f1 + second.f1);
>>>>>              }
>>>>>              return null;
>>>>>           }
>>>>>        });
>>>>>
>>>>> As you can see, I don't want to return null when the condition is not satisfied.
>>>>> But there is no JoinFunction variant with a Collector.
>>>>> I found FlatJoinFunction, which provides a Collector.
>>>>> However, FlatJoinFunction seems to be usable only with DataSet, not with DataStream.
>>>>> Is there any other way to improve this case?
>
> Greetings,
> Huang Wei
>
> ----------------------
> earlier emails (Stephan wrote):
>
> We are seeing these class loader issues a lot as of late.
>
> Seems that packaging the classes is trickier than anticipated.
>
> Here is a pull request to add some diagnostics info on a
> "ClassNotFoundException": https://github.com/apache/flink/pull/1008
>
> On Tue, Aug 11, 2015 at 3:29 PM, Matthias J. Sax < 
> mj...@informatik.hu-berlin.de> wrote:
>
>> Three comments:
>>
>> 1) If StormSpoutWrapper is in your jar, is it located in the correct 
>> directory (it must match the package name)?
>>
>> 2) If you are using FlinkTopologyBuilder, you need to package as 
>> shown in the StormWordCountRemoteBySubmitter example, using an additional 
>> assembly file.
>> (The first examples are for embedded Spouts/Bolts within a Flink streaming 
>> program. -- Maybe we should add some comments in the pom file...)
>>
>> 3) How do you build? The whole source code? Or only parts of it? Did 
>> you run "install"?
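>>
>> Just as a quick sanity check, you could also verify the jar contents
>> programmatically -- a minimal sketch using java.util.jar (the JarCheck
>> class is hypothetical; the entry path below assumes the wrapper class
>> from flink-storm-compatibility-core):
>>
>> import java.util.jar.JarFile;
>>
>> public class JarCheck {
>>     public static void main(String[] args) throws Exception {
>>         // args[0] = path to the packaged jar file
>>         try (JarFile jar = new JarFile(args[0])) {
>>             String entry =
>>                 "org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.class";
>>             System.out.println(entry
>>                 + (jar.getJarEntry(entry) != null ? " is in the jar" : " is MISSING"));
>>         }
>>     }
>> }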
>>
>> -Matthias
>>
>>
>> On 08/11/2015 02:47 PM, huangwei (G) wrote:
>>> Hi Stephan and Matthias,
>>>
>>> Sorry for replying late.
>>> I've double-checked that the class StormSpoutWrapper really exists in
>>> my jar file.
>>> And I ran into the same trouble when I ran the corresponding
>>> word-count-storm example from flink-storm-compatibility-examples.
>>> The way I built my Throughput application was simply to add it to
>>> flink-storm-compatibility-examples and change some configurations in
>>> word-count-storm.xml.
>>> Here is the entire POM file.
>>>
>>>
>>> <?xml version="1.0" encoding="UTF-8"?>
>>> <!--
>>> Licensed to the Apache Software Foundation (ASF) under one or more 
>>> contributor license agreements.  See the NOTICE file distributed 
>>> with this work for additional information regarding copyright 
>>> ownership.  The ASF licenses this file to you under the Apache 
>>> License, Version 2.0 (the "License"); you may not use this file 
>>> except in compliance with the License.  You may obtain a copy of the 
>>> License at
>>>
>>>    http://www.apache.org/licenses/LICENSE-2.0
>>>
>>> Unless required by applicable law or agreed to in writing, software 
>>> distributed under the License is distributed on an "AS IS" BASIS, 
>>> WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
>>> implied.  See the License for the specific language governing 
>>> permissions and limitations under the License.
>>> -->
>>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
>>>        <modelVersion>4.0.0</modelVersion>
>>>
>>>        <parent>
>>>                <groupId>org.apache.flink</groupId>
>>>                <artifactId>flink-storm-compatibility-parent</artifactId>
>>>                <version>0.10-SNAPSHOT</version>
>>>                <relativePath>..</relativePath>
>>>        </parent>
>>>
>>>        <artifactId>flink-storm-compatibility-examples</artifactId>
>>>        <name>flink-storm-compatibility-examples</name>
>>>
>>>        <packaging>jar</packaging>
>>>
>>>        <dependencies>
>>>                <dependency>
>>>                        <groupId>org.apache.flink</groupId>
>>>                        <artifactId>flink-storm-compatibility-core</artifactId>
>>>                        <version>${project.version}</version>
>>>                </dependency>
>>>
>>>                <dependency>
>>>                        <groupId>org.apache.flink</groupId>
>>>                        <artifactId>flink-java-examples</artifactId>
>>>                        <version>${project.version}</version>
>>>                </dependency>
>>>
>>>                <dependency>
>>>                        <groupId>org.apache.flink</groupId>
>>>                        <artifactId>flink-streaming-core</artifactId>
>>>                        <version>${project.version}</version>
>>>                        <scope>test</scope>
>>>                        <type>test-jar</type>
>>>                </dependency>
>>>
>>>                <dependency>
>>>                        <groupId>org.apache.flink</groupId>
>>>                        <artifactId>flink-test-utils</artifactId>
>>>                        <version>${project.version}</version>
>>>                        <scope>test</scope>
>>>                </dependency>
>>>
>>>          <dependency>
>>>              <groupId>org.apache.storm</groupId>
>>>              <artifactId>storm-core</artifactId>
>>>              <version>0.9.4</version>
>>>              <!-- keep storm out of the jar-with-dependencies -->
>>>              <scope>provided</scope>
>>>          </dependency>
>>>        </dependencies>
>>>
>>>        <build>
>>>                <plugins>
>>>                        <!-- get default data from flink-java-examples package -->
>>>                        <plugin>
>>>                                <groupId>org.apache.maven.plugins</groupId>
>>>                                <artifactId>maven-dependency-plugin</artifactId>
>>>                                <version>2.9</version><!--$NO-MVN-MAN-VER$-->
>>>                                <executions>
>>>                                        <execution>
>>>                                                <id>unpack</id>
>>>
>>   <phase>prepare-package</phase>
>>>                                                <goals>
>>>                                                        <goal>unpack</goal>
>>>                                                </goals>
>>>                                                <configuration>
>>>                                                        <artifactItems>
>>>
>>   <artifactItem>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-java-examples</artifactId>
>>   <version>${project.version}</version>
>>   <type>jar</type>
>>   <overWrite>false</overWrite>
>>   <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>   <includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
>>   </artifactItem>
>>   <artifactItem>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-storm-compatibility-core</artifactId>
>>   <version>${project.version}</version>
>>   <type>jar</type>
>>   <overWrite>false</overWrite>
>>   <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>   </artifactItem>
>>   <artifactItem>
>>   <groupId>org.apache.storm</groupId>
>>   <artifactId>storm-core</artifactId>
>>   <version>0.9.4</version>
>>   <type>jar</type>
>>   <overWrite>false</overWrite>
>>   <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>   <!--<excludes>defaults.yaml</excludes>-->
>>   </artifactItem>
>>   <artifactItem>
>>   <groupId>com.googlecode.json-simple</groupId>
>>   <artifactId>json-simple</artifactId>
>>   <version>1.1</version>
>>   <type>jar</type>
>>   <overWrite>false</overWrite>
>>   <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>   </artifactItem>
>>   <artifactItem>
>>   <groupId>org.yaml</groupId>
>>   <artifactId>snakeyaml</artifactId>
>>   <version>1.11</version>
>>   <type>jar</type>
>>   <overWrite>false</overWrite>
>>   <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>   </artifactItem>
>>>                                                        </artifactItems>
>>>                                                </configuration>
>>>                                        </execution>
>>>                                </executions>
>>>                        </plugin>
>>>
>>>                        <!-- self-contained jars for each example -->
>>>                        <plugin>
>>>                                <groupId>org.apache.maven.plugins</groupId>
>>>                                <artifactId>maven-jar-plugin</artifactId>
>>>
>>>                                <executions>
>>>
>>>                                        <!-- WordCount Spout source-->
>>>                                        <execution>
>>>
>>   <id>WordCount-SpoutSource</id>
>>>                                                <phase>package</phase>
>>>                                                <goals>
>>>                                                        <goal>jar</goal>
>>>                                                </goals>
>>>                                                <configuration>
>>>
>>   <classifier>WordCountSpoutSource</classifier>
>>>                                                        <archive>
>>>
>>   <manifestEntries>
>>   <program-class>org.apache.flink.stormcompatibility.wordcount.SpoutSourceWordCount</program-class>
>>   </manifestEntries>
>>>                                                        </archive>
>>>
>>>                                                        <includes>
>>>                                                                <!-- from storm-core -->
>>   <include>backtype/storm/topology/*.class</include>
>>   <include>backtype/storm/spout/*.class</include>
>>   <include>backtype/storm/task/*.class</include>
>>   <include>backtype/storm/tuple/*.class</include>
>>   <include>backtype/storm/generated/*.class</include>
>>   <include>backtype/storm/metric/**/*.class</include>
>>   <include>org/apache/thrift7/**/*.class</include>
>>>                                                                <!-- Storm's recursive dependencies -->
>>   <include>org/json/simple/**/*.class</include>
>>>                                                                <!-- compatibility layer -->
>>   <include>org/apache/flink/stormcompatibility/api/*.class</include>
>>   <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
>>>                                                                <!-- Word Count -->
>>   <include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.class</include>
>>   <include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount$*.class</include>
>>   <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/AbstractStormSpout.class</include>
>>   <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormFileSpout.class</include>
>>   <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormInMemorySpout.class</include>
>>   <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
>>>                                                        </includes>
>>>                                                </configuration>
>>>                                        </execution>
>>>
>>>                                        <!-- WordCount Bolt tokenizer-->
>>>                                        <execution>
>>>
>>   <id>WordCount-BoltTokenizer</id>
>>>                                                <phase>package</phase>
>>>                                                <goals>
>>>                                                        <goal>jar</goal>
>>>                                                </goals>
>>>                                                <configuration>
>>>
>>   <classifier>WordCountBoltTokenizer</classifier>
>>>                                                        <archive>
>>>
>>   <manifestEntries>
>>   <program-class>org.apache.flink.stormcompatibility.wordcount.BoltTokenizerWordCount</program-class>
>>   </manifestEntries>
>>>                                                        </archive>
>>>
>>>                                                        <includes>
>>>                                                                <!-- from storm-core -->
>>   <include>backtype/storm/topology/*.class</include>
>>   <include>backtype/storm/spout/*.class</include>
>>   <include>backtype/storm/task/*.class</include>
>>   <include>backtype/storm/tuple/*.class</include>
>>   <include>backtype/storm/generated/*.class</include>
>>   <include>backtype/storm/metric/**/*.class</include>
>>   <include>org/apache/thrift7/**/*.class</include>
>>>                                                                <!-- Storm's recursive dependencies -->
>>   <include>org/json/simple/**/*.class</include>
>>>                                                                <!-- compatibility layer -->
>>   <include>org/apache/flink/stormcompatibility/api/*.class</include>
>>   <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
>>>                                                                <!-- Word Count -->
>>   <include>org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.class</include>
>>   <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.class</include>
>>   <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
>>>                                                        </includes>
>>>                                                </configuration>
>>>                                        </execution>
>>>
>>>                      <!-- Throughput -->
>>>                      <execution>
>>>                          <id>Throughput</id>
>>>                          <phase>package</phase>
>>>                          <goals>
>>>                              <goal>jar</goal>
>>>                          </goals>
>>>                          <configuration>
>>>                              <classifier>Throughput</classifier>
>>>
>>>                              <archive>
>>>                                  <manifestEntries>
>>>
>>   <program-class>org.apache.flink.stormcompatibility.experiments.Throughput</program-class>
>>>                                  </manifestEntries>
>>>                              </archive>
>>>
>>>                              <includes>
>>>                                  <!-- from storm-core -->
>>>                                  <include>defaults.yaml</include>
>>>                                  <include>backtype/storm/*.class</include>
>>>
>>   <include>backtype/storm/serialization/*.class</include>
>>   <include>backtype/storm/topology/*.class</include>
>>   <include>backtype/storm/topology/base/*.class</include>
>>   <include>backtype/storm/utils/*.class</include>
>>   <include>backtype/storm/spout/*.class</include>
>>   <include>backtype/storm/task/*.class</include>
>>   <include>backtype/storm/tuple/*.class</include>
>>   <include>backtype/storm/generated/*.class</include>
>>   <include>backtype/storm/metric/**/*.class</include>
>>   <include>org/apache/storm/curator/*.class</include>
>>   <include>org/apache/thrift7/**/*.class</include>
>>   <!--<include>org/yaml/snakeyaml/constructor/*.class</include>-->
>>   <include>org/yaml/snakeyaml/**/*.class</include>
>>>                                  <!-- Storm's recursive dependencies -->
>>   <include>org/json/simple/**/*.class</include>
>>>                                  <!-- compatibility layer -->
>>   <include>org/apache/flink/stormcompatibility/api/*.class</include>
>>   <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
>>>                                  <!-- Word Count -->
>>   <include>org/apache/flink/stormcompatibility/experiments/Throughput.class</include>
>>   <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.class</include>
>>>                              </includes>
>>>                          </configuration>
>>>                      </execution>
>>>
>>>                                        <execution>
>>>                                                <goals>
>>>
>>   <goal>test-jar</goal>
>>>                                                </goals>
>>>                                        </execution>
>>>                                </executions>
>>>                        </plugin>
>>>
>>>                        <!-- WordCount Storm topology-->
>>>                        <!-- Cannot use maven-jar-plugin because 'defaults.yaml' must be included in jar -->
>>>                        <plugin>
>>>
>>   <artifactId>maven-assembly-plugin</artifactId>
>>>                                <configuration>
>>>                                        <descriptors>
>>>
>>   <descriptor>src/assembly/word-count-storm.xml</descriptor>
>>>                                        </descriptors>
>>>                                        <archive>
>>>                                                <manifestEntries>
>>>
>>   <program-class>org.apache.flink.stormcompatibility.wordcount.StormWordCountRemoteBySubmitter</program-class>
>>>                                                </manifestEntries>
>>>                                        </archive>
>>>                                </configuration>
>>>
>>>                                <executions>
>>>                                        <execution>
>>>                                                <id>WordCountStorm</id>
>>>                                                <phase>package</phase>
>>>                                                <goals>
>>>                                                        <goal>single</goal>
>>>                                                </goals>
>>>                                        </execution>
>>>                                </executions>
>>>                        </plugin>
>>>                </plugins>
>>>
>>>                <pluginManagement>
>>>                        <plugins>
>>>                                <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
>>>                                <plugin>
>>>                                        <groupId>org.eclipse.m2e</groupId>
>>>                                        <artifactId>lifecycle-mapping</artifactId>
>>>                                        <version>1.0.0</version>
>>>                                        <configuration>
>>>                                                <lifecycleMappingMetadata>
>>>                                                        <pluginExecutions>
>>>
>>   <pluginExecution>
>>   <pluginExecutionFilter>
>>       <groupId>org.apache.maven.plugins</groupId>
>>       <artifactId>maven-dependency-plugin</artifactId>
>>       <versionRange>[2.9,)</versionRange>
>>       <goals>
>>               <goal>unpack</goal>
>>       </goals>
>>   </pluginExecutionFilter>
>>   <action>
>>       <ignore/>
>>   </action>
>>   </pluginExecution>
>>>                                                        </pluginExecutions>
>>>                                                </lifecycleMappingMetadata>
>>>                                        </configuration>
>>>                                </plugin>
>>>                        </plugins>
>>>                </pluginManagement>
>>>
>>>        </build>
>>>
>>> </project>
>>>
>>> -----Original Message-----
>>> From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan Ewen
>>> Sent: 11 August 2015 1:55
>>> To: dev@flink.apache.org
>>> Subject: Re: Some problems about Flink applications
>>>
>>> It would also help if you could paste the entire POM file of your
>> project.
>>> Maybe something is amiss with the dependencies, or the scopes?
>>>
>>> On Sat, Aug 8, 2015 at 7:27 PM, Matthias J. Sax <
>> mj...@informatik.hu-berlin.de> wrote:
>>>> Hi Huang,
>>>>
>>>> About Storm compatibility: did you double-check that the file that
>>>> is missing (StormSpoutWrapper) is contained in your jar? Looking at
>>>> the pom.xml does not help here, because if you specify a file to
>>>> include but Maven cannot find it, it will just not add it to the jar,
>>>> and the build will still succeed. Thus, you need to check the jar
>>>> file itself via the command line:
>>>>      unzip -l myjarfile.jar
>>>> or
>>>>      unzip -l myjarfile.jar | grep file-I-am-looking-for
>>>>
>>>> I guess your jar is not built correctly, i.e., the file is not there...
>>>>
>>>> Did you have a look at the pom.xml of flink-storm-compatibility-examples
>>>> and the corresponding word-count-storm.xml? It shows how to build a
>>>> jar correctly (it was recently fixed, so make sure you update to the
>>>> latest master).
>>>>
>>>> You can also have a look here at how to package jars correctly (even
>>>> if this example is about Flink ML):
>>>>
>>>> https://stackoverflow.com/questions/31661900/maven-build-has-missing-package-linkages/31662642#31662642
>>>>
>>>> -Matthias
>>>>
>>>> On 08/08/2015 11:15 AM, huangwei (G) wrote:
>>>>> Hi,
>>>>> I have run into some trouble developing Flink applications.
>>>>>
>>>>> 1.
>>>>> I want to compare the performance of Storm and
>>>>> flink-storm-compatibility using this test program:
>>>>> https://github.com/project-flink/flink-perf/blob/master/storm-jobs/src/jvm/experiments/Throughput.java
>>>>> Here is a bit of my changes to this Throughput.java:
>>>>>
>>>>>
>>>>>
>>>>> public static void main(String[] args) throws Exception {
>>>>>         ParameterTool pt = ParameterTool.fromArgs(args);
>>>>>
>>>>>         int par = pt.getInt("para");
>>>>>
>>>>>         final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
>>>>>         builder.setSpout("source0", new Generator(pt), pt.getInt("sourceParallelism"));
>>>>>         int i = 0;
>>>>>         for (; i < pt.getInt("repartitions", 1) - 1; i++) {
>>>>>                 System.out.println("adding source" + i + " --> source" + (i + 1));
>>>>>                 builder.setBolt("source" + (i + 1), new RepartPassThroughBolt(pt), pt.getInt("sinkParallelism"))
>>>>>                                 .fieldsGrouping("source" + i, new Fields("id"));
>>>>>         }
>>>>>
>>>>>         System.out.println("adding final source" + i + " --> sink");
>>>>>         builder.setBolt("sink", new Sink(pt), pt.getInt("sinkParallelism")).fieldsGrouping("source" + i, new Fields("id"));
>>>>>
>>>>>         Config conf = new Config();
>>>>>         conf.setDebug(false); //System.exit(1);
>>>>>
>>>>>         // execute program locally
>>>>>         final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
>>>>>         cluster.submitTopology("throughput", null, builder.createTopology());
>>>>>         Utils.sleep(10 * 1000);
>>>>>
>>>>>         // TODO kill does not do anything so far
>>>>>         cluster.killTopology("throughput");
>>>>>         cluster.shutdown();
>>>>> }
>>>>>
>>>>>
>>>>> This program runs well in IDEA with flink-storm-compatibility.
>>>>> However, when I packaged it into a jar file and ran it on
>>>>> flink-0.10-SNAPSHOT, there was a problem in the flink-client log file:
>>>>> java.lang.Exception: Call to registerInputOutput() of invokable failed
>>>>>           at org.apache.flink.runtime.taskmanager.Task.run(Task.java:521)
>>>>>           at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
>>>>>           at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:187)
>>>>>           at org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:90)
>>>>>           at org.apache.flink.runtime.taskmanager.Task.run(Task.java:518)
>>>>>           ... 1 more
>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper
>>>>>           at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>           at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>           at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>           at java.lang.Class.forName0(Native Method)
>>>>>           at java.lang.Class.forName(Class.java:348)
>>>>>           at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71)
>>>>>           at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>>>>>           at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>>>>>           at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>>>>>           at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>>>           at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>>>>           at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>>>>           at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>>>>           at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>>>           at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>>>>>           at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
>>>>>           at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
>>>>>           at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:185)
>>>>>           ... 3 more
>>>>>
>>>>>
>>>>> And this class (StormSpoutWrapper) does exist in my packaged jar file.
>>>>> Here is the relevant part of my pom.xml:
>>>>>
>>>>> <!-- Throughput -->
>>>>>                      <execution>
>>>>>                          <id>Throughput</id>
>>>>>                          <phase>package</phase>
>>>>>                          <goals>
>>>>>                              <goal>jar</goal>
>>>>>                          </goals>
>>>>>                          <configuration>
>>>>>                              <classifier>Throughput</classifier>
>>>>>
>>>>>                              <archive>
>>>>>                                  <manifestEntries>
>>>>>
>>>> <program-class>org.apache.flink.stormcompatibility.experiments.Throughput</program-class>
>>>>>                                  </manifestEntries>
>>>>>                              </archive>
>>>>>
>>>>>                              <includes>
>>>>>                                  <!-- from storm-core -->
>>>>>                                  <include>defaults.yaml</include>
>>>>>
>>>>> <include>backtype/storm/*.class</include>
>>>>>
>>>>   <include>backtype/storm/serialization/*.class</include>
>>>>   <include>backtype/storm/topology/*.class</include>
>>>>   <include>backtype/storm/topology/base/*.class</include>
>>>>   <include>backtype/storm/utils/*.class</include>
>>>>   <include>backtype/storm/spout/*.class</include>
>>>>   <include>backtype/storm/task/*.class</include>
>>>>   <include>backtype/storm/tuple/*.class</include>
>>>>   <include>backtype/storm/generated/*.class</include>
>>>>   <include>backtype/storm/metric/**/*.class</include>
>>>>   <include>org/apache/storm/curator/*.class</include>
>>>>   <include>org/apache/thrift7/**/*.class</include>
>>>>   <!--<include>org/yaml/snakeyaml/constructor/*.class</include>-->
>>>>   <include>org/yaml/snakeyaml/**/*.class</include>
>>>>>                                  <!-- Storm's recursive dependencies -->
>>>>   <include>org/json/simple/**/*.class</include>
>>>>>                                  <!-- compatibility layer -->
>>>>   <include>org/apache/flink/stormcompatibility/api/*.class</include>
>>>>   <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
>>>>>                                  <!-- Word Count -->
>>>>   <include>org/apache/flink/stormcompatibility/experiments/Throughput.class</include>
>>>>   <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.class</include>
>>>>>                              </includes>
>>>>>                          </configuration>
>>>>>                      </execution>
>>>>>
>>>>> So how can I fix it?
>>>>>
>>>>>
>>>>> 2.
>>>>> Here is a case using the join operator:
>>>>>
>>>>> DataStream<Tuple3<String, Integer, Long>> user = env.addSource(new sourceUserFunction());
>>>>> DataStream<Tuple2<String, Integer>> area = env.addSource(new sourceAreaFunction());
>>>>> DataStream<Tuple2<String, Integer>> sink = user
>>>>>        .join(area)
>>>>>        .onWindow(15, TimeUnit.MINUTES)
>>>>>        .where(0)
>>>>>        .equalTo(0)
>>>>>        .with(new JoinFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
>>>>>           @Override
>>>>>           public Tuple2<String, Integer> join(Tuple3<String, Integer, Long> first, Tuple2<String, Integer> second) throws Exception {
>>>>>              if (first.f1 + second.f1 > 10) {
>>>>>                 return new Tuple2<String, Integer>(first.f0, first.f1 + second.f1);
>>>>>              }
>>>>>              return null;
>>>>>           }
>>>>>        });
>>>>>
>>>>> As you can see, I don't want to return null when the condition is not satisfied.
>>>>> But there is no JoinFunction variant with a Collector.
>>>>> I found FlatJoinFunction, which provides a Collector.
>>>>> However, FlatJoinFunction seems to be usable only with DataSet, not with DataStream.
>>>>> Is there any other way to improve this case?
>>>>>
>>>>> PS: I'm sorry about this email; feel free to ignore it over the weekend.
>>>>>
>>>>> Greetings,
>>>>> Huang Wei
>>>>> Huawei Technologies Co., Ltd.
>>>>>
>>>>>
>>>>> Tel: +86 18106512602
>>>>> Email: huangwei...@huawei.com
>>>>>
>>>>
>>
