Re:Re:Re: Re: 使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。
刚做了一下测试 目前假定有3行数据需要同步(全量): | 编号 | 电话 | 座机 | | 1 | 1311313 | 123 | | 2 | 1311313 | 456 | | 3 | 1311313 | 789 | 这个时候我修改第四行数据的两个字段(增量): | 1 | 电话 | 座机 | | 1 | 1311313 | 123 | | 2 | 1311313 | 456 | | 3 | 13113133110 | 888 | 修改完后我删除字段2这个时候去mysql看结果2是正确被删除,且无新增的(操作正确). 然后我继续删除数据3这个时候就不对了,在flink里面有修改两次的缓存数据,所以删除的同时将原来的旧数据插入进了mysql中(操作错误). 上述是我基于flink1.16.1版本进行测试的结果,目前不知道是不是要配置flink还是下游算子具体配置什么也不是清楚。这个问题困扰有3周了,各种测试调整都没有起作用。 在 2023-03-06 10:54:23,"陈佳豪" 写道: >hi 早上好 >我将flink升级到了1.16.1的版本去执行kafka同步到mysql的任务,发现还是存在一样的问题,我本机执行了explain的执行过程给的输出如下 > >== Abstract Syntax Tree == >LogicalSink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, >手机, 座机]) >+- LogicalProject(rowID=[CAST($0):VARCHAR(255) CHARACTER SET "UTF-16LE"], >名称=[$1], 手机=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], >座机=[CAST($3):VARCHAR(255) CHARACTER SET "UTF-16LE"]) > +- LogicalTableScan(table=[[default_catalog, default_database, 电话]]) > > >== Optimized Physical Plan == >Sink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 手机, >座机], upsertMaterialize=[true]) >+- Calc(select=[CAST(rowid AS VARCHAR(255) CHARACTER SET "UTF-16LE") AS rowID, >63fd65fb36521f81a2cfab90 AS 名称, CAST(63fd660536521f81a2cfabad AS >VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS 手机, >CAST(63fd660536521f81a2cfabae AS VARCHAR(255) CHARACTER SET "UTF-16LE") AS 座机]) > +- TableSourceScan(table=[[default_catalog, default_database, 电话]], > fields=[rowid, 63fd65fb36521f81a2cfab90, 63fd660536521f81a2cfabad, > 63fd660536521f81a2cfabae]) > > >== Optimized Execution Plan == >Sink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 手机, >座机], upsertMaterialize=[true]) >+- Calc(select=[CAST(rowid AS VARCHAR(255)) AS rowID, 63fd65fb36521f81a2cfab90 >AS 名称, CAST(63fd660536521f81a2cfabad AS VARCHAR(2147483647)) AS 手机, >CAST(63fd660536521f81a2cfabae AS VARCHAR(255)) AS 座机]) > +- TableSourceScan(table=[[default_catalog, default_database, 电话]], > fields=[rowid, 63fd65fb36521f81a2cfab90, 63fd660536521f81a2cfabad, > 63fd660536521f81a2cfabae]) > > > >在 2023-03-05 15:37:53,"Jane Chan" 写道: >>Hi, >> >>抱歉, 这里 typo 了, 应该是 1.16.1. 我在 1.16.1 上验证了你之前发的 query, 是可以正常删除的. 可以在 1.16.1 >>上尝试下, 也可以试试在 1.15.2 上使用 EXPLAIN CHANGELOG_MODE INSERT INTO...[1] 将 plan >>打印出来看看. >> >>[1] >>https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/ >> >>祝好! >>Jane >> >>On Sun, Mar 5, 2023 at 2:36 PM 陈佳豪 wrote: >> >>> hi 你好 >>> 目前没有1.16.2版本的吧? 我看flink官网都是1.16.0 或者是1.16.1的 >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> 在 2023-03-02 11:52:41,"Jane Chan" 写道: >>> >Hi, >>> > >>> >可以尝试用 EXPLAIN CHANGELOG_MODE[1] 把 plan 打出来看看, 或者尝试升级到 FLINK 1.16 版本, 这个 >>> >query 在 1.16.2 上验证没有问题 >>> > >>> >[1] >>> > >>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/ >>> > >>> >Best, >>> >Jane >>> > >>> >On Wed, Mar 1, 2023 at 6:22 PM 陈佳豪 wrote: >>> > >>> >> flink ,kafka连接 jdbc连接版本都是1.15.2的 >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> 在 2023-03-01 18:14:35,"陈佳豪" 写道: >>> >> >问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。 >>> >> >String kafka = "CREATE TABLE `电话` (`rowid` >>> >> VARCHAR(2147483647),`63fd65fb36521f81a2cfab90` >>> >> VARCHAR(2147483647),`63fd660536521f81a2cfabad` >>> >> VARCHAR(65535),`63fd660536521f81a2cfabae` VARCHAR(65535) ) WITH ( >>> >> 'connector' = 'kafka', 'topic' = >>> >> 'sz_worksheet-63fdcff9ae76ba371276c1e5-63fd65fb36521f81a2cfab8f', >>> >> 'properties.bootstrap.servers' = '132.232.27.116:9092', >>> >> 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json' )"; >>> >> > >>> >> >String mysql = "CREATE TABLE `电话_1` (`rowID` VARCHAR(255),`名称` >>> >> STRING,`手机` STRING,`座机` VARCHAR(255),PRIMARY KEY (`rowID`) NOT >>> >> ENFORCED ) WITH ('connector' = 'jdbc','driver' = >>> >> 'com.mysql.cj.jdbc.Driver','url' = 'jdbc:mysql:// >>> >> 43.136.128.102:6506/meihua_test','username' = 'root', >>> 'password' = >>> >> '123456','table-name' = '电话2' )"; >>> >> > >>> >> >String insert = "insert into `电话_1` select `t_1`.`rowID` as >>> >> `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from ( >>> >> select `rowid` as `rowID`,`63fd65fb36521f81a2cfab90` as >>> >> `名称`,`63fd660536521f81a2cfabad` as `手机`,`63fd660536521f81a2cfabae` as >>> `座机` >>> >> from `电话` ) as t_1"; >>> >> > >>> >> >操作的语句如图所示,有大佬能帮忙看看解惑一下吗?是我语法问题还是本身就是flink 连接去的bug? >>> >> >>>
Re:Re: Re: 使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。
hi 早上好 我将flink升级到了1.16.1的版本去执行kafka同步到mysql的任务,发现还是存在一样的问题,我本机执行了explain的执行过程给的输出如下 == Abstract Syntax Tree == LogicalSink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 手机, 座机]) +- LogicalProject(rowID=[CAST($0):VARCHAR(255) CHARACTER SET "UTF-16LE"], 名称=[$1], 手机=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 座机=[CAST($3):VARCHAR(255) CHARACTER SET "UTF-16LE"]) +- LogicalTableScan(table=[[default_catalog, default_database, 电话]]) == Optimized Physical Plan == Sink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 手机, 座机], upsertMaterialize=[true]) +- Calc(select=[CAST(rowid AS VARCHAR(255) CHARACTER SET "UTF-16LE") AS rowID, 63fd65fb36521f81a2cfab90 AS 名称, CAST(63fd660536521f81a2cfabad AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS 手机, CAST(63fd660536521f81a2cfabae AS VARCHAR(255) CHARACTER SET "UTF-16LE") AS 座机]) +- TableSourceScan(table=[[default_catalog, default_database, 电话]], fields=[rowid, 63fd65fb36521f81a2cfab90, 63fd660536521f81a2cfabad, 63fd660536521f81a2cfabae]) == Optimized Execution Plan == Sink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 手机, 座机], upsertMaterialize=[true]) +- Calc(select=[CAST(rowid AS VARCHAR(255)) AS rowID, 63fd65fb36521f81a2cfab90 AS 名称, CAST(63fd660536521f81a2cfabad AS VARCHAR(2147483647)) AS 手机, CAST(63fd660536521f81a2cfabae AS VARCHAR(255)) AS 座机]) +- TableSourceScan(table=[[default_catalog, default_database, 电话]], fields=[rowid, 63fd65fb36521f81a2cfab90, 63fd660536521f81a2cfabad, 63fd660536521f81a2cfabae]) 在 2023-03-05 15:37:53,"Jane Chan" 写道: >Hi, > >抱歉, 这里 typo 了, 应该是 1.16.1. 我在 1.16.1 上验证了你之前发的 query, 是可以正常删除的. 可以在 1.16.1 >上尝试下, 也可以试试在 1.15.2 上使用 EXPLAIN CHANGELOG_MODE INSERT INTO...[1] 将 plan >打印出来看看. > >[1] >https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/ > >祝好! >Jane > >On Sun, Mar 5, 2023 at 2:36 PM 陈佳豪 wrote: > >> hi 你好 >> 目前没有1.16.2版本的吧? 我看flink官网都是1.16.0 或者是1.16.1的 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2023-03-02 11:52:41,"Jane Chan" 写道: >> >Hi, >> > >> >可以尝试用 EXPLAIN CHANGELOG_MODE[1] 把 plan 打出来看看, 或者尝试升级到 FLINK 1.16 版本, 这个 >> >query 在 1.16.2 上验证没有问题 >> > >> >[1] >> > >> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/ >> > >> >Best, >> >Jane >> > >> >On Wed, Mar 1, 2023 at 6:22 PM 陈佳豪 wrote: >> > >> >> flink ,kafka连接 jdbc连接版本都是1.15.2的 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2023-03-01 18:14:35,"陈佳豪" 写道: >> >> >问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。 >> >> >String kafka = "CREATE TABLE `电话` (`rowid` >> >> VARCHAR(2147483647),`63fd65fb36521f81a2cfab90` >> >> VARCHAR(2147483647),`63fd660536521f81a2cfabad` >> >> VARCHAR(65535),`63fd660536521f81a2cfabae` VARCHAR(65535) ) WITH ( >> >> 'connector' = 'kafka', 'topic' = >> >> 'sz_worksheet-63fdcff9ae76ba371276c1e5-63fd65fb36521f81a2cfab8f', >> >> 'properties.bootstrap.servers' = '132.232.27.116:9092', >> >> 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json' )"; >> >> > >> >> >String mysql = "CREATE TABLE `电话_1` (`rowID` VARCHAR(255),`名称` >> >> STRING,`手机` STRING,`座机` VARCHAR(255),PRIMARY KEY (`rowID`) NOT >> >> ENFORCED ) WITH ('connector' = 'jdbc','driver' = >> >> 'com.mysql.cj.jdbc.Driver','url' = 'jdbc:mysql:// >> >> 43.136.128.102:6506/meihua_test','username' = 'root', >> 'password' = >> >> '123456','table-name' = '电话2' )"; >> >> > >> >> >String insert = "insert into `电话_1` select `t_1`.`rowID` as >> >> `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from ( >> >> select `rowid` as `rowID`,`63fd65fb36521f81a2cfab90` as >> >> `名称`,`63fd660536521f81a2cfabad` as `手机`,`63fd660536521f81a2cfabae` as >> `座机` >> >> from `电话` ) as t_1"; >> >> > >> >> >操作的语句如图所示,有大佬能帮忙看看解惑一下吗?是我语法问题还是本身就是flink 连接去的bug? >> >> >>
Re:Re: 使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。
hi 你好 目前没有1.16.2版本的吧? 我看flink官网都是1.16.0 或者是1.16.1的 在 2023-03-02 11:52:41,"Jane Chan" 写道: >Hi, > >可以尝试用 EXPLAIN CHANGELOG_MODE[1] 把 plan 打出来看看, 或者尝试升级到 FLINK 1.16 版本, 这个 >query 在 1.16.2 上验证没有问题 > >[1] >https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/ > >Best, >Jane > >On Wed, Mar 1, 2023 at 6:22 PM 陈佳豪 wrote: > >> flink ,kafka连接 jdbc连接版本都是1.15.2的 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2023-03-01 18:14:35,"陈佳豪" 写道: >> >问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。 >> >String kafka = "CREATE TABLE `电话` (`rowid` >> VARCHAR(2147483647),`63fd65fb36521f81a2cfab90` >> VARCHAR(2147483647),`63fd660536521f81a2cfabad` >> VARCHAR(65535),`63fd660536521f81a2cfabae` VARCHAR(65535) ) WITH ( >> 'connector' = 'kafka', 'topic' = >> 'sz_worksheet-63fdcff9ae76ba371276c1e5-63fd65fb36521f81a2cfab8f', >> 'properties.bootstrap.servers' = '132.232.27.116:9092', >> 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json' )"; >> > >> >String mysql = "CREATE TABLE `电话_1` (`rowID` VARCHAR(255),`名称` >> STRING,`手机` STRING,`座机` VARCHAR(255),PRIMARY KEY (`rowID`) NOT >> ENFORCED ) WITH ('connector' = 'jdbc','driver' = >> 'com.mysql.cj.jdbc.Driver','url' = 'jdbc:mysql:// >> 43.136.128.102:6506/meihua_test','username' = 'root','password' = >> '123456','table-name' = '电话2' )"; >> > >> >String insert = "insert into `电话_1` select `t_1`.`rowID` as >> `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from ( >> select `rowid` as `rowID`,`63fd65fb36521f81a2cfab90` as >> `名称`,`63fd660536521f81a2cfabad` as `手机`,`63fd660536521f81a2cfabae` as `座机` >> from `电话` ) as t_1"; >> > >> >操作的语句如图所示,有大佬能帮忙看看解惑一下吗?是我语法问题还是本身就是flink 连接去的bug? >>
Re:使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。
flink ,kafka连接 jdbc连接版本都是1.15.2的 在 2023-03-01 18:14:35,"陈佳豪" 写道: >问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。 >String kafka = "CREATE TABLE `电话` (`rowid` >VARCHAR(2147483647),`63fd65fb36521f81a2cfab90` >VARCHAR(2147483647),`63fd660536521f81a2cfabad` >VARCHAR(65535),`63fd660536521f81a2cfabae` VARCHAR(65535) ) WITH ( 'connector' >= 'kafka', 'topic' = >'sz_worksheet-63fdcff9ae76ba371276c1e5-63fd65fb36521f81a2cfab8f', >'properties.bootstrap.servers' = '132.232.27.116:9092', 'scan.startup.mode' = >'earliest-offset', 'format' = 'debezium-json' )"; > >String mysql = "CREATE TABLE `电话_1` (`rowID` VARCHAR(255),`名称` STRING,`手机` >STRING,`座机` VARCHAR(255),PRIMARY KEY (`rowID`) NOT ENFORCED ) WITH ( >'connector' = 'jdbc','driver' = 'com.mysql.cj.jdbc.Driver','url' = >'jdbc:mysql://43.136.128.102:6506/meihua_test','username' = 'root', >'password' = '123456','table-name' = '电话2' )"; > >String insert = "insert into `电话_1` select `t_1`.`rowID` as `rowID`,`t_1`.`名称` >as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from ( select `rowid` as >`rowID`,`63fd65fb36521f81a2cfab90` as `名称`,`63fd660536521f81a2cfabad` as >`手机`,`63fd660536521f81a2cfabae` as `座机` from `电话` ) as t_1"; > >操作的语句如图所示,有大佬能帮忙看看解惑一下吗?是我语法问题还是本身就是flink 连接去的bug?
使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。
问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。 String kafka = "CREATE TABLE `电话` (`rowid` VARCHAR(2147483647),`63fd65fb36521f81a2cfab90` VARCHAR(2147483647),`63fd660536521f81a2cfabad` VARCHAR(65535),`63fd660536521f81a2cfabae` VARCHAR(65535) ) WITH ( 'connector' = 'kafka', 'topic' = 'sz_worksheet-63fdcff9ae76ba371276c1e5-63fd65fb36521f81a2cfab8f', 'properties.bootstrap.servers' = '132.232.27.116:9092', 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json' )"; String mysql = "CREATE TABLE `电话_1` (`rowID` VARCHAR(255),`名称` STRING,`手机` STRING,`座机` VARCHAR(255),PRIMARY KEY (`rowID`) NOT ENFORCED ) WITH ( 'connector' = 'jdbc','driver' = 'com.mysql.cj.jdbc.Driver','url' = 'jdbc:mysql://43.136.128.102:6506/meihua_test','username' = 'root', 'password' = '123456','table-name' = '电话2' )"; String insert = "insert into `电话_1` select `t_1`.`rowID` as `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from ( select `rowid` as `rowID`,`63fd65fb36521f81a2cfab90` as `名称`,`63fd660536521f81a2cfabad` as `手机`,`63fd660536521f81a2cfabae` as `座机` from `电话` ) as t_1"; 操作的语句如图所示,有大佬能帮忙看看解惑一下吗?是我语法问题还是本身就是flink 连接去的bug?
使用flink sql 将kafka的数据同步到mysql无法删除。
-建表语法如下 String kafka = "CREATE TABLE `电话` " + "(`rowID` VARCHAR(255),`名称` STRING,`手机` VARCHAR(255),`座机` VARCHAR(255), " + " PRIMARY KEY (`rowID`) NOT ENFORCED ) " + " WITH " + "('connector' = 'jdbc', " + " 'driver' = 'com.mysql.cj.jdbc.Driver', " + " 'url' = 'jdbc:mysql://XX:6506/meihua_test', " + " 'username' = 'root', " + " 'password' = '123456', " + " 'table-name' = '电话' )"; String mysql = "CREATE TABLE `电话_1` " + "(`rowid` VARCHAR(100)," + "`63f73b332e77497da91286f0` VARCHAR(100)," + "`63f73b3f2e77497da91286fb` VARCHAR(100)," + "`63f73b3f2e77497da91286fc` VARCHAR(100)," + "`op` STRING ," + " PRIMARY KEY (rowid) NOT ENFORCED )" + " WITH " + "( 'connector' = 'kafka', " + "'topic' = 'sz_worksheet-63f82984f3ec743e45b0d561-63f73b332e77497da91286ef'," + " 'properties.bootstrap.servers' = 'XX:9092'," + " 'scan.startup.mode' = 'earliest-offset', " + "'format' = 'debezium-json' )"; -执行语句如下 String insert = "insert into `电话` select `t_1`.`rowID` as `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from" + " ( select `rowid` as `rowID`,`63f73b332e77497da91286f0` as `名称`,`63f73b3f2e77497da91286fb` as `手机`,`63f73b3f2e77497da91286fc` as `座机` from `电话_1` ) as t_1"; -操作数据如下 String insert = "insert into `电话` select `t_1`.`rowID` as `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from" + " ( select `rowid` as `rowID`,`63f73b332e77497da91286f0` as `名称`,`63f73b3f2e77497da91286fb` as `手机`,`63f73b3f2e77497da91286fc` as `座机` from `电话_1` ) as t_1"; -执行语句如下 { "op":"d", "before":{ "rowid":"f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d" }, "after":null } 现在的结论是可以新增和修改,但是无法删除。难道insert into这个语句搞不定吗? 走的debezuim json序列化的格式。 各位大佬帮看下 谢谢。
请问cancel的任务能够恢复running状态吗?
hi 我目前测试flink restapi 指定savepointpath来恢复任务发现会重新触发创建一个新的任务原有的任务还是cancel状态,请问有办法恢复原有cancel状态的任务为running吗?
请问flink sql可以被捕获异常吗?
hi 请问能在java代码里面try catch到基于flink sql写的任务异常信息吗?
Re:回复:请问flink metrics如何获取任务状态?
hi 不好意思刚刚图好像又挂了 不知道这个能否查看。 在 2022-11-28 13:50:37,"m17610775726_1" 写道: hi 你的图片挂了 可以用图床上传一下图片 在这里贴个链接 另外自定义 reportor 把需要的metric 过滤出来上报就行了 回复的原邮件 ---- | 发件人 | 陈佳豪 | | 发送日期 | 2022年11月28日 00:54 | | 收件人 | user-zh | | 主题 | 请问flink metrics如何获取任务状态? | 自定义了一个kafka Metric Reporters #请问如何使用上述指标呢? 我想通过上报获取任务状态。除了上述指标外如果有其他方案也可以,当前flink 版本是15.2 还望大神指教一番。
Re:回复:请问flink metrics如何获取任务状态?
这个metrics 我获取不到。 不知道要怎么配置才可以获取到。 在 2022-11-28 13:50:37,"m17610775726_1" 写道: hi 你的图片挂了 可以用图床上传一下图片 在这里贴个链接 另外自定义 reportor 把需要的metric 过滤出来上报就行了 回复的原邮件 ---- | 发件人 | 陈佳豪 | | 发送日期 | 2022年11月28日 00:54 | | 收件人 | user-zh | | 主题 | 请问flink metrics如何获取任务状态? | 自定义了一个kafka Metric Reporters #请问如何使用上述指标呢? 我想通过上报获取任务状态。除了上述指标外如果有其他方案也可以,当前flink 版本是15.2 还望大神指教一番。
回复:请问flink metrics如何获取任务状态?
有大佬告诉下吗? 这个指标的值获取不到。 | | 陈佳豪 邮箱:jagec...@yeah.net | 回复的原邮件 | 发件人 | 陈佳豪 | | 发送日期 | 2022年11月28日 00:54 | | 收件人 | user-zh | | 主题 | 请问flink metrics如何获取任务状态? | 自定义了一个kafka Metric Reporters #请问如何使用上述指标呢? 我想通过上报获取任务状态。除了上述指标外如果有其他方案也可以,当前flink 版本是15.2 还望大神指教一番。
请问flink metrics如何获取任务状态?
自定义了一个kafka Metric Reporters #请问如何使用上述指标呢? 我想通过上报获取任务状态。除了上述指标外如果有其他方案也可以,当前flink 版本是15.2 还望大神指教一番。