Flink 1.13.2: a Java/Scala Table API job uses a Python UDF and is submitted in yarn-application mode; does pyflink need to be installed on the YARN cluster?
Hi! With Flink 1.13.2, a Java Table API job that uses a Python UDF is deployed to YARN in yarn-application mode. Does the YARN cluster need pyflink installed?
Re: FlinkSQL writing ORC data to Hive with streamingSink: how to control the number of files.
Any suggestions on performance and stability for FlinkSQL writing ORC data to Hive? For instance, what parallelism is reasonable? Right now compact-coordinator's parallelism is fixed at 1 (it apparently cannot be changed) and compact-operator's is 60; day to day, compact-operator is frequently red with busy at 100%. The current problem is that we occasionally see checkpoint failures and delays; the practical consequence is that files do not get merged, and we then run out of inodes (strictly speaking, our inode quota is insufficient). For the 4 task nodes, source, compact-coordinator, compact-operator, and partition-committer, what should the parallelism of each be based on, for the ones that can be set? For the source I mainly consider data volume; I am not sure what the compact-operator's parallelism mainly depends on. Is it also data volume? Also, would it be possible to split kafka-to-hdfs and compaction into two separate pipelines so they do not affect each other?

Caizhi Weng wrote on Fri, Nov 5, 2021 at 1:35 PM:
> Hi!
>
> 1 In other words, for each checkpoint, the files produced by multiple parallel subtasks are merged, right?
>
> Correct.
>
> 2 Apart from that, there is no way to merge files across checkpoints, right?
>
> Correct.
>
> Some nodes are actually doing background IO work; perhaps that does not show up in the busy metric.
>
> Yes. The busy metric is computed by sampling how many threads are currently working. For nodes like sinks, whose threads are waiting on background io, the busy value will indeed not be high.
>
> yidan zhao wrote on Thu, Nov 4, 2021 at 5:57 PM:
>
> > Hi, a follow-up question. The docs describe the compaction mechanism as follows:
> > Whether to enable automatic compaction in streaming sink or not. The data will be written to temporary files. After the checkpoint is completed, the temporary files generated by a checkpoint will be compacted. The temporary files are invisible before compaction.
> > So per the docs, after each checkpoint completes, the files produced by that checkpoint are compacted; i.e. only the files from a single checkpoint are merged.
> > 1 In other words, for each checkpoint, the files produced by multiple parallel subtasks are merged, right?
> >
> > 2 Apart from that, there is no way to merge files across checkpoints, right?
> >
> > 3 Another question: for flinksql writing to hive in streaming mode, looking at the web ui, with compaction disabled almost every node is blue and the data volume is small; with compaction enabled they are again almost all blue with small data volume, but the compact node alone stays red.
> > As I understand it, when writing to hive some nodes are really doing background IO work, which may not be reflected in the busy metric: busy presumably only accounts for handling incoming elements, and whatever background work an element triggers in the operator does not show up. Is that right? So even if everything looks blue, we should not lower the parallelism, but instead choose a roughly adequate parallelism ourselves based on data volume.
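For reference, the streaming-sink compaction discussed above is driven by table options. Below is a minimal sketch of setting them through a dynamic-option hint; the table names and size values are illustrative assumptions, and the option names are the ones documented for the filesystem/Hive sink:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CompactionSketch {
    public static void main(String[] args) {
        TableEnvironment tenv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // OPTIONS hints must be switched on explicitly.
        tenv.getConfig().getConfiguration()
                .setString("table.dynamic-table-options.enabled", "true");
        tenv.executeSql(
                "INSERT INTO hive_orc_table /*+ OPTIONS(" +
                "'auto-compaction'='true', " +          // merge files produced within one checkpoint
                "'compaction.file-size'='128MB') */ " + // target size of compacted files
                "SELECT * FROM kafka_source");
    }
}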
Flink SQL Join: how to query with a minBatch approach
Hi: I have a scenario implemented on Flink 1.12 SQL. I want to query downstream data; roughly, several tables are joined, one of which is a Kafka table and the rest are DB tables. I use processing-time temporal joins with FOR SYSTEM_TIME AS, but the query pressure on the downstream database is quite high. I would now like to reduce QPS with delayed, batched IN queries. How can this interval be configured? My understanding is that by default every incoming record triggers one lookup. --- Best, WuKong
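One stock knob that can cut lookup QPS (it caches fetched rows rather than batching IN queries, so it is not exactly what is asked for) is the JDBC connector's lookup cache. A sketch, with placeholder connection details and illustrative values:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LookupCacheSketch {
    public static void main(String[] args) {
        TableEnvironment tenv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        tenv.executeSql(
                "CREATE TABLE dim_table (\n" +
                "  id INT,\n" +
                "  name STRING,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://host:3306/db',\n" +
                "  'table-name' = 'dim_table',\n" +
                "  'lookup.cache.max-rows' = '10000',\n" + // rows kept in memory per subtask
                "  'lookup.cache.ttl' = '10min'\n" +       // bounds how stale a cache hit can be
                ")");
    }
}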
Reply: Re: Submitting a Flink job throws java.lang.LinkageError
Hi: From the error log, this is still a classloading problem. The message says the dependency has already been loaded by a different classloader. If the production environment already ships the dependency, set it to provided scope.

Caused by: java.lang.LinkageError: loader constraint violation: loader (instance of org/apache/flink/util/ChildFirstClassLoader) previously initiated loading for a different type with name "org/apache/kafka/clients/consumer/ConsumerRecord"

--- Best, WuKong

From: casel.chen
Sent: 2021-11-08 14:38
To: user-zh
Subject: Re: Re: Submitting a Flink job throws java.lang.LinkageError

The versions are consistent; both are 1.12.5.

On 2021-11-08 11:11:35, "Shuiqiang Chen" wrote:
>Hi,
>
>Can you check whether the kafka client version in the job jar matches the one on the platform?
>
>casel.chen wrote on Fri, Nov 5, 2021 at 11:25 PM:
>
>> I submitted a job written with the streaming api on our company's real-time computing platform and it threw the exception below. Our platform is mainly flink sql based and already bundles flink-kafka-connector. My job also consumes from kafka, so I packaged the same version of the flink kafka connector into the job jar. What is the cause, and how can it be fixed? Thanks!
>>
>> 2021-11-05 16:38:58 - [submit-session-executor-6] ERROR c.h.s.launcher.AbstractJobExecutor - -start job failed-
>> org.apache.flink.client.program.ProgramInvocationException: The program caused an error:
>> Classpath: [file:/opt/streamsql/jobs/aml-aml-aml/aml-datasync/TEST/aml-datasync-1.0-SNAPSHOT_zwb3274543418822102949.jar]
>> System.out: (none)
>> System.err: (none)
>> at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:264)
>> at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:172)
>> at com.huifu.streamsql.launcher.AbstractJobExecutor.createJobGraph(AbstractJobExecutor.java:205)
>> at com.huifu.streamsql.launcher.standalone.RemoteExecutor.doStart(RemoteExecutor.java:31)
>> at com.huifu.streamsql.launcher.AbstractJobExecutor.start(AbstractJobExecutor.java:51)
>> at com.huifu.streamsql.launcher.JobCommand$1.execute(JobCommand.java:15)
>> at com.huifu.streamsql.service.StreamSqlServiceImpl.submitJob(StreamSqlServiceImpl.java:443)
>> at com.huifu.kunpeng.service.DeploymentServiceImpl.submitJob(DeploymentServiceImpl.java:1662)
>> at com.huifu.kunpeng.service.DeploymentServiceImpl.launchDeployment(DeploymentServiceImpl.java:1623)
>> at com.huifu.kunpeng.service.DeploymentServiceImpl$$FastClassBySpringCGLIB$$855501cb.invoke()
>> at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
>> at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
>> at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
>> at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
>> at org.springframework.retry.annotation.AnnotationAwareRetryOperationsInterceptor.invoke(AnnotationAwareRetryOperationsInterceptor.java:156)
>> at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
>> at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
>> at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
>> at com.huifu.kunpeng.service.DeploymentServiceImpl$$EnhancerBySpringCGLIB$$9aed5b42.launchDeployment()
>> at com.huifu.kunpeng.runner.SubmitQueueApplicationRunner.lambda$run$0(SubmitQueueApplicationRunner.java:63)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.LinkageError: loader constraint violation: loader (instance of org/apache/flink/util/ChildFirstClassLoader) previously initiated loading for a different type with name "org/apache/kafka/clients/consumer/ConsumerRecord"
>> at java.lang.ClassLoader.defineClass1(Native Method)
>> at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
>> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>> at java.security.AccessController.doPrivileged(Native Method)
Reply: Unsubscribe
Hi: Send an email to user-zh-unsubscr...@flink.apache.org to unsubscribe. For subscribe/unsubscribe instructions see https://flink.apache.org/zh/community.html --- Best, WuKong

Sent: 2021-11-08 14:42
To: user-zh
Subject: Unsubscribe
Re: flink1.12.4 writing to HDFS throws java.lang.OutOfMemoryError: Direct buffer memory
Hi! You can set taskmanager.memory.task.off-heap.size to specify the sizes of direct memory and native memory; see [1].

[1] https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/memory/mem_setup_tm/#%e9%85%8d%e7%bd%ae%e5%a0%86%e5%a4%96%e5%86%85%e5%ad%98%e7%9b%b4%e6%8e%a5%e5%86%85%e5%ad%98%e6%88%96%e6%9c%ac%e5%9c%b0%e5%86%85%e5%ad%98

xiao cai wrote on Mon, Nov 8, 2021 at 10:20 PM:
> Writing to hdfs through the flink 1.12.4 streaming file sink, the following exception is thrown at runtime:
>
> 2021-11-08 20:39:05
> java.io.IOException: java.lang.OutOfMemoryError: Direct buffer memory
> at org.apache.hadoop.hdfs.DataStreamer$LastExceptionInStreamer.set(DataStreamer.java:299)
> at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:820)
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Bits.java:694)
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
> at sun.nio.ch.IOUtil.write(IOUtil.java:58)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468)
> at org.apache.hadoop.net.SocketOutputStream$Writer.performIO(SocketOutputStream.java:63)
> at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
> at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:159)
> at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:117)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
> at java.io.DataOutputStream.write(DataOutputStream.java:107)
> at org.apache.hadoop.hdfs.DFSPacket.writeTo(DFSPacket.java:180)
> at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:765)
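For completeness, a sketch of setting that option. In a real YARN or standalone deployment it belongs in flink-conf.yaml (taskmanager.memory.task.off-heap.size: 256m); the programmatic form below only takes effect for environments built from that Configuration, e.g. local runs. The 256m value is an illustrative assumption:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OffHeapConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Reserve task off-heap (direct) memory for the HDFS client buffers.
        conf.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, MemorySize.parse("256m"));
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, conf);
        // ... build the streaming-file-sink job on env ...
    }
}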
Re: Checkpoint configuration issue
Is the underlying cause an UnknownHostException: mycluster?

On 11/8/2021 16:04, <2572805...@qq.com.INVALID> wrote:
> Scenario: Flink on YARN, with Flink checkpointing to HDFS. HDFS runs in HA mode: ark1 was the active namenode and ark2 standby; after a failover ark1 is standby and ark2 active.
> Problem: the checkpoint hdfs url used to be hdfs://ark1:8082, which breaks once ark1 turns standby, so I switched the checkpoint path to the HDFS nameservice mycluster:
>
> final String HADOOP_CONF_DIR = "/etc/hadoop/conf";
> org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
> configuration.addResource(new Path(HADOOP_CONF_DIR + "/core-site.xml"));
> configuration.addResource(new Path(HADOOP_CONF_DIR + "/hdfs-site.xml"));
> env.setStateBackend(new FsStateBackend("hdfs://mycluster/flinkCheckpoint"));
>
> But the job fails with:
> Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://mycluster/flinkCheckpoint
> at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:196)
> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:527)
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:408)
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.
flink1.12.4 writing to HDFS throws java.lang.OutOfMemoryError: Direct buffer memory
Writing to hdfs through the flink 1.12.4 streaming file sink, the following exception is thrown at runtime:

2021-11-08 20:39:05
java.io.IOException: java.lang.OutOfMemoryError: Direct buffer memory
at org.apache.hadoop.hdfs.DataStreamer$LastExceptionInStreamer.set(DataStreamer.java:299)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:820)
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:694)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
at sun.nio.ch.IOUtil.write(IOUtil.java:58)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468)
at org.apache.hadoop.net.SocketOutputStream$Writer.performIO(SocketOutputStream.java:63)
at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:159)
at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:117)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.hadoop.hdfs.DFSPacket.writeTo(DFSPacket.java:180)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:765)
FlinkSQL insert error
The SQL is as follows:

String sql1 = "CREATE TABLE detal (\n" +
        " id INT,\n" +
        " produceId VARCHAR,\n" +
        " color VARCHAR,\n" +
        " size VARCHAR,\n" +
        " PRIMARY KEY (id) NOT ENFORCED\n" +
        ") WITH (\n" +
        " 'connector' = 'jdbc',\n" +
        " 'url' = 'jdbc:mysql://ark1:3306/test_1', \n" +
        " 'username' = 'ark_admin', \n" +
        " 'password' = 'ark_Admin@0927Da', \n" +
        " 'table-name' = 'detal'\n" +
        ")\n";
tenv.executeSql(sql1).print();

String sql2 = "SELECT * FROM detal";
tenv.executeSql(sql2).print();

String sql3 = "CREATE TABLE shangping (\n" +
        " id INT,\n" +
        " orderId INT,\n" +
        " produceId VARCHAR,\n" +
        " PRIMARY KEY (id) NOT ENFORCED\n" +
        ") WITH (\n" +
        " 'connector' = 'jdbc',\n" +
        " 'url' = 'jdbc:mysql://ark1:3306/test_1', \n" +
        " 'username' = 'ark_admin', \n" +
        " 'password' = 'ark_Admin@0927Da', \n" +
        " 'table-name' = 'shangping'\n" +
        ")\n";
tenv.executeSql(sql3).print();

String sql4 = "SELECT * FROM shangping";
tenv.executeSql(sql4).print();

String sql5 = "CREATE TABLE new_table (\n" +
        " id INT,\n" +
        " orderId INT,\n" +
        " produceId VARCHAR,\n" +
        " color VARCHAR,\n" +
        " size VARCHAR\n" +
        // " PRIMARY KEY (id) NOT ENFORCED\n" +
        ") WITH (\n" +
        " 'connector' = 'jdbc',\n" +
        " 'url' = 'jdbc:mysql://ark1:3306/test_1', \n" +
        " 'username' = 'ark_admin', \n" +
        " 'password' = 'ark_Admin@0927Da', \n" +
        " 'table-name' = 'new_table'\n" +
        ")\n";
tenv.executeSql(sql5).print();

String sql6 = "SELECT * FROM new_table";
tenv.executeSql(sql6).print();

String insertSql = "insert into new_table " +
        "select * " +
        "from detal";

Problem: when the job runs, it fails with:

java.lang.IllegalArgumentException: open() failed.Table 'test_1.new_table' doesn't exist
at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.open(JdbcRowDataInputFormat.java:215)
at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'test_1.new_table' doesn't exist
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:406)
at com.mysql.jdbc.Util.getInstance(Util.java:381)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1030)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:956)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3558)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3490)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1959)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2109)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2643)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2077)
at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2228)
at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.open(JdbcRowDataInputFormat.java:212)
... 4 common frames omitted
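A likely cause, given the trace: a Flink CREATE TABLE with the jdbc connector only registers metadata in the catalog; it does not create the physical MySQL table, so test_1.new_table has to exist in MySQL first. A sketch of creating it up front with plain JDBC (the column types are assumptions mirroring the Flink DDL above):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateNewTableSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://ark1:3306/test_1", "ark_admin", "ark_Admin@0927Da");
             Statement stmt = conn.createStatement()) {
            // Column types are assumed; adjust lengths to your data.
            stmt.executeUpdate(
                    "CREATE TABLE IF NOT EXISTS new_table (" +
                    "  id INT PRIMARY KEY," +
                    "  orderId INT," +
                    "  produceId VARCHAR(64)," +
                    "  color VARCHAR(32)," +
                    "  size VARCHAR(32))");
        }
    }
}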
Re: A Flink scenario question
Hi! Writing results to an external system is usually done through a sink node. If Flink does not ship the connector you need, consider extending SinkFunction (a very basic sink) or RichSinkFunction (with lifecycle, checkpoint-related capabilities, etc.) to implement a custom sink, then append it to the end of the datastream with DataStream#addSink. A sketch follows below.

陈卓宇 <2572805...@qq.com.invalid> wrote on Mon, Nov 8, 2021 at 2:40 PM:
> Scenario: I consume a batch of data from kafka with flink, process it with a process operator, and write the updates to a third-party storage system.
> Problem: in the flow above there is no sink stage; the writes happen directly inside process, and the only sink I can add is a print(). Is there a better solution for this scenario?
>
> 陈卓宇
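A minimal sketch of such a custom sink; ThirdPartyStore is a hypothetical stub standing in for the real storage client:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class ThirdPartySink extends RichSinkFunction<String> {

    /** Hypothetical client; only here so the sketch compiles. */
    static class ThirdPartyStore {
        void upsert(String record) { System.out.println("upsert: " + record); }
        void close() {}
    }

    private transient ThirdPartyStore store;

    @Override
    public void open(Configuration parameters) {
        store = new ThirdPartyStore(); // create connections here, not in the constructor
    }

    @Override
    public void invoke(String value, Context context) {
        store.upsert(value); // called once per record
    }

    @Override
    public void close() {
        if (store != null) {
            store.close();
        }
    }
}
// Attach it after the process operator: processedStream.addSink(new ThirdPartySink());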
Reply: Flink1.12 Streaming consuming Kafka
hi, you can use the setPartitions method; see the official docs: https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#topic-partition-subscription Best JasonLee

On Nov 8, 2021 at 17:06, guanyq wrote:
Could someone please advise: can flink streaming consume Kafka from specified partitions only? For example, there are 100 partitions but I only want to consume 15 of them.
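A sketch following the linked docs; the topic name, broker address, and the choice of 15 partitions are illustrative:

import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.TopicPartition;

public class PartitionSubscribeSketch {
    public static void main(String[] args) throws Exception {
        // Subscribe to an explicit partition set: only 15 of the 100 partitions.
        Set<TopicPartition> partitions = new HashSet<>();
        for (int p = 0; p < 15; p++) {
            partitions.add(new TopicPartition("my-topic", p));
        }
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setPartitions(partitions)
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka").print();
        env.execute();
    }
}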
Re: Unsubscribe
Hi, to unsubscribe, send an email with any content to user-zh-unsubscr...@flink.apache.org

Best,
Roc

On 2021-11-08 14:42:33, "张伟明" <821596...@qq.com.INVALID> wrote:
>Unsubscribe
Re: Unsubscribe
Hi, to unsubscribe, send an email with any content to user-zh-unsubscr...@flink.apache.org

Best,
Roc

On 2021-11-08 13:58:37, "tanggen...@163.com" wrote:
>Unsubscribe
>
>tanggen...@163.com
Flink1.12 Streaming consuming Kafka
Could someone please advise: can flink streaming consume Kafka from specified partitions only? For example, there are 100 partitions but I only want to consume 15 of them.
Checkpoint configuration issue
Scenario: Flink on YARN, with Flink checkpointing to HDFS. HDFS runs in HA mode: ark1 used to be the active namenode with ark2 standby; after a failover ark1 became standby and ark2 active.
Problem: the checkpoint hdfs url was previously hdfs://ark1:8082, which stops working once ark1 turns standby, so I switched the checkpoint path to the HDFS nameservice mycluster:

final String HADOOP_CONF_DIR = "/etc/hadoop/conf";
org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
// load the HA nameservice definition from the cluster config files
configuration.addResource(new Path(HADOOP_CONF_DIR + "/core-site.xml"));
configuration.addResource(new Path(HADOOP_CONF_DIR + "/hdfs-site.xml"));
env.setStateBackend(new FsStateBackend("hdfs://mycluster/flinkCheckpoint"));

But the job fails with:

Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://mycluster/flinkCheckpoint
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:196)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:527)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:408)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.