Example SQL:
SELECT *
FROM stream1 s1, stream2 s2
WHERE s1.id = s2.id AND s1.rowtime = s2.rowtime
Many messages in stream1 and stream2 share the same rowtime.
It runs fine with the heap state backend,
but it sometimes requires a lot of heap memory (e.g. when the upstreams are out of sync),
which brings a risk of full GC.
When we switched to RocksDBStateBackend to lower heap memory usage, we found that our
program ran unbearably slowly.
After examining the code, we found that
org.apache.flink.table.runtime.join.TimeBoundedStreamJoin#processElement1()
may be the cause of the problem (we are using Flink 1.6, but 1.8 should be the same):
...
// Check if we need to cache the current row.
if (rightOperatorTime < rightQualifiedUpperBound) {
  // Operator time of right stream has not exceeded the upper window bound of the current
  // row. Put it into the left cache, since later coming records from the right stream are
  // expected to be joined with it.
  var leftRowList = leftCache.get(timeForLeftRow)
  if (null == leftRowList) {
    leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
  }
  leftRowList.add(JTuple2.of(leftRow, emitted))
  leftCache.put(timeForLeftRow, leftRowList)
...
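In the code above, if many messages share the same timeForLeftRow,
the serialization and deserialization cost becomes very high with
RocksDBStateBackend: for every incoming row, leftCache.get() deserializes the
whole list stored for that timestamp and leftCache.put() serializes it back,
so caching n rows with one timestamp costs O(n^2) row (de)serializations.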
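To make the cost difference concrete, here is a small self-contained Scala model (not Flink code). It assumes, as a simplification, that the state store (de)serializes an entire map value on every get/put, and it counts rows instead of bytes; SerializingStore, SerializationCost and the shared timestamp 42 are hypothetical names chosen for illustration.

```scala
import scala.collection.mutable

// Hypothetical model of a store that serializes whole values, as a RocksDB-backed
// MapState does per map entry: get() decodes the entire value, put() encodes it.
// Instead of bytes we count "units" via sizeOf (rows in a list, or 1 for a scalar).
final class SerializingStore[K, V](sizeOf: V => Int) {
  private val data = mutable.HashMap.empty[K, V]
  var unitsSerialized: Long = 0L

  def get(k: K): Option[V] = {
    val v = data.get(k)
    v.foreach(x => unitsSerialized += sizeOf(x)) // deserialize the whole value
    v
  }

  def put(k: K, v: V): Unit = {
    unitsSerialized += sizeOf(v) // serialize the whole value
    data.update(k, v)
  }
}

object SerializationCost {
  val SharedTs = 42L // hypothetical: every row carries the same timestamp

  // Original layout: timestamp -> list of rows, read-modify-write per insert.
  def listPerTimestamp(n: Int): Long = {
    val cache = new SerializingStore[Long, Vector[String]](_.size)
    for (i <- 0 until n) {
      val list = cache.get(SharedTs).getOrElse(Vector.empty)
      cache.put(SharedTs, list :+ s"row$i")
    }
    cache.unitsSerialized
  }

  // Proposed layout: (timestamp, index) -> one row, plus timestamp -> count.
  def compositeKey(n: Int): Long = {
    val cache = new SerializingStore[(Long, Int), String](_ => 1)
    val sizes = new SerializingStore[Long, Int](_ => 1)
    for (i <- 0 until n) {
      val idx = sizes.get(SharedTs).getOrElse(0)
      cache.put((SharedTs, idx), s"row$i")
      sizes.put(SharedTs, idx + 1)
    }
    cache.unitsSerialized + sizes.unitsSerialized
  }
}
```

In this model, 1,000 rows on one timestamp cost 1,000,000 units with the list layout (n^2) but only 2,999 units with the composite-key layout (3n - 1, counting the size counter as well).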
A simple fix I came up with:
...
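// cache to store rows from the left stream
//private var leftCache: MapState[Long, JList[JTuple2[Row, Boolean]]] = _
// key: (row time, index within that row time), value: a single (row, emitted) pair
private var leftCache: MapState[JTuple2[Long, Integer], JTuple2[Row, Boolean]] = _
// number of rows cached per row time, used to assign the next index
private var leftCacheSize: MapState[Long, Integer] = _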
...
// Check if we need to cache the current row.
if (rightOperatorTime < rightQualifiedUpperBound) {
  // Operator time of right stream has not exceeded the upper window bound of the current
  // row. Put it into the left cache, since later coming records from the right stream are
  // expected to be joined with it.
  //var leftRowList = leftCache.get(timeForLeftRow)
  //if (null == leftRowList) {
  //  leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
  //}
  //leftRowList.add(JTuple2.of(leftRow, emitted))
  //leftCache.put(timeForLeftRow, leftRowList)
  // Look up how many rows are already cached for this row time.
  var leftRowListSize = leftCacheSize.get(timeForLeftRow)
  if (null == leftRowListSize) {
    leftRowListSize = Integer.valueOf(0)
  }
  // Store the row under (row time, index), so only this single row is serialized.
  leftCache.put(JTuple2.of(timeForLeftRow, leftRowListSize), JTuple2.of(leftRow, emitted))
  leftCacheSize.put(timeForLeftRow, leftRowListSize + 1)
...
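A rough sketch of how the rest of the operator might read and clean up this layout. The parts that read leftCache are elided above, so this is only an assumption about what they would need: plain Scala maps stand in for the two MapStates, CompositeLeftCache, rowsAt, markEmitted and evict are hypothetical names, and the emitted flag is assumed to be flipped when a later right row joins (as for outer joins).

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the two MapStates in the proposed fix:
//   leftCache:     (time, index) -> (row, emitted)
//   leftCacheSize: time          -> number of rows cached for that time
// Plain mutable maps are used so the sketch runs without Flink.
final class CompositeLeftCache[Row] {
  private val leftCache = mutable.HashMap.empty[(Long, Int), (Row, Boolean)]
  private val leftCacheSize = mutable.HashMap.empty[Long, Int]

  // Mirrors the insert path of the proposed processElement1() change.
  def add(time: Long, row: Row, emitted: Boolean): Unit = {
    val size = leftCacheSize.getOrElse(time, 0)
    leftCache.update((time, size), (row, emitted))
    leftCacheSize.update(time, size + 1)
  }

  // Reads every row cached for `time` by walking indices 0 until size.
  def rowsAt(time: Long): Seq[(Row, Boolean)] = {
    val size = leftCacheSize.getOrElse(time, 0)
    (0 until size).flatMap(i => leftCache.get((time, i)))
  }

  // Flips the emitted flag of one cached row without rewriting its neighbours.
  def markEmitted(time: Long, index: Int): Unit =
    leftCache.get((time, index)).foreach { case (row, _) =>
      leftCache.update((time, index), (row, true))
    }

  // Drops all rows for an expired time: every (time, i) entry plus the counter.
  def evict(time: Long): Unit = {
    val size = leftCacheSize.getOrElse(time, 0)
    (0 until size).foreach(i => leftCache.remove((time, i)))
    leftCacheSize.remove(time)
  }

  def entryCount: Int = leftCache.size
}
```

The trade-off in this layout is that reading all rows for one time takes size + 1 point lookups instead of one, but each lookup decodes a single row, and updating one row's emitted flag no longer rewrites the whole list.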
--
LIU Xiao <[email protected]>