Repository: flink
Updated Branches:
  refs/heads/master c97a63e3a -> 41b826bda


[FLINK-3436] Remove ComplexIntegrationITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/41b826bd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/41b826bd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/41b826bd

Branch: refs/heads/master
Commit: 41b826bda9f9c3cf95cb72a8af4127c255298ef0
Parents: c97a63e
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Feb 17 17:47:07 2016 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Wed Feb 17 17:47:58 2016 +0100

----------------------------------------------------------------------
 .../api/complex/ComplexIntegrationITCase.java   | 825 -------------------
 1 file changed, 825 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/41b826bd/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationITCase.java
deleted file mode 100644
index a0e81a1..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationITCase.java
+++ /dev/null
@@ -1,825 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.complex;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.IterativeStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-@SuppressWarnings("serial")
-public class ComplexIntegrationITCase extends 
StreamingMultipleProgramsTestBase {
-
-       // 
*************************************************************************
-       // GENERAL SETUP
-       // 
*************************************************************************
-
-       private String resultPath1;
-       private String resultPath2;
-       private String expected1;
-       private String expected2;
-       
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception {
-               resultPath1 = tempFolder.newFile().toURI().toString();
-               resultPath2 = tempFolder.newFile().toURI().toString();
-               expected1 = "";
-               expected2 = "";
-       }
-
-       @After
-       public void after() throws Exception {
-               compareResultsByLinesInMemory(expected1, resultPath1);
-               compareResultsByLinesInMemory(expected2, resultPath2);
-       }
-
-       // 
*************************************************************************
-       // INTEGRATION TESTS
-       // 
*************************************************************************
-
-       @Test
-       public void complexIntegrationTest1() throws Exception {
-               //Testing data stream splitting with tuples
-
-               expected1 = "";
-               for (int i = 0; i < 8; i++) {
-                       expected1 += "(10,(a,1))\n";
-               }
-               //i == 8
-               expected1 += "(10,(a,1))";
-
-               expected2 = "";
-               for (int i = 0; i < 18; i++) {
-                       expected2 += "(20,(a,1))\n";
-               }
-               //i == 18
-               expected2 += "(20,(a,1))";
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               DataStream<Tuple2<Long, Tuple2<String, Long>>> sourceStream1 = 
env.addSource(new TupleSource()).setParallelism(1);
-
-               IterativeStream<Tuple2<Long, Tuple2<String, Long>>> it = 
sourceStream1.map(new MapFunction<Tuple2<Long, Tuple2<String, 
Long>>,Tuple2<Long, Tuple2<String, Long>>>(){
-
-                                       Tuple2<Long, Tuple2<String, Long>> 
result = new Tuple2<>(
-                                                       0L, new Tuple2<>("", 
0L));
-
-                                       @Override
-                                       public Tuple2<Long, Tuple2<String, 
Long>> map(
-                                                       Tuple2<Long, 
Tuple2<String, Long>> value) throws Exception {
-                                               result.f0 = result.f0 + 
value.f0;
-                                               result.f1 = value.f1;
-                                               return result;
-                       }
-                       
-               })
-                               .setParallelism(1).filter(new FilterFunction
-                               <Tuple2<Long, Tuple2<String, Long>>>() {
-
-                       @Override
-                       public boolean filter(Tuple2<Long, Tuple2<String, 
Long>> value) throws Exception {
-                               return value.f0 < 20;
-                       }
-               }).iterate(5000);
-
-               SplitStream<Tuple2<Long, Tuple2<String, Long>>> step = 
it.map(new IncrementMap()).split(new
-                               MyOutputSelector());
-               it.closeWith(step.select("iterate"));
-
-               step.select("firstOutput")
-                               .writeAsText(resultPath1, 
FileSystem.WriteMode.OVERWRITE);
-               step.select("secondOutput")
-                               .writeAsText(resultPath2, 
FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-       }
-
-       // Disabled, because it depends on strange behaviour, for example of 
the sum() function.
-       // This test evens fails, for example, if the order of only two lines 
in the "input" is changed.
-       @SuppressWarnings("unchecked")
-       @Ignore
-       @Test
-       public void complexIntegrationTest2() throws Exception {
-               //Testing POJO source, grouping by multiple filds and windowing 
with timestamp
-
-               expected1 = "water_melon-b\n" + "water_melon-b\n" + 
"water_melon-b\n" + "water_melon-b\n" +
-                               "water_melon-b\n" + "water_melon-b\n" + 
"water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" +
-                               "water_melon-b\n" + "water_melon-b\n" + 
"water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" +
-                               "water_melon-b\n" + "water_melon-b\n" + 
"water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" +
-                               "water_melon-b\n" + "orange-b\n" + "orange-b\n" 
+ "orange-b\n" + "orange-b\n" + "orange-b\n" +
-                               "orange-b\n" + "orange-c\n" + "orange-c\n" + 
"orange-c\n" + "orange-c\n" + "orange-d\n" + "orange-d\n" +
-                               "peach-d\n" + "peach-d\n";
-
-               List<Tuple5<Integer, String, Character, Double, Boolean>> input 
= Arrays.asList(
-                               new Tuple5<>(1, "apple", 'j', 0.1, false),
-                               new Tuple5<>(1, "peach", 'b', 0.8, false),
-                               new Tuple5<>(1, "orange", 'c', 0.7, true),
-                               new Tuple5<>(2, "apple", 'd', 0.5, false),
-                               new Tuple5<>(2, "peach", 'j', 0.6, false),
-                               new Tuple5<>(3, "orange", 'b', 0.2, true),
-                               new Tuple5<>(6, "apple", 'c', 0.1, false),
-                               new Tuple5<>(7, "peach", 'd', 0.4, false),
-                               new Tuple5<>(8, "orange", 'j', 0.2, true),
-                               new Tuple5<>(10, "apple", 'b', 0.1, false),
-                               new Tuple5<>(10, "peach", 'c', 0.5, false),
-                               new Tuple5<>(11, "orange", 'd', 0.3, true),
-                               new Tuple5<>(11, "apple", 'j', 0.3, false),
-                               new Tuple5<>(12, "peach", 'b', 0.9, false),
-                               new Tuple5<>(13, "orange", 'c', 0.7, true),
-                               new Tuple5<>(15, "apple", 'd', 0.2, false),
-                               new Tuple5<>(16, "peach", 'j', 0.8, false),
-                               new Tuple5<>(16, "orange", 'b', 0.8, true),
-                               new Tuple5<>(16, "apple", 'c', 0.1, false),
-                               new Tuple5<>(17, "peach", 'd', 1.0, true));
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableTimestamps();
-
-               SingleOutputStreamOperator<Tuple5<Integer, String, Character, 
Double, Boolean>, DataStreamSource<Tuple5<Integer, String, Character, Double, 
Boolean>>> sourceStream21 = env.fromCollection(input);
-               DataStream<OuterPojo> sourceStream22 = env.addSource(new 
PojoSource());
-
-               sourceStream21
-                               .assignTimestampsAndWatermarks(new 
MyTimestampExtractor())
-                               .keyBy(2, 2)
-                               .timeWindow(Time.of(10, TimeUnit.MILLISECONDS), 
Time.of(4, TimeUnit.MILLISECONDS))
-                               .maxBy(3)
-                               .map(new MyMapFunction2())
-                               .flatMap(new MyFlatMapFunction())
-                               .connect(sourceStream22)
-                               .map(new MyCoMapFunction())
-                               .writeAsText(resultPath1, 
FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-       }
-
-       // Ignore because the count(10_000) window actually only emits one 
element during processing
-       // and all the rest in close()
-       @SuppressWarnings("unchecked")
-       @Ignore
-       @Test
-       public void complexIntegrationTest3() throws Exception {
-               //Heavy prime factorisation with maps and flatmaps
-
-               expected1 = "541\n" + "1223\n" + "3319\n" + "5851\n" + "1987\n" 
+ "8387\n" + "15907\n" + "10939\n" +
-                               "4127\n" + "2477\n" + "6737\n" + "13421\n" + 
"4987\n" + "4999\n" + "18451\n" + "9283\n" + "7499\n" +
-                               "16937\n" + "11927\n" + "9973\n" + "14431\n" + 
"19507\n" + "12497\n" + "17497\n" + "14983\n" +
-                               "19997\n";
-
-               expected1 = "541\n" + "1223\n" + "1987\n" + "2741\n" + "3571\n" 
+ "10939\n" + "4409\n" +
-                               "5279\n" + "11927\n" + "6133\n" + "6997\n" + 
"12823\n" + "7919\n" + "8831\n" +
-                               "13763\n" + "9733\n" + "9973\n" + "14759\n" + 
"15671\n" + "16673\n" + "17659\n" +
-                               "18617\n";
-
-               for (int i = 2; i < 100; i++) {
-                       expected2 += "(" + i + "," + 20000 / i + ")\n";
-               }
-               for (int i = 19901; i < 20000; i++) {
-                       expected2 += "(" + i + "," + 20000 / i + ")\n";
-               }
-               //i == 20000
-               expected2 += "(" + 20000 + "," + 1 + ")";
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               // set to parallelism 1 because otherwise we don't know which 
elements go to which parallel
-               // count-window.
-               env.setParallelism(1);
-
-               env.setBufferTimeout(0);
-
-               DataStream<Long> sourceStream31 = env.generateSequence(1, 
10000);
-               DataStream<Long> sourceStream32 = env.generateSequence(10001, 
20000);
-
-
-               sourceStream31.filter(new PrimeFilterFunction())
-                               .windowAll(GlobalWindows.create())
-                               
.trigger(PurgingTrigger.of(CountTrigger.of(100)))
-                               .max(0)
-                               .union(sourceStream32.filter(new 
PrimeFilterFunction())
-                                               
.windowAll(GlobalWindows.create())
-                                               
.trigger(PurgingTrigger.of(CountTrigger.of(100)))
-                                               .max(0))
-                               .writeAsText(resultPath1, 
FileSystem.WriteMode.OVERWRITE);
-
-               sourceStream31
-                       .flatMap(new DivisorsFlatMapFunction())
-                       .union(sourceStream32.flatMap(new 
DivisorsFlatMapFunction()))
-                       .map(new MapFunction<Long, Tuple2<Long,Integer>>() {
-
-                       @Override
-                       public Tuple2<Long, Integer> map(Long value) throws 
Exception {
-                               return new Tuple2<>(value, 1);
-                       }
-               })
-                               .keyBy(0)
-                               .window(GlobalWindows.create())
-                               
.trigger(PurgingTrigger.of(CountTrigger.of(10_000)))
-                               .sum(1)
-                               
-                       .print();
-
-               env.execute();
-       }
-
-       @Test
-       @Ignore
-       @SuppressWarnings("unchecked, rawtypes")
-       public void complexIntegrationTest4() throws Exception {
-               //Testing mapping and delta-policy windowing with custom class
-
-               expected1 = "((100,100),0)\n" + "((120,122),5)\n" + 
"((121,125),6)\n" + "((138,144),9)\n" +
-                       "((139,147),10)\n" + "((156,166),13)\n" + 
"((157,169),14)\n" + "((174,188),17)\n" + "((175,191),18)\n" +
-                       "((192,210),21)\n" + "((193,213),22)\n" + 
"((210,232),25)\n" + "((211,235),26)\n" + "((228,254),29)\n" +
-                       "((229,257),30)\n" + "((246,276),33)\n" + 
"((247,279),34)\n" + "((264,298),37)\n" + "((265,301),38)\n" +
-                       "((282,320),41)\n" + "((283,323),42)\n" + 
"((300,342),45)\n" + "((301,345),46)\n" + "((318,364),49)\n" +
-                       "((319,367),50)\n" + "((336,386),53)\n" + 
"((337,389),54)\n" + "((354,408),57)\n" + "((355,411),58)\n" +
-                       "((372,430),61)\n" + "((373,433),62)\n" + 
"((390,452),65)\n" + "((391,455),66)\n" + "((408,474),69)\n" +
-                       "((409,477),70)\n" + "((426,496),73)\n" + 
"((427,499),74)\n" + "((444,518),77)\n" + "((445,521),78)\n" +
-                       "((462,540),81)\n" + "((463,543),82)\n" + 
"((480,562),85)\n" + "((481,565),86)\n" + "((498,584),89)\n" +
-                       "((499,587),90)\n" + "((516,606),93)\n" + 
"((517,609),94)\n" + "((534,628),97)\n" + "((535,631),98)";
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(1);
-
-               TupleSerializer<Tuple2<Rectangle, Integer>> deltaSerializer = 
new TupleSerializer<>((Class) Tuple2.class,
-                       new TypeSerializer[] {new 
KryoSerializer<>(Rectangle.class, env.getConfig()),
-                       IntSerializer.INSTANCE});
-
-               env.addSource(new RectangleSource())
-                               .global()
-                               .map(new RectangleMapFunction())
-                               .windowAll(GlobalWindows.create())
-                               .trigger(PurgingTrigger.of(DeltaTrigger.of(0.0, 
new MyDelta(), deltaSerializer)))
-                               .apply(new MyWindowMapFunction())
-                               .writeAsText(resultPath1, 
FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-       }
-
-       private static class MyDelta implements DeltaFunction<Tuple2<Rectangle, 
Integer>> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public double getDelta(Tuple2<Rectangle, Integer> oldDataPoint, 
Tuple2<Rectangle,
-                               Integer> newDataPoint) {
-                       return (newDataPoint.f0.b - newDataPoint.f0.a) - 
(oldDataPoint.f0.b - oldDataPoint.f0.a);
-               }
-       }
-
-
-       @Test
-       public void complexIntegrationTest5() throws Exception {
-               //Turning on and off chaining
-
-               expected1 = "1\n" + "2\n" + "2\n" + "3\n" + "3\n" + "3\n" + 
"4\n" + "4\n" + "4\n" + "4\n" + "5\n" + "5\n" +
-                               "5\n" + "5\n" + "5\n";
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               // Set to parallelism 1 to make it deterministic, otherwise, it 
is not clear which
-               // elements will go to which parallel instance of the fold
-               env.setParallelism(1);
-               
-               env.setBufferTimeout(0);
-
-               DataStream<Long> dataStream51 = env.generateSequence(1, 5)
-                               .map(new MapFunction<Long, Long>() {
-
-                                       @Override
-                                       public Long map(Long value) throws 
Exception {
-                                               return value;
-                                       }
-                               }).startNewChain()
-                               .filter(new FilterFunction<Long>() {
-
-                                       @Override
-                                       public boolean filter(Long value) 
throws Exception {
-                                               return true;
-                                       }
-                               }).disableChaining()
-                               .flatMap(new SquareFlatMapFunction());
-
-               DataStream<Long> dataStream53 = dataStream51.map(new 
MapFunction<Long, Long>() {
-
-                       @Override
-                       public Long map(Long value) throws Exception {
-                               return value;
-                       }
-               });
-
-
-               dataStream53.writeAsText(resultPath1, 
FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-       }
-
-
-       @Test
-       @Ignore
-       public void complexIntegrationTest6() throws Exception {
-               //Testing java collections and date-time types
-
-               expected1 = "(6,(a,6))\n" + "(6,(b,3))\n" + "(6,(c,4))\n" + 
"(6,(d,2))\n" + "(6,(f,2))\n" +
-                               "(7,(a,1))\n" + "(7,(b,2))\n" + "(7,(c,3))\n" + 
"(7,(d,1))\n" + "(7,(e,1))\n" + "(7,(f,1))\n" +
-                               "(8,(a,6))\n" + "(8,(b,4))\n" + "(8,(c,5))\n" + 
"(8,(d,1))\n" + "(8,(e,2))\n" + "(8,(f,2))\n" +
-                               "(9,(a,4))\n" + "(9,(b,4))\n" + "(9,(c,7))\n" + 
"(9,(d,3))\n" + "(9,(e,1))\n" + "(9,(f,2))\n" +
-                               "(10,(a,3))\n" + "(10,(b,2))\n" + 
"(10,(c,3))\n" + "(10,(d,2))\n" + "(10,(e,1))\n" + "(10,(f,1))";
-               expected2 = "[a, a, c, c, d, f]\n" + "[a, b, b, d]\n" + "[a, a, 
a, b, c, c, f]\n" + "[a, d, e]\n" +
-                               "[b, b, c, c, c, f]\n" + "[a, a, a, a, b, b, c, 
c, e]\n" + "[a, a, b, b, c, c, c, d, e, f, f]\n" +
-                               "[a, a, a, b, c, c, c, d, d, f]\n" + "[a, b, b, 
b, c, c, c, c, d, e, f]\n" +
-                               "[a, a, a, b, b, c, c, c, d, d, e, f]";
-
-               SimpleDateFormat ft = new SimpleDateFormat("dd-MM-yyyy");
-
-               ArrayList<Tuple2<Date, HashMap<Character, Integer>>> sales = 
new ArrayList<>();
-               HashMap<Character, Integer> sale1 = new HashMap<>();
-               sale1.put('a', 2);
-               sale1.put('c', 2);
-               sale1.put('d', 1);
-               sale1.put('f', 1);
-               sales.add(new Tuple2<>(ft.parse("03-06-2014"), sale1));
-
-               HashMap<Character, Integer> sale2 = new HashMap<>();
-               sale2.put('a', 1);
-               sale2.put('b', 2);
-               sale2.put('d', 1);
-               sales.add(new Tuple2<>(ft.parse("10-06-2014"), sale2));
-
-               HashMap<Character, Integer> sale3 = new HashMap<>();
-               sale3.put('a', 3);
-               sale3.put('b', 1);
-               sale3.put('c', 2);
-               sale3.put('f', 1);
-               sales.add(new Tuple2<>(ft.parse("29-06-2014"), sale3));
-
-               HashMap<Character, Integer> sale4 = new HashMap<>();
-               sale4.put('a', 1);
-               sale4.put('d', 1);
-               sale4.put('e', 1);
-               sales.add(new Tuple2<>(ft.parse("15-07-2014"), sale4));
-
-               HashMap<Character, Integer> sale5 = new HashMap<>();
-               sale5.put('b', 2);
-               sale5.put('c', 3);
-               sale5.put('f', 1);
-               sales.add(new Tuple2<>(ft.parse("24-07-2014"), sale5));
-
-               HashMap<Character, Integer> sale6 = new HashMap<>();
-               sale6.put('a', 4);
-               sale6.put('b', 2);
-               sale6.put('c', 2);
-               sale6.put('e', 1);
-               sales.add(new Tuple2<>(ft.parse("17-08-2014"), sale6));
-
-               HashMap<Character, Integer> sale7 = new HashMap<>();
-               sale7.put('a', 2);
-               sale7.put('b', 2);
-               sale7.put('c', 3);
-               sale7.put('d', 1);
-               sale7.put('e', 1);
-               sale7.put('f', 2);
-               sales.add(new Tuple2<>(ft.parse("27-08-2014"), sale7));
-
-               HashMap<Character, Integer> sale8 = new HashMap<>();
-               sale8.put('a', 3);
-               sale8.put('b', 1);
-               sale8.put('c', 3);
-               sale8.put('d', 2);
-               sale8.put('f', 1);
-               sales.add(new Tuple2<>(ft.parse("16-09-2014"), sale8));
-
-               HashMap<Character, Integer> sale9 = new HashMap<>();
-               sale9.put('a', 1);
-               sale9.put('b', 3);
-               sale9.put('c', 4);
-               sale9.put('d', 1);
-               sale9.put('e', 1);
-               sale9.put('f', 1);
-               sales.add(new Tuple2<>(ft.parse("25-09-2014"), sale9));
-
-               HashMap<Character, Integer> sale10 = new HashMap<>();
-               sale10.put('a', 3);
-               sale10.put('b', 2);
-               sale10.put('c', 3);
-               sale10.put('d', 2);
-               sale10.put('e', 1);
-               sale10.put('f', 1);
-               sales.add(new Tuple2<>(ft.parse("01-10-2014"), sale10));
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableTimestamps();
-
-               DataStream<Tuple2<Date, HashMap<Character, Integer>>> 
sourceStream6 = env.fromCollection(sales);
-               sourceStream6
-                               .assignTimestampsAndWatermarks(new Timestamp6())
-                               .timeWindowAll(Time.of(1, 
TimeUnit.MILLISECONDS))
-                               .reduce(new SalesReduceFunction())
-                               .flatMap(new FlatMapFunction6())
-                               .writeAsText(resultPath1, 
FileSystem.WriteMode.OVERWRITE);
-
-               sourceStream6.map(new MapFunction6())
-                               .writeAsText(resultPath2, 
FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-       }
-
-       // 
*************************************************************************
-       // FUNCTIONS
-       // 
*************************************************************************
-
-       private static class MyMapFunction2 implements 
MapFunction<Tuple5<Integer, String, Character, Double, Boolean>, 
Tuple4<Integer, String,
-                       Double, Boolean>> {
-
-               @Override
-               public Tuple4<Integer, String, Double, Boolean> 
map(Tuple5<Integer, String, Character, Double,
-                               Boolean> value) throws Exception {
-                       return new Tuple4<>(value.f0, value.f1 + "-" + value.f2,
-                                       value.f3, value.f4);
-               }
-
-       }
-
-       private static class PojoSource implements SourceFunction<OuterPojo> {
-               private static final long serialVersionUID = 1L;
-
-               long cnt = 0;
-
-               @Override
-               public void run(SourceContext<OuterPojo> ctx) throws Exception {
-                       for (int i = 0; i < 20; i++) {
-                               OuterPojo result = new OuterPojo(new 
InnerPojo(cnt / 2, "water_melon-b"), 2L);
-                               ctx.collect(result);
-                       }
-               }
-
-               @Override
-               public void cancel() {
-
-               }
-       }
-
-       private static class TupleSource implements SourceFunction<Tuple2<Long, 
Tuple2<String, Long>>> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void run(SourceContext<Tuple2<Long, Tuple2<String, 
Long>>> ctx) throws Exception {
-                       for (int i = 0; i < 20; i++) {
-                               Tuple2<Long, Tuple2<String, Long>> result = new 
Tuple2<>(1L, new Tuple2<>("a", 1L));
-                               ctx.collect(result);
-                       }
-               }
-
-               @Override
-               public void cancel() {
-
-               }
-       }
-
-       private class IncrementMap implements MapFunction<Tuple2<Long, 
Tuple2<String, Long>>, Tuple2<Long, Tuple2<String,
-                       Long>>> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public Tuple2<Long, Tuple2<String, Long>> map(Tuple2<Long, 
Tuple2<String, Long>> value) throws Exception {
-                       return new Tuple2<>(value.f0 + 1, value.f1);
-               }
-       }
-
-       private static class MyTimestampExtractor extends 
AscendingTimestampExtractor<Tuple5<Integer, String, Character, Double, 
Boolean>> {
-
-
-               @Override
-               public long extractAscendingTimestamp(Tuple5<Integer, String, 
Character, Double, Boolean> element, long currentTimestamp) {
-                       return (long) element.f0;
-               }
-       }
-
-       private static class MyFlatMapFunction implements 
FlatMapFunction<Tuple4<Integer, String, Double,
-                       Boolean>, OuterPojo> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void flatMap(Tuple4<Integer, String, Double, Boolean> 
value, Collector<OuterPojo> out) throws
-                               Exception {
-                       if (value.f3) {
-                               for (int i = 0; i < 2; i++) {
-                                       out.collect(new OuterPojo(new 
InnerPojo((long) value.f0, value.f1), (long) i));
-                               }
-                       }
-               }
-       }
-
-       private class MyCoMapFunction implements CoMapFunction<OuterPojo, 
OuterPojo, String> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public String map1(OuterPojo value) {
-                       return value.f0.f1;
-               }
-
-               @Override
-               public String map2(OuterPojo value) {
-                       return value.f0.f1;
-               }
-       }
-
-       private class MyOutputSelector implements OutputSelector<Tuple2<Long, 
Tuple2<String, Long>>> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public Iterable<String> select(Tuple2<Long, Tuple2<String, 
Long>> value) {
-                       List<String> output = new ArrayList<>();
-                       if (value.f0 == 10) {
-                               output.add("iterate");
-                               output.add("firstOutput");
-                       } else if (value.f0 == 20) {
-                               output.add("secondOutput");
-                       } else {
-                               output.add("iterate");
-                       }
-                       return output;
-               }
-       }
-
-       private static class PrimeFilterFunction implements 
FilterFunction<Long> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public boolean filter(Long value) throws Exception {
-                       if (value < 2) {
-                               return false;
-                       } else {
-                               for (long i = 2; i < value; i++) {
-                                       if (value % i == 0) {
-                                               return false;
-                                       }
-                               }
-                       }
-                       return true;
-               }
-       }
-
-       private static class DivisorsFlatMapFunction implements 
FlatMapFunction<Long, Long> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void flatMap(Long value, Collector<Long> out) throws 
Exception {
-                       for (long i = 2; i <= value; i++) {
-                               if (value % i == 0) {
-                                       out.collect(i);
-                               }
-                       }
-               }
-       }
-
-       private static class RectangleSource extends 
RichSourceFunction<Rectangle> {
-               private static final long serialVersionUID = 1L;
-               private transient Rectangle rectangle;
-
-               public void open(Configuration parameters) throws Exception {
-                       rectangle = new Rectangle(100, 100);
-               }
-
-               @Override
-               public void run(SourceContext<Rectangle> ctx) throws Exception {
-                       // emit once as the initializer of the delta trigger
-                       ctx.collect(rectangle);
-                       for (int i = 0; i < 100; i++) {
-                               ctx.collect(rectangle);
-                               rectangle = rectangle.next();
-                       }
-               }
-
-               @Override
-               public void cancel() {
-               }
-       }
-
-       private static class RectangleMapFunction implements 
MapFunction<Rectangle, Tuple2<Rectangle, Integer>> {
-               private static final long serialVersionUID = 1L;
-               private int counter = 0;
-
-               @Override
-               public Tuple2<Rectangle, Integer> map(Rectangle value) throws 
Exception {
-                       return new Tuple2<>(value, counter++);
-               }
-       }
-
-       private static class MyWindowMapFunction implements 
AllWindowFunction<Iterable<Tuple2<Rectangle, Integer>>, Tuple2<Rectangle, 
Integer>, GlobalWindow> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void apply(GlobalWindow window, 
Iterable<Tuple2<Rectangle, Integer>> values, Collector<Tuple2<Rectangle,
-                               Integer>> out) throws Exception {
-                       out.collect(values.iterator().next());
-               }
-       }
-
-       private static class SquareFlatMapFunction implements 
FlatMapFunction<Long, Long> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void flatMap(Long value, Collector<Long> out) throws 
Exception {
-                       for (long i = 0; i < value; i++) {
-                               out.collect(value);
-                       }
-               }
-       }
-
-       private static class Timestamp6 implements 
AssignerWithPunctuatedWatermarks<Tuple2<Date, HashMap<Character, Integer>>> {
-               
-               @Override
-               public long extractTimestamp(Tuple2<Date, HashMap<Character, 
Integer>> value, long previousTimestamp) {
-                       
-                       Calendar cal = Calendar.getInstance();
-                       cal.setTime(value.f0);
-                       return 12 * (cal.get(Calendar.YEAR)) + 
cal.get(Calendar.MONTH);
-               }
-
-               @Override
-               public long checkAndGetNextWatermark(Tuple2<Date, 
HashMap<Character, Integer>> lastElement, long extractedTimestamp) {
-                       return extractedTimestamp - 1;
-               }
-       }
-
-       private static class SalesReduceFunction implements 
ReduceFunction<Tuple2<Date, HashMap<Character, Integer>>> {
-
-               @Override
-               public Tuple2<Date, HashMap<Character, Integer>> 
reduce(Tuple2<Date, HashMap<Character, Integer>> value1,
-                               Tuple2<Date,HashMap<Character, Integer>> 
value2) throws Exception {
-                       
-                       HashMap<Character, Integer> map1 = value1.f1;
-                       HashMap<Character, Integer> map2 = value2.f1;
-                       for (Character key : map2.keySet()) {
-                               Integer volume1 = map1.get(key);
-                               Integer volume2 = map2.get(key);
-                               if (volume1 == null) {
-                                       volume1 = 0;
-                               }
-                               map1.put(key, volume1 + volume2);
-                       }
-                       return new Tuple2<>(value2.f0, map1);
-               }
-       }
-
-       private static class FlatMapFunction6 implements 
FlatMapFunction<Tuple2<Date, HashMap<Character, Integer>>,
-                       Tuple2<Integer, Tuple2<Character, Integer>>> {
-
-               @Override
-               public void flatMap(Tuple2<Date, HashMap<Character, Integer>> 
value, Collector<Tuple2<Integer,
-                               Tuple2<Character, Integer>>> out) throws 
Exception {
-                       Calendar cal = Calendar.getInstance();
-                       cal.setTime(value.f0);
-                       for (Character key : value.f1.keySet()) {
-                               out.collect(new Tuple2<>(cal.get(Calendar.MONTH)
-                                               + 1,
-                                               new Tuple2<>(key, 
value.f1.get(key))));
-                       }
-               }
-       }
-
-       private static class MapFunction6 implements MapFunction<Tuple2<Date, 
HashMap<Character, Integer>>, ArrayList<Character>> {
-
-               @Override
-               public ArrayList<Character> map(Tuple2<Date, HashMap<Character, 
Integer>> value)
-                               throws Exception {
-                       ArrayList<Character> list = new ArrayList<>();
-                       for (Character ch : value.f1.keySet()) {
-                               for (int i = 0; i < value.f1.get(ch); i++) {
-                                       list.add(ch);
-                               }
-                       }
-                       Collections.sort(list);
-                       return list;
-               }
-       }
-
-       // 
*************************************************************************
-       // DATA TYPES
-       // 
*************************************************************************
-
-       //Flink Pojo
-       public static class InnerPojo {
-               public Long f0;
-               public String f1;
-
-               //default constructor to qualify as Flink POJO
-               InnerPojo(){}
-
-               public InnerPojo(Long f0, String f1) {
-                       this.f0 = f0;
-                       this.f1 = f1;
-               }
-
-               @Override
-               public String toString() {
-                       return "POJO(" + f0 + "," + f1 + ")";
-               }
-       }
-
-       // Nested class serialized with Kryo
-       public static class OuterPojo {
-               public InnerPojo f0;
-               public Long f1;
-
-               public OuterPojo(InnerPojo f0, Long f1) {
-                       this.f0 = f0;
-                       this.f1 = f1;
-               }
-
-               @Override
-               public String toString() {
-                       return "POJO(" + f0 + "," + f1 + ")";
-               }
-       }
-
-       public static class Rectangle {
-
-               public int a;
-               public int b;
-
-               //default constructor to qualify as Flink POJO
-               public Rectangle() {}
-
-               public Rectangle(int a, int b) {
-                       this.a = a;
-                       this.b = b;
-               }
-
-               public Rectangle next() {
-                       return new Rectangle(a + (b % 11), b + (a % 9));
-               }
-
-               @Override
-               public String toString() {
-                       return "(" + a + "," + b + ")";
-               }
-       }
-}

Reply via email to