Re: jdbc connectors

2021-04-01 文章 guoyb
不是replace,用的是insert into duplicate key update



---Original---
From: "liujian"<13597820...@qq.com
Date: Thu, Apr 1, 2021 16:02 PM
To: "user-zh"

Re: flink集群提交任务挂掉

2021-04-01 文章 shimin huang
增大`taskmanager.memory.task.off-heap.size`配置

bowen li  于2021年4月2日周五 上午10:54写道:

> Hi,大家好:
>  现在我们遇到的场景是这样的,提交任务的时候会报错。我们使用的版本是1.12.1,搭建模式是standalone的。下面是报错信息。
>
>java.lang.OutOfMemoryError: Direct buffer memory. The direct
> out-of-memory error has occurred. This can mean two things: either job(s)
> require(s) a larger size of JVM direct memory or there is a direct memory
> leak. The direct memory can be allocated by user code or some of its
> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
> configuration option should be increased. Flink framework and its
> dependencies also consume the direct memory, mostly for network
> communication. The most of network memory is managed by Flink and should
> not result in out-of-memory error. In certain special cases, in particular
> for jobs with high parallelism, the framework may require more direct
> memory which is not managed by Flink. In this case
> 'taskmanager.memory.framework.off-heap.size' configuration option should be
> increased. If the error persists then there is probably a direct memory
> leak in user code or some of its dependencies
>   这种情况我们需要特别的配置吗?


flink集群提交任务挂掉

2021-04-01 文章 bowen li
Hi,大家好:
 现在我们遇到的场景是这样的,提交任务的时候会报错。我们使用的版本是1.12.1,搭建模式是standalone的。下面是报错信息。

   java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory 
error has occurred. This can mean two things: either job(s) require(s) a larger 
size of JVM direct memory or there is a direct memory leak. The direct memory 
can be allocated by user code or some of its dependencies. In this case 
'taskmanager.memory.task.off-heap.size' configuration option should be 
increased. Flink framework and its dependencies also consume the direct memory, 
mostly for network communication. The most of network memory is managed by 
Flink and should not result in out-of-memory error. In certain special cases, 
in particular for jobs with high parallelism, the framework may require more 
direct memory which is not managed by Flink. In this case 
'taskmanager.memory.framework.off-heap.size' configuration option should be 
increased. If the error persists then there is probably a direct memory leak in 
user code or some of its dependencies
  这种情况我们需要特别的配置吗?

求助:通过实时 Pipeline 的手段消费 Hive Table 报java.lang.ArrayIndexOutOfBoundsException: -1,谢谢!

2021-04-01 文章 samuel....@ubtrobot.com
你好:
1. 实时通过读KAFKA,然后将数据写入了hive,建一张hive表,format 是 Parquet,是按天、小时、分钟来分区;

2. 通过实时 Pipeline 的手段消费 Hive Table 报java.lang.ArrayIndexOutOfBoundsException: -1
  在flink sql client下:  
  1)直接select 所有字段,是没有问题,可以正常读出所有数据。
  执行:  select *
 from ubtCatalog.ubtHive.event_all_dwd  
 /*+ OPTIONS('streaming-source.enable'='true', 
'streaming-source.partition.include'='all', 
'streaming-source.monitor-interval'='5s', 
'streaming-source.consume-order'='partition-time','streaming-source.consume-start-offset'='2021-01-01')
 */ 
 ;

2) 在1)基础上加上统计函数,一直报莫名的错,java.lang.ArrayIndexOutOfBoundsException: -1
  执行:  select count(xubtappid) 
 from ubtCatalog.ubtHive.event_all_dwd  
 /*+ OPTIONS('streaming-source.enable'='true', 
'streaming-source.partition.include'='all', 
'streaming-source.monitor-interval'='5s', 
'streaming-source.consume-order'='partition-time','streaming-source.consume-start-offset'='2021-01-01')
 */ 
 ;

具体报错信息如下:
2021-04-02 10:06:26
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:89)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:240)
at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:469)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by 
OperatorCoordinator for 'Source: HiveSource-ubtHive.event_all_dwd' (operator 
bc764cd8ddf7a0cff126f51c16239658).
at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:466)
at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237)
at 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:240)
at 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:247)
at 
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:44)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.connectors.hive.FlinkHiveException: Failed to 
enumerate files
at 
org.apache.flink.connectors.hive.ContinuousHiveSplitEnumerator.handleNewSplits(ContinuousHiveSplitEnumerator.java:148)
at 
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$4(ExecutorNotifier.java:135)
at 
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
... 3 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
at 
org.apache.flink.connectors.hive.util.HivePartitionUtils.toHiveTablePartition(HivePartitionUtils.java:167)
at 

blink planner里的Scala代码,未来会由Java改写吗?

2021-04-01 文章 Luna Wong
目前blink planner中有大量Scala代码,Scala在这方面写起来确实简单不少。未来不需要用Java重写是吗?


Re: flink 1.12.2 sql-cli 写入Hive报错 is_generic

2021-04-01 文章 HunterXHunter
查看hdfs文件:
分区一直是这样的一个文件,没有生成 _SUCCESS文件
.part-40a2c94d-0437-4666-8d43-31c908aaa02e-0-0.inprogress.73dcc10b-44f4-47e3-abac-0c14bd59f9c9



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 1.12.2 sql-cli 写入Hive报错 is_generic

2021-04-01 文章 HunterXHunter
你好,这个问题已经解决了。
我现在通过官方例子:


SET table.sql-dialect=default;

create table  flink_kafka(
sys_time bigint,
rt  AS TO_TIMESTAMP(FROM_UNIXTIME(sys_time / 1000, '-MM-dd HH:mm:ss')),
WATERMARK FOR rt AS rt - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'xx',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '',
'properties.group.id' = 'test-sql',
'format' = 'json',
'json.ignore-parse-errors' = 'true'
);

SET table.sql-dialect=hive;

CREATE TABLE hive_table (
  sys_time bigint
) PARTITIONED BY (dt STRING, hr STRING) STORED AS orc TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='process-time',
  'sink.partition-commit.delay'='0s',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);


INSERT INTO hive_table SELECT sys_time, DATE_FORMAT(rt, '-MM-dd') as dt,
DATE_FORMAT(rt, 'HH') as hr  FROM flink_kafka;

发现数据一直无法写入hive。程序没有报错,
select * from flink_kafka;是有数据的。
但是hive_table一直没有数据,
我发送各个时间段的数据,watermark应该也是超过了分区时间的,但是hive_table一直没有数据




--
Sent from: http://apache-flink.147419.n8.nabble.com/


????

2021-04-01 文章 ???0?6


Re: flink 1.12.2 sql-cli 写入Hive报错 is_generic

2021-04-01 文章 Rui Li
你好,

我用你提供的这个DDL没有复现这个问题,有更详细的操作步骤么?另外如果kafka表是通过create table like创建的话有个已知问题:
https://issues.apache.org/jira/browse/FLINK-21660

On Thu, Apr 1, 2021 at 4:08 PM HunterXHunter <1356469...@qq.com> wrote:

> 当配置好HiveCatalog后,
> SQL-Cli 也可以查到hive库表信息
> 创建kafka表:
>
> create table test.test_kafka(
> word VARCHAR
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'xx',
> 'scan.startup.mode' = 'latest-offset',
> 'properties.bootstrap.servers' = 'xx',
> 'properties.group.id' = 'test',
> 'format' = 'json',
> 'json.ignore-parse-errors' = 'true'
> );
> 在 Hive里面可以查到改表
> hive > DESCRIBE FORMATTED test_kafka
>...
> is_generic  true
>.
>
> 但是我在 Flink SQL > select * from test.test_kafka;
> 报错:
> org.apache.flink.table.api.ValidationException: Unsupported options found
> for connector 'kafka'.
> Unsupported options:
> is_generic
> Supported options:
> connector
> format
> json.fail-on-missing-field
> json.ignore-parse-errors
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best regards!
Rui Li


Re: 回复:现在的flink1.12支持批流混合的作业吗?

2021-04-01 文章 键盘击打者
谢谢老哥的回复。

可能我举得例子不是很好。我其实还是单纯的想问
使用DataStream的API,支不支持同一个应用中批流混合的作业(比如一个应用多个job,job有流有批)...

就我目前看到的文档中来看,是支持的。但是指定execution.runtime-mode为批、流、自动模式后,作业的执行还是会依照批和流其中一种模式,所以在性能上不能很好的支持批流混合的作业。

不知道我理解的是否正确,学生狗一只。

祝好,
耳朵



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 关于Flink水位线与时间戳分配的疑问

2021-04-01 文章 Shengkai Fang
hi, 图挂了。

1. 可以这么使用这个方法:

···

input.assignTimestampsAndWatermarks(

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(10))
.withTimestampAssigner((event, timestamp) -> 42L));

···
TimestampAssigner 会从输入的event上读取数据 并由watermark generator 决定输出对应的watermark.

3. 容忍是由于数据会乱序.

Best,
Shengkai

陈柏含  于2021年4月1日周四 下午5:16写道:

> 您好:
>
> 我是目前正在自学Flink以完成毕业设计的计算机专业学生。目前对Flink时间戳与水位线的代码有很多不确定且找不到权威解答的疑问,自己调试程序也因为之前没有Flink经验而对Debug变量窗口中各种复杂的结构找不到头绪。因此,抱着试一试的想法尝试通过这个邮箱寻求解答。
>  下面两个图片是一个分配器
>
>
>
> 我有一下几个疑问:1.是不是我们只要调用env.addSource(...).assignTimestampsAndWatermarks(new
> PeriodicAssigner)就能分配时间戳和水位线了呢?但是我看不到这个类中有分配时间戳的行为,好像只是和获取时间戳和分配水位线相关。
>2.maxTs这个变量为什么能够从Long.MinValue中观察到最大的时间戳呢?
>3.这段代码的解释中提到“该分配器会返回一个时间戳等于最大时间戳减去1分钟容忍间隔的水位线”,这里所说的容忍间隔的目的是什么呢?
>
> 感谢您的回复,非常感谢您的点拨!
>


Re: FS StateBackend 到 RocksDB StateBackend 状态恢复问题

2021-04-01 文章 LakeShen
Hi fanrui,

thank you so much!

Best,
LakeShen


范瑞 <836961...@qq.com> 于2021年4月1日周四 下午7:36写道:

> Hi Lake:
>
>
> 目前的 Flink 版本应该都是不支持的,也就是说换了 StateBackend 不能正常恢复。Flink 1.13
> 做了这个事情,具体参考:FLIP41 和 FLINK-20976
>
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State
>
>
> https://issues.apache.org/jira/browse/FLINK-2097
>
>
> Best,
> fanrui
>
> ---原始邮件---
> 发件人: "LakeShen" 发送时间: 2021年4月1日(周四) 晚上7:16
> 收件人: "user-zh" 主题: FS StateBackend 到 RocksDB StateBackend 状态恢复问题
>
>
> Hi 社区,
>  如果实时任务状态后端之前是 FS StateBackend
> ,然后任务停止后,换成 RocksDB StateBackend
> 做恢复,作业状态能恢复吗?
>
> Best,
> LakeShen


Re: FS StateBackend 到 RocksDB StateBackend 状态恢复问题

2021-04-01 文章 LakeShen
确定了 不能

LakeShen  于2021年4月1日周四 下午7:15写道:

> Hi 社区,
>如果实时任务状态后端之前是 FS StateBackend ,然后任务停止后,换成 RocksDB StateBackend
> 做恢复,作业状态能恢复吗?
>
> Best,
> LakeShen
>


回复:FS StateBackend 到 RocksDB StateBackend 状态恢复问题

2021-04-01 文章 范瑞
Hi Lake:


目前的 Flink 版本应该都是不支持的,也就是说换了 StateBackend 不能正常恢复。Flink 1.13 做了这个事情,具体参考:FLIP41 和 
FLINK-20976


https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State


https://issues.apache.org/jira/browse/FLINK-2097


Best,
fanrui

---原始邮件---
发件人: "LakeShen"

FS StateBackend 到 RocksDB StateBackend 状态恢复问题

2021-04-01 文章 LakeShen
Hi 社区,
   如果实时任务状态后端之前是 FS StateBackend ,然后任务停止后,换成 RocksDB StateBackend
做恢复,作业状态能恢复吗?

Best,
LakeShen


Re: 关于flink CheckPoint 状态数据保存的问题

2021-04-01 文章 Paul Lam
关于 chk 下只有 _metadata 的问题,大概是因为 state 比较小,被嵌入到 _medata 文件里了。可以参考这个配置项 [1]。

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#state-backend-fs-memory-threshold

Best,
Paul Lam

> 2021年4月1日 16:25,lp <973182...@qq.com> 写道:
> 
> 好的,谢谢
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



回复:现在的flink1.12支持批流混合的作业吗?

2021-04-01 文章 飞翔
哈哈,建议你去了解flink-cdc



发自我的iPhone


-- 原始邮件 --
发件人: 键盘击打者 http://apache-flink.147419.n8.nabble.com/file/t1425/1617266981%281%29.jpg; 

祝好,
耳朵



--
Sent from: http://apache-flink.147419.n8.nabble.com/

现在的flink1.12支持批流混合的作业吗?

2021-04-01 文章 键盘击打者
嗨,您好!

比如我想在一个应用里同时跑实时数据清洗和历史数据清洗(批作业和流作业混合)?
类似下图的应用。
 

祝好,
耳朵



--
Sent from: http://apache-flink.147419.n8.nabble.com/

关于Flink水位线与时间戳分配的疑问

2021-04-01 文章 陈柏含
您好:
  
我是目前正在自学Flink以完成毕业设计的计算机专业学生。目前对Flink时间戳与水位线的代码有很多不确定且找不到权威解答的疑问,自己调试程序也因为之前没有Flink经验而对Debug变量窗口中各种复杂的结构找不到头绪。因此,抱着试一试的想法尝试通过这个邮箱寻求解答。
 下面两个图片是一个分配器



我有一下几个疑问:1.是不是我们只要调用env.addSource(...).assignTimestampsAndWatermarks(new 
PeriodicAssigner)就能分配时间戳和水位线了呢?但是我看不到这个类中有分配时间戳的行为,好像只是和获取时间戳和分配水位线相关。
   2.maxTs这个变量为什么能够从Long.MinValue中观察到最大的时间戳呢?
   3.这段代码的解释中提到“该分配器会返回一个时间戳等于最大时间戳减去1分钟容忍间隔的水位线”,这里所说的容忍间隔的目的是什么呢?

感谢您的回复,非常感谢您的点拨!


Re: Connection reset by peer

2021-04-01 文章 骆凡
加内存试试



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于flink CheckPoint 状态数据保存的问题

2021-04-01 文章 lp
好的,谢谢



--
Sent from: http://apache-flink.147419.n8.nabble.com/


???????????????? OutOfMemoryError: Metaspace ????

2021-04-01 文章 ??????
??
 sql ?? s3  
clickhouse ?? kafka
?? task-manager  OutOfMemoryError: Metaspace 


??
flink ??1.12.2
?? standalone kubernetes session 
??

  

Re: 关于flink CheckPoint 状态数据保存的问题

2021-04-01 文章 lp
如题,除了通过这种全局配置文件中的方式修改,能在程序中通过代码的方式修改吗



--
Sent from: http://apache-flink.147419.n8.nabble.com/

???????????????? OutOfMemoryError: Metaspace ????

2021-04-01 文章 ??????
??
 sql ?? s3  
clickhouse ?? kafka
?? task-manager OutOfMemoryError: Metaspace 


??
flink ??1.12.2
?? standalone kubernetes session 
??

  

Re: 关于flink CheckPoint 状态数据保存的问题

2021-04-01 文章 tison
这个配置本身我看了一下只能走 flink-conf.yaml,而且似乎是 per cluster 配置的,虽然 perjob /
application 部署的时候没啥问题,但是 session 可能就不行了。配置这块 Flink 是有点全走 flink-conf.yaml +
默认你是用 perjob / application 的意思。

你提的数据看不到的问题,首先确认一下是否 chk 真的有数据。另外我依稀记得 tangyun(in cc) 做过一个改动,可以问下他的看法。

Best,
tison.


tison  于2021年4月1日周四 下午3:50写道:

> 只有一个的问题是因为默认保留的 chk 数量是一个,可以修改这个配置[1]来改变。
>
> Best,
> tison.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#state-checkpoints-num-retained
>
>
> lp <973182...@qq.com> 于2021年4月1日周四 下午3:48写道:
>
>> 我写了一个带状态的function
>> 采用了如下cp配置:
>> env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);
>> env.getCheckpointConfig().setCheckpointTimeout(6L);
>>
>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> env.setStateBackend(new FsStateBackend("file:///cp/eventCounter"));
>>
>> 请教几个问题:
>> ①按照官网的介绍,目录数据应该是这样的
>> /user-defined-checkpoint-dir
>> /{job-id}
>> |
>> + --shared/
>> + --taskowned/
>> + --chk-1/
>> + --chk-2/
>> + --chk-3/
>> ...
>>
>> 但是我的测试是job正常running时,chk-*永远只有一个,每次做chk,递增 +1 一次
>>
>>
>> ②状态数据按照理解是保存在chk-*下面的,但是我的测试下面只有一个_metadata,并没有每次chk的数据,使用的flink1.12
>> 当我改成使用flink1.8时,是可以看到如下chk-*目录下除了_metadata,还有每次的chk数据.
>> 所以flink1.12高版本的情况每次chk的数据在哪里
>>
>>
>>
>> ③按照官网介绍,默认只保留最新的一份chk数据,如果想保留最近的多份,除了全局flink-conf.yaml中配置state.checkpoints.num-retained:
>> 5, 有程序中使用env 针对每job的配置方式吗
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>


flink 1.12.2 sql-cli 写入Hive报错 is_generic

2021-04-01 文章 HunterXHunter
当配置好HiveCatalog后,
SQL-Cli 也可以查到hive库表信息
创建kafka表:

create table test.test_kafka(
word VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'xx',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'xx',
'properties.group.id' = 'test',
'format' = 'json',
'json.ignore-parse-errors' = 'true'
);
在 Hive里面可以查到改表
hive > DESCRIBE FORMATTED test_kafka
   ...
is_generic  true
   .

但是我在 Flink SQL > select * from test.test_kafka;
报错:
org.apache.flink.table.api.ValidationException: Unsupported options found
for connector 'kafka'.
Unsupported options:
is_generic
Supported options:
connector
format
json.fail-on-missing-field
json.ignore-parse-errors




--
Sent from: http://apache-flink.147419.n8.nabble.com/


jdbc connectors

2021-04-01 文章 liujian
Hi:
  jdbc connectors sink??mysql, 
?? replace 
into??,,mysqlid,AUTO_INCREMENT
 ??2

Re: 关于flink CheckPoint 状态数据保存的问题

2021-04-01 文章 tison
只有一个的问题是因为默认保留的 chk 数量是一个,可以修改这个配置[1]来改变。

Best,
tison.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#state-checkpoints-num-retained


lp <973182...@qq.com> 于2021年4月1日周四 下午3:48写道:

> 我写了一个带状态的function
> 采用了如下cp配置:
> env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);
> env.getCheckpointConfig().setCheckpointTimeout(6L);
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.setStateBackend(new FsStateBackend("file:///cp/eventCounter"));
>
> 请教几个问题:
> ①按照官网的介绍,目录数据应该是这样的
> /user-defined-checkpoint-dir
> /{job-id}
> |
> + --shared/
> + --taskowned/
> + --chk-1/
> + --chk-2/
> + --chk-3/
> ...
>
> 但是我的测试是job正常running时,chk-*永远只有一个,每次做chk,递增 +1 一次
>
>
> ②状态数据按照理解是保存在chk-*下面的,但是我的测试下面只有一个_metadata,并没有每次chk的数据,使用的flink1.12
> 当我改成使用flink1.8时,是可以看到如下chk-*目录下除了_metadata,还有每次的chk数据.
> 所以flink1.12高版本的情况每次chk的数据在哪里
>
>
>
> ③按照官网介绍,默认只保留最新的一份chk数据,如果想保留最近的多份,除了全局flink-conf.yaml中配置state.checkpoints.num-retained:
> 5, 有程序中使用env 针对每job的配置方式吗
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


关于flink CheckPoint 状态数据保存的问题

2021-04-01 文章 lp
我写了一个带状态的function
采用了如下cp配置:
env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);
env.getCheckpointConfig().setCheckpointTimeout(6L);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(new FsStateBackend("file:///cp/eventCounter"));

请教几个问题:
①按照官网的介绍,目录数据应该是这样的
/user-defined-checkpoint-dir
/{job-id}
|
+ --shared/
+ --taskowned/
+ --chk-1/
+ --chk-2/
+ --chk-3/
...

但是我的测试是job正常running时,chk-*永远只有一个,每次做chk,递增 +1 一次


②状态数据按照理解是保存在chk-*下面的,但是我的测试下面只有一个_metadata,并没有每次chk的数据,使用的flink1.12
当我改成使用flink1.8时,是可以看到如下chk-*目录下除了_metadata,还有每次的chk数据.
所以flink1.12高版本的情况每次chk的数据在哪里


③按照官网介绍,默认只保留最新的一份chk数据,如果想保留最近的多份,除了全局flink-conf.yaml中配置state.checkpoints.num-retained:
5, 有程序中使用env 针对每job的配置方式吗





--
Sent from: http://apache-flink.147419.n8.nabble.com/


退订

2021-04-01 文章 Chouchou Mei
退订