How about something like this:

DataStream<Tuple2<String, Integer>> sink = user
        .join(area)
        .onWindow(15, TimeUnit.MINUTES)
        .where(0)
        .equalTo(0)
        .flatMap(new FlatMapFunction<Tuple2<Tuple3<String, Integer, Long>, Tuple2<String, Integer>>, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(Tuple2<Tuple3<String, Integer, Long>, Tuple2<String, Integer>> value,
                                Collector<Tuple2<String, Integer>> c) throws Exception {
                        if (value.f0.f1 + value.f1.f1 > 10) {
                                c.collect(new Tuple2<String, Integer>(value.f0.f0, value.f0.f1 + value.f1.f1));
                        }
                }
        });
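
(The default join output here is a Tuple2 of the two joined elements; the
flatMap's Collector then lets you emit nothing when the threshold is not
met, which avoids returning null from a JoinFunction.)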


On 13.08.2015 15:24, huangwei (G) wrote:
Hi Stephan,

Thank you for the information about
https://github.com/apache/flink/pull/1008.
I'll try again.
Also, is there any suggestion for the original second problem:

2.
There is the following case using the join operator:

DataStream<Tuple3<String, Integer, Long>> user = env.addSource(new sourceUserFunction());
DataStream<Tuple2<String, Integer>> area = env.addSource(new sourceAreaFunction());
DataStream<Tuple2<String, Integer>> sink = user
       .join(area)
       .onWindow(15, TimeUnit.MINUTES)
       .where(0)
       .equalTo(0)
       .with(new JoinFunction<Tuple3<String, Integer, Long>,
               Tuple2<String, Integer>, Tuple2<String, Integer>>() {
          @Override
          public Tuple2<String, Integer> join(Tuple3<String, Integer, Long> first,
                  Tuple2<String, Integer> second) throws Exception {
             if (first.f1 + second.f1 > 10) {
                return new Tuple2<String, Integer>(first.f0, first.f1 + second.f1);
             }
             return null;
          }
       });

As you can see, I don't want to return null when the condition is not
satisfied, but there is no JoinFunction variant that takes a Collector.
I found a FlatJoinFunction which provides a Collector.
However, FlatJoinFunction seems to be usable only with DataSet,
not DataStream.
Is there any other way to improve this case?
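
For reference, on the DataSet API the Collector-based variant would look
roughly like this (a sketch; userSet and areaSet stand in for DataSet
versions of the sources above):

DataSet<Tuple2<String, Integer>> result = userSet
       .join(areaSet)
       .where(0)
       .equalTo(0)
       .with(new FlatJoinFunction<Tuple3<String, Integer, Long>,
               Tuple2<String, Integer>, Tuple2<String, Integer>>() {
          @Override
          public void join(Tuple3<String, Integer, Long> first,
                  Tuple2<String, Integer> second,
                  Collector<Tuple2<String, Integer>> out) throws Exception {
             // Emit only when the condition holds; emit nothing otherwise.
             if (first.f1 + second.f1 > 10) {
                out.collect(new Tuple2<String, Integer>(first.f0, first.f1 + second.f1));
             }
          }
       });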

Greetings,
Huang Wei

----------------------
Earlier emails (Stephan wrote):

We are seeing these class loader issues a lot as of late.

Seems that packaging the classes is trickier than anticipated.

Here is a pull request to add some diagnostics info on a
"ClassNotFoundException": https://github.com/apache/flink/pull/1008

On Tue, Aug 11, 2015 at 3:29 PM, Matthias J. Sax < 
mj...@informatik.hu-berlin.de> wrote:

Three comments:

1) If StormSpoutWrapper is in your jar, is it located in the correct
directory (it must match the package name)? See the small check sketched
after this list.

2) If you are using FlinkTopologyBuilder, you need to package as shown
in the StormWordCountRemoteBySubmitter example, using an additional
assembly file.
(The first examples are for embedded Spouts/Bolts within a Flink
streaming program. -- Maybe we should add some comments in the pom
file...)

3) How do you build? The whole source code? Or only parts of it? Did
you run "install"?
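
As a quick way to verify 1) programmatically, something like the
following should work (a minimal sketch; the jar path is a placeholder,
and the entry path must mirror the package name):

import java.util.jar.JarFile;

public class JarEntryCheck {
    public static void main(String[] args) throws Exception {
        // Entry paths inside a jar must mirror the package name exactly.
        String entry = "org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.class";
        try (JarFile jar = new JarFile("myjarfile.jar")) {
            System.out.println(jar.getJarEntry(entry) != null
                    ? "found: " + entry
                    : "missing: " + entry);
        }
    }
}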

-Matthias


On 08/11/2015 02:47 PM, huangwei (G) wrote:
Hi Stephan and Matthias,

Sorry for replying late.
I've double checked that the class StormSpoutWrapper really exists
in my jar file.
And I ran into the same trouble when I ran the corresponding
word-count-storm from flink-storm-compatibility-examples.
The way I built my Throughput application was just to add it to
flink-storm-compatibility-examples and change some configurations
in the word-count-storm.xml.
Here is the entire POM file.


<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed
with this work for additional information regarding copyright
ownership.  The ASF licenses this file to you under the Apache
License, Version 2.0 (the "License"); you may not use this file
except in compliance with the License.  You may obtain a copy of the
License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied.  See the License for the specific language governing
permissions and limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
       <modelVersion>4.0.0</modelVersion>

       <parent>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-storm-compatibility-parent</artifactId>
               <version>0.10-SNAPSHOT</version>
               <relativePath>..</relativePath>
       </parent>

       <artifactId>flink-storm-compatibility-examples</artifactId>
       <name>flink-storm-compatibility-examples</name>

       <packaging>jar</packaging>

       <dependencies>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-storm-compatibility-core</artifactId>
                        <version>${project.version}</version>
                </dependency>

               <dependency>
                       <groupId>org.apache.flink</groupId>
                       <artifactId>flink-java-examples</artifactId>
                       <version>${project.version}</version>
               </dependency>

               <dependency>
                       <groupId>org.apache.flink</groupId>
                       <artifactId>flink-streaming-core</artifactId>
                       <version>${project.version}</version>
                       <scope>test</scope>
                       <type>test-jar</type>
               </dependency>

               <dependency>
                       <groupId>org.apache.flink</groupId>
                       <artifactId>flink-test-utils</artifactId>
                       <version>${project.version}</version>
                       <scope>test</scope>
               </dependency>

                <dependency>
                        <groupId>org.apache.storm</groupId>
                        <artifactId>storm-core</artifactId>
                        <version>0.9.4</version>
                        <!-- keep storm out of the jar-with-dependencies -->
                        <scope>provided</scope>
                </dependency>
       </dependencies>

       <build>
               <plugins>
                        <!-- get default data from flink-java-examples package -->
                        <plugin>
                                <groupId>org.apache.maven.plugins</groupId>
                                <artifactId>maven-dependency-plugin</artifactId>
                                <version>2.9</version><!--$NO-MVN-MAN-VER$-->
                                <executions>
                                        <execution>
                                                <id>unpack</id>
                                                <phase>prepare-package</phase>
                                                <goals>
                                                        <goal>unpack</goal>
                                                </goals>
                                                <configuration>
                                                        <artifactItems>
                                                                <artifactItem>
                                                                        <groupId>org.apache.flink</groupId>
                                                                        <artifactId>flink-java-examples</artifactId>
                                                                        <version>${project.version}</version>
                                                                        <type>jar</type>
                                                                        <overWrite>false</overWrite>
                                                                        <outputDirectory>${project.build.directory}/classes</outputDirectory>
                                                                        <includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
                                                                </artifactItem>
                                                                <artifactItem>
                                                                        <groupId>org.apache.flink</groupId>
                                                                        <artifactId>flink-storm-compatibility-core</artifactId>
                                                                        <version>${project.version}</version>
                                                                        <type>jar</type>
                                                                        <overWrite>false</overWrite>
                                                                        <outputDirectory>${project.build.directory}/classes</outputDirectory>
                                                                </artifactItem>
                                                                <artifactItem>
                                                                        <groupId>org.apache.storm</groupId>
                                                                        <artifactId>storm-core</artifactId>
                                                                        <version>0.9.4</version>
                                                                        <type>jar</type>
                                                                        <overWrite>false</overWrite>
                                                                        <outputDirectory>${project.build.directory}/classes</outputDirectory>
                                                                        <!--<excludes>defaults.yaml</excludes>-->
                                                                </artifactItem>
                                                                <artifactItem>
                                                                        <groupId>com.googlecode.json-simple</groupId>
                                                                        <artifactId>json-simple</artifactId>
                                                                        <version>1.1</version>
                                                                        <type>jar</type>
                                                                        <overWrite>false</overWrite>
                                                                        <outputDirectory>${project.build.directory}/classes</outputDirectory>
                                                                </artifactItem>
                                                                <artifactItem>
                                                                        <groupId>org.yaml</groupId>
                                                                        <artifactId>snakeyaml</artifactId>
                                                                        <version>1.11</version>
                                                                        <type>jar</type>
                                                                        <overWrite>false</overWrite>
                                                                        <outputDirectory>${project.build.directory}/classes</outputDirectory>
                                                                </artifactItem>
                                                        </artifactItems>
                                                </configuration>
                                        </execution>
                                </executions>
                        </plugin>

                        <!-- self-contained jars for each example -->
                        <plugin>
                                <groupId>org.apache.maven.plugins</groupId>
                                <artifactId>maven-jar-plugin</artifactId>

                                <executions>

                                        <!-- WordCount Spout source -->
                                        <execution>
                                                <id>WordCount-SpoutSource</id>
                                                <phase>package</phase>
                                                <goals>
                                                        <goal>jar</goal>
                                                </goals>
                                                <configuration>
                                                        <classifier>WordCountSpoutSource</classifier>
                                                        <archive>
                                                                <manifestEntries>
                                                                        <program-class>org.apache.flink.stormcompatibility.wordcount.SpoutSourceWordCount</program-class>
                                                                </manifestEntries>
                                                        </archive>

                                                        <includes>
                                                                <!-- from storm-core -->
                                                                <include>backtype/storm/topology/*.class</include>
                                                                <include>backtype/storm/spout/*.class</include>
                                                                <include>backtype/storm/task/*.class</include>
                                                                <include>backtype/storm/tuple/*.class</include>
                                                                <include>backtype/storm/generated/*.class</include>
                                                                <include>backtype/storm/metric/**/*.class</include>
                                                                <include>org/apache/thrift7/**/*.class</include>
                                                                <!-- Storm's recursive dependencies -->
                                                                <include>org/json/simple/**/*.class</include>
                                                                <!-- compatibility layer -->
                                                                <include>org/apache/flink/stormcompatibility/api/*.class</include>
                                                                <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
                                                                <!-- Word Count -->
                                                                <include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.class</include>
                                                                <include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount$*.class</include>
                                                                <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/AbstractStormSpout.class</include>
                                                                <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormFileSpout.class</include>
                                                                <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormInMemorySpout.class</include>
                                                                <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
                                                        </includes>
                                                </configuration>
                                        </execution>

                                        <!-- WordCount Bolt tokenizer -->
                                        <execution>
                                                <id>WordCount-BoltTokenizer</id>
                                                <phase>package</phase>
                                                <goals>
                                                        <goal>jar</goal>
                                                </goals>
                                                <configuration>
                                                        <classifier>WordCountBoltTokenizer</classifier>
                                                        <archive>
                                                                <manifestEntries>
                                                                        <program-class>org.apache.flink.stormcompatibility.wordcount.BoltTokenizerWordCount</program-class>
                                                                </manifestEntries>
                                                        </archive>

                                                        <includes>
                                                                <!-- from storm-core -->
                                                                <include>backtype/storm/topology/*.class</include>
                                                                <include>backtype/storm/spout/*.class</include>
                                                                <include>backtype/storm/task/*.class</include>
                                                                <include>backtype/storm/tuple/*.class</include>
                                                                <include>backtype/storm/generated/*.class</include>
                                                                <include>backtype/storm/metric/**/*.class</include>
                                                                <include>org/apache/thrift7/**/*.class</include>
                                                                <!-- Storm's recursive dependencies -->
                                                                <include>org/json/simple/**/*.class</include>
                                                                <!-- compatibility layer -->
                                                                <include>org/apache/flink/stormcompatibility/api/*.class</include>
                                                                <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
                                                                <!-- Word Count -->
                                                                <include>org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.class</include>
                                                                <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.class</include>
                                                                <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
                                                        </includes>
                                                </configuration>
                                        </execution>

                                        <!-- Throughput -->
                                        <execution>
                                                <id>Throughput</id>
                                                <phase>package</phase>
                                                <goals>
                                                        <goal>jar</goal>
                                                </goals>
                                                <configuration>
                                                        <classifier>Throughput</classifier>

                                                        <archive>
                                                                <manifestEntries>
                                                                        <program-class>org.apache.flink.stormcompatibility.experiments.Throughput</program-class>
                                                                </manifestEntries>
                                                        </archive>

                                                        <includes>
                                                                <!-- from storm-core -->
                                                                <include>defaults.yaml</include>
                                                                <include>backtype/storm/*.class</include>
                                                                <include>backtype/storm/serialization/*.class</include>
                                                                <include>backtype/storm/topology/*.class</include>
                                                                <include>backtype/storm/topology/base/*.class</include>
                                                                <include>backtype/storm/utils/*.class</include>
                                                                <include>backtype/storm/spout/*.class</include>
                                                                <include>backtype/storm/task/*.class</include>
                                                                <include>backtype/storm/tuple/*.class</include>
                                                                <include>backtype/storm/generated/*.class</include>
                                                                <include>backtype/storm/metric/**/*.class</include>
                                                                <include>org/apache/storm/curator/*.class</include>
                                                                <include>org/apache/thrift7/**/*.class</include>
                                                                <!--<include>org/yaml/snakeyaml/constructor/*.class</include>-->
                                                                <include>org/yaml/snakeyaml/**/*.class</include>
                                                                <!-- Storm's recursive dependencies -->
                                                                <include>org/json/simple/**/*.class</include>
                                                                <!-- compatibility layer -->
                                                                <include>org/apache/flink/stormcompatibility/api/*.class</include>
                                                                <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
                                                                <!-- Throughput -->
                                                                <include>org/apache/flink/stormcompatibility/experiments/Throughput.class</include>
                                                                <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.class</include>
                                                        </includes>
                                                </configuration>
                                        </execution>

                                        <execution>
                                                <goals>
                                                        <goal>test-jar</goal>
                                                </goals>
                                        </execution>
                                </executions>
                        </plugin>

                        <!-- WordCount Storm topology -->
                        <!-- Cannot use maven-jar-plugin because 'defaults.yaml' must be included in jar -->
                        <plugin>
                                <artifactId>maven-assembly-plugin</artifactId>
                                <configuration>
                                        <descriptors>
                                                <descriptor>src/assembly/word-count-storm.xml</descriptor>
                                        </descriptors>
                                        <archive>
                                                <manifestEntries>
                                                        <program-class>org.apache.flink.stormcompatibility.wordcount.StormWordCountRemoteBySubmitter</program-class>
                                                </manifestEntries>
                                        </archive>
                                </configuration>

                               <executions>
                                       <execution>
                                               <id>WordCountStorm</id>
                                               <phase>package</phase>
                                               <goals>
                                                       <goal>single</goal>
                                               </goals>
                                       </execution>
                               </executions>
                       </plugin>
               </plugins>

                <pluginManagement>
                        <plugins>
                                <!-- This plugin's configuration is used to store Eclipse m2e
                                     settings only. It has no influence on the Maven build itself. -->
                                <plugin>
                                        <groupId>org.eclipse.m2e</groupId>
                                        <artifactId>lifecycle-mapping</artifactId>
                                        <version>1.0.0</version>
                                        <configuration>
                                                <lifecycleMappingMetadata>
                                                        <pluginExecutions>
                                                                <pluginExecution>
                                                                        <pluginExecutionFilter>
                                                                                <groupId>org.apache.maven.plugins</groupId>
                                                                                <artifactId>maven-dependency-plugin</artifactId>
                                                                                <versionRange>[2.9,)</versionRange>
                                                                                <goals>
                                                                                        <goal>unpack</goal>
                                                                                </goals>
                                                                        </pluginExecutionFilter>
                                                                        <action>
                                                                                <ignore/>
                                                                        </action>
                                                                </pluginExecution>
                                                        </pluginExecutions>
                                                </lifecycleMappingMetadata>
                                        </configuration>
                                </plugin>
                        </plugins>
                </pluginManagement>

       </build>

</project>

-----Original Message-----
From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] on behalf of
Stephan Ewen
Sent: August 11, 2015, 1:55
To: dev@flink.apache.org
Subject: Re: Some problems about Flink applications

It would also help if you could paste the entire POM file of your
project.
Maybe something is amiss with the dependencies, or the scopes?

On Sat, Aug 8, 2015 at 7:27 PM, Matthias J. Sax <
mj...@informatik.hu-berlin.de> wrote:
Hi Huang,

about Storm compatibility. Did you double check that the file that
is missing (StormSpoutWrapper) is contained in your jar? Looking at
the pom.xml does not help here, because if you specify that a file
be included but Maven cannot find it, it will just not add it to the
jar, and the build will still succeed. Thus, you need to check the
jar file itself via the command line:
     unzip -l myjarfile.jar
or
     unzip -l myjarfile.jar | grep file-I-am-looking-for

I guess your jar is not built correctly, i.e., the file is not there...

Did you have a look into the pom.xml of
flink-storm-compatibility-examples and the corresponding
word-count-storm.xml? It shows how to build a jar correctly (it was
recently fixed, so make sure you update to the latest master).

You can also have a look here at how to package jars correctly (even
if this example is about Flink ML):

https://stackoverflow.com/questions/31661900/maven-build-has-missing-package-linkages/31662642#31662642

-Matthias

On 08/08/2015 11:15 AM, huangwei (G) wrote:
Hi,
I have run into some trouble developing Flink applications.

1.
I want to test the performance between Storm and
flink-storm-compatibility using the test program:
https://github.com/project-flink/flink-perf/blob/master/storm-jobs/src/jvm/experiments/Throughput.java

Below are my changes to this Throughput.java:



public static void main(String[] args) throws Exception {
        ParameterTool pt = ParameterTool.fromArgs(args);

        int par = pt.getInt("para");

        final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
        builder.setSpout("source0", new Generator(pt), pt.getInt("sourceParallelism"));
        int i = 0;
        for (; i < pt.getInt("repartitions", 1) - 1; i++) {
                System.out.println("adding source" + i + " --> source" + (i + 1));
                builder.setBolt("source" + (i + 1), new RepartPassThroughBolt(pt), pt.getInt("sinkParallelism"))
                        .fieldsGrouping("source" + i, new Fields("id"));
        }

        System.out.println("adding final source" + i + " --> sink");
        builder.setBolt("sink", new Sink(pt), pt.getInt("sinkParallelism"))
                .fieldsGrouping("source" + i, new Fields("id"));
        Config conf = new Config();
        conf.setDebug(false); //System.exit(1);

        // execute program locally
        final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
        cluster.submitTopology("throughput", null, builder.createTopology());
        Utils.sleep(10 * 1000);

        // TODO kill does not do anything so far
        cluster.killTopology("throughput");
        cluster.shutdown();
}


This program runs well in IDEA with flink-storm-compatibility.
However, when I packaged it into a jar file and ran it on
flink-0.10-SNAPSHOT, there is a problem in the flink-client log file:
java.lang.Exception: Call to registerInputOutput() of invokable failed
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:521)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:187)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:90)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:518)
        ... 1 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:185)
        ... 3 more


And this class (StormSpoutWrapper) does exist in my packaged jar file,
as you can see from this part of my pom.xml:

<!-- Throughput -->
                     <execution>
                         <id>Throughput</id>
                         <phase>package</phase>
                         <goals>
                             <goal>jar</goal>
                         </goals>
                         <configuration>
                             <classifier>Throughput</classifier>

                             <archive>
                                 <manifestEntries>
                                     <program-class>org.apache.flink.stormcompatibility.experiments.Throughput</program-class>
                                 </manifestEntries>
                             </archive>

                             <includes>
                                 <!-- from storm-core -->
                                 <include>defaults.yaml</include>
                                 <include>backtype/storm/*.class</include>
                                 <include>backtype/storm/serialization/*.class</include>
                                 <include>backtype/storm/topology/*.class</include>
                                 <include>backtype/storm/topology/base/*.class</include>
                                 <include>backtype/storm/utils/*.class</include>
                                 <include>backtype/storm/spout/*.class</include>
                                 <include>backtype/storm/task/*.class</include>
                                 <include>backtype/storm/tuple/*.class</include>
                                 <include>backtype/storm/generated/*.class</include>
                                 <include>backtype/storm/metric/**/*.class</include>
                                 <include>org/apache/storm/curator/*.class</include>
                                 <include>org/apache/thrift7/**/*.class</include>
                                 <!--<include>org/yaml/snakeyaml/constructor/*.class</include>-->
                                 <include>org/yaml/snakeyaml/**/*.class</include>
                                 <!-- Storm's recursive dependencies -->
                                 <include>org/json/simple/**/*.class</include>
                                 <!-- compatibility layer -->
                                 <include>org/apache/flink/stormcompatibility/api/*.class</include>
                                 <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
                                 <!-- Throughput -->
                                 <include>org/apache/flink/stormcompatibility/experiments/Throughput.class</include>
                                 <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.class</include>
                             </includes>
                         </configuration>
                     </execution>

So how can I fix it?


2.
There is the following case using the join operator:

DataStream<Tuple3<String, Integer, Long>> user = env.addSource(new sourceUserFunction());
DataStream<Tuple2<String, Integer>> area = env.addSource(new sourceAreaFunction());
DataStream<Tuple2<String, Integer>> sink = user
       .join(area)
       .onWindow(15, TimeUnit.MINUTES)
       .where(0)
       .equalTo(0)
       .with(new JoinFunction<Tuple3<String, Integer, Long>,
               Tuple2<String, Integer>, Tuple2<String, Integer>>() {
          @Override
          public Tuple2<String, Integer> join(Tuple3<String, Integer, Long> first,
                  Tuple2<String, Integer> second) throws Exception {
             if (first.f1 + second.f1 > 10) {
                return new Tuple2<String, Integer>(first.f0, first.f1 + second.f1);
             }
             return null;
          }
       });

As you can see, I don't want to return null when the condition is not
satisfied, but there is no JoinFunction variant that takes a Collector.
I found a FlatJoinFunction which provides a Collector.
However, FlatJoinFunction seems to be usable only with DataSet,
not DataStream.
Is there any other way to improve this case?

PS: I'm sorry about this email. You may ignore it during the weekend.

Greetings,
Huang Wei
Huawei Technologies Co., Ltd.


Tel: +86 18106512602
Email: huangwei...@huawei.com



