Questions about flink-sql-gateway

2021-01-26 Thread zilong xiao
Is there any documentation on using the flink-sql-gateway REST API to submit jobs to a YARN cluster in per-job mode?


Help with a Python UDF: Process died with exit code 0

2021-01-26 Thread Appleyuchi
I followed the steps described here:
https://yuchi.blog.csdn.net/article/details/112837327

and then got this error:
java.lang.IllegalStateException: Process died with exit code 0

How should I resolve this?
Thanks!

Re: Question about using Flink yarnship

2021-01-26 Thread Yan Tang
I tried that, but the result is still null.





Exception when starting a Flink job: jars under flink/lib cannot be loaded

2021-01-26 Thread yujianbo
Environment: Flink 1.12.0
The error says log4j-slf4j-impl-2.12.1.jar cannot be loaded, even though that jar is present in my lib directory:
$ cd lib/
$ ll
-rw-r--r-- 1 yujianbo yujianbo     91554 Jan 26 18:44 flink-csv-1.12.0.jar
-rw-r--r-- 1 yujianbo yujianbo 114119885 Jan 26 18:45 flink-dist_2.11-1.12.0.jar
-rw-r--r-- 1 yujianbo yujianbo    136664 Jan 26 18:44 flink-json-1.12.0.jar
-rw-r--r-- 1 yujianbo yujianbo   7709742 Jan 26 18:44 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 yujianbo yujianbo  36147824 Jan 26 18:44 flink-table_2.11-1.12.0.jar
-rw-r--r-- 1 yujianbo yujianbo  40286363 Jan 26 18:45 flink-table-blink_2.11-1.12.0.jar
-rw-r--r-- 1 yujianbo yujianbo     67114 Jan 26 18:44 log4j-1.2-api-2.12.1.jar
-rw-r--r-- 1 yujianbo yujianbo    276771 Jan 26 18:44 log4j-api-2.12.1.jar
-rw-r--r-- 1 yujianbo yujianbo   1674433 Jan 26 18:44 log4j-core-2.12.1.jar
-rw-r--r-- 1 yujianbo yujianbo     23518 Jan 26 18:44 log4j-slf4j-impl-2.12.1.jar


The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not deploy Yarn job cluster.
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
	at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
	at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:460)
	at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1940)
	at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
	at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822)
	at com.xm4399.yhzx.task.VersionTest.main(VersionTest.java:118)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
	... 11 more
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1611303948765_0050 failed 1 times (global limit = 2; local limit is = 1) due to AM Container for appattempt_1611303948765_0050_01 exited with exitCode: -1000
Failing this attempt. Diagnostics: [2021-01-26 19:00:46.608] File does not exist: hdfs://4399cluster/user/hadoop/.flink/application_1611303948765_0050/lib/log4j-slf4j-impl-2.12.1.jar
java.io.FileNotFoundException: File does not exist: hdfs://4399cluster/user/hadoop/.flink/application_1611303948765_0050/lib/log4j-slf4j-impl-2.12.1.jar
	at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1729)
	at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1722)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1737)
	at org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:271)
	at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:68)
	at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:415)
	at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:412)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subj

[DISCUSS] Removal of flink-swift-fs-hadoop module

2021-01-26 Thread Robert Metzger
Hi all,

during a security maintenance PR [1], Chesnay noticed that the
flink-swift-fs-hadoop module is lacking test coverage [2].
Also, there hasn't been any substantial change since 2018, when it was
introduced.
On the user@ ML, I could not find any proof of significant use of the
module (no one mentioned any problems with it).

*I propose to remove this module in Flink 1.13.* Otherwise, we would
release a module with known vulnerable dependencies and unknown stability.
If there are users, they can still use the 1.12.0 release of it. If we
notice that there are a lot of users, we can reintroduce the FS and add
proper tests for it.

Please let me know if you have any concerns; otherwise, I'll remove it.


[1] https://github.com/apache/flink/pull/14749
[2] https://issues.apache.org/jira/browse/FLINK-20804


Re: Does flink-sql-gateway support remote clusters?

2021-01-26 Thread Sebastian Liu
When flink-sql-gateway submits a job, it loads an executor via SPI and then infers whether it is in local mode or remote mode. In remote mode, it locates flink-conf.yaml via the FLINK_CONF_DIR inferred by config.sh under FLINK_HOME; the host and rest port in that file determine which remote cluster the job is submitted to.
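
As a minimal sketch of that lookup (the host and port below are placeholders, not values from this thread), the remote cluster would be described in flink-conf.yaml roughly like this:

    # flink-conf.yaml: hypothetical values; point these at your own session cluster
    execution.target: remote
    rest.address: flink-jm.example.com   # host of the remote cluster's REST endpoint
    rest.port: 8081                      # REST port of the remote cluster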

罗显宴 <15927482...@163.com> wrote on Sat, Jan 23, 2021 at 11:19 AM:

> Folks, I have tried both the Flink SQL client and flink-sql-gateway, and both seem to submit the code to the local cluster for execution. I can't seem to submit jobs to a designated remote cluster. How do I submit code to run on a remote cluster?





Re: Questions about flink-sql-gateway

2021-01-26 Thread Sebastian Liu
The way the sql gateway submits jobs is similar to the flink client: it mainly depends on the execution.target setting in flink-conf. For per-job mode on YARN, the value is "yarn-per-job"; the PipelineExecutor loaded is then controlled by YarnJobClusterExecutorFactory, which can submit jobs to YARN. This ends up on the same call path as submitting with the flink client. Beyond that, I believe adding the YARN-related settings to flink-conf should be enough; see
org.apache.flink.yarn.configuration.YarnConfigOptions
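
For illustration, a minimal flink-conf.yaml sketch for per-job submission to YARN could look like the following (the queue is a placeholder; YarnConfigOptions documents the full set of yarn.* keys):

    # flink-conf.yaml: hedged sketch for per-job mode on YARN
    execution.target: yarn-per-job
    # optional YARN-related settings, see org.apache.flink.yarn.configuration.YarnConfigOptions
    yarn.application.queue: default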

zilong xiao wrote on Tue, Jan 26, 2021 at 4:00 PM:

> Is there any documentation on using the flink-sql-gateway REST API to submit jobs to a YARN cluster in per-job mode?
>




Re: Caused by: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V

2021-01-26 Thread yujianbo
Rui Li,
Good morning. Could you take a look at whether http://apache-flink.147419.n8.nabble.com/flink-flink-lib-td10518.html is also caused by a dependency conflict? My situation: on the same cluster hardware, we previously ran CDH Hadoop 3.0.0 with Hive 2.2.0; now we are migrating from CDH back to a community Hadoop cluster with Hadoop 3.3.0 and Hive 3.1.2. Yesterday I resolved the hive-exec issue, but today the exact same code that submits and runs fine on the old cluster fails on the new one: at startup it simply says a jar under lib cannot be loaded, which is odd. Could it be a YARN conflict?




Re: Exception when starting a Flink job: jars under flink/lib cannot be loaded

2021-01-26 Thread yujianbo
Solved: the resources directory of my job jar contained an hdfs-site.xml from the previous cluster; removing it fixed the problem.
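
For anyone hitting the same symptom, a quick way to check whether a job jar bundles stale Hadoop config files (a suggestion, not something from the original thread; the jar name is a placeholder):

    $ jar tf my-job.jar | grep -E 'hdfs-site.xml|core-site.xml'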




Re: Questions about flink-sql-gateway

2021-01-26 Thread zilong xiao
Thanks for the reply. After setting "execution.target: yarn-pre-job" in flink-conf.yaml, I hit an exception when generating a session id via the REST API. I'm not sure why; could you take a look?

flink version: 1.11.3
execution.target: yarn-pre-job
REST API path and request body:
http://localhost:8083/v1/sessions
{
    "planner": "blink",
    "execution_type": "streaming"
}
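
Equivalently, the request above can be issued with curl (a sketch assembled from the path and body quoted in this mail; adjust the gateway address to your deployment):

    curl -X POST http://localhost:8083/v1/sessions \
      -H 'Content-Type: application/json' \
      -d '{"planner": "blink", "execution_type": "streaming"}'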

Exception: Caused by: java.lang.IllegalStateException: No ClusterClientFactory found. If you were targeting a Yarn cluster, please make sure to export the HADOOP_CLASSPATH environment variable or have hadoop in your classpath. For more information refer to the "Deployment & Operations" section of the official Apache Flink documentation.



Re: Does flink-sql-gateway support remote clusters?

2021-01-26 Thread zilong xiao
For yarn-per-job, how does the submission to the YARN cluster actually work? It shouldn't depend on the host and rest port then, right?



Re: Does flink-sql-gateway support remote clusters?

2021-01-26 Thread Sebastian Liu
Hi zilong,

The sql gateway reuses a lot of the underlying API, and much of its submission logic is borrowed from the flink client. The submission mode is determined mainly by the execution.target setting in flink-conf.yaml; for details on this option see org.apache.flink.configuration.DeploymentOptions#TARGET. For yarn-per-job, it needs to be set to "yarn-per-job", and it also relies on the other YARN-related settings in flink-conf.yaml; see org.apache.flink.yarn.configuration.YarnConfigOptions.

To summarize:
1. The sql gateway locates the right flink-conf.yaml, based on FLINK_HOME or FLINK_CONF_DIR.
2. Based on execution.target it picks the matching executor; each mode then relies on the corresponding submission settings in flink-conf.
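
For reference, programmatic configuration goes through the same option; a minimal Java sketch using the classes cited above:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.DeploymentOptions;

    public class TargetExample {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // The same key the gateway reads from flink-conf.yaml ("execution.target").
            conf.set(DeploymentOptions.TARGET, "yarn-per-job");
            System.out.println(conf.get(DeploymentOptions.TARGET));
        }
    }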






Re: Questions about flink-sql-gateway

2021-01-26 Thread Sebastian Liu
Judging from the error message, the correct HADOOP_HOME env probably isn't set? The flink client also needs this env when submitting.
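
A hedged sketch of the environment setup the error message itself suggests, before launching the gateway (the launch command is a placeholder for however you start the gateway):

    $ export HADOOP_CLASSPATH=$(hadoop classpath)
    $ ./bin/sql-gateway.sh   # placeholder: start the gateway from the same shell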





TaggedOperatorSubtaskState is missing when creating a new savepoint using state processor api

2021-01-26 Thread 郭斌
Hi all,
I used the state processor API to create a new savepoint containing the Kafka-related state, in order to change the max parallelism. The savepoint was created successfully, but restarting the job from it throws the following exception:
  {code}
java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_e8ea6e352a1a627513ffbd4573fa1628_(1/1) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:265)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:152)
	... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
	at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:552)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:256)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
	... 11 more
Caused by: java.lang.IllegalStateException: Missing value for the key 'org.apache.flink.state.api.output.TaggedOperatorSubtaskState'
	at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
	at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
	at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83)
	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:204)
	at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
	at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
	at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:113)
	at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:94)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
	... 15 more
  {code}
My usage is as follows:

@Override
public void createNewSavepoint(ExecutionEnvironment env, String savepointPath, StateBackend stateBackend,
        ParameterTool config) {
    // load the savepoint whose max parallelism has not been changed yet
    String savepointOutputPath = config.get(EapSavepointConstants.EAP_SAVEPOINT_OUTPUT_PATH);
    // the new max parallelism
    int maxP
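
Since the snippet above is cut off, here is an independent, heavily hedged sketch of writing a savepoint with a new max parallelism via the state processor API (the Flink 1.11-era DataSet-based API; the paths, UID, bootstrap data, and MyBootstrapFunction are all made-up placeholders):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.runtime.state.StateBackend;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.state.api.BootstrapTransformation;
    import org.apache.flink.state.api.OperatorTransformation;
    import org.apache.flink.state.api.Savepoint;
    import org.apache.flink.state.api.functions.StateBootstrapFunction;

    public class BumpMaxParallelism {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            StateBackend backend = new FsStateBackend("hdfs:///tmp/checkpoints"); // placeholder

            // Re-bootstrap one operator's state; real jobs would read the old
            // savepoint first via Savepoint.load(env, oldPath, backend).
            DataSet<Integer> data = env.fromElements(1, 2, 3);
            BootstrapTransformation<Integer> transformation = OperatorTransformation
                    .bootstrapWith(data)
                    .transform(new MyBootstrapFunction());

            Savepoint
                    .create(backend, 512)                 // 512 = the new max parallelism
                    .withOperator("my-operator-uid", transformation)
                    .write("hdfs:///tmp/savepoints/new"); // placeholder output path

            env.execute("write savepoint with new max parallelism");
        }

        // Hypothetical no-op bootstrap function, just to make the sketch compile.
        static class MyBootstrapFunction extends StateBootstrapFunction<Integer> {
            @Override
            public void processElement(Integer value, Context ctx) throws Exception {}
            @Override
            public void snapshotState(FunctionSnapshotContext context) throws Exception {}
            @Override
            public void initializeState(FunctionInitializationContext context) throws Exception {}
        }
    }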

Flink SQL 1.12 writing to Hive fails at the metastore

2021-01-26 Thread gimlee
Using Flink SQL 1.12 to write to Hive, the job is not submitted to YARN successfully. The error is as follows:
2021-01-26 20:44:23.133 [main] INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient - Trying to connect to metastore with URI thrift://hdcom02.prd.com:9083
2021-01-26 20:44:23.133 [main] INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient - Trying to connect to metastore with URI thrift://hdcom02.prd.com:9083
2021-01-26 20:44:23.134 [main] INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient - Opened a connection to metastore, current connections: 2
2021-01-26 20:44:23.134 [main] INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient - Opened a connection to metastore, current connections: 2
2021-01-26 20:44:23.181 [main] WARN  org.apache.hadoop.hive.metastore.HiveMetaStoreClient - set_ugi() not successful, Likely cause: new client talking to old server. Continuing without it.
org.apache.thrift.transport.TTransportException: null
	at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
	at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
	at org.apache.thrift.protocol.TBinaryProtocol.readStringBody(TBinaryProtocol.java:380)
	at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:230)
	at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_set_ugi(ThriftHiveMetastore.java:4787)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.set_ugi(ThriftHiveMetastore.java:4773)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:534)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:224)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.hadoop.hive.metastore.utils.JavaUtils.newInstance(JavaUtils.java:84)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:95)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:148)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.table.catalog.hive.client.HiveShimV310.getHiveMetastoreClient(HiveShimV310.java:112)
	at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:274)
	at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.<init>(HiveMetastoreClientWrapper.java:80)
	at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:32)
	at org.apache.flink.connectors.hive.HiveTableSink.consume(HiveTableSink.java:145)
	at org.apache.flink.connectors.hive.HiveTableSink.lambda$getSinkRuntimeProvider$0(HiveTableSink.java:137)
	at org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:95)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:109)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
	at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
	at scala.collection.Iterator.foreach(Iterator.scala:937)
	at scala.collection.Iterator.foreach$(Iterator.scala:937)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.map(Traversabl
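
The set_ugi() warning above ("new client talking to old server") usually points at a Hive client/server version mismatch. If that is the cause here, pinning the Flink Hive catalog to the metastore server's version may help; below is a hedged sketch, with placeholder path and version:

    CREATE CATALOG myhive WITH (
        'type' = 'hive',
        'hive-conf-dir' = '/etc/hive/conf',  -- placeholder
        'hive-version' = '1.2.1'             -- should match the metastore server
    );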

Flink 1.10 table setIdleStateRetentionTime

2021-01-26 Thread xuhaiLong


Hi,

Flink 1.10.1 on YARN. In testing I changed `tableEnv.getConfig.setIdleStateRetentionTime(Time.days(180), Time.days(181))` to `tableEnv.getConfig.setIdleStateRetentionTime(Time.days(30), Time.days(31))`. The job restores from the savepoint and starts fine, but while it runs, the web UI's Exception tab shows a Kafka connection error: Timeout expired while fetching topic metadata. I don't understand what causes this. Is it a maxTime issue?

Re: Caused by: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V

2021-01-26 Thread yujianbo
Following Rui Li's advice, I solved it. If you want a reference, see:
https://blog.csdn.net/weixin_44500374/article/details/113244560
https://www.jianshu.com/p/f076a4f66527







Re: Flink 1.10 table setIdleStateRetentionTime

2021-01-26 Thread xuhaiLong
Sorry, my mistake; please ignore the previous message.



When deploying in application mode, must classloader.resolve-order be changed to parent-first?

2021-01-26 Thread lp
I wrote a process function demo: a custom source produces data that is sunk to Kafka, deployed to a YARN cluster. The Flink version is 1.11.2, deployed in application mode. The jobmanager log then reports: Failed to construct kafka producer; Caused by:
org.apache.kafka.common.KafkaException: class
org.apache.kafka.common.serialization.ByteArraySerializer is not an instance
of org.apache.kafka.common.serialization.Serializer.
Switching the Flink version to 1.12.1 gives the same error, while deploying in per-job mode works fine. From what I could find, this is related to Flink's class-loading strategy, i.e. the classloader.resolve-order setting in flink-conf.yaml: changing the default child-first to parent-first does fix it. But I'm puzzled why this change is needed: the official docs generally advise against changing it, since child-first avoids Flink's built-in classloader in favor of the app's own.




Re: How stable is flink-sql-gateway? Can it be used in production?

2021-01-26 Thread jinsx
If we go with Zeppelin, can Zeppelin expose an RPC interface?





Scala REPL in YARN mode throws NoSuchMethodError: setPrintSpaceAfterFullCompletion

2021-01-26 Thread macia kk
bin/start-scala-shell.sh yarn

scala> Exception in thread "main" java.lang.NoSuchMethodError: jline.console.completer.CandidateListCompletionHandler.setPrintSpaceAfterFullCompletion(Z)V
	at scala.tools.nsc.interpreter.jline.JLineConsoleReader.initCompletion(JLineReader.scala:139)
	at scala.tools.nsc.interpreter.jline.InteractiveReader.postInit(JLineReader.scala:54)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$25.apply(ILoop.scala:899)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$25.apply(ILoop.scala:897)
	at scala.tools.nsc.interpreter.SplashReader.postInit(InteractiveReader.scala:130)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$scala$tools$nsc$interpreter$ILoop$$anonfun$$loopPostInit$1$1.apply$mcV$sp(ILoop.scala:926)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$scala$tools$nsc$interpreter$ILoop$$anonfun$$loopPostInit$1$1.apply(ILoop.scala:908)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$scala$tools$nsc$interpreter$ILoop$$anonfun$$loopPostInit$1$1.apply(ILoop.scala:908)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$mumly$1.apply(ILoop.scala:189)
	at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:221)
	at scala.tools.nsc.interpreter.ILoop.mumly(ILoop.scala:186)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$startup$1$1.apply(ILoop.scala:979)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:990)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:891)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:891)
	at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
	at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:891)
	at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:184)
	at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:131)
	at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)
Exception in thread "Thread-2" java.lang.InterruptedException
	at java.util.concurrent.SynchronousQueue.put(SynchronousQueue.java:879)
	at scala.tools.nsc.interpreter.SplashLoop.run(InteractiveReader.scala:77)
	at java.lang.Thread.run(Thread.java:748)


Re: How stable is flink-sql-gateway? Can it be used in production?

2021-01-26 Thread Jeff Zhang
Zeppelin has a REST API; see https://www.yuque.com/jeffzhangjianfeng/gldg8w/pz2xoh

jinsx wrote on Wed, Jan 27, 2021 at 2:30 PM:

> If we go with Zeppelin, can Zeppelin expose an RPC interface?


-- 
Best Regards

Jeff Zhang


Re: When deploying in application mode, must classloader.resolve-order be changed to parent-first?

2021-01-26 Thread tison
Did you bundle the kafka dependency into your application jar yourself? It looks like the application contains one kafka version A: the interface
org.apache.kafka.common.serialization.Serializer gets loaded by the application classloader, while the
flink kafka connector is loaded by the cluster classloader and therefore inherits from the cluster classloader's
org.apache.kafka.common.serialization.Serializer, which causes this problem.

Best,
tison.



Re: When deploying in application mode, must classloader.resolve-order be changed to parent-first?

2021-01-26 Thread tison
Or it could be the other way around.

You can try putting the kafka connector jar into Flink's lib/ directory, and shipping neither the kafka jar nor the kafka connector jar inside your application jar; then you should be able to leave the config unchanged.

Best,
tison.
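
If you would rather keep child-first loading, one more option (a hedged sketch, not something verified in this thread) is to force only the Kafka classes to be resolved parent-first via flink-conf.yaml:

    # flink-conf.yaml: keep child-first overall, resolve kafka classes parent-first
    classloader.resolve-order: child-first
    classloader.parent-first-patterns.additional: org.apache.kafka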




How to use custom Flink UDFs with flink-sql-gateway

2021-01-26 Thread 阿华田
Hi all, when submitting Flink SQL jobs through flink-sql-gateway, how do I use custom Flink UDFs?

阿华田
a15733178...@163.com
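
Not verified against the gateway, but assuming flink-sql-gateway's defaults file mirrors sql-client-defaults.yaml, a UDF would be registered along these lines (the class name is a placeholder, and the UDF jar has to be on the gateway's classpath):

    # sql-gateway-defaults.yaml: hedged sketch
    functions:
      - name: my_udf
        from: class
        class: com.example.udf.MyScalarFunction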



pyflink 1.11 connector data-reading question

2021-01-26 Thread 肖越
Currently I read from the database by defining a DDL through a connector, like this:
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'users'
);
Every time a table is defined or fetched, the column types have to be declared (BIGINT, STRING, INT, BOOLEAN in the example).
Is there another way to read from the database that does not require declaring the column types?
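
One option that avoids hand-written column types is a JDBC catalog, which derives table schemas from the database itself. This is hedged: in Flink 1.11 the JDBC catalog ships with Postgres support only, so it would not cover the MySQL URL above as-is.

    CREATE CATALOG my_catalog WITH (
        'type' = 'jdbc',
        'default-database' = 'mydatabase',
        'username' = '...',  -- placeholder
        'password' = '...',  -- placeholder
        'base-url' = 'jdbc:postgresql://localhost:5432'
    );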

Re: pyflink 1.11 connector data-reading question

2021-01-26 Thread 12862930
Write a program that generates the DDL automatically from the table's metadata.


