[ 
https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503155#comment-16503155
 ] 

Fabian Hueske commented on FLINK-9528:
--------------------------------------

Hi [~hequn8128], can you explain in more detail what you mean by:

{quote}However it may bring some extra side effects, for instance, user can not 
filter the data what they don't need(actually), although the redundant messages 
won't affect the correctness.{quote}

I'm not sure about adding state or a special communication mode. From my point 
of view, we should not add additional storage overhead (state) or special code 
paths if the result is correct already. It is true that we would send more 
records than necessary, but they can be swallowed by the next operator. A sink 
that does not want to emit duplicate delete messages could also add state to 
remove duplicates (or a simple in-memory cache to reduce them).

As an optimization, we can also think about adding a GroupByHaving operator 
that does the filtering internally. However, we definitely need a general 
solution for Filters with Upsert inputs.


> Incorrect results: Filter does not treat Upsert messages correctly.
> -------------------------------------------------------------------
>
>                 Key: FLINK-9528
>                 URL: https://issues.apache.org/jira/browse/FLINK-9528
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.3, 1.5.0, 1.4.2
>            Reporter: Fabian Hueske
>            Assignee: Hequn Cheng
>            Priority: Critical
>
> Currently, Filters (i.e., Calcs with predicates) do not distinguish between 
> retraction and upsert mode. A Calc looks at record (regardless of its update 
> semantics) and either discard it (predicate evaluates to false) or pass it on 
> (predicate evaluates to true).
> This works fine for messages with retraction semantics but is not correct for 
> upsert messages.
> The following test case (can be pasted into {{TableSinkITCase}}) shows the 
> problem:
> {code:java}
>   @Test
>   def testUpsertsWithFilter(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.getConfig.enableObjectReuse()
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     val t = StreamTestData.get3TupleDataStream(env)
>       .assignAscendingTimestamps(_._1.toLong)
>       .toTable(tEnv, 'id, 'num, 'text)
>     t.select('text.charLength() as 'len)
>       .groupBy('len)
>       .select('len, 'len.count as 'cnt)
>       // .where('cnt < 7)
>       .writeToSink(new TestUpsertSink(Array("len"), false))
>     env.execute()
>     val results = RowCollector.getAndClearValues
>     val retracted = RowCollector.upsertResults(results, Array(0)).sorted
>     val expectedWithoutFilter = List(
>       "2,1", "5,1", "9,9", "10,7", "11,1", "14,1", "25,1").sorted
>     val expectedWithFilter = List(
>     "2,1", "5,1", "11,1", "14,1", "25,1").sorted
>     assertEquals(expectedWithoutFilter, retracted)
>     // assertEquals(expectedWithFilter, retracted)
>   }
> {code}
> When we add a filter on the aggregation result, we would expect that all rows 
> that do not fulfill the condition are removed from the result. However, the 
> filter only removes the upsert message such that the previous version remains 
> in the result.
> One solution could be to make a filter aware of the update semantics (retract 
> or upsert) and convert the upsert message into a delete message if the 
> predicate evaluates to false.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to