Hi Chesnay,
Sorry for the late reply.
It works!
Thank you!
Greetings Huangwei
=========================================================
How about something like this:
DataStream<Tuple2<String, Integer>> sink = user
    .join(area)
    .onWindow(15, TimeUnit.MINUTES)
    .where(0)
    .equalTo(0)
    .flatMap(new FlatMapFunction<Tuple2<Tuple3<String, Integer, Long>, Tuple2<String, Integer>>, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(Tuple2<Tuple3<String, Integer, Long>, Tuple2<String, Integer>> value,
                Collector<Tuple2<String, Integer>> c) throws Exception {
            if (value.f0.f1 + value.f1.f1 > 10) {
                c.collect(new Tuple2<String, Integer>(value.f0.f0, value.f0.f1 + value.f1.f1));
            }
        }
    });
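
To illustrate why moving the condition into a flatMap avoids returning null, here is a minimal plain-Java sketch of the same pattern without any Flink dependency. Everything in it is an assumption made for illustration: the Collector interface is a hypothetical stand-in that only models collect(), and the Joined class and the names FilterAfterJoinSketch and run are invented. The point is only that a function receiving a collector can emit zero records for a pair that fails the condition.

```java
import java.util.ArrayList;
import java.util.List;

public class FilterAfterJoinSketch {

    /** Hypothetical stand-in for a collector; only collect() is modelled. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Simplified joined pair: one key plus the integer field from each side. */
    static final class Joined {
        final String key;
        final int userValue;
        final int areaValue;

        Joined(String key, int userValue, int areaValue) {
            this.key = key;
            this.userValue = userValue;
            this.areaValue = areaValue;
        }
    }

    /** Emits "key=sum" only when the sum exceeds 10; otherwise emits nothing. */
    static void flatMap(Joined value, Collector<String> out) {
        int sum = value.userValue + value.areaValue;
        if (sum > 10) {
            out.collect(value.key + "=" + sum);
        }
    }

    /** Applies flatMap to every joined pair and gathers whatever was emitted. */
    static List<String> run(List<Joined> joined) {
        List<String> results = new ArrayList<>();
        for (Joined j : joined) {
            flatMap(j, results::add);
        }
        return results;
    }

    public static void main(String[] args) {
        List<Joined> joined = new ArrayList<>();
        joined.add(new Joined("a", 7, 5)); // sum 12, emitted
        joined.add(new Joined("b", 3, 4)); // sum 7, dropped
        System.out.println(run(joined));   // prints [a=12]
    }
}
```

Because nothing is emitted for a non-matching pair, no null ever enters the result, which is the behaviour asked for in the original question.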
On 13.08.2015 15:24, huangwei (G) wrote:
> Hi Stephan,
>
> Thank you for the information about
> https://github.com/apache/flink/pull/1008.
> I'll try again.
> Also, do you have any suggestions for my original second problem?
>
>>>>> 2.
>>>>> Here is a case that uses the join operator:
>>>>>
>>>>> DataStream<Tuple3<String, Integer, Long>> user = env.addSource(new sourceUserFunction());
>>>>> DataStream<Tuple2<String, Integer>> area = env.addSource(new sourceAreaFunction());
>>>>> DataStream<Tuple2<String, Integer>> sink = user
>>>>>     .join(area)
>>>>>     .onWindow(15, TimeUnit.MINUTES)
>>>>>     .where(0)
>>>>>     .equalTo(0)
>>>>>     .with(new JoinFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
>>>>>         @Override
>>>>>         public Tuple2<String, Integer> join(Tuple3<String, Integer, Long> first,
>>>>>                 Tuple2<String, Integer> second) throws Exception {
>>>>>             if (first.f1 + second.f1 > 10) {
>>>>>                 return new Tuple2<String, Integer>(first.f0, first.f1 + second.f1);
>>>>>             }
>>>>>             return null;
>>>>>         }
>>>>>     });
>>>>>
>>>>> As you can see, I don't want to return null when the condition is not satisfied,
>>>>> but there is no JoinFunction that takes a Collector.
>>>>> I found FlatJoinFunction, which does take a Collector.
>>>>> However, FlatJoinFunction seems to be usable only with DataSet, not with DataStream.
>>>>> Is there any other way to improve this case?
>
> Greetings,
> Huang Wei
>
> ----------------------
> Earlier emails (Stephan wrote):
>
> We are seeing these class loader issues a lot as of late.
>
> Seems that packaging the classes is trickier than anticipated.
>
> Here is a pull request to add some diagnostics info on a
> "ClassNotFoundException": https://github.com/apache/flink/pull/1008
>
> On Tue, Aug 11, 2015 at 3:29 PM, Matthias J. Sax <
> [email protected]> wrote:
>
>> Three comments
>>
>> 1) If StormSpoutWrapper is in your jar, is it located in the correct
>> directory (must be same as package name)?
>>
>> 2) If you are using FlinkTopologyBuilder, you need to package as
>> shown in StormWordCountRemoteBySubmitter example, using an additional
>> assembly file.
>> (The first examples are for embedded Spout/Bolts with Flink streaming
>> program. -- Maybe we should add some comments in the pom file...)
>>
>> 3) How do you build? The whole source code? Or only parts of it? Did
>> you run "install"?
>>
>> -Matthias
>>
>>
>> On 08/11/2015 02:47 PM, huangwei (G) wrote:
>>> Hi Stephan and Matthias,
>>>
>>> Sorry for the late reply.
>>> I've double-checked that the class StormSpoutWrapper really exists in
>>> my jar file.
>>> I got the same problem when I ran flink-storm-compatibility-examples
>>> with the corresponding word-count-storm.
>>> I built my Throughput application by simply adding it to
>>> flink-storm-compatibility-examples and changing some configuration in
>>> word-count-storm.xml.
>>> Here is the entire POM file.
>>>
>>>
>>> <?xml version="1.0" encoding="UTF-8"?>
>>> <!--
>>> Licensed to the Apache Software Foundation (ASF) under one or more
>>> contributor license agreements. See the NOTICE file distributed
>>> with this work for additional information regarding copyright
>>> ownership. The ASF licenses this file to you under the Apache
>>> License, Version 2.0 (the "License"); you may not use this file
>>> except in compliance with the License. You may obtain a copy of the
>>> License at
>>>
>>> http://www.apache.org/licenses/LICENSE-2.0
>>>
>>> Unless required by applicable law or agreed to in writing, software
>>> distributed under the License is distributed on an "AS IS" BASIS,
>>> WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>>> implied. See the License for the specific language governing
>>> permissions and limitations under the License.
>>> -->
>>> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
>>> <modelVersion>4.0.0</modelVersion>
>>>
>>> <parent>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-storm-compatibility-parent</artifactId>
>>> <version>0.10-SNAPSHOT</version>
>>> <relativePath>..</relativePath>
>>> </parent>
>>>
>>> <artifactId>flink-storm-compatibility-examples</artifactId>
>>> <name>flink-storm-compatibility-examples</name>
>>>
>>> <packaging>jar</packaging>
>>>
>>> <dependencies>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>>
>> <artifactId>flink-storm-compatibility-core</artifactId>
>>> <version>${project.version}</version>
>>> </dependency>
>>>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-java-examples</artifactId>
>>> <version>${project.version}</version>
>>> </dependency>
>>>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-streaming-core</artifactId>
>>> <version>${project.version}</version>
>>> <scope>test</scope>
>>> <type>test-jar</type>
>>> </dependency>
>>>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-test-utils</artifactId>
>>> <version>${project.version}</version>
>>> <scope>test</scope>
>>> </dependency>
>>>
>>> <dependency>
>>> <groupId>org.apache.storm</groupId>
>>> <artifactId>storm-core</artifactId>
>>> <version>0.9.4</version>
>>> <!-- keep storm out of the jar-with-dependencies -->
>>> <scope>provided</scope>
>>> </dependency>
>>> </dependencies>
>>>
>>> <build>
>>> <plugins>
>>> <!-- get default data from
>>> flink-java-examples
>> package -->
>>> <plugin>
>>>
>>> <groupId>org.apache.maven.plugins</groupId>
>>>
>> <artifactId>maven-dependency-plugin</artifactId>
>> <version>2.9</version><!--$NO-MVN-MAN-VER$-->
>>> <executions>
>>> <execution>
>>> <id>unpack</id>
>>>
>> <phase>prepare-package</phase>
>>> <goals>
>>> <goal>unpack</goal>
>>> </goals>
>>> <configuration>
>>>
>>> <artifactItems>
>>>
>> <artifactItem>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-java-examples</artifactId>
>> <version>${project.version}</version>
>> <type>jar</type>
>> <overWrite>false</overWrite>
>>> <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>> <includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
>> </artifactItem>
>> <artifactItem>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-storm-compatibility-core</artifactId>
>> <version>${project.version}</version>
>> <type>jar</type>
>> <overWrite>false</overWrite>
>> <outputDirectory>${project.build.directory}/classes</outputDirectory>
>> </artifactItem>
>> <artifactItem>
>> <groupId>org.apache.storm</groupId>
>> <artifactId>storm-core</artifactId>
>> <version>0.9.4</version>
>> <type>jar</type>
>> <overWrite>false</overWrite>
>> <outputDirectory>${project.build.directory}/classes</outputDirectory>
>> <!--<excludes>defaults.yaml</excludes>-->
>> </artifactItem>
>> <artifactItem>
>> <groupId>com.googlecode.json-simple</groupId>
>> <artifactId>json-simple</artifactId>
>> <version>1.1</version>
>> <type>jar</type>
>> <overWrite>false</overWrite>
>> <outputDirectory>${project.build.directory}/classes</outputDirectory>
>> </artifactItem>snakeyaml
>> <artifactItem>
>> <groupId>org.yaml</groupId>
>> <artifactId>snakeyaml</artifactId>
>> <version>1.11</version>
>> <type>jar</type>
>> <overWrite>false</overWrite>
>> <outputDirectory>${project.build.directory}/classes</outputDirectory>
>> </artifactItem>
>>> </artifactItems>
>>> </configuration>
>>> </execution>
>>> </executions>
>>> </plugin>
>>>
>>> <!-- self-contained jars for each example -->
>>> <plugin>
>>>
>>> <groupId>org.apache.maven.plugins</groupId>
>>>
>>> <artifactId>maven-jar-plugin</artifactId>
>>>
>>> <executions>
>>>
>>> <!-- WordCount Spout source-->
>>> <execution>
>>>
>> <id>WordCount-SpoutSource</id>
>>> <phase>package</phase>
>>> <goals>
>>> <goal>jar</goal>
>>> </goals>
>>> <configuration>
>>>
>> <classifier>WordCountSpoutSource</classifier>
>>> <archive>
>>>
>>> <manifestEntries>
>>> <program-class>org.apache.flink.stormcompatibility.wordcount.SpoutSourceWordCount</program-class>
>>> </manifestEntries>
>>> </archive>
>>>
>>> <includes>
>>> <!-- from storm-core -->
>>> <include>backtype/storm/topology/*.class</include>
>>> <include>backtype/storm/spout/*.class</include>
>>> <include>backtype/storm/task/*.class</include>
>>> <include>backtype/storm/tuple/*.class</include>
>>> <include>backtype/storm/generated/*.class</include>
>>> <include>backtype/storm/metric/**/*.class</include>
>>> <include>org/apache/thrift7/**/*.class</include>
>>> <!-- Storm's recursive dependencies -->
>>> <include>org/json/simple/**/*.class</include>
>>> <!-- compatibility layer -->
>>> <include>org/apache/flink/stormcompatibility/api/*.class</include>
>>> <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
>>> <!-- Word Count -->
>>> <include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.class</include>
>>> <include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount$*.class</include>
>>> <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/AbstractStormSpout.class</include>
>>> <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormFileSpout.class</include>
>>> <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormInMemorySpout.class</include>
>>> <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
>>> </includes>
>>> </configuration>
>>> </execution>
>>>
>>> <!-- WordCount Bolt tokenizer-->
>>> <execution>
>>>
>> <id>WordCount-BoltTokenizer</id>
>>> <phase>package</phase>
>>> <goals>
>>> <goal>jar</goal>
>>> </goals>
>>> <configuration>
>>>
>> <classifier>WordCountBoltTokenizer</classifier>
>>> <archive>
>>>
>>> <manifestEntries>
>>> <program-class>org.apache.flink.stormcompatibility.wordcount.BoltTokenizerWordCount</program-class>
>>> </manifestEntries>
>>> </archive>
>>>
>>> <includes>
>>> <!-- from storm-core -->
>>> <include>backtype/storm/topology/*.class</include>
>>> <include>backtype/storm/spout/*.class</include>
>>> <include>backtype/storm/task/*.class</include>
>>> <include>backtype/storm/tuple/*.class</include>
>>> <include>backtype/storm/generated/*.class</include>
>>> <include>backtype/storm/metric/**/*.class</include>
>>> <include>org/apache/thrift7/**/*.class</include>
>>> <!-- Storm's recursive dependencies -->
>>> <include>org/json/simple/**/*.class</include>
>>> <!-- compatibility layer -->
>>> <include>org/apache/flink/stormcompatibility/api/*.class</include>
>>> <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
>>> <!-- Word Count -->
>>> <include>org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.class</include>
>>> <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.class</include>
>>> <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
>>> </includes>
>>> </configuration>
>>> </execution>
>>>
>>> <!-- Throughput -->
>>> <execution>
>>> <id>Throughput</id>
>>> <phase>package</phase>
>>> <goals>
>>> <goal>jar</goal>
>>> </goals>
>>> <configuration>
>>> <classifier>Throughput</classifier>
>>>
>>> <archive>
>>> <manifestEntries>
>>> <program-class>org.apache.flink.stormcompatibility.experiments.Throughput</program-class>
>>> </manifestEntries>
>>> </archive>
>>>
>>> <includes>
>>> <!-- from storm-core -->
>>> <include>defaults.yaml</include>
>>> <include>backtype/storm/*.class</include>
>>> <include>backtype/storm/serialization/*.class</include>
>>> <include>backtype/storm/topology/*.class</include>
>>> <include>backtype/storm/topology/base/*.class</include>
>>> <include>backtype/storm/utils/*.class</include>
>>> <include>backtype/storm/spout/*.class</include>
>>> <include>backtype/storm/task/*.class</include>
>>> <include>backtype/storm/tuple/*.class</include>
>>> <include>backtype/storm/generated/*.class</include>
>>> <include>backtype/storm/metric/**/*.class</include>
>>> <include>org/apache/storm/curator/*.class</include>
>>> <include>org/apache/thrift7/**/*.class</include>
>>> <!--<include>org/yaml/snakeyaml/constructor/*.class</include>-->
>>> <include>org/yaml/snakeyaml/**/*.class</include>
>>> <!-- Storm's recursive dependencies -->
>>> <include>org/json/simple/**/*.class</include>
>>> <!-- compatibility layer -->
>>> <include>org/apache/flink/stormcompatibility/api/*.class</include>
>>> <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
>>> <!-- Word Count -->
>>> <include>org/apache/flink/stormcompatibility/experiments/Throughput.class</include>
>>> <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.class</include>
>>> </includes>
>>> </configuration>
>>> </execution>
>>>
>>> <execution>
>>> <goals>
>>>
>> <goal>test-jar</goal>
>>> </goals>
>>> </execution>
>>> </executions>
>>> </plugin>
>>>
>>> <!-- WordCount Storm topology-->
>>> <!-- Cannot use maven-jar-plugin because
>> 'defaults.yaml' must be included in jar -->
>>> <plugin>
>>>
>> <artifactId>maven-assembly-plugin</artifactId>
>>> <configuration>
>>> <descriptors>
>>>
>> <descriptor>src/assembly/word-count-storm.xml</descriptor>
>>> </descriptors>
>>> <archive>
>>> <manifestEntries>
>>> <program-class>org.apache.flink.stormcompatibility.wordcount.StormWordCountRemoteBySubmitter</program-class>
>>> </manifestEntries>
>>> </archive>
>>> </configuration>
>>>
>>> <executions>
>>> <execution>
>>> <id>WordCountStorm</id>
>>> <phase>package</phase>
>>> <goals>
>>> <goal>single</goal>
>>> </goals>
>>> </execution>
>>> </executions>
>>> </plugin>
>>> </plugins>
>>>
>>> <pluginManagement>
>>> <plugins>
>>> <!--This plugin's configuration is
>>> used to
>> store Eclipse m2e settings only. It has no influence on the Maven
>> build itself.-->
>>> <plugin>
>>>
>>> <groupId>org.eclipse.m2e</groupId>
>>>
>> <artifactId>lifecycle-mapping</artifactId>
>>> <version>1.0.0</version>
>>> <configuration>
>>>
>>> <lifecycleMappingMetadata>
>>>
>>> <pluginExecutions>
>>>
>> <pluginExecution>
>> <pluginExecutionFilter>
>> <groupId>org.apache.maven.plugins</groupId>
>> <artifactId>maven-dependency-plugin</artifactId>
>> <versionRange>[2.9,)</versionRange>
>> <goals>
>> <goal>unpack</goal>
>> </goals>
>> </pluginExecutionFilter>
>> <action>
>> <ignore/>
>> </action>
>> </pluginExecution>
>>> </pluginExecutions>
>>> </lifecycleMappingMetadata>
>>> </configuration>
>>> </plugin>
>>> </plugins>
>>> </pluginManagement>
>>>
>>> </build>
>>>
>>> </project>
>>>
>>> -----Original Message-----
>>> From: [email protected] [mailto:[email protected]] on behalf of Stephan Ewen
>>> Sent: August 11, 2015 1:55
>>> To: [email protected]
>>> Subject: Re: Some problems about Flink applications
>>>
>>> It would also help if you could paste the entire POM file of your
>> project.
>>> Maybe something is amiss with the dependencies, or the scopes?
>>>
>>> On Sat, Aug 8, 2015 at 7:27 PM, Matthias J. Sax <
>> [email protected]> wrote:
>>>> Hi Huang,
>>>>
>>>> About Storm compatibility: did you double-check that the file that
>>>> is missing (StormSpoutWrapper) is contained in your jar? Looking at
>>>> pom.xml does not help here, because if you specify a file to include
>>>> but Maven cannot find it, Maven will simply not add it to the jar,
>>>> and the build will still succeed. Thus, you need to check the jar
>>>> file itself via the command line:
>>>> unzip -l myjarfile.jar
>>>> or
>>>> unzip -l myjarfile.jar | grep file-I-am-looking-for
>>>>
>>>> I guess your jar is not built correctly, i.e., the file is not there...
>>>>
>>>> Did you have a look at the pom.xml for
>>>> flink-storm-compatibility-examples
>>>> and the corresponding word-count-storm.xml? They show how to build
>>>> a jar correctly (this was recently fixed, so make sure you update to
>>>> the latest master).
>>>>
>>>> You can also have a look here at how to package jars correctly (even
>>>> though this example is about Flink ML):
>>>>
>>>> https://stackoverflow.com/questions/31661900/maven-build-has-missing-package-linkages/31662642#31662642
>>>>
>>>> -Matthias
>>>>
>>>> On 08/08/2015 11:15 AM, huangwei (G) wrote:
>>>>> Hi,
>>>>> I have run into some trouble developing Flink applications.
>>>>>
>>>>> 1.
>>>>> I want to compare the performance of Storm and
>>>>> flink-storm-compatibility using this test program:
>>>>> https://github.com/project-flink/flink-perf/blob/master/storm-jobs/src/jvm/experiments/Throughput.java
>>>>> Below is Throughput.java with my changes:
>>>>>
>>>>>
>>>>>
>>>>> public static void main(String[] args) throws Exception {
>>>>>     ParameterTool pt = ParameterTool.fromArgs(args);
>>>>>
>>>>>     int par = pt.getInt("para");
>>>>>
>>>>>     final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
>>>>>     builder.setSpout("source0", new Generator(pt), pt.getInt("sourceParallelism"));
>>>>>     int i = 0;
>>>>>     for (; i < pt.getInt("repartitions", 1) - 1; i++) {
>>>>>         System.out.println("adding source" + i + " --> source" + (i + 1));
>>>>>         builder.setBolt("source" + (i + 1), new RepartPassThroughBolt(pt), pt.getInt("sinkParallelism"))
>>>>>                 .fieldsGrouping("source" + i, new Fields("id"));
>>>>>     }
>>>>>
>>>>>     System.out.println("adding final source" + i + " --> sink");
>>>>>     builder.setBolt("sink", new Sink(pt), pt.getInt("sinkParallelism"))
>>>>>             .fieldsGrouping("source" + i, new Fields("id"));
>>>>>     Config conf = new Config();
>>>>>     conf.setDebug(false); //System.exit(1);
>>>>>
>>>>>     // execute program locally
>>>>>     final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
>>>>>     cluster.submitTopology("throughput", null, builder.createTopology());
>>>>>     Utils.sleep(10 * 1000);
>>>>>
>>>>>     // TODO kill does not do anything so far
>>>>>     cluster.killTopology("throughput");
>>>>>     cluster.shutdown();
>>>>> }
>>>>>
>>>>>
>>>>> This program runs fine in IDEA with flink-storm-compatibility.
>>>>> However, when I package it into a jar file and run it on
>>>>> Flink 0.10-SNAPSHOT, this error appears in the flink-client log file:
>>>>> java.lang.Exception: Call to registerInputOutput() of invokable failed
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:521)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
>>>>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:187)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:90)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:518)
>>>>>     ... 1 more
>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper
>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>     at java.lang.Class.forName0(Native Method)
>>>>>     at java.lang.Class.forName(Class.java:348)
>>>>>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71)
>>>>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>>>>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
>>>>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
>>>>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:185)
>>>>>     ... 3 more
>>>>>
>>>>>
>>>>> The class StormSpoutWrapper does exist in my packaged jar file.
>>>>> Here is the relevant part of my pom.xml:
>>>>>
>>>>> <!-- Throughput -->
>>>>> <execution>
>>>>> <id>Throughput</id>
>>>>> <phase>package</phase>
>>>>> <goals>
>>>>> <goal>jar</goal>
>>>>> </goals>
>>>>> <configuration>
>>>>> <classifier>Throughput</classifier>
>>>>>
>>>>> <archive>
>>>>> <manifestEntries>
>>>>> <program-class>org.apache.flink.stormcompatibility.experiments.Throughput</program-class>
>>>>> </manifestEntries>
>>>>> </archive>
>>>>>
>>>>> <includes>
>>>>> <!-- from storm-core -->
>>>>> <include>defaults.yaml</include>
>>>>> <include>backtype/storm/*.class</include>
>>>>> <include>backtype/storm/serialization/*.class</include>
>>>>> <include>backtype/storm/topology/*.class</include>
>>>>> <include>backtype/storm/topology/base/*.class</include>
>>>>> <include>backtype/storm/utils/*.class</include>
>>>>> <include>backtype/storm/spout/*.class</include>
>>>>> <include>backtype/storm/task/*.class</include>
>>>>> <include>backtype/storm/tuple/*.class</include>
>>>>> <include>backtype/storm/generated/*.class</include>
>>>>> <include>backtype/storm/metric/**/*.class</include>
>>>>> <include>org/apache/storm/curator/*.class</include>
>>>>> <include>org/apache/thrift7/**/*.class</include>
>>>>> <!--<include>org/yaml/snakeyaml/constructor/*.class</include>-->
>>>>> <include>org/yaml/snakeyaml/**/*.class</include>
>>>>> <!-- Storm's recursive dependencies -->
>>>>> <include>org/json/simple/**/*.class</include>
>>>>> <!-- compatibility layer -->
>>>>> <include>org/apache/flink/stormcompatibility/api/*.class</include>
>>>>> <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
>>>>> <!-- Word Count -->
>>>>> <include>org/apache/flink/stormcompatibility/experiments/Throughput.class</include>
>>>>> <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.class</include>
>>>>> </includes>
>>>>> </configuration>
>>>>> </execution>
>>>>>
>>>>> So how can I fix it?
>>>>>
>>>>>
>>>>> 2.
>>>>> Here is a case that uses the join operator:
>>>>>
>>>>> DataStream<Tuple3<String, Integer, Long>> user = env.addSource(new sourceUserFunction());
>>>>> DataStream<Tuple2<String, Integer>> area = env.addSource(new sourceAreaFunction());
>>>>> DataStream<Tuple2<String, Integer>> sink = user
>>>>>     .join(area)
>>>>>     .onWindow(15, TimeUnit.MINUTES)
>>>>>     .where(0)
>>>>>     .equalTo(0)
>>>>>     .with(new JoinFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
>>>>>         @Override
>>>>>         public Tuple2<String, Integer> join(Tuple3<String, Integer, Long> first,
>>>>>                 Tuple2<String, Integer> second) throws Exception {
>>>>>             if (first.f1 + second.f1 > 10) {
>>>>>                 return new Tuple2<String, Integer>(first.f0, first.f1 + second.f1);
>>>>>             }
>>>>>             return null;
>>>>>         }
>>>>>     });
>>>>>
>>>>> As you can see, I don't want to return null when the condition is not satisfied,
>>>>> but there is no JoinFunction that takes a Collector.
>>>>> I found FlatJoinFunction, which does take a Collector.
>>>>> However, FlatJoinFunction seems to be usable only with DataSet, not with DataStream.
>>>>> Is there any other way to improve this case?
>>>>>
>>>>> PS. I'm sorry about this email. Feel free to ignore it over the weekend.
>>>>>
>>>>> Greetings,
>>>>> Huang Wei
>>>>> 华为技术有限公司 Huawei Technologies Co., Ltd.
>>>>>
>>>>>
>>>>> Tel:+86 18106512602
>>>>> Email:[email protected]
>>>>>
>>>>
>>
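
For reference, the check Matthias suggests earlier in the thread (inspect the jar itself rather than the pom.xml) can also be done from Java. This is only a minimal sketch under stated assumptions: the class name JarEntryCheck and the method containsClass are invented for illustration, and the lookup relies on the standard java.util.jar.JarFile API. It maps a fully qualified class name to the entry path it must have inside the jar, which also covers the point that the directory must match the package name.

```java
import java.io.IOException;
import java.util.jar.JarFile;

public class JarEntryCheck {

    /**
     * Returns true if the jar at jarPath contains a .class entry whose path
     * matches the package structure of the given fully qualified class name.
     */
    static boolean containsClass(String jarPath, String className) throws IOException {
        // org.example.Foo must be stored as org/example/Foo.class inside the jar
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.err.println("usage: JarEntryCheck <jar-file> <fully.qualified.ClassName>");
            return;
        }
        System.out.println(containsClass(args[0], args[1]));
    }
}
```

It could be run, for example, with the jar path and org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper as the two arguments. A result of true only shows that the entry is in that jar, not that the class loader in use at runtime will see it.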