Hello all,

I was playing around with the the IncrementalLearningSkeleton example and I
had a couple of questions regarding the behavior of connected streams.

In the example the elements are assigned timestamps, and there is a stream,
model, that produces
Double[] elements by ingesting and processing a stream of training Integer
data points.

DataStream<Double[]> model = trainingData
                .assignTimestampsAndWatermarks(new LinearTimestamp())
                .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
                .apply(new PartialModelBuilder());

The model stream is then connected onto a newData stream which allows us to
use the
constantly updated model stream to make predictions for the incoming stream
of newData,
by having a model variable shared between the two map functions in the
coMap class.
The shared model var is updated every time an element from the model stream
arrives (starts
out as null)

DataStream<Integer> prediction = newData.connect(model).map(new Predictor
());

My confusion comes when I tried a slightly different approach [2], without
using timestamps
or watermarks. In my example I simply create countWindows of say 100
elements,
and I use readTextFile to read in the trainingData and newData :

DataStream<ArrayList<Double>> model = trainingData
        .countWindowAll(100)
        .apply(new PartialModelBuilder());

When I then connect the model stream to the newData stream, the map1
function of the
comap never sees the model as not null, as it seems that the map functions
are executed
in order: first the map1 function is executed for all the newData elements,
then the map2
function is executed for all the model elements.

So how does having or not having timestamps affect the behavior of the
connected stream?

How would I handle such a case if the notion of timestamps does not apply
for my data?
(i.e. here I'm interested in streaming historical data, I assume their
order does not matter)


[1]
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java

[2] https://gist.github.com/thvasilo/67bcb9370b03971f380ae43c4ae6e2d0

Reply via email to