Flink AsyncWriter如何进行固定速率的限速?这一块似乎有bug
Flink 1.16.0 搜索到社区有相关文章,其中的实例如下: https://flink.apache.org/2022/11/25/optimising-the-throughput-of-async-sinks-using-a-custom-ratelimitingstrategy/#rationale-behind-the-ratelimitingstrategy-interface public class TokenBucketRateLimitingStrategy implements RateLimitingStrategy { private final Bucket bucket; public TokenBucketRateLimitingStrategy() { Refill refill = Refill.intervally(1, Duration.ofSeconds(1)); Bandwidth limit = Bandwidth.classic(10, refill); this.bucket = Bucket4j.builder() .addLimit(limit) .build(); } // ... (information methods not needed) @Override public boolean shouldBlock(RequestInfo requestInfo) { return bucket.tryConsume(requestInfo.getBatchSize()); } } 我但是这个shouldblock的返回值似乎是反的,我实际使用后发现会发现异步的线程池的队列会很快被打满,抛出RejectedExecutionException。
Re: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据
我试了下,当我显示的设置env.setRuntimeMode(RuntimeExecutionMode.BATCH); 就不会进行checkpoint了,否则是可以。 > 2024年2月2日 17:20,ha.fen...@aisino.com 写道: > > > env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); >env.setStateBackend(new FsStateBackend("file:\\d:\\abc")); > > > 发件人: jinzhuguang > 发送时间: 2024-02-02 17:16 > 收件人: user-zh > 主题: Re: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据 > 你是在batch模式下手动开启了checkpoint吗 > >> 2024年2月2日 17:11,ha.fen...@aisino.com 写道: >> >> 今天正好测试了这个问题,开启checkpoint后,读取一个文件内容,在checkpoints有记录时,停止程序,然后再从checkpoint读取启动,读取的记录并不是从最开始,这说明批处理下也会自动保存状态。 >> >> 发件人: jinzhuguang >> 发送时间: 2024-02-02 16:47 >> 收件人: user-zh >> 主题: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据 >> Flink 1.16.0 >> >> 我在阅读FileSink的代码时发现,其依靠StatefulSinkWriter的snapshotState接口在checkpoint时存储当前的状态。 >> >> interface StatefulSinkWriter extends >> SinkWriter { >> /** >>* @return The writer's state. >>* @throws IOException if fail to snapshot writer's state. >>*/ >> List snapshotState(long checkpointId) throws IOException; >> } >> >> 然而,我了解到Flink在batch模式不会开启checkpoint机制,那我如何能够保证批任务的状态能够得到及时保存。 >> >> 如果我在进行大规模数据的ETL操作,因为某些task失败导致任务重试,难道整个任务都要从头开始吗? >> >> 恳请各位大佬赐教 >
Re: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据
你是在batch模式下手动开启了checkpoint吗 > 2024年2月2日 17:11,ha.fen...@aisino.com 写道: > > 今天正好测试了这个问题,开启checkpoint后,读取一个文件内容,在checkpoints有记录时,停止程序,然后再从checkpoint读取启动,读取的记录并不是从最开始,这说明批处理下也会自动保存状态。 > > 发件人: jinzhuguang > 发送时间: 2024-02-02 16:47 > 收件人: user-zh > 主题: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据 > Flink 1.16.0 > > 我在阅读FileSink的代码时发现,其依靠StatefulSinkWriter的snapshotState接口在checkpoint时存储当前的状态。 > > interface StatefulSinkWriter extends SinkWriter > { >/** > * @return The writer's state. > * @throws IOException if fail to snapshot writer's state. > */ >List snapshotState(long checkpointId) throws IOException; >} > > 然而,我了解到Flink在batch模式不会开启checkpoint机制,那我如何能够保证批任务的状态能够得到及时保存。 > > 如果我在进行大规模数据的ETL操作,因为某些task失败导致任务重试,难道整个任务都要从头开始吗? > > 恳请各位大佬赐教
Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据
Flink 1.16.0 我在阅读FileSink的代码时发现,其依靠StatefulSinkWriter的snapshotState接口在checkpoint时存储当前的状态。 interface StatefulSinkWriter extends SinkWriter { /** * @return The writer's state. * @throws IOException if fail to snapshot writer's state. */ List snapshotState(long checkpointId) throws IOException; } 然而,我了解到Flink在batch模式不会开启checkpoint机制,那我如何能够保证批任务的状态能够得到及时保存。 如果我在进行大规模数据的ETL操作,因为某些task失败导致任务重试,难道整个任务都要从头开始吗? 恳请各位大佬赐教
FileSystem Connector如何优雅的支持同时写入多个路径
Flink版本:1.16.0 看官网上的案例: CREATE TABLE MyUserTable ( column_name1 INT, column_name2 STRING, ... part_name1 INT, part_name2 STRING ) PARTITIONED BY (part_name1, part_name2) WITH ( 'connector' = 'filesystem', -- 必选:指定连接器类型 'path' = 'file:///path/to/whatever', -- 必选:指定路径 'format' = '...', -- 必选:文件系统连接器指定 format -- 有关更多详情,请参考 Table Formats 'partition.default-name' = '...', -- 可选:默认的分区名,动态分区模式下分区字段值是 null 或空字符串 -- 可选:该属性开启了在 sink 阶段通过动态分区字段来 shuffle 数据,该功能可以大大减少文件系统 sink 的文件数,但是可能会导致数据倾斜,默认值是 false 'sink.shuffle-by-partition.enable' = '...', ... ) 目前只支持写入一个path,有没有大佬有过最佳实践,如何写入多个path。
Re: 关于Flink SQL语句内的函数名和内建函数名称对应不上的问题
感谢大佬,我找到了。 所以说SQL类的内建函数实际上使用的是calcite的能力,而flink自己的内建函数是在table api中使用 > 2023年11月24日 17:07,Xuyang 写道: > > Hi, > 关于你举的例子,如果编译了源码的话,可以在FlinkSqlParserImpl这个被动态生成的词法解析器类中找到PostfixRowOperator方法,大致是通过识别到IS > NOT NULL这三个关键字,转化为Calcite的这个内置函数SqlStdOperatorTable.IS_NOT_NULL > > > > > -- > >Best! >Xuyang > > > > > > 在 2023-11-24 15:15:04,"jinzhuguang" 写道: >> flink 1.18.0 >> >> >> 例如我写下一条SQL: >> select * from KafkaTable where id is not null; >> >> IS NOT NULL应该属于系统内建函数,于是我找到相关代码: >> >> public static final BuiltInFunctionDefinition IS_NOT_NULL = >> BuiltInFunctionDefinition.newBuilder() >> .name("isNotNull") >> .kind(SCALAR) >> >> .inputTypeStrategy(wildcardWithCount(ConstantArgumentCount.of(1))) >> >> .outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull())) >> .build(); >> >> 发现他的name是“ isNotNull”,和“is not null”对应不上。并且经过实际测验,确实证实了我的猜想: >> >> DEBUG org.apache.flink.table.module.ModuleManager [] - >> Cannot find FunctionDefinition 'is not null' from any loaded modules. >> >> >> 所以我很疑惑,SQL到底是在哪里找到了”is not null”这个函数的呢? >> >> 以下是调用栈: >> @org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads() >> at >> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:69) >> at >> org.apache.calcite.sql.SqlUtil.lookupSubjectRoutinesByName(SqlUtil.java:609) >> at >> org.apache.calcite.sql.SqlUtil.lookupSubjectRoutines(SqlUtil.java:535) >> at org.apache.calcite.sql.SqlUtil.lookupRoutine(SqlUtil.java:486) >> at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:595) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:6302) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:6287) >> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1869) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1860) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereOrOn(SqlValidatorImpl.java:4341) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereClause(SqlValidatorImpl.java:4333) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3606) >> at >> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:64) >> at >> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:89) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1050) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1025) >> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:248) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1000) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:749) >> at >> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196) >> at >> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117) >> at >> org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:261) >> at >> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) >> at >> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:187) >> at >> org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212) >> at >> org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119) >> at >> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258) >> at >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >> at >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >> at java.lang.Thread.run(Thread.java:750)
关于Flink SQL语句内的函数名和内建函数名称对应不上的问题
flink 1.18.0 例如我写下一条SQL: select * from KafkaTable where id is not null; IS NOT NULL应该属于系统内建函数,于是我找到相关代码: public static final BuiltInFunctionDefinition IS_NOT_NULL = BuiltInFunctionDefinition.newBuilder() .name("isNotNull") .kind(SCALAR) .inputTypeStrategy(wildcardWithCount(ConstantArgumentCount.of(1))) .outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull())) .build(); 发现他的name是“ isNotNull”,和“is not null”对应不上。并且经过实际测验,确实证实了我的猜想: DEBUG org.apache.flink.table.module.ModuleManager [] - Cannot find FunctionDefinition 'is not null' from any loaded modules. 所以我很疑惑,SQL到底是在哪里找到了”is not null”这个函数的呢? 以下是调用栈: @org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads() at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:69) at org.apache.calcite.sql.SqlUtil.lookupSubjectRoutinesByName(SqlUtil.java:609) at org.apache.calcite.sql.SqlUtil.lookupSubjectRoutines(SqlUtil.java:535) at org.apache.calcite.sql.SqlUtil.lookupRoutine(SqlUtil.java:486) at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:595) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:6302) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:6287) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1869) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1860) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereOrOn(SqlValidatorImpl.java:4341) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereClause(SqlValidatorImpl.java:4333) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3606) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:64) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:89) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1050) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1025) at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:248) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1000) at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:749) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117) at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:261) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:187) at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212) at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119) at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750)
Re: flink sql如何实现json字符数据解析?
Flink SQL比较适合处理结构化的数据,不知道你的body_data中的filed数量是否是固定的。如果是固定的,那可以将源和目标的格式写成Table形式。 比如: SourceT: ( uuid String, body_data ARRAY> ) SinkT ( result ARRAY> ) Insert into SinkT (result) select Array[ROW(uuid, null,body_data[1]. field1 as body_data.fild1, body_data[1]. Field2 as body_data.fild2), ROW(uuid, null,body_data[2]. field, body_data[2]. field2)] as result 希望对你有帮助 > 2023年11月22日 20:54,casel.chen 写道: > > 输入: > > { > > "uuid":"", > > "body_data": > "[{\"fild1\":1"1231","fild2\":1"2341"},{"fild1\":"abc\","fild2\":"cdf\"}]" > > } > > > > > 输出: > > [ > > { > > "uuid": "", > > "body_data: null, > > "body_data.fild1": "123”, > > "body_data.fild2": "234" > > }, > > { > > "uuid": "", > > "body_data": null, > > "body_data.fild1": "abc", > > "body_data.fild2": "cdf" > > } > > ] > > > > > 当格式错误时 > > > > > 输入: > > { > > "uuid": "”, > > "body_data": "abc" > > } > > 输出: > > { > > "uuid": "", > > "body_data": "abc", > > "body_data.fild1": null, > > "body_data.fild2": null > > }
如何在Flink Connector Source退出时清理资源
版本:Flink 1.16.0 需求:在某个source结束退出时清理相关的资源。 问题:目前没有找到Source退出时相关的hook函数,不知道在哪里编写清理资源的代码。 恳请大佬们指教。
Re: 关于Flink SQL无法对带metadata列的表在执行insert into时,指定具体的列名的问题。
感谢大佬!!! > 2023年10月13日 10:44,tanjialiang 写道: > > Hi, > 这个问题已经在1.17.0修复,详细可以看https://issues.apache.org/jira/browse/FLINK-30922 > > > best wishes, > tanjialiang. > > > 回复的原邮件 > | 发件人 | jinzhuguang | > | 发送日期 | 2023年10月13日 10:39 | > | 收件人 | user-zh | > | 主题 | 关于Flink SQL无法对带metadata列的表在执行insert into时,指定具体的列名的问题。 | > 首先,我的Flink版本为1.16.0 > 为了方便理解,我以Kafka作为案例来描述: > 我有以下两个表: > CREATE TABLE orders( > user_id BIGINT, > name STRING, > timestamp TIMESTAMP(3) METADATA VIRTUAL > )WITH( > 'connector'='kafka', > 'topic'='orders', > 'properties.group.id' = 'test_join_tempral', > 'scan.startup.mode'='earliest-offset', > 'properties.bootstrap.servers'='localhost:9092', > 'format'='json', > 'json.ignore-parse-errors' = 'true' > ); > CREATE TABLE kafka_sink( > user_id BIGINT, > name STRING, > timestamp TIMESTAMP(3) METADATA FROM 'timestamp' > )WITH( > 'connector'='kafka', > 'topic'='kafka_sink', > 'properties.group.id' = 'test_join_tempral', > 'scan.startup.mode'='earliest-offset', > 'properties.bootstrap.servers'='localhost:9092', > 'format'='json', > 'json.ignore-parse-errors' = 'true' > ); > > 正常情况: > Flink SQL> insert into kafka_sink select user_id,name,`timestamp` from orders; > [INFO] Submitting SQL update statement to the cluster... > [INFO] SQL update statement has been successfully submitted to the cluster: > Job ID: e419ae9d2cad4c3c2a2c1150c1a86653 > > > 异常情况: > Flink SQL> insert into kafka_sink(user_id,name,`timestamp`) select > user_id,name,`timestamp` from orders; > [ERROR] Could not execute SQL statement. Reason: > org.apache.calcite.sql.validate.SqlValidatorException: Unknown target column > 'timestamp' > 很奇怪,为什么指定列名就不行了呢?而且还是识别不到”ts”列,kafka_sink schema如下: > Flink SQL> describe kafka_sink; > +---+--+--+-+---+---+ > | name | type | null | key |extras | > watermark | > +---+--+--+-+---+---+ > | user_id | BIGINT | TRUE | | | > | > | name | STRING | TRUE | | | > | > | timestamp | TIMESTAMP(3) | TRUE | | METADATA FROM 'timestamp' | > | > +---+--+--+-+---+---+ > > > > 恳请解答!
关于Flink SQL无法对带metadata列的表在执行insert into时,指定具体的列名的问题。
首先,我的Flink版本为1.16.0 为了方便理解,我以Kafka作为案例来描述: 我有以下两个表: CREATE TABLE orders( user_id BIGINT, name STRING, timestamp TIMESTAMP(3) METADATA VIRTUAL )WITH( 'connector'='kafka', 'topic'='orders', 'properties.group.id' = 'test_join_tempral', 'scan.startup.mode'='earliest-offset', 'properties.bootstrap.servers'='localhost:9092', 'format'='json', 'json.ignore-parse-errors' = 'true' ); CREATE TABLE kafka_sink( user_id BIGINT, name STRING, timestamp TIMESTAMP(3) METADATA FROM 'timestamp' )WITH( 'connector'='kafka', 'topic'='kafka_sink', 'properties.group.id' = 'test_join_tempral', 'scan.startup.mode'='earliest-offset', 'properties.bootstrap.servers'='localhost:9092', 'format'='json', 'json.ignore-parse-errors' = 'true' ); 正常情况: Flink SQL> insert into kafka_sink select user_id,name,`timestamp` from orders; [INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: e419ae9d2cad4c3c2a2c1150c1a86653 异常情况: Flink SQL> insert into kafka_sink(user_id,name,`timestamp`) select user_id,name,`timestamp` from orders; [ERROR] Could not execute SQL statement. Reason: org.apache.calcite.sql.validate.SqlValidatorException: Unknown target column 'timestamp' 很奇怪,为什么指定列名就不行了呢?而且还是识别不到”ts”列,kafka_sink schema如下: Flink SQL> describe kafka_sink; +---+--+--+-+---+---+ | name | type | null | key |extras | watermark | +---+--+--+-+---+---+ | user_id | BIGINT | TRUE | | | | | name | STRING | TRUE | | | | | timestamp | TIMESTAMP(3) | TRUE | | METADATA FROM 'timestamp' | | +---+--+--+-+---+---+ 恳请解答!
Re: Flink cdc 2.0 历史数据太大,导致log积压怎么解决
你好,除了这些运维手段外,flink cdc本身有什么解法吗,比如说增量阶段不用从头开始读binlog,因为其实很多都是重复读到的数据 > 2023年9月20日 21:00,Jiabao Sun 写道: > > Hi, > 生产环境的binlog还是建议至少保留7天,可以提高故障恢复时间容忍度。 > 另外,可以尝试增加snapshot的并行度和资源来提升snapshot速度,snapshot完成后可以从savepoint恢复并减少资源。 > Best, > Jiabao > -- > From:jinzhuguang > Send Time:2023年9月20日(星期三) 20:56 > To:user-zh > Subject:Flink cdc 2.0 历史数据太大,导致log积压怎么解决 > 以mysql > cdc为例,现在的f整体流程是先同步全量数据,再开启增量同步;我看代码目前增量的初始offset选择的是所有全量split的最小的highwatermark。那我如果全量数据很大,TB级别,全量同步可能需要很久,但是binlog又不能删除,这样堆积起来会占用很大的空间,不知道这个问题现在有什么常见的解法吗?
Flink cdc 2.0 历史数据太大,导致log积压怎么解决
以mysql cdc为例,现在的f整体流程是先同步全量数据,再开启增量同步;我看代码目前增量的初始offset选择的是所有全量split的最小的highwatermark。那我如果全量数据很大,TB级别,全量同步可能需要很久,但是binlog又不能删除,这样堆积起来会占用很大的空间,不知道这个问题现在有什么常见的解法吗?
Flink cdc 2.0 历史数据太大,导致log积压怎么解决
以mysql cdc为例,现在的f整体流程是先同步全量数据,再开启增量同步;我看代码目前增量的初始offset选择的是所有全量split的最小的highwatermark。那我如果全量数据很大,TB级别,全量同步可能需要很久,但是binlog又不能删除,这样堆积起来会占用很大的空间,不知道这个问题现在有什么常见的解法吗?
Re: 如何把自己新增的定制化connector deploy snapshot版本到私服仓库
非常感谢你的提醒,我现在用maven工具修改了所有的版本号为snapshot,但是flink-connectors(connectors的父模块)也变成snapshot,打包的时候仓库里找不到他了,而且也没法想flink-runtime这些包手动改下版本好,这种该怎么办 > 2023年7月27日 11:05,Jiabao Sun 写道: > > 你好, > > 通常在 pom 中引入 maven-deploy-plugin,并且通过 声明私服地址,使用 mvn > clean deploy 命令部署到nexus私服。 > 部署到 SNAPSHOT 仓库需要项目版本号包含 -SNAPSHOT 后缀,可以在IDE中全局替换,也可以使用 versions-maven-plugin > 统一设置。 > > > > > > org.apache.maven.plugins > maven-deploy-plugin > 2.8.2 > > ${maven.deploy.skip} > > > > > > > >private-snapshots > > https://xxx.xxx.xxx/nexus/content/repositories/snapshots/ > > >private-releases >https://xxx.xxx.xxx/nexus/content/repositories/releases/ > > > > > >> 2023年7月27日 上午10:48,jinzhuguang 写道: >> >> 我是基于flink 1.16.0开发的,由于版本号没有snapshot,现在又无法发布release版本的,我该怎么办?
如何把自己新增的定制化connector deploy snapshot版本到私服仓库
我是基于flink 1.16.0开发的,由于版本号没有snapshot,现在又无法发布release版本的,我该怎么办?
Re: ASF jira account
我也遇到类似的问题,我是链接失效了,最后没办法再注册了 > 2023年7月20日 14:54,李天龙 写道: > > 您好! > 我想注册一个flink jira的账号,但由于提出的里有不充分给拒掉了,想再次申请,却提示邮箱已申请过,还未处理: > > > There is already a pending Jira account request associated with this email > address. Please wait for it to be processed > > > 请问怎么解决这个问题,并且成功申请一个账号 > > > > > > > -- > 发自我的网易邮箱平板适配版
Re: 如果DataSet API 被彻底废掉了,那我如何用DataStream实现分区、排序这个需求?
嗨你好,用于sort的中间数据是存储在状态后端当中吗,数据量很大的情况下。 > 2023年7月12日 19:48,weijie guo 写道: > > 你好, > 首先,Batch Shuffle 的中间数据都是会落盘的。其次,对于 Sort 这个操作来说,上面给出的解法和Dataset一致,都不会落盘。 > > Best regards, > > Weijie > > > jinzhuguang 于2023年7月12日周三 17:28写道: > >> 如果我的数据量很大,内存装不下,flink在batch >> mode下的行为是否会像传统的批处理系统,例如hive那样,会进行shuffe、中间数据落盘等操作。 >> >>> 2023年7月12日 17:05,weijie guo 写道: >>> >>> >> 你好,对于DataSet中不按照key进行全量聚合/排序的API(例如,sortPartition/mapPartition),DataStream上目前没有直接提供相同的API,但可以通过组合DataStream上现有的API实现相同的功能。 >>> 以mapPartition为例,可以通过以下三个步骤实现相同的功能: >>> 1. dataStream.map(record -> (subtaskIndex, >>> record)),为每个Record增加处理该record时子任务编号。 >>> 2. dataStream.assignTimestampsAndWatermarks,为每个Record增加相同的时间戳。 >>> 3. >>> >> dataStream.window(TumblingEventTimeWindows.of(Time.seconds(1))).apply(mapPartition >>> udf),基于固定时间窗口开窗,收集全量数据进行并使用和mapPartition相同的逻辑进行处理。 >>> >>> 以下链接中是相关的示例代码,其中的第一步和第二步,在DataSet API被移除之前后续会考虑为DataStream API提供相关的工具方法: >>> >>> https://netcut.cn/p/dc693599e9031cd7 >> >>
Re: 如果DataSet API 被彻底废掉了,那我如何用DataStream实现分区、排序这个需求?
如果我的数据量很大,内存装不下,flink在batch mode下的行为是否会像传统的批处理系统,例如hive那样,会进行shuffe、中间数据落盘等操作。 > 2023年7月12日 17:05,weijie guo 写道: > > 你好,对于DataSet中不按照key进行全量聚合/排序的API(例如,sortPartition/mapPartition),DataStream上目前没有直接提供相同的API,但可以通过组合DataStream上现有的API实现相同的功能。 > 以mapPartition为例,可以通过以下三个步骤实现相同的功能: > 1. dataStream.map(record -> (subtaskIndex, > record)),为每个Record增加处理该record时子任务编号。 > 2. dataStream.assignTimestampsAndWatermarks,为每个Record增加相同的时间戳。 > 3. > dataStream.window(TumblingEventTimeWindows.of(Time.seconds(1))).apply(mapPartition > udf),基于固定时间窗口开窗,收集全量数据进行并使用和mapPartition相同的逻辑进行处理。 > > 以下链接中是相关的示例代码,其中的第一步和第二步,在DataSet API被移除之前后续会考虑为DataStream API提供相关的工具方法: > > https://netcut.cn/p/dc693599e9031cd7
如果DataSet API 被彻底废掉了,那我如何用DataStream实现分区、排序这个需求?
目前,我在flink 1.16还没有发现DataStream有实现分区排序的接口,现在是否有方法在有界数据集上实现分区、排序这个需求呢?
如果DataSet API 被彻底废掉了,那我如何用DataStream实现分区、排序这个需求?
目前,我在flink 1.16还没有发现DataStream有实现分区排序的接口,现在是否有方法在有界数据集上实现分区、排序这个需求呢?