Subham created FLINK-8663:
-----------------------------

             Summary: Execution of DataStreams result in non functionality of 
size of Window for countWindow
                 Key: FLINK-8663
                 URL: https://issues.apache.org/jira/browse/FLINK-8663
             Project: Flink
          Issue Type: Bug
          Components: DataStream API
    Affects Versions: 1.4.0
         Environment: package com.vnl.stocks;



import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import 
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class StocksProcessing {
    
    public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
        
                //Read from a socket stream at map it to StockPrice objects
                DataStream<StockPrice> socketStockStream = env
                        .socketTextStream("localhost", 9999)
                        .map(new MapFunction<String, StockPrice>() {
                            private String[] tokens;
        
                            @Override
                            public StockPrice map(String value) throws 
Exception {
                                tokens = value.split(",");
                                return new StockPrice(tokens[0],
                                    Double.parseDouble(tokens[1]));
                            }
                        });
                
                socketStockStream.print();
                //Generate other stock streams
                DataStream<StockPrice> SPX_stream = env.addSource(new 
StockSource("SPX", 10));
              //  DataStream<StockPrice> FTSE_stream = env.addSource(new 
StockSource("FTSE", 20));
              //  DataStream<StockPrice> DJI_stream = env.addSource(new 
StockSource("DJI", 30));
              //  DataStream<StockPrice> BUX_stream = env.addSource(new 
StockSource("BUX", 40));
        
                //Merge all stock streams together
                
                DataStream<StockPrice> stockStream = 
socketStockStream.union(SPX_stream/*, FTSE_stream, DJI_stream, BUX_stream*/);
                
                
               // stockStream.print();
                Thread.sleep(100);
                                                
                AllWindowedStream<StockPrice, GlobalWindow> windowedStream = 
stockStream
                        .countWindowAll(10, 5);
                        
                        //.keyBy("symbol")
                        //.timeWindowAll(Time.of(10, TimeUnit.SECONDS), 
Time.of(1, TimeUnit.SECONDS));
                
                    //stockStream.keyBy("symbol");
                    //Compute some simple statistics on a rolling window
                    DataStream<StockPrice> lowest = 
windowedStream.maxBy("price");
                    //DataStream<StockPrice> highest = windowedStream.;
                    /*DataStream<StockPrice> maxByStock = 
windowedStream.groupBy("symbol")
                        .maxBy("price").flatten();
                    DataStream<StockPrice> rollingMean = 
windowedStream.groupBy("symbol")
                        .mapWindow(new WindowMean()).flatten();*/
                    lowest.print();
                    
                      Thread.sleep(100);
                /*    
                    AllWindowedStream<StockPrice, GlobalWindow> windowedStream1 
= lowest
                            .countWindowAll(5,2);
                    //windowedStream1.print();
                    DataStream<StockPrice> highest = 
windowedStream1.minBy("price");*/
                    //highest.print();
                    
                    env.execute("Stock stream");
        }
}
            Reporter: Subham


I used AllWindowedStream<?,GlobalWindow> to process a stream and generate 
maximum of my window using countWindowAll functions. In this output the size 
and slide of window works incorrectly.

Refer below example for the bug

Initial stream : 1,2,3,4,5,6.........

Output 1: (Find min for window 10,5) : 1,6,11.....(This is correct)

However if i calculate maximum, I get output as:

Output 2: (Find max for window 10,5) : 5,10,15.... (which is wrong)

Expected: 10,15,20....

 

Please resolve this error.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to