JobManager自定义log4j.properties不生效问题

2022-04-12 文章 QQ
现在有这样一个场景,我在同一台客户机上提交flink任务时,想要做到不同的任务的JobManager/TaskManager使用不同的log4j配置。于是通过命令行动态参数-Dlog4j.configurationFile=file:/Users/Documents/log4j.properties
 
设置,结果部署任务后发现配置没有生效,依然使用了FLINK_HOME/conf/log4j.properties。想问一下如何解决这个问题,还是说flink目前不支持通过动态参数方式配置log4j?

Re: flink命令行参数不生效问题

2022-04-11 文章 QQ
非常感谢解答。关于这套新的统一的cli命令的说明或者FLIP在哪里?

> 2022年4月11日 下午11:49,Zhanghao Chen  写道:
> 
> 你好,-m 配合 -yxx 的参数是早期 Flink on YARN 的 cli 参数用法,后来社区开始推进一套新的统一的 cli 命令,使用 -t 
> 指定部署形式,并将原先的 cli options 统一动态参数化,比如原先的 -yxx 命令都能从 
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#yarn
>  找到替代的动态参数。
> Configuration | Apache 
> Flink
> Key Default Type Description; restart-strategy.fixed-delay.attempts: 1: 
> Integer: The number of times that Flink retries the execution before the job 
> is declared as failed if restart-strategy has been set to fixed-delay.: 
> restart-strategy.fixed-delay.delay
> nightlies.apache.org
> 
> 
> Best,
> Zhanghao Chen
> 
> From: gangzi <1139872...@qq.com.INVALID>
> Sent: Monday, April 11, 2022 19:55
> To: user-zh 
> Subject: flink命令行参数不生效问题
> 
> 我用命令提交作业:flink run -t yarn-per-job -ynm SocketWordCount -yqu root.root 
> -d -n SocketWindowWordCount.jar --hostname 10.199.0.97 --port 
> 9878。结果作业提交成功之后发现 -ynm和-yqu不生效。后来通过查看源码发现是因为如果指定了 
> -t,那么-y开头的所有参数都不生效了,因为-y系列参数是在FlinkYarnSessionCli中解析的,而源码中:public 
> CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
>   LOG.debug("Custom commandlines: {}", customCommandLines);
>   for (CustomCommandLine cli : customCommandLines) {
> LOG.debug(
> "Checking custom 
> commandline {}, isActive: {}", cli, cli.isActive(commandLine));
> if (cli.isActive(commandLine)) {
>   return cli;
> }
>   }
>   throw new IllegalStateException("No valid command-line found.");
> }
> 这段代码返回的是GenericCLI。导致后面的:
> final Configuration effectiveConfiguration =
> getEffectiveConfiguration(activeCommandLine, 
> commandLine, programOptions, jobJars);
> 这行代码返回的命令行参数配置只包含了GenericCli中定义的参数。想请教一下,-t和-m设置参数时有啥区别?如何解决上述参数不生效的问题?这是一个bug么?



退订

2021-09-13 文章 qq
退订


退订

2021-09-11 文章 qq
退订


写入到Kafka的时候报错ByteArraySerializer could not be found.

2019-09-17 文章 qq邮箱
在运行flink写入到kafka的时候提示如下错误

org.apache.kafka.common.config.ConfigException: Invalid value 
org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArraySerializer
 for configuration key.serializer: Class 
org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArraySerializer
 could not be found.






????: ??HBase??????????????????????????IO????????sink????outputFormat????????

2019-04-07 文章 ????????qq??
??Hbae  
API,Hbase,??Hbase

qq??
?0?2
?0?2??
???0?22019-04-06?0?216:38
?0?2user-zh
???0?2??HBase??IOsinkoutputFormat
HBaseHBase??
Stream API 
??

请教,如何把 RetractStream 不更新的数据 sink 出来

2019-03-31 文章 邓成刚【qq
HI,请教一个问题,SQL API,non  window join 生成的 RetractStream,如何把不更新的数据 sink 
出来(系统会自动清理掉超时的数据,我的需求是SINK 出来后再清理的掉),不知道Side Outputs 
是否可以,如果可以该怎么做,如果不可以,有没有其它的方法,具体该怎么做,谢谢!


结邮 Re: Re: 请教一下Blink资源分配问题

2019-03-29 文章 邓成刚【qq
终于发现是什么问题了,是由于Blink的 配置与FLINK不同导致:
Flink 里没有这个配置:  taskmanager.cpu.core  默认是   1   

另外:blink 里 taskmanager.heap.mb   与 flink 的 taskmanager.heap.size  不同导致   
taskmanager.heap 配置过小,默认1G

之前错误的配置:
# The heap size for the JobManager JVM

jobmanager.heap.size: 20480m


# The heap size for the TaskManager JVM

taskmanager.heap.size: 40960m


# The number of task slots that each TaskManager offers. Each slot runs one 
parallel pipeline.

taskmanager.numberOfTaskSlots: 24



现在配置:


# The heap size for the JobManager JVM

jobmanager.heap.mb: 20480


# The heap size for the TaskManager JVM

taskmanager.heap.mb: 102400


# How many physical cpu cores a task manager will supply for user

taskmanager.cpu.core: 30


# The number of task slots that each TaskManager offers. Each slot runs one 
parallel pipeline.

taskmanager.numberOfTaskSlots: 30



之前安装了Flink 1.7.2,后又安装 了 Blink,我直接拿的Flink 的配置,没注意看,在此希望我的经历能给大家一些提醒,谢谢。 

在此 感谢龙三 的帮忙,和  谢谢大家的回复。。

邓成刚【qq】
 
发件人: 邓成刚【qq】
发送时间: 2019-03-29 17:18
收件人: user-zh
主题: Re: Re: 请教一下Blink资源分配问题
用的是table , LOG里资源申请情况:
 
ResourceProfile{cpuCores=0.6, heapMemoryInMB=96, directMemoryInMB=0, 
nativeMemoryInMB=0, networkMemoryInMB=2, MANAGED_MEMORY_MB=0.0, 
FLOATING_MANAGED_MEMORY_MB=0.0} from resource manager (request = 
5ac9229acae1e6ef90563a5a0bf3fe21).
ResourceProfile{cpuCores=0.6, heapMemoryInMB=96, directMemoryInMB=0, 
nativeMemoryInMB=0, networkMemoryInMB=2, MANAGED_MEMORY_MB=0.0, 
FLOATING_MANAGED_MEMORY_MB=0.0} from resource manager (request = 
2e39ec9bb22b2afe2baa15a87d854796).
ResourceProfile{cpuCores=0.6, heapMemoryInMB=96, directMemoryInMB=0, 
nativeMemoryInMB=0, networkMemoryInMB=2, MANAGED_MEMORY_MB=0.0, 
FLOATING_MANAGED_MEMORY_MB=0.0} for job e40415828bbe184dc051e422e85e98c3 with 
allocation id 1a8a3b10d3c235dcb64b4b91e0a22bc8.
ResourceProfile{cpuCores=0.6, heapMemoryInMB=96, directMemoryInMB=0, 
nativeMemoryInMB=0, networkMemoryInMB=2, MANAGED_MEMORY_MB=0.0, 
FLOATING_MANAGED_MEMORY_MB=0.0} from resource manager (request = 
ce1a10fca1c7a9f6f93dcd248ebce56c).
ResourceProfile{cpuCores=0.6, heapMemoryInMB=96, directMemoryInMB=0, 
nativeMemoryInMB=0, networkMemoryInMB=2, MANAGED_MEMORY_MB=0.0, 
FLOATING_MANAGED_MEMORY_MB=0.0} for job e40415828bbe184dc051e422e85e98c3 with 
allocation id b9ea06e5ba61013ead29424b72e8535c.
ResourceProfile{cpuCores=0.6, heapMemoryInMB=96, directMemoryInMB=0, 
nativeMemoryInMB=0, networkMemoryInMB=2, MANAGED_MEMORY_MB=0.0, 
FLOATING_MANAGED_MEMORY_MB=0.0} for job e40415828bbe184dc051e422e85e98c3 with 
allocation id c3742a90378a264341e8d1a573c67535.
 
 
发件人: Guowei Ma
发送时间: 2019-03-29 17:12
收件人: user-zh
主题: Re: Re: 请教一下Blink资源分配问题
用的什么api,DataStream还是Table?
如果是DataStream的话,申请什么样的资源?
详细列下。
 
Best,
Guowei
 
 
邓成刚【qq】  于2019年3月29日周五 下午5:09写道:
 
> 是的。
>
> 发件人: moxian
> 发送时间: 2019-03-29 17:06
> 收件人: user-zh
> 主题: Re: 请教一下Blink资源分配问题
> standalone 模式?
>
> 邓成刚【qq】  于2019年3月29日周五 上午9:59写道:
>
> > 请教一下Blink资源分配问题:
> > blink 任务并行度设置 20  提示0个满足:Batch request 40 slots, but only 0 are
> fulfilled.
> > 调整到 3 并行度 提示:Batch request 6 slots, but only 4 are fulfilled.,
> > 但是我的TASK SLOTS有配 48,没有其它任务,
> > 按理説没有资源问题啊,集群配置情况:
> >
> > 其它的都是默认配置:
> >
> > taskmanager.numberOfTaskSlots: 24
> >
> > jobmanager.heap.size: 20480m
> >
> > # The heap size for the TaskManager JVM
> >
> > taskmanager.heap.size: 40960m
> >
> >
> > 服务器 2 台:每台 48核,256G
> >
> >
> >

Re: Re: 请教一下Blink资源分配问题

2019-03-29 文章 邓成刚【qq
用的是table , LOG里资源申请情况:

ResourceProfile{cpuCores=0.6, heapMemoryInMB=96, directMemoryInMB=0, 
nativeMemoryInMB=0, networkMemoryInMB=2, MANAGED_MEMORY_MB=0.0, 
FLOATING_MANAGED_MEMORY_MB=0.0} from resource manager (request = 
5ac9229acae1e6ef90563a5a0bf3fe21).
ResourceProfile{cpuCores=0.6, heapMemoryInMB=96, directMemoryInMB=0, 
nativeMemoryInMB=0, networkMemoryInMB=2, MANAGED_MEMORY_MB=0.0, 
FLOATING_MANAGED_MEMORY_MB=0.0} from resource manager (request = 
2e39ec9bb22b2afe2baa15a87d854796).
ResourceProfile{cpuCores=0.6, heapMemoryInMB=96, directMemoryInMB=0, 
nativeMemoryInMB=0, networkMemoryInMB=2, MANAGED_MEMORY_MB=0.0, 
FLOATING_MANAGED_MEMORY_MB=0.0} for job e40415828bbe184dc051e422e85e98c3 with 
allocation id 1a8a3b10d3c235dcb64b4b91e0a22bc8.
ResourceProfile{cpuCores=0.6, heapMemoryInMB=96, directMemoryInMB=0, 
nativeMemoryInMB=0, networkMemoryInMB=2, MANAGED_MEMORY_MB=0.0, 
FLOATING_MANAGED_MEMORY_MB=0.0} from resource manager (request = 
ce1a10fca1c7a9f6f93dcd248ebce56c).
ResourceProfile{cpuCores=0.6, heapMemoryInMB=96, directMemoryInMB=0, 
nativeMemoryInMB=0, networkMemoryInMB=2, MANAGED_MEMORY_MB=0.0, 
FLOATING_MANAGED_MEMORY_MB=0.0} for job e40415828bbe184dc051e422e85e98c3 with 
allocation id b9ea06e5ba61013ead29424b72e8535c.
ResourceProfile{cpuCores=0.6, heapMemoryInMB=96, directMemoryInMB=0, 
nativeMemoryInMB=0, networkMemoryInMB=2, MANAGED_MEMORY_MB=0.0, 
FLOATING_MANAGED_MEMORY_MB=0.0} for job e40415828bbe184dc051e422e85e98c3 with 
allocation id c3742a90378a264341e8d1a573c67535.

 
发件人: Guowei Ma
发送时间: 2019-03-29 17:12
收件人: user-zh
主题: Re: Re: 请教一下Blink资源分配问题
用的什么api,DataStream还是Table?
如果是DataStream的话,申请什么样的资源?
详细列下。
 
Best,
Guowei
 
 
邓成刚【qq】  于2019年3月29日周五 下午5:09写道:
 
> 是的。
>
> 发件人: moxian
> 发送时间: 2019-03-29 17:06
> 收件人: user-zh
> 主题: Re: 请教一下Blink资源分配问题
> standalone 模式?
>
> 邓成刚【qq】  于2019年3月29日周五 上午9:59写道:
>
> > 请教一下Blink资源分配问题:
> > blink 任务并行度设置 20  提示0个满足:Batch request 40 slots, but only 0 are
> fulfilled.
> > 调整到 3 并行度 提示:Batch request 6 slots, but only 4 are fulfilled.,
> > 但是我的TASK SLOTS有配 48,没有其它任务,
> > 按理説没有资源问题啊,集群配置情况:
> >
> > 其它的都是默认配置:
> >
> > taskmanager.numberOfTaskSlots: 24
> >
> > jobmanager.heap.size: 20480m
> >
> > # The heap size for the TaskManager JVM
> >
> > taskmanager.heap.size: 40960m
> >
> >
> > 服务器 2 台:每台 48核,256G
> >
> >
> >

Re: Re: 请教一下Blink资源分配问题

2019-03-29 文章 邓成刚【qq
是的。

发件人: moxian
发送时间: 2019-03-29 17:06
收件人: user-zh
主题: Re: 请教一下Blink资源分配问题
standalone 模式?
 
邓成刚【qq】  于2019年3月29日周五 上午9:59写道:
 
> 请教一下Blink资源分配问题:
> blink 任务并行度设置 20  提示0个满足:Batch request 40 slots, but only 0 are fulfilled.
> 调整到 3 并行度 提示:Batch request 6 slots, but only 4 are fulfilled.,
> 但是我的TASK SLOTS有配 48,没有其它任务,
> 按理説没有资源问题啊,集群配置情况:
>
> 其它的都是默认配置:
>
> taskmanager.numberOfTaskSlots: 24
>
> jobmanager.heap.size: 20480m
>
> # The heap size for the TaskManager JVM
>
> taskmanager.heap.size: 40960m
>
>
> 服务器 2 台:每台 48核,256G
>
>
>

请教一下Blink资源分配问题

2019-03-28 文章 邓成刚【qq
请教一下Blink资源分配问题:
blink 任务并行度设置 20  提示0个满足:Batch request 40 slots, but only 0 are fulfilled.
调整到 3 并行度 提示:Batch request 6 slots, but only 4 are fulfilled.,
但是我的TASK SLOTS有配 48,没有其它任务,
按理説没有资源问题啊,集群配置情况:

其它的都是默认配置:

taskmanager.numberOfTaskSlots: 24

jobmanager.heap.size: 20480m

# The heap size for the TaskManager JVM

taskmanager.heap.size: 40960m


服务器 2 台:每台 48核,256G




关于Blink 消费kafka并行度问题

2019-03-28 文章 邓成刚【qq
请教一下,Blink 消费kafka数据时,把并行度设置 30 ,就会出现Timeout,JOB跑不起来,应该是没有消费到数据,把并行度调 到 
5就没问题,另外,JOB用到4个TOPic,每个30个PARTITION,但是把这同样JOB提交给FLINK 设置 30 并行度 
就可以跑起来,有哪位大佬知道什么情况吗?



Re: blink消费kafka出现诡异的情况,困扰很久了,哪位大佬知道怎么回事

2019-03-28 文章 邓成刚【qq
通过测试发现,不是sql 脚本的问题,是并行度的问题,30个并行度不行,改成5就OK了。。。
env.setParallelism(5);
 
发件人: 邓成刚【qq】
发送时间: 2019-03-26 18:17
收件人: user-zh
主题: blink消费kafka出现诡异的情况,困扰很久了,哪位大佬知道怎么回事
HI,各位大佬:
      发现一个很诡异的问题:使用SQL API时,在窗口上group by,JOB 5分钟后会timeout,但如果改成select * 
就能正常消费kafka。。。
说明:本地模式和提交JOB均存在此异常
相关信息:
blink 1.5.1
kafka 1.1.1
flink-connector-kafka-0.11_2.11-1.5.1-sql-jar.jar
 
消费正常的code:
 
String sql = "select * from table1"
 
Table sip_distinct_event_id = tableEnv.sqlQuery( sql );           
 
tableEnv.toRetractStream(sip_distinct_event_id, Row.class).print();
env.execute("myjob2");
 
 
 
如果把SQL换成如下就会timeout...
 
String sql ="select TUMBLE_START(EVENTTIME,INTERVAL '1' MINUTE) AS 
EVENTTIME,NEW_EVENT_ID,MSISDN from   
       +"select EVENTTIME,EVENT_ID as NEW_EVENT_ID,MSISDN from table1"         
      +") group by TUMBLE(EVENTTIME,INTERVAL '1' MINUTE),NEW_EVENT_ID,MSISDN"); 
 
 
 
Table sip_distinct_event_id = tableEnv.sqlQuery( sql );           
 
tableEnv.toRetractStream(sip_distinct_event_id, Row.class).print();
env.execute("myjob2");
 
 
异常:
Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: 
java.util.concurrent.TimeoutException
at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJob(MiniCluster.java:637)
at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.executeInternal(LocalStreamEnvironment.java:98)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1893)
at com.nsn.flink.service.DealRegisterFile13.main(DealRegisterFile13.java:98)
Caused by: java.util.concurrent.TimeoutException
at 
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:834)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
 
 
 
 

无窗口的Join 问题

2019-03-27 文章 邓成刚【qq
HI,请教一个问题:
两个流无窗口的Join后生成的新流是一个更新流,怎么把它变成追加流然后-sink出去?



Re: Re: 实现 UpsertStreamTableSink, BatchTableSink 接口代码

2019-03-26 文章 邓成刚【qq

sql:
select EVENTTIME,ID,EVENT_ID,MSISDN,TS 
from (select a.*,ROW_NUMBER() over(partition by EVENT_ID,MSISDN order by TS 
desc) AS rw
          from table1 a
) where rw = 1

tableEnv.toRetractStream(结果表, Row.class).print();


输出结果,分析结果发现,第二条的  1553652720961584  比第一条的时间 1553652720927835 更大,同时输出一条 false 
的,数据结果与第一条相同,説明第三条是用来作删除操作,删掉第一条数据。。。

(true,2019-03-27 
02:12:00.0,1243296274875303847,"1c3.2729.20190327021200",XX,1553652720927835)
(true,2019-03-27 
02:12:00.0,1243296274875303910,"1c3.2729.20190327021200",XX,1553652720961584)
(false,2019-03-27 
02:12:00.0,1243296274875303847,"1c3.2729.20190327021200",XX,1553652720927835)

结论:true 是用来插入数据的,false 是用来删除数据的,出现false时一定会有一条之前插入的数据 。。。

邓成刚【qq】
 
发件人: 邓成刚【qq】
发送时间: 2019-03-26 18:40
收件人: user-zh
主题: Re: 回复: 实现 UpsertStreamTableSink, BatchTableSink 接口代码
不好意思,我理解错了,更正一下:
APPEND 流是没有这个字段的,只有更新流才有,true 表示 APPEND ,false 表示 
update,这个值应该是流发出数据时自己带的,不是人为赋值的。。。
 
 
发件人: 邓成刚【qq】
发送时间: 2019-03-26 18:27
收件人: user-zh
主题: 回复: 实现 UpsertStreamTableSink, BatchTableSink 接口代码
这里面决定 update 或 delete 的 Boolean型值 怎么赋?
 
这里的  Boolean 值 是流类型决定,如果流是APPEND,则为true, update,为false,你直接打印流会有这个字段
 
不知道我的理解是否正确,期待大佬解答。。。
 
邓成刚【qq】
 
发件人: baiyg25...@hundsun.com
发送时间: 2019-03-26 18:02
收件人: user-zh
主题: 实现 UpsertStreamTableSink, BatchTableSink 接口代码
大家好!
 
        伙伴们,附件有实现 blink 中 flink-table 模块  UpsertStreamTableSink, BatchTableSink 
接口代码 ,自实现类放在 flink-jdbc 模块  org.apache.flink.api.java.io.jdbc 包下。大神们帮忙看看呗! 
江湖救急啊!
 
        目前实现后,在代码中调用,报异常:
Exception in thread "main" org.apache.flink.table.api.TableException: Arity [4] 
of result [[Lorg.apache.flink.api.common.typeinfo.TypeInformation;@377dfb8d] 
does not match the number[2] of requested type [Java Tuple2].
 
       
 
        主要盲点:
        1、要怎么匹配上这个类型  Tuple2 ?这里面决定 update 或 delete 的 Boolean型值 
怎么赋? Row 映射进去的底层原理?
        2、这两个改怎么重写?另外 keys 和   isAppendOnly 调用时该怎么赋值?
             @Override
             public void setKeyFields(String[] keys) {}
    @Override
    public void setIsAppendOnly(Boolean isAppendOnly){}
 
baiyg25...@hundsun.com

Re: 回复: 实现 UpsertStreamTableSink, BatchTableSink 接口代码

2019-03-26 文章 邓成刚【qq
不好意思,我理解错了,更正一下:
APPEND 流是没有这个字段的,只有更新流才有,true 表示 APPEND ,false 表示 
update,这个值应该是流发出数据时自己带的,不是人为赋值的。。。

 
发件人: 邓成刚【qq】
发送时间: 2019-03-26 18:27
收件人: user-zh
主题: 回复: 实现 UpsertStreamTableSink, BatchTableSink 接口代码
这里面决定 update 或 delete 的 Boolean型值 怎么赋?
 
这里的  Boolean 值 是流类型决定,如果流是APPEND,则为true, update,为false,你直接打印流会有这个字段
 
不知道我的理解是否正确,期待大佬解答。。。
 
邓成刚【qq】
 
发件人: baiyg25...@hundsun.com
发送时间: 2019-03-26 18:02
收件人: user-zh
主题: 实现 UpsertStreamTableSink, BatchTableSink 接口代码
大家好!
 
        伙伴们,附件有实现 blink 中 flink-table 模块  UpsertStreamTableSink, BatchTableSink 
接口代码 ,自实现类放在 flink-jdbc 模块  org.apache.flink.api.java.io.jdbc 包下。大神们帮忙看看呗! 
江湖救急啊!
 
        目前实现后,在代码中调用,报异常:
Exception in thread "main" org.apache.flink.table.api.TableException: Arity [4] 
of result [[Lorg.apache.flink.api.common.typeinfo.TypeInformation;@377dfb8d] 
does not match the number[2] of requested type [Java Tuple2].
 
       
 
        主要盲点:
        1、要怎么匹配上这个类型  Tuple2 ?这里面决定 update 或 delete 的 Boolean型值 
怎么赋? Row 映射进去的底层原理?
        2、这两个改怎么重写?另外 keys 和   isAppendOnly 调用时该怎么赋值?
             @Override
             public void setKeyFields(String[] keys) {}
    @Override
    public void setIsAppendOnly(Boolean isAppendOnly){}
 
baiyg25...@hundsun.com

回复: 实现 UpsertStreamTableSink, BatchTableSink 接口代码

2019-03-26 文章 邓成刚【qq
这里面决定 update 或 delete 的 Boolean型值 怎么赋?

这里的  Boolean 值 是流类型决定,如果流是APPEND,则为true, update,为false,你直接打印流会有这个字段

不知道我的理解是否正确,期待大佬解答。。。

邓成刚【qq】
 
发件人: baiyg25...@hundsun.com
发送时间: 2019-03-26 18:02
收件人: user-zh
主题: 实现 UpsertStreamTableSink, BatchTableSink 接口代码
大家好!

        伙伴们,附件有实现 blink 中 flink-table 模块  UpsertStreamTableSink, BatchTableSink 
接口代码 ,自实现类放在 flink-jdbc 模块  org.apache.flink.api.java.io.jdbc 包下。大神们帮忙看看呗! 
江湖救急啊!

        目前实现后,在代码中调用,报异常:
Exception in thread "main" org.apache.flink.table.api.TableException: Arity [4] 
of result [[Lorg.apache.flink.api.common.typeinfo.TypeInformation;@377dfb8d] 
does not match the number[2] of requested type [Java Tuple2].

       

        主要盲点:
        1、要怎么匹配上这个类型  Tuple2 ?这里面决定 update 或 delete 的 Boolean型值 
怎么赋? Row 映射进去的底层原理?
        2、这两个改怎么重写?另外 keys 和   isAppendOnly 调用时该怎么赋值?
             @Override
             public void setKeyFields(String[] keys) {}
    @Override
    public void setIsAppendOnly(Boolean isAppendOnly){}

baiyg25...@hundsun.com

blink消费kafka出现诡异的情况,困扰很久了,哪位大佬知道怎么回事

2019-03-26 文章 邓成刚【qq
HI,各位大佬:
      发现一个很诡异的问题:使用SQL API时,在窗口上group by,JOB 5分钟后会timeout,但如果改成select * 
就能正常消费kafka。。。
说明:本地模式和提交JOB均存在此异常
相关信息:
blink 1.5.1
kafka 1.1.1
flink-connector-kafka-0.11_2.11-1.5.1-sql-jar.jar

消费正常的code:

String sql = "select * from table1"

Table sip_distinct_event_id = tableEnv.sqlQuery( sql );           

tableEnv.toRetractStream(sip_distinct_event_id, 
Row.class).print();
env.execute("myjob2");



如果把SQL换成如下就会timeout...

String sql ="select TUMBLE_START(EVENTTIME,INTERVAL '1' MINUTE) AS 
EVENTTIME,NEW_EVENT_ID,MSISDN from   
        +"select EVENTTIME,EVENT_ID as NEW_EVENT_ID,MSISDN from 
table1"         
       +") group by TUMBLE(EVENTTIME,INTERVAL '1' 
MINUTE),NEW_EVENT_ID,MSISDN"); 



Table sip_distinct_event_id = tableEnv.sqlQuery( sql );           

tableEnv.toRetractStream(sip_distinct_event_id, 
Row.class).print();
env.execute("myjob2");


异常:
Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: 
java.util.concurrent.TimeoutException
at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJob(MiniCluster.java:637)
at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.executeInternal(LocalStreamEnvironment.java:98)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1893)
at 
com.nsn.flink.service.DealRegisterFile13.main(DealRegisterFile13.java:98)
Caused by: java.util.concurrent.TimeoutException
at 
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:834)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)






????: ????: flink????

2019-03-25 文章 ????????qq??
DataStream ds = 


DataStream ds1 = ??ds ?? B ?0?2 ?0?2 ?0?2 ?0?2 ?0?2 ?0?2 ?0?2 
?0?2SINK

DataStream ds2 = ??ds ?? C ?0?2 ?0?2 ?0?2 ?0?2 ?0?2 ?0?2 ?0?2 
?0?2SINK


??DS1??DS2DSSQL


qq??
?0?2
?0?2baiyg25...@hundsun.com
???0?22019-03-26?0?210:09
?0?2user-zh
???0?2: flink
??
??AB??AC??
?0?2
?0?2
?0?2
baiyg25...@hundsun.com
 IORI
?? 2019-03-26 09:46
 user-zh
?? flink
ABsink,Creducesinkoperator

回复: blink文档编译失败。有编译好的发布版本的blink文档,可以共享个访问地址吗?

2019-03-25 文章 邓成刚【qq
看一下这里,是不是你想要的。。。
http://fetching118.com/article/5.html



邓成刚【qq】
 
发件人: 陈韬
发送时间: 2019-03-26 08:20
收件人: user-zh
主题: blink文档编译失败。有编译好的发布版本的blink文档,可以共享个访问地址吗? 
blink文档编译失败。有编译好的发布版本的blink文档,可以共享个访问地址吗? 
谢谢


关于sql优化规则问题

2019-03-25 文章 邓成刚【qq
HI,flink SQL API,能不能指定应用哪些sql优化规则,我发现,双流在WINDOWS 
下去重,再关联之前的流的场景下,优化规则10多分钟都跑不完(通过jstack 查看),最后OOM了,之前澄水帮我看了下,调整了参数  export 
JVM_ARGS='-Xss8m' 还是没有用,建议 注释掉 
FilterSetOpTransposeRule和ProjectSetOpTransposeRule。。。

flink-question is about two streams(from kafka) union all then distinct

2019-03-25 文章 邓成刚【qq
Hi, everyone. I have a question. I would like to ask you a question.
 
Problem Description:
     I have two tables (streams from kafka),Both tables define rowTime 
attributes in EVENTTIME
 
     table1(EVENTTIME,NEW_EVENT_ID,F4,F6)
     table2(EVENTTIME,NEW_EVENT_ID,F2,F3) 
 
now,I would like to use UNION ALL for two streams and distinct them as follows:
 
Table id_distinct = tableEnv.sqlQuery("select distinct 
EVENTTIME,NEW_EVENT_IDfrom (select EVENTTIME,NEW_EVENT_ID FROM table1 
                                                                union all 
select EVENTTIME,NEW_EVENT_ID FROM table2)");
 
Question: It will report the following exception. How can I fix this problem? 
Thank you!
 
Exception in thread "main" java.lang.AssertionError: Type mismatch:
rowtype of new rel:
RecordType(TIMESTAMP(3) NOT NULL EVENTTIME, VARCHAR(65536) CHARACTER SET 
"UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" NEW_EVENT_ID) NOT NULL
rowtype of set:
RecordType(TIMESTAMP(3) NOT NULL EVENTTIME, VARCHAR(65536) CHARACTER SET 
"UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" NEW_EVENT_ID) NOT NULL
at org.apache.calcite.util.Litmus$1.fail(Litmus.java:31)
at org.apache.calcite.plan.RelOptUtil.equal(RelOptUtil.java:1857)
at org.apache.calcite.plan.volcano.RelSubset.add(RelSubset.java:276)
at org.apache.calcite.plan.volcano.RelSet.add(RelSet.java:148)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1633)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1579)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:859)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:879)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1755)
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:135)
at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
at 
org.apache.calcite.rel.rules.ProjectSetOpTransposeRule.onMatch(ProjectSetOpTransposeRule.java:109)
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
at 
org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
at 
org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
at 
org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
at 
org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:305)
at 
org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:248)
at com.nsn.flink.service.DealRegisterFile12.main(DealRegisterFile12.java:80)
 
 
deng

flink- question about tables union all

2019-03-25 文章 邓成刚【qq
HI,all,
        Ask a question, the number of fields in each table is not the same, but 
some fields are the same. In the case of union all distinct after selecting the 
same field, how can we ensure that the data of multiple tables are in the same 
time range? Based on EVENTTIME, thank you!
 
 
deng

mail list test

2019-03-25 文章 邓成刚【qq
mail list test