checkpoint fails after HDFS namenode failover (subject partially garbled)

2021-11-07 post by (sender name lost to encoding damage)
Scenario: Flink runs on YARN and checkpoints to HDFS. The HDFS cluster has two namenodes; ark1 was active and ark2 standby. After a failover, ark1 dropped to standby and ark2 became active.
Problem: the Flink checkpoint URL is configured against a fixed namenode address (hdfs:ark:8082), so once that namenode goes standby, checkpointing fails. (Parts of this message were lost to encoding damage.)
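A checkpoint URL that names a single namenode will always break when that node drops to standby. The usual fix is to address HDFS through its HA nameservice ID, so the HDFS client follows whichever namenode is currently active. A sketch, assuming the nameservice is called `ark` with namenodes ark1/ark2 (the nn1/nn2 labels and the 8020 RPC port are assumptions; match your cluster):

```xml
<!-- hdfs-site.xml sketch: define a logical nameservice "ark" backed by both
     namenodes so clients fail over automatically. -->
<property><name>dfs.nameservices</name><value>ark</value></property>
<property><name>dfs.ha.namenodes.ark</name><value>nn1,nn2</value></property>
<property><name>dfs.namenode.rpc-address.ark.nn1</name><value>ark1:8020</value></property>
<property><name>dfs.namenode.rpc-address.ark.nn2</name><value>ark2:8020</value></property>
<property>
  <name>dfs.client.failover.proxy.provider.ark</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
```

Flink then checkpoints against the logical service with no host or port in the URL, e.g. `state.checkpoints.dir: hdfs://ark/flink/checkpoints`, provided HADOOP_CONF_DIR points Flink at this configuration.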




flink sink question (subject partially lost to encoding damage)

2021-11-07 post by (sender name lost to encoding damage)
Only fragments of this message survived: a job reads from Kafka into Flink and handles records in a process function; something goes wrong at the sink, and the sink was replaced with print(). (The rest of the message was lost.)
Re: Re: Submitting a Flink job throws java.lang.LinkageError

2021-11-07 post by casel.chen
The versions are consistent; both are 1.12.5.




On 2021-11-08 11:11:35, "Shuiqiang Chen" wrote:
>Hi,
>
>Could you check whether the kafka client version inside the job jar matches the one on the platform?
>
>casel.chen wrote on Friday, November 5, 2021 at 11:25 PM:
>
>> I submitted a job written with the streaming API on our company's real-time compute platform, and it threw the exception below. Our platform is primarily Flink SQL based and already bundles flink-kafka-connector. My job also needs to consume from Kafka, so I packaged the same version of the flink kafka connector into the job jar. What is causing this, and how do I fix it? Thanks!
>>
>> 2021-11-05 16:38:58 -  [submit-session-executor-6] ERROR c.h.s.launcher.AbstractJobExecutor - -start job failed-
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program caused an error:
>>
>> Classpath: [file:/opt/streamsql/jobs/aml-aml-aml/aml-datasync/TEST/aml-datasync-1.0-SNAPSHOT_zwb3274543418822102949.jar]
>>
>> System.out: (none)
>>
>> System.err: (none)
>>
>> at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:264)
>> at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:172)
>> at com.huifu.streamsql.launcher.AbstractJobExecutor.createJobGraph(AbstractJobExecutor.java:205)
>> at com.huifu.streamsql.launcher.standalone.RemoteExecutor.doStart(RemoteExecutor.java:31)
>> at com.huifu.streamsql.launcher.AbstractJobExecutor.start(AbstractJobExecutor.java:51)
>> at com.huifu.streamsql.launcher.JobCommand$1.execute(JobCommand.java:15)
>> at com.huifu.streamsql.service.StreamSqlServiceImpl.submitJob(StreamSqlServiceImpl.java:443)
>> at com.huifu.kunpeng.service.DeploymentServiceImpl.submitJob(DeploymentServiceImpl.java:1662)
>> at com.huifu.kunpeng.service.DeploymentServiceImpl.launchDeployment(DeploymentServiceImpl.java:1623)
>> at com.huifu.kunpeng.service.DeploymentServiceImpl$$FastClassBySpringCGLIB$$855501cb.invoke()
>> at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
>> at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
>> at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
>> at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
>> at org.springframework.retry.annotation.AnnotationAwareRetryOperationsInterceptor.invoke(AnnotationAwareRetryOperationsInterceptor.java:156)
>> at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
>> at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
>> at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
>> at com.huifu.kunpeng.service.DeploymentServiceImpl$$EnhancerBySpringCGLIB$$9aed5b42.launchDeployment()
>> at com.huifu.kunpeng.runner.SubmitQueueApplicationRunner.lambda$run$0(SubmitQueueApplicationRunner.java:63)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> Caused by: java.lang.LinkageError: loader constraint violation: loader (instance of org/apache/flink/util/ChildFirstClassLoader) previously initiated loading for a different type with name "org/apache/kafka/clients/consumer/ConsumerRecord"
>> at java.lang.ClassLoader.defineClass1(Native Method)
>> at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
>> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>> at java.security.AccessController.doPrivileged(Native Method)
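The loader constraint violation above means two copies of org.apache.kafka.clients.consumer.ConsumerRecord met in the same child-first classloader: one from the connector bundled on the platform, one packaged into the job jar. Since the platform already ships the connector, one common remedy is to stop bundling it, e.g. by marking it `provided` in the job's pom. A sketch, assuming Maven and a Scala 2.12 build (match your cluster's Scala suffix and connector artifact):

```xml
<!-- Sketch: if the platform already ships the Kafka connector, keep it out of
     the fat jar. The _2.12 Scala suffix is an assumption; match your cluster. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.12</artifactId>
  <version>1.12.5</version>
  <scope>provided</scope>
</dependency>
```

Running `mvn dependency:tree -Dincludes=org.apache.kafka` before packaging shows whether kafka-clients still reaches the fat jar through some other path.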


Re: Re: Re: flink yarn-session startup failure

2021-11-07 post by casel.chen
Has nobody run into this kind of problem?



[docker@master flink-1.13.2]$ ./bin/yarn-session.sh

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in 
[jar:file:/home/docker/flink-1.13.2/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in 
[jar:file:/home/docker/hadoop-3.2.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

2021-11-08 14:36:43,651 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: jobmanager.rpc.address, master

2021-11-08 14:36:43,657 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: jobmanager.rpc.port, 6123

2021-11-08 14:36:43,657 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: jobmanager.memory.process.size, 1600m

2021-11-08 14:36:43,657 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: taskmanager.memory.process.size, 1728m

2021-11-08 14:36:43,657 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: taskmanager.numberOfTaskSlots, 8

2021-11-08 14:36:43,657 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: parallelism.default, 1

2021-11-08 14:36:43,658 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: execution.checkpointing.interval, 30s

2021-11-08 14:36:43,658 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: execution.checkpointing.unaligned, true

2021-11-08 14:36:43,658 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: execution.checkpointing.timeout, 1200s

2021-11-08 14:36:43,658 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: state.backend, filesystem

2021-11-08 14:36:43,658 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: state.checkpoints.dir, 
oss://datalake-huifu/hudi/flink/checkpoints

2021-11-08 14:36:43,659 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: state.savepoints.dir, 
oss://datalake-huifu/hudi/flink/savepoints

2021-11-08 14:36:43,659 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: state.backend.incremental, true

2021-11-08 14:36:43,659 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: jobmanager.execution.failover-strategy, region

2021-11-08 14:36:43,659 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: rest.port, 18081

2021-11-08 14:36:43,660 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: classloader.resolve-order, parent-first

2021-11-08 14:36:43,660 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: web.submit.enabled, false

2021-11-08 14:36:43,660 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: fs.oss.endpoint, oss-cn-shanghai.aliyuncs.com

2021-11-08 14:36:43,660 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: fs.oss.accessKeyId, LTAI5tJ4k9pk1KwZVsLd8NHd

2021-11-08 14:36:43,661 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: fs.oss.accessKeySecret, **

2021-11-08 14:36:44,042 WARN  org.apache.hadoop.util.NativeCodeLoader   
   [] - Unable to load native-hadoop library for your platform... using 
builtin-java classes where applicable

2021-11-08 14:36:44,155 INFO  
org.apache.flink.runtime.security.modules.HadoopModule   [] - Hadoop user 
set to docker (auth:SIMPLE)

2021-11-08 14:36:44,171 INFO  
org.apache.flink.runtime.security.modules.JaasModule [] - Jaas file 
will be created as /tmp/jaas-5414790866026859677.conf.

2021-11-08 14:36:44,213 WARN  
org.apache.flink.yarn.configuration.YarnLogConfigUtil[] - The 
configuration directory ('/home/docker/flink-1.13.2/conf') already contains a 
LOG4J config file.If you want to use logback, then please delete or rename the 
log configuration file.

2021-11-08 14:36:44,340 INFO  org.apache.hadoop.yarn.client.RMProxy 
   [] - Connecting to ResourceManager at master/192.168.16.191:8032

2021-11-08 14:36:44,711 INFO  
org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The 


Unsubscribe

2021-11-07 post by tanggen...@163.com
Unsubscribe


tanggen...@163.com


Re: Window times look wrong

2021-11-07 post by Caizhi Weng
Hi!

If a window contains no data at all, that window is never produced. This output suggests the data simply had nothing between 11:12:36 and 11:12:37.

陈卓宇 <2572805...@qq.com.invalid> wrote on Monday, November 8, 2021 at 11:24 AM:

> The scenario:
> First I declare event-time semantics with assignTimestampsAndWatermarks, then call window(TumblingEventTimeWindows.of(Time.seconds(1))).process(...)
> to open a 1-second window for the computation.
> Inside the window I print to the console; the printed output includes the window end time obtained via context.window().getEnd(), and some of the
> entries are more than 1 s apart.
> Data:
> == ResultBean(key=ZYSZ01, count=2,
> timestamp=1636341155000, datetime=2021-11-08 11:12:35,
> indicator=elevator.analysis.sum.floor, userablerate=null,
> sumofflinetime=null, forusetime=null, producttime=null)
> == ResultBean(key=ZYSZ01, count=3,
> timestamp=1636341158000, datetime=2021-11-08 11:12:38,
> indicator=elevator.analysis.sum.floor, userablerate=null,
> sumofflinetime=null, forusetime=null, producttime=null)
> == ResultBean(key=ZYSZ01, count=5,
> timestamp=1636341159000, datetime=2021-11-08 11:12:39,
> indicator=elevator.analysis.sum.floor, userablerate=null,
> sumofflinetime=null, forusetime=null, producttime=null)
>
>
> The question:
> Why, when I clearly set a 1 s window, do gaps of more than 1 s still appear?
>
> 陈卓宇
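Caizhi's point can be checked against the alignment rule for tumbling windows: each record lands in exactly one fixed, aligned window, and a window that receives no records never fires. A minimal self-contained sketch (plain Java, not Flink's internal code; zero window offset and non-negative timestamps assumed):

```java
// Sketch: how a tumbling event-time window's start/end are derived from a record
// timestamp (window start = timestamp minus timestamp modulo window size).
public class TumblingWindowDemo {
    public static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long size = 1000L; // 1-second tumbling window
        // Timestamps from the thread: 11:12:35, 11:12:38, 11:12:39 — nothing in 36..37.
        long[] ts = {1636341155000L, 1636341158000L, 1636341159000L};
        for (long t : ts) {
            System.out.println(t + " -> window [" + windowStart(t, size)
                    + ", " + windowEnd(t, size) + ")");
        }
        // The 11:12:36 and 11:12:37 windows received no element, so they never fire;
        // the printed window ends therefore jump from ...156000 straight to ...159000.
    }
}
```
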


Unsubscribe

2021-11-07 post by lpf
Unsubscribe

Window times look wrong

2021-11-07 post by 陈卓宇
The scenario:
First I declare event-time semantics with assignTimestampsAndWatermarks, then call window(TumblingEventTimeWindows.of(Time.seconds(1))).process(...) to open a 1-second window for the computation.
Inside the window I print to the console; the printed output includes the window end time obtained via context.window().getEnd(), and some of the entries are more than 1 s apart.
Data:
== ResultBean(key=ZYSZ01, count=2, timestamp=1636341155000, 
datetime=2021-11-08 11:12:35, indicator=elevator.analysis.sum.floor, 
userablerate=null, sumofflinetime=null, forusetime=null, producttime=null)
== ResultBean(key=ZYSZ01, count=3, timestamp=1636341158000, 
datetime=2021-11-08 11:12:38, indicator=elevator.analysis.sum.floor, 
userablerate=null, sumofflinetime=null, forusetime=null, producttime=null)
== ResultBean(key=ZYSZ01, count=5, timestamp=1636341159000, 
datetime=2021-11-08 11:12:39, indicator=elevator.analysis.sum.floor, 
userablerate=null, sumofflinetime=null, forusetime=null, producttime=null)


The question:
Why, when I clearly set a 1 s window, do gaps of more than 1 s still appear?

陈卓宇




Re: Submitting a Flink job throws java.lang.LinkageError

2021-11-07 post by Shuiqiang Chen
Hi,

Could you check whether the kafka client version inside the job jar matches the one on the platform?



Re: IDEA import gives compile errors, as if some code wasn't committed

2021-11-07 post by Yuepeng Pan



Hi, weiguangjin.
The screenshot didn't come through. Please try uploading it to an external image host, or paste the raw hint text.


Best,
Roc.




On 2021-11-08 09:45:08, "weiguangjin" wrote:

All the Maven dependencies have downloaded. Could the community help check whether the code was committed completely?

Re: Flink broadcast stream

2021-11-07 post by Yuepeng Pan



Hi, 俊超.
If what you mean is that the data stream cannot be parsed until one or more DDL elements have arrived, you can store data-stream elements in a ListState until the DDL stream reaches the operator; once a DDL-type element arrives, first parse/process the elements held in the ListState, then go on processing the current and subsequent data-stream elements.
The same approach gives you the effect of "pre-loading MySQL schema changes via a broadcast stream".

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/state_backends/
[2] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/broadcast_state/


Best,
Roc




On 2021-11-08 09:44:52, "程俊超" wrote:
>Hello, I want to use a broadcast stream to pre-load MySQL table-schema changes (DDL), but the broadcast stream sometimes arrives more slowly than the data stream, which causes problems. I've read that a ListState can handle this situation, but how exactly should it be used?
>
>
>程俊超
>Email: c_18641943...@163.com
>
>Signature customized by NetEase Mail Master

IDEA import gives compile errors, as if some code wasn't committed

2021-11-07 post by weiguangjin
All the Maven dependencies have downloaded. Could the community help check whether the code was committed completely?

Flink broadcast stream

2021-11-07 post by 程俊超
Hello, I want to use a broadcast stream to pre-load MySQL table-schema changes (DDL), but the broadcast stream sometimes arrives more slowly than the data stream, which causes problems. I've read that a ListState can handle this situation, but how exactly should it be used?


程俊超
Email: c_18641943...@163.com

Signature customized by NetEase Mail Master

Re: Smallest unit a Tumbling Window can be opened at

2021-11-07 post by venn
1 ms. But when the data volume isn't especially large, a very short window is pointless: Flink's default network buffer timeout is 100 ms.

It shouldn't affect performance much; it behaves roughly like attaching a process function directly to the stream (the timer just keeps firing).

On 2021/11/5 12:32, 李航飞 wrote:

How small can a tumbling window be? 100 ms?
Does it affect performance at all?




Re: Watch out: your Flink jobs may become someone's mining rig

2021-11-07 post by sunzili


During a security analysis of Flink, I noticed that Flink allows for remote 
code execution, is this an issue?

Apache Flink is a framework for executing user-supplied code in clusters. Users 
can submit code to Flink processes, which will be executed unconditionally, 
without any attempts to limit what code can run. Starting other processes, 
establishing network connections or accessing and modifying local files is 
possible.

Historically, we’ve received numerous remote code execution vulnerability 
reports, which we had to reject, as this is by design.

We strongly discourage users to expose Flink processes to the public internet. 
Within company networks or “cloud” accounts, we recommend restricting access to 
a Flink cluster via appropriate means.

That passage is from the Flink security documentation. This is not a problem for the Flink framework itself to solve; it is the job of the security team and of tools such as vulnerability scanners and threat detection.


On 11/5/2021 23:29, casel.chen wrote:
Something odd happened at the company today: the Flink session cluster I use for session-mode submissions had become a breeding ground for someone running a crypto miner.
On the surface they submitted a Flink job jar, but decompiling it showed that it executed shell commands to download mining scripts such as C3Pool from GitHub and run them.
Fortunately it was a test environment, and fortunately the information-security team's scan found the hole in time and traced it to the offending job jar.
Besides setting web.submit.enabled to false: since there are scenarios Flink SQL cannot cover, we can't stop users from submitting legitimate streaming-API jobs. So how do we stop an otherwise-legitimate user from maliciously running illegal workloads? And what protections does the Flink framework itself provide?

Re: Flink SQL reading an HBase table

2021-11-07 post by zst...@163.com
As a scan source it reads with the HBase SDK's scanner, not by loading everything at once; see org.apache.flink.connector.hbase2.source.AbstractTableInputFormat#nextRecord.

As a lookup (dimension) table it caches each joined key with a Guava cache; see org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction#eval.


Best Wishes!
- Yuan
On 2021-11-07 16:26, guanyq wrote:
Could an expert please advise:

-- Register HBase table "mytable" in Flink SQL
CREATE TABLE hTable (rowkey INT, family1 ROW, family2 ROW, family3 ROW, PRIMARY KEY (rowkey) NOT ENFORCED) WITH ('connector' = 'hbase-1.4', 'table-name' = 'mytable', 'zookeeper.quorum' = 'localhost:2181');
When Flink SQL reads an HBase table, does it load all the data into memory at once, or fetch it batch by batch?
What I really want to know is how Flink SQL handles an HBase table when it is very large.
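The lookup path described above caches each joined key, so a dimension join only hits HBase on a cache miss. A minimal plain-Java sketch of such a bounded LRU key cache (the real connector uses a Guava cache with configurable size and TTL; the names and the simulated fetch here are illustrative):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Bounded LRU map so a lookup join only goes to the remote store on a miss.
public class LookupCacheSketch {
    private final Map<Integer, String> cache;
    public int remoteFetches = 0; // counts simulated HBase round trips

    public LookupCacheSketch(final int maxRows) {
        // accessOrder=true turns LinkedHashMap into an LRU structure
        this.cache = new LinkedHashMap<Integer, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Integer, String> eldest) {
                return size() > maxRows; // evict least-recently-used beyond capacity
            }
        };
    }

    public String lookup(int rowkey) {
        String row = cache.get(rowkey);
        if (row == null) {             // cache miss: go to the remote store
            remoteFetches++;
            row = "row-" + rowkey;     // stand-in for an HBase Get
            cache.put(rowkey, row);
        }
        return row;
    }

    public static void main(String[] args) {
        LookupCacheSketch c = new LookupCacheSketch(2);
        c.lookup(1); c.lookup(1); c.lookup(2); c.lookup(1); // only 2 remote fetches
        System.out.println("remote fetches: " + c.remoteFetches);
    }
}
```

The size bound is what keeps memory flat when the HBase table is huge: the job only ever holds the hot keys, at the cost of re-fetching cold ones.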





Flink SQL reading an HBase table

2021-11-07 post by guanyq
Could an expert please advise:

-- Register HBase table "mytable" in Flink SQL
CREATE TABLE hTable (rowkey INT, family1 ROW, family2 ROW, family3 ROW, PRIMARY KEY (rowkey) NOT ENFORCED) WITH ('connector' = 'hbase-1.4', 'table-name' = 'mytable', 'zookeeper.quorum' = 'localhost:2181');
When Flink SQL reads an HBase table, does it load all the data into memory at once, or fetch it batch by batch?
What I really want to know is how Flink SQL handles an HBase table when it is very large.