Hi!
Thanks for your reply! I think i didn't make myself clear for problem 1, so i draw a picture. 1."tables in DB": things start at the database, and we sync the tables' changelog to dynamic tables by CDC tool. Each changelog includes RowData and RowKind such as INSERT / UPDATE / DELETE. 2. "logics": modeling like ods / dwd .... 3. "Table 1": a table which has some downstream tables. Table 2 is produced by "count(1) from table_1" and input Table 1; Table 3 is produced by "udf(...) from table_1 where a>0" and input Table 1. And when an insert event or delete event occurs in Table 1, Table 2 and Table 3 will change accordingly, as does the downstream tables of Table 2 and Table 3. - problem: The logic which generates Table 1 changes from "select * from table_0 where a>0" to "select * from table_0 where a<0". The old data in Table 1 generated by filter "a>0" is error now, and all downstream tables of Table 1 are error too. So I want to find an easy way to truncate error data in Table 1 and all downstream tables of Table 1, but truncating Table 1 does not emit deletion event of each record in Table 1, so truncating doesn't trigger the deletion of corresponding records in all downstream tables which i think is the most important. Now I want to read all records in Table 1 and modify the rowkind of each Row from RowKind.INSERT to RowKind.DELETE, but i didn't find correspond API in BatchTableEnvironment or BatchExecutionEnvironment, code as below. ``` TableEnvironemnt tenv; Table t1 = tenv.from("table_1 /*+OPTIONS('read.streaming.enabled'='false')*/") .... Table t2 = t1.map(row->row.setRowKind(RowKind.DELETE)) t2.insertInto("table_1") ``` The suggestion creating a new table based on new logic, "new Table 1' " as shown in pic. I think creating new table will not solute this problem unless createing all downstream tables of Table 1 for example Table 2', but it's too heavy. Thanks for your suggestion. Do you have any other suggestions? Best Regards! 在 2021年10月22日 10:55,Caizhi Weng<tsreape...@gmail.com> 写道: Hi! For problem 1, Flink does not support deleting specific records. As you're running a batch job, I suggest creating a new table based on the new filter condition. Even if you can delete the old records you'll still have to generate the new ones, so why not generate them directly into a new place? For problem 2, yarn-cluster is the mode for a yarn session cluster, which means the cluster will remain even after the job is finished. If you want to finish the Flink job as well as the yarn job, use yarn-per-job mode instead. vtygoss <vtyg...@126.com> 于2021年10月21日周四 下午2:46写道: Hi, community! I am working on building data processing pipeline based on changelog(CDC) and i met two problems. --(sql_0)--> Table A --(sql_1)---> Table B --->other tables downstream --(sql_2)--->Table C---> other tables downstream Table A is generated based on sql_0; Table B is generated based on sql_1 and input Table A; Table C is generated based on sql_2 and input Table A; Table B and C have some downstream tables based on modeling. - problem 1. When sql_0 logic is changed, e.g. from "select * from xx where a>0" to " from xx where a<0", the data produced by filter "a>0" is error. I want to find a way to clear the error data in Table A and trigger the corresponding deletions of all tables downstream, then produce new data by new filter a<0. So how to change the rowkind of each row in Table A to RowKind.DELETE in Flink Batch execution mode? It will be very nice if there is an use case of Flink 1.12.0. - problem 2. I found that Flink will launch a session cluster even runtime mode is "yarn-cluster". In batch execution mode, the cluster still run after all tasks finished. How to shutdown the cluster? Thanks for your any suggestion or reply! Best Regards!
787261bd-dff3-4162-9f6e-876312bd312b.png
Description: Binary data