Re: flink on yarn ha high-availability question

2019-04-01 Posted by Biao Liu
Hi,
This issue is actually not Flink-specific; please look into the HDFS nameservice feature. Once HDFS is configured correctly, you no longer need to hard-code the
NameNode address in the paths you enter in Flink (a configuration sketch follows).
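A minimal client-side sketch of such a nameservice, assuming it is called hacluster with two NameNodes nn1/nn2 (the hostnames and the port are placeholders):

<!-- hdfs-site.xml on every machine that acts as an HDFS client for Flink -->
<property>
  <name>dfs.nameservices</name>
  <value>hacluster</value>
</property>
<property>
  <name>dfs.ha.namenodes.hacluster</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.hacluster.nn1</name>
  <value>namenode1-host:8020</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.hacluster.nn2</name>
  <value>namenode2-host:8020</value>
</property>
<property>
  <name>dfs.client.failover.proxy.provider.hacluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

With this in place, a path such as hdfs://hacluster/flink-checkpoints resolves to whichever NameNode is currently active.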

天之痕 <575209...@qq.com> wrote on Tue, Apr 2, 2019, 11:29 AM:

> How should I handle this? I currently have the following configured in Hadoop:
>
> <property>
>   <name>fs.defaultFS</name>
>   <value>hdfs://hacluster/</value>
> </property>
>
> <property>
>   <name>dfs.client.failover.proxy.provider.hacluster</name>
>   <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
> </property>
>
>
> In Hadoop itself a NameNode failover can be simulated successfully.
>
>
> 1. How should Flink be configured now? Hadoop is installed on every machine in the Flink environment and the Hadoop environment variables are exported.
> 2. If the client is also required to carry the HDFS HA configuration, does that mean that whenever Flink is scaled out, Hadoop also has to be configured on the corresponding servers, i.e. Hadoop is scaled out at the same time?
>
>
>
>
>
>
>
>
> -- Original message --
> From: "Lin Li";
> Sent: Tuesday, April 2, 2019, 9:47 AM
> To: "user-zh";
>
> Subject: Re: flink on yarn ha high-availability question
>
>
>
> In HDFS HA mode, configure a logical name (
>
> https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithNFS.html
> )
> and access HDFS from Flink through that configured logical name (also make sure that the hdfs-client in the environment where the Flink tasks run contains the HDFS HA configuration, i.e. the mapping from the logical name to the actual NameNodes).
>
>
> 天之痕 <575209...@qq.com> wrote on Tue, Apr 2, 2019, 9:07 AM:
>
> > In flink on yarn HA mode, how should
> > state.checkpoints.dir:
> > hdfs://namenode-host:port/flink-checkpoints
> > be configured so that the NameNode can fail over automatically?
> >
> > At the moment only one NameNode hostname can be specified. If that NameNode goes down or becomes standby, the Flink job's checkpoints fail, and the only fix is to bring the node back up and switch it back to active.
> > The Hadoop NameNodes are already deployed in HA, and manually killing one NameNode process triggers an automatic failover; my current understanding is that this Flink setting can apparently only be hard-coded to a single NameNode.
> >
> >
> > How can this be solved so that Flink remains highly available?


(subject garbled in the archive)

2019-04-01 Posted by 1900
[Message body garbled in the archive; it mentions Kafka partitions 1-25 and keyBy.]



Re: flink-connector-redis connector

2019-04-01 Posted by 戴嘉诚
The source code does not support expire; you can override the connector's interfaces yourself and add a custom implementation (a sketch follows).
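A rough sketch of that approach (this is not the Bahir connector's own API; it writes through Jedis directly, and the Redis address and the 3600-second TTL are illustrative assumptions):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

// Custom sink that writes key/value pairs and then sets a TTL via EXPIRE,
// which the shipped RedisSink does not do.
public class RedisExpireSink extends RichSinkFunction<Tuple2<String, String>> {

    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) {
        // assumed Redis address; use your own connection/pool settings
        jedis = new Jedis("localhost", 6379);
    }

    @Override
    public void invoke(Tuple2<String, String> value, Context context) {
        jedis.set(value.f0, value.f1);  // SET key value
        jedis.expire(value.f0, 3600);   // EXPIRE key 3600 (seconds)
    }

    @Override
    public void close() {
        if (jedis != null) {
            jedis.close();
        }
    }
}

It can be attached to a stream with stream.addSink(new RedisExpireSink()).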

From: 周美娜
Sent: April 1, 2019, 20:48
To: user-zh@flink.apache.org
Subject: flink-connector-redis connector

Question: when Flink's Redis connector is used as a sink, does it not support the Expire command?



Re: flink on yarn ha high-availability question

2019-04-01 Posted by Lin Li
In HDFS HA mode, configure a logical name (
https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithNFS.html
)
and access HDFS from Flink through that configured logical name (also make sure that the hdfs-client in the environment where the Flink tasks run contains the HDFS HA configuration,
i.e. the mapping from the logical name to the actual NameNodes). See the sketch below for the Flink side.
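A sketch of what the Flink side can then look like (assuming the logical name is hacluster and the Hadoop client configuration lives in /etc/hadoop/conf on the machines that run Flink; both values are placeholders):

# flink-conf.yaml: point at the logical nameservice, not at a concrete NameNode host
state.checkpoints.dir: hdfs://hacluster/flink-checkpoints
high-availability.storageDir: hdfs://hacluster/flink/ha

# and make sure every client/task environment can resolve "hacluster",
# i.e. it can see the hdfs-site.xml/core-site.xml that define the nameservice
export HADOOP_CONF_DIR=/etc/hadoop/conf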


天之痕 <575209...@qq.com> wrote on Tue, Apr 2, 2019, 9:07 AM:

> In flink on yarn HA mode, how should
> state.checkpoints.dir:
> hdfs://namenode-host:port/flink-checkpoints
> be configured so that the NameNode can fail over automatically?
>
> At the moment only one NameNode hostname can be specified. If that NameNode goes down or becomes standby, the Flink job's checkpoints fail, and the only fix is to bring the node back up and switch it back to active.
> The Hadoop NameNodes are already deployed in HA, and manually killing one NameNode process triggers an automatic failover; my current understanding is that this Flink setting can apparently only be hard-coded to a single NameNode.
>
>
> How can this be solved so that Flink remains highly available?


Re: HA failover

2019-04-01 Posted by 马 敬源
Hi wuzhixin,
Try changing this setting in flink-conf.yaml:

jobmanager.execution.failover-strategy: individual


Sent from Outlook


From: wuzhixin
Sent: April 1, 2019, 16:37
To: user-zh@flink.apache.org
Subject: HA failover

Hi all:
Our standalone cluster uses ZooKeeper for its HA mechanism. Today, because of some ZooKeeper issues, an HA failover happened, and we then
found that all the jobs had been restarted. Is this the standard behavior?
Isn't this Flink mechanism rather problematic?



flink on yarn ha high-availability question

2019-04-01 Posted by 天之痕
In flink on yarn HA mode, how should
state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
be configured so that the NameNode can fail over automatically?
At the moment only one NameNode hostname can be specified. If that NameNode goes down or becomes standby, the Flink job's checkpoints fail, and the only fix is to bring the node back up and switch it back to active.
The Hadoop NameNodes are already deployed in HA, and manually killing one NameNode process triggers an automatic failover; my current understanding is that this Flink setting can apparently only be hard-coded to a single NameNode.


How can this be solved so that Flink remains highly available?

flink-connector-redis connector

2019-04-01 Posted by 周美娜
Question: when Flink's Redis connector is used as a sink, does it not support the Expire command?

Re: [Flink SQL] env.yaml fails to start

2019-04-01 Posted by Zhenghua Gao
format and schema should be at the same level; see the sketch below.
For reference, take a look at the TableNumber1 definition in the flink-sql-client test configuration file test-sql-client-defaults.yaml.
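As a sketch of the layout the error message is pointing at (the values are taken from the env.yaml quoted below; only the nesting changes, with connector, format and schema as siblings under the table entry):

tables:
  - name: MyTableSource
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: "/home/hadoop/flink_test/input.csv"
    format:
      type: csv
      fields:
        - name: MyField1
          type: INT
        - name: MyField2
          type: VARCHAR
      line-delimiter: "\n"
      comment-prefix: "#"
    schema:
      - name: MyField1
        type: INT
      - name: MyField2
        type: VARCHAR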

*Best Regards,*
*Zhenghua Gao*


On Mon, Apr 1, 2019 at 4:09 PM 曾晓勇 <469663...@qq.com> wrote:

> @1543332...@qq.com
>  
> Thanks. I checked the formatting problem afterwards and it has already been corrected. I downloaded the latest release directly from the Flink website, and on startup it reports the error below. For now I want to see whether a few of our production scripts can be replaced directly with Flink SQL
> instead of going through Java programming; doing it in code would require a lot of changes.
>  Flink version: flink-1.7.2-bin-hadoop28-scala_2.11
>  Start command: /home/hadoop/flink-1.7.2/bin/sql-client.sh embedded -e
> /home/hadoop/flink_test/env.yaml
>
>
> [hadoop@server2 bin]$ /home/hadoop/flink-1.7.2/bin/sql-client.sh embedded
> -e /home/hadoop/flink_test/env.yaml
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was
> set.
> No default environment specified.
> Searching for
> '/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml'...found.
> Reading default environment from:
> file:/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml
> Reading session environment from: file:/home/hadoop/flink_test/env.yaml
> Validating current environment...
>
>
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: The configured
> environment is invalid. Please check your environment files again.
> at
> org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:140)
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:187)
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
> Could not create execution context.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:488)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:316)
> at
> org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:137)
> ... 2 more
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.StreamTableSourceFactory' in
> the classpath.
>
>
> Reason:
> The matching factory
> 'org.apache.flink.table.sources.CsvAppendTableSourceFactory' doesn't
> support 'format.schema.#.type'.
>
>
> Supported properties of this factory are:
> connector.path
> connector.path
> format.comment-prefix
> format.field-delimiter
> format.fields.#.name
> format.fields.#.type
> format.ignore-first-line
> format.ignore-parse-errors
> format.line-delimiter
> format.quote-character
> schema.#.name
> schema.#.type
>
>
> The following properties are requested:
> connector.path=/home/hadoop/flink_test/input.csv
> connector.type=filesystem
> format.comment-prefix=#
> format.fields.0.name=MyField1
> format.fields.0.type=INT
> format.fields.1.name=MyField2
> format.fields.1.type=VARCHAR
> format.line-delimiter=\n
> format.schema.0.name=MyField1
> format.schema.0.type=INT
> format.schema.1.name=MyField2
> format.schema.1.type=VARCHAR
> format.type=csv
> update-mode=append
>
>
> The following factories have been considered:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
>
>
> at
> org.apache.flink.table.factories.TableFactoryService$.filterBySupportedProperties(TableFactoryService.scala:277)
> at
> org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:136)
> at
> org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:100)
> at
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.scala)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:236)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$new$0(ExecutionContext.java:121)
> at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:119)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:484)
> ... 4 more
> [hadoop@server2 bin]$
>
>
>
>
>
>
>
>
>
>
>
> -- The configuration after fixing the formatting issue:
> tables:
>  - name: MyTableSource
>type: source-table
>update-mode: append
>connector:
>  type: filesystem
>  path: "/home/hadoop/flink_test/input.csv"
>format:
> type: csv
> fields:
> - name: MyField1
>   type: INT
> - name: MyField2
>   type: VARCHAR
> line-delimiter: "\n"
> comment-prefix: "#"
> schema:
> - name: MyField1
>   type: INT
> - name: MyField2
>   type: VARCHAR
>  - name: MyCustomView
>type: view
>query: "SELECT MyField2 FROM MyTableSource"
> # Execution properties allow for changing the behavior of a table program.


(subject garbled in the archive)

2019-04-01 Posted by (sender name garbled)
[Message body garbled in the archive; the post asks how to obtain the metrics of each Flink node / TaskManager.]

Re: flink ha HDFS directory permission issue

2019-04-01 Posted by Yun Tang
I suspect your HDFS is configured with a default user hdfs, so directories always end up being created as the hdfs user. Check on the YARN web UI which user the Flink
application runs as: is it root? The simplest workaround is the one described in [1]: set the environment variable HADOOP_USER_NAME
to hdfs, so that when you submit jobs with the flink run command line you act as the hdfs user (see the example after the link below).

export HADOOP_USER_NAME=hdfs

[1] 
https://stackoverflow.com/questions/11371134/how-to-specify-username-when-putting-files-on-hdfs-from-a-remote-machine
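For instance, a submission could then look like this (the job jar path is a placeholder):

export HADOOP_USER_NAME=hdfs
./bin/flink run ./path/to/your-job.jar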


From: 孙森 
Sent: Monday, April 1, 2019 16:16
To: user-zh@flink.apache.org
Subject: Re: flink ha HDFS directory permission issue

Changing the directory permissions works for the files that already exist, but newly created directories still have no write permission.

[root@hdp1 ~]# hadoop fs -ls /flink/ha
Found 15 items
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/0e950900-c00e-4f24-a0bd-880ba9029a92
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/42e61028-e063-4257-864b-05f46e121a4e
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/58465b44-1d38-4f46-a450-edc06d2f625f
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/61b6a5b8-1e11-4ac1-99e4-c4dce842aa38
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/931291f3-717c-4ccb-a622-0207037267a8
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:14 
/flink/ha/application_1553766783203_0026
drwxr-xr-x   - hdfs hdfs  0 2019-04-01 16:13 
/flink/ha/application_1553766783203_0028
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/b2d16faa-ae2e-4130-81b8-56eddb9ef317
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/bef09af0-6462-4c88-8998-d18f922054a1
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/bf486c37-ab44-49a1-bb66-45be4817773d
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/c07351fb-b2d8-4aec-801c-27a983ca3f32
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/d779d3a2-3ec8-4998-ae9c-9d93ffb7f265
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:12 
/flink/ha/dee74bc7-d450-4fb4-a9f2-4983d1f9949f
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/edd59fcf-8413-4ceb-92cf-8dcd637803f8
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/f6329551-56fb-4c52-a028-51fd838c4af6

> On April 1, 2019, at 4:02 PM, Yun Tang wrote:
>
> Hi 孙森,
>
> Adding the submitting user root to Hadoop's hdfs group, submitting the program as Hadoop's hdfs user [1], or opening up the permissions of the whole directory hdfs:///flink/ha [2]
>  to any user should solve the problem. Remember to add -R so that it also takes effect on the subdirectories.
>
>
> [1] 
> https://stackoverflow.com/questions/11371134/how-to-specify-username-when-putting-files-on-hdfs-from-a-remote-machine
> [2] 
> https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-common/FileSystemShell.html#chmod
>
> Best regards,
> Yun Tang
>
> From: 孙森
> Sent: Monday, April 1, 15:50
> Subject: flink ha HDFS directory permission issue
> To: user-zh@flink.apache.org
>
>
> Hi all:
> I start Flink in flink-on-yarn mode with high availability configured. When a job is submitted to the Flink
> cluster, a permission denied exception occurs. The reason is that the directories created under hdfs:///flink/ha all have permission 755, i.e. no write permission. So every time a new Flink
> cluster is started, a new directory is created,
> e.g. /flink/ha/application_1553766783203_0026, and the permissions of /flink/ha/application_1553766783203_0026 have to be changed before the job can be submitted successfully. How should this problem be solved?
>
> The exception is as follows:
> The program finished with the following exception:
>
> org.apache.flink.client.deployment.ClusterRetrieveException: Couldn't 
> retrieve Yarn cluster
>at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.retrieve(AbstractYarnClusterDescriptor.java:409)
>at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.retrieve(AbstractYarnClusterDescriptor.java:111)
>at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:253)
>at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>at java.security.AccessController.doPrivileged(Native Method)
>at javax.security.auth.Subject.doAs(Subject.java:422)
>at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.hadoop.security.AccessControlException: Permission 
> denied: user=root, access=WRITE, 
> inode="/flink/ha/application_1553766783203_0026/blob":hdfs:hdfs:drwxr-xr-x
>at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:353)
>at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:325)
>at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:246)
>at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
>at 
> org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1950)
>at 
> org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1934)
>at 
> 


HA failover

2019-04-01 Posted by wuzhixin
Hi all:
Our standalone cluster uses ZooKeeper for its HA mechanism. Today, because of some ZooKeeper issues, an HA failover happened, and we then
found that all the jobs had been restarted. Is this the standard behavior?
Isn't this Flink mechanism rather problematic?

Re: flink ha HDFS directory permission issue

2019-04-01 Posted by 孙森
Changing the directory permissions works for the files that already exist, but newly created directories still have no write permission.

[root@hdp1 ~]# hadoop fs -ls /flink/ha
Found 15 items
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/0e950900-c00e-4f24-a0bd-880ba9029a92
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/42e61028-e063-4257-864b-05f46e121a4e
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/58465b44-1d38-4f46-a450-edc06d2f625f
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/61b6a5b8-1e11-4ac1-99e4-c4dce842aa38
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/931291f3-717c-4ccb-a622-0207037267a8
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:14 
/flink/ha/application_1553766783203_0026
drwxr-xr-x   - hdfs hdfs  0 2019-04-01 16:13 
/flink/ha/application_1553766783203_0028
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/b2d16faa-ae2e-4130-81b8-56eddb9ef317
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/bef09af0-6462-4c88-8998-d18f922054a1
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/bf486c37-ab44-49a1-bb66-45be4817773d
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/c07351fb-b2d8-4aec-801c-27a983ca3f32
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/d779d3a2-3ec8-4998-ae9c-9d93ffb7f265
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:12 
/flink/ha/dee74bc7-d450-4fb4-a9f2-4983d1f9949f
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/edd59fcf-8413-4ceb-92cf-8dcd637803f8
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/f6329551-56fb-4c52-a028-51fd838c4af6

> On April 1, 2019, at 4:02 PM, Yun Tang wrote:
> 
> Hi 孙森,
> 
> Adding the submitting user root to Hadoop's hdfs group, submitting the program as Hadoop's hdfs user [1], or opening up the permissions of the whole directory hdfs:///flink/ha [2]
>  to any user should solve the problem. Remember to add -R so that it also takes effect on the subdirectories.
> 
> 
> [1] 
> https://stackoverflow.com/questions/11371134/how-to-specify-username-when-putting-files-on-hdfs-from-a-remote-machine
> [2] 
> https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-common/FileSystemShell.html#chmod
> 
> Best regards,
> Yun Tang
> 
> From: 孙森
> Sent: Monday, April 1, 15:50
> Subject: flink ha HDFS directory permission issue
> To: user-zh@flink.apache.org
> 
> 
> Hi all:
> I start Flink in flink-on-yarn mode with high availability configured. When a job is submitted to the Flink
> cluster, a permission denied exception occurs. The reason is that the directories created under hdfs:///flink/ha all have permission 755, i.e. no write permission. So every time a new Flink
> cluster is started, a new directory is created,
> e.g. /flink/ha/application_1553766783203_0026, and the permissions of /flink/ha/application_1553766783203_0026 have to be changed before the job can be submitted successfully. How should this problem be solved?
> 
> The exception is as follows:
> The program finished with the following exception:
> 
> org.apache.flink.client.deployment.ClusterRetrieveException: Couldn't 
> retrieve Yarn cluster
>at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.retrieve(AbstractYarnClusterDescriptor.java:409)
>at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.retrieve(AbstractYarnClusterDescriptor.java:111)
>at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:253)
>at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>at java.security.AccessController.doPrivileged(Native Method)
>at javax.security.auth.Subject.doAs(Subject.java:422)
>at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.hadoop.security.AccessControlException: Permission 
> denied: user=root, access=WRITE, 
> inode="/flink/ha/application_1553766783203_0026/blob":hdfs:hdfs:drwxr-xr-x
>at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:353)
>at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:325)
>at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:246)
>at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
>at 
> org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1950)
>at 
> org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1934)
>at 
> org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1917)
>at 
> org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:71)
>at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4181)
>at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:1109)
>at 
> 


Re: flink ha HDFS directory permission issue

2019-04-01 Posted by Yun Tang
Hi 孙森,

Adding the submitting user root to Hadoop's hdfs group, submitting the program as Hadoop's hdfs user [1], or opening up the permissions of the whole directory hdfs:///flink/ha [2]
to any user should solve the problem. Remember to add -R so that it also takes effect on the subdirectories (see the command sketch after the links below).


[1] 
https://stackoverflow.com/questions/11371134/how-to-specify-username-when-putting-files-on-hdfs-from-a-remote-machine
[2] 
https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-common/FileSystemShell.html#chmod
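For option [2], a hedged example (choose a mode that fits your security requirements):

hadoop fs -chmod -R 777 /flink/ha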

Best regards,
Yun Tang

From: 孙森
Sent: Monday, April 1, 15:50
Subject: flink ha HDFS directory permission issue
To: user-zh@flink.apache.org


Hi all:
I start Flink in flink-on-yarn mode with high availability configured. When a job is submitted to the Flink
cluster, a permission denied exception occurs. The reason is that the directories created under hdfs:///flink/ha all have permission 755, i.e. no write permission. So every time a new Flink
cluster is started, a new directory is created,
e.g. /flink/ha/application_1553766783203_0026, and the permissions of /flink/ha/application_1553766783203_0026 have to be changed before the job can be submitted successfully. How should this problem be solved?

The exception is as follows:
 The program finished with the following exception:

org.apache.flink.client.deployment.ClusterRetrieveException: Couldn't retrieve 
Yarn cluster
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.retrieve(AbstractYarnClusterDescriptor.java:409)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.retrieve(AbstractYarnClusterDescriptor.java:111)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:253)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.hadoop.security.AccessControlException: Permission 
denied: user=root, access=WRITE, 
inode="/flink/ha/application_1553766783203_0026/blob":hdfs:hdfs:drwxr-xr-x
at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:353)
at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:325)
at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:246)
at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
at 
org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1950)
at 
org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1934)
at 
org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1917)
at 
org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:71)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4181)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:1109)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:645)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)




Best!

Sen




flink ha HDFS directory permission issue

2019-04-01 Posted by 孙森
Hi all:
I start Flink in flink-on-yarn mode with high availability configured. When a job is submitted to the Flink
cluster, a permission denied exception occurs. The reason is that the directories created under hdfs:///flink/ha all have permission 755, i.e. no write permission. So every time a new Flink
cluster is started, a new directory is created,
e.g. /flink/ha/application_1553766783203_0026, and the permissions of /flink/ha/application_1553766783203_0026 have to be changed before the job can be submitted successfully. How should this problem be solved?

The exception is as follows:
 The program finished with the following exception:

org.apache.flink.client.deployment.ClusterRetrieveException: Couldn't retrieve 
Yarn cluster
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.retrieve(AbstractYarnClusterDescriptor.java:409)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.retrieve(AbstractYarnClusterDescriptor.java:111)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:253)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.hadoop.security.AccessControlException: Permission 
denied: user=root, access=WRITE, 
inode="/flink/ha/application_1553766783203_0026/blob":hdfs:hdfs:drwxr-xr-x
at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:353)
at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:325)
at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:246)
at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
at 
org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1950)
at 
org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1934)
at 
org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1917)
at 
org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:71)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4181)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:1109)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:645)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)




Best!

Sen




Re: [Flink SQL] env.yaml fails to start

2019-04-01 Posted by Zhenghua Gao
The YAML format is invalid; it looks like an indentation problem.
You can validate it with an online YAML editor, for example [1].
For more on the YAML format, see [2][3].

[1] http://nodeca.github.io/js-yaml/
[2] http://www.ruanyifeng.com/blog/2016/07/yaml.html
[3] https://en.wikipedia.org/wiki/YAML

*Best Regards,*
*Zhenghua Gao*


On Mon, Apr 1, 2019 at 11:51 AM 曾晓勇 <469663...@qq.com> wrote:

> Hi everyone,
>
> Today Flink SQL failed to start while I was testing it; the error log is below. What do I need to watch out for in the format of the YAML configuration file? Can the field delimiter be a special symbol such as
> the '\036' delimiter used in Hive CREATE TABLE statements? The detailed error log follows.
>
> [root@server2 bin]# /home/hadoop/flink-1.7.2/bin/sql-client.sh embedded
> -e /home/hadoop/flink_test/env.yaml
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was
> set.
> No default environment specified.
> Searching for
> '/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml'...found.
> Reading default environment from:
> file:/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml
> Reading session environment from: file:/home/hadoop/flink_test/env.yaml
>
>
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Could not parse
> environment file. Cause: YAML decoding problem: while parsing a block
> collection
>  in 'reader', line 2, column 2:
>  - name: MyTableSource
>  ^
> expected <block end>, but found BlockMappingStart
>  in 'reader', line 17, column 3:
>   schema:
>   ^
>  (through reference chain:
> org.apache.flink.table.client.config.Environment["tables"])
> at
> org.apache.flink.table.client.config.Environment.parse(Environment.java:146)
> at
> org.apache.flink.table.client.SqlClient.readSessionEnvironment(SqlClient.java:162)
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:90)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:187)
>
>
>
>
> -- Configuration file env.yaml
> tables:
>  - name: MyTableSource
>type: source-table
>update-mode: append
>connector:
>  type: filesystem
>  path: "/home/hadoop/flink_test/input.csv"
>format:
> type: csv
> fields:
> - name: MyField1
>   type: INT
> - name: MyField2
>   type: VARCHAR
> line-delimiter: "\n"
> comment-prefix: "#"
>   schema:
> - name: MyField1
> type: INT
> - name: MyField2
> type: VARCHAR
>  - name: MyCustomView
>type: view
>query: "SELECT MyField2 FROM MyTableSource"
> # Execution properties allow for changing the behavior of a table program.
> execution:
>  type: streaming # required: execution mode either 'batch' or 'streaming'
>  result-mode: table # required: either 'table' or 'changelog'
>  max-table-result-rows: 100 # optional: maximum number of maintained
> rows in
>  # 'table' mode (100 by default, smaller 1 means unlimited)
>  time-characteristic: event-time # optional: 'processing-time' or
> 'event-time' (default)
>  parallelism: 1 # optional: Flink's parallelism (1 by default)
>  periodic-watermarks-interval: 200 # optional: interval for periodic
> watermarks(200 ms by default)
>  max-parallelism: 16 # optional: Flink's maximum parallelism (128by
> default)
>  min-idle-state-retention: 0 # optional: table program's minimum idle
> state time
>  max-idle-state-retention: 0 # optional: table program's maximum idle
> state time
>  restart-strategy: # optional: restart strategy
>type: fallback # "fallback" to global restart strategy by
> default
> # Deployment properties allow for describing the cluster to which table
> programsare submitted to.
> deployment:
>   response-timeout: 5000
>
>