Flink 1.13.2: if a Java/Scala Table API job uses a Python UDF and is submitted in yarn-application mode, do all YARN cluster nodes need PyFlink installed?

2021-11-08, by Asahi Lee
Hi!
With Flink 1.13.2, using the Java Table API together with a Python UDF and deploying in yarn-application mode, do all the nodes of the YARN cluster need PyFlink installed?

Re: Flink SQL writing ORC data to Hive with the streaming sink: how to control the number of files

2021-11-08, by yidan zhao
Any suggestions on performance and stability for writing ORC-format Hive tables with Flink SQL?
For example, what parallelism is reasonable? Currently compact-coordinator is fixed at parallelism 1 (apparently not changeable) and compact-operator is at 60; in daily operation compact-operator is almost always red at 100% busy. The current problem is occasional checkpoint failures and latency, so in practice files do not get compacted and we then run out of inodes (strictly speaking, our inode quota is exhausted).

There are 4 task vertices: source, compact-coordinator, compact-operator and partition-committer. For those whose parallelism can be set, what should each one be sized by? For the source I mainly consider data volume; I am not sure what mainly drives the compact-operator's parallelism. Is it also data volume?


Also, would it be possible to split the kafka2hdfs write and the compaction into two separate pipelines so they do not affect each other?

Caizhi Weng wrote on Fri, Nov 5, 2021 at 1:35 PM:

> Hi!
>
> 1. In other words, for each checkpoint, the files produced by the parallel subtasks are merged, right?
>
> Correct.
>
> 2. Beyond that, there is no way to merge files across different checkpoints, right?
>
> Correct.
>
> some of the vertices are really doing background IO, so it may not show up in the busy metric
>
> Yes. The busy metric is computed by sampling how many threads are currently working. For vertices like sinks, whose threads are all waiting on background io, the busy value will indeed not be high.
>
yidan zhao wrote on Thu, Nov 4, 2021 at 5:57 PM:
>
> > Hi, a follow-up question. The docs describe this compaction mechanism as
> > follows:
> > Whether to enable automatic compaction in streaming sink or not. The data
> > will be written to temporary files. After the checkpoint is completed,
> > the temporary files generated by a checkpoint will be compacted. The
> > temporary files are invisible before compaction.
> > So compaction runs after each checkpoint completes, and only the files
> > produced by a single checkpoint are merged.
> > 1. In other words, for each checkpoint, the files produced by the parallel
> > subtasks are merged, right?
> >
> > 2. Beyond that, there is no way to merge files across different
> > checkpoints, right?
> >
> > 3. Another question: with Flink SQL writing to Hive in streaming mode, on
> > the web UI with compaction disabled almost every vertex is blue and the
> > data volume is small. With compaction enabled almost everything is still
> > blue with little data, but the compact vertex alone is constantly red.
> >
> > My understanding is that when writing to Hive this way, some of the
> > vertices are really doing background IO, so it may not show up in the busy
> > metric: busy only reflects the handling of incoming elements and cannot
> > reflect how much background work an element triggers in the operator. Is
> > that right?
> > So even though everything looks blue, I should not lower the parallelism,
> > but rather pick a roughly suitable parallelism based on data volume myself.
> >
>


Flink SQL join: how to use mini-batch style lookups

2021-11-08, by WuKong
Hi:
I have a scenario implemented on Flink 1.12 SQL: a multi-table join where one table is a Kafka table and the others are DB tables. I use a processing-time temporal join (FOR SYSTEM_TIME AS OF), but the query load on the downstream database is quite high. I would like to reduce the QPS by delaying lookups and batching them into an IN query. How can this be configured, and can the delay interval be tuned? My understanding is that by default every incoming row triggers one lookup.



---
Best,
WuKong


Reply: Re: Submitting a Flink job throws java.lang.LinkageError

2021-11-08, by WuKong
Hi:
From the error log this is still a classloading problem: the message says the class has already been loaded by a different classloader. If the dependency already exists on the production cluster, set it to provided scope in the job jar.

Caused by: java.lang.LinkageError: loader constraint violation: loader
>> (instance of org/apache/flink/util/ChildFirstClassLoader) previously
>> initiated loading for a different type with name
>> "org/apache/kafka/clients/consumer/ConsumerRecord"
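A minimal sketch of that suggestion, assuming a Maven build and the Flink 1.12.5 Kafka connector discussed in this thread (adjust the Scala suffix and version to your project). Provided scope keeps the connector on the compile classpath but out of the fat jar, so only the platform's bundled copy is ever loaded:

<!-- pom.xml fragment (assumed coordinates) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.5</version>
    <scope>provided</scope>
</dependency>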



---
Best,
WuKong
 
From: casel.chen
Sent: 2021-11-08 14:38
To: user-zh
Subject: Re: Re: Submitting a Flink job throws java.lang.LinkageError
The versions match; both are 1.12.5.


On 2021-11-08 11:11:35, "Shuiqiang Chen" wrote:
>Hi,
>
>Can you check whether the kafka client version inside the job jar matches the one on the platform?
>
>casel.chen wrote on Fri, Nov 5, 2021 at 11:25 PM:
>
>> I submitted a job written with the streaming API on our company's real-time
>> computing platform and it threw the exception below. Our platform is mainly
>> Flink SQL based and already bundles flink-kafka-connector. Since my job also
>> consumes from Kafka, I packaged the same version of the flink kafka
>> connector into the job jar as well. What is causing this, and how should it
>> be fixed? Thanks!
>>
>>
>> 2021-11-05 16:38:58 -  [submit-session-executor-6] ERROR
>> c.h.s.launcher.AbstractJobExecutor - -start job failed-
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program
>> caused an error:
>>
>> Classpath:
>> [file:/opt/streamsql/jobs/aml-aml-aml/aml-datasync/TEST/aml-datasync-1.0-SNAPSHOT_zwb3274543418822102949.jar]
>>
>> System.out: (none)
>>
>> System.err: (none)
>>
>> at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:264)
>> at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:172)
>> at com.huifu.streamsql.launcher.AbstractJobExecutor.createJobGraph(AbstractJobExecutor.java:205)
>> at com.huifu.streamsql.launcher.standalone.RemoteExecutor.doStart(RemoteExecutor.java:31)
>> at com.huifu.streamsql.launcher.AbstractJobExecutor.start(AbstractJobExecutor.java:51)
>> at com.huifu.streamsql.launcher.JobCommand$1.execute(JobCommand.java:15)
>> at com.huifu.streamsql.service.StreamSqlServiceImpl.submitJob(StreamSqlServiceImpl.java:443)
>> at com.huifu.kunpeng.service.DeploymentServiceImpl.submitJob(DeploymentServiceImpl.java:1662)
>> at com.huifu.kunpeng.service.DeploymentServiceImpl.launchDeployment(DeploymentServiceImpl.java:1623)
>> at com.huifu.kunpeng.service.DeploymentServiceImpl$$FastClassBySpringCGLIB$$855501cb.invoke()
>> at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
>> at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
>> at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
>> at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
>> at org.springframework.retry.annotation.AnnotationAwareRetryOperationsInterceptor.invoke(AnnotationAwareRetryOperationsInterceptor.java:156)
>> at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
>> at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
>> at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
>> at com.huifu.kunpeng.service.DeploymentServiceImpl$$EnhancerBySpringCGLIB$$9aed5b42.launchDeployment()
>> at com.huifu.kunpeng.runner.SubmitQueueApplicationRunner.lambda$run$0(SubmitQueueApplicationRunner.java:63)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> Caused by: java.lang.LinkageError: loader constraint violation: loader
>> (instance of org/apache/flink/util/ChildFirstClassLoader) previously
>> initiated loading for a different type with name
>> "org/apache/kafka/clients/consumer/ConsumerRecord"
>> at java.lang.ClassLoader.defineClass1(Native Method)
>> at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
>> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>> at java.security.AccessController.doPrivileged(Native Method)


Reply: Unsubscribe

2021-11-08, by WuKong
Hi:
To unsubscribe, send any message to user-zh-unsubscr...@flink.apache.org. For more on subscribing to and unsubscribing from the mailing list, see
https://flink.apache.org/zh/community.html




---
Best,
WuKong

From: 张伟明
Sent: 2021-11-08 14:42
To: user-zh
Subject: Unsubscribe



Re: flink 1.12.4 writing to HDFS fails with java.lang.OutOfMemoryError: Direct buffer memory

2021-11-08, by Caizhi Weng
Hi!

You can set taskmanager.memory.task.off-heap.size to control the amount of direct
memory and native memory available to tasks; see [1].

[1]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/memory/mem_setup_tm/#%e9%85%8d%e7%bd%ae%e5%a0%86%e5%a4%96%e5%86%85%e5%ad%98%e7%9b%b4%e6%8e%a5%e5%86%85%e5%ad%98%e6%88%96%e6%9c%ac%e5%9c%b0%e5%86%85%e5%ad%98
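For example (a sketch only; the 512m figure is an arbitrary placeholder, size it to your job), in flink-conf.yaml:

# Off-heap memory reserved per TaskManager for task code; direct buffers
# allocated there (for example by the HDFS client) count against this budget.
taskmanager.memory.task.off-heap.size: 512m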

xiao cai wrote on Mon, Nov 8, 2021 at 10:20 PM:

> Writing to HDFS with the Flink 1.12.4 streaming file sink, the following exception is thrown at runtime:
>
> 2021-11-08 20:39:05
> java.io.IOException: java.lang.OutOfMemoryError: Direct buffer memory
> at org.apache.hadoop.hdfs.DataStreamer$LastExceptionInStreamer.set(DataStreamer.java:299)
> at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:820)
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Bits.java:694)
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
> at sun.nio.ch.IOUtil.write(IOUtil.java:58)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468)
> at org.apache.hadoop.net.SocketOutputStream$Writer.performIO(SocketOutputStream.java:63)
> at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
> at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:159)
> at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:117)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
> at java.io.DataOutputStream.write(DataOutputStream.java:107)
> at org.apache.hadoop.hdfs.DFSPacket.writeTo(DFSPacket.java:180)
> at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:765)


Re: Checkpoint write problem

2021-11-08, by sunzili
The root cause is likely UnknownHostException: mycluster; check whether the machines running Flink can resolve the HDFS nameservice.
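One possible fix, stated here as an assumption rather than something confirmed in this thread: make the nameservice resolvable by pointing Flink at the Hadoop configuration directory that contains the hdfs-site.xml defining mycluster, for example in flink-conf.yaml:

# Assumed path; equivalently, export HADOOP_CONF_DIR=/etc/hadoop/conf in the
# environment of every Flink process.
fs.hdfs.hadoopconf: /etc/hadoop/conf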


On 11/8/2021 16:04, <2572805...@qq.com.INVALID> wrote:
Scenario:
Flink runs on YARN and writes its checkpoints to HDFS. HDFS is highly available: ark1 may be the active namenode with ark2 on standby, and after a failover ark2 becomes active and ark1 standby.
Problem: the checkpoint URL originally pointed at a fixed namenode (hdfs://ark1:8082), so checkpoints fail whenever that namenode is in standby.




So I switched the checkpoint path to the HDFS nameservice mycluster:
final String HADOOP_CONF_DIR = "/etc/hadoop/conf";
org.apache.hadoop.conf.Configuration configuration = new
org.apache.hadoop.conf.Configuration();
configuration.addResource(new Path(HADOOP_CONF_DIR + "/core-site.xml"));
configuration.addResource(new Path(HADOOP_CONF_DIR + "/hdfs-site.xml"));

env.setStateBackend(new FsStateBackend("hdfs://mycluster/flinkCheckpoint"));   // checkpoint path

The error:
Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://mycluster/flinkCheckpoint
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:196)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:527)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:408)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.

flink 1.12.4 writing to HDFS fails with java.lang.OutOfMemoryError: Direct buffer memory

2021-11-08, by xiao cai
Writing to HDFS with the Flink 1.12.4 streaming file sink, the following exception is thrown at runtime:


2021-11-08 20:39:05
java.io.IOException: java.lang.OutOfMemoryError: Direct buffer memory
at 
org.apache.hadoop.hdfs.DataStreamer$LastExceptionInStreamer.set(DataStreamer.java:299)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:820)
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:694)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
at sun.nio.ch.IOUtil.write(IOUtil.java:58)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468)
at 
org.apache.hadoop.net.SocketOutputStream$Writer.performIO(SocketOutputStream.java:63)
at 
org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
at 
org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:159)
at 
org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:117)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.hadoop.hdfs.DFSPacket.writeTo(DFSPacket.java:180)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:765)

Flink SQL INSERT error

2021-11-08
The SQL is as follows:
String sql1="CREATE TABLE detal (\n" +
"   id INT,\n" +
"  produceId VARCHAR,\n"+
"  color VARCHAR,\n"+
"  size VARCHAR,\n"+
"  PRIMARY KEY (id) NOT ENFORCED\n"+
") WITH (\n" +
"  'connector' = 'jdbc',\n" +
"  'url' = 'jdbc:mysql://ark1:3306/test_1', \n" +
"  'username' = 'ark_admin', \n" +
"  'password' = 'ark_Admin@0927Da', \n" +
"  'table-name' = 'detal'\n" +
")\n";
tenv.executeSql(sql1).print();

String sql2 = "SELECT * FROM detal";
tenv.executeSql(sql2).print();

String sql3="CREATE TABLE shangping (\n" +
"   id INT,\n" +
"  orderId INT,\n"+
"  produceId VARCHAR,\n"+
"  PRIMARY KEY (id) NOT ENFORCED\n"+
") WITH (\n" +
"  'connector' = 'jdbc',\n" +
"  'url' = 'jdbc:mysql://ark1:3306/test_1', \n" +
"  'username' = 'ark_admin', \n" +
"  'password' = 'ark_Admin@0927Da', \n" +
"  'table-name' = 'shangping'\n" +
")\n";
tenv.executeSql(sql3).print();

String sql4 = "SELECT * FROM shangping";
tenv.executeSql(sql4).print();

String sql5="CREATE TABLE new_table (\n" +
"   id INT,\n" +
"  orderId INT,\n"+
"  produceId VARCHAR,\n"+
"  color VARCHAR,\n"+
"  size VARCHAR\n"+
   // "  PRIMARY KEY (id) NOT ENFORCED\n"+
") WITH (\n" +
"  'connector' = 'jdbc',\n" +
"  'url' = 'jdbc:mysql://ark1:3306/test_1', \n" +
"  'username' = 'ark_admin', \n" +
"  'password' = 'ark_Admin@0927Da', \n" +
"  'table-name' = 'new_table'\n" +
")\n";
tenv.executeSql(sql5).print();
String sql6 = "SELECT * FROM new_table";

tenv.executeSql(sql6).print();
String insertSql = "insert into new_table " +
"select * " +
"from detal";
tenv.executeSql(insertSql);   // execute the INSERT into the sink table


Problem: the job fails, reporting that the sink table does not exist.


The error:
java.lang.IllegalArgumentException: open() failed.Table 'test_1.new_table' doesn't exist
 at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.open(JdbcRowDataInputFormat.java:215)
 at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84)
 at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
 at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
 at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'test_1.new_table' doesn't exist
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
 at com.mysql.jdbc.Util.handleNewInstance(Util.java:406)
 at com.mysql.jdbc.Util.getInstance(Util.java:381)
 at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1030)
 at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:956)
 at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3558)
 at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3490)
 at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1959)
 at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2109)
 at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2643)
 at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2077)
 at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2228)
 at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.open(JdbcRowDataInputFormat.java:212)
 ... 4 common frames omitted








Re: A question about a Flink scenario

2021-11-08, by Caizhi Weng
Hi!

Writing results to an external system is normally done through a sink node. If Flink
does not ship a connector for your target system, consider implementing a custom sink
by extending SinkFunction (a bare-bones sink) or RichSinkFunction (which adds access
to checkpointing and other features), and attach it to the end of the datastream with
DataStream#addSink.
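A minimal sketch of such a custom sink; ThirdPartyClient and its put/close methods are hypothetical stand-ins for whatever client library the target store actually provides:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class ThirdPartySink extends RichSinkFunction<String> {
    private transient ThirdPartyClient client;  // hypothetical client type

    @Override
    public void open(Configuration parameters) throws Exception {
        // Create the non-serializable connection once per parallel subtask.
        client = new ThirdPartyClient("store-host:1234");  // assumed endpoint
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // Called for every record: write or update the external store here
        // instead of doing it inside the process operator.
        client.put(value);
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close();
        }
    }
}

Attach it with stream.addSink(new ThirdPartySink()); the process operator then only transforms data, and all external writes live in the sink.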

陈卓宇 <2572805...@qq.com.invalid> wrote on Mon, Nov 8, 2021 at 2:40 PM:

> Scenario: I consume data from Kafka with Flink, process and enrich it in a process operator, and write the updates to a third-party data store.
> Problem: in this pipeline there is no sink stage; the writes happen directly inside process, so the only sink I can attach is a print().
> Is there a better solution for this scenario?
>
>
>
>
>
> 陈卓宇
>
>
> 




Reply: Flink 1.12 Streaming consuming Kafka

2021-11-08, by JasonLee
hi


You can use the setPartitions method.
See the docs:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#topic-partition-subscription
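A sketch of that approach with the KafkaSource builder; the topic name, broker address, group id and the choice of the first 15 partitions are all made-up values:

import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.kafka.common.TopicPartition;

// Subscribe to an explicit set of partitions instead of the whole topic.
Set<TopicPartition> partitions = new HashSet<>();
for (int p = 0; p < 15; p++) {
    partitions.add(new TopicPartition("my-topic", p));
}

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker-1:9092")
        .setPartitions(partitions)          // partition-list subscription
        .setGroupId("my-group")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

// env is your StreamExecutionEnvironment.
env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-partitions");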


Best
JasonLee
On Nov 8, 2021 at 17:06, guanyq wrote:
Could someone advise: can Flink streaming consume only specific partitions of a Kafka topic?
For example, the topic has 100 partitions but I only want to consume 15 of them.


Re: Unsubscribe

2021-11-08, by Yuepeng Pan
Hi,


To unsubscribe, send any message to user-zh-unsubscr...@flink.apache.org


Best,

Roc

On 2021-11-08 14:42:33, "张伟明" <821596...@qq.com.INVALID> wrote:
>Unsubscribe


Re: Unsubscribe

2021-11-08, by Yuepeng Pan


Hi,


To unsubscribe, send any message to user-zh-unsubscr...@flink.apache.org


Best,
Roc

On 2021-11-08 13:58:37, "tanggen...@163.com" wrote:
>Unsubscribe
>
>
>tanggen...@163.com


Flink 1.12 Streaming consuming Kafka

2021-11-08, by guanyq
Could someone advise: can Flink streaming consume only specific partitions of a Kafka topic?
For example, the topic has 100 partitions but I only want to consume 15 of them.


Checkpoint write problem

2021-11-08, by 陈卓宇
Scenario:
Flink runs on YARN and writes its checkpoints to HDFS. HDFS is highly available: ark1 may be the active namenode with ark2 on standby, and after a failover ark2 becomes active and ark1 standby.
Problem: the checkpoint URL originally pointed at a fixed namenode (hdfs://ark1:8082), so checkpoints fail whenever that namenode is in standby.




So I switched the checkpoint path to the HDFS nameservice mycluster:
final String HADOOP_CONF_DIR = "/etc/hadoop/conf";
org.apache.hadoop.conf.Configuration configuration = new
org.apache.hadoop.conf.Configuration();
configuration.addResource(new Path(HADOOP_CONF_DIR + "/core-site.xml"));
configuration.addResource(new Path(HADOOP_CONF_DIR + "/hdfs-site.xml"));

env.setStateBackend(new FsStateBackend("hdfs://mycluster/flinkCheckpoint"));   // checkpoint path

The error:
Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://mycluster/flinkCheckpoint
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:196)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:527)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:408)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.