Hi Stephan,
Thank you for the information about the
https://github.com/apache/flink/pull/1008.
I`ll try again.
Otherwise, is there any suggestion about the original second problem:
> >>> 2.
> >>> There is a case following using operator join:
> >>>
> >>> DataStream<Tuple3<String, Integer, Long>> user = env.addSource(new
> >> sourceUserFunction());
> >>> DataStream<Tuple2<String, Integer>> area = env.addSource(new
> >> sourceAreaFunction());
> >>>
> >>> DataStream<Tuple2<String, Integer>> sink = user
> >>> .join(area)
> >>> .onWindow(15, TimeUnit.MINUTES)
> >>> .where(0)
> >>> .equalTo(0)
> >>> .with(new JoinFunction<Tuple3<String, Integer, Long>,
> >> Tuple2<String, Integer>, Tuple2<String, Integer>>() {
> >>> @Override
> >>> public Tuple2<String, Integer> join(Tuple3<String,
> >>> Integer,
> >> Long> first, Tuple2<String, Integer> second) throws Exception {
> >>> if (first.f1 + second.f1 > 10){
> >>> return new Tuple2<String, Integer>(first.f0,
> >>> first.f1
> >>> +
> >> second.f1);
> >>> }
> >>> return null;
> >>> }
> >>> });
> >>>
> >>> As you see, I don`t want to return null when the condition is not
> >> satisfied.
> >>> But there is not any JoinFunction with Collector.
> >>> I found a FlatJoinFunction which allows the Collector.
> >>> However, the FlatJoinFunction seem only can be used in DataSet
> >>> instead
> >> DataStream.
> >>> Is there any other way to improve this case?
Greetings,
Huang Wei
----------------------
early emails(Stephan wrote):
We are seeing these class loader issues a lot as of late.
Seems that packaging the classes is trickier than anticipated.
Here is a pull request to add some diagnostics info on a
"ClassNotFoundException": https://github.com/apache/flink/pull/1008
On Tue, Aug 11, 2015 at 3:29 PM, Matthias J. Sax <
[email protected]> wrote:
> Three comments
>
> 1) If StormSpoutWrapper is in your jar, is it located in the correct
> directory (must be same as package name)?
>
> 2) If you are using FlinkTopologyBuilder, you need to package as shown
> in StormWordCountRemoteBySubmitter example, using an additional
> assembly file.
> (The first examples are for embedded Spout/Bolts with Flink streaming
> program. -- Maybe we should add some comments in the pom file...)
>
> 3) How do you build? The whole source code? Or only parts of it? Did
> you run "install"?
>
> -Matthias
>
>
> On 08/11/2015 02:47 PM, huangwei (G) wrote:
> > Hi Stephan and Matthias,
> >
> > Sorry for replying late.
> > I`ve double checked that this class StormSpoutWrapper is really
> > exist in
> my jar file.
> > And it got the same trouble when I ran the
> flink-storm-compatibililty-example- corresponding word-count-storm.
> > The way I built my Throughput application was just adding it into
> > the
> flink-storm-compatibililty-example and change some configurations in
> the word-count-storm.xml.
> > Here is the entire POM file.
> >
> >
> > <?xml version="1.0" encoding="UTF-8"?>
> > <!--
> > Licensed to the Apache Software Foundation (ASF) under one or more
> > contributor license agreements. See the NOTICE file distributed
> > with this work for additional information regarding copyright
> > ownership. The ASF licenses this file to you under the Apache
> > License, Version 2.0 (the "License"); you may not use this file
> > except in compliance with the License. You may obtain a copy of the
> > License at
> >
> > http://www.apache.org/licenses/LICENSE-2.0
> >
> > Unless required by applicable law or agreed to in writing, software
> > distributed under the License is distributed on an "AS IS" BASIS,
> > WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> > implied. See the License for the specific language governing
> > permissions and limitations under the License.
> > -->
> > <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="
> http://www.w3.org/2001/XMLSchema-instance"
> > xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> http://maven.apache.org/maven-v4_0_0.xsd">
> >
> > <modelVersion>4.0.0</modelVersion>
> >
> > <parent>
> > <groupId>org.apache.flink</groupId>
> > <artifactId>flink-storm-compatibility-parent</artifactId>
> > <version>0.10-SNAPSHOT</version>
> > <relativePath>..</relativePath>
> > </parent>
> >
> > <artifactId>flink-storm-compatibility-examples</artifactId>
> > <name>flink-storm-compatibility-examples</name>
> >
> > <packaging>jar</packaging>
> >
> > <dependencies>
> > <dependency>
> > <groupId>org.apache.flink</groupId>
> >
> <artifactId>flink-storm-compatibility-core</artifactId>
> > <version>${project.version}</version>
> > </dependency>
> >
> > <dependency>
> > <groupId>org.apache.flink</groupId>
> > <artifactId>flink-java-examples</artifactId>
> > <version>${project.version}</version>
> > </dependency>
> >
> > <dependency>
> > <groupId>org.apache.flink</groupId>
> > <artifactId>flink-streaming-core</artifactId>
> > <version>${project.version}</version>
> > <scope>test</scope>
> > <type>test-jar</type>
> > </dependency>
> >
> > <dependency>
> > <groupId>org.apache.flink</groupId>
> > <artifactId>flink-test-utils</artifactId>
> > <version>${project.version}</version>
> > <scope>test</scope>
> > </dependency>
> >
> > <dependency>
> > <groupId>org.apache.storm</groupId>
> > <artifactId>storm-core</artifactId>
> > <version>0.9.4</version>
> > <!-- keep storm out of the jar-with-dependencies -->
> > <scope>provided</scope>
> > </dependency>
> > </dependencies>
> >
> > <build>
> > <plugins>
> > <!-- get default data from flink-java-examples
> package -->
> > <plugin>
> >
> > <groupId>org.apache.maven.plugins</groupId>
> >
> <artifactId>maven-dependency-plugin</artifactId>
> >
> <version>2.9</version><!--$NO-MVN-MAN-VER$-->
> > <executions>
> > <execution>
> > <id>unpack</id>
> >
> <phase>prepare-package</phase>
> > <goals>
> > <goal>unpack</goal>
> > </goals>
> > <configuration>
> >
> > <artifactItems>
> >
> <artifactItem>
> >
> <groupId>org.apache.flink</groupId>
> >
> <artifactId>flink-java-examples</artifactId>
> >
> <version>${project.version}</version>
> >
> <type>jar</type>
> >
> <overWrite>false</overWrite>
> >
> <outputDirectory>${project.build.directory}/classes</outputDirectory>
> >
>
> <includes>org/apache/flink/examples/java/wordcount/util/WordCountData.
> class</includes>
> >
> </artifactItem>
> >
> <artifactItem>
> >
> <groupId>org.apache.flink</groupId>
> >
> <artifactId>flink-storm-compatibility-core</artifactId>
> >
> <version>${project.version}</version>
> >
> <type>jar</type>
> >
> <overWrite>false</overWrite>
> >
> <outputDirectory>${project.build.directory}/classes</outputDirectory>
> >
> </artifactItem>
> >
> <artifactItem>
> >
> <groupId>org.apache.storm</groupId>
> >
> <artifactId>storm-core</artifactId>
> >
> <version>0.9.4</version>
> >
> <type>jar</type>
> >
> <overWrite>false</overWrite>
> >
> <outputDirectory>${project.build.directory}/classes</outputDirectory>
> >
> <!--<excludes>defaults.yaml</excludes>-->
> >
> </artifactItem>
> >
> <artifactItem>
> >
> <groupId>com.googlecode.json-simple</groupId>
> >
> <artifactId>json-simple</artifactId>
> >
> <version>1.1</version>
> >
> <type>jar</type>
> >
> <overWrite>false</overWrite>
> >
> <outputDirectory>${project.build.directory}/classes</outputDirectory>
> >
> </artifactItem>snakeyaml
> >
> <artifactItem>
> >
> <groupId>org.yaml</groupId>
> >
> <artifactId>snakeyaml</artifactId>
> >
> <version>1.11</version>
> >
> <type>jar</type>
> >
> <overWrite>false</overWrite>
> >
> <outputDirectory>${project.build.directory}/classes</outputDirectory>
> >
> </artifactItem>
> > </artifactItems>
> > </configuration>
> > </execution>
> > </executions>
> > </plugin>
> >
> > <!-- self-contained jars for each example -->
> > <plugin>
> > <groupId>org.apache.maven.plugins</groupId>
> >
> > <artifactId>maven-jar-plugin</artifactId>
> >
> > <executions>
> >
> > <!-- WordCount Spout source-->
> > <execution>
> >
> <id>WordCount-SpoutSource</id>
> > <phase>package</phase>
> > <goals>
> > <goal>jar</goal>
> > </goals>
> > <configuration>
> >
> <classifier>WordCountSpoutSource</classifier>
> >
> > <archive>
> >
> <manifestEntries>
> >
>
> <program-class>org.apache.flink.stormcompatibility.wordcount.SpoutSour
> ceWordCount</program-class>
> >
> </manifestEntries>
> > </archive>
> >
> > <includes>
> > <!--
> > from
> storm-core -->
> >
> <include>backtype/storm/topology/*.class</include>
> >
> <include>backtype/storm/spout/*.class</include>
> >
> <include>backtype/storm/task/*.class</include>
> >
> <include>backtype/storm/tuple/*.class</include>
> >
> <include>backtype/storm/generated/*.class</include>
> >
> <include>backtype/storm/metric/**/*.class</include>
> >
> <include>org/apache/thrift7/**/*.class</include>
> > <!--
> Storm's recursive dependencies -->
> >
> <include>org/json/simple/**/*.class</include>
> > <!--
> compatibility layer -->
> >
> <include>org/apache/flink/stormcompatibility/api/*.class</include>
> >
>
> <include>org/apache/flink/stormcompatibility/wrappers/*.class</include
> >
> > <!--
> > Word
> Count -->
> >
>
> <include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWord
> Count.class</include>
> >
>
> <include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWord
> Count$*.class</include>
> >
>
> <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/
> AbstractStormSpout.class</include>
> >
>
> <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/
> StormFileSpout.class</include>
> >
>
> <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/
> StormInMemorySpout.class</include>
> >
>
> <include>org/apache/flink/examples/java/wordcount/util/WordCountData.c
> lass</include>
> > </includes>
> > </configuration>
> > </execution>
> >
> > <!-- WordCount Bolt tokenizer-->
> > <execution>
> >
> <id>WordCount-BoltTokenizer</id>
> > <phase>package</phase>
> > <goals>
> > <goal>jar</goal>
> > </goals>
> > <configuration>
> >
> <classifier>WordCountBoltTokenizer</classifier>
> >
> > <archive>
> >
> <manifestEntries>
> >
>
> <program-class>org.apache.flink.stormcompatibility.wordcount.BoltToken
> izerWordCount</program-class>
> >
> </manifestEntries>
> > </archive>
> >
> > <includes>
> > <!--
> > from
> storm-core -->
> >
> <include>backtype/storm/topology/*.class</include>
> >
> <include>backtype/storm/spout/*.class</include>
> >
> <include>backtype/storm/task/*.class</include>
> >
> <include>backtype/storm/tuple/*.class</include>
> >
> <include>backtype/storm/generated/*.class</include>
> >
> <include>backtype/storm/metric/**/*.class</include>
> >
> <include>org/apache/thrift7/**/*.class</include>
> > <!--
> Storm's recursive dependencies -->
> >
> <include>org/json/simple/**/*.class</include>
> > <!--
> compatibility layer -->
> >
> <include>org/apache/flink/stormcompatibility/api/*.class</include>
> >
>
> <include>org/apache/flink/stormcompatibility/wrappers/*.class</include
> >
> > <!--
> > Word
> Count -->
> >
>
> <include>org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWo
> rdCount.class</include>
> >
>
> <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/
> StormBoltTokenizer.class</include>
> >
>
> <include>org/apache/flink/examples/java/wordcount/util/WordCountData.c
> lass</include>
> > </includes>
> > </configuration>
> > </execution>
> >
> > <!-- Throughput -->
> > <execution>
> > <id>Throughput</id>
> > <phase>package</phase>
> > <goals>
> > <goal>jar</goal>
> > </goals>
> > <configuration>
> > <classifier>Throughput</classifier>
> >
> > <archive>
> > <manifestEntries>
> >
>
> <program-class>org.apache.flink.stormcompatibility.experiments.Through
> put</program-class>
> > </manifestEntries>
> > </archive>
> >
> > <includes>
> > <!-- from storm-core -->
> > <include>defaults.yaml</include>
> >
> > <include>backtype/storm/*.class</include>
> >
> <include>backtype/storm/serialization/*.class</include>
> >
> <include>backtype/storm/topology/*.class</include>
> >
> <include>backtype/storm/topology/base/*.class</include>
> >
> <include>backtype/storm/utils/*.class</include>
> >
> <include>backtype/storm/spout/*.class</include>
> >
> <include>backtype/storm/task/*.class</include>
> >
> <include>backtype/storm/tuple/*.class</include>
> >
> <include>backtype/storm/generated/*.class</include>
> >
> <include>backtype/storm/metric/**/*.class</include>
> >
> <include>org/apache/storm/curator/*.class</include>
> >
> <include>org/apache/thrift7/**/*.class</include>
> >
> <!--<include>org/yaml/snakeyaml/constructor/*.class</include>-->
> >
> <include>org/yaml/snakeyaml/**/*.class</include>
> > <!-- Storm's recursive dependencies
> > -->
> >
> <include>org/json/simple/**/*.class</include>
> > <!-- compatibility layer -->
> >
> <include>org/apache/flink/stormcompatibility/api/*.class</include>
> >
>
> <include>org/apache/flink/stormcompatibility/wrappers/*.class</include
> >
> > <!-- Word Count -->
> >
>
> <include>org/apache/flink/stormcompatibility/experiments/Throughput.cl
> ass</include>
> >
>
> <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.
> class</include>
> > </includes>
> > </configuration>
> > </execution>
> >
> > <execution>
> > <goals>
> >
> <goal>test-jar</goal>
> > </goals>
> > </execution>
> > </executions>
> > </plugin>
> >
> > <!-- WordCount Storm topology-->
> > <!-- Cannot use maven-jar-plugin because
> 'defaults.yaml' must be included in jar -->
> > <plugin>
> >
> <artifactId>maven-assembly-plugin</artifactId>
> > <configuration>
> > <descriptors>
> >
> <descriptor>src/assembly/word-count-storm.xml</descriptor>
> > </descriptors>
> > <archive>
> > <manifestEntries>
> >
>
> <program-class>org.apache.flink.stormcompatibility.wordcount.StormWord
> CountRemoteBySubmitter</program-class>
> > </manifestEntries>
> > </archive>
> > </configuration>
> >
> > <executions>
> > <execution>
> > <id>WordCountStorm</id>
> > <phase>package</phase>
> > <goals>
> > <goal>single</goal>
> > </goals>
> > </execution>
> > </executions>
> > </plugin>
> > </plugins>
> >
> > <pluginManagement>
> > <plugins>
> > <!--This plugin's configuration is
> > used to
> store Eclipse m2e settings only. It has no influence on the Maven
> build itself.-->
> > <plugin>
> >
> > <groupId>org.eclipse.m2e</groupId>
> >
> <artifactId>lifecycle-mapping</artifactId>
> > <version>1.0.0</version>
> > <configuration>
> > <lifecycleMappingMetadata>
> >
> > <pluginExecutions>
> >
> <pluginExecution>
> >
> <pluginExecutionFilter>
> >
> <groupId>org.apache.maven.plugins</groupId>
> >
> <artifactId>maven-dependency-plugin</artifactId>
> >
> <versionRange>[2.9,)</versionRange>
> >
> <goals>
> >
> <goal>unpack</goal>
> >
> </goals>
> >
> </pluginExecutionFilter>
> >
> <action>
> >
> <ignore/>
> >
> </action>
> >
> </pluginExecution>
> > </pluginExecutions>
> > </lifecycleMappingMetadata>
> > </configuration>
> > </plugin>
> > </plugins>
> > </pluginManagement>
> >
> > </build>
> >
> > </project>
> >
> > -----邮件原件-----
> > 发件人: [email protected] [mailto:[email protected]] 代表 Stephan
> Ewen
> > 发送时间: 2015年8月11日 1:55
> > 收件人: [email protected]
> > 主题: Re: Some problems about Flink applications
> >
> > It would also help if you could paste the entire POM file of your
> project.
> > Maybe something is amiss with the dependencies, or the scopes?
> >
> > On Sat, Aug 8, 2015 at 7:27 PM, Matthias J. Sax <
> [email protected]> wrote:
> >
> >> Hi Huang,
> >>
> >> about Storm compatibility. Did you double check, that the file that
> >> is missing (StormSpoutWrapper) is contained in your jar. Looking at
> >> pom.xml does not help here, because if you specify to include a
> >> file, but maven cannot find it, it will just not add it to the jar,
> >> but build will succeed. Thus, you need to check the jar file itself
> >> via
> command line:
> >> unzip -l myjarfile.jar
> >> or
> >> unzip -l myjarfile.jar | grep file-I-am-looking-for
> >>
> >> I guess your jar is not build correctly, ie, the file is not there...
> >>
> >> Did you have a look into pom.xml for
> >> flink-storm-compatibililty-example
> >> and the corresponding word-count-storm.xml? This shows how to build
> >> a jar correctly (it was recently fixed, so make sure you update to
> >> the latest master)
> >>
> >> You can also have a look here how to package jars correctly (even
> >> if this example is about Flink ML):
> >>
> >> https://stackoverflow.com/questions/31661900/maven-build-has-missin
> >> g-p
> >> ackage-linkages/31662642#31662642
> >>
> >> -Matthias
> >>
> >> On 08/08/2015 11:15 AM, huangwei (G) wrote:
> >>> Hi,
> >>> I get some trouble in developing Flink applications.
> >>>
> >>> 1.
> >>> I want to test the performance between Storm and
> >> flink-storm-compatibility using the test program:
> >> https://github.com/project-flink/flink-perf/blob/master/storm-jobs/
> >> src
> >> /jvm/experiments/Throughput.java
> >> .
> >>> And there is a bit of my changes with this Throughput.java below:
> >>>
> >>>
> >>>
> >>> public static void main(String[] args) throws Exception {
> >>> ParameterTool pt =
> >>> ParameterTool.fromArgs(args);
> >>>
> >>> int par = pt.getInt("para");
> >>>
> >>> final FlinkTopologyBuilder builder = new
> >> FlinkTopologyBuilder();
> >>>
> >>> builder.setSpout("source0", new Generator(pt),
> >> pt.getInt("sourceParallelism"));
> >>>
> >>> int i = 0;
> >>> for (; i < pt.getInt("repartitions", 1) - 1; i++) {
> >>> System.out.println("adding source" + i + "
> >> --> source" + (i + 1));
> >>> builder.setBolt("source" + (i + 1),
> >>> new
> >> RepartPassThroughBolt(pt), pt.getInt("sinkParallelism"))
> >>> .fieldsGrouping("source"
> >> + i, new Fields("id"));
> >>> }
> >>>
> >>> System.out.println("adding final source" + i + "
> >>> -->
> >> sink");
> >>>
> >>> builder.setBolt("sink", new Sink(pt),
> >> pt.getInt("sinkParallelism")).fieldsGrouping("source" + i, new
> >> Fields("id"));
> >>>
> >>> Config conf = new Config();
> >>> conf.setDebug(false); //System.exit(1);
> >>>
> >>> // execute program locally
> >>> final FlinkLocalCluster cluster =
> >> FlinkLocalCluster.getLocalCluster();
> >>> cluster.submitTopology("throughput", null,
> >> builder.createTopology());
> >>>
> >>> Utils.sleep(10 * 1000);
> >>>
> >>> // TODO kill does no do anything so far
> >>> cluster.killTopology("throughput");
> >>> cluster.shutdown();
> >>> }
> >>>
> >>>
> >>> This program will run well in IDEA with flink-storm-compatibility.
> >>> However, when I packaged it into a jar file and run on the
> >> flink-0.10SNAPSHOT there is a problem in flink-client log file:
> >>>
> >>> java.lang.Exception: Call to registerInputOutput() of invokable failed
> >>> at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:521)
> >>> at java.lang.Thread.run(Thread.java:745)
> >>> Caused by:
> org.apache.flink.streaming.runtime.tasks.StreamTaskException:
> >> Cannot instantiate user function.
> >>> at
> >> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator
> >> (St
> >> reamConfig.java:187)
> >>> at
> >> org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOu
> >> tpu
> >> t(StreamTask.java:90)
> >>> at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:518)
> >>> ... 1 more
> >>> Caused by: java.lang.ClassNotFoundException:
> >> org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper
> >>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >>> at java.lang.Class.forName0(Native Method)
> >>> at java.lang.Class.forName(Class.java:348)
> >>> at
> >> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStrea
> >> m.r
> >> esolveClass(InstantiationUtil.java:71)
> >>> at
> >> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1
> >> 613
> >> )
> >>> at
> >> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518
> >> )
> >>> at
> >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java
> >> :17
> >> 74)
> >>> at
> >> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> >>> at
> >> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> >> 199
> >> 3)
> >>> at
> >> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:191
> >> 8)
> >>> at
> >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java
> >> :18
> >> 01)
> >>> at
> >> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> >>> at
> >> java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> >>> at
> >> org.apache.flink.util.InstantiationUtil.deserializeObject(Instantia
> >> tio
> >> nUtil.java:302)
> >>> at
> >> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(Instan
> >> tia
> >> tionUtil.java:264)
> >>> at
> >> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator
> >> (St
> >> reamConfig.java:185)
> >>> ... 3 more
> >>>
> >>>
> >>> And this class(StormSpoutWrapper) was exist in my packaged jar file.
> >>> As you can see part of my pom.xml:
> >>>
> >>> <!-- Throughput -->
> >>> <execution>
> >>> <id>Throughput</id>
> >>> <phase>package</phase>
> >>> <goals>
> >>> <goal>jar</goal>
> >>> </goals>
> >>> <configuration>
> >>> <classifier>Throughput</classifier>
> >>>
> >>> <archive>
> >>> <manifestEntries>
> >>>
> >>
> >> <program-class>org.apache.flink.stormcompatibility.experiments.Thro
> >> ugh
> >> put</program-class>
> >>> </manifestEntries>
> >>> </archive>
> >>>
> >>> <includes>
> >>> <!-- from storm-core -->
> >>> <include>defaults.yaml</include>
> >>>
> >>> <include>backtype/storm/*.class</include>
> >>>
> >> <include>backtype/storm/serialization/*.class</include>
> >>>
> >> <include>backtype/storm/topology/*.class</include>
> >>>
> >> <include>backtype/storm/topology/base/*.class</include>
> >>>
> >> <include>backtype/storm/utils/*.class</include>
> >>>
> >> <include>backtype/storm/spout/*.class</include>
> >>>
> >> <include>backtype/storm/task/*.class</include>
> >>>
> >> <include>backtype/storm/tuple/*.class</include>
> >>>
> >> <include>backtype/storm/generated/*.class</include>
> >>>
> >> <include>backtype/storm/metric/**/*.class</include>
> >>>
> >> <include>org/apache/storm/curator/*.class</include>
> >>>
> >> <include>org/apache/thrift7/**/*.class</include>
> >>>
> >> <!--<include>org/yaml/snakeyaml/constructor/*.class</include>-->
> >>>
> >> <include>org/yaml/snakeyaml/**/*.class</include>
> >>> <!-- Storm's recursive
> >>> dependencies
> >>> -->
> >>>
> >> <include>org/json/simple/**/*.class</include>
> >>> <!-- compatibility layer -->
> >>>
> >> <include>org/apache/flink/stormcompatibility/api/*.class</include>
> >>>
> >>
> >> <include>org/apache/flink/stormcompatibility/wrappers/*.class</incl
> >> ude
> >>>
> >>> <!-- Word Count -->
> >>>
> >>
> >> <include>org/apache/flink/stormcompatibility/experiments/Throughput
> >> .cl
> >> ass</include>
> >>>
> >>
> >> <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.
> >> class</include>
> >>> </includes>
> >>> </configuration>
> >>> </execution>
> >>>
> >>> So how can I fix it?
> >>>
> >>>
> >>> 2.
> >>> There is a case following using operator join:
> >>>
> >>> DataStream<Tuple3<String, Integer, Long>> user = env.addSource(new
> >> sourceUserFunction());
> >>> DataStream<Tuple2<String, Integer>> area = env.addSource(new
> >> sourceAreaFunction());
> >>>
> >>> DataStream<Tuple2<String, Integer>> sink = user
> >>> .join(area)
> >>> .onWindow(15, TimeUnit.MINUTES)
> >>> .where(0)
> >>> .equalTo(0)
> >>> .with(new JoinFunction<Tuple3<String, Integer, Long>,
> >> Tuple2<String, Integer>, Tuple2<String, Integer>>() {
> >>> @Override
> >>> public Tuple2<String, Integer> join(Tuple3<String,
> >>> Integer,
> >> Long> first, Tuple2<String, Integer> second) throws Exception {
> >>> if (first.f1 + second.f1 > 10){
> >>> return new Tuple2<String, Integer>(first.f0,
> >>> first.f1
> >>> +
> >> second.f1);
> >>> }
> >>> return null;
> >>> }
> >>> });
> >>>
> >>> As you see, I don`t want to return null when the condition is not
> >> satisfied.
> >>> But there is not any JoinFunction with Collector.
> >>> I found a FlatJoinFunction which allows the Collector.
> >>> However, the FlatJoinFunction seem only can be used in DataSet
> >>> instead
> >> DataStream.
> >>> Is there any other way to improve this case?
> >>>
> >>> PS. I`m sorry about this email. You may ignore me during the weekend.
> >>>
> >>> Greetings,
> >>> Huang Wei
> >>> 华为技术有限公司 Huawei Technologies Co., Ltd.
> >>>
> >>>
> >>> Tel:+86 18106512602
> >>> Email:[email protected]
> >>>
> >>
> >>
> >
>
>