Re: Flink 1.12: with the default log4j2 setup shipping logs to Kafka, how can JobManager and TaskManager logs be distinguished? What parameter should be added?

2021-06-15 Thread yujianbo
For the latest detailed configuration, see my blog post:
https://blog.csdn.net/weixin_44500374/article/details/117931457
If it helps you, please give it a like!





Re: Flink 1.12: with the default log4j2 setup shipping logs to Kafka, how can JobManager and TaskManager logs be distinguished? What parameter should be added?

2021-06-10 Thread yujianbo
Could anyone advise? I know the layout supports the conversion patterns listed below, but which of them can distinguish JobManager (JM) logs from TaskManager (TM) logs?

Pattern specifier reference:
  %p: the priority (level) of the log event: DEBUG, INFO, WARN, ERROR, FATAL.
  %d: the date/time of the log event, ISO 8601 by default; a format may follow, e.g. %d{yyyy/MM/dd HH:mm:ss,SSS}.
  %r: milliseconds elapsed from application start to the log event.
  %t: the name of the thread that produced the event.
  %l: the location of the event, equivalent to the combination %c.%M(%F:%L): fully qualified class name, method, file name, and line number, e.g. test.TestLog4j.main(TestLog4j.java:10).
  %c: the logger the event belongs to, i.e. the argument of getLogger().
  %C: the fully qualified class that issued the logging call.
  %logger: not available in log4j 1.x.
  %M: the method that produced the event.
  %F: the file name where the event was produced.
  %L: the line number in the code.
  %m: the log message given in the code.
  %n: a line separator: "\r\n" on Windows, "\n" on Unix.
  %x: the NDC (Nested Diagnostic Context) associated with the current thread; useful in multi-client, multi-threaded applications such as Java servlets.
  %%: a literal "%" character.
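
For example, an illustrative pattern combining several of these specifiers (not taken from the thread):

appender.kafka.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] %-5p %c{1}:%L - %m%n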




Re: Flink 1.12: with the default log4j2 setup shipping logs to Kafka, how can JobManager and TaskManager logs be distinguished? What parameter should be added?

2021-06-09 Thread yujianbo
Both log4j and log4j2 can do it. I already have logs flowing to Kafka; the headache now is that I don't know how to tell the two kinds of logs apart.




Flink 1.12: with the default log4j2 setup shipping logs to Kafka, how can JobManager and TaskManager logs be distinguished? What parameter should be added?

2021-06-09 Thread yujianbo
Version: 1.12
Framework: the default log4j2
Problem: logs are shipped to Kafka; how can JobManager and TaskManager logs be told apart? I tried changing layout.pattern but still found no good way to distinguish them.


appender.kafka.layout.type=PatternLayout
appender.kafka.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n -- %t -- %F
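
None of the standard PatternLayout specifiers identifies the Flink role. A sketch of one way to do it (env.java.opts.jobmanager and env.java.opts.taskmanager are real Flink options; the property name log.role is my own invention): inject a JVM system property per role and resolve it in the pattern.

# flink-conf.yaml
env.java.opts.jobmanager: -Dlog.role=jobmanager
env.java.opts.taskmanager: -Dlog.role=taskmanager

# log4j.properties -- ${sys:log.role} resolves when the config loads in each JVM
appender.kafka.layout.type=PatternLayout
appender.kafka.layout.pattern=${sys:log.role} %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n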





Re: Flink SQL: the /checkpoint/shared/ directory keeps growing even though the source data has no spikes. How can it be controlled?

2021-06-02 Thread yujianbo
Great, many thanks. I'll test this on a few jobs and see whether the performance is acceptable!


Hi,

Files that are not referenced are not necessarily useless: they may belong to a pending checkpoint that is still uploading. So you should also compare the modification timestamps of the files that are not in the checkpoint meta; they may be newer than the completed checkpoint you analyzed.

Overall, I don't think this is a bug; it is the space amplification inherent in an LSM-style DB. If space amplification really worries you, you can enable dynamic level sizing [1] to bound it strictly, though this may affect write and read amplification and cost some performance.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-compaction-level-use-dynamic-size

Best,
Yun Tang
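
A sketch of enabling the option from [1] (key name as documented there):

# flink-conf.yaml
state.backend.rocksdb.compaction.level.use-dynamic-size: true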




Re: Flink SQL: the /checkpoint/shared/ directory keeps growing even though the source data has no spikes. How can it be controlled?

2021-06-02 Thread yujianbo
Hi,

What has been confirmed:
Using loadCheckpointMeta I analyzed which sst files under /shared are referenced by _metadata, and found that the unreferenced sst files (i.e. old files not yet deleted) make up only a small fraction, about 5%.

Configuration:
idleStateRetention is indeed 3600 seconds, and 3 checkpoint directories are retained.

Current situation:
Each checkpoint adds roughly 1-2 GB, so the size is not small; we checkpoint every 5 minutes.
The latest checkpoint has 23 sst files, around 65 MB in size.
The /checkpoint/shared directory currently totals 49.4 GB, while a completed savepoint is about 6.3 GB.

So my question: is there a lot of redundant data sitting inside sst files that are still referenced, and is there a parameter to tune for that? Or is it that deletions by the single JM cannot keep up? Can the JM be scaled out? Even with HA, isn't only one JM active at a time? (I'm not clear on this part.)





Hi,

First, to confirm: your idleStateRetention is 3600 seconds? Next, to see whether all the data is actually useful, you can use Checkpoints.loadCheckpointMeta [1] to load the _metadata file under a retained checkpoint directory, then compare it against the files actually present in the checkpoint directory and check for a large number of undeleted old files.

It is hard to judge from your description and a snippet of SQL alone. Possible causes include:

  1. A single checkpoint contains too many files, and the JM, a single point, cannot delete them fast enough.
  2. The overall checkpoint size is small, so RocksDB compacts infrequently and old files containing expired data are not removed promptly and linger in the checkpoint directory (retaining multiple checkpoints amplifies this).

So the best first step is to analyze the meta file of an existing checkpoint and compare it with the actual situation.

Also, the Flink JIRA is not a good place to ask questions directly, especially vague ones like this. The better path is to discuss on the mailing list first and create a detailed ticket once the cause is pinpointed.

[1]
https://github.com/apache/flink/blob/b582991b8b2b8dadb89e71d5002c4a9cc2055e34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99

Best,
Yun Tang
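
A rough sketch of the analysis described above, written against Flink 1.12-era internals (these are internal runtime APIs, so names and signatures may differ in other versions): load a checkpoint's _metadata and print the shared-state files it references, to diff against the actual contents of /shared.

import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;

import java.io.DataInputStream;
import java.io.FileInputStream;

public class CheckpointMetaInspector {
    public static void main(String[] args) throws Exception {
        // e.g. /checkpoint/<job-id>/chk-123/_metadata
        String metadataPath = args[0];
        try (DataInputStream in = new DataInputStream(new FileInputStream(metadataPath))) {
            CheckpointMetadata meta = Checkpoints.loadCheckpointMetadata(
                    in, CheckpointMetaInspector.class.getClassLoader(), metadataPath);
            for (OperatorState os : meta.getOperatorStates()) {
                for (OperatorSubtaskState subtask : os.getStates()) {
                    for (KeyedStateHandle handle : subtask.getManagedKeyedState()) {
                        // incremental RocksDB checkpoints register sst files as shared state
                        if (handle instanceof IncrementalRemoteKeyedStateHandle) {
                            ((IncrementalRemoteKeyedStateHandle) handle)
                                    .getSharedState()
                                    .values()
                                    .forEach(System.out::println);
                        }
                    }
                }
            }
        }
    }
}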

From: yujianbo <[hidden email]>
Sent: Tuesday, June 1, 2021 10:51
To: [hidden email] <[hidden email]>
Subject: Re: Flink SQL: the /checkpoint/shared/ directory keeps growing even though the source data has no spikes. How can it be controlled?

Is there no better way?




Re: Flink SQL: the /checkpoint/shared/ directory keeps growing even though the source data has no spikes. How can it be controlled?

2021-05-31 Thread yujianbo
Thanks for the reply! From now on I'll discuss problems on the mailing list first and open an issue only once the cause is confirmed.
My idleStateRetention is indeed set to 3600 seconds; I'll run some tests first.





Re: Flink SQL: the /checkpoint/shared/ directory keeps growing even though the source data has no spikes. How can it be controlled?

2021-05-31 Thread yujianbo
Is there no better way? This treats the symptom rather than the cause, and my jobs with large state would run into many problems.




Re: Flink SQL: the /checkpoint/shared/ directory keeps growing even though the source data has no spikes. How can it be controlled?

2021-05-31 Thread yujianbo
Is there no better way?





Re: Flink SQL: the /checkpoint/shared/ directory keeps growing even though the source data has no spikes. How can it be controlled?

2021-05-31 Thread yujianbo
Could anyone take a look at this?





Flink SQL: the /checkpoint/shared/ directory keeps growing even though the source data has no spikes. How can it be controlled?

2021-05-31 Thread yujianbo
I. Environment
1. Version: 1.12.0
2. Flink SQL
3. setIdleStateRetention is set to 1 hour (see the sketch right after this list)
4. State backend: RocksDB, incremental mode
5. No data spikes in the source; the job has been running for two days
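
(For item 3: a sketch of the 1.12-era API; the max value here is illustrative, it only needs to be at least five minutes larger than the min.)

// org.apache.flink.api.common.time.Time
tableEnv.getConfig().setIdleStateRetentionTime(Time.hours(1), Time.hours(2));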

II. Details
The SQL (see section III) is an ordinary GROUP BY aggregation, with setIdleStateRetention(3600). After observing for two days, the shared folder under the checkpoint directory keeps growing; the files inside are continuously replaced, and the oldest ones do disappear.
The GROUP BY key includes a minute-level time field, so after one hour no row can have exactly the same key again, and the expired state should be cleaned up. Does the steady growth of /checkpoint/shared/ then indicate that the expired old data has not been cleaned up?
How should this be handled?

III. The SQL

CREATE TABLE user_behavior (
   `request_ip` STRING,
   `request_time` BIGINT,
   `header` STRING,
   -- truncate the millisecond timestamp to the minute (UTC+8 offset)
   `t_min` as cast(`request_time` - (`request_time` + 28800000) % 60000 as BIGINT),
   `ts` as TO_TIMESTAMP(FROM_UNIXTIME(`request_time`/1000 - 28800, 'yyyy-MM-dd HH:mm:ss')),
   WATERMARK FOR `ts` AS `ts` - INTERVAL '60' MINUTE
) with (
   'connector' = 'kafka'
   -- remaining connector options omitted in the original post
);


CREATE TABLE blackhole_table (
   `cnt` BIGINT,
   `lists` STRING
) WITH (
 'connector' = 'blackhole'
);


insert into blackhole_table
select
  count(*) as cnt,
  LISTAGG(concat(`request_ip`, `header`, cast(`request_time` as STRING))) as lists
from user_behavior
group by `request_ip`, `header`, `t_min`;







Re: Flink SQL job submission fails after adding flink-parquet_2.11

2021-02-01 Thread yujianbo
How did you solve this in the end? I've suddenly run into the same problem.




Re: Caused by: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V

2021-01-26 Thread yujianbo
Mine is solved, following Rui Li's advice. You can also refer to my approach:
https://blog.csdn.net/weixin_44500374/article/details/113244560

https://www.jianshu.com/p/f076a4f66527
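
The posts above describe the fix in detail; the general technique for this NoSuchMethodError (a Guava version clash, typically with the older Guava that Hadoop/Hive bring in) is to relocate Guava inside the job jar. A sketch with maven-shade-plugin (the shaded package prefix is illustrative):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>com.google.common</pattern>
            <shadedPattern>myjob.shaded.com.google.common</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>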









Re: Exception when starting a Flink job: jars under flink/lib cannot be loaded

2021-01-26 Thread yujianbo
Solved: the resources directory of my job jar contained an hdfs-site.xml from the previous cluster; removing it fixed the problem.




Re: Caused by: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V

2021-01-26 Thread yujianbo
Rui Li, good morning. Could you take a look at http://apache-flink.147419.n8.nabble.com/flink-flink-lib-td10518.html and tell me whether it is also caused by a dependency conflict? The situation: same cluster hardware; it used to run CDH Hadoop 3.0.0 with Hive 2.2.0, and we are migrating back to a community Hadoop cluster with Hadoop 3.3.0 and Hive 3.1.2. Yesterday I solved the hive-exec problem, but today the same code that submits and runs fine on the old cluster fails at startup on the new one, saying a jar under lib cannot be loaded, which is odd. Is YARN conflicting?




Exception when starting a Flink job: jars under flink/lib cannot be loaded

2021-01-26 Thread yujianbo
Environment: Flink 1.12.0
The error says log4j-slf4j-impl-2.12.1.jar cannot be loaded, but that jar is present in my lib directory:
$ cd lib/
$ ll
-rw-r--r-- 1 yujianbo yujianbo     91554 Jan 26 18:44 flink-csv-1.12.0.jar
-rw-r--r-- 1 yujianbo yujianbo 114119885 Jan 26 18:45 flink-dist_2.11-1.12.0.jar
-rw-r--r-- 1 yujianbo yujianbo    136664 Jan 26 18:44 flink-json-1.12.0.jar
-rw-r--r-- 1 yujianbo yujianbo   7709742 Jan 26 18:44 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 yujianbo yujianbo  36147824 Jan 26 18:44 flink-table_2.11-1.12.0.jar
-rw-r--r-- 1 yujianbo yujianbo  40286363 Jan 26 18:45 flink-table-blink_2.11-1.12.0.jar
-rw-r--r-- 1 yujianbo yujianbo     67114 Jan 26 18:44 log4j-1.2-api-2.12.1.jar
-rw-r--r-- 1 yujianbo yujianbo    276771 Jan 26 18:44 log4j-api-2.12.1.jar
-rw-r--r-- 1 yujianbo yujianbo   1674433 Jan 26 18:44 log4j-core-2.12.1.jar
-rw-r--r-- 1 yujianbo yujianbo     23518 Jan 26 18:44 log4j-slf4j-impl-2.12.1.jar


The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Could not deploy Yarn job cluster.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException:
Could not deploy Yarn job cluster.
at
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:460)
at
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1940)
at
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822)
at com.xm4399.yhzx.task.VersionTest.main(VersionTest.java:118)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
... 11 more
Caused by:
org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1611303948765_0050 failed 1
times (global limit =2; local limit is =1) due to AM Container for
appattempt_1611303948765_0050_01 exited with  exitCode: -1000
*Failing this attempt.Diagnostics: [2021-01-26 19:00:46.608]File does not
exist:
hdfs://4399cluster/user/hadoop/.flink/application_1611303948765_0050/lib/log4j-slf4j-impl-2.12.1.jar
java.io.FileNotFoundException: File does not exist:
hdfs://4399cluster/user/hadoop/.flink/application_1611303948765_0050/lib/log4j-slf4j-impl-2.12.1.jar*
at
org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1729)
at
org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1722)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1737)
at
org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:271)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:68)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:415)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:412)
at java.security.AccessController.doPrivileged(Native Method
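
As the follow-up above notes, the root cause turned out to be a stale hdfs-site.xml bundled in the job jar. A quick way to check for that (the jar name is illustrative):

$ jar tf flinkjob.jar | grep -E 'hdfs-site.xml|core-site.xml'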

Re: Caused by: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V

2021-01-24 Thread yujianbo
Could you share how you eventually solved this? My Hadoop and Hive versions match yours.






Flink SQL CLI: querying a Hive table partitioned on two fields throws an exception when the WHERE clause does not constrain both partition fields

2021-01-12 Thread yujianbo
The production Hive table is partitioned by the two fields datekey and event. The queries:
(1) First group
SELECT vid From table_A WHERE datekey = '20210112' AND event = 'XXX' AND vid = 'aa';   (*works*)
SELECT vid From table_A WHERE datekey = '20210112' AND vid = 'aa';   (*fails*)
SELECT vid From table_A WHERE datekey = '20210112';   (*works*)

(2) Second group
SELECT vid From table_B WHERE datekey = '20210112' AND event = 'YYY' AND vid = 'bb';   (*works*)
SELECT vid From table_B WHERE datekey = '20210112' AND vid = 'bb';   (*fails*)
SELECT vid From table_B WHERE datekey = '20210112';   (*works*)

The exception:
java.lang.RuntimeException: One or more fetchers have encountered exception
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
at
org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
at
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
at
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:273)
at
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: SplitFetcher thread 19 received
unexpected exception while polling the records
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Caused by: java.lang.IllegalArgumentException
at java.nio.Buffer.position(Buffer.java:244)
at
org.apache.flink.hive.shaded.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes(Binary.java:424)
at
org.apache.flink.hive.shaded.formats.parquet.vector.reader.BytesColumnReader.readBatchFromDictionaryIds(BytesColumnReader.java:79)
at
org.apache.flink.hive.shaded.formats.parquet.vector.reader.BytesColumnReader.readBatchFromDictionaryIds(BytesColumnReader.java:33)
at
org.apache.flink.hive.shaded.formats.parquet.vector.reader.AbstractColumnReader.readToVector(AbstractColumnReader.java:199)
at
org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.nextBatch(ParquetVectorizedInputFormat.java:359)
at
org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readBatch(ParquetVectorizedInputFormat.java:328)
at
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
at
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
... 6 more
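
Not from this thread, but a possible workaround to try (an assumption based on the trace, which points into the Hive connector's vectorized Parquet reader): fall back to Hive's mapred RecordReader for reads.

-- in the SQL client session
SET table.exec.hive.fallback-mapred-reader=true;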






Re: Flink 1.12.0 sql-client fails to connect to Hive

2021-01-12 Thread yujianbo
1. The sql-client can now submit to the YARN session, but it fails right away with:
Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.mapred.JobConf
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

The Hadoop dependency cannot be found, even though I have put export HADOOP_CLASSPATH=`hadoop classpath` into /etc/profile on all three test machines.

2. Per-job submissions, and jobs submitted into this session with flink run -yid, all work fine.

3. A friend gave me the shaded jar flink-shaded-hadoop-2-uber-2.7.5-8.0.jar; putting it into the lib directory fixes it.

4. What puzzles me: without that shaded jar, with only the global export HADOOP_CLASSPATH=`hadoop classpath`, why won't the session start?
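
On point 4, a sketch of the usual gotcha (an assumption about this setup, not a confirmed diagnosis): /etc/profile is only sourced by login shells, so the variable must actually be present in the environment of whichever shell launches each process.

export HADOOP_CLASSPATH=`hadoop classpath`
./bin/yarn-session.sh -d
# and again in the shell that starts the client, if it is a different shell
export HADOOP_CLASSPATH=`hadoop classpath`
./bin/sql-client.sh embedded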





Flink 1.12.0 sql-client fails to connect to Hive

2021-01-11 Thread yujianbo
I. Environment
1. Flink 1.12.0
2. Hive 2.1.1
3. Jars built from the release-1.12 branch; connecting to the Hadoop cluster with export HADOOP_CLASSPATH=`hadoop classpath`
4. The flink lib directory contains the following (do I still need to add some jar?):
flink-csv-1.12.jar
flink-dist_2.11-1.12.jar
flink-json-1.12.jar
flink-shaded-zookeeper-3.4.14.jar
flink-table_2.11-1.12.jar
flink-table-blink_2.11-1.12.jar
log4j-1.2-api-2.12.1.jar
log4j-api-2.12.1.jar
log4j-core-2.12.1.jar
log4j-slf4j-impl-2.12.1.jar

5. In flink's conf directory, only this was changed in sql-client-defaults.yaml:
catalogs: #[] # empty list
   - name: myhive
     type: hive
     hive-conf-dir: /etc/hive/conf


II. Startup
 export HADOOP_CLASSPATH=`hadoop classpath`
 /tmp/flink-1.12.0/bin/sql-client.sh embedded


III. The error:
   
[yujianbo@qzcs86 conf]$ /tmp/flink-1.12.0/bin/sql-client.sh embedded
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/tmp/flink-1.12.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/jars/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type
[org.apache.logging.slf4j.Log4jLoggerFactory]
No default environment specified.
Searching for '/tmp/flink-1.12.0/conf/sql-client-defaults.yaml'...found.
Reading default environment from:
file:/tmp/flink-1.12.0/conf/sql-client-defaults.yaml
No session environment specified.


Exception in thread "main" org.apache.flink.table.client.SqlClientException:
Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
Could not create execution context.
at
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)
at
org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could
not find a suitable table factory for
'org.apache.flink.table.factories.CatalogFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
hive-conf-dir=/etc/hive/conf
type=hive

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
at
org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
at
org.apache.flink.table.client.gateway.local.ExecutionContext.createCatalog(ExecutionContext.java:383)
at
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$null$5(ExecutionContext.java:634)
at java.util.HashMap.forEach(HashMap.java:1280)
at
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:633)
at
org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:266)
at
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:632)
at
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:529)
at
org.apache.flink.table.client.gateway.local.ExecutionContext.&lt;init&gt;(ExecutionContext.java:185)
at
org.apache.flink.table.client.gateway.local.ExecutionContext.&lt;init&gt;(ExecutionContext.java:138)
at
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:867)
... 3 more
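
For what it's worth (an inference from the exception above, not a confirmed fix from this thread): a NoMatchingTableFactoryException for CatalogFactory where only GenericInMemoryCatalogFactory was considered usually means the Hive connector jars are missing from lib/. The versions below are illustrative and must match the installation:

# add to flink/lib, then restart the cluster and the sql-client
flink-connector-hive_2.11-1.12.0.jar
hive-exec-2.1.1.jar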















Flink on YARN: how to pin Flink jobs to a fixed set of YARN nodes?

2020-12-21 Thread yujianbo
Hi all,
How can a fixed set of YARN nodes be designated as the only nodes on which Flink jobs run?
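
One approach, assuming the YARN cluster has node labels set up (the label name is illustrative): label the target nodes in YARN, then point the Flink application at that label.

# flink-conf.yaml (or -Dyarn.application.node-label=... when submitting)
yarn.application.node-label: flink-only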





How to convert a Flink CDC SQL result to a DataStream

2020-11-30 Thread yujianbo
The code ingests MySQL CDC data with SQL and converts it to a DataStream that is written to Kudu, but how can I find out which field of the Row is the primary key, and the field names and types?
Or can toRetractStream be given a class other than Row?

The code:
==============
tableEnv.executeSql(createTableSql);
Table table = tableEnv.sqlQuery(querySql);
DataStream&lt;Tuple2&lt;Boolean, Row&gt;&gt; dataStream =
tableEnv.toRetractStream(table, Row.class);
dataStream.print().setParallelism(1);
==============
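
One way to get the key and type information, as of the 1.11/1.12 Table API (a sketch: it reads the declared schema once instead of inspecting every Row):

==============
// org.apache.flink.table.api.TableSchema / org.apache.flink.table.types.DataType
TableSchema schema = table.getSchema();
// the primary key, if the CDC DDL declared one
schema.getPrimaryKey().ifPresent(pk -> System.out.println("pk: " + pk.getColumns()));
// field names and types, positionally aligned with the fields of each Row
String[] fieldNames = schema.getFieldNames();
DataType[] fieldTypes = schema.getFieldDataTypes();
==============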





Re: Flink CDC stalls because no packet is being sent: how should this be resolved?

2020-11-25 Thread yujianbo
Thanks, Jark!
Last time, adjusting the MySQL connection parameters fixed the timeout, but syncing this table still gets stuck near the end of the snapshot phase and reports a connection error. Where should I look next?

I. Environment
1. Version: 1.11.2
2. Flink CDC with the Stream API, syncing from MySQL to Kudu
3. *The table has about 34 million rows and always gets stuck around 33.4 million. I have already replaced the Kudu sink with a plain print sink, and the error is exactly the same.*

The log:
==============
2020-11-26 14:00:15,293 ERROR *io.debezium.connector.mysql.SnapshotReader*  

[] - Failed due to error: Aborting snapshot due to error when last running
'SELECT * FROM `uchome`.`forums_post_12`': *Communications link failure*

The last packet successfully received from the server was 16 milliseconds
ago.  The last packet sent successfully to the server was 335,794
milliseconds ago.
*org.apache.kafka.connect.errors.ConnectException: Communications link
failure*

The last packet successfully received from the server was 16 milliseconds
ago.  The last packet sent successfully to the server was 335,794
milliseconds ago. Error code: 0; SQLSTATE: 08S01.
at 
io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
~[flinkcdc4mysql2kudu.jar:?]
at
io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
~[flinkcdc4mysql2kudu.jar:?]
at
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831)
~[flinkcdc4mysql2kudu.jar:?]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_231]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_231]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
*Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException:
Communications link failure*

The last packet successfully received from the server was 16 milliseconds
ago.  The last packet sent successfully to the server was 335,794
milliseconds ago.
at
com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
~[flinkcdc4mysql2kudu.jar:?]
at com.mysql.cj.jdbc.ConnectionImpl.commit(ConnectionImpl.java:813)
~[flinkcdc4mysql2kudu.jar:?]
at
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:747)
~[flinkcdc4mysql2kudu.jar:?]
... 3 more
Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications
link failure

The last packet successfully received from the server was 16 milliseconds
ago.  The last packet sent successfully to the server was 335,794
milliseconds ago.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
~[?:1.8.0_231]
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
~[?:1.8.0_231]
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
~[?:1.8.0_231]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
~[?:1.8.0_231]
at
com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:61)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:105)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:151)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.exceptions.ExceptionFactory.createCommunicationsException(ExceptionFactory.java:167)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.protocol.a.NativeProtocol.clearInputStream(NativeProtocol.java:837)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.protocol.a.NativeProtocol.sendCommand(NativeProtocol.java:652)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:986)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.protocol.a.NativeProtocol.sendQueryString(NativeProtocol.java:921)
~[flinkcdc4mysql2kudu.jar:?]
at com.mysql.cj.NativeSession.execSQL(NativeSession.java:1165)
~[flinkcdc4mysql2kudu.jar:?]
at com.mysql.cj.jdbc.ConnectionImpl.commit(ConnectionImpl.java:801)
~[flinkcdc4mysql2kudu.jar:?]
at
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:747)
~[flinkcdc4mysql2kudu.jar:?]
... 3 more
Caused by: java.io.IOException: Socket is closed
at
com.mysql.cj.protocol.AbstractSocketConnection.getMysqlInput(AbstractSocketConnection.java:72)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.protocol.a.NativeProtocol.clearInputStream(NativeProtocol.java:833)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.protocol.a.NativeProtocol.sendCommand(NativeProtocol.java:652)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:986)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.protocol.a.NativeProtocol.sendQueryString(NativeProtocol.java:921)
~[flinkcdc4mysql2kudu.jar:?]
at 

Re: Flink CDC stalls because no packet is being sent: how should this be resolved?

2020-11-25 Thread yujianbo
Thanks for the answer, Jark. One more question: the community's MySQL CDC wiki says that many different server ids connecting to the MySQL server can cause CPU and connection spikes on MySQL. If our CDC SQL jobs each use a distinct server id to pull different tables, does that mean we also should not run too many CDC jobs?




Re: Flink CDC stalls because no packet is being sent: how should this be resolved?

2020-11-25 Thread yujianbo
Mainly to parse a custom schema so that the sink can write to the downstream system.
A question, please:
https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#set-a-differnet-server-id-for-each-job
I read the link above about setting a different server id for each job. SQL can specify a server id (example below), which leaves me with three doubts:
1. Do different Stream API jobs end up with the same server id?
2. Is it safe for different Stream jobs to sync different tables of the same database?
3. Is it a problem for different Stream jobs to sync the same table of the same database?
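
For reference, a sketch of the SQL-side option from the wiki linked above (other connector options elided):

CREATE TABLE mysql_source (
  ...
) WITH (
  'connector' = 'mysql-cdc',
  'server-id' = '5401'  -- must be unique among all jobs/clients of this MySQL
);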




Flink CDC stalls because no packet is being sent: how should this be resolved?

2020-11-24 Thread yujianbo
I. Environment
1. Version: 1.11.2
2. Flink CDC with the Stream API, syncing from MySQL to Kudu

II. Symptoms
1. Several MySQL tables of around 30 million rows each have already been synced to Kudu,
but one table fails the same way on every attempt, apparently still in the full snapshot phase, before the incremental phase begins.

The error log is below. I'm considering 'autoReconnect=true' as a workaround but am not sure where to add it, and from the log it looks like that would treat the symptom, not the cause: why is no packet being sent, causing the stall?

The error:
==============
2020-11-24 20:00:37,547 *ERROR io.debezium.connector.mysql.SnapshotReader * 

[] - Failed due to error: Aborting snapshot due to error when last running
'SELECT * FROM `uchome`.`forums_post_12`': *The last packet successfully
received from the server was 39 milliseconds ago.  The last packet sent
successfully to the server was 6,772,615 milliseconds ago. is longer than
the server configured value of 'wait_timeout'. You should consider either
expiring and/or testing connection validity before use in your application,
increasing the server configured values for client timeouts, or using the
Connector/J connection property 'autoReconnect=true' to avoid this problem.*
org.apache.kafka.connect.errors.ConnectException: The last packet
successfully received from the server was 39 milliseconds ago.  The last
packet sent successfully to the server was 6,772,615 milliseconds ago. is
longer than the server configured value of 'wait_timeout'. You should
consider either expiring and/or testing connection validity before use in
your application, increasing the server configured values for client
timeouts, or using the Connector/J connection property 'autoReconnect=true'
to avoid this problem. Error code: 0; SQLSTATE: 08S01.
at 
io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
at
io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
at
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831)
~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_231]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_231]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
*Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: The last
packet successfully received from the server was 39 milliseconds ago.  The
last packet sent successfully to the server was 6,772,615 milliseconds ago.
is longer than the server configured value of 'wait_timeout'. *You should
consider either expiring and/or testing connection validity before use in
your application, increasing the server configured values for client
timeouts, or using the Connector/J connection property 'autoReconnect=true'
to avoid this problem.
at
com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174)
~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
at
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
===
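
Not a confirmed fix from this thread, but these are the server-side knobs usually inspected for this symptom, since the Debezium snapshot keeps one connection open for the whole table scan:

-- on the MySQL server
SHOW VARIABLES LIKE '%timeout%';
SET GLOBAL wait_timeout = 86400;
SET GLOBAL net_read_timeout = 600;
SET GLOBAL net_write_timeout = 600;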





Re: Flink 1.10 history server cannot monitor Flink SQL jobs

2020-10-23 Thread yujianbo
After setting it up I can only see finished jobs on the history server; failed ones don't show up. What I'm wondering now is whether failed jobs can appear on the history server at all.
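
Failed jobs can show up as well: the history server lists whatever the cluster archived on reaching a globally terminal state (FINISHED, CANCELED, FAILED), so both sides need to point at the same archive directory. Paths here are illustrative:

# flink-conf.yaml on the cluster
jobmanager.archive.fs.dir: hdfs:///flink/completed-jobs
# history server
historyserver.archive.fs.dir: hdfs:///flink/completed-jobs
historyserver.web.refresh-interval: 10000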


