
Thanks for your reply! I think i didn't make myself clear for problem 1, so i 
draw a picture. 

1."tables in DB": things start at the database, and we sync the tables' 
changelog to dynamic tables by CDC tool. Each changelog includes RowData and 
RowKind such as INSERT / UPDATE / DELETE. 
2. "logics": modeling like ods / dwd ....
3. "Table 1": a table which has some downstream tables. Table 2 is produced by 
"count(1) from table_1" and input Table 1; Table 3 is produced by "udf(...) 
from table_1 where a>0" and input Table 1. And when an insert event or delete 
event occurs in Table 1, Table 2 and Table 3 will change accordingly, as does 
the downstream tables of Table 2 and Table 3. 

- problem: The logic which generates Table 1 changes from "select * from 
table_0 where a>0" to "select * from table_0 where a<0". The old data in Table 
1 generated by filter "a>0" is error now, and all downstream tables of Table 1 
are error too.  So I want to find an easy way to truncate error data in Table 1 
and all downstream tables of Table 1, but truncating Table 1 does not emit 
deletion event of each record in Table 1, so truncating doesn't trigger the 
deletion of corresponding records in all downstream tables which i think is the 
most important. Now I want to read all records in Table 1 and modify the 
rowkind of each Row from RowKind.INSERT to RowKind.DELETE, but i didn't find 
correspond API in BatchTableEnvironment or BatchExecutionEnvironment, code as 

TableEnvironemnt tenv; 
Table t1 = tenv.from("table_1 /*+OPTIONS('read.streaming.enabled'='false')*/") 
Table t2 = t1.map(row->row.setRowKind(RowKind.DELETE))

The suggestion creating a new table based on new logic, "new Table 1' " as 
shown in pic. I think creating new table will not solute this problem unless 
createing all downstream tables of Table 1 for example Table 2', but it's too 

Thanks for your suggestion. Do you have any other suggestions? 

Best Regards!


在 2021年10月22日 10:55,Caizhi Weng<tsreape...@gmail.com> 写道:


For problem 1, Flink does not support deleting specific records. As you're 
running a batch job, I suggest creating a new table based on the new filter 
condition. Even if you can delete the old records you'll still have to generate 
the new ones, so why not generate them directly into a new place?

For problem 2, yarn-cluster is the mode for a yarn session cluster, which means 
the cluster will remain even after the job is finished. If you want to finish 
the Flink job as well as the yarn job, use yarn-per-job mode instead.

vtygoss <vtyg...@126.com> 于2021年10月21日周四 下午2:46写道:

Hi, community!

I am working on building data processing pipeline based on changelog(CDC) and i 
met two problems. 

--(sql_0)--> Table A --(sql_1)---> Table B --->other tables downstream
                          --(sql_2)--->Table C---> other tables downstream

Table A is generated based on sql_0; Table B is generated based on sql_1 and 
input Table A; Table C is generated based on sql_2 and input Table A; Table B 
and C have some downstream tables based on modeling. 

- problem 1. When sql_0 logic is changed, e.g. from "select * from xx where 
a>0" to " from xx where a<0", the data produced by filter "a>0" is error. I 
want to find a way to clear the error data in Table A and trigger the 
corresponding deletions of all tables downstream, then produce new data by new 
filter a<0. So how to change the rowkind of each row in Table A to 
RowKind.DELETE in Flink Batch execution mode? It will be very nice if there is 
an use case of Flink 1.12.0.  

- problem 2.  I found that Flink will launch a session cluster even runtime 
mode is "yarn-cluster". In batch execution mode, the cluster still run after 
all tasks finished. How to shutdown the cluster? 

Thanks for your any suggestion or reply!

Best Regards!

Attachment: 787261bd-dff3-4162-9f6e-876312bd312b.png
Description: Binary data

Reply via email to