Fabian Hueske created FLINK-9528:
------------------------------------
Summary: Incorrect results: Filter does not treat Upsert messages
correctly.
Key: FLINK-9528
URL: https://issues.apache.org/jira/browse/FLINK-9528
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Affects Versions: 1.4.2, 1.5.0, 1.3.3
Reporter: Fabian Hueske
Currently, Filters (i.e., Calcs with predicates) do not distinguish between
retraction and upsert mode. A Calc looks at a record (regardless of its update
semantics) and either discards it (predicate evaluates to false) or passes it on
(predicate evaluates to true).
This works fine for messages with retraction semantics but is not correct for
upsert messages.
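To illustrate the difference, here is a minimal, self-contained sketch in plain Scala. It does not use any Flink classes: {{Add}}, {{Retract}}, {{Upsert}}, {{naiveFilter}} and {{materialize}} are hypothetical names introduced only for this illustration. It encodes a count for key 10 that grows from 1 to 7 in both modes and applies a filter that looks only at the row values.

{code:scala}
// Hypothetical message model for illustration; these are not Flink classes.
sealed trait Change
case class Add(len: Int, cnt: Long) extends Change      // retract mode: insert a row
case class Retract(len: Int, cnt: Long) extends Change  // retract mode: withdraw a row
case class Upsert(len: Int, cnt: Long) extends Change   // upsert mode: insert or overwrite by key

object FilterSemanticsDemo {
  // A filter that ignores update semantics and only evaluates the predicate.
  def naiveFilter(msgs: Seq[Change], pred: Long => Boolean): Seq[Change] =
    msgs.filter {
      case Add(_, c)     => pred(c)
      case Retract(_, c) => pred(c)
      case Upsert(_, c)  => pred(c)
    }

  // Apply the changelog to an initially empty result set of (len, cnt) rows.
  def materialize(msgs: Seq[Change]): Set[(Int, Long)] =
    msgs.foldLeft(Set.empty[(Int, Long)]) {
      case (rows, Add(k, c))     => rows + ((k, c))
      case (rows, Retract(k, c)) => rows - ((k, c))
      case (rows, Upsert(k, c))  => rows.filterNot(_._1 == k) + ((k, c))
    }

  // The count for key 10 grows from 1 to 7, encoded in both modes.
  val counts: Seq[Long] = 1L to 7L
  val retractStream: Seq[Change] =
    Add(10, 1) +: counts.tail.flatMap(c => Seq[Change](Retract(10, c - 1), Add(10, c)))
  val upsertStream: Seq[Change] = counts.map(c => Upsert(10, c))

  // Retract mode: Retract(10, 6) passes the filter, so the result ends up empty.
  val retractResult: Set[(Int, Long)] = materialize(naiveFilter(retractStream, _ < 7))
  // Upsert mode: Upsert(10, 7) is dropped, so the stale row (10, 6) stays.
  val upsertResult: Set[(Int, Long)] = materialize(naiveFilter(upsertStream, _ < 7))
}
{code}

In this sketch the retract stream yields an empty result, while the upsert stream leaves the outdated row (10, 6) behind.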
The following test case (can be pasted into {{TableSinkITCase}}) shows the
problem:
{code:java}
@Test
def testUpsertsWithFilter(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.getConfig.enableObjectReuse()
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val tEnv = TableEnvironment.getTableEnvironment(env)

  val t = StreamTestData.get3TupleDataStream(env)
    .assignAscendingTimestamps(_._1.toLong)
    .toTable(tEnv, 'id, 'num, 'text)

  // Count rows per text length and emit the counts as upserts keyed on 'len.
  t.select('text.charLength() as 'len)
    .groupBy('len)
    .select('len, 'len.count as 'cnt)
    // Uncomment the filter (and the second assertion below) to see the problem.
    // .where('cnt < 7)
    .writeToSink(new TestUpsertSink(Array("len"), false))

  env.execute()
  val results = RowCollector.getAndClearValues

  // Apply the collected upsert messages, using field 0 ('len) as the key.
  val retracted = RowCollector.upsertResults(results, Array(0)).sorted
  val expectedWithoutFilter = List(
    "2,1", "5,1", "9,9", "10,7", "11,1", "14,1", "25,1").sorted
  val expectedWithFilter = List(
    "2,1", "5,1", "11,1", "14,1", "25,1").sorted

  assertEquals(expectedWithoutFilter, retracted)
  // assertEquals(expectedWithFilter, retracted)
}
{code}
When we add a filter on the aggregation result, we would expect all rows
that do not fulfill the condition to be removed from the result. However, the
filter only drops the upsert message that fails the predicate, so the previous
version of that row (which passed the filter) remains in the result.
One solution could be to make a filter aware of the update semantics (retract
or upsert) and convert the upsert message into a delete message if the
predicate evaluates to false.
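A rough sketch of that idea in plain Scala, not Flink's implementation: {{UpsertMsg}}, {{DeleteMsg}}, {{upsertAwareFilter}} and {{materialize}} are hypothetical names used only for illustration. For simplicity it emits a delete for every upsert that fails the predicate, even if no earlier version of the key was forwarded. A real operator might want to avoid such redundant deletes.

{code:scala}
// Hypothetical upsert-mode message model; these are not Flink classes.
sealed trait UpsertChange
case class UpsertMsg(key: Int, cnt: Long) extends UpsertChange // insert or overwrite by key
case class DeleteMsg(key: Int) extends UpsertChange            // remove the row for a key

object UpsertAwareFilterSketch {
  // Forward upserts that satisfy the predicate; turn failing upserts into
  // deletes for the same key so that no stale version survives downstream.
  def upsertAwareFilter(
      msgs: Seq[UpsertChange],
      pred: Long => Boolean): Seq[UpsertChange] =
    msgs.map {
      case u @ UpsertMsg(_, c) if pred(c) => u
      case UpsertMsg(k, _)                => DeleteMsg(k)
      case d: DeleteMsg                   => d
    }

  // Apply the upsert changelog to an initially empty keyed result.
  def materialize(msgs: Seq[UpsertChange]): Map[Int, Long] =
    msgs.foldLeft(Map.empty[Int, Long]) {
      case (rows, UpsertMsg(k, c)) => rows.updated(k, c)
      case (rows, DeleteMsg(k))    => rows - k
    }

  // Key 10 counts up from 1 to 7; key 2 is seen once.
  val input: Seq[UpsertChange] =
    (1L to 7L).map(c => UpsertMsg(10, c)) :+ UpsertMsg(2, 1L)

  // With the predicate cnt < 7, key 10 is deleted once its count reaches 7.
  val result: Map[Int, Long] = materialize(upsertAwareFilter(input, _ < 7))
}
{code}

Under these assumptions only key 2 remains in the materialized result, which is the behavior the filtered query is expected to have.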