Hi Timo,

Thanks for the reply!

> You could filter the deletions manually in DataStream API before
> writing them to Kafka.

Yeah, I agree this helps with the issue, though I will need to mix SQL
and the DataStream API.
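Just to make sure I got the suggestion right, here is a minimal,
untested sketch of the filtering I have in mind. It assumes the
`tableEnv` and `QUERY` from my original mail quoted below, and
`FilterDeletions` is just an illustrative name:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

object FilterDeletions {
  // In a retract stream the Boolean flag is true for +I/+U messages and
  // false for -U/-D messages, so keeping only the `true` ones drops the
  // deletions before the rows get serialized to Kafka.
  // (`tableEnv` and `query` are assumed to come from my job below.)
  def apply(tableEnv: StreamTableEnvironment, query: String): DataStream[Row] =
    tableEnv
      .toRetractStream[Row](tableEnv.sqlQuery(query))
      .filter(_._1) // keep only the "new status" rows
      .map(_._2)    // drop the retract flag before writing to Kafka
}
```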
> To simplify the query you could also investigate implementing your own
> aggregate function and combine the Top 2 and ListAgg into one
> operation.

Do you mean implementing a UDF to do so? I put a rough sketch of what I
understood below.

Besides, is the 'upsert-kafka' connector designed for this use case? A
sketch of how I would wire it up follows as well.
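For the custom aggregate, this is the direction I would try: a single
AggregateFunction that keeps only the two largest timestamps per
account and emits them as one comma-separated string, so the Top 2 and
the LISTAGG happen in one operation (class and function names are mine,
and I have not tested this):

```scala
import java.lang.{Long => JLong}
import org.apache.flink.table.functions.AggregateFunction

// Accumulator holding the two largest timestamps seen so far;
// `first` is the newest, `second` the one before it, null = not seen yet.
case class Top2Acc(var first: JLong = null, var second: JLong = null)

// Emits the last two transaction timestamps as "older,newer",
// matching the LISTAGG output of my original query.
class Last2TimestampAgg extends AggregateFunction[String, Top2Acc] {

  override def createAccumulator(): Top2Acc = Top2Acc()

  // Called by the runtime for every row of the group.
  def accumulate(acc: Top2Acc, ts: JLong): Unit = {
    if (acc.first == null || ts >= acc.first) {
      acc.second = acc.first
      acc.first = ts
    } else if (acc.second == null || ts > acc.second) {
      acc.second = ts
    }
  }

  override def getValue(acc: Top2Acc): String =
    Seq(Option(acc.second), Option(acc.first)).flatten.mkString(",")
}
```

which I would then register and use like this (the SQL function name
`LAST2TS` is arbitrary):

```scala
tableEnv.createTemporarySystemFunction("LAST2TS", classOf[Last2TimestampAgg])
tableEnv.executeSql(
  "SELECT accountId, LAST2TS(`timestamp`) AS last2_timestamp " +
    "FROM transactions GROUP BY accountId").print()
```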
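And for 'upsert-kafka', this is what I had in mind (topic name, broker
address, and formats are placeholders). If I read the docs right, the
upsert-kafka sink keys each record by the PRIMARY KEY, ignores the
UPDATE_BEFORE rows, and writes only the new value per key, which sounds
exactly like the merging I describe below:

```scala
// Hypothetical sink table keyed by accountId; connection options are
// placeholders for my setup.
tableEnv.executeSql(
  """
    |CREATE TABLE last2_sink (
    |  accountId BIGINT,
    |  last2_timestamp STRING,
    |  PRIMARY KEY (accountId) NOT ENFORCED
    |) WITH (
    |  'connector' = 'upsert-kafka',
    |  'topic' = 'last2-transactions',
    |  'properties.bootstrap.servers' = 'localhost:9092',
    |  'key.format' = 'json',
    |  'value.format' = 'json'
    |)
    |""".stripMargin)

// Route the Top-2 + LISTAGG query (QUERY from my original mail) into
// the sink, assuming INSERT INTO accepts the CTE query as-is.
tableEnv.executeSql(s"INSERT INTO last2_sink $QUERY")
```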
Thank you.

On Thu, Mar 4, 2021 at 4:41 PM Timo Walther <twal...@apache.org> wrote:

> Hi Yik,
>
> if I understand you correctly you would like to avoid the deletions in
> your stream?
>
> You could filter the deletions manually in DataStream API before
> writing them to Kafka. Semantically the deletions are required to
> produce a correct result because the runtime is not aware of a key for
> idempotent updates.
>
> To simplify the query you could also investigate implementing your own
> aggregate function and combine the Top 2 and ListAgg into one
> operation.
>
> Regards,
> Timo
>
> On 28.02.21 09:55, Yik San Chan wrote:
> > I define a `Transaction` class:
> >
> > ```scala
> > case class Transaction(accountId: Long, amount: Long, timestamp: Long)
> > ```
> >
> > The `TransactionSource` simply emits `Transaction` with some time
> > interval. Now I want to compute the last 2 transaction timestamps of
> > each account id, see code below:
> >
> > ```scala
> > import org.apache.flink.streaming.api.scala._
> > import org.apache.flink.table.api.EnvironmentSettings
> > import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> > import org.apache.flink.walkthrough.common.entity.Transaction
> > import org.apache.flink.walkthrough.common.source.TransactionSource
> >
> > object LastNJob {
> >
> >   final val QUERY =
> >     """
> >       |WITH last_n AS (
> >       |  SELECT accountId, `timestamp`
> >       |  FROM (
> >       |    SELECT *,
> >       |      ROW_NUMBER() OVER (PARTITION BY accountId ORDER BY `timestamp` DESC) AS row_num
> >       |    FROM transactions
> >       |  )
> >       |  WHERE row_num <= 2
> >       |)
> >       |SELECT accountId, LISTAGG(CAST(`timestamp` AS STRING)) last2_timestamp
> >       |FROM last_n
> >       |GROUP BY accountId
> >       |""".stripMargin
> >
> >   def main(args: Array[String]): Unit = {
> >     val settings: EnvironmentSettings =
> >       EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >     val streamEnv: StreamExecutionEnvironment =
> >       StreamExecutionEnvironment.getExecutionEnvironment
> >     val tableEnv: StreamTableEnvironment =
> >       StreamTableEnvironment.create(streamEnv, settings)
> >
> >     val txnStream: DataStream[Transaction] = streamEnv
> >       .addSource(new TransactionSource)
> >       .name("transactions")
> >
> >     tableEnv.createTemporaryView("transactions", txnStream)
> >
> >     tableEnv.executeSql(QUERY).print()
> >   }
> > }
> > ```
> >
> > When I run the program, I get:
> >
> > ```
> > +----+----------------------+--------------------------------+
> > | op |            accountId |                last2_timestamp |
> > +----+----------------------+--------------------------------+
> > | +I |                    1 |                  1546272000000 |
> > | +I |                    2 |                  1546272360000 |
> > | +I |                    3 |                  1546272720000 |
> > | +I |                    4 |                  1546273080000 |
> > | +I |                    5 |                  1546273440000 |
> > | -U |                    1 |                  1546272000000 |
> > | +U |                    1 |    1546272000000,1546273800000 |
> > | -U |                    2 |                  1546272360000 |
> > | +U |                    2 |    1546272360000,1546274160000 |
> > | -U |                    3 |                  1546272720000 |
> > | +U |                    3 |    1546272720000,1546274520000 |
> > | -U |                    4 |                  1546273080000 |
> > | +U |                    4 |    1546273080000,1546274880000 |
> > | -U |                    5 |                  1546273440000 |
> > | +U |                    5 |    1546273440000,1546275240000 |
> > | -U |                    1 |    1546272000000,1546273800000 |
> > | +U |                    1 |                  1546273800000 |
> > | -U |                    1 |                  1546273800000 |
> > | +U |                    1 |    1546273800000,1546275600000 |
> > (to continue)
> > ```
> >
> > Let's focus on the last transaction (from above) of accountId=1. When
> > there is a new transaction from account 1 that happens at
> > timestamp=1546275600000, there are 4 operations in total.
> >
> > ```
> > +----+----------------------+--------------------------------+
> > | op |            accountId |                last2_timestamp |
> > +----+----------------------+--------------------------------+
> > | -U |                    1 |    1546272000000,1546273800000 |
> > | +U |                    1 |                  1546273800000 |
> > | -U |                    1 |                  1546273800000 |
> > | +U |                    1 |    1546273800000,1546275600000 |
> > ```
> >
> > While I only want to emit the below "new status" to my downstream
> > (let's say another Kafka topic) via some sort of merging:
> >
> > ```
> > +----------------------+--------------------------------+
> > |            accountId |                last2_timestamp |
> > +----------------------+--------------------------------+
> > |                    1 |    1546273800000,1546275600000 |
> > ```
> >
> > So that my downstream is able to consume literally "the last 2
> > transaction timestamps of each account":
> >
> > ```
> > +----------------------+--------------------------------+
> > |            accountId |                last2_timestamp |
> > +----------------------+--------------------------------+
> > |                    1 |                  1546272000000 |
> > |                    1 |    1546272000000,1546273800000 |
> > |                    1 |    1546273800000,1546275600000 |
> > (to continue)
> > ```
> >
> > What is the right way to do this?