One more thing: please try to minimize your solution by removing the Union and the Odd/Even filters at the beginning, and check whether you get the same results.
Piotrek

> On Jun 19, 2017, at 2:43 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>
> Hi,
>
> It is difficult for me to respond fully to your question. First of all, it
> would be really useful if you could strip your example down to a minimal
> version that shows the problem. Unfortunately, I was unable to reproduce your
> issue: I was getting only one output line per window (as expected). Could you
> try printing the output to the console (or using some other data sink)
> instead of writing it back to Kafka? Maybe there is a problem with the sink.
> Also, please try removing parts of the code bit by bit, so that you can find
> what is causing the problem.
>
> As a side note, I have a couple of concerns about your
> timestamp/watermark/window definitions. First, you set the time
> characteristic to EventTime:
>
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> But I don't see where you actually assign timestamps and watermarks. Didn't
> you want to call ".assignTimestampsAndWatermarks(...)" on your input
> DataStream, based on its content? Nevertheless, you later define the window
> by ProcessingTime:
>
>> .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)));
>
> which defines the windows independently of the content of the events. Maybe
> switching to proper EventTime will solve your problem?
>
> Thanks,
> Piotrek
>
>> On Jun 18, 2017, at 6:12 PM, FRANCISCO BORJA ROBLES MARTIN
>> <francisco.robles.mar...@alumnos.upm.es> wrote:
>>
>> Hello everybody! First of all, thanks for reading :D
>>
>> I am currently working on my bachelor's final project, which is a
>> comparison between Spark Streaming and Flink. Now let's focus on the
>> problem:
>>
>> - THE PROBLEM: my program writes to Kafka more than once every window (it
>> creates 2-3 or more lines per window, while it is supposed to create 1 line
>> per window, since the reduce function leaves only one element).
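[The event-time wiring Piotrek suggests above could look roughly like the sketch below. Only the parsing helper is runnable plain Java; the surrounding Flink 1.2 calls are shown as comments, and the class name and the 1-second out-of-orderness bound are invented for illustration.]

```java
public class EventTimeSketch {

    // Extract the creation timestamp (second field) from an input line such
    // as "3 1497790546983"; this is what a timestamp extractor passed to
    // assignTimestampsAndWatermarks(...) would return.
    public static long extractEventTimestamp(String line) {
        return Long.parseLong(line.split(" ")[1]);
    }

    // In the job it would plug in roughly like this (Flink 1.2 API):
    //
    // DataStream<String> line = env.addSource(myConsumer)
    //     .assignTimestampsAndWatermarks(
    //         new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(1)) {
    //             @Override
    //             public long extractTimestamp(String element) {
    //                 return extractEventTimestamp(element);
    //             }
    //         });
    //
    // ...and the window definition would become:
    //
    // .windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))

    public static void main(String[] args) {
        System.out.println(extractEventTimestamp("3 1497790546983")); // → 1497790546983
    }
}
```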
>> I have the same code written in Spark and it works perfectly. I have been
>> trying to find information about this issue and I haven't found anything
>> :(. I have also tried changing the parallelism of some functions and a few
>> other things, and nothing worked; I cannot figure out where the problem can
>> be.
>>
>> - MY CLUSTER: I am using Flink 1.2.0 and OpenJDK 8. I have 3 computers: 1
>> JobManager and 2 TaskManagers (4 cores, 2 GB RAM, 4 task slots per
>> TaskManager).
>>
>> - INPUT DATA: lines produced by one Java producer to a Kafka topic with 24
>> partitions, each line containing two elements: an incremental value and a
>> creation timestamp:
>> 1 1497790546981
>> 2 1497790546982
>> 3 1497790546983
>> 4 1497790546984
>> ..................
>>
>> - MY JAVA APPLICATION:
>> + It reads from a Kafka topic with 24 partitions (Kafka is on the same
>> machine as the JobManager).
>> + The filter functions are useless together with the union; I use them just
>> for checking their latency.
>> + Basically, it adds a "1" to each line, then there is a tumbling window
>> every 2 seconds, and the reduce function sums all these 1's and all the
>> timestamps; the timestamp sum is later divided in the map function by the
>> sum of 1's, which gives me the average; finally, in the last map function
>> it adds a timestamp of the current moment to each reduced line, plus the
>> difference between this timestamp and the average timestamp.
>> + Each such line is written to Kafka (to a topic with 2 partitions).
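[As a sanity check on the intent of the pipeline described above, the window aggregation can be reproduced in plain Java outside Flink. This is only a sketch with invented names: it collapses one window's worth of input lines into the single "count average" line the Reducer plus AverageCalculator should emit.]

```java
import java.util.Arrays;
import java.util.List;

public class WindowAverageSketch {

    // Collapse one window's input lines ("value timestamp") into the
    // "count averageTimestamp" string the job produces: the per-line 1's are
    // summed into a count, and the creation timestamps are summed and divided
    // by that count (integer division, matching the job).
    public static String reduceWindow(List<String> lines) {
        long sumTS = 0;
        int count = 0;
        for (String line : lines) {
            String[] parts = line.split(" ");
            sumTS += Long.parseLong(parts[1]); // creation timestamp
            count += 1;                        // the "1" attached per line
        }
        long average = sumTS / count;
        return count + " " + average;
    }

    public static void main(String[] args) {
        List<String> window = Arrays.asList("1 1000", "2 1002", "3 1004");
        System.out.println(reduceWindow(window)); // → "3 1002"
    }
}
```

Whatever the sink, each window should therefore contribute exactly one such line.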
>>
>> ######################### - CODE - ####################################
>>
>> //FLINK CONFIGURATION
>> final StreamExecutionEnvironment env = StreamExecutionEnvironment
>>         .getExecutionEnvironment();
>>
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> //env.setParallelism(2);
>>
>> //KAFKA CONSUMER CONFIGURATION
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers", "192.168.0.155:9092");
>> FlinkKafkaConsumer010<String> myConsumer =
>>         new FlinkKafkaConsumer010<>(args[0], new SimpleStringSchema(), properties);
>>
>> //KAFKA PRODUCER
>> Properties producerConfig = new Properties();
>> producerConfig.setProperty("bootstrap.servers", "192.168.0.155:9092");
>> producerConfig.setProperty("acks", "0");
>> producerConfig.setProperty("linger.ms", "0");
>>
>> //MAIN PROGRAM
>> //Read from Kafka
>> DataStream<String> line = env.addSource(myConsumer);
>>
>> //Add a 1 to each line
>> DataStream<Tuple2<String, Integer>> line_Num = line.map(new NumberAdder());
>>
>> //Filter odd numbers
>> DataStream<Tuple2<String, Integer>> line_Num_Odd = line_Num.filter(new FilterOdd());
>>
>> //Filter even numbers
>> DataStream<Tuple2<String, Integer>> line_Num_Even = line_Num.filter(new FilterEven());
>>
>> //Join even and odd
>> DataStream<Tuple2<String, Integer>> line_Num_U = line_Num_Odd.union(line_Num_Even);
>>
>> //Tumbling windows every 2 seconds
>> AllWindowedStream<Tuple2<String, Integer>, TimeWindow> windowedLine_Num_U = line_Num_U
>>         .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)));
>>
>> //Reduce to one line with the sums
>> DataStream<Tuple2<String, Integer>> wL_Num_U_Reduced = windowedLine_Num_U.reduce(new Reducer());
>>
>> //Calculate the average of the summed elements
>> DataStream<String> wL_Average = wL_Num_U_Reduced.map(new AverageCalculator());
>>
>> //Add a timestamp and calculate the difference with the average
>> DataStream<String> averageTS = wL_Average.map(new TimestampAdder());
>>
>> //Send the result to Kafka
>> FlinkKafkaProducer010Configuration<String> myProducerConfig =
>>         (FlinkKafkaProducer010Configuration<String>) FlinkKafkaProducer010
>>                 .writeToKafkaWithTimestamps(averageTS, "testRes",
>>                         new SimpleStringSchema(), producerConfig);
>>
>> myProducerConfig.setWriteTimestampToKafka(true);
>>
>> env.execute("TimestampLongKafka");
>> }
>>
>> //Functions used in the program implementation:
>>
>> public static class FilterOdd implements FilterFunction<Tuple2<String, Integer>> {
>>     private static final long serialVersionUID = 1L;
>>
>>     public boolean filter(Tuple2<String, Integer> line) throws Exception {
>>         return (Long.valueOf(line.f0.split(" ")[0]) % 2) != 0;
>>     }
>> }
>>
>> public static class FilterEven implements FilterFunction<Tuple2<String, Integer>> {
>>     private static final long serialVersionUID = 1L;
>>
>>     public boolean filter(Tuple2<String, Integer> line) throws Exception {
>>         return (Long.valueOf(line.f0.split(" ")[0]) % 2) == 0;
>>     }
>> }
>>
>> public static class NumberAdder implements MapFunction<String, Tuple2<String, Integer>> {
>>     private static final long serialVersionUID = 1L;
>>
>>     public Tuple2<String, Integer> map(String line) {
>>         return new Tuple2<String, Integer>(line, 1);
>>     }
>> }
>>
>> public static class Reducer implements ReduceFunction<Tuple2<String, Integer>> {
>>     private static final long serialVersionUID = 1L;
>>
>>     public Tuple2<String, Integer> reduce(Tuple2<String, Integer> line1,
>>             Tuple2<String, Integer> line2) throws Exception {
>>         Long sum = Long.valueOf(line1.f0.split(" ")[0])
>>                 + Long.valueOf(line2.f0.split(" ")[0]);
>>         Long sumTS = Long.valueOf(line1.f0.split(" ")[1])
>>                 + Long.valueOf(line2.f0.split(" ")[1]);
>>         return new Tuple2<String, Integer>(sum + " " + sumTS, line1.f1 + line2.f1);
>>     }
>> }
>>
>> public static class AverageCalculator implements MapFunction<Tuple2<String, Integer>, String> {
>>     private static final long serialVersionUID = 1L;
>>
>>     public String map(Tuple2<String, Integer> line) throws Exception {
>>         Long average = Long.valueOf(line.f0.split(" ")[1]) / line.f1;
>>         return line.f1 + " " + average;
>>     }
>> }
>>
>> public static final class TimestampAdder implements MapFunction<String, String> {
>>     private static final long serialVersionUID = 1L;
>>
>>     public String map(String line) throws Exception {
>>         Long currentTime = System.currentTimeMillis();
>>         String totalTime = String.valueOf(currentTime - Long.valueOf(line.split(" ")[1]));
>>         return line.concat(" " + currentTime + " " + totalTime);
>>     }
>> }
>>
>> ######################### - CODE - ####################################
>>
>> - SOME OUTPUT DATA: this output has been written to the topic with 2
>> partitions, with a producing rate of less than 1000 records/second (in this
>> case it creates 3 output lines per window):
>>
>> (note that every 3 lines correspond to the same 2-second window)
>> 1969 1497791240910 1497791241999 1089 1497791242001 1091
>> 1973 1497791240971 1497791241999 1028 1497791242002 1031
>> 1970 1497791240937 1497791242094 1157 1497791242198 1261
>> 1917 1497791242912 1497791243999 1087 1497791244051 1139
>> 1905 1497791242971 1497791243999 1028 1497791244051 1080
>> 1916 1497791242939 1497791244096 1157 1497791244199 1260
>> 1994 1497791244915 1497791245999 1084 1497791246002 1087
>> 1993 1497791244966 1497791245999 1033 1497791246004 1038
>> 1990 1497791244939 1497791246097 1158 1497791246201 1262
>>
>> Thanks again in advance!
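[One way to quantify the duplication in the sample above is to bucket the output lines by their second column (the averaged creation timestamp) into 2-second windows and count lines per bucket. This is a plain-Java sketch; the class name and the choice of column are assumptions based on the sample data.]

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowDuplicateCheck {

    // Bucket each output line by its second column into windowMillis-wide
    // buckets and count how many lines fall into each bucket.
    public static Map<Long, Integer> countPerWindow(List<String> lines, long windowMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            long ts = Long.parseLong(line.split(" ")[1]);
            counts.merge(ts / windowMillis, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "1969 1497791240910 1497791241999 1089 1497791242001 1091",
            "1973 1497791240971 1497791241999 1028 1497791242002 1031",
            "1970 1497791240937 1497791242094 1157 1497791242198 1261",
            "1917 1497791242912 1497791243999 1087 1497791244051 1139",
            "1905 1497791242971 1497791243999 1028 1497791244051 1080",
            "1916 1497791242939 1497791244096 1157 1497791244199 1260",
            "1994 1497791244915 1497791245999 1084 1497791246002 1087",
            "1993 1497791244966 1497791245999 1033 1497791246004 1038",
            "1990 1497791244939 1497791246097 1158 1497791246201 1262");
        System.out.println(countPerWindow(sample, 2000L).values()); // → [3, 3, 3]
    }
}
```

On the nine sample lines this yields 3 lines per 2-second bucket where 1 is expected, matching the reported symptom.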
>>
>> ---
>> FRANCISCO BORJA ROBLES MARTÍN
>> Escuela Técnica Superior de Ingeniería Informática
>> Campus Montegancedo s/n
>> Universidad Politécnica de Madrid (Technical University of Madrid)