One more thing: please try to minimize your solution by removing the union and
the Odd/Even filters at the beginning, and check whether you get the same results.
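
For example, something along these lines (a minimal sketch based on your own
code, leaving the window/reduce part unchanged):

   DataStream<String> line = env.addSource(myConsumer);
   DataStream<Tuple2<String, Integer>> line_Num = line.map(new NumberAdder());

   //No Odd/Even filters and no union: window directly on line_Num
   DataStream<Tuple2<String, Integer>> wL_Num_Reduced = line_Num
           .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)))
           .reduce(new Reducer());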

Piotrek

> On Jun 19, 2017, at 2:43 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
> 
> Hi,
> 
> It is difficult for me to respond fully to your question. First of all, it
> would be really useful if you could strip your example down to a minimal
> version that still shows the problem. Unfortunately I was unable to reproduce
> your issue: I was getting only one output line per window (as expected).
> Could you try to print the output to the console (or use some different data
> sink) instead of writing it back to Kafka, in case the sink is the problem?
> Also, please try removing parts of the code bit by bit, so that you can find
> what’s causing the problem.
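> 
> For example, swapping the Kafka sink for a console sink is a one-line change
> ("averageTS" being the final stream in your code):
> 
>   //Print each result line to stdout instead of producing it to Kafka
>   averageTS.print();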
> 
> As a side note, I have a couple of concerns about your
> timestamps/watermarks/windows definitions. First, you set the time
> characteristic to EventTime:
> 
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> 
> But I don’t see where you actually assign timestamps and watermarks.
> Didn’t you want to call “.assignTimestampsAndWatermarks(…)” on your input
> DataStream, based on its content? Nevertheless, later you define the window
> by ProcessingTime:
> 
>>          .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)));
> 
> which defines the windows independently of the content of the events. Maybe
> switching to a properly configured EventTime will solve your problem?
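> 
> Since every record already carries its creation timestamp as the second
> field, an event-time version could look roughly like this (a sketch using
> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor,
> assuming records arrive at most 1 second out of order):
> 
>   //Extract the event timestamp from each record and generate watermarks
>   DataStream<Tuple2<String, Integer>> line_Num_TS = line_Num
>           .assignTimestampsAndWatermarks(
>                   new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Integer>>(Time.seconds(1)) {
>               private static final long serialVersionUID = 1L;
> 
>               @Override
>               public long extractTimestamp(Tuple2<String, Integer> element) {
>                   //The second token of the line is the creation timestamp
>                   return Long.valueOf(element.f0.split(" ")[1]);
>               }
>           });
> 
>   //Window by event time instead of processing time
>   AllWindowedStream<Tuple2<String, Integer>, TimeWindow> windowed = line_Num_TS
>           .windowAll(TumblingEventTimeWindows.of(Time.seconds(2)));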
> 
> Thanks, Piotrek
> 
>> On Jun 18, 2017, at 6:12 PM, FRANCISCO BORJA ROBLES MARTIN 
>> <francisco.robles.mar...@alumnos.upm.es> wrote:
>> 
>> Hello everybody! First of all, thanks for reading :D
>> 
>> I am currently working on my bachelor's final project, which is a comparison
>> between Spark Streaming and Flink. Now let's focus on the problem:
>> 
>> - THE PROBLEM: my program is writing more than one line per window to Kafka
>> (it creates 2-3 or more lines per window, while it should create exactly 1
>> line per window, since the reduce function leaves only one element). I have
>> the same code written in Spark and it works perfectly. I have been trying to
>> find information about this issue and I haven't found anything :(. I have
>> also tried changing the parallelism of some functions, among other things,
>> and nothing worked; I cannot figure out where the problem could be.
>> 
>> - MY CLUSTER: I am using Flink 1.2.0 and OpenJDK 8. I have 3 computers: 1
>> JobManager and 2 TaskManagers (4 cores, 2GB RAM, and 4 task slots per TaskManager).
>> 
>> - INPUT DATA: lines produced by a Java producer to a Kafka topic with 24
>> partitions; each line contains two elements, an incremental value and a
>> creation timestamp:
>> 1 1497790546981
>> 2 1497790546982
>> 3 1497790546983
>> 4 1497790546984
>> ..................
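>> 
>> A minimal sketch of such a producer could look like this (using the plain
>> org.apache.kafka.clients.producer.KafkaProducer; the topic name "testIn" is
>> hypothetical and the real producer may differ):
>> 
>>   Properties props = new Properties();
>>   props.setProperty("bootstrap.servers", "192.168.0.155:9092");
>>   props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>>   props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>> 
>>   KafkaProducer<String, String> producer = new KafkaProducer<>(props);
>>   long counter = 1;
>>   while (true) {
>>       //Each record: incremental value + space + creation timestamp
>>       producer.send(new ProducerRecord<>("testIn", counter + " " + System.currentTimeMillis()));
>>       counter++;
>>   }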
>> 
>> - MY JAVA APPLICATION:
>> + It reads from a Kafka topic with 24 partitions (Kafka runs on the same
>> machine as the JobManager).
>> + The filter functions are useless, and so is the union; I use them
>> just for checking their latency.
>> + Basically, it adds a "1" to each line; then there is a tumbling window
>> every 2 seconds, and the reduce function sums all these 1's and all the
>> timestamps. The summed timestamp is later divided, in the following map
>> function, by the sum of 1's, which gives the average creation timestamp.
>> Finally, the last map function adds to each reduced line a timestamp of the
>> current moment and the difference between that timestamp and the average.
>> + This line is written to Kafka (to a topic with 2 partitions).
>> 
>> ######################### - CODE - ####################################
>> 
>>   //FLINK CONFIGURATION
>>   final StreamExecutionEnvironment env = StreamExecutionEnvironment
>>           .getExecutionEnvironment();
>> 
>>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>   //env.setParallelism(2);
>> 
>>   //KAFKA CONSUMER CONFIGURATION
>>   Properties properties = new Properties();
>>   properties.setProperty("bootstrap.servers", "192.168.0.155:9092");
>>   FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<>(args[0], new SimpleStringSchema(), properties);
>> 
>> 
>>   //KAFKA PRODUCER
>>   Properties producerConfig = new Properties();
>>   producerConfig.setProperty("bootstrap.servers", "192.168.0.155:9092");
>>   producerConfig.setProperty("acks", "0");
>>   producerConfig.setProperty("linger.ms", "0");
>> 
>> 
>>   //MAIN PROGRAM
>>   //Read from Kafka
>>   DataStream<String> line = env.addSource(myConsumer);
>> 
>>   //Add 1 to each line
>>   DataStream<Tuple2<String, Integer>> line_Num = line.map(new NumberAdder());
>> 
>>   //Filter odd numbers
>>   DataStream<Tuple2<String, Integer>> line_Num_Odd = line_Num.filter(new FilterOdd());
>> 
>>   //Filter even numbers
>>   DataStream<Tuple2<String, Integer>> line_Num_Even = line_Num.filter(new FilterEven());
>> 
>>   //Union the even and odd streams again
>>   DataStream<Tuple2<String, Integer>> line_Num_U = line_Num_Odd.union(line_Num_Even);
>> 
>>   //Tumbling windows every 2 seconds
>>   AllWindowedStream<Tuple2<String, Integer>, TimeWindow> windowedLine_Num_U = line_Num_U
>>           .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)));
>> 
>>   //Reduce each window to one line with the sums
>>   DataStream<Tuple2<String, Integer>> wL_Num_U_Reduced = windowedLine_Num_U.reduce(new Reducer());
>> 
>>   //Calculate the average of the summed elements
>>   DataStream<String> wL_Average = wL_Num_U_Reduced.map(new AverageCalculator());
>> 
>>   //Add a timestamp and calculate the difference with the average
>>   DataStream<String> averageTS = wL_Average.map(new TimestampAdder());
>> 
>> 
>>   //Send the result to Kafka
>>   FlinkKafkaProducer010Configuration<String> myProducerConfig = (FlinkKafkaProducer010Configuration<String>) FlinkKafkaProducer010
>>           .writeToKafkaWithTimestamps(averageTS, "testRes", new SimpleStringSchema(), producerConfig);
>> 
>>   myProducerConfig.setWriteTimestampToKafka(true);
>> 
>>   env.execute("TimestampLongKafka");
>> }
>> 
>> 
>> //Functions used in the program implementation:
>> 
>> public static class FilterOdd implements FilterFunction<Tuple2<String, Integer>> {
>>   private static final long serialVersionUID = 1L;
>> 
>>   public boolean filter(Tuple2<String, Integer> line) throws Exception {
>>       //The first token of the line is the incremental value
>>       boolean isOdd = (Long.valueOf(line.f0.split(" ")[0]) % 2) != 0;
>>       return isOdd;
>>   }
>> };
>> 
>> 
>> public static class FilterEven implements FilterFunction<Tuple2<String, Integer>> {
>>   private static final long serialVersionUID = 1L;
>> 
>>   public boolean filter(Tuple2<String, Integer> line) throws Exception {
>>       //The first token of the line is the incremental value
>>       boolean isEven = (Long.valueOf(line.f0.split(" ")[0]) % 2) == 0;
>>       return isEven;
>>   }
>> };
>> 
>> 
>> public static class NumberAdder implements MapFunction<String, Tuple2<String, Integer>> {
>>   private static final long serialVersionUID = 1L;
>> 
>>   public Tuple2<String, Integer> map(String line) {
>>       //Attach a count of 1 to each incoming line
>>       Tuple2<String, Integer> newLine = new Tuple2<String, Integer>(line, 1);
>>       return newLine;
>>   }
>> };
>> 
>> 
>> public static class Reducer implements ReduceFunction<Tuple2<String, Integer>> {
>>   private static final long serialVersionUID = 1L;
>> 
>>   public Tuple2<String, Integer> reduce(Tuple2<String, Integer> line1, Tuple2<String, Integer> line2) throws Exception {
>>       //Sum the incremental values and the creation timestamps of both lines
>>       Long sum = Long.valueOf(line1.f0.split(" ")[0]) + Long.valueOf(line2.f0.split(" ")[0]);
>>       Long sumTS = Long.valueOf(line1.f0.split(" ")[1]) + Long.valueOf(line2.f0.split(" ")[1]);
>>       //Also sum the counts so the average can be computed later
>>       Tuple2<String, Integer> newLine = new Tuple2<String, Integer>(String.valueOf(sum) + " " + String.valueOf(sumTS),
>>               line1.f1 + line2.f1);
>>       return newLine;
>>   }
>> };
>> 
>> 
>> public static class AverageCalculator implements MapFunction<Tuple2<String, Integer>, String> {
>>   private static final long serialVersionUID = 1L;
>> 
>>   public String map(Tuple2<String, Integer> line) throws Exception {
>>       //Divide the summed timestamps by the element count to get the average creation timestamp
>>       Long average = Long.valueOf(line.f0.split(" ")[1]) / line.f1;
>>       String result = String.valueOf(line.f1) + " " + String.valueOf(average);
>>       return result;
>>   }
>> };
>> 
>> 
>> public static final class TimestampAdder implements MapFunction<String, String> {
>>   private static final long serialVersionUID = 1L;
>> 
>>   public String map(String line) throws Exception {
>>       Long currentTime = System.currentTimeMillis();
>>       //Difference between now and the average creation timestamp
>>       String totalTime = String.valueOf(currentTime - Long.valueOf(line.split(" ")[1]));
>>       String newLine = line.concat(" " + String.valueOf(currentTime) + " " + totalTime);
>> 
>>       return newLine;
>>   }
>> };
>> 
>> ######################### - CODE - ####################################
>> 
>> - SOME OUTPUT DATA: this output was written to the topic with 2 partitions,
>> with a producing rate of less than 1000 records/second (**in this case
>> the job creates 3 output lines per window):
>> 
>> (note that every 3 lines correspond to the same 2-second window)
>> 1969 1497791240910 1497791241999 1089 1497791242001 1091
>> 1973 1497791240971 1497791241999 1028 1497791242002 1031
>> 1970 1497791240937 1497791242094 1157 1497791242198 1261
>> 1917 1497791242912 1497791243999 1087 1497791244051 1139
>> 1905 1497791242971 1497791243999 1028 1497791244051 1080
>> 1916 1497791242939 1497791244096 1157 1497791244199 1260
>> 1994 1497791244915 1497791245999 1084 1497791246002 1087
>> 1993 1497791244966 1497791245999 1033 1497791246004 1038
>> 1990 1497791244939 1497791246097 1158 1497791246201 1262
>> 
>> Thanks again in advance!
>> 
>> ---
>> FRANCISCO BORJA ROBLES MARTÍN
>> Escuela Técnica Superior de Ingeniería Informática
>> Campus Montegancedo s/n
>> Universidad Politécnica de Madrid (Technical University of Madrid)
> 
