Are you referring to "ExampleIntegrationTest"? If so, I'm afraid that test is slightly misleading, since the order isn't guaranteed in that case.

1) As long as the parallelism of the sink is 1, the elements should arrive in order.

2) The order is maintained if parallelism=1, since elements cannot overtake each other within a single stream.
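To illustrate point 2 outside of Flink, here is a minimal plain-Java sketch (not Flink code; the class name and the end-of-stream sentinel are assumptions of the sketch): one producer thread and one consumer connected by a single FIFO queue. Because there is only one channel, the consumer receives the elements exactly in the order they were sent, whatever that order is.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class SingleChannelOrder {
    // Sentinel marking end of input (an assumption of this sketch, not a Flink concept).
    private static final long END = Long.MIN_VALUE;

    /** One producer thread, one consumer, one FIFO channel between them. */
    static List<Long> transfer(List<Long> input) {
        BlockingQueue<Long> channel = new ArrayBlockingQueue<>(4); // small buffer forces interleaved put/take
        Thread producer = new Thread(() -> {
            try {
                for (Long x : input) {
                    channel.put(x); // blocks while the buffer is full
                }
                channel.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Long> received = new ArrayList<>();
        try {
            for (long x = channel.take(); x != END; x = channel.take()) {
                received.add(x); // FIFO: elements cannot overtake each other
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(List.of(1L, 21L, 22L, 5L, 3L))); // prints [1, 21, 22, 5, 3]
    }
}
```

Note that the input is deliberately not sorted: a single channel preserves the original order, it does not sort anything.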

If a subsequent operation O1 increases the parallelism, the individual subtasks of O1 will still each see a sorted stream. If an operation O2 after O1 has a lower parallelism than O1, however, it will not see a sorted stream, since the outputs of the O1 subtasks may interleave arbitrarily. This is why "ExampleIntegrationTest" is incorrect: while each of the 2 sink instances receives sorted input, both add their elements to a single shared collection, interleaving the data.
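The effect on "ExampleIntegrationTest" can be simulated in plain Java. This sketch assumes a round-robin distribution from the parallelism-1 source to 2 subtasks (similar in spirit to Flink's rebalancing; the exact assignment here is an assumption), and it models the non-deterministic thread scheduling with an explicit, hypothetical `schedule` array. Each subtask's view stays sorted, but the shared list does not; an order-insensitive comparison such as the `sameElements` helper is the kind of check such a test could rely on instead.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class InterleavedSinks {
    /** Distributes a single sorted stream round-robin to `parallelism` subtasks. */
    static List<List<Long>> roundRobin(List<Long> input, int parallelism) {
        List<List<Long>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        for (int i = 0; i < input.size(); i++) {
            subtasks.get(i % parallelism).add(input.get(i)); // each subtask keeps arrival order
        }
        return subtasks;
    }

    /**
     * Models the parallel sinks appending to one shared collection.
     * schedule[k] is the (hypothetical) subtask that gets to write k-th; each
     * subtask always writes its own next element, so per-subtask order is kept.
     */
    static List<Long> sharedSink(List<List<Long>> subtasks, int[] schedule) {
        int[] next = new int[subtasks.size()];
        List<Long> shared = new ArrayList<>();
        for (int s : schedule) {
            shared.add(subtasks.get(s).get(next[s]++));
        }
        return shared;
    }

    /** Order-insensitive check: same elements with the same multiplicities. */
    static boolean sameElements(List<Long> a, List<Long> b) {
        List<Long> x = new ArrayList<>(a);
        List<Long> y = new ArrayList<>(b);
        Collections.sort(x);
        Collections.sort(y);
        return x.equals(y);
    }

    public static void main(String[] args) {
        List<Long> input = List.of(1L, 2L, 3L, 4L, 5L, 6L);
        List<List<Long>> subtasks = roundRobin(input, 2);
        System.out.println(subtasks); // [[1, 3, 5], [2, 4, 6]]
        List<Long> shared = sharedSink(subtasks, new int[] {1, 1, 0, 0, 1, 0});
        System.out.println(shared); // [2, 4, 1, 3, 6, 5]
        System.out.println(input.equals(shared) + " " + sameElements(input, shared)); // false true
    }
}
```

With the schedule {0, 1, 0, 1, 0, 1} the shared list happens to come out sorted, which is why an order-sensitive test like this can pass most of the time and still be wrong.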

This is fine:

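// OrderDependentMapper is a hypothetical stand-in for your order-dependent function.
env.fromElements(1L, 21L, 22L)
   .map(x -> x * 2)
   .setParallelism(2)
   .map(new OrderDependentMapper()) // each subtask reads from exactly one map subtask
   .setParallelism(2);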

This is not:

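// OrderDependentMapper is a hypothetical stand-in for your order-dependent function.
env.fromElements(1L, 21L, 22L)
   .map(x -> x * 2)
   .setParallelism(2)
   .map(new OrderDependentMapper()) // the single subtask merges both map subtasks' outputs
   .setParallelism(1);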

In other words, if you never reduce the parallelism, your functions should be fine. If you do have to reduce the parallelism, you must re-sort the stream (or, realistically, window it) somehow.
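One way to re-sort after the parallelism has been reduced is a reorder buffer. The plain-Java sketch below assumes every element was tagged with a sequence number before the parallelism was increased (the `Seq` type and that tagging step are hypothetical); elements are held back until all of their predecessors have arrived. In a real job, buffering by event time and emitting on watermarks, i.e. windowing, is usually more practical, since this buffer waits indefinitely for a missing sequence number.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

final class ReorderBuffer {
    /** A value tagged with its position in the original stream (hypothetical tagging step). */
    static final class Seq {
        final long seq;
        final long value;

        Seq(long seq, long value) {
            this.seq = seq;
            this.value = value;
        }
    }

    // Out-of-order elements waiting for their predecessors, smallest sequence number first.
    private final PriorityQueue<Seq> pending = new PriorityQueue<>((a, b) -> Long.compare(a.seq, b.seq));
    private long nextSeq = 0;

    /** Accepts one element in arrival order; returns whatever can now be emitted in original order. */
    List<Long> accept(Seq element) {
        pending.add(element);
        List<Long> ready = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().seq == nextSeq) {
            ready.add(pending.poll().value);
            nextSeq++;
        }
        return ready;
    }

    public static void main(String[] args) {
        ReorderBuffer buffer = new ReorderBuffer();
        List<Long> out = new ArrayList<>();
        // Arrival order after a 2 -> 1 merge: sequence numbers 1, 0, 3, 2.
        long[][] arrivals = {{1, 20}, {0, 10}, {3, 40}, {2, 30}};
        for (long[] a : arrivals) {
            out.addAll(buffer.accept(new Seq(a[0], a[1])));
        }
        System.out.println(out); // [10, 20, 30, 40]
    }
}
```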

On 02/10/2019 23:32, Dominik Wosiński wrote:
Hello,
I have a question, since I am observing quite weird behavior. In the
documentation[1], the example of FlinkMiniCluster usage shows that we can
expect the results to appear in the same order as they were injected into
the stream by fromElements(). I mean that the Java version of the code
uses assertEquals on a list, which will be true only if the ArrayLists
have the same elements in the same order. On the other hand, the Scala
version of this code uses a different matcher that only asserts that all
elements are actually present in the list.
So I have two questions here:

1) For the code below, can we be sure that the output will have the same
order as the input?
For some reason the code returns the elements in quite a random order in
the sink. I was actually sure that this is the expected behavior, but this
piece of documentation made me wonder.

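import java.util.concurrent.ConcurrentLinkedQueue
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.fromElements("A", "B", "C", "D", "E", "F")
     .addSink(new TestSink)
env.execute()

class TestSink extends SinkFunction[String] {
   override def invoke(value: String): Unit =
     synchronized { TestSink.vals.add(value) }
}

// Shared by all sink instances; collects every received element.
object TestSink {
   val vals = new ConcurrentLinkedQueue[String]()
}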

2) Is there a way to enforce that the order is kept, e.g. for an env with
parallelism = 1? If I want to test some function or set of functions that
depend on the order of the events, for example detecting the beginning and
the end of a pattern, can I somehow ensure the order for testing purposes?


Best Regards,
Dom.
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html

