Hi Timo,

Thanks for the reply!

> You could filter the deletions manually in DataStream API before writing
> them to Kafka.

Yeah, I agree that would address the issue, though it means mixing the SQL
and DataStream APIs.
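
For the record, this is roughly the bridge I have in mind (a minimal,
untested sketch against the Flink 1.12 Scala API, reusing the `QUERY` and
`tableEnv` from my program quoted below):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// Convert the SQL result into a retract stream: the Boolean flag is
// true for additions (+I/+U) and false for retractions (-U/-D).
val resultTable = tableEnv.sqlQuery(QUERY)
val retractStream: DataStream[(Boolean, Row)] =
  tableEnv.toRetractStream[Row](resultTable)

// Keep only the additions, i.e. the "new status" rows, before writing
// them to Kafka (the Kafka sink itself is omitted here).
val additionsOnly: DataStream[Row] = retractStream
  .filter(_._1)
  .map(_._2)
```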

> To simplify the query you could also investigate implementing your own
> aggregate function to combine the Top 2 and ListAgg into one operation.

Do you mean implementing a UDF to do so?
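
If so, is something like the sketch below what you have in mind? It is my
own untested attempt (the names are mine) at an aggregate function that
keeps only the two latest timestamps per key, folding the Top 2 and
ListAgg steps into one:

```scala
import org.apache.flink.table.functions.AggregateFunction

// Accumulator: the two most recent timestamps seen so far
// (Long.MinValue means "not yet seen").
case class Last2Acc(var latest: Long = Long.MinValue,
                    var previous: Long = Long.MinValue)

class Last2TimestampAgg extends AggregateFunction[String, Last2Acc] {

  override def createAccumulator(): Last2Acc = Last2Acc()

  // Called once per input row; keeps only the two largest timestamps.
  def accumulate(acc: Last2Acc, ts: Long): Unit = {
    if (ts > acc.latest) {
      acc.previous = acc.latest
      acc.latest = ts
    } else if (ts > acc.previous) {
      acc.previous = ts
    }
  }

  override def getValue(acc: Last2Acc): String =
    if (acc.previous == Long.MinValue) acc.latest.toString
    else s"${acc.previous},${acc.latest}"
}
```

I would then register it with
`tableEnv.createTemporarySystemFunction("LAST2_TS", classOf[Last2TimestampAgg])`
and replace the whole WITH/ROW_NUMBER query by a plain GROUP BY that calls
LAST2_TS(`timestamp`).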

Besides, is the 'upsert-kafka' connector designed for this use case?
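
If it is, this is the sink definition I am considering (a sketch only; the
topic name and broker address are placeholders). As I understand it,
upsert-kafka keys each record by the PRIMARY KEY and writes updates as
upserts, so the -U rows would not appear as separate messages:

```scala
tableEnv.executeSql(
  """
    |CREATE TABLE last2_sink (
    |  accountId BIGINT,
    |  last2_timestamp STRING,
    |  PRIMARY KEY (accountId) NOT ENFORCED
    |) WITH (
    |  'connector' = 'upsert-kafka',
    |  'topic' = 'last2-transactions',
    |  'properties.bootstrap.servers' = 'localhost:9092',
    |  'key.format' = 'json',
    |  'value.format' = 'json'
    |)
    |""".stripMargin)

// Writing the aggregated result into it would then be a plain
// INSERT INTO last2_sink SELECT ... with the aggregation query.
```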

Thank you.

On Thu, Mar 4, 2021 at 4:41 PM Timo Walther <twal...@apache.org> wrote:

> Hi Yik,
>
> if I understand you correctly, you would like to avoid the deletions in
> your stream?
>
> You could filter the deletions manually in DataStream API before writing
> them to Kafka. Semantically the deletions are required to produce a
> correct result because the runtime is not aware of a key for idempotent
> updates.
>
> To simplify the query you could also investigate implementing your own
> aggregate function to combine the Top 2 and ListAgg into one operation.
>
> Regards,
> Timo
>
> On 28.02.21 09:55, Yik San Chan wrote:
> > I define a `Transaction` class:
> >
> > ```scala
> > case class Transaction(accountId: Long, amount: Long, timestamp: Long)
> > ```
> >
> > The `TransactionSource` simply emits a `Transaction` at some time
> > interval. Now I want to compute the last 2 transaction timestamps of
> > each account id; see the code below:
> >
> > ```scala
> > import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
> > import org.apache.flink.table.api.EnvironmentSettings
> > import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> > import org.apache.flink.walkthrough.common.entity.Transaction
> > import org.apache.flink.walkthrough.common.source.TransactionSource
> >
> > object LastNJob {
> >
> >   final val QUERY =
> >     """
> >       |WITH last_n AS (
> >       |    SELECT accountId, `timestamp`
> >       |    FROM (
> >       |        SELECT *,
> >       |            ROW_NUMBER() OVER (PARTITION BY accountId ORDER BY `timestamp` DESC) AS row_num
> >       |        FROM transactions
> >       |    )
> >       |    WHERE row_num <= 2
> >       |)
> >       |SELECT accountId, LISTAGG(CAST(`timestamp` AS STRING)) last2_timestamp
> >       |FROM last_n
> >       |GROUP BY accountId
> >       |""".stripMargin
> >
> >   def main(args: Array[String]): Unit = {
> >     val settings: EnvironmentSettings =
> >       EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >     val streamEnv: StreamExecutionEnvironment =
> >       StreamExecutionEnvironment.getExecutionEnvironment
> >     val tableEnv: StreamTableEnvironment =
> >       StreamTableEnvironment.create(streamEnv, settings)
> >
> >     val txnStream: DataStream[Transaction] = streamEnv
> >       .addSource(new TransactionSource)
> >       .name("transactions")
> >
> >     tableEnv.createTemporaryView("transactions", txnStream)
> >
> >     tableEnv.executeSql(QUERY).print()
> >   }
> > }
> > ```
> >
> > When I run the program, I get:
> >
> > ```
> > +----+----------------------+--------------------------------+
> > | op |            accountId |                last2_timestamp |
> > +----+----------------------+--------------------------------+
> > | +I |                    1 |                  1546272000000 |
> > | +I |                    2 |                  1546272360000 |
> > | +I |                    3 |                  1546272720000 |
> > | +I |                    4 |                  1546273080000 |
> > | +I |                    5 |                  1546273440000 |
> > | -U |                    1 |                  1546272000000 |
> > | +U |                    1 |    1546272000000,1546273800000 |
> > | -U |                    2 |                  1546272360000 |
> > | +U |                    2 |    1546272360000,1546274160000 |
> > | -U |                    3 |                  1546272720000 |
> > | +U |                    3 |    1546272720000,1546274520000 |
> > | -U |                    4 |                  1546273080000 |
> > | +U |                    4 |    1546273080000,1546274880000 |
> > | -U |                    5 |                  1546273440000 |
> > | +U |                    5 |    1546273440000,1546275240000 |
> > | -U |                    1 |    1546272000000,1546273800000 |
> > | +U |                    1 |                  1546273800000 |
> > | -U |                    1 |                  1546273800000 |
> > | +U |                    1 |    1546273800000,1546275600000 |
> > (to continue)
> > ```
> >
> > Let's focus on the last transaction of accountId=1 above. When a new
> > transaction from account 1 arrives at timestamp=1546275600000, 4
> > operations are emitted in total.
> >
> > ```
> > +----+----------------------+--------------------------------+
> > | op |            accountId |                last2_timestamp |
> > +----+----------------------+--------------------------------+
> > | -U |                    1 |    1546272000000,1546273800000 |
> > | +U |                    1 |                  1546273800000 |
> > | -U |                    1 |                  1546273800000 |
> > | +U |                    1 |    1546273800000,1546275600000 |
> > ```
> >
> > However, I only want to emit the "new status" below to my downstream
> > (say, another Kafka topic), via some sort of merging:
> >
> > ```
> > +----------------------+--------------------------------+
> > |            accountId |                last2_timestamp |
> > +----------------------+--------------------------------+
> > |                    1 |    1546273800000,1546275600000 |
> > ```
> >
> > So that my downstream is able to consume literally "the last 2
> > transaction timestamps of each account":
> > ```
> > +----------------------+--------------------------------+
> > |            accountId |                last2_timestamp |
> > +----------------------+--------------------------------+
> > |                    1 |                  1546272000000 |
> > |                    1 |    1546272000000,1546273800000 |
> > |                    1 |    1546273800000,1546275600000 |
> > (to continue)
> > ```
> >
> > What is the right way to do this?
>
>
