Hello all, I was playing around with the IncrementalLearningSkeleton example [1] and I have a couple of questions about the behavior of connected streams.
In the example the elements are assigned timestamps, and there is a stream, model, that produces Double[] elements by ingesting and processing a stream of Integer training data points:

    DataStream<Double[]> model = trainingData
        .assignTimestampsAndWatermarks(new LinearTimestamp())
        .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
        .apply(new PartialModelBuilder());

The model stream is then connected to the newData stream. This lets us use the continuously updated model to make predictions for the incoming newData elements, because the two map functions of the CoMap function (Predictor) share a model variable. That shared variable starts out as null and is updated every time an element from the model stream arrives:

    DataStream<Integer> prediction = newData.connect(model).map(new Predictor());

My confusion comes from a slightly different approach [2] that uses no timestamps or watermarks. I simply create count windows of, say, 100 elements, and I use readTextFile to read in both trainingData and newData:

    DataStream<ArrayList<Double>> model = trainingData
        .countWindowAll(100)
        .apply(new PartialModelBuilder());

When I then connect the model stream to the newData stream, map1 of the CoMap function never sees a non-null model. The two map functions seem to be executed in sequence: first map1 runs for all the newData elements, and only then does map2 run for all the model elements.

So how do timestamps (or their absence) affect the behavior of connected streams? And how should I handle this case when the notion of timestamps does not apply to my data? (Here I'm streaming historical data, so I assume the order of the elements does not matter.)

[1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
[2] https://gist.github.com/thvasilo/67bcb9370b03971f380ae43c4ae6e2d0
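To make the ordering problem concrete, here is a minimal, Flink-free Java sketch of the shared-variable pattern. It is only a simulation under my own assumptions: SharedModelPredictor, OrderingDemo and countPredictionsWithModel are hypothetical names, and this is not Flink's CoMapFunction API. It feeds newData ('D') and model ('M') elements to the two map methods in a given order and counts how many newData elements were handled while a model was available.

```java
import java.util.Objects;

// Hypothetical driver: simulates the order in which the two inputs reach the CoMap.
class OrderingDemo {
    // 'D' = one newData element (goes to map1), 'M' = one model element (goes to map2).
    // Returns how many newData elements were processed while a model was present.
    static int countPredictionsWithModel(String order) {
        SharedModelPredictor predictor = new SharedModelPredictor();
        int withModel = 0;
        for (char c : order.toCharArray()) {
            if (c == 'D') {
                // map1 returns null when no model has arrived yet
                if (Objects.nonNull(predictor.map1(42))) {
                    withModel++;
                }
            } else if (c == 'M') {
                predictor.map2(new Double[] {0.0});
            }
        }
        return withModel;
    }

    public static void main(String[] args) {
        // All newData first, then the models (what I observe without timestamps)
        System.out.println(countPredictionsWithModel("DDDDMM"));
        // A model arrives before the data it should score
        System.out.println(countPredictionsWithModel("MDDMDD"));
    }
}

// Hypothetical plain-Java stand-in for a CoMap whose two map methods share one field.
class SharedModelPredictor {
    // Shared model: stays null until the first model element arrives via map2.
    private Double[] model = null;

    // First input (newData): returns null if no model is available yet,
    // otherwise a placeholder prediction of 0.
    Integer map1(Integer value) {
        if (model == null) {
            return null;
        }
        return 0;
    }

    // Second input (model updates): replaces the shared model.
    Integer map2(Double[] newModel) {
        model = newModel;
        return 1;
    }
}
```

With the order "DDDDMM" no newData element ever sees a model (the count is 0), while with "MDDMDD" all four do. Both runs execute the same predictor logic, so the outcome depends only on the arrival order of the two inputs.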