Hi,

Flink guarantees order only within a partition. For example, if you have the program map_1 -> map_2 and both map functions run with parallelism 4, the order of records within each of the 4 partitions is preserved. In case of a shuffle (such as a keyBy or a change in parallelism), the records that one sending task ships to a given downstream task arrive in the order in which that sending task produced them. However, the receiving task merges the record streams it receives from multiple sending tasks without considering the order across these parallel streams.
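To make the merge behaviour concrete, here is a small simulation in plain Scala. It has no Flink dependency, and `MergeOrderDemo`, `Rec`, and the arrival schedule are hypothetical names chosen for illustration. Two upstream tasks each emit records for channel 1 in increasing index order. The downstream task consumes them in whatever order they happen to arrive, so each sender's own order survives while the merged sequence does not.

```scala
import scala.collection.mutable

// Plain-Scala model (hypothetical names, no Flink API): each upstream task's
// records keep their relative order, but the receiver interleaves the senders.
object MergeOrderDemo {
  final case class Rec(channel: Int, index: Long, sender: Int)

  /** Consumes the next record from the sender named at each step of `arrival`. */
  def merge(streams: Map[Int, List[Rec]], arrival: List[Int]): List[Rec] = {
    val remaining = mutable.Map(streams.toSeq: _*)
    arrival.flatMap { s =>
      remaining.getOrElse(s, Nil) match {
        case head :: tail =>
          remaining(s) = tail
          List(head)
        case Nil => Nil
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Sender 0 produced the odd indices of channel 1, sender 1 the even ones.
    val s0 = List(Rec(1, 1, 0), Rec(1, 3, 0), Rec(1, 5, 0))
    val s1 = List(Rec(1, 2, 1), Rec(1, 4, 1), Rec(1, 6, 1))
    val merged = merge(Map(0 -> s0, 1 -> s1), arrival = List(0, 0, 1, 0, 1, 1))
    println(merged.map(_.index))                       // List(1, 3, 2, 5, 4, 6)
    println(merged.filter(_.sender == 0).map(_.index)) // List(1, 3, 5)
  }
}
```

The arrival schedule stands in for network timing, which Flink does not control per key. Any schedule keeps each sender's subsequence sorted, but only a lucky one keeps the merged stream sorted.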
This means that you have to sort the data manually if the scaledReadings map function receives input records for the same key from different source tasks. Sorting streaming data is of course not as straightforward as sorting a finite data set, but you can do it with a process function and timers, i.e., you collect the data for a certain amount of time and then sort it.

Best, Fabian

2018-07-17 10:06 GMT+02:00 Nicholas Walton <nwal...@me.com>:

> Martin,
>
> To clarify things, the code causing the issue is here, nothing clever. The
> code fails at the assertion marked below. The Long index values are set
> earlier in sequence 1,2,3,4,5,6,7…
>
> val scaledReadings: DataStream[(Int, Long, Double, Double)] = maxChannelReading
>   .keyBy(0)
>   .map { in =>
>     LOG.info(s"scaledReadings $in")
>     (in._1, in._2, in._3 / in._4 + 2.0D, in._3)
>   }
>
> val logRatioWindow: DataStream[(Int, Long, Int, Double)] = scaledReadings
>   .keyBy(0)
>   .countWindow(100, 99)
>   .process(new logRatioWindowFunction())
>
> and
>
> class logRatioWindowFunction extends ProcessWindowFunction[(Int, Long, Double, Double),
>     (Int, Long, Int, Double), org.apache.flink.api.java.tuple.Tuple, GlobalWindow] {
>
>   def process(key: Tuple, context: logRatioWindowFunction.this.Context,
>               input: Iterable[(Int, Long, Double, Double)],
>               out: Collector[(Int, Long, Int, Double)]): Unit = {
>
>     val a: Array[(Int, Long, Double, Double)] = input.toArray
>     val ch = a(0)._1
>     val s = a(0)._2
>     val l = input.size
>
>     if (l < 100) Job.LOG.info(s"Log ratio window length $l on channel $ch at sample $s")
>
>     // The code fails here:
>     for (i <- 1 to a.size - 1) assert(a(i)._2 == a(i-1)._2 + 1,
>       "logRatioWindowFunction:Failure non-monotonic indexes " + a(i-1)._2 + " and " + a(i)._2)
>
>     if (l == 100) {
>       for (i <- 0 to l - 2) {
>         val v: Int = rint(100 * log(E + a(i+1)._3 / a(i)._3)).toInt
>         assert(v > 0, "Bad minhash in medianLogRatioWindowFunction " + v)
>         Job.LOG.debug("logRatioWindowFunction [" + a(i+1)._1 + ", " +
>           a(i+1)._2 + ", " + v + ", " + a(i+1)._4 + "]")
>         out.collect(scala.Tuple4(a(i+1)._1, a(i+1)._2, v, a(i+1)._4))
>       }
>       Job.LOG.debug("logRatioWindowFunction [" + a.head._1 + ", " +
>         a.head._2 + " ... " + a.last._2 + "] collected")
>     }
>   }
> }
>
> On 17 Jul 2018, at 00:15, Martin, Nick <nick.mar...@orbitalatk.com> wrote:
>
> > Is value(index-1) stored in Keyed State, or just a local variable inside
> > the operator?
> >
> > -----Original Message-----
> > From: Nicholas Walton [mailto:nwal...@me.com]
> > Sent: Monday, July 16, 2018 1:33 PM
> > To: user@flink.apache.org
> > Subject: Parallelism and keyed streams
> >
> > I have a stream of tuples <channel: Int, index: Long, value: Double>,
> > which I form into a keyedStream using keyBy on channel. I then need to
> > process each channel in parallel. Each parallel stream must be processed in
> > strict sequential order by index to calculate the ratios
> > value(index)/value(index-1). If I set parallelism to 1, all is well: each
> > channel is processed in order of index 1,2,3,4…
> >
> > My problem is that when I set parallelism to a value greater than 1, each
> > channel's keyedStream appears to be split across multiple processes. So a
> > channel may be processed wrongly, for example as value(2), value(5),
> > value(6), value(9)…
> >
> > The number of channels N is unknown. So how do I rig up N processing
> > streams with an unknown parallelism so that each stream processes each
> > channel by strictly increasing index v(1),v(2),…,v(t),v(t+1),…,v(t+n)?
> >
> > Thanks in advance
> >
> > Nick Walton
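Fabian's suggestion (collect the records of each key for a certain amount of time, then sort them) can be modeled without Flink to show the logic. This is a minimal sketch under stated assumptions: `Reading`, `SortingBuffer`, `delayMs`, and `fire` are hypothetical names, and records are assumed to carry a per-channel index as in Nicholas's tuples. In a real job the buffer would live in keyed state inside a process function and the flush would be triggered by a registered timer; here both are replaced by explicit `now` arguments so the behaviour can be run and checked directly.

```scala
import scala.collection.mutable

// Hypothetical record type mirroring the thread's (channel, index, value) tuples.
final case class Reading(channel: Int, index: Long, value: Double)

// Flink-free model of "collect for a while, then sort" per key. In a Flink job the
// buffer would be keyed state and fire() would run when a registered timer goes off.
final class SortingBuffer(delayMs: Long) {
  // Records received per channel that have not been emitted yet.
  private val buffers = mutable.Map.empty[Int, mutable.ArrayBuffer[Reading]]
  // Per channel: the time at which its pending records should be flushed.
  private val flushAt = mutable.Map.empty[Int, Long]

  /** Buffers a record and schedules a flush for its channel if none is pending. */
  def add(r: Reading, now: Long): Unit = {
    buffers.getOrElseUpdate(r.channel, mutable.ArrayBuffer.empty[Reading]) += r
    if (!flushAt.contains(r.channel)) flushAt(r.channel) = now + delayMs
  }

  /** Emits, sorted by index, the buffered records of every channel whose flush time has passed. */
  def fire(now: Long): List[Reading] = {
    val due = flushAt.collect { case (ch, t) if t <= now => ch }.toList.sorted
    due.flatMap { ch =>
      flushAt.remove(ch)
      buffers.remove(ch).map(_.sortBy(_.index).toList).getOrElse(Nil)
    }
  }
}

object SortingBufferDemo {
  def main(args: Array[String]): Unit = {
    val buf = new SortingBuffer(delayMs = 100)
    // Channel 1 records arrive out of order, e.g. from different upstream tasks.
    buf.add(Reading(1, 2, 0.2), now = 0)
    buf.add(Reading(1, 5, 0.5), now = 10)
    buf.add(Reading(1, 1, 0.1), now = 20)
    buf.add(Reading(1, 3, 0.3), now = 30)
    println(buf.fire(now = 50).map(_.index))  // List()
    println(buf.fire(now = 100).map(_.index)) // List(1, 2, 3, 5)
  }
}
```

This only restores order if the delay is longer than the worst-case skew between upstream tasks. A record that arrives after its channel has already been flushed is emitted in a later batch and can still be out of sequence relative to what was emitted before it.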