flink1.10.0任务失败但是application仍然在yarn运行不会自动kill

2020-08-11 文章 魏烽
各位好:

Flink1.10.0版本发现偶发任务失败后但是web ui仍然挂着该任务并没有停止,日志如下:

请问有遇到一样的情况的嘛

[INFO] 2020-07-28 16:34:00.938  - [taskAppId=TASK-38-97-193]:[106] -  -> 
2020-07-28 16:33:52,863 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Submitting application master application_1571540269403_52656

2020-07-28 16:33:52,895 INFO  
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted 
application application_1571540269403_52656

2020-07-28 16:33:52,895 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Waiting for the cluster to be allocated

2020-07-28 16:33:52,897 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Deploying cluster, current state ACCEPTED

2020-07-28 16:34:00,938 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- YARN application has been deployed successfully.

[INFO] 2020-07-28 16:36:04.622  - [taskAppId=TASK-38-97-193]:[106] -  -> 
2020-07-28 16:34:00,941 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Found Web Interface nb-bdh-hadoop.slave14:21913 of application 
'application_1571540269403_52656'.

Job has been submitted with JobID 54ac0f3db08f29022d9f3d51d797a724

[INFO] 2020-07-28 16:36:04.623  - [taskAppId=TASK-38-97-193]:[106] -  -> 


The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: 
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
complete the operation. Number of retries has been exhausted.

at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)

at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)

at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)

at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)

at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)

at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)

Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
complete the operation. Number of retries has been exhausted.

at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)

at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)

at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)

at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)

at 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:678)

at com.nequal.bdh.cdp.IDMappingLauncher$.main(IDMappingLauncher.scala:140)

at com.nequal.bdh.cdp.IDMappingLauncher.main(IDMappingLauncher.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)

... 11 more

Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
Could not complete the operation. Number of retries has been exhausted.

at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:284)

at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)

at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)

at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)

at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)

at 
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:342)

at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)

at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:493)

at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:472)

at 

Re:Re: Re:Re: flink 1.11 StreamingFileWriter及sql-client问题

2020-08-11 文章 kandy.wang



source就是kafka 
json格式,是exactly-once,按照process-time处理就已经写完了呢。起来的时候,process-time已经属于新的分区了,很正常。但以前的老分区状态还没提交呢。






in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?



in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?

在 2020-08-12 13:28:01,"Jingsong Li"  写道:
>你的source是exactly-once的source吗?
>
>in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
>
>On Wed, Aug 12, 2020 at 12:51 PM kandy.wang  wrote:
>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> >@ Jingsong
>>
>> >导致的影响是停止前的那个分区,分区没有提交, 当程序起来之后,写的分区和之前分区不是同一个分区,没有_SUCCESS文件标记。
>> 用presto查询查不了
>> 举例:12:35分钟应当写的是 12:35 00秒 -12:39分 59秒 之间的数据,
>>  'sink.partition-commit.trigger'='process-time',
>>   'sink.partition-commit.delay'='0 min',
>>   'sink.partition-commit.policy.kind'='metastore,success-file,custom',
>>   'sink.rolling-policy.check-interval'='30s',
>>   'sink.rolling-policy.rollover-interval'='10min',
>>   'sink.rolling-policy.file-size'='128MB'
>>如果是12:39分 05秒左右做一次savepoint,然后
>> 12:41分程序重启后,发现之前的12:35分区不再写入,里面的in-progress文件还在,但是分区没有提交,没有往hive add
>> partition,就导致有数据,但是确查不 了。
>> 按照你说的,in-progress文件对没影响,但是影响了分区提交。就没地方触发之前12:35分区提交逻辑了。相当于丢了一个分区。这种情况我试了一下,手动add
>> partition 也能查了。
>> >
>> >
>> >
>> >在 2020-08-12 12:11:53,"Jingsong Li"  写道:
>> >>in-progress文件带来了什么具体问题吗?它们是多余的文件,对流程没有影响
>> >>
>> >>On Wed, Aug 12, 2020 at 11:05 AM Jark Wu  wrote:
>> >>
>> >>> 与我所知,(2) & (3) 有希望能在 1.12 中支持。
>> >>>
>> >>> On Tue, 11 Aug 2020 at 21:15, kandy.wang  wrote:
>> >>>
>> >>> > 1.StreamingFileWriter 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
>> >>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在
>> >>> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm
>> >>> > =2100分区的数据还存在很多的in-progress文件。
>> >>> > 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。
>> >>> >
>> >>> >
>> >>> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持
>> >>> >
>> >>> >
>> >>> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持
>> >>>
>> >>
>> >>
>> >>--
>> >>Best, Jingsong Lee
>>
>
>
>-- 
>Best, Jingsong Lee


Re: Flink 1.10 堆外内存一直在增加

2020-08-11 文章 Congxian Qiu
Hi
   你能拿到 memory 的 dump 吗?OOM 可能需要看一下 memory 的 dump 才能更好的确定是什么问题
Best,
Congxian


ReignsDYL <1945627...@qq.com> 于2020年8月11日周二 下午4:01写道:

> 各位好,
>   Flink 1.10,集群在运行过程中,堆外内存一直在不断增加,内存就被慢慢耗尽,导致任务挂掉,请问是什么原因啊?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql client:cdc 至elasticsearch 有问题(版本1.11.0),提交任务成功,但web dashbord上却看不到任务!求看是不是bug?还是我配置有问题?

2020-08-11 文章 Leonard Xu
Hello

1. 使用CDC功能的话请用1.11.1,该版本修复了一个CDC的bug[1]
2. 另外你这个异常栈是没有找到对应的 connector jar,确认下用的是 
flink-sql-connector-elasticsearch6_2.11-1.11.0 这个jar.

祝好
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-18461 


> 在 2020年8月12日,13:31,jindy_liu <286729...@qq.com> 写道:
> 
> 建表如下:
> 
> CREATE TABLE test (
> `id` INT,
> `name` VARCHAR(255),
> `time` TIMESTAMP(3),
> `status` INT,
> PRIMARY KEY(id) NOT ENFORCED 
> ) WITH (
> 'connector'='kafka',
> 'topic'='test',
> 'properties.group.id'='c_mysql_binlog_es',
> 'properties.bootstrap.servers'='localhost:9092',
> 'scan.startup.mode'='latest-offset',
> 'format'='canal-json',
> 'canal-json.ignore-parse-errors'='true'
> );
> 
> 
> # 输出表至es
> CREATE TABLE test_mirror_es (
> `id` INT,
> `name` VARCHAR(255),
> `time` TIMESTAMP(3),
> `status` INT,
> PRIMARY KEY(id) NOT ENFORCED 
> ) WITH (
>  'connector' = 'elasticsearch-7',
>  'hosts' = 'http://localhost:9200',
>  'index' = 'test_mirror'
> );
> 
> INSERT into test_mirror_es SELECT * from test where test.id >=0;
> 
> 日志:Caused by: org.apache.flink.table.api.ValidationException: Unable to
> create a source for reading table
> 'default_catalog.default_database.test_mirror_es'.
> 
> 完整日志如下:
> 
> 
> 2020-08-12 13:07:20,815 INFO 
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.rpc.address, localhost
> 2020-08-12 13:07:20,820 INFO 
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.rpc.port, 6123
> 2020-08-12 13:07:20,820 INFO 
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.memory.process.size, 1600m
> 2020-08-12 13:07:20,820 INFO 
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: taskmanager.memory.process.size, 1728m
> 2020-08-12 13:07:20,820 INFO 
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: taskmanager.numberOfTaskSlots, 10
> 2020-08-12 13:07:20,820 INFO 
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: parallelism.default, 5
> 2020-08-12 13:07:20,821 INFO 
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: state.savepoints.dir,
> hdfs://localhost:9000/flink-1.11.0/flink-savepoints
> 2020-08-12 13:07:20,821 INFO 
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.execution.failover-strategy, region
> 2020-08-12 13:07:21,198 INFO 
> org.apache.flink.table.client.config.entries.ExecutionEntry  [] - Property
> 'execution.restart-strategy.type' not specified. Using default value:
> fallback
> 2020-08-12 13:07:22,099 INFO 
> org.apache.flink.table.client.gateway.local.ExecutionContext [] - Executor
> config: {taskmanager.memory.process.size=1728m,
> jobmanager.execution.failover-strategy=region,
> jobmanager.rpc.address=localhost, execution.target=remote,
> jobmanager.memory.process.size=1600m,
> state.savepoints.dir=hdfs://localhost:9000/flink-1.11.0/flink-savepoints,
> jobmanager.rpc.port=6123, execution.savepoint.ignore-unclaimed-state=false,
> execution.attached=true, execution.shutdown-on-attached-exit=false,
> pipeline.jars=[file:/data1/home/xxx/flink-demo/flink-1.11.0/opt/flink-sql-client_2.11-1.11.0.jar],
> parallelism.default=5, taskmanager.numberOfTaskSlots=10,
> pipeline.classpaths=[]}
> 2020-08-12 13:07:22,286 INFO  org.apache.flink.table.client.cli.CliClient 
> 
> [] - Command history file path: /root/.flink-sql-history
> 2020-08-12 13:07:46,637 INFO 
> org.apache.flink.table.client.gateway.local.ProgramDeployer  [] - Submitting
> job org.apache.flink.streaming.api.graph.StreamGraph@41a16eb3 for query
> default: INSERT into test_mirror_es SELECT * from test where id >0`
> 2020-08-12 13:07:46,709 INFO  org.apache.flink.configuration.Configuration
> 
> [] - Config uses fallback configuration key 'jobmanager.rpc.address' instead
> of key 'rest.address'
> 2020-08-12 13:10:17,512 INFO 
> org.apache.flink.table.client.gateway.local.ProgramDeployer  [] - Submitting
> job org.apache.flink.streaming.api.graph.StreamGraph@3ff8a3ad for query
> default: INSERT into test_mirror_es SELECT * from test where id >0`
> 2020-08-12 13:10:17,516 INFO  org.apache.flink.configuration.Configuration
> 
> [] - Config uses fallback configuration key 'jobmanager.rpc.address' instead
> of key 'rest.address'
> 2020-08-12 13:10:38,360 WARN  org.apache.flink.table.client.cli.CliClient 
> 
> [] - Could not execute SQL statement.
> org.apache.flink.table.client.gateway.SqlExecutionException: Invalidate SQL
> statement.
>   at
> org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:99)
> ~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
>   at

flink sql client:cdc 至elasticsearch 有问题(版本1.11.0),提交任务成功,但web dashbord上却看不到任务!求看是不是bug?还是我配置有问题?

2020-08-11 文章 jindy_liu
建表如下:

CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED 
) WITH (
 'connector'='kafka',
 'topic'='test',
 'properties.group.id'='c_mysql_binlog_es',
 'properties.bootstrap.servers'='localhost:9092',
 'scan.startup.mode'='latest-offset',
 'format'='canal-json',
 'canal-json.ignore-parse-errors'='true'
);


# 输出表至es
CREATE TABLE test_mirror_es (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED 
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'test_mirror'
);

INSERT into test_mirror_es SELECT * from test where test.id >=0;

日志:Caused by: org.apache.flink.table.api.ValidationException: Unable to
create a source for reading table
'default_catalog.default_database.test_mirror_es'.

完整日志如下:


2020-08-12 13:07:20,815 INFO 
org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: jobmanager.rpc.address, localhost
2020-08-12 13:07:20,820 INFO 
org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: jobmanager.rpc.port, 6123
2020-08-12 13:07:20,820 INFO 
org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: jobmanager.memory.process.size, 1600m
2020-08-12 13:07:20,820 INFO 
org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: taskmanager.memory.process.size, 1728m
2020-08-12 13:07:20,820 INFO 
org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: taskmanager.numberOfTaskSlots, 10
2020-08-12 13:07:20,820 INFO 
org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: parallelism.default, 5
2020-08-12 13:07:20,821 INFO 
org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: state.savepoints.dir,
hdfs://localhost:9000/flink-1.11.0/flink-savepoints
2020-08-12 13:07:20,821 INFO 
org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: jobmanager.execution.failover-strategy, region
2020-08-12 13:07:21,198 INFO 
org.apache.flink.table.client.config.entries.ExecutionEntry  [] - Property
'execution.restart-strategy.type' not specified. Using default value:
fallback
2020-08-12 13:07:22,099 INFO 
org.apache.flink.table.client.gateway.local.ExecutionContext [] - Executor
config: {taskmanager.memory.process.size=1728m,
jobmanager.execution.failover-strategy=region,
jobmanager.rpc.address=localhost, execution.target=remote,
jobmanager.memory.process.size=1600m,
state.savepoints.dir=hdfs://localhost:9000/flink-1.11.0/flink-savepoints,
jobmanager.rpc.port=6123, execution.savepoint.ignore-unclaimed-state=false,
execution.attached=true, execution.shutdown-on-attached-exit=false,
pipeline.jars=[file:/data1/home/xxx/flink-demo/flink-1.11.0/opt/flink-sql-client_2.11-1.11.0.jar],
parallelism.default=5, taskmanager.numberOfTaskSlots=10,
pipeline.classpaths=[]}
2020-08-12 13:07:22,286 INFO  org.apache.flink.table.client.cli.CliClient   
  
[] - Command history file path: /root/.flink-sql-history
2020-08-12 13:07:46,637 INFO 
org.apache.flink.table.client.gateway.local.ProgramDeployer  [] - Submitting
job org.apache.flink.streaming.api.graph.StreamGraph@41a16eb3 for query
default: INSERT into test_mirror_es SELECT * from test where id >0`
2020-08-12 13:07:46,709 INFO  org.apache.flink.configuration.Configuration  
  
[] - Config uses fallback configuration key 'jobmanager.rpc.address' instead
of key 'rest.address'
2020-08-12 13:10:17,512 INFO 
org.apache.flink.table.client.gateway.local.ProgramDeployer  [] - Submitting
job org.apache.flink.streaming.api.graph.StreamGraph@3ff8a3ad for query
default: INSERT into test_mirror_es SELECT * from test where id >0`
2020-08-12 13:10:17,516 INFO  org.apache.flink.configuration.Configuration  
  
[] - Config uses fallback configuration key 'jobmanager.rpc.address' instead
of key 'rest.address'
2020-08-12 13:10:38,360 WARN  org.apache.flink.table.client.cli.CliClient   
  
[] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Invalidate SQL
statement.
at
org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:99)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.client.cli.SqlCommandParser.parse(SqlCommandParser.java:90)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:257)
[flink-sql-client_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
[flink-sql-client_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)
[flink-sql-client_2.11-1.11.0.jar:1.11.0]
at 

Re: Re:Re: flink 1.11 StreamingFileWriter及sql-client问题

2020-08-11 文章 Jingsong Li
你的source是exactly-once的source吗?

in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?

On Wed, Aug 12, 2020 at 12:51 PM kandy.wang  wrote:

>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> >@ Jingsong
>
> >导致的影响是停止前的那个分区,分区没有提交, 当程序起来之后,写的分区和之前分区不是同一个分区,没有_SUCCESS文件标记。
> 用presto查询查不了
> 举例:12:35分钟应当写的是 12:35 00秒 -12:39分 59秒 之间的数据,
>  'sink.partition-commit.trigger'='process-time',
>   'sink.partition-commit.delay'='0 min',
>   'sink.partition-commit.policy.kind'='metastore,success-file,custom',
>   'sink.rolling-policy.check-interval'='30s',
>   'sink.rolling-policy.rollover-interval'='10min',
>   'sink.rolling-policy.file-size'='128MB'
>如果是12:39分 05秒左右做一次savepoint,然后
> 12:41分程序重启后,发现之前的12:35分区不再写入,里面的in-progress文件还在,但是分区没有提交,没有往hive add
> partition,就导致有数据,但是确查不 了。
> 按照你说的,in-progress文件对没影响,但是影响了分区提交。就没地方触发之前12:35分区提交逻辑了。相当于丢了一个分区。这种情况我试了一下,手动add
> partition 也能查了。
> >
> >
> >
> >在 2020-08-12 12:11:53,"Jingsong Li"  写道:
> >>in-progress文件带来了什么具体问题吗?它们是多余的文件,对流程没有影响
> >>
> >>On Wed, Aug 12, 2020 at 11:05 AM Jark Wu  wrote:
> >>
> >>> 与我所知,(2) & (3) 有希望能在 1.12 中支持。
> >>>
> >>> On Tue, 11 Aug 2020 at 21:15, kandy.wang  wrote:
> >>>
> >>> > 1.StreamingFileWriter 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
> >>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在
> >>> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm
> >>> > =2100分区的数据还存在很多的in-progress文件。
> >>> > 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。
> >>> >
> >>> >
> >>> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持
> >>> >
> >>> >
> >>> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持
> >>>
> >>
> >>
> >>--
> >>Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re:Re:Re: flink 1.11 StreamingFileWriter及sql-client问题

2020-08-11 文章 kandy.wang


















>@ Jingsong

>导致的影响是停止前的那个分区,分区没有提交, 当程序起来之后,写的分区和之前分区不是同一个分区,没有_SUCCESS文件标记。 用presto查询查不了
举例:12:35分钟应当写的是 12:35 00秒 -12:39分 59秒 之间的数据,
 'sink.partition-commit.trigger'='process-time',
  'sink.partition-commit.delay'='0 min',
  'sink.partition-commit.policy.kind'='metastore,success-file,custom',
  'sink.rolling-policy.check-interval'='30s',
  'sink.rolling-policy.rollover-interval'='10min',
  'sink.rolling-policy.file-size'='128MB'
   如果是12:39分 05秒左右做一次savepoint,然后 
12:41分程序重启后,发现之前的12:35分区不再写入,里面的in-progress文件还在,但是分区没有提交,没有往hive add 
partition,就导致有数据,但是确查不 了。 
按照你说的,in-progress文件对没影响,但是影响了分区提交。就没地方触发之前12:35分区提交逻辑了。相当于丢了一个分区。这种情况我试了一下,手动add
 partition 也能查了。
>
>
>
>在 2020-08-12 12:11:53,"Jingsong Li"  写道:
>>in-progress文件带来了什么具体问题吗?它们是多余的文件,对流程没有影响
>>
>>On Wed, Aug 12, 2020 at 11:05 AM Jark Wu  wrote:
>>
>>> 与我所知,(2) & (3) 有希望能在 1.12 中支持。
>>>
>>> On Tue, 11 Aug 2020 at 21:15, kandy.wang  wrote:
>>>
>>> > 1.StreamingFileWriter 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
>>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在
>>> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm
>>> > =2100分区的数据还存在很多的in-progress文件。
>>> > 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。
>>> >
>>> >
>>> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持
>>> >
>>> >
>>> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持
>>>
>>
>>
>>-- 
>>Best, Jingsong Lee


Re:Re: flink 1.11 StreamingFileWriter及sql-client问题

2020-08-11 文章 kandy.wang






@ Jingsong
导致的影响是停止前的那个分区,分区没有提交, 当程序起来之后,写的分区和之前分区不是同一个分区,没有_SUCCESS文件标记。 用presto查询查不了




在 2020-08-12 12:11:53,"Jingsong Li"  写道:
>in-progress文件带来了什么具体问题吗?它们是多余的文件,对流程没有影响
>
>On Wed, Aug 12, 2020 at 11:05 AM Jark Wu  wrote:
>
>> 与我所知,(2) & (3) 有希望能在 1.12 中支持。
>>
>> On Tue, 11 Aug 2020 at 21:15, kandy.wang  wrote:
>>
>> > 1.StreamingFileWriter 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在
>> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm
>> > =2100分区的数据还存在很多的in-progress文件。
>> > 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。
>> >
>> >
>> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持
>> >
>> >
>> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持
>>
>
>
>-- 
>Best, Jingsong Lee


当数据源产生的速度过快时使用AscendingTimestampExtractor并没有忽略非递增的事件时间数据

2020-08-11 文章 SSY

 
数据源如上图所示,6行3列。这里的逻辑是以第三列为事件事件,采用滚动窗口(10s),统计窗口内最大的第一列的时间(PS:第一列数据这里故意设置成升序),输出为“窗口内最大的第一列时间所在行的第二列的值,窗口内元素的个数”。如果一切正常,我认为结果应该是
2,2
2,5
我是用kafka来发送数据源,当发送速率为100毫秒每条数据时,结果和预期相符,如下图

 
但是当发送速率为1毫秒每条数据时,发现红框内的数据也被包含进来了(即第二列是4的那条数据源,本来应该被忽略),如下图

 
这样看来数据发送的速率不同也会影响最后的结果吗?请问这是什么原因呢?
部分程序代码如下:

 

 

 

 






--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11 StreamingFileWriter及sql-client问题

2020-08-11 文章 Jingsong Li
in-progress文件带来了什么具体问题吗?它们是多余的文件,对流程没有影响

On Wed, Aug 12, 2020 at 11:05 AM Jark Wu  wrote:

> 与我所知,(2) & (3) 有希望能在 1.12 中支持。
>
> On Tue, 11 Aug 2020 at 21:15, kandy.wang  wrote:
>
> > 1.StreamingFileWriter 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在
> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm
> > =2100分区的数据还存在很多的in-progress文件。
> > 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。
> >
> >
> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持
> >
> >
> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持
>


-- 
Best, Jingsong Lee


Re: Flink SQL CDC 的实时数据同步方案,替代debezium的问题

2020-08-11 文章 陈韬
好的,刚刚有复习了下视频,理解了。谢谢


Best,
TonyChen

> 2020年8月12日 上午11:00,Jark Wu  写道:
> 
> 总结一下:
> 
> 1. 如果你已经有 Debezium (Canal) + Kafka 的采集层 (E),可以使用 Flink 作为计算层 (T) 和传输层 (L)
>Demo1: Flink SQL CDC + JDBC Connector
> 
> 2. 也可以用 Flink 替代 Debezium (Canal),由 Flink 同步变更数据到 Kafka,Flink 统一 ETL 流程
>Demo3: Streaming Changes to Kafka
> 
> 3. 如果不需要 Kafka 数据缓存,可以由 Flink 直接同步变更数据到目的地,Flink 统一 ETL 流程
>Demo2: CDC Streaming ETL
> 
> 
> On Wed, 12 Aug 2020 at 10:59, Jark Wu  wrote:
> 
>> Hi,
>> 
>> 这个就是 Demo3: Streaming Changes to Kafka 哈~
>> 
>> On Wed, 12 Aug 2020 at 09:00, 陈韬  wrote:
>> 
>>> Hi everyone,
>>> 
>>> 上周云邪的分享中,在总结FLink CDC应用场景时的第二部分,谈到“也可以用 Flink 替代
>>> Debezium(Canal),由Flink同步变更少恒句到kafka,Flink统一ETL流程”
>>> 
>>> 这部分后面没有DEMO,官方文档中也没有相应的说明,哪位大佬用过,能给个思路吗,谢谢
>>> 
>>> 
>>> Best,
>>> TonyChen
>>> 
>>> 



Re: flink 1.11 StreamingFileWriter及sql-client问题

2020-08-11 文章 Jark Wu
与我所知,(2) & (3) 有希望能在 1.12 中支持。

On Tue, 11 Aug 2020 at 21:15, kandy.wang  wrote:

> 1.StreamingFileWriter 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
>举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在
> 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm
> =2100分区的数据还存在很多的in-progress文件。
> 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。
>
>
> 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持
>
>
> 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持


Re: 关于flink sql的内置函数实现与debug

2020-08-11 文章 Benchao Li
Hi,

内置的scalar
function都是通过代码生成来关联到的,入口是`ExprCodeGenerator#generateCallExpression(...)`,
你可以顺着这里找到你需要看的具体的函数的对应的方法。
PS:有很多方法是纯代码生成的,可能没法调试

内置的aggregate function有两种,一种是通过表达式直接写的,叫做`DeclarativeAggregateFunction`;
一种是通过类似于UDAF的方式来实现的,继承的是`AggregateFunction`
他们都在`org.apache.flink.table.planner.functions.aggfunctions`
包里面(flink-table-planner-blink模块)

shizk233  于2020年8月12日周三 上午10:39写道:

> hi all,
>
> 请教一下,flink sql内置的众多functions[1]有对应的Java实现类吗?我只在blink table
> planner模块下的functions package里找到了一部分,并且是基于Expresstion的。
>
> 问题来源:我试图在flink
> sql里去做debug,如果是自定义的udf可以打断点在实现上,但内置函数没找到相应的实现,似乎也没有相应的文档在这一块。
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
>


-- 

Best,
Benchao Li


Re: 关于flink sql的内置函数实现与debug

2020-08-11 文章 Jark Wu
Flink 内置函数的有一些是直接 codegen 出来的,有一些是调用的 util。
对于前者,得去看org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens。
对于后者,可以看下 org.apache.flink.table.planner.codegen.calls.BuiltInMethods.

Best,
Jark

On Wed, 12 Aug 2020 at 10:39, shizk233  wrote:

> hi all,
>
> 请教一下,flink sql内置的众多functions[1]有对应的Java实现类吗?我只在blink table
> planner模块下的functions package里找到了一部分,并且是基于Expresstion的。
>
> 问题来源:我试图在flink
> sql里去做debug,如果是自定义的udf可以打断点在实现上,但内置函数没找到相应的实现,似乎也没有相应的文档在这一块。
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
>


Re: Flink SQL CDC 的实时数据同步方案,替代debezium的问题

2020-08-11 文章 Jark Wu
总结一下:

1. 如果你已经有 Debezium (Canal) + Kafka 的采集层 (E),可以使用 Flink 作为计算层 (T) 和传输层 (L)
Demo1: Flink SQL CDC + JDBC Connector

2. 也可以用 Flink 替代 Debezium (Canal),由 Flink 同步变更数据到 Kafka,Flink 统一 ETL 流程
Demo3: Streaming Changes to Kafka

3. 如果不需要 Kafka 数据缓存,可以由 Flink 直接同步变更数据到目的地,Flink 统一 ETL 流程
Demo2: CDC Streaming ETL


On Wed, 12 Aug 2020 at 10:59, Jark Wu  wrote:

> Hi,
>
> 这个就是 Demo3: Streaming Changes to Kafka 哈~
>
> On Wed, 12 Aug 2020 at 09:00, 陈韬  wrote:
>
>> Hi everyone,
>>
>> 上周云邪的分享中,在总结FLink CDC应用场景时的第二部分,谈到“也可以用 Flink 替代
>> Debezium(Canal),由Flink同步变更少恒句到kafka,Flink统一ETL流程”
>>
>> 这部分后面没有DEMO,官方文档中也没有相应的说明,哪位大佬用过,能给个思路吗,谢谢
>>
>>
>> Best,
>> TonyChen
>>
>>


Re: Flink SQL CDC 的实时数据同步方案,替代debezium的问题

2020-08-11 文章 Jark Wu
Hi,

这个就是 Demo3: Streaming Changes to Kafka 哈~

On Wed, 12 Aug 2020 at 09:00, 陈韬  wrote:

> Hi everyone,
>
> 上周云邪的分享中,在总结FLink CDC应用场景时的第二部分,谈到“也可以用 Flink 替代
> Debezium(Canal),由Flink同步变更少恒句到kafka,Flink统一ETL流程”
>
> 这部分后面没有DEMO,官方文档中也没有相应的说明,哪位大佬用过,能给个思路吗,谢谢
>
>
> Best,
> TonyChen
>
>


Re: Table api son schema

2020-08-11 文章 Jark Wu
目前建议使用 DDL ,不建议使用 tEnv.connect() 方法。
如果想声明 map 类型,在 SQL 中通过  MAP 来声明。

Best,
Jark

On Wed, 12 Aug 2020 at 10:37, Benchao Li  wrote:

> 看起来你这个case直接用SQL是可以搞定的。我对table api不太了解,你可以尝试下直接用SQL的DDL~
>
> Zhao,Yi(SEC)  于2020年8月11日周二 下午3:40写道:
>
> >
> >
> >
> >
> > 如上图,field api被标注过期。替换写法被注释掉,使用*注视掉的写法*会报错如下。
> >
> >
> >
> > Exception in thread "main" org.apache.flink.table.api.TableException: A
> > raw type backed by type information has no serializable string
> > representation. It needs to be resolved into a proper raw type.
> >
> >at
> >
> org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:97)
> >
> >at org.apache.flink.table.descriptors.Schema.field(Schema.java:88)
> >
> >at jobs.IpGapUserFt2.main(IpGapUserFt2.java:83)
> >
> >
> >
> > 不清楚有啥解决方法吗?
> >
> >
> >
> >
> >
> > 其次,我这边鼓捣了半天,发现使用json schema貌似也没办法实现整个表的动态结构。
> >
> > 我业务中json实际如下:
> >
> > {
> >
> > “d”:{
> >
> >   “key”: value
> >
> > …. … .  .. ..// 此处key动态扩展
> >
> > }
> >
> > }
> >
> > 我大概想法是d作为一个field,类型是map(但好像不支持map?必须用row)。用row呢,又必须指定其所有field,就不是动态扩展字段了。
> >
>
>
> --
>
> Best,
> Benchao Li
>


关于flink sql的内置函数实现与debug

2020-08-11 文章 shizk233
hi all,

请教一下,flink sql内置的众多functions[1]有对应的Java实现类吗?我只在blink table
planner模块下的functions package里找到了一部分,并且是基于Expresstion的。

问题来源:我试图在flink sql里去做debug,如果是自定义的udf可以打断点在实现上,但内置函数没找到相应的实现,似乎也没有相应的文档在这一块。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html


Re: Table api son schema

2020-08-11 文章 Benchao Li
看起来你这个case直接用SQL是可以搞定的。我对table api不太了解,你可以尝试下直接用SQL的DDL~

Zhao,Yi(SEC)  于2020年8月11日周二 下午3:40写道:

>
>
>
>
> 如上图,field api被标注过期。替换写法被注释掉,使用*注视掉的写法*会报错如下。
>
>
>
> Exception in thread "main" org.apache.flink.table.api.TableException: A
> raw type backed by type information has no serializable string
> representation. It needs to be resolved into a proper raw type.
>
>at
> org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:97)
>
>at org.apache.flink.table.descriptors.Schema.field(Schema.java:88)
>
>at jobs.IpGapUserFt2.main(IpGapUserFt2.java:83)
>
>
>
> 不清楚有啥解决方法吗?
>
>
>
>
>
> 其次,我这边鼓捣了半天,发现使用json schema貌似也没办法实现整个表的动态结构。
>
> 我业务中json实际如下:
>
> {
>
> “d”:{
>
>   “key”: value
>
> …. … .  .. ..// 此处key动态扩展
>
> }
>
> }
>
> 我大概想法是d作为一个field,类型是map(但好像不支持map?必须用row)。用row呢,又必须指定其所有field,就不是动态扩展字段了。
>


-- 

Best,
Benchao Li


Re: flink 1.11 使用sql将流式数据写入hive

2020-08-11 文章 Jark Wu
你是在 IDE 中直接执行的吗? 还是打包成 jar 包在集群上运行的呢?
如果是在 IDE 中执行,确实会有这个现象。原因是 tEnv.executeSql() 在执行 insert into
语句的时候,提交成功后就返回了,不会等待执行结束。
所以如果要在 IDE 中等待执行结束,需要额外增加等待代码:

val tableResult = tEnv.executeSql("insert into ...")
// wait job finished
tableResult.getJobClient.get()
  .getJobExecutionResult(Thread.currentThread().getContextClassLoader)
  .get()


Best,
Jark

On Tue, 11 Aug 2020 at 15:20, liya...@huimin100.cn 
wrote:

> 下面粘的就是主程序代码
>
> 能在hive里建表,创建的TemporaryView也有数据,但是tEnv.executeSql(insertSql)这块好像没执行,往新建的hive表里插入数据没反应。求助
>
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv,
> bsSettings);DataStream dataStream = bsEnv.addSource(new
> MySource());//构造hive catalog
> String name = "myhive";
> String defaultDatabase = "default";
> String hiveConfDir = "D:\\demo\\flink-hive\\src\\main\\resources"; // a
> local path
> String version = "1.1.0";
>
> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir,
> version);
> tEnv.registerCatalog("myhive", hive);
> tEnv.useCatalog("myhive");
> tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>
> tEnv.createTemporaryView("users", dataStream);
>
> Table result3= tEnv.sqlQuery("SELECT userId, amount, DATE_FORMAT(ts,
> '-MM-dd') ymd, DATE_FORMAT(ts, 'HH') h, DATE_FORMAT(ts, 'mm') m FROM
> users");
>
>
> tEnv.toRetractStream(result3, TypeInformation.of(new
> TypeHint>(){})).print("res");//
> 如果hive中已经存在了相应的表,则这段代码省略
> //String hiveSql = "CREATE TABLE fs_table (\n" +
> // "  user_id STRING,\n" +
> // "  order_amount DOUBLE \n" +
> // ") partitioned by (dt string,h string,m string) \n"
> +
> // "stored as textfile \n" +
> // "TBLPROPERTIES (\n" +
> // "
> 'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" +
> // "  'sink.partition-commit.delay'='0s',\n" +
> // "
> 'sink.partition-commit.trigger'='partition-time',\n" +
> // "  'sink.partition-commit.policy.kind'='metastore'"
> +
> // ")";
> //tEnv.executeSql(hiveSql);
>
> String insertSql = "insert into table fs_table partition (dt,h,m)
> SELECT userId, amount, DATE_FORMAT(ts, '-MM-dd') dt, DATE_FORMAT(ts,
> 'HH') h, DATE_FORMAT(ts, 'mm') m FROM users";
>
> tEnv.executeSql(insertSql);
>
> bsEnv.execute("test");
>
>
> liya...@huimin100.cn
>


?????? flink1.11 es connector

2020-08-11 文章 jacky-cui
??QQ??QQ


----
??: 
   "user-zh"

https://issues.apache.org/jira/browse/FLINK-16713
 gt; gt; lookup
 gt; gt;
 gt; gt; Best,
 gt; gt; Jark
 gt; gt;
 gt; gt; On Thu, 6 Aug 2020 at 11:20, Dream- 


Re: flink1.11 sql使用问题

2020-08-11 文章 魏烽
您好:

我使用的场景是连接postgreps 
catalog,直接对postgrep的表进行操作,但是只能有最简单的count操作,其它sql操作都无法实现,感谢!

 原始邮件
发件人: Shengkai Fang
收件人: user-zh
发送时间: 2020年8月11日(周二) 22:54
主题: Re: flink1.11 sql使用问题


能展示下完整的例子吗? 我用的时候没有这种情况。

魏烽 mailto:weif...@nequal.com>>于2020年8月11日 周二下午10:27写道:

> 各位大佬好:
>
> 在使用flink1.11 sql客户端的时候,只能只用最基本的count,group by、order
> by、join等都无法实现,请问这个是什么原因呢,感谢!
>
>
> Flink SQL> select count(t2.superid) from cdp_profile_union t1 inner join
> cdp_crowd_10002 t2 on t1.superid=t2.superid;
>
> [ERROR] Could not execute SQL statement. Reason:
>
> org.apache.flink.table.api.ValidationException: SQL validation failed. null
>
>
> Flink SQL> select superid from cdp_profile_union group by superid;
>
> [ERROR] Could not execute SQL statement. Reason:
>
> org.apache.flink.table.api.ValidationException: SQL validation failed. null
>



Flink SQL CDC 的实时数据同步方案,替代debezium的问题

2020-08-11 文章 陈韬
Hi everyone,

上周云邪的分享中,在总结FLink CDC应用场景时的第二部分,谈到“也可以用 Flink 替代 
Debezium(Canal),由Flink同步变更少恒句到kafka,Flink统一ETL流程”

这部分后面没有DEMO,官方文档中也没有相应的说明,哪位大佬用过,能给个思路吗,谢谢


Best,
TonyChen



Re: flink 1.11 使用sql将流式数据写入hive

2020-08-11 文章 JasonLee
Hi

可以看下這個demo 
https://mp.weixin.qq.com/s/HqXaREr_NZbZ8lgu_yi7yA



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.11 sql使用问题

2020-08-11 文章 Shengkai Fang
能展示下完整的例子吗? 我用的时候没有这种情况。

魏烽 于2020年8月11日 周二下午10:27写道:

> 各位大佬好:
>
> 在使用flink1.11 sql客户端的时候,只能只用最基本的count,group by、order
> by、join等都无法实现,请问这个是什么原因呢,感谢!
>
>
> Flink SQL> select count(t2.superid) from cdp_profile_union t1 inner join
> cdp_crowd_10002 t2 on t1.superid=t2.superid;
>
> [ERROR] Could not execute SQL statement. Reason:
>
> org.apache.flink.table.api.ValidationException: SQL validation failed. null
>
>
> Flink SQL> select superid from cdp_profile_union group by superid;
>
> [ERROR] Could not execute SQL statement. Reason:
>
> org.apache.flink.table.api.ValidationException: SQL validation failed. null
>


flink1.11 sql使用问题

2020-08-11 文章 魏烽
各位大佬好:

在使用flink1.11 sql客户端的时候,只能只用最基本的count,group by、order by、join等都无法实现,请问这个是什么原因呢,感谢!


Flink SQL> select count(t2.superid) from cdp_profile_union t1 inner join 
cdp_crowd_10002 t2 on t1.superid=t2.superid;

[ERROR] Could not execute SQL statement. Reason:

org.apache.flink.table.api.ValidationException: SQL validation failed. null


Flink SQL> select superid from cdp_profile_union group by superid;

[ERROR] Could not execute SQL statement. Reason:

org.apache.flink.table.api.ValidationException: SQL validation failed. null


Re: Flink 1.10.1版本StreamingFileSink写入HDFS失败

2020-08-11 文章 Yu Li
Hi 王剑,

我认为你的分析是正确的,相关代码在超过lease的soft limit之后应该主动调用一下recover
lease的逻辑。你是否愿意提交一个patch来fix该问题?我在JIRA上也留言了,后续可以直接在JIRA上讨论。

另外,更正一下JIRA链接:https://issues.apache.org/jira/browse/FLINK-18592

Best Regards,
Yu


On Tue, 11 Aug 2020 at 15:16, Jian Wang  wrote:

> Hi all,
>
> 我们在使用flink-1.10.1 on YARN(版本:
> 3.0.0-cdh6.3.2)的时候,使用StreamingFileSink时遇到异常信息,详细信息如下:
>
> 代码部分:
>
> public static  StreamingFileSink build(String dir,
> BucketAssigner assigner, String prefix){
> return StreamingFileSink.forRowFormat(new Path(dir), new
> SimpleStringEncoder())
> .withRollingPolicy(
> DefaultRollingPolicy.builder()
> .withRolloverInterval(TimeUnit.HOURS.toMillis(2))
> .withInactivityInterval(TimeUnit.MINUTES.toMillis(10))
> .withMaxPartSize(1024L * 1024L * 1024L * 50) // Max 50GB
> .build()
> )
> .withBucketAssigner(assigner)
>
> .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix(prefix).build())
> .build();
> }
>
>
> 当任务执行一段时间后,会抛出异常:
>
>
> java.io.IOException: Problem while truncating file:
>
> hdfs:///home/2020-08-11/.home-69-71.inprogress.29cb86c7-a943-411f-aa22-6d12692ae523
> at
>
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:168)
> at
>
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.(HadoopRecoverableFsDataOutputStream.java:91)
> at
>
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:83)
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:144)
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.(Bucket.java:131)
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:407)
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67)
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:182)
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:170)
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:154)
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:434)
> at
>
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> at
>
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> at
>
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at
>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
> Caused by:
>
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
> Failed to TRUNCATE_FILE
>
> /home/2020-08-11/.shop_home_recommend-69-71.inprogress.29cb86c7-a943-411f-aa22-6d12692ae523
> for DFSClient_NONMAPREDUCE_-1035692182_1 on 10.131.79.228 because
> DFSClient_NONMAPREDUCE_-1035692182_1 is already the current lease holder.
> at
>
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2522)
> at
>
> org.apache.hadoop.hdfs.server.namenode.FSDirTruncateOp.truncate(FSDirTruncateOp.java:119)
> at
>
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2091)
> at
>
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1070)
> at
>
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:669)
> at
>
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at
>
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
> at 

flink 1.11 StreamingFileWriter及sql-client问题

2020-08-11 文章 kandy.wang
1.StreamingFileWriter 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
   举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在 21:04分左右的时候做一次checkpoint 
或savepoint,重启任务的时候,hm =2100分区的数据还存在很多的in-progress文件。 
另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。


2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持


3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持

Re: flink 1.11 使用sql将流式数据写入hive

2020-08-11 文章 godfrey he
 tEnv.executeSql(insertSql); 是异步提交完任务就返回了,
如果是IDE里运行的话话,进程就直接退出导致job也就结束了。需要需要等到job结束,
目前可以通过下面这种方式
TableResult result = tEnv.executeSql(insertSql);
result..getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();

另外  tEnv.executeSql(insertSql); 已经提交作业了,不需要调用  bsEnv.execute("test");

liya...@huimin100.cn  于2020年8月11日周二 下午3:20写道:

> 下面粘的就是主程序代码
>
> 能在hive里建表,创建的TemporaryView也有数据,但是tEnv.executeSql(insertSql)这块好像没执行,往新建的hive表里插入数据没反应。求助
>
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv,
> bsSettings);DataStream dataStream = bsEnv.addSource(new
> MySource());//构造hive catalog
> String name = "myhive";
> String defaultDatabase = "default";
> String hiveConfDir = "D:\\demo\\flink-hive\\src\\main\\resources"; // a
> local path
> String version = "1.1.0";
>
> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir,
> version);
> tEnv.registerCatalog("myhive", hive);
> tEnv.useCatalog("myhive");
> tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>
> tEnv.createTemporaryView("users", dataStream);
>
> Table result3= tEnv.sqlQuery("SELECT userId, amount, DATE_FORMAT(ts,
> '-MM-dd') ymd, DATE_FORMAT(ts, 'HH') h, DATE_FORMAT(ts, 'mm') m FROM
> users");
>
>
> tEnv.toRetractStream(result3, TypeInformation.of(new
> TypeHint>(){})).print("res");//
> 如果hive中已经存在了相应的表,则这段代码省略
> //String hiveSql = "CREATE TABLE fs_table (\n" +
> // "  user_id STRING,\n" +
> // "  order_amount DOUBLE \n" +
> // ") partitioned by (dt string,h string,m string) \n"
> +
> // "stored as textfile \n" +
> // "TBLPROPERTIES (\n" +
> // "
> 'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" +
> // "  'sink.partition-commit.delay'='0s',\n" +
> // "
> 'sink.partition-commit.trigger'='partition-time',\n" +
> // "  'sink.partition-commit.policy.kind'='metastore'"
> +
> // ")";
> //tEnv.executeSql(hiveSql);
>
> String insertSql = "insert into table fs_table partition (dt,h,m)
> SELECT userId, amount, DATE_FORMAT(ts, '-MM-dd') dt, DATE_FORMAT(ts,
> 'HH') h, DATE_FORMAT(ts, 'mm') m FROM users";
>
> tEnv.executeSql(insertSql);
>
> bsEnv.execute("test");
>
>
> liya...@huimin100.cn
>


Re: 请教关闭部署的问题

2020-08-11 文章 Yang Wang
如果你是部署在Yarn上面的话,可以把依赖和配置单独拆出来,然后使用--yarnship功能
把它们上传到集群里面

在JobManager和TaskManager起来以后,依赖jars和配置会自动加入到classpath,因此
可以使用this.getClassLoader().getResource("conf.properties")来读取配置文件


Best,
Yang

 于2020年8月11日周二 上午11:16写道:

> 部署的能不能把依赖和配置文件单独出来指定,而不是打成一个jar,如果可以具体要怎么做?
>
> 发自我的iPhone


Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-08-11 文章 Benchao Li
Hi,

目前Json Format的实现就是假设json最外层是一个json object,暂时还无法做到顶层的所有字段无限扩展。

如果是在SQL里面,可以直接定义成map类型就可以,比如:
```SQL
CREATE TABLE source (
  d MAP
) WITH (...)
```

Zhao,Yi(SEC)  于2020年8月11日周二 下午4:58写道:

> 刚刚进一步发现一个方法可以做动态扩展的类型。代码如下:
>
> 这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。
> 此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢?
>
> stEnv.connect(
> new Kafka()
> .properties(TestKafkaUtils.getKafkaProperties())
> .version("universal")
> .topic("test")
> .startFromLatest()
> ).withFormat(new Json()
> .failOnMissingField(false)
> ).withSchema(
> new Schema()
> .field("d", TypeInformation.of(Map.class))
> ).inAppendMode().createTemporaryTable("t");
>
> 其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW',
> 'ANY'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢?
> root
> |-- d: LEGACY('RAW', 'ANY')
>
>
> 在 2020/8/11 下午4:23,“zhao liang” 写入:
>
> Hi,你图挂了,换个图床试试呢
>
> 发件人: Zhao,Yi(SEC) 
> 日期: 星期二, 2020年8月11日 16:04
> 收件人: user-zh@flink.apache.org 
> 主题: 关于FLinkSQL如何创建类json无限扩展的表结构问题
> 刚刚进一步发现一个方法可以做动态扩展的类型。代码如下:
> [cid:image001.png@01D66FF8.F697E2D0]
> 这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。
> 此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢?
>
>
> 其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW',
> 'ANY'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢?
> root
> |-- d: LEGACY('RAW', 'ANY')
>
>
>

-- 

Best,
Benchao Li


Re: Table api son schema

2020-08-11 文章 Danny Chan
Flink 1.11 支持 RAW 作为 sql 类型,基于此,你可以自己扩展 SE/DE 的逻辑实现部分动态解析[1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw

Best,
Danny Chan
在 2020年8月11日 +0800 PM3:40,Zhao,Yi(SEC) ,写道:
> <>
>
>
> 如上图,field api被标注过期。替换写法被注释掉,使用注视掉的写法会报错如下。
>
> Exception in thread "main" org.apache.flink.table.api.TableException: A raw 
> type backed by type information has no serializable string representation. It 
> needs to be resolved into a proper raw type.
>    at 
> org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:97)
>    at org.apache.flink.table.descriptors.Schema.field(Schema.java:88)
>    at jobs.IpGapUserFt2.main(IpGapUserFt2.java:83)
>
> 不清楚有啥解决方法吗?
>
>
> 其次,我这边鼓捣了半天,发现使用json schema貌似也没办法实现整个表的动态结构。
> 我业务中json实际如下:
> {
> “d”:{
>   “key”: value
> …. … .  .. ..// 此处key动态扩展
> }
> }
> 我大概想法是d作为一个field,类型是map(但好像不支持map?必须用row)。用row呢,又必须指定其所有field,就不是动态扩展字段了。


Flink SQL No Watermark

2020-08-11 文章 forideal
大家好,请教一个问题


   我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 
watermark。消费大量的数据的时候,就无法生成watermark。
   一直是No Watermark。 暂时找不到排查问题的思路。
  Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 
EventTime mode 模式,Blink Planner。
|
No Watermark |
   SQL如下


  DDL:
   create table test(
   user_id varchar,
   action varchar,
   event_time TIMESTAMP(3),
   WATERMARK FOR event_time AS event_time - INTERVAL '10' 
SECOND
   ) with();


  DML:
insert into
  console
select
  user_id,
  f_get_str(bind_id) as id_list
from
  (
select
  action as bind_id,
  user_id,
  event_time
from
  (
SELECT
  user_id,
  action,
  PROCTIME() as proc_time,
  event_time
FROM
  test
  ) T
where
  user_id is not null
  and user_id <> ''
  and CHARACTER_LENGTH(user_id) = 24
  ) T
group by
  SESSION(event_time, INTERVAL '10' SECOND),
  user_id
 
Best forideal


Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-08-11 文章 Zhao,Yi(SEC)
刚刚进一步发现一个方法可以做动态扩展的类型。代码如下:

这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。
此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢?

stEnv.connect(
new Kafka()
.properties(TestKafkaUtils.getKafkaProperties())
.version("universal")
.topic("test")
.startFromLatest()
).withFormat(new Json()
.failOnMissingField(false)
).withSchema(
new Schema()
.field("d", TypeInformation.of(Map.class))
).inAppendMode().createTemporaryTable("t");

其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW', 
'ANY'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢?
root
|-- d: LEGACY('RAW', 'ANY')


在 2020/8/11 下午4:23,“zhao liang” 写入:

Hi,你图挂了,换个图床试试呢

发件人: Zhao,Yi(SEC) 
日期: 星期二, 2020年8月11日 16:04
收件人: user-zh@flink.apache.org 
主题: 关于FLinkSQL如何创建类json无限扩展的表结构问题
刚刚进一步发现一个方法可以做动态扩展的类型。代码如下:
[cid:image001.png@01D66FF8.F697E2D0]
这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。
此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢?


其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW', 
'ANY'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢?
root
|-- d: LEGACY('RAW', 'ANY')




自定义PatternProcessFunction编译出错,type erasure相关。

2020-08-11 文章 lgs
Hi,

我定义了一个PatternProcessFunction,

  public static class MyPatternProcessFunction extends
PatternProcessFunction implements
TimedOutPartialMatchHandler {
@Override
public void processMatch(Map> pattern,
PatternProcessFunction.Context ctx, Collector out) {
  System.out.println ("pattern found");
  for (String key: pattern.keySet()) //iteration over keys  
  {  
//returns the value to which specified key is mapped  
ObjectNode event=pattern.get(key).get(0);   
// out.collect(event.get("value").get("flow_name").asText() + ", "
//  + event.get("value").get("component").asText() + ", "
//  + event.get("value").get("event_time").asText() + ", "
//  + event.get("value").get("filename").asText());   
// out.collect(key);
  } 
}

@Override
public void processTimedOutMatch(Map>
pattern, PatternProcessFunction.Context ctx) {
  System.out.println ("pattern found");
  for (String key: pattern.keySet()) //iteration over keys  
  {  
//returns the value to which specified key is mapped  
ObjectNode event=pattern.get(key).get(0);   
// ctx.output(event.get("value").get("flow_name").asText() + ", "
//  + event.get("value").get("component").asText() + ", "
//  + event.get("value").get("event_time").asText() + ", "
//  + event.get("value").get("filename").asText());  
// out.collect(key); 
  } 
}
  }

编译出错:

[ERROR]
/home/sysadmin/cepmonitor/src/main/java/org/apache/cepmonitor/StreamingJob.java:[451,17]
org.apache.flink.cepmonitor.StreamingJob.MyPatternProcessFunction is not
abstract and does not override abstract method
processMatch(java.util.Map>,org.apache.flink.cep.functions.PatternProcessFunction.Context,org.apache.flink.util.Collector)
in org.apache.flink.cep.functions.PatternProcessFunction
[ERROR]
/home/sysadmin/cepmonitor/src/main/java/org/apache/cepmonitor/StreamingJob.java:[453,17]
name clash:
processMatch(java.util.Map>,org.apache.flink.cep.functions.PatternProcessFunction.Context,org.apache.flink.util.Collector)
in org.apache.flink.cepmonitor.StreamingJob.MyPatternProcessFunction and
processMatch(java.util.Map>,org.apache.flink.cep.functions.PatternProcessFunction.Context,org.apache.flink.util.Collector)
in org.apache.flink.cep.functions.PatternProcessFunction have the same
erasure, yet neither overrides the other
[ERROR]
/home/sysadmin/cepmonitor/src/main/java/org/apache/cepmonitor/StreamingJob.java:[452,5]
method does not override or implement a method from a supertype

请问要怎么解决。我如果把processMatch改成processMatch(Map> pattern,... ), 运行flink job的时候又会报错:

Type of TypeVariable 'String' in 'class
org.apache.flink.cepmonitor.StreamingJob$MyPatternProcessFunction' could not
be determined. This is most likely a type erasure problem. The type
extraction currently supports types with generic variables only in cases
where all variables in the return type can be deduced from the input
type(s). Otherwise the type has to be specified explicitly using type
information.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink任务大状态使用filesystem反压

2020-08-11 文章 Yang Peng
感谢唐云老师的回复,问题已经找到了,是我们切换了statebackend为filesystem时使用的state
清除策略配置有误导致的,这样设置清除策略cleanupIncrementally(1, true) 执行cp就会非常慢,后来我们修改成了
.cleanupIncrementally(100, false)这样执行cp速度就马上上来了,感谢各位的帮助


答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-08-11 文章 zhao liang
Hi,你图挂了,换个图床试试呢

发件人: Zhao,Yi(SEC) 
日期: 星期二, 2020年8月11日 16:04
收件人: user-zh@flink.apache.org 
主题: 关于FLinkSQL如何创建类json无限扩展的表结构问题
刚刚进一步发现一个方法可以做动态扩展的类型。代码如下:
[cid:image001.png@01D66FF8.F697E2D0]
这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。
此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢?


其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW', 
'ANY'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢?
root
|-- d: LEGACY('RAW', 'ANY')


关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-08-11 文章 Zhao,Yi(SEC)
刚刚进一步发现一个方法可以做动态扩展的类型。代码如下:
[cid:image001.png@01D66FF8.F697E2D0]
这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。
此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢?


其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW', 
'ANY'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢?
root
|-- d: LEGACY('RAW', 'ANY')


Flink 1.10 堆外内存一直在增加

2020-08-11 文章 ReignsDYL
各位好,
  Flink 1.10,集群在运行过程中,堆外内存一直在不断增加,内存就被慢慢耗尽,导致任务挂掉,请问是什么原因啊?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

??????flink 1.11 ????sql??????????????hive

2020-08-11 文章 liujian
??Hive??:
?? checkpoint
??  wartermark
?? ??,??..,,




----
??: 
   "user-zh"



Re: flink1.11 es connector

2020-08-11 文章 Dream-底限
hi
主要做实时特征,历史数据会做一次批量load,然后flink跑实时特征的计算,涉及到一些维度打算用双流关联+流表时态表关联,下游存储hbase/es/jdbc,先前想都用es,不过感觉es不是干这个的。。

jacky-cui <826885...@qq.com> 于2020年8月11日周二 下午2:33写道:

> Hi, 我这边现在也用es做报表分析,但是对于你提到的场景我有个点不是很明白。
> 你们的flink 在其中充当的是什么角色呢,是清洗出维度结果,然后写到es吗?还是?
> 我是直接做成大宽表,然后用DSL语句去查询这个大宽表,能cover大部分报表需求
>
>
> 祝好
> 崔黄飞
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> imj...@gmail.com;
> 发送时间:2020年8月10日(星期一) 中午12:36
> 收件人:"user-zh"
> 主题:Re: flink1.11 es connector
>
>
>
> 对 ES 研究不是很深入。 个人觉得是一个实用的场景。
>
> On Fri, 7 Aug 2020 at 09:50, Dream-底限 
>  hi、
> 
> 是的,大佬感觉用es做维表靠谱吗,这样就不用维护hbase二级索引了(或者用es存hbase二级索引,hbase存数据,但还是要用es充当一个维度)
> 
>  Jark Wu  
>   目前社区由一个 issue 在跟进 es source ,可以关注一下:
>   https://issues.apache.org/jira/browse/FLINK-16713
>   你想要的时态表查询,是想当成维表查询吗(lookup)?
>  
>   Best,
>   Jark
>  
>   On Thu, 6 Aug 2020 at 11:20, Dream-底限  wrote:
>  
>hi
>   
>   
>  
> 
> 我们这面想用es做时态表查询,但是flink没有报漏es源连接器,需要自己实现一个,请问大家对es做时态表这件事感觉靠谱吗(ps:之所以不用hbase是因为hbase的rowkey设计以及可能维护的二级索引比较麻烦,但hbase也是一个调研方案)
>   
>  
> 


Table api son schema

2020-08-11 文章 Zhao,Yi(SEC)
[cid:image001.png@01D66FF5.A3F680C0]


如上图,field api被标注过期。替换写法被注释掉,使用注视掉的写法会报错如下。

Exception in thread "main" org.apache.flink.table.api.TableException: A raw 
type backed by type information has no serializable string representation. It 
needs to be resolved into a proper raw type.
   at 
org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:97)
   at org.apache.flink.table.descriptors.Schema.field(Schema.java:88)
   at jobs.IpGapUserFt2.main(IpGapUserFt2.java:83)

不清楚有啥解决方法吗?


其次,我这边鼓捣了半天,发现使用json schema貌似也没办法实现整个表的动态结构。
我业务中json实际如下:
{
“d”:{
  “key”: value
…. … .  .. ..// 此处key动态扩展
}
}
我大概想法是d作为一个field,类型是map(但好像不支持map?必须用row)。用row呢,又必须指定其所有field,就不是动态扩展字段了。


flink 1.11 使用sql将流式数据写入hive

2020-08-11 文章 liya...@huimin100.cn
下面粘的就是主程序代码   
能在hive里建表,创建的TemporaryView也有数据,但是tEnv.executeSql(insertSql)这块好像没执行,往新建的hive表里插入数据没反应。求助

StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, 
bsSettings);DataStream dataStream = bsEnv.addSource(new 
MySource());//构造hive catalog
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "D:\\demo\\flink-hive\\src\\main\\resources"; // a local 
path
String version = "1.1.0";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

tEnv.createTemporaryView("users", dataStream);

Table result3= tEnv.sqlQuery("SELECT userId, amount, DATE_FORMAT(ts, 
'-MM-dd') ymd, DATE_FORMAT(ts, 'HH') h, DATE_FORMAT(ts, 'mm') m FROM 
users");


tEnv.toRetractStream(result3, TypeInformation.of(new 
TypeHint>(){})).print("res");//  
如果hive中已经存在了相应的表,则这段代码省略
//String hiveSql = "CREATE TABLE fs_table (\n" +
// "  user_id STRING,\n" +
// "  order_amount DOUBLE \n" +
// ") partitioned by (dt string,h string,m string) \n" +
// "stored as textfile \n" +
// "TBLPROPERTIES (\n" +
// "  'partition.time-extractor.timestamp-pattern'='$dt 
$h:$m:00',\n" +
// "  'sink.partition-commit.delay'='0s',\n" +
// "  'sink.partition-commit.trigger'='partition-time',\n" +
// "  'sink.partition-commit.policy.kind'='metastore'" +
// ")";
//tEnv.executeSql(hiveSql);

String insertSql = "insert into table fs_table partition (dt,h,m) 
SELECT userId, amount, DATE_FORMAT(ts, '-MM-dd') dt, DATE_FORMAT(ts, 'HH') 
h, DATE_FORMAT(ts, 'mm') m FROM users";

tEnv.executeSql(insertSql);

bsEnv.execute("test");


liya...@huimin100.cn


Flink 1.10.1版本StreamingFileSink写入HDFS失败

2020-08-11 文章 Jian Wang
Hi all,

我们在使用flink-1.10.1 on YARN(版本:
3.0.0-cdh6.3.2)的时候,使用StreamingFileSink时遇到异常信息,详细信息如下:

代码部分:

public static  StreamingFileSink build(String dir,
BucketAssigner assigner, String prefix){
return StreamingFileSink.forRowFormat(new Path(dir), new
SimpleStringEncoder())
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.HOURS.toMillis(2))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(10))
.withMaxPartSize(1024L * 1024L * 1024L * 50) // Max 50GB
.build()
)
.withBucketAssigner(assigner)

.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix(prefix).build())
.build();
}


当任务执行一段时间后,会抛出异常:


java.io.IOException: Problem while truncating file:
hdfs:///home/2020-08-11/.home-69-71.inprogress.29cb86c7-a943-411f-aa22-6d12692ae523
at
org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:168)
at
org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.(HadoopRecoverableFsDataOutputStream.java:91)
at
org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:83)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:144)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.(Bucket.java:131)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:407)
at
org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:182)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:170)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:154)
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:434)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)
Caused by:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
Failed to TRUNCATE_FILE
/home/2020-08-11/.shop_home_recommend-69-71.inprogress.29cb86c7-a943-411f-aa22-6d12692ae523
for DFSClient_NONMAPREDUCE_-1035692182_1 on 10.131.79.228 because
DFSClient_NONMAPREDUCE_-1035692182_1 is already the current lease holder.
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2522)
at
org.apache.hadoop.hdfs.server.namenode.FSDirTruncateOp.truncate(FSDirTruncateOp.java:119)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2091)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1070)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:669)
at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:815)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2675)

at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1499)
at org.apache.hadoop.ipc.Client.call(Client.java:1445)
at 

Re: flink-1.10.1 想用 DDL 入 ES5.6

2020-08-11 文章 Leonard Xu
Hi,
ES5 没有SQL jar,所以不支持,可以参考[1]  支持ES6 的sql connector 的实现,这是在1.11里支持的。

Best
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-1 
7027

> 在 2020年8月11日,11:18,kcz <573693...@qq.com> 写道:
> 
> 查看了文档之后,DDL只支持ES6以上,如果我想ES5也用,我需要从哪里着手修改下,使得它可以支持呢。ES5版本目前无法进行升级,很长一段时间内都无法进行升级。



?????? flink1.11 es connector

2020-08-11 文章 jacky-cui
Hi, ??es
??flink es??
??DSLcover??



??


----
??: 
   "user-zh"

https://issues.apache.org/jira/browse/FLINK-16713
  lookup
 
  Best,
  Jark
 
  On Thu, 6 Aug 2020 at 11:20, Dream- 

Re:请教:用flink实现实时告警的功能

2020-08-11 文章 RS
Hi,
你这个完全就是CEP的使用场景啊, 大于多少次, 大于一定数值组合起来判定事件, 
1. 规则变更了, 重启任务就行, 规则都变了, 任务重启也没影响
2. CEP支持规则组合, 时间窗口
3. 最佳实践官网的介绍就很合适
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/cep.html

在 2020-08-06 10:26:19,"samuel@ubtrobot.com"  写道:
>由于需要实时告警功能,经调研,用flink 来实现是比较合适,但有几个问题没搞清楚,请大神们指教,感谢!
>
>告警有分两部分:
>   一是告警规则的设置,数据存放在mysql,存储的格式是json
>{"times":5}  ---就是事件发生大于5次就发出告警;
>{"temperature": 80} ---就是温度大于80就告警;
>   二是告警实现
>  1)上报的数据写入到kafka
>  2)flink读取kafka的数据,然后通过翻滚窗口进行计算,如果满足规则就生产告警。
>
>
>现在遇到的问题是:
>1. 当规则变更时,如何及时生效?
>2.如果用flink CEP来是实现,对于同一数据源,能否在一个作业里让多个规则同时生效?
>3.这一功能有最佳实践吗?
>
>希望哪位解答一下,谢谢!
>   
>
>