Re: flink 1.9.1 job submitted with -d (detached mode) fails, but runs fine without -d

2020-08-17 Thread Congxian Qiu
Hi
   As I said before, submitting with -d and without -d starts the job in different modes. Judging from your error stack, this should be a class conflict. You can take a look at this document [1] and see whether it helps.
java.lang.NoSuchMethodError:
org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newInstance(IFLjava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/hadoop/yarn/api/records/ResourceBlacklistRequest;)Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest;
at
org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:279)
at
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:273)


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/debugging_classloading.html
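A quick way to double check that it really is a class conflict (a small diagnostic sketch added here for reference, not part of the original report) is to print which jar the conflicting Hadoop class is actually loaded from, using the same classpath as the failing submission:

    public class WhichJar {
        public static void main(String[] args) throws Exception {
            Class<?> clazz = Class.forName(
                    "org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest");
            // Prints the jar/path the class was loaded from.
            System.out.println(clazz.getProtectionDomain().getCodeSource().getLocation());
        }
    }

If the printed location points at a Hadoop jar bundled inside the user jar rather than the cluster's Hadoop libraries, that confirms the conflict described above.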
Best,
Congxian


bradyMk  wrote on Mon, Aug 17, 2020 at 2:36 PM:

> Hello:
>
> I haven't tried a newer version, but I don't think this is a version problem, because all my other Flink jobs run fine with -d; only this one fails, and it also runs fine if I submit it without -d. I find it quite strange too.
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Can the sink receive the data sent by the upstream snapshot collect before it executes notifyCheckpointComplete?

2020-08-17 Thread Congxian Qiu
Hi
notifyCheckpointComplete is called after the whole checkpoint has completed (that is, after all operators have finished their snapshots and the JM has also finished some other work). Your requirement looks like it only needs some ordering between operators, which should not have to depend on notifyCheckpointComplete; you can write your own logic and have submit act only after it has collected N signals.
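A rough sketch of that idea (class name, N and the commit step are assumptions for illustration, not code from this thread):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    // Sketch: the submit sink buffers upstream results and only "commits" once it has
    // collected one result per upstream combine subtask (N signals).
    public class CountingSubmitSink extends RichSinkFunction<String> {
        private final int expectedSignals;               // assumed = parallelism of the combine operator
        private final List<String> buffer = new ArrayList<>();

        public CountingSubmitSink(int expectedSignals) {
            this.expectedSignals = expectedSignals;
        }

        @Override
        public void invoke(String value, Context context) {
            buffer.add(value);
            if (buffer.size() >= expectedSignals) {
                // All upstream subtasks have reported for this batch: do the commit here.
                buffer.clear();
            }
        }
    }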
Best,
Congxian


key lou  wrote on Mon, Aug 17, 2020 at 11:42 AM:

> Thanks for the explanation. So for a graph like A->B: within one checkpoint, there is no
> guaranteed ordering between B executing notifyCheckpointComplete and the data that A sent
> downstream from snapshot arriving at B.
>
> Also, if there is no such ordering, is there any way to make sure, before B performs some
> operation, that the data emitted by A's snapshot in this checkpoint has already arrived at B?
>
> My scenario has 3 core operators: start->process->submit. start and submit have parallelism 1,
> process has parallelism N. start opens a transaction id, process uses that transaction id for
> pre-processing (accumulating a batch, processing it once, and sending the result of that batch
> downstream for the transaction commit), and submit receives the upstream batch results and
> commits them with the same transaction id.
>
>
> Congxian Qiu  wrote on Mon, Aug 17, 2020 at 10:42 AM:
>
> > Hi
> > There is no inherent connection between the upstream snapshot logic and the downstream
> > receiving notifyCheckpointComplete beforehand, so in theory the ordering between the two is not guaranteed.
> > Best,
> > Congxian
> >
> >
> > key lou  wrote on Sun, Aug 16, 2020 at 9:27 PM:
> >
> > > Hi all:
> > > In the code below, if FCombine does not sleep after its snapshot emits data via collect,
> > > then when FSubmit executes notifyCheckpointComplete, the list ls is empty.
> > > If FCombine does sleep after its snapshot emits data via collect, then the data sent by
> > > snapshot collect is received by the time notifyCheckpointComplete is executed.
> > > My previous understanding was that each operator broadcasts the barrier downstream only
> > > after it has finished its checkpoint, so the downstream should in any case receive the data
> > > sent by the upstream snapshot collect before executing notifyCheckpointComplete (snapshot
> > > collect happens first, the barrier broadcast afterwards, and the downstream only executes the
> > > checkpoint-related methods after receiving the barrier, so the data emitted by the upstream
> > > snapshot collect should already have arrived downstream before those methods run).
> > > But according to the test with the code below, that is not the case. Could someone help
> > > explain why?
> > >
> > > public class FlinkCheckpointTest {
> > >     public static void main(String[] args) throws Exception {
> > >         StreamExecutionEnvironment steamEnv =
> > >                 StreamExecutionEnvironment.getExecutionEnvironment();
> > >         steamEnv.enableCheckpointing(1000L * 2);
> > >         steamEnv
> > >                 .addSource(new FSource()).setParallelism(4)
> > >                 .transform("开始事务", Types.STRING, new FStart()).setParallelism(1)
> > >                 .process(new FCombine()).name("事务预处理").setParallelism(4)
> > >                 .addSink(new FSubmit()).name("提交事务").setParallelism(1);
> > >         steamEnv.execute("test");
> > >     }
> > >
> > >     static class FSource extends RichParallelSourceFunction<String> {
> > >         @Override
> > >         public void run(SourceContext<String> sourceContext) throws Exception {
> > >             int I = 0;
> > >             while (true) {
> > >                 I = I + 1;
> > >                 sourceContext.collect("thread " + Thread.currentThread().getId() + "-" + I);
> > >                 Thread.sleep(1000);
> > >             }
> > >         }
> > >         @Override
> > >         public void cancel() {}
> > >     }
> > >
> > >     static class FStart extends AbstractStreamOperator<String>
> > >             implements OneInputStreamOperator<String, String> {
> > >         volatile Long ckid = 0L;
> > >         @Override
> > >         public void processElement(StreamRecord<String> streamRecord) throws Exception {
> > >             log("收到数据: " + streamRecord.getValue() + "..ckid:" + ckid);
> > >             output.collect(streamRecord);
> > >         }
> > >         @Override
> > >         public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
> > >             log("开启事务: " + checkpointId);
> > >             ckid = checkpointId;
> > >             super.prepareSnapshotPreBarrier(checkpointId);
> > >         }
> > >     }
> > >
> > >     static class FCombine extends ProcessFunction<String, String>
> > >             implements CheckpointedFunction {
> > >         List<String> ls = new ArrayList<>();
> > >         Collector<String> collector = null;
> > >         volatile Long ckid = 0L;
> > >
> > >         @Override
> > >         public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
> > >             StringBuffer sb = new StringBuffer();
> > >             ls.forEach(x -> { sb.append(x).append(";"); });
> > >             log("批处理 " + functionSnapshotContext.getCheckpointId() + ": 时收到数据:" + sb.toString());
> > >             Thread.sleep(5 * 1000);
> > >             collector.collect(sb.toString());
> > >             ls.clear();
> > >             Thread.sleep(5 * 1000);
> > >             //Thread.sleep(20 * 1000);
> > >         }
> > >         @Override
> > >         public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {}
> > >         @Override
> > >         public void processElement(String s, Context context, Collector<String> out) throws Exception {
> > >             if (StringUtils.isNotBlank(s)) {
> > >                 ls.add(s);
> > >             }
> > >             log("收到数据 :" + s + "; 这批数据大小为:" + ls.size() + "..ckid:" + ckid);
> > >             if (collector == null) {
> > >                 collector = out;
> > >             }
> > >         }
> > >     }
> > >
> > >     static class FSubmit extends RichSinkFunction<String> implements
> > >             /*  CheckpointedFunction,*/ CheckpointListener {
> > >         List<String> ls = new ArrayList<>();
> > >         volatile

RE: How to specify the number of TaskManagers in Yarn Cluster using Per-Job Mode

2020-08-17 Thread 范超
Thanks Yangze for providing these links, I'll try them!

-----Original Message-----
From: Yangze Guo [mailto:karma...@gmail.com]
Sent: Tuesday, August 18, 2020 12:57
To: 范超
Cc: user (user@flink.apache.org)
Subject: Re: How to specify the number of TaskManagers in Yarn Cluster using Per-Job
Mode

The number of TM mainly depends on the parallelism and job graph.
Flink now allows you to set the maximum slots number 
(slotmanager-number-of-slots-max[1]). There is also a plan to support setting 
the minimum number of slots[2].

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#slotmanager-number-of-slots-max
[2] https://issues.apache.org/jira/browse/FLINK-15959

Best,
Yangze Guo

On Tue, Aug 18, 2020 at 12:21 PM 范超  wrote:
>
> Thanks Yangze
>
> 1. Do you meet any problem when deploying on Yarn or running Flink job?
> My job works well
>
> 2. Why do you need to start the TMs on all the three machines?
> From cluster perspective, I wonder if the process pressure can be balance to 
> 3 machines.
>
> 3. Flink can control how many TM to start, but where to start the TMs depends 
> on Yarn.
> Yes, the job where to start the TM is depend on Yarn.
> Could you please tell me parameter controls how many TM to start, the 
> yn parameter is delete from 1.10 as the 1.9 doc sample list[1] below
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/cli.ht
> ml
>
> Run example program using a per-job YARN cluster with 2 TaskManagers:
>
> ./bin/flink run -m yarn-cluster -yn 2 \
>./examples/batch/WordCount.jar \
>--input hdfs:///user/hamlet.txt --output 
> hdfs:///user/wordcount_out
>
> -----Original Message-----
> From: Yangze Guo [mailto:karma...@gmail.com]
> Sent: Tuesday, August 18, 2020 11:31
> To: 范超
> Cc: user (user@flink.apache.org)
> Subject: Re: How to specify the number of TaskManagers in Yarn Cluster
> using Per-Job Mode
>
> Hi,
>
> Flink can control how many TM to start, but where to start the TMs depends on 
> Yarn.
>
> Do you meet any problem when deploying on Yarn or running Flink job?
> Why do you need to start the TMs on all the three machines?
>
> Best,
> Yangze Guo
>
> On Tue, Aug 18, 2020 at 11:25 AM 范超  wrote:
> >
> > Thanks Yangze
> > The reason why I don’t deploying a standalone cluster, it's because there 
> > kafka, kudu, hadoop, zookeeper on these machines, maybe currently using the 
> > yarn to manage resources is the best choice for me.
> > If Flink can not control how many tm to start , could anyone 
> > providing me some best practice for deploying on yarn please? I read 
> > the [1] and still don't very clear
> >
> > [1]
> > https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster
> > -g
> > eneral-guidelines
> >
> > -----Original Message-----
> > From: Yangze Guo [mailto:karma...@gmail.com]
> > Sent: Tuesday, August 18, 2020 10:50
> > To: 范超
> > Cc: user (user@flink.apache.org)
> > Subject: Re: How to specify the number of TaskManagers in Yarn Cluster
> > using Per-Job Mode
> >
> > Hi,
> >
> > I think that is only related to the Yarn scheduling strategy. AFAIK, Flink 
> > could not control it. You could check the RM log to figure out why it did 
> > not schedule the containers to all the three machines. BTW, if you have 
> > specific requirements to start with all the three machines, how about 
> > deploying a standalone cluster instead?
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Aug 18, 2020 at 10:24 AM 范超  wrote:
> > >
> > > Thanks Yangze
> > >
> > > All 3 machines NodeManager is started.
> > >
> > > I just don't know why not three machines each running a Flink 
> > > TaskManager and how to achieve this
> > >
> > > -----Original Message-----
> > > From: Yangze Guo [mailto:karma...@gmail.com]
> > > Sent: Tuesday, August 18, 2020 10:10
> > > To: 范超
> > > Cc: user (user@flink.apache.org)
> > > Subject: Re: How to specify the number of TaskManagers in Yarn Cluster
> > > using Per-Job Mode
> > >
> > > Hi,
> > >
> > > Do you start the NodeManager in all the three machines? If so, could you 
> > > check all the NMs correctly connect to the ResourceManager?
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Tue, Aug 18, 2020 at 10:01 AM 范超  wrote:
> > > >
> > > > Hi, Dev and Users
> > > > I’ve 3 machines each one is 8 cores and 16GB memory.
> > > > Following it’s my Resource Manager screenshot the cluster have 36GB 
> > > > total.
> > > > I specify the paralism to 3 or even up to 12,  But the task manager is 
> > > > always running on two nodes not all three machine, the third node does 
> > > > not start the task manager.
> > > > I tried set the –p –tm –jm parameters, but it always the same, only 
> > > > different is more container on the two maching but not all three 
> > > > machine start the task manager.
> > > > My question is how to set the cli parameter to start all of my 
> > > > three machine (all task manager start on 3 machines)
> > > >
> > > > Thanks a lot
> > > > [cid:image001.png@01D67546.62291B70]
> > > >
> > > >
> > > > Chao fan
> > > >


Re: How to specify the number of TaskManagers in Yarn Cluster using Per-Job Mode

2020-08-17 Thread Yangze Guo
The number of TM mainly depends on the parallelism and job graph.
Flink now allows you to set the maximum slots number
(slotmanager-number-of-slots-max[1]). There is also a plan to support
setting the minimum number of slots[2].

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#slotmanager-number-of-slots-max
[2] https://issues.apache.org/jira/browse/FLINK-15959

Best,
Yangze Guo

On Tue, Aug 18, 2020 at 12:21 PM 范超  wrote:
>
> Thanks Yangze
>
> 1. Do you meet any problem when deploying on Yarn or running Flink job?
> My job works well
>
> 2. Why do you need to start the TMs on all the three machines?
> From cluster perspective, I wonder if the process pressure can be balance to 
> 3 machines.
>
> 3. Flink can control how many TM to start, but where to start the TMs depends 
> on Yarn.
> Yes, the job where to start the TM is depend on Yarn.
> Could you please tell me parameter controls how many TM to start, the yn 
> parameter is delete from 1.10 as the 1.9 doc sample list[1] below
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/cli.html
>
> Run example program using a per-job YARN cluster with 2 TaskManagers:
>
> ./bin/flink run -m yarn-cluster -yn 2 \
>./examples/batch/WordCount.jar \
>--input hdfs:///user/hamlet.txt --output 
> hdfs:///user/wordcount_out
>
> -----Original Message-----
> From: Yangze Guo [mailto:karma...@gmail.com]
> Sent: Tuesday, August 18, 2020 11:31
> To: 范超
> Cc: user (user@flink.apache.org)
> Subject: Re: How to specify the number of TaskManagers in Yarn Cluster using
> Per-Job Mode
>
> Hi,
>
> Flink can control how many TM to start, but where to start the TMs depends on 
> Yarn.
>
> Do you meet any problem when deploying on Yarn or running Flink job?
> Why do you need to start the TMs on all the three machines?
>
> Best,
> Yangze Guo
>
> On Tue, Aug 18, 2020 at 11:25 AM 范超  wrote:
> >
> > Thanks Yangze
> > The reason why I don’t deploying a standalone cluster, it's because there 
> > kafka, kudu, hadoop, zookeeper on these machines, maybe currently using the 
> > yarn to manage resources is the best choice for me.
> > If Flink can not control how many tm to start , could anyone providing
> > me some best practice for deploying on yarn please? I read the [1] and
> > still don't very clear
> >
> > [1]
> > https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-g
> > eneral-guidelines
> >
> > -----Original Message-----
> > From: Yangze Guo [mailto:karma...@gmail.com]
> > Sent: Tuesday, August 18, 2020 10:50
> > To: 范超
> > Cc: user (user@flink.apache.org)
> > Subject: Re: How to specify the number of TaskManagers in Yarn Cluster
> > using Per-Job Mode
> >
> > Hi,
> >
> > I think that is only related to the Yarn scheduling strategy. AFAIK, Flink 
> > could not control it. You could check the RM log to figure out why it did 
> > not schedule the containers to all the three machines. BTW, if you have 
> > specific requirements to start with all the three machines, how about 
> > deploying a standalone cluster instead?
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Aug 18, 2020 at 10:24 AM 范超  wrote:
> > >
> > > Thanks Yangze
> > >
> > > All 3 machines NodeManager is started.
> > >
> > > I just don't know why not three machines each running a Flink
> > > TaskManager and how to achieve this
> > >
> > > -----Original Message-----
> > > From: Yangze Guo [mailto:karma...@gmail.com]
> > > Sent: Tuesday, August 18, 2020 10:10
> > > To: 范超
> > > Cc: user (user@flink.apache.org)
> > > Subject: Re: How to specify the number of TaskManagers in Yarn Cluster
> > > using Per-Job Mode
> > >
> > > Hi,
> > >
> > > Do you start the NodeManager in all the three machines? If so, could you 
> > > check all the NMs correctly connect to the ResourceManager?
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Tue, Aug 18, 2020 at 10:01 AM 范超  wrote:
> > > >
> > > > Hi, Dev and Users
> > > > I’ve 3 machines each one is 8 cores and 16GB memory.
> > > > Following it’s my Resource Manager screenshot the cluster have 36GB 
> > > > total.
> > > > I specify the paralism to 3 or even up to 12,  But the task manager is 
> > > > always running on two nodes not all three machine, the third node does 
> > > > not start the task manager.
> > > > I tried set the –p –tm –jm parameters, but it always the same, only 
> > > > different is more container on the two maching but not all three 
> > > > machine start the task manager.
> > > > My question is how to set the cli parameter to start all of my
> > > > three machine (all task manager start on 3 machines)
> > > >
> > > > Thanks a lot
> > > > [cid:image001.png@01D67546.62291B70]
> > > >
> > > >
> > > > Chao fan
> > > >


Re: 1.11 kafka producer writes to only one partition

2020-08-17 Thread x2009438
Yes, I took a look at the source code.

Because I am using SimpleStringSchema, which is not a keyed schema, FlinkFixedPartitioner's open method was skipped and the subtask index was not initialized, so it is always 0 % partitions = 0 and everything is written to partition 0.
It feels odd to me, not quite logical.
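For reference, a minimal sketch of the workaround mentioned further down in this thread (replacing the partitioner with null so Kafka partitions the records itself); it assumes the FlinkKafkaProducer constructor variant that takes an Optional custom partitioner, and the topic/broker names are made up:

    import java.util.Optional;
    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

    public class KafkaSinkSketch {
        public static FlinkKafkaProducer<String> buildProducer() {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker:9092");   // assumed address
            // An empty partitioner disables FlinkFixedPartitioner, so the Kafka producer
            // itself picks the partition (round-robin for records without keys).
            return new FlinkKafkaProducer<>(
                    "my-topic",
                    new SimpleStringSchema(),
                    props,
                    Optional.<FlinkKafkaPartitioner<String>>empty());
        }
    }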

Sent from my iPhone

>> On Aug 17, 2020, at 23:28, cs <58683...@qq.com> wrote:
> The Kafka producer currently uses org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner as its partitioner.
> The concrete partitioning method is org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner#partition.
> From that method you can tell which Kafka partition each subtask writes to. The data of each subtask is only ever written to one fixed partition.
> 
> 
> 
> 
> ------ Original Message ------
> From: "user-zh"
> Sent: Monday, August 17, 2020 10:03 PM
> To: "user-zh"
> Subject: 1.11 kafka producer writes to only one partition
> 
> 
> 
> I see that the 1.11 Kafka connector uses FlinkFixedPartitioner by default, which should write all the data of each sink subtask to some fixed partition. But I ran into something strange: with an overall job parallelism of 5, the results were only written to partition 0. After replacing the partitioner with null, records were written to every partition in a round-robin way. This is very puzzling; any help appreciated.
> 
> 
> 
> 
> Sent from my iPhone
>
>
> Sent from my iPhone



RE: How to specify the number of TaskManagers in Yarn Cluster using Per-Job Mode

2020-08-17 Thread 范超
Thanks Yangze

1. Do you meet any problem when deploying on Yarn or running Flink job?
My job works well

2. Why do you need to start the TMs on all the three machines?
From the cluster perspective, I wonder whether the processing load can be balanced across the 3 machines.

3. Flink can control how many TM to start, but where to start the TMs depends 
on Yarn.
Yes, where the TMs are started depends on Yarn.
Could you please tell me which parameter controls how many TMs to start? The -yn parameter was removed in 1.10; the 1.9 doc example [1] is listed below.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/cli.html

Run example program using a per-job YARN cluster with 2 TaskManagers:

./bin/flink run -m yarn-cluster -yn 2 \
   ./examples/batch/WordCount.jar \
   --input hdfs:///user/hamlet.txt --output 
hdfs:///user/wordcount_out

-----Original Message-----
From: Yangze Guo [mailto:karma...@gmail.com]
Sent: Tuesday, August 18, 2020 11:31
To: 范超
Cc: user (user@flink.apache.org)
Subject: Re: How to specify the number of TaskManagers in Yarn Cluster using Per-Job
Mode

Hi,

Flink can control how many TM to start, but where to start the TMs depends on 
Yarn.

Do you meet any problem when deploying on Yarn or running Flink job?
Why do you need to start the TMs on all the three machines?

Best,
Yangze Guo

On Tue, Aug 18, 2020 at 11:25 AM 范超  wrote:
>
> Thanks Yangze
> The reason why I don’t deploying a standalone cluster, it's because there 
> kafka, kudu, hadoop, zookeeper on these machines, maybe currently using the 
> yarn to manage resources is the best choice for me.
> If Flink can not control how many tm to start , could anyone providing 
> me some best practice for deploying on yarn please? I read the [1] and 
> still don't very clear
>
> [1] 
> https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-g
> eneral-guidelines
>
> -----Original Message-----
> From: Yangze Guo [mailto:karma...@gmail.com]
> Sent: Tuesday, August 18, 2020 10:50
> To: 范超
> Cc: user (user@flink.apache.org)
> Subject: Re: How to specify the number of TaskManagers in Yarn Cluster
> using Per-Job Mode
>
> Hi,
>
> I think that is only related to the Yarn scheduling strategy. AFAIK, Flink 
> could not control it. You could check the RM log to figure out why it did not 
> schedule the containers to all the three machines. BTW, if you have specific 
> requirements to start with all the three machines, how about deploying a 
> standalone cluster instead?
>
> Best,
> Yangze Guo
>
> On Tue, Aug 18, 2020 at 10:24 AM 范超  wrote:
> >
> > Thanks Yangze
> >
> > All 3 machines NodeManager is started.
> >
> > I just don't know why not three machines each running a Flink 
> > TaskManager and how to achieve this
> >
> > -----Original Message-----
> > From: Yangze Guo [mailto:karma...@gmail.com]
> > Sent: Tuesday, August 18, 2020 10:10
> > To: 范超
> > Cc: user (user@flink.apache.org)
> > Subject: Re: How to specify the number of TaskManagers in Yarn Cluster
> > using Per-Job Mode
> >
> > Hi,
> >
> > Do you start the NodeManager in all the three machines? If so, could you 
> > check all the NMs correctly connect to the ResourceManager?
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Aug 18, 2020 at 10:01 AM 范超  wrote:
> > >
> > > Hi, Dev and Users
> > > I’ve 3 machines each one is 8 cores and 16GB memory.
> > > Following it’s my Resource Manager screenshot the cluster have 36GB total.
> > > I specify the paralism to 3 or even up to 12,  But the task manager is 
> > > always running on two nodes not all three machine, the third node does 
> > > not start the task manager.
> > > I tried set the –p –tm –jm parameters, but it always the same, only 
> > > different is more container on the two maching but not all three machine 
> > > start the task manager.
> > > My question is how to set the cli parameter to start all of my 
> > > three machine (all task manager start on 3 machines)
> > >
> > > Thanks a lot
> > > [cid:image001.png@01D67546.62291B70]
> > >
> > >
> > > Chao fan
> > >


Re: Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-17 Thread Yun Gao
Hi, 

Many thanks for bringing up this discussion!

One more question: do the BATCH and STREAMING modes also decide the shuffle types and operators? I'm asking because even in blocking mode a job could benefit from keeping some edges pipelined if the resources are known to be sufficient. Do we also consider exposing more fine-grained control over the shuffle types?

Best,
 Yun 



 --Original Mail --
Sender:Kostas Kloudas 
Send Date:Tue Aug 18 02:24:21 2020
Recipients:David Anderson 
CC:dev , user 
Subject:Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input
Hi Kurt and David,

Thanks a lot for the insightful feedback!

@Kurt: For the topic of checkpointing with Batch Scheduling, I totally
agree with you that it requires a lot more work and careful thinking
on the semantics. This FLIP was written under the assumption that if
the user wants to have checkpoints on bounded input, he/she will have
to go with STREAMING as the scheduling mode. Checkpointing for BATCH
can be handled as a separate topic in the future.

In the case of MIXED workloads and for this FLIP, the scheduling mode
should be set to STREAMING. That is why the AUTOMATIC option sets
scheduling to BATCH only if all the sources are bounded. I am not sure
what are the plans there at the scheduling level, as one could imagine
in the future that in mixed workloads, we schedule first all the
bounded subgraphs in BATCH mode and we allow only one UNBOUNDED
subgraph per application, which is going to be scheduled after all
Bounded ones have finished. Essentially the bounded subgraphs will be
used to bootstrap the unbounded one. But, I am not aware of any plans
towards that direction.


@David: The processing time timer handling is a topic that has also
been discussed in the community in the past, and I do not remember any
final conclusion unfortunately.

In the current context and for bounded input, we chose to favor
reproducibility of the result, as this is expected in batch processing
where the whole input is available in advance. This is why this
proposal suggests to not allow processing time timers. But I
understand your argument that the user may want to be able to run the
same pipeline on batch and streaming this is why we added the two
options under future work, namely (from the FLIP):

```
Future Work: In the future we may consider adding as options the capability of:
* firing all the registered processing time timers at the end of a job
(at close()) or,
* ignoring all the registered processing time timers at the end of a job.
```

Conceptually, we are essentially saying that we assume that batch
execution is assumed to be instantaneous and refers to a single
"point" in time and any processing-time timers for the future may fire
at the end of execution or be ignored (but not throw an exception). I
could also see ignoring the timers in batch as the default, if this
makes more sense.

By the way, do you have any usecases in mind that will help us better
shape our processing time timer handling?

Kostas

On Mon, Aug 17, 2020 at 2:52 PM David Anderson  wrote:
>
> Kostas,
>
> I'm pleased to see some concrete details in this FLIP.
>
> I wonder if the current proposal goes far enough in the direction of 
> recognizing the need some users may have for "batch" and "bounded streaming" 
> to be treated differently. If I've understood it correctly, the section on 
> scheduling allows me to choose STREAMING scheduling even if I have bounded 
> sources. I like that approach, because it recognizes that even though I have 
> bounded inputs, I don't necessarily want batch processing semantics. I think 
> it makes sense to extend this idea to processing time support as well.
>
> My thinking is that sometimes in development and testing it's reasonable to 
> run exactly the same job as in production, except with different sources and 
> sinks. While it might be a reasonable default, I'm not convinced that 
> switching a processing time streaming job to read from a bounded source 
> should always cause it to fail.
>
> David
>
> On Wed, Aug 12, 2020 at 5:22 PM Kostas Kloudas  wrote:
>>
>> Hi all,
>>
>> As described in FLIP-131 [1], we are aiming at deprecating the DataSet
>> API in favour of the DataStream API and the Table API. After this work
>> is done, the user will be able to write a program using the DataStream
>> API and this will execute efficiently on both bounded and unbounded
>> data. But before we reach this point, it is worth discussing and
>> agreeing on the semantics of some operations as we transition from the
>> streaming world to the batch one.
>>
>> This thread and the associated FLIP [2] aim at discussing these issues
>> as these topics are pretty important to users and can lead to
>> unpleasant surprises if we do not pay attention.
>>
>> Let's have a healthy discussion here and I will be updating the FLIP
>> accordingly.
>>
>> Cheers,
>> Kostas
>>
>> [1] 
>> 

Re: Print SQL connector does not work properly

2020-08-17 Thread xiao cai
Hi china_tao:
Hello, HBase is definitely fine here. May I ask whether you can use the print connector normally, and could you show me the correct usage? Thanks.


 Original Message 
From: china_tao
To: user-zh
Sent: Monday, August 17, 2020 23:00
Subject: Re: Print SQL connector does not work properly


String createHbaseSql = CREATE TABLE dimension ( rowKey STRING, cf ROW, tas BIGINT ) WITH (
    'connector.type' = 'hbase',
    'connector.version' = '1.4.3',
    'connector.table-name' = 'test',
    'connector.write.buffer-flush.max-rows' = '10',
    'connector.zookeeper.quorum' = 'IP:port',
    'connector.zookeeper.znode.parent' = '/hbase' );
tableEnv.executeSql(createHbaseSql);
Table queryTable = tableEnv.sqlQuery("select * from dimension");
tableEnv.toAppendStream(queryTable, Row.class).print();
Try it this way first and see whether the rows get printed, to prove that your HBase side is fine. Then switch to print_table. -- Sent from:
http://apache-flink.147419.n8.nabble.com/
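For reference, a minimal sketch of the print connector usage asked about above (based on the 1.11 'print' connector; table and field names are assumptions):

    import org.apache.flink.table.api.TableEnvironment;

    public class PrintSinkSketch {
        public static void printDimension(TableEnvironment tableEnv) {
            // 'print' is the built-in console sink connector in Flink 1.11.
            tableEnv.executeSql(
                    "CREATE TABLE print_sink (rowKey STRING) WITH ('connector' = 'print')");
            // Note: the rows show up in the TaskManager stdout, not in the client console.
            tableEnv.executeSql("INSERT INTO print_sink SELECT rowKey FROM dimension");
        }
    }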

Re: Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-08-17 Thread Yun Gao
+1 for removing the methods that are deprecated for a while & have alternative 
methods.

One specific thing: if we remove DataStream#split, do we consider enabling side output in more operators in the future? Currently it should only be available in ProcessFunctions, and not in other commonly used UDFs like Source or AsyncFunction [1].

One temporary solution that occurs to me is to add a ProcessFunction after the operators that want to use side output. But that solution is not very obvious to come up with, and if it really works we might add it to the side-output documentation.

[1] https://issues.apache.org/jira/browse/FLINK-7954
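For readers hitting the DataStream#split removal, a minimal side-output sketch (stream contents and tag name are made up):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class SplitWithSideOutput {
        // Anonymous subclass so the OutputTag keeps its type information.
        static final OutputTag<String> REJECTED = new OutputTag<String>("rejected") {};

        static DataStream<String> rejectedOf(DataStream<String> input) {
            SingleOutputStreamOperator<String> accepted = input.process(
                    new ProcessFunction<String, String>() {
                        @Override
                        public void processElement(String value, Context ctx, Collector<String> out) {
                            if (value.startsWith("ok")) {
                                out.collect(value);           // main output
                            } else {
                                ctx.output(REJECTED, value);  // side output replaces split()
                            }
                        }
                    });
            return accepted.getSideOutput(REJECTED);
        }
    }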

Best,
 Yun


 --Original Mail --
Sender:Kostas Kloudas 
Send Date:Tue Aug 18 03:52:44 2020
Recipients:Dawid Wysakowicz 
CC:dev , user 
Subject:Re: [DISCUSS] Removing deprecated methods from DataStream API
+1 for removing them.



From a quick look, most of them (not all) have been deprecated a long time ago.



Cheers,

Kostas



On Mon, Aug 17, 2020 at 9:37 PM Dawid Wysakowicz  wrote:

>

> @David Yes, my idea was to remove any use of fold method and all related 
> classes including WindowedStream#fold

>

> @Klou Good idea to also remove the deprecated enableCheckpointing() & 
> StreamExecutionEnvironment#readFile and alike. I did another pass over some 
> of the classes and thought we could also drop:

>

> ExecutionConfig#set/getCodeAnalysisMode

> ExecutionConfig#disable/enableSysoutLogging

> ExecutionConfig#set/isFailTaskOnCheckpointError

> ExecutionConfig#isLatencyTrackingEnabled

>

> As for the `forceCheckpointing` I am not fully convinced to doing it. As far 
> as I know iterations still do not participate in checkpointing correctly. 
> Therefore it still might make sense to force it. In other words there is no 
> real alternative to that method. Unless we only remove the methods from 
> StreamExecutionEnvironment and redirect to the setter in CheckpointConfig. 
> WDYT?

>

> An updated list of methods I suggest to remove:

>

> ExecutionConfig#set/getCodeAnalysisMode (deprecated in 1.9)

> ExecutionConfig#disable/enableSysoutLogging (deprecated in 1.10)

> ExecutionConfig#set/isFailTaskOnCheckpointError (deprecated in 1.9)

> ExecutionConfig#isLatencyTrackingEnabled (deprecated in 1.7)

> ExecutionConfig#(get)/setNumberOfExecutionRetries() (deprecated in 1.1?)

> StreamExecutionEnvironment#readFile,readFileStream(...),socketTextStream(...),socketTextStream(...)
>  (deprecated in 1.2)

> RuntimeContext#getAllAccumulators (deprecated in 0.10)

> DataStream#fold and all related classes and methods such as FoldFunction, 
> FoldingState, FoldingStateDescriptor ... (deprecated in 1.3/1.4)

> StreamExecutionEnvironment#setStateBackend(AbstractStateBackend) (deprecated 
> in 1.5)

> DataStream#split (deprecated in 1.8)

> Methods in (Connected)DataStream that specify keys as either indices or field 
> names such as DataStream#keyBy, DataStream#partitionCustom, 
> ConnectedStream#keyBy,  (deprecated in 1.11)

>

> Bear in mind that majority of the options listed above in ExecutionConfig 
> take no effect. They were left there purely to satisfy the binary 
> compatibility. Personally I don't see any benefit of leaving a method and 
> silently dropping the underlying feature. The only configuration that is 
> respected is setting the number of execution retries.

>

> I also wanted to make it explicit that most of the changes above would result 
> in a binary incompatibility and require additional exclusions in the japicmp. 
> Those are:

>

> ExecutionConfig#set/getCodeAnalysisMode (deprecated in 1.9)

> ExecutionConfig#disable/enableSysoutLogging (deprecated in 1.10)

> ExecutionConfig#set/isFailTaskOnCheckpointError (deprecated in 1.9)

> ExecutionConfig#isLatencyTrackingEnabled (deprecated in 1.7)

> ExecutionConfig#(get)/setNumberOfExecutionRetries() (deprecated in 1.1?)

> DataStream#fold and all related classes and methods such as FoldFunction, 
> FoldingState, FoldingStateDescriptor ... (deprecated in 1.3/1.4)

> DataStream#split (deprecated in 1.8)

> Methods in (Connected)DataStream that specify keys as either indices or field 
> names such as DataStream#keyBy, DataStream#partitionCustom, 
> ConnectedStream#keyBy,  (deprecated in 1.11)

> StreamExecutionEnvironment#readFile,readFileStream(...),socketTextStream(...),socketTextStream(...)
>  (deprecated in 1.2)

>

> Looking forward to more opinions on the issue.

>

> Best,

>

> Dawid

>

>

> On 17/08/2020 12:49, Kostas Kloudas wrote:

>

> Thanks a lot for starting this Dawid,

>

> Big +1 for the proposed clean-up, and I would also add the deprecated

> methods of the StreamExecutionEnvironment like:

>

> enableCheckpointing(long interval, CheckpointingMode mode, boolean force)

> enableCheckpointing()

> isForceCheckpointing()

>

> readFile(FileInputFormat inputFormat,String

> filePath,FileProcessingMode watchType,long interval, FilePathFilter

> 

Flink checkpoint recovery time

2020-08-17 Thread Zhinan Cheng
Hi all,

I am working on measuring the failure recovery time of Flink and I want to
decompose the recovery time into different parts, say the time to detect
the failure, the time to restart the job, and the time to
restore from the checkpoint.

I found that I can measure the downtime during a failure, the time to
restart the job, and some checkpointing metrics, as below.

[image: measure.png]
Unfortunately, I cannot find any information about the failure detection time
and the checkpoint recovery time. Is there any way Flink provides for this?
Otherwise, how can I solve this?

Thanks a lot for your help.

Regards,


RE: How to specify the number of TaskManagers in Yarn Cluster using Per-Job Mode

2020-08-17 Thread 范超
Thanks Yangze
The reason why I am not deploying a standalone cluster is that Kafka, Kudu, Hadoop and ZooKeeper are already on these machines, so currently using YARN to manage resources is probably the best choice for me.
If Flink cannot control how many TMs to start, could anyone provide me with some best practices for deploying on YARN please? I read [1] and it is still not very clear to me.

[1] 
https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines

-----Original Message-----
From: Yangze Guo [mailto:karma...@gmail.com]
Sent: Tuesday, August 18, 2020 10:50
To: 范超
Cc: user (user@flink.apache.org)
Subject: Re: How to specify the number of TaskManagers in Yarn Cluster using Per-Job
Mode

Hi,

I think that is only related to the Yarn scheduling strategy. AFAIK, Flink 
could not control it. You could check the RM log to figure out why it did not 
schedule the containers to all the three machines. BTW, if you have specific 
requirements to start with all the three machines, how about deploying a 
standalone cluster instead?

Best,
Yangze Guo

On Tue, Aug 18, 2020 at 10:24 AM 范超  wrote:
>
> Thanks Yangze
>
> All 3 machines NodeManager is started.
>
> I just don't know why not three machines each running a Flink 
> TaskManager and how to achieve this
>
> -----Original Message-----
> From: Yangze Guo [mailto:karma...@gmail.com]
> Sent: Tuesday, August 18, 2020 10:10
> To: 范超
> Cc: user (user@flink.apache.org)
> Subject: Re: How to specify the number of TaskManagers in Yarn Cluster
> using Per-Job Mode
>
> Hi,
>
> Do you start the NodeManager in all the three machines? If so, could you 
> check all the NMs correctly connect to the ResourceManager?
>
> Best,
> Yangze Guo
>
> On Tue, Aug 18, 2020 at 10:01 AM 范超  wrote:
> >
> > Hi, Dev and Users
> > I’ve 3 machines each one is 8 cores and 16GB memory.
> > Following it’s my Resource Manager screenshot the cluster have 36GB total.
> > I specify the paralism to 3 or even up to 12,  But the task manager is 
> > always running on two nodes not all three machine, the third node does not 
> > start the task manager.
> > I tried set the –p –tm –jm parameters, but it always the same, only 
> > different is more container on the two maching but not all three machine 
> > start the task manager.
> > My question is how to set the cli parameter to start all of my three 
> > machine (all task manager start on 3 machines)
> >
> > Thanks a lot
> > [cid:image001.png@01D67546.62291B70]
> >
> >
> > Chao fan
> >


Re: How to specify the number of TaskManagers in Yarn Cluster using Per-Job Mode

2020-08-17 Thread Yangze Guo
Hi,

Flink can control how many TM to start, but where to start the TMs
depends on Yarn.

Do you meet any problem when deploying on Yarn or running Flink job?
Why do you need to start the TMs on all the three machines?

Best,
Yangze Guo

On Tue, Aug 18, 2020 at 11:25 AM 范超  wrote:
>
> Thanks Yangze
> The reason why I don’t deploying a standalone cluster, it's because there 
> kafka, kudu, hadoop, zookeeper on these machines, maybe currently using the 
> yarn to manage resources is the best choice for me.
> If Flink can not control how many tm to start , could anyone providing me 
> some best practice for deploying on yarn please? I read the [1] and still 
> don't very clear
>
> [1] 
> https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines
>
> -----Original Message-----
> From: Yangze Guo [mailto:karma...@gmail.com]
> Sent: Tuesday, August 18, 2020 10:50
> To: 范超
> Cc: user (user@flink.apache.org)
> Subject: Re: How to specify the number of TaskManagers in Yarn Cluster using
> Per-Job Mode
>
> Hi,
>
> I think that is only related to the Yarn scheduling strategy. AFAIK, Flink 
> could not control it. You could check the RM log to figure out why it did not 
> schedule the containers to all the three machines. BTW, if you have specific 
> requirements to start with all the three machines, how about deploying a 
> standalone cluster instead?
>
> Best,
> Yangze Guo
>
> On Tue, Aug 18, 2020 at 10:24 AM 范超  wrote:
> >
> > Thanks Yangze
> >
> > All 3 machines NodeManager is started.
> >
> > I just don't know why not three machines each running a Flink
> > TaskManager and how to achieve this
> >
> > -----Original Message-----
> > From: Yangze Guo [mailto:karma...@gmail.com]
> > Sent: Tuesday, August 18, 2020 10:10
> > To: 范超
> > Cc: user (user@flink.apache.org)
> > Subject: Re: How to specify the number of TaskManagers in Yarn Cluster
> > using Per-Job Mode
> >
> > Hi,
> >
> > Do you start the NodeManager in all the three machines? If so, could you 
> > check all the NMs correctly connect to the ResourceManager?
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Aug 18, 2020 at 10:01 AM 范超  wrote:
> > >
> > > Hi, Dev and Users
> > > I’ve 3 machines each one is 8 cores and 16GB memory.
> > > Following it’s my Resource Manager screenshot the cluster have 36GB total.
> > > I specify the paralism to 3 or even up to 12,  But the task manager is 
> > > always running on two nodes not all three machine, the third node does 
> > > not start the task manager.
> > > I tried set the –p –tm –jm parameters, but it always the same, only 
> > > different is more container on the two maching but not all three machine 
> > > start the task manager.
> > > My question is how to set the cli parameter to start all of my three
> > > machine (all task manager start on 3 machines)
> > >
> > > Thanks a lot
> > > [cid:image001.png@01D67546.62291B70]
> > >
> > >
> > > Chao fan
> > >


Re: How to specify the number of TaskManagers in Yarn Cluster using Per-Job Mode

2020-08-17 Thread Yangze Guo
Hi,

I think that is only related to the Yarn scheduling strategy. AFAIK,
Flink could not control it. You could check the RM log to figure out
why it did not schedule the containers to all the three machines. BTW,
if you have specific requirements to start with all the three
machines, how about deploying a standalone cluster instead?

Best,
Yangze Guo

On Tue, Aug 18, 2020 at 10:24 AM 范超  wrote:
>
> Thanks Yangze
>
> All 3 machines NodeManager is started.
>
> I just don't know why not three machines each running a Flink TaskManager and 
> how to achieve this
>
> -----Original Message-----
> From: Yangze Guo [mailto:karma...@gmail.com]
> Sent: Tuesday, August 18, 2020 10:10
> To: 范超
> Cc: user (user@flink.apache.org)
> Subject: Re: How to specify the number of TaskManagers in Yarn Cluster using
> Per-Job Mode
>
> Hi,
>
> Do you start the NodeManager in all the three machines? If so, could you 
> check all the NMs correctly connect to the ResourceManager?
>
> Best,
> Yangze Guo
>
> On Tue, Aug 18, 2020 at 10:01 AM 范超  wrote:
> >
> > Hi, Dev and Users
> > I’ve 3 machines each one is 8 cores and 16GB memory.
> > Following it’s my Resource Manager screenshot the cluster have 36GB total.
> > I specify the paralism to 3 or even up to 12,  But the task manager is 
> > always running on two nodes not all three machine, the third node does not 
> > start the task manager.
> > I tried set the –p –tm –jm parameters, but it always the same, only 
> > different is more container on the two maching but not all three machine 
> > start the task manager.
> > My question is how to set the cli parameter to start all of my three
> > machine (all task manager start on 3 machines)
> >
> > Thanks a lot
> > [cid:image001.png@01D67546.62291B70]
> >
> >
> > Chao fan
> >


RE: How to specify the number of TaskManagers in Yarn Cluster using Per-Job Mode

2020-08-17 Thread 范超
Thanks Yangze

The NodeManager is started on all 3 machines.

I just don't know why the three machines are not each running a Flink TaskManager, and how to achieve this.

-----Original Message-----
From: Yangze Guo [mailto:karma...@gmail.com]
Sent: Tuesday, August 18, 2020 10:10
To: 范超
Cc: user (user@flink.apache.org)
Subject: Re: How to specify the number of TaskManagers in Yarn Cluster using Per-Job
Mode

Hi,

Do you start the NodeManager in all the three machines? If so, could you check 
all the NMs correctly connect to the ResourceManager?

Best,
Yangze Guo

On Tue, Aug 18, 2020 at 10:01 AM 范超  wrote:
>
> Hi, Dev and Users
> I’ve 3 machines each one is 8 cores and 16GB memory.
> Following it’s my Resource Manager screenshot the cluster have 36GB total.
> I specify the paralism to 3 or even up to 12,  But the task manager is always 
> running on two nodes not all three machine, the third node does not start the 
> task manager.
> I tried set the –p –tm –jm parameters, but it always the same, only different 
> is more container on the two maching but not all three machine start the task 
> manager.
> My question is how to set the cli parameter to start all of my three 
> machine (all task manager start on 3 machines)
>
> Thanks a lot
> [cid:image001.png@01D67546.62291B70]
>
>
> Chao fan
>


Re: How to specify the number of TaskManagers in Yarn Cluster using Per-Job Mode

2020-08-17 Thread Yangze Guo
Hi,

Do you start the NodeManager in all the three machines? If so, could
you check all the NMs correctly connect to the ResourceManager?

Best,
Yangze Guo

On Tue, Aug 18, 2020 at 10:01 AM 范超  wrote:
>
> Hi, Dev and Users
> I’ve 3 machines each one is 8 cores and 16GB memory.
> Following it’s my Resource Manager screenshot the cluster have 36GB total.
> I specify the paralism to 3 or even up to 12,  But the task manager is always 
> running on two nodes not all three machine, the third node does not start the 
> task manager.
> I tried set the –p –tm –jm parameters, but it always the same, only different 
> is more container on the two maching but not all three machine start the task 
> manager.
> My question is how to set the cli parameter to start all of my three machine 
> (all task manager start on 3 machines)
>
> Thanks a lot
> [cid:image001.png@01D67546.62291B70]
>
>
> Chao fan
>


Re: Performance Flink streaming kafka consumer sink to s3

2020-08-17 Thread Vijayendra Yadav
Hi, do you think there could be any issue with Flink's performance with payload record
sizes of 400 KB up to 1 MB? My Spark Streaming job seems to be doing better.
Are there any recommended configurations, or ways of increasing parallelism,
to improve Flink streaming using the Flink Kafka connector?

Regards,
Vijay
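For the "rebalance before a higher-parallelism operator" idea that comes up below, a minimal sketch (topic name, map function and sink are placeholders, not the actual job):

    import java.util.Properties;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class RebalanceSketch {
        public static void build(StreamExecutionEnvironment env, Properties kafkaProps,
                                 int numKafkaPartitions) {
            DataStream<String> events = env
                    .addSource(new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), kafkaProps))
                    .setParallelism(numKafkaPartitions);           // one source subtask per partition

            events
                    .rebalance()                                   // round-robin records to downstream subtasks
                    .map(new MapFunction<String, String>() {
                        @Override
                        public String map(String value) {
                            return value.toUpperCase();            // stand-in for the heavy per-record work
                        }
                    })
                    .setParallelism(numKafkaPartitions * 4)        // more subtasks than Kafka partitions
                    .print()                                       // stand-in for the S3 StreamingFileSink
                    .setParallelism(numKafkaPartitions * 4);
        }
    }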


On Fri, Aug 14, 2020 at 2:04 PM Vijayendra Yadav 
wrote:

> Hi Robert,
>
> Thanks for information. payloads so far are 400KB (each record).
> To achieve high parallelism at the downstream operator do I rebalance the
> kafka stream ? Could you give me an example please.
>
> Regards,
> Vijay
>
>
> On Fri, Aug 14, 2020 at 12:50 PM Robert Metzger 
> wrote:
>
>> Hi,
>>
>> Also, can we increase parallel processing, beyond the number of
>>> kafka partitions that we have, without causing any overhead ?
>>
>>
>> Yes, the Kafka sources produce a tiny bit of overhead, but the potential
>> benefit of having downstream operators at a high parallelism might be much
>> bigger.
>>
>> How large is a large payload in your case?
>>
>> Best practices:
>> Try to understand what's causing the performance slowdown: Kafka or S3 ?
>> You can do a test where you read from kafka, and write it into a
>> discarding sink.
>> Likewise, use a datagenerator source, and write into S3.
>>
>> Do the math on your job: What's the theoretical limits of your job:
>> https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines
>>
>> Hope this helps,
>> Robert
>>
>>
>> On Thu, Aug 13, 2020 at 11:25 PM Vijayendra Yadav 
>> wrote:
>>
>>> Hi Team,
>>>
>>> I am trying to increase throughput of my flink stream job streaming from
>>> kafka source and sink to s3. Currently it is running fine for small events
>>> records. But records with large payloads are running extremely slow like at
>>> rate 2 TPS.
>>>
>>> Could you provide some best practices to tune?
>>> Also, can we increase parallel processing, beyond the number of
>>> kafka partitions that we have, without causing any overhead ?
>>>
>>> Regards,
>>> Vijay
>>>
>>


How to specify the number of TaskManagers in Yarn Cluster using Per-Job Mode

2020-08-17 Thread 范超
Hi, Dev and Users
I have 3 machines, each with 8 cores and 16GB of memory.
Following is my Resource Manager screenshot; the cluster has 36GB in total.
I set the parallelism to 3 or even up to 12, but the TaskManagers always run on two
nodes, not all three machines; the third node does not start a TaskManager.
I tried setting the -p, -tm and -jm parameters, but the result is always the same; the only
difference is more containers on the two machines, but still not all three machines start
a TaskManager.
My question is how to set the CLI parameters so that all three of my machines are used
(TaskManagers started on all 3 machines).

Thanks a lot
[cid:image001.png@01D67546.62291B70]


Chao fan


Re: flink 1.11 mysql paginated query

2020-08-17 Thread Leonard Xu
Hi
You can follow this issue [1]; support for custom queries will be added in 1.12.

Best
Leonard
https://issues.apache.org/jira/browse/FLINK-17826 


> On Aug 18, 2020, at 09:50, china_tao  wrote:
> 
> That definitely won't do; my MySQL table contains a lot of data. Does Flink SQL have a way to do a
> paginated query directly? Please advise. Something like the dbtable option of a Spark DataFrame. Many thanks.
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: flink 1.11 mysql paginated query

2020-08-17 Thread china_tao
That definitely won't do; my MySQL table contains a lot of data. Does Flink SQL have a way to do a
paginated query directly? Please advise. Something like the dbtable option of a Spark DataFrame. Many thanks.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Reply: 1.11 kafka producer writes to only one partition

2020-08-17 Thread Matrix42
FlinkFixedPartitioner's partition method picks the partition based on the subTask index; if the partitioner is null, Kafka uses its own partitioner, which writes round-robin.




------ Original Message ------
From: "x2009438"

Re: flink 1.11 mysql paginated query

2020-08-17 Thread Leonard Xu
Hi

> On Aug 17, 2020, at 20:46, china_tao  wrote:
> 
> Hello, I have a question: how can Flink SQL read MySQL with a paginated query?
> In Spark, a DataFrame can pass a paginated query statement via dbtable.
> val resultDF = session.read.format("jdbc")
>  .option("url",jdbcUrl)
>  .option("dbtable" , selectSql )
>  .option("user",user)
>  .options(writeOpts)
>  .option("password",password).load()
> 
> In Flink, will the connector read the whole table?

Yes, it reads the whole table; the connector does a full table scan.

> String insertSql = CREATE TABLE MyUserTable (
>  id BIGINT,
>  name STRING,
>  age INT,
>  status BOOLEAN,
>  PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
>   'table-name' = 'users'
> );
> tableEnv.executeSql(insertSql);
> Will the executeSql above read the whole table?
> Or is the content only read when the following SQL is executed?
> String querysql = "select * from MyUserTable limit 1 to 10";

This query is a Flink query; it filters the data from the full MyUserTable table.

Best
Leonard
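Until FLINK-17826 lands, a hedged sketch of what can already be done with the 1.11 JDBC connector instead (the scan.partition.* option keys come from the 1.11 connector; the column choice, bounds and credentials are assumptions): the full-table scan can at least be split into parallel ranges, which is not pagination but avoids one single huge read:

    import org.apache.flink.table.api.TableEnvironment;

    public class JdbcPartitionedScanSketch {
        public static void register(TableEnvironment tableEnv) {
            tableEnv.executeSql(
                    "CREATE TABLE MyUserTable (" +
                    "  id BIGINT," +
                    "  name STRING," +
                    "  age INT," +
                    "  status BOOLEAN," +
                    "  PRIMARY KEY (id) NOT ENFORCED" +
                    ") WITH (" +
                    "  'connector' = 'jdbc'," +
                    "  'url' = 'jdbc:mysql://localhost:3306/mydatabase'," +
                    "  'table-name' = 'users'," +
                    "  'username' = 'user'," +                  // assumed credentials
                    "  'password' = 'secret'," +
                    "  'scan.partition.column' = 'id'," +       // numeric column to split the scan on
                    "  'scan.partition.num' = '10'," +          // number of parallel ranges (assumed)
                    "  'scan.partition.lower-bound' = '1'," +   // assumed id range
                    "  'scan.partition.upper-bound' = '1000000'" +
                    ")");
        }
    }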



Re: Reply: How to get the evaluation result of a time-based window aggregation in time after a new event falling into the window?

2020-08-17 Thread Theo Diefenthal
Hi Chengcheng Zhang, 

I think your request is related to this feature request from two years ago here 
[1], with me asking about the status one year ago [2]. 
You might want to upvote this so we can hope that it gets some more attention 
in future. 

Today, it is possible to write your own DataStream API where you customize the 
triggers to your wishes (CountTigger of 1), but with Flink SQL, you sadly lose 
most of that flexibility. 

Thanks @ forideal for mentioning that there is something in the configuration. 
I wasn't aware of that. 

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-Do-Not-Support-Custom-Trigger-td20932.html 
[2] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Are-there-any-news-on-custom-trigger-support-for-SQL-Table-API-td29600.html 
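For completeness, a sketch of how those emit options can be set from code (the keys are the ones quoted below; the delay value is just an example):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class EarlyFireSketch {
        public static TableEnvironment create() {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
            // Experimental, undocumented options mentioned in this thread: emit early results
            // from windows that are still open instead of waiting for the window to close.
            tEnv.getConfig().getConfiguration()
                    .setString("table.exec.emit.early-fire.enabled", "true");
            tEnv.getConfig().getConfiguration()
                    .setString("table.exec.emit.early-fire.delay", "60 s");
            return tEnv;
        }
    }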

Best regards 
Theo 


Von: "forideal"  
An: "Chengcheng Zhang" <274522...@qq.com> 
CC: "user"  
Gesendet: Sonntag, 16. August 2020 09:14:23 
Betreff: Re:回复:How to get the evaluation result of a time-based window 
aggregation in time after a new event falling into the window? 

Hi Chengcheng Zhang, 

You are welcome. 
I also got the material from the community to answer your question. 
There is also a hidden option here, which it seems the community has not yet 
documented publicly. 
table.exec.emit.early-fire.enabled = true 
table.exec.emit.early-fire.delay = 60 s 
[1] http://apache-flink.147419.n8.nabble.com/FLINKSQL1-10-UV-td4003.html 

Best, forideal 


















At 2020-08-16 13:21:25, "Chengcheng Zhang" <274522...@qq.com> wrote: 


Hi, forideal 
Thank you so much, it does help a lot. 
The approach you mentioned earlier, happened to be the same path we took two 
days ago, and it worked well as expected. 
To be honest, after some effort-taking searches on the Internet, I' am a little 
convinced that, this maybe the best solution at the moment. However, the 
time-based window aggregation is a great feature in Flink, as we all know. 
Would it be perfect if we could use time-based windows and still get the latest 
result at the same time? 
Best, Chengcheng Zhang 


-- Original Message -- 
From: "forideal" ; 
Sent: Sunday, August 16, 2020 12:24 PM 
To: "Chengcheng Zhang"<274522...@qq.com>; 
Cc: "user"; 
Subject: Re: How to get the evaluation result of a time-based window aggregation in 
time after a new event falling into the window? 

Hi Chengcheng Zhang, 
Is this your scene? For example, every day is divided into 12 hours, let’s take 
today as an example, 2020081600 2020081601,...2020081623 
For example, if we count pv, we can count like this 
INSERT INTO cumulative_pv 
SELECT time_str, count(1) 
FROM pv_per_hour 
GROUP BY time_str; 
In this sql, time_str is an hour in 2020081600, 2020081601,...2020081623. 

[1] http://apache-flink.147419.n8.nabble.com/flink-sql-5-5-td2011.html 
[2] http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/ 

Hope this helps. 

Best, forideal 







At 2020-08-16 12:05:04, "Chengcheng Zhang" <274522...@qq.com> wrote: 

Hi, 
I'm a new user of Flink, and have been puzzled a lot by the time-based window 
aggregation result. 
For our business, hourly and daily reports have to be created, ideally in a real-time 
style. So, I used an event-time based window aggregation to consume the 
Kafka data stream, but found that, only after the current hour or day passed, 
the newest result could be seen on console or upserted to MySQL. 
How can I get the latest window result immediately after a stream record 
falling into it? Is there a specific configuration option for this, hopefully? 
Please help and rescue me. 
Best regards. 












Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-08-17 Thread Kostas Kloudas
+1 for removing them.

>From a quick look, most of them (not all) have been deprecated a long time ago.

Cheers,
Kostas

On Mon, Aug 17, 2020 at 9:37 PM Dawid Wysakowicz  wrote:
>
> @David Yes, my idea was to remove any use of fold method and all related 
> classes including WindowedStream#fold
>
> @Klou Good idea to also remove the deprecated enableCheckpointing() & 
> StreamExecutionEnvironment#readFile and alike. I did another pass over some 
> of the classes and thought we could also drop:
>
> ExecutionConfig#set/getCodeAnalysisMode
> ExecutionConfig#disable/enableSysoutLogging
> ExecutionConfig#set/isFailTaskOnCheckpointError
> ExecutionConfig#isLatencyTrackingEnabled
>
> As for the `forceCheckpointing` I am not fully convinced to doing it. As far 
> as I know iterations still do not participate in checkpointing correctly. 
> Therefore it still might make sense to force it. In other words there is no 
> real alternative to that method. Unless we only remove the methods from 
> StreamExecutionEnvironment and redirect to the setter in CheckpointConfig. 
> WDYT?
>
> An updated list of methods I suggest to remove:
>
> ExecutionConfig#set/getCodeAnalysisMode (deprecated in 1.9)
> ExecutionConfig#disable/enableSysoutLogging (deprecated in 1.10)
> ExecutionConfig#set/isFailTaskOnCheckpointError (deprecated in 1.9)
> ExecutionConfig#isLatencyTrackingEnabled (deprecated in 1.7)
> ExecutionConfig#(get)/setNumberOfExecutionRetries() (deprecated in 1.1?)
> StreamExecutionEnvironment#readFile,readFileStream(...),socketTextStream(...),socketTextStream(...)
>  (deprecated in 1.2)
> RuntimeContext#getAllAccumulators (deprecated in 0.10)
> DataStream#fold and all related classes and methods such as FoldFunction, 
> FoldingState, FoldingStateDescriptor ... (deprecated in 1.3/1.4)
> StreamExecutionEnvironment#setStateBackend(AbstractStateBackend) (deprecated 
> in 1.5)
> DataStream#split (deprecated in 1.8)
> Methods in (Connected)DataStream that specify keys as either indices or field 
> names such as DataStream#keyBy, DataStream#partitionCustom, 
> ConnectedStream#keyBy,  (deprecated in 1.11)
>
> Bear in mind that majority of the options listed above in ExecutionConfig 
> take no effect. They were left there purely to satisfy the binary 
> compatibility. Personally I don't see any benefit of leaving a method and 
> silently dropping the underlying feature. The only configuration that is 
> respected is setting the number of execution retries.
>
> I also wanted to make it explicit that most of the changes above would result 
> in a binary incompatibility and require additional exclusions in the japicmp. 
> Those are:
>
> ExecutionConfig#set/getCodeAnalysisMode (deprecated in 1.9)
> ExecutionConfig#disable/enableSysoutLogging (deprecated in 1.10)
> ExecutionConfig#set/isFailTaskOnCheckpointError (deprecated in 1.9)
> ExecutionConfig#isLatencyTrackingEnabled (deprecated in 1.7)
> ExecutionConfig#(get)/setNumberOfExecutionRetries() (deprecated in 1.1?)
> DataStream#fold and all related classes and methods such as FoldFunction, 
> FoldingState, FoldingStateDescriptor ... (deprecated in 1.3/1.4)
> DataStream#split (deprecated in 1.8)
> Methods in (Connected)DataStream that specify keys as either indices or field 
> names such as DataStream#keyBy, DataStream#partitionCustom, 
> ConnectedStream#keyBy,  (deprecated in 1.11)
> StreamExecutionEnvironment#readFile,readFileStream(...),socketTextStream(...),socketTextStream(...)
>  (deprecated in 1.2)
>
> Looking forward to more opinions on the issue.
>
> Best,
>
> Dawid
>
>
> On 17/08/2020 12:49, Kostas Kloudas wrote:
>
> Thanks a lot for starting this Dawid,
>
> Big +1 for the proposed clean-up, and I would also add the deprecated
> methods of the StreamExecutionEnvironment like:
>
> enableCheckpointing(long interval, CheckpointingMode mode, boolean force)
> enableCheckpointing()
> isForceCheckpointing()
>
> readFile(FileInputFormat inputFormat,String
> filePath,FileProcessingMode watchType,long interval, FilePathFilter
> filter)
> readFileStream(...)
>
> socketTextStream(String hostname, int port, char delimiter, long maxRetry)
> socketTextStream(String hostname, int port, char delimiter)
>
> There are more, like the (get)/setNumberOfExecutionRetries() that were
> deprecated long ago, but I have not investigated to see if they are
> actually easy to remove.
>
> Cheers,
> Kostas
>
> On Mon, Aug 17, 2020 at 10:53 AM Dawid Wysakowicz
>  wrote:
>
> Hi devs and users,
>
> I wanted to ask you what do you think about removing some of the deprecated 
> APIs around the DataStream API.
>
> The APIs I have in mind are:
>
> RuntimeContext#getAllAccumulators (deprecated in 0.10)
> DataStream#fold and all related classes and methods such as FoldFunction, 
> FoldingState, FoldingStateDescriptor ... (deprecated in 1.3/1.4)
> StreamExecutionEnvironment#setStateBackend(AbstractStateBackend) (deprecated 
> in 1.5)
> DataStream#split (deprecated in 1.8)
> Methods in (Connected)DataStream 

Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-08-17 Thread Dawid Wysakowicz
@David Yes, my idea was to remove any use of fold method and all related
classes including WindowedStream#fold

@Klou Good idea to also remove the deprecated enableCheckpointing() &
StreamExecutionEnvironment#readFile and alike. I did another pass over
some of the classes and thought we could also drop:

  * ExecutionConfig#set/getCodeAnalysisMode
  * ExecutionConfig#disable/enableSysoutLogging
  * ExecutionConfig#set/isFailTaskOnCheckpointError
  * ExecutionConfig#isLatencyTrackingEnabled

As for the `forceCheckpointing` I am not fully convinced to doing it. As
far as I know iterations still do not participate in checkpointing
correctly. Therefore it still might make sense to force it. In other
words there is no real alternative to that method. Unless we only remove
the methods from StreamExecutionEnvironment and redirect to the setter
in CheckpointConfig. WDYT?

An updated list of methods I suggest to remove:

  * ExecutionConfig#set/getCodeAnalysisMode (deprecated in 1.9)
  * ExecutionConfig#disable/enableSysoutLogging (deprecated in 1.10)
  * ExecutionConfig#set/isFailTaskOnCheckpointError (deprecated in 1.9)
  * ExecutionConfig#isLatencyTrackingEnabled (deprecated in 1.7)
  * ExecutionConfig#(get)/setNumberOfExecutionRetries() (deprecated in 1.1?)
  * 
StreamExecutionEnvironment#readFile,readFileStream(...),socketTextStream(...),socketTextStream(...)
(deprecated in 1.2)
  * RuntimeContext#getAllAccumulators (deprecated in 0.10)
  * DataStream#fold and all related classes and methods such as
FoldFunction, FoldingState, FoldingStateDescriptor ... (deprecated
in 1.3/1.4)
  * StreamExecutionEnvironment#setStateBackend(AbstractStateBackend)
(deprecated in 1.5)
  * DataStream#split (deprecated in 1.8)
  * Methods in (Connected)DataStream that specify keys as either indices
or field names such as DataStream#keyBy, DataStream#partitionCustom,
ConnectedStream#keyBy,  (deprecated in 1.11)

Bear in mind that the majority of the options listed above in
ExecutionConfig have no effect. They were left there purely to preserve
binary compatibility. Personally I don't see any benefit in keeping
a method while silently dropping the underlying feature. The only
configuration that is still respected is the number of execution retries.

I also wanted to make it explicit that most of the changes above would
result in a binary incompatibility and require additional exclusions in
the japicmp. Those are:

  * ExecutionConfig#set/getCodeAnalysisMode (deprecated in 1.9)
  * ExecutionConfig#disable/enableSysoutLogging (deprecated in 1.10)
  * ExecutionConfig#set/isFailTaskOnCheckpointError (deprecated in 1.9)
  * ExecutionConfig#isLatencyTrackingEnabled (deprecated in 1.7)
  * ExecutionConfig#(get)/setNumberOfExecutionRetries() (deprecated in 1.1?)
  * DataStream#fold and all related classes and methods such as
FoldFunction, FoldingState, FoldingStateDescriptor ... (deprecated
in 1.3/1.4)
  * DataStream#split (deprecated in 1.8)
  * Methods in (Connected)DataStream that specify keys as either indices
or field names such as DataStream#keyBy, DataStream#partitionCustom,
ConnectedStream#keyBy,  (deprecated in 1.11)
  * 
StreamExecutionEnvironment#readFile,readFileStream(...),socketTextStream(...),socketTextStream(...)
(deprecated in 1.2)

Looking forward to more opinions on the issue.

Best,

Dawid


On 17/08/2020 12:49, Kostas Kloudas wrote:
> Thanks a lot for starting this Dawid,
>
> Big +1 for the proposed clean-up, and I would also add the deprecated
> methods of the StreamExecutionEnvironment like:
>
> enableCheckpointing(long interval, CheckpointingMode mode, boolean force)
> enableCheckpointing()
> isForceCheckpointing()
>
> readFile(FileInputFormat inputFormat,String
> filePath,FileProcessingMode watchType,long interval, FilePathFilter
> filter)
> readFileStream(...)
>
> socketTextStream(String hostname, int port, char delimiter, long maxRetry)
> socketTextStream(String hostname, int port, char delimiter)
>
> There are more, like the (get)/setNumberOfExecutionRetries() that were
> deprecated long ago, but I have not investigated to see if they are
> actually easy to remove.
>
> Cheers,
> Kostas
>
> On Mon, Aug 17, 2020 at 10:53 AM Dawid Wysakowicz
>  wrote:
>> Hi devs and users,
>>
>> I wanted to ask you what do you think about removing some of the deprecated 
>> APIs around the DataStream API.
>>
>> The APIs I have in mind are:
>>
>> RuntimeContext#getAllAccumulators (deprecated in 0.10)
>> DataStream#fold and all related classes and methods such as FoldFunction, 
>> FoldingState, FoldingStateDescriptor ... (deprecated in 1.3/1.4)
>> StreamExecutionEnvironment#setStateBackend(AbstractStateBackend) (deprecated 
>> in 1.5)
>> DataStream#split (deprecated in 1.8)
>> Methods in (Connected)DataStream that specify keys as either indices or 
>> field names such as DataStream#keyBy, DataStream#partitionCustom, 
>> ConnectedStream#keyBy,  (deprecated in 1.11)
>>
>> I think the 

Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-17 Thread Kostas Kloudas
Hi Kurt and David,

Thanks a lot for the insightful feedback!

@Kurt: For the topic of checkpointing with Batch Scheduling, I totally
agree with you that it requires a lot more work and careful thinking
on the semantics. This FLIP was written under the assumption that if
the user wants to have checkpoints on bounded input, he/she will have
to go with STREAMING as the scheduling mode. Checkpointing for BATCH
can be handled as a separate topic in the future.

In the case of MIXED workloads and for this FLIP, the scheduling mode
should be set to STREAMING. That is why the AUTOMATIC option sets
scheduling to BATCH only if all the sources are bounded. I am not sure
what are the plans there at the scheduling level, as one could imagine
in the future that in mixed workloads, we schedule first all the
bounded subgraphs in BATCH mode and we allow only one UNBOUNDED
subgraph per application, which is going to be scheduled after all
Bounded ones have finished. Essentially the bounded subgraphs will be
used to bootstrap the unbounded one. But, I am not aware of any plans
towards that direction.


@David: The processing time timer handling is a topic that has also
been discussed in the community in the past, and I do not remember any
final conclusion unfortunately.

In the current context and for bounded input, we chose to favor
reproducibility of the result, as this is expected in batch processing
where the whole input is available in advance. This is why this
proposal suggests to not allow processing time timers. But I
understand your argument that the user may want to be able to run the
same pipeline on batch and streaming this is why we added the two
options under future work, namely (from the FLIP):

```
Future Work: In the future we may consider adding as options the capability of:
* firing all the registered processing time timers at the end of a job
(at close()) or,
* ignoring all the registered processing time timers at the end of a job.
```

Conceptually, we are essentially saying that batch execution is assumed
to be instantaneous and to refer to a single "point" in time, and that
any processing-time timers registered for the future may fire at the end
of execution or be ignored (but not throw an exception). I could also
see ignoring the timers in batch as the default, if this makes more
sense.

By the way, do you have any usecases in mind that will help us better
shape our processing time timer handling?

Kostas

On Mon, Aug 17, 2020 at 2:52 PM David Anderson  wrote:
>
> Kostas,
>
> I'm pleased to see some concrete details in this FLIP.
>
> I wonder if the current proposal goes far enough in the direction of 
> recognizing the need some users may have for "batch" and "bounded streaming" 
> to be treated differently. If I've understood it correctly, the section on 
> scheduling allows me to choose STREAMING scheduling even if I have bounded 
> sources. I like that approach, because it recognizes that even though I have 
> bounded inputs, I don't necessarily want batch processing semantics. I think 
> it makes sense to extend this idea to processing time support as well.
>
> My thinking is that sometimes in development and testing it's reasonable to 
> run exactly the same job as in production, except with different sources and 
> sinks. While it might be a reasonable default, I'm not convinced that 
> switching a processing time streaming job to read from a bounded source 
> should always cause it to fail.
>
> David
>
> On Wed, Aug 12, 2020 at 5:22 PM Kostas Kloudas  wrote:
>>
>> Hi all,
>>
>> As described in FLIP-131 [1], we are aiming at deprecating the DataSet
>> API in favour of the DataStream API and the Table API. After this work
>> is done, the user will be able to write a program using the DataStream
>> API and this will execute efficiently on both bounded and unbounded
>> data. But before we reach this point, it is worth discussing and
>> agreeing on the semantics of some operations as we transition from the
>> streaming world to the batch one.
>>
>> This thread and the associated FLIP [2] aim at discussing these issues
>> as these topics are pretty important to users and can lead to
>> unpleasant surprises if we do not pay attention.
>>
>> Let's have a healthy discussion here and I will be updating the FLIP
>> accordingly.
>>
>> Cheers,
>> Kostas
>>
>> [1] 
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
>> [2] 
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158871522


Re: coordination of sinks

2020-08-17 Thread Fabian Hueske
Hi Marco,

You cannot really synchronize data that is being emitted via different
streams (without bringing them together in an operator).

I see two options:

1) Emit the event that creates the partition and the data to be written into
the partition to the same stream. Flink guarantees that records do not
overtake other records within the same stream partition. However, you need
to ensure that all records remain in the same partition, for example by
partitioning on the same key.
2) Emit the records to two different streams and have a CoProcessFunction
that processes both the create-partition and the data events. The process
function would buffer the data events (in state) until it observes the
create-partition event, for which it creates the partition (in a
synchronous fashion). Once the partition is created, it forwards all
buffered data and any data that arrives afterwards.
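A rough sketch of option 2, assuming both streams are keyed by the same
partition key (class and method names are illustrative, not a drop-in
implementation):

public class PartitionGate extends CoProcessFunction<CreatePartitionEvent, DataEvent, DataEvent> {

    private transient ValueState<Boolean> partitionReady;
    private transient ListState<DataEvent> buffer;

    @Override
    public void open(Configuration parameters) {
        partitionReady = getRuntimeContext().getState(
                new ValueStateDescriptor<>("partitionReady", Boolean.class));
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", DataEvent.class));
    }

    @Override
    public void processElement1(CreatePartitionEvent create, Context ctx,
                                Collector<DataEvent> out) throws Exception {
        createPostgresPartition(create);           // synchronous DDL call, hypothetical helper
        partitionReady.update(true);
        for (DataEvent buffered : buffer.get()) {  // flush everything buffered so far
            out.collect(buffered);
        }
        buffer.clear();
    }

    @Override
    public void processElement2(DataEvent data, Context ctx,
                                Collector<DataEvent> out) throws Exception {
        if (Boolean.TRUE.equals(partitionReady.value())) {
            out.collect(data);                     // partition exists, forward directly
        } else {
            buffer.add(data);                      // partition not created yet, keep in state
        }
    }

    private void createPostgresPartition(CreatePartitionEvent e) { /* JDBC DDL here */ }
}

You would attach it with something like
createEvents.connect(dataEvents).keyBy(createKeySelector, dataKeySelector).process(new PartitionGate()).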

Hope this helps,
Fabian

Am Sa., 15. Aug. 2020 um 07:45 Uhr schrieb Marco Villalobos <
mvillalo...@kineteque.com>:

> Given a source that goes into a tumbling window with a process function
> that yields two side outputs, in addition to the main data stream, is it
> possible to coordinate the order of completion
> of sink 1, sink 2, and sink 3 as data leaves the tumbling window?
>
> source -> tumbling window -> process function -> side output tag 1 -> sink 1
>                                               \-> side output tag 2 -> sink 2
>                                               \-> main stream -> sink 3
>
>
> sink 1 will create partitions in PostgreSQL for me.
> sink 2 will insert data into the partitioned table
> sink 3 can happen in any order
> but all of them need to finish before the next window fires.
>
> Any advice will help.
>


Reply: 1.11 kafka producer only writes to one partition

2020-08-17 Thread cs
The Kafka producer currently uses
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner as its partitioner.
The actual partitioning logic is in org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner#partition.
From that method you can tell which Kafka partition each subtask sends to: the data of each subtask is only ever written to one fixed partition.
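For reference, the essence of that method is roughly the following
(paraphrased, not the exact source); parallelInstanceId is the index of the
sink subtask, set in open():

public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    return partitions[parallelInstanceId % partitions.length];
}

So with a job parallelism of 5 and at least 5 topic partitions, subtask i
should end up on partition i % partitions.length; if everything lands in
partition 0, it is worth checking whether only one sink subtask actually
receives records.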





Re: Joining a frequently changing HBase dimension table with Flink SQL in a real-time data warehouse

2020-08-17 Thread shizk233
Would it be possible to feed the dimension table data in as a data stream from Kafka as well?
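Something along these lines, as a rough sketch of that idea with the 1.11
broadcast-state API (DimRow, Fact and Enriched are made-up types): the
dimension topic is broadcast to every subtask and kept in broadcast state,
so the lookup becomes a local map access.

MapStateDescriptor<String, DimRow> dimStateDesc =
        new MapStateDescriptor<>("dim", String.class, DimRow.class);

BroadcastStream<DimRow> dimBroadcast = dimKafkaStream.broadcast(dimStateDesc);

DataStream<Enriched> enriched = factKafkaStream
        .connect(dimBroadcast)
        .process(new BroadcastProcessFunction<Fact, DimRow, Enriched>() {

            @Override
            public void processElement(Fact fact, ReadOnlyContext ctx,
                                       Collector<Enriched> out) throws Exception {
                // dim may still be null if the dimension record arrives later
                DimRow dim = ctx.getBroadcastState(dimStateDesc).get(fact.getDimKey());
                out.collect(new Enriched(fact, dim));
            }

            @Override
            public void processBroadcastElement(DimRow row, Context ctx,
                                                Collector<Enriched> out) throws Exception {
                ctx.getBroadcastState(dimStateDesc).put(row.getKey(), row);
            }
        });

This only works if the dimension data fits into memory on every subtask; otherwise
keyed connected streams with keyed state are the alternative.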

Jim Chen  wrote on Mon, Aug 17, 2020 at 4:36 PM:

> Hi everyone:
> We are building a real-time data warehouse with Flink SQL. Roughly, a Kafka stream is joined against an HBase dimension table and the result is written to ClickHouse. The HBase dimension table changes frequently.
> The thorny problems we are currently facing:
> 1. When implementing async I/O with our own AsyncTableFunction, performance was still not good enough. We then added a local cache, but ran into cache-consistency problems and do not know how to solve them.
> 2. Writes to HBase are batched, so ordering cannot be guaranteed; since the dimension table changes frequently, wrong ordering produces wrong results.
> 3. The HBase dimension table may only be updated about 5s later, but by then the Kafka records have already passed through, so the joined fields are all null.
>
> Does anyone have a good approach or solution for the scenarios above?
>


Re: How to get the processingTime in a KeyedProcessFunction

2020-08-17 Thread shizk233
ctx.timestamp() actually returns the timestamp of the StreamRecord, i.e. the timestamp extracted for the event.
That method generally requires event time, and timestamps/watermarks must have been assigned on the data stream.
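If what is actually needed is the processing time, a minimal sketch (Event
is a made-up type) reads it from the timer service instead:

public class ProcTimeTagger extends KeyedProcessFunction<String, Event, Tuple2<Event, Long>> {
    @Override
    public void processElement(Event value, Context ctx, Collector<Tuple2<Event, Long>> out) {
        // current processing time of this subtask, no watermarks required
        long processingTime = ctx.timerService().currentProcessingTime();
        // ctx.timestamp() would be the event-time timestamp of the record and is
        // null unless timestamps/watermarks were assigned upstream
        out.collect(Tuple2.of(value, processingTime));
    }
}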

ゞ野蠻遊戲χ  wrote on Sun, Aug 16, 2020 at 7:57 PM:

> Hi everyone
>
> When I try to get the processingTime in the processElement method of a KeyedProcessFunction, like this: ctx.timestamp(), it returns null. How should I get the processingTime inside processElement?
>
>
> Thanks!
> Jiazhi


Re: Print SQL connector does not work properly

2020-08-17 Thread china_tao
String createHbaseSql = CREATE TABLE dimension (
rowKey STRING,
cf ROW,
tas BIGINT
) WITH (
'connector.type' = 'hbase',
'connector.version' = '1.4.3',
'connector.table-name' = 'test',
'connector.write.buffer-flush.max-rows' = '10',
'connector.zookeeper.quorum' = 'IP:port',
'connector.zookeeper.znode.parent' = '/hbase'
);
tableEnv.executeSql(createHbaseSql);
Table queryTable = tableEnv.sqlQuery("select * from dimension");
tableEnv.toAppendStream(queryTable, Row.class).print();

Try this approach first and see whether it prints, to prove that your HBase side is fine. Then switch to the print table.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Can flink cdc read sharded databases and tables?

2020-08-17 Thread china_tao
Create a view at the database level (the view unions your sharded databases and tables) and let Flink work on that view, similar to how you would do it with sqoop or other extraction tools.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

1.11 kafka producer only writes to one partition

2020-08-17 Thread x2009438
I see that the 1.11 Kafka connector uses FlinkFixedPartitioner by default, which should write all the data of each sink subtask to one particular partition. But I ran into something strange: with an overall job parallelism of 5, the results are only written to partition 0. After replacing the partitioner with null, the data is written to every partition in round-robin fashion. This is very puzzling. Any help appreciated.




Sent from my iPhone

flink1.11 mysql paginated queries

2020-08-17 Thread china_tao
Hello, a question please: how can Flink SQL read MySQL with paginated queries?
In Spark, a DataFrame can pass a pagination query via dbtable:
val resultDF = session.read.format("jdbc")
  .option("url",jdbcUrl)
  .option("dbtable" , selectSql )
  .option("user",user)
  .options(writeOpts)
  .option("password",password).load()

In Flink, does the connector read the whole table?
String insertSql = CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users'
);
tableEnv.executeSql(insertSql);
Does the executeSql above read the whole table?
Or is the content only read once the following SQL is executed?
String querysql = "select * from MyUserTable limit 1 to 10";
 tableEnv.sqlQuery(querySql);
I assume the data is only really read when the sqlQuery above runs.

The question is rather basic, I am just a bit confused and not sure whether this differs from Spark.
Thanks
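P.S. A sketch of the kind of partitioned scan I am asking about, with option
names taken from the 1.11 JDBC connector documentation as I remember them
(they may not be exact). As far as I understand, the CREATE TABLE itself only
registers the table, and data is read when a query that uses it executes.

tableEnv.executeSql(
    "CREATE TABLE MyUserTable (" +
    "  id BIGINT," +
    "  name STRING," +
    "  age INT," +
    "  status BOOLEAN," +
    "  PRIMARY KEY (id) NOT ENFORCED" +
    ") WITH (" +
    "  'connector' = 'jdbc'," +
    "  'url' = 'jdbc:mysql://localhost:3306/mydatabase'," +
    "  'table-name' = 'users'," +
    "  'scan.partition.column' = 'id'," +
    "  'scan.partition.num' = '10'," +
    "  'scan.partition.lower-bound' = '1'," +
    "  'scan.partition.upper-bound' = '1000000'" +
    ")");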
 



--
Sent from: http://apache-flink.147419.n8.nabble.com/

RE: JobManager refusing connections when running many jobs in parallel?

2020-08-17 Thread Hailu, Andreas
Interesting – what is the JobManager submission bounded by? Does it only allow 
a certain number of submissions per second, or is there a number of threads it 
accepts?

// ah

From: Robert Metzger 
Sent: Tuesday, August 11, 2020 4:46 AM
To: Hailu, Andreas [Engineering] 
Cc: user@flink.apache.org; Shah, Siddharth [Engineering] 

Subject: Re: JobManager refusing connections when running many jobs in parallel?

Thanks for checking.

Your analysis sounds correct. The JM is busy processing job submissions, 
resulting in other submissions not being accepted.

Increasing rest.connection-timeout should resolve your problem.
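For example, since the stack trace shows the submission going through
RestClusterClient, a sketch of raising the timeout on the client-side
Configuration (the key is the one named above, the value is an example in
milliseconds; the same key can also simply be set in flink-conf.yaml on the
submitting host):

Configuration clientConf = GlobalConfiguration.loadConfiguration();
clientConf.setString("rest.connection-timeout", "60000");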


On Fri, Aug 7, 2020 at 1:59 AM Hailu, Andreas <andreas.ha...@gs.com> wrote:
Thanks for pointing this out. We had a look - the nodes in our cluster have a 
cap of 65K open files and we aren’t breaching 50% per metrics, so I don’t 
believe this is the problem.

The connection refused error makes us think it’s some process using a thread 
pool for the JobManager hitting capacity on a port somewhere. This sound 
correct? Is there a config for us to increase the pool size?

From: Robert Metzger <rmetz...@apache.org>
Sent: Wednesday, July 29, 2020 1:52:53 AM
To: Hailu, Andreas [Engineering]
Cc: user@flink.apache.org; Shah, Siddharth 
[Engineering]
Subject: Re: JobManager refusing connections when running many jobs in parallel?

Hi Andreas,

Thanks for reaching out .. this should not happen ...
Maybe your operating system has configured low limits for the number of 
concurrent connections / sockets. Maybe this thread is helpful: 
https://stackoverflow.com/questions/923990/why-do-i-get-connection-refused-after-1024-connections
 (there might better SO threads, I didn't put much effort into searching :) )

On Mon, Jul 27, 2020 at 6:31 PM Hailu, Andreas <andreas.ha...@gs.com> wrote:
Hi team,

We’ve observed that when we submit a decent number of jobs in parallel from a 
single Job Master, we encounter job failures due with Connection Refused 
exceptions. We’ve seen this behavior start at 30 jobs running in parallel. It’s 
seemingly transient, however, as upon several retries the job succeeds. The 
surface level error varies, but digging deeper in stack traces it looks to stem 
from the Job Manager no longer accepting connections.

I’ve included a couple of examples below from failed jobs’ driver logs, with 
different errors stemming from a connection refused error:

First example: 15 Task Managers/2 cores/4096 Job Manager memory/12288 Task 
Manager memory - 30 jobs submitted in parallel, each with parallelism of 1
Job Manager is running @ d43723-563.dc.gs.com:
Using job manager web tracking url Job Manager Web Interface (http://d43723-563.dc.gs.com:41268)
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve 
the execution result. (JobID: 1dfef6303cf0e888231d4c57b4b4e0e6)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
...
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
Could not complete the operation. Number of retries has been exhausted.
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:273)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:341)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
at 



Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-17 Thread David Anderson
Kostas,

I'm pleased to see some concrete details in this FLIP.

I wonder if the current proposal goes far enough in the direction of
recognizing the need some users may have for "batch" and "bounded
streaming" to be treated differently. If I've understood it correctly, the
section on scheduling allows me to choose STREAMING scheduling even if I
have bounded sources. I like that approach, because it recognizes that even
though I have bounded inputs, I don't necessarily want batch processing
semantics. I think it makes sense to extend this idea to processing time
support as well.

My thinking is that sometimes in development and testing it's reasonable to
run exactly the same job as in production, except with different sources
and sinks. While it might be a reasonable default, I'm not convinced that
switching a processing time streaming job to read from a bounded source
should always cause it to fail.

David

On Wed, Aug 12, 2020 at 5:22 PM Kostas Kloudas  wrote:

> Hi all,
>
> As described in FLIP-131 [1], we are aiming at deprecating the DataSet
> API in favour of the DataStream API and the Table API. After this work
> is done, the user will be able to write a program using the DataStream
> API and this will execute efficiently on both bounded and unbounded
> data. But before we reach this point, it is worth discussing and
> agreeing on the semantics of some operations as we transition from the
> streaming world to the batch one.
>
> This thread and the associated FLIP [2] aim at discussing these issues
> as these topics are pretty important to users and can lead to
> unpleasant surprises if we do not pay attention.
>
> Let's have a healthy discussion here and I will be updating the FLIP
> accordingly.
>
> Cheers,
> Kostas
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
> [2]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158871522
>




Reply: flink-1.10.1 want to use DDL to write into ES 5.6

2020-08-17 Thread kcz
Thanks, received.







Re: PyFlink intermediate tables

2020-08-17 Thread Xingbo Huang
Hi,

Yes, that is supported.

Best,
Xingbo

guaishushu1...@163.com  wrote on Mon, Aug 17, 2020 at 7:55 PM:

> Does anyone know whether Flink 1.10 PyFlink supports intermediate tables, i.e. whether the following usage works?
>   source = st_env.scan("source_kafka_ifang_dkt_log")
>   dim_table = source.select("`cluster`, `caller`, `cid`,`content`, `ip`
> `path`, `type`")
>   st_env.register_table('dim_table', dim_table)
>
>
>
>
> guaishushu1...@163.com
>


PyFlink intermediate tables

2020-08-17 Thread guaishushu1...@163.com
Does anyone know whether Flink 1.10 PyFlink supports intermediate tables, i.e. whether the following usage works?
  source = st_env.scan("source_kafka_ifang_dkt_log")
  dim_table = source.select("`cluster`, `caller`, `cid`,`content`, `ip` `path`, 
`type`")
  st_env.register_table('dim_table', dim_table)




guaishushu1...@163.com


flink sql: a bad record causes the job to fail

2020-08-17 Thread 赵一旦
With a Kafka source, a single bad record makes the whole job fail. How can this be handled?

Previously, with the DataStream API, we parsed the records ourselves, so we could catch the exception and only count the number of invalid records as a Flink metric.
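The DataStream-style parsing mentioned above looked roughly like this (a
sketch, MyEvent is a made-up type, using Jackson's ObjectMapper):

public class SafeJsonParser extends RichFlatMapFunction<String, MyEvent> {

    private transient Counter dirtyRecords;
    private transient ObjectMapper mapper;

    @Override
    public void open(Configuration parameters) {
        dirtyRecords = getRuntimeContext().getMetricGroup().counter("dirtyRecords");
        mapper = new ObjectMapper();
    }

    @Override
    public void flatMap(String raw, Collector<MyEvent> out) {
        try {
            out.collect(mapper.readValue(raw, MyEvent.class));
        } catch (Exception e) {
            dirtyRecords.inc();  // bad record: count it instead of failing the job
        }
    }
}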

Now we create a dynamic table directly on top of Kafka with Flink SQL, and when JSON parsing fails while querying the dynamic table, the job fails.


Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-08-17 Thread David Anderson
I assume that along with DataStream#fold you would also
remove WindowedStream#fold.

I'm in favor of going ahead with all of these.

David

On Mon, Aug 17, 2020 at 10:53 AM Dawid Wysakowicz 
wrote:

> Hi devs and users,
>
> I wanted to ask you what do you think about removing some of the
> deprecated APIs around the DataStream API.
>
> The APIs I have in mind are:
>
>- RuntimeContext#getAllAccumulators (deprecated in 0.10)
>- DataStream#fold and all related classes and methods such as
>FoldFunction, FoldingState, FoldingStateDescriptor ... (deprecated in
>1.3/1.4)
>- StreamExecutionEnvironment#setStateBackend(AbstractStateBackend)
>(deprecated in 1.5)
>- DataStream#split (deprecated in 1.8)
>- Methods in (Connected)DataStream that specify keys as either indices
>or field names such as DataStream#keyBy, DataStream#partitionCustom,
>ConnectedStream#keyBy,  (deprecated in 1.11)
>
> I think the first three should be straightforward. They are long
> deprecated. The getAccumulators method is not used very often in my
> opinion. The same applies to the DataStream#fold which additionally is not
> very performant. Lastly the setStateBackend has an alternative with a class
> from the AbstractStateBackend hierarchy, therefore it will be still code
> compatible. Moreover if we remove the
> #setStateBackend(AbstractStateBackend) we will get rid off warnings users
> have right now when setting a statebackend as the correct method cannot be
> used without an explicit casting.
>
> As for the DataStream#split I know there were some objections against
> removing the #split method in the past. I still believe the output tags can
> replace the split method already.
>
> The only problem in the last set of methods I propose to remove is that
> they were deprecated only in the last release and those method were only
> partially deprecated. Moreover some of the methods were not deprecated in
> ConnectedStreams. Nevertheless I'd still be inclined to remove the methods
> in this release.
>
> Let me know what do you think about it.
>
> Best,
>
> Dawid
>


Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-08-17 Thread Kostas Kloudas
Thanks a lot for starting this Dawid,

Big +1 for the proposed clean-up, and I would also add the deprecated
methods of the StreamExecutionEnvironment like:

enableCheckpointing(long interval, CheckpointingMode mode, boolean force)
enableCheckpointing()
isForceCheckpointing()

readFile(FileInputFormat inputFormat,String
filePath,FileProcessingMode watchType,long interval, FilePathFilter
filter)
readFileStream(...)

socketTextStream(String hostname, int port, char delimiter, long maxRetry)
socketTextStream(String hostname, int port, char delimiter)

There are more, like the (get)/setNumberOfExecutionRetries() that were
deprecated long ago, but I have not investigated to see if they are
actually easy to remove.

Cheers,
Kostas

On Mon, Aug 17, 2020 at 10:53 AM Dawid Wysakowicz
 wrote:
>
> Hi devs and users,
>
> I wanted to ask you what do you think about removing some of the deprecated 
> APIs around the DataStream API.
>
> The APIs I have in mind are:
>
> RuntimeContext#getAllAccumulators (deprecated in 0.10)
> DataStream#fold and all related classes and methods such as FoldFunction, 
> FoldingState, FoldingStateDescriptor ... (deprecated in 1.3/1.4)
> StreamExecutionEnvironment#setStateBackend(AbstractStateBackend) (deprecated 
> in 1.5)
> DataStream#split (deprecated in 1.8)
> Methods in (Connected)DataStream that specify keys as either indices or field 
> names such as DataStream#keyBy, DataStream#partitionCustom, 
> ConnectedStream#keyBy,  (deprecated in 1.11)
>
> I think the first three should be straightforward. They are long deprecated. 
> The getAccumulators method is not used very often in my opinion. The same 
> applies to the DataStream#fold which additionally is not very performant. 
> Lastly the setStateBackend has an alternative with a class from the 
> AbstractStateBackend hierarchy, therefore it will be still code compatible. 
> Moreover if we remove the #setStateBackend(AbstractStateBackend) we will get 
> rid off warnings users have right now when setting a statebackend as the 
> correct method cannot be used without an explicit casting.
>
> As for the DataStream#split I know there were some objections against 
> removing the #split method in the past. I still believe the output tags can 
> replace the split method already.
>
> The only problem in the last set of methods I propose to remove is that they 
> were deprecated only in the last release and those method were only partially 
> deprecated. Moreover some of the methods were not deprecated in 
> ConnectedStreams. Nevertheless I'd still be inclined to remove the methods in 
> this release.
>
> Let me know what do you think about it.
>
> Best,
>
> Dawid


flink 1.10.1 OutOfMemoryError: Metaspace

2020-08-17 Thread ????
hi all
flink 1.10.1
2020-08-15 19:32:59
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:498)
at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:384)
at sun.reflect.GeneratedMethodAccessor250.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:282)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:150)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory 
error has occurred. This can mean two things: either the job requires a larger 
size of JVM metaspace to load classes or there is a class loading leak. In the 
first case 'taskmanager.memory.jvm-metaspace.size' configuration option should 
be increased. If the error persists (usually in cluster after several job 
(re-)submissions) then there is probably a class loading leak which has to be 
investigated and fixed. The task executor has to be shutdown...

MemoryAnalyzer dump: 700m, 70m

Re: FlinkSQL tableEnv dependency problem

2020-08-17 Thread Rui Li
Right, the path given with -C must be accessible on every machine.

On Mon, Aug 17, 2020 at 5:56 PM 赵一旦  wrote:

> 不一样吧。-C 指定的是必须所有集群机器能访问的。sql-client指定的那个按照之前其他同学的说法是会被上传的。
>
> Rui Li  于2020年8月17日周一 下午5:40写道:
>
> > 对,这个文件每一行就是一个类名。另外你说的flink run指定额外的jar包应该可以通过-C参数来指定
> >
> > On Mon, Aug 17, 2020 at 5:38 PM 赵一旦  wrote:
> >
> > > 哦哦。也就是说本身这个文件内内容是支持一行一个这样的格式的是吧?
> > >
> > > Rui Li  于2020年8月17日周一 下午5:36写道:
> > >
> > > > 用shade plugin的时候可以指定service resource
> > > > transformer,应该能把多个service文件merge起来。具体可以参考:
> > > >
> > > >
> > >
> >
> https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer
> > > >
> > > > On Mon, Aug 17, 2020 at 5:00 PM 赵一旦  wrote:
> > > >
> > > > >
> > @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
> > > > >
> > > > > 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。
> > > > >
> > > > > Rui Li  于2020年8月17日周一 下午3:46写道:
> > > > >
> > > > > > 可能是打fat
> > > > > >
> > jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
> > > > > >
> > > > > > On Fri, Aug 14, 2020 at 7:13 PM 赵一旦  wrote:
> > > > > >
> > > > > > > 代码如下:
> > > > > > > // tEnv;
> > > > > > > tEnv.sqlUpdate("create table dr1(  " +
> > > > > > > "  cid STRING,  " +
> > > > > > > "  server_time BIGINT,  " +
> > > > > > > "  d MAP,  " +
> > > > > > > "  process_time AS PROCTIME(),  " +
> > > > > > > "  event_time AS
> TO_TIMESTAMP(FROM_UNIXTIME(server_time /
> > > > > 1000)),
> > > > > > > " +
> > > > > > > "  WATERMARK FOR event_time AS event_time - INTERVAL
> '60'
> > > > > SECOND
> > > > > > > " +
> > > > > > > ") WITH (  " +
> > > > > > > "  'update-mode' = 'append',  " +
> > > > > > > "  'connector.type' = 'kafka',  " +
> > > > > > > "  'connector.version' = 'universal',  " +
> > > > > > > "  'connector.topic' = 'antibot_dr1',  " +
> > > > > > > "  'connector.startup-mode' = 'latest-offset',  " +
> > > > > > > "  'connector.properties.zookeeper.connect' =
> > > > > > > 'yq01-sw-xxx03.yq01:8681',  " +
> > > > > > > "  'connector.properties.bootstrap.servers' =
> > > > > > > 'yq01-sw-xxx03.yq01:8192',  " +
> > > > > > > "  'format.type' = 'json'  " +
> > > > > > > ")");
> > > > > > > Table t1 = tEnv.sqlQuery("select * from dr1");
> > > > > > >
> > > > > > > 我打包会把flink-json打包进去,最终结果包是test.jar。
> > > > > > >
> > > > > > > test.jar是个fat jar,相关依赖都有了。
> > > > > > >
> > > > > > > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
> > > > > > >
> > > > > > > Caused by:
> > > > org.apache.flink.table.api.NoMatchingTableFactoryException:
> > > > > > > Could not find a suitable table factory for
> > > > > > > 'org.apache.flink.table.factories.DeserializationSchemaFactory'
> > in
> > > > > > > the classpath.
> > > > > > >
> > > > > > > 可是我flink-json.jar都打包进去了,居然还是报错。。。
> > > > > > >
> > > > > > > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar
> 这个命令的机器
> > > > > > >
> > > > > > > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
> > > > > > >
> > > > > > > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Best regards!
> > > > > > Rui Li
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Best regards!
> > > > Rui Li
> > > >
> > >
> >
> >
> > --
> > Best regards!
> > Rui Li
> >
>


-- 
Best regards!
Rui Li


Re: flink-1.10.1 want to use DDL to write into ES 5.6

2020-08-17 Thread Yangze Guo
The community is already considering dropping support for ES 5.

Best,
Yangze Guo

On Mon, Aug 17, 2020 at 5:38 PM kcz <573693...@qq.com> wrote:
>
> 有一个小问题比较好奇,当时社区为什么没有提供ES5的sql功能,是处于什么考虑吗?因为已经提供了ES5 sinK的connect。
> --原始邮件--
> 发件人:  
>   "kcz"   
>  
> <573693...@qq.com;
> 发送时间:2020年8月17日(星期一) 上午8:34
> 收件人:"user-zh"
> 主题:回复:flink-1.10.1 想用 DDL 入 ES5.6
>
>
>
> 谢谢大佬 我先研究研究
>
>
>
>
>
> -- 原始邮件 --
> 发件人: Leonard Xu  发送时间: 2020年8月14日 10:25
> 收件人: user-zh  主题: 回复:flink-1.10.1 想用 DDL 入 ES5.6
>
>
>
> Hi,
> 我贴的链接里有对应的PR[1], 你可以看看这个PR里的代码,代码入口可以从 Elasticsearch6DynamicSink.java 
>   开始
> 比如你自己实现了Elasticsearch5DynamicSink 
>   一套后,再打一个 es5 的sql jar 就好了。
>
> 祝好
> Leonard
> [1] https://github.com/apache/flink/pull/12184 
> 
>
>  在 2020年8月14日,10:14,kcz <573693...@qq.com 写道:
> 
>  查看您说的[1]的url之后,发现里面并没有跟 es sql jar有关的。
> 
> 
> 
>  --
>  Sent from: http://apache-flink.147419.n8.nabble.com/


Re: FlinkSQL tableEnv dependency problem

2020-08-17 Thread 赵一旦
That is not the same, is it? A jar specified with -C must be accessible on all cluster machines, whereas the one specified to sql-client, according to what others said earlier, gets uploaded.

Rui Li  于2020年8月17日周一 下午5:40写道:

> 对,这个文件每一行就是一个类名。另外你说的flink run指定额外的jar包应该可以通过-C参数来指定
>
> On Mon, Aug 17, 2020 at 5:38 PM 赵一旦  wrote:
>
> > 哦哦。也就是说本身这个文件内内容是支持一行一个这样的格式的是吧?
> >
> > Rui Li  于2020年8月17日周一 下午5:36写道:
> >
> > > 用shade plugin的时候可以指定service resource
> > > transformer,应该能把多个service文件merge起来。具体可以参考:
> > >
> > >
> >
> https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer
> > >
> > > On Mon, Aug 17, 2020 at 5:00 PM 赵一旦  wrote:
> > >
> > > >
> @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
> > > >
> > > > 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?
> > > >
> > > >
> > > >
> > >
> >
> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。
> > > >
> > > > Rui Li  于2020年8月17日周一 下午3:46写道:
> > > >
> > > > > 可能是打fat
> > > > >
> jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
> > > > >
> > > > > On Fri, Aug 14, 2020 at 7:13 PM 赵一旦  wrote:
> > > > >
> > > > > > 代码如下:
> > > > > > // tEnv;
> > > > > > tEnv.sqlUpdate("create table dr1(  " +
> > > > > > "  cid STRING,  " +
> > > > > > "  server_time BIGINT,  " +
> > > > > > "  d MAP,  " +
> > > > > > "  process_time AS PROCTIME(),  " +
> > > > > > "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time /
> > > > 1000)),
> > > > > > " +
> > > > > > "  WATERMARK FOR event_time AS event_time - INTERVAL '60'
> > > > SECOND
> > > > > > " +
> > > > > > ") WITH (  " +
> > > > > > "  'update-mode' = 'append',  " +
> > > > > > "  'connector.type' = 'kafka',  " +
> > > > > > "  'connector.version' = 'universal',  " +
> > > > > > "  'connector.topic' = 'antibot_dr1',  " +
> > > > > > "  'connector.startup-mode' = 'latest-offset',  " +
> > > > > > "  'connector.properties.zookeeper.connect' =
> > > > > > 'yq01-sw-xxx03.yq01:8681',  " +
> > > > > > "  'connector.properties.bootstrap.servers' =
> > > > > > 'yq01-sw-xxx03.yq01:8192',  " +
> > > > > > "  'format.type' = 'json'  " +
> > > > > > ")");
> > > > > > Table t1 = tEnv.sqlQuery("select * from dr1");
> > > > > >
> > > > > > 我打包会把flink-json打包进去,最终结果包是test.jar。
> > > > > >
> > > > > > test.jar是个fat jar,相关依赖都有了。
> > > > > >
> > > > > > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
> > > > > >
> > > > > > Caused by:
> > > org.apache.flink.table.api.NoMatchingTableFactoryException:
> > > > > > Could not find a suitable table factory for
> > > > > > 'org.apache.flink.table.factories.DeserializationSchemaFactory'
> in
> > > > > > the classpath.
> > > > > >
> > > > > > 可是我flink-json.jar都打包进去了,居然还是报错。。。
> > > > > >
> > > > > > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
> > > > > >
> > > > > > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
> > > > > >
> > > > > > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Best regards!
> > > > > Rui Li
> > > > >
> > > >
> > >
> > >
> > > --
> > > Best regards!
> > > Rui Li
> > >
> >
>
>
> --
> Best regards!
> Rui Li
>


sql client parallelism problem

2020-08-17 Thread 18579099...@163.com
The sql client reads a Hive table that consists of 21 files, so the sql client says it needs a parallelism of 21, but I do not have that many slots. Is there a way to reduce the parallelism?



18579099...@163.com


Re: FlinkSQL tableEnv dependency problem

2020-08-17 Thread Rui Li
Yes, each line of that file is just a class name. As for specifying extra jars with flink run, that should be possible via the -C parameter.

On Mon, Aug 17, 2020 at 5:38 PM 赵一旦  wrote:

> 哦哦。也就是说本身这个文件内内容是支持一行一个这样的格式的是吧?
>
> Rui Li  于2020年8月17日周一 下午5:36写道:
>
> > 用shade plugin的时候可以指定service resource
> > transformer,应该能把多个service文件merge起来。具体可以参考:
> >
> >
> https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer
> >
> > On Mon, Aug 17, 2020 at 5:00 PM 赵一旦  wrote:
> >
> > > @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
> > >
> > > 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?
> > >
> > >
> > >
> >
> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。
> > >
> > > Rui Li  于2020年8月17日周一 下午3:46写道:
> > >
> > > > 可能是打fat
> > > > jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
> > > >
> > > > On Fri, Aug 14, 2020 at 7:13 PM 赵一旦  wrote:
> > > >
> > > > > 代码如下:
> > > > > // tEnv;
> > > > > tEnv.sqlUpdate("create table dr1(  " +
> > > > > "  cid STRING,  " +
> > > > > "  server_time BIGINT,  " +
> > > > > "  d MAP,  " +
> > > > > "  process_time AS PROCTIME(),  " +
> > > > > "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time /
> > > 1000)),
> > > > > " +
> > > > > "  WATERMARK FOR event_time AS event_time - INTERVAL '60'
> > > SECOND
> > > > > " +
> > > > > ") WITH (  " +
> > > > > "  'update-mode' = 'append',  " +
> > > > > "  'connector.type' = 'kafka',  " +
> > > > > "  'connector.version' = 'universal',  " +
> > > > > "  'connector.topic' = 'antibot_dr1',  " +
> > > > > "  'connector.startup-mode' = 'latest-offset',  " +
> > > > > "  'connector.properties.zookeeper.connect' =
> > > > > 'yq01-sw-xxx03.yq01:8681',  " +
> > > > > "  'connector.properties.bootstrap.servers' =
> > > > > 'yq01-sw-xxx03.yq01:8192',  " +
> > > > > "  'format.type' = 'json'  " +
> > > > > ")");
> > > > > Table t1 = tEnv.sqlQuery("select * from dr1");
> > > > >
> > > > > 我打包会把flink-json打包进去,最终结果包是test.jar。
> > > > >
> > > > > test.jar是个fat jar,相关依赖都有了。
> > > > >
> > > > > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
> > > > >
> > > > > Caused by:
> > org.apache.flink.table.api.NoMatchingTableFactoryException:
> > > > > Could not find a suitable table factory for
> > > > > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> > > > > the classpath.
> > > > >
> > > > > 可是我flink-json.jar都打包进去了,居然还是报错。。。
> > > > >
> > > > > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
> > > > >
> > > > > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
> > > > >
> > > > > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
> > > > >
> > > >
> > > >
> > > > --
> > > > Best regards!
> > > > Rui Li
> > > >
> > >
> >
> >
> > --
> > Best regards!
> > Rui Li
> >
>


-- 
Best regards!
Rui Li


Reply: flink-1.10.1 want to use DDL to write into ES 5.6

2020-08-17 Thread kcz
A small question out of curiosity: why did the community not provide ES 5 SQL support at the time, was there a particular consideration behind that? The ES 5 sink connector is already provided.


Re: FlinkSQL tableEnv dependency problem

2020-08-17 Thread 赵一旦
I see. So the content of that file natively supports the one-entry-per-line format, right?

Rui Li  于2020年8月17日周一 下午5:36写道:

> 用shade plugin的时候可以指定service resource
> transformer,应该能把多个service文件merge起来。具体可以参考:
>
> https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer
>
> On Mon, Aug 17, 2020 at 5:00 PM 赵一旦  wrote:
>
> > @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
> >
> > 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?
> >
> >
> >
> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。
> >
> > Rui Li  于2020年8月17日周一 下午3:46写道:
> >
> > > 可能是打fat
> > > jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
> > >
> > > On Fri, Aug 14, 2020 at 7:13 PM 赵一旦  wrote:
> > >
> > > > 代码如下:
> > > > // tEnv;
> > > > tEnv.sqlUpdate("create table dr1(  " +
> > > > "  cid STRING,  " +
> > > > "  server_time BIGINT,  " +
> > > > "  d MAP,  " +
> > > > "  process_time AS PROCTIME(),  " +
> > > > "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time /
> > 1000)),
> > > > " +
> > > > "  WATERMARK FOR event_time AS event_time - INTERVAL '60'
> > SECOND
> > > > " +
> > > > ") WITH (  " +
> > > > "  'update-mode' = 'append',  " +
> > > > "  'connector.type' = 'kafka',  " +
> > > > "  'connector.version' = 'universal',  " +
> > > > "  'connector.topic' = 'antibot_dr1',  " +
> > > > "  'connector.startup-mode' = 'latest-offset',  " +
> > > > "  'connector.properties.zookeeper.connect' =
> > > > 'yq01-sw-xxx03.yq01:8681',  " +
> > > > "  'connector.properties.bootstrap.servers' =
> > > > 'yq01-sw-xxx03.yq01:8192',  " +
> > > > "  'format.type' = 'json'  " +
> > > > ")");
> > > > Table t1 = tEnv.sqlQuery("select * from dr1");
> > > >
> > > > 我打包会把flink-json打包进去,最终结果包是test.jar。
> > > >
> > > > test.jar是个fat jar,相关依赖都有了。
> > > >
> > > > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
> > > >
> > > > Caused by:
> org.apache.flink.table.api.NoMatchingTableFactoryException:
> > > > Could not find a suitable table factory for
> > > > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> > > > the classpath.
> > > >
> > > > 可是我flink-json.jar都打包进去了,居然还是报错。。。
> > > >
> > > > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
> > > >
> > > > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
> > > >
> > > > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
> > > >
> > >
> > >
> > > --
> > > Best regards!
> > > Rui Li
> > >
> >
>
>
> --
> Best regards!
> Rui Li
>


Reply: Joining a frequently changing HBase dimension table with Flink SQL in a real-time data warehouse

2020-08-17 Thread zhao liang
For the third point I have a rough idea: subtract a delay you consider sufficient from the original timestamp of the Kafka data to generate a new timestamp, let Flink use that time for the watermark, and keep the original time for matching against HBase.
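Roughly, with the 1.11 WatermarkStrategy API (Event and getEventTime are
made-up names), that is what forBoundedOutOfOrderness gives you: the
watermark lags the observed timestamps by a fixed delay while each record
keeps its original event time for the HBase lookup.

DataStream<Event> withWm = kafkaStream.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))  // pick a delay you consider sufficient
                .withTimestampAssigner((event, previousTs) -> event.getEventTime()));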

From: Jim Chen 
Date: Monday, August 17, 2020 16:36
To: user-zh 
Subject: Joining a frequently changing HBase dimension table with Flink SQL in a real-time data warehouse
Hi everyone:
We are building a real-time data warehouse with Flink SQL. Roughly, a Kafka stream is joined against an HBase dimension table and the result is written to ClickHouse. The HBase dimension table changes frequently.
The thorny problems we are currently facing:
1. When implementing async I/O with our own AsyncTableFunction, performance was still not good enough. We then added a local cache, but ran into cache-consistency problems and do not know how to solve them.
2. Writes to HBase are batched, so ordering cannot be guaranteed; since the dimension table changes frequently, wrong ordering produces wrong results.
3. The HBase dimension table may only be updated about 5s later, but by then the Kafka records have already passed through, so the joined fields are all null.

Does anyone have a good approach or solution for the scenarios above?


Re: FlinkSQL tableEnv dependency problem

2020-08-17 Thread 赵一旦
Alternatively, for jobs submitted via flink run, could the jars specified with -l/-j be uploaded to the cluster as well, the way sql-client.sh does it?

赵一旦  于2020年8月17日周一 下午5:34写道:

> 小伙伴们,帮忙看下怎么解决呢? 通过写代码方式,基于table
> api执行flink的sql。这种情况下用到的flink-json等包通过shade等方式做成一个大jar包之后依赖的问题。
>
> 赵一旦  于2020年8月17日周一 下午5:00写道:
>
>> @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
>>
>> 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?
>>
>>
>> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。
>>
>> Rui Li  于2020年8月17日周一 下午3:46写道:
>>
>>> 可能是打fat
>>> jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
>>>
>>> On Fri, Aug 14, 2020 at 7:13 PM 赵一旦  wrote:
>>>
>>> > 代码如下:
>>> > // tEnv;
>>> > tEnv.sqlUpdate("create table dr1(  " +
>>> > "  cid STRING,  " +
>>> > "  server_time BIGINT,  " +
>>> > "  d MAP,  " +
>>> > "  process_time AS PROCTIME(),  " +
>>> > "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time /
>>> 1000)),
>>> > " +
>>> > "  WATERMARK FOR event_time AS event_time - INTERVAL '60'
>>> SECOND
>>> > " +
>>> > ") WITH (  " +
>>> > "  'update-mode' = 'append',  " +
>>> > "  'connector.type' = 'kafka',  " +
>>> > "  'connector.version' = 'universal',  " +
>>> > "  'connector.topic' = 'antibot_dr1',  " +
>>> > "  'connector.startup-mode' = 'latest-offset',  " +
>>> > "  'connector.properties.zookeeper.connect' =
>>> > 'yq01-sw-xxx03.yq01:8681',  " +
>>> > "  'connector.properties.bootstrap.servers' =
>>> > 'yq01-sw-xxx03.yq01:8192',  " +
>>> > "  'format.type' = 'json'  " +
>>> > ")");
>>> > Table t1 = tEnv.sqlQuery("select * from dr1");
>>> >
>>> > 我打包会把flink-json打包进去,最终结果包是test.jar。
>>> >
>>> > test.jar是个fat jar,相关依赖都有了。
>>> >
>>> > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
>>> >
>>> > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>>> > Could not find a suitable table factory for
>>> > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
>>> > the classpath.
>>> >
>>> > 可是我flink-json.jar都打包进去了,居然还是报错。。。
>>> >
>>> > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
>>> >
>>> > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
>>> >
>>> > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
>>> >
>>>
>>>
>>> --
>>> Best regards!
>>> Rui Li
>>>
>>


Re: FlinkSQL tableEnv dependency problem

2020-08-17 Thread Rui Li
When using the shade plugin you can configure the services resource
transformer, which should merge multiple service files instead of letting one overwrite another. For details see:
https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer
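For example, something like this inside the maven-shade-plugin
<configuration> (following the pattern described on that page): it
concatenates all META-INF/services files of the same name instead of
letting one overwrite another.

<transformers>
  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>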

On Mon, Aug 17, 2020 at 5:00 PM 赵一旦  wrote:

> @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
>
> 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?
>
>
> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。
>
> Rui Li  于2020年8月17日周一 下午3:46写道:
>
> > 可能是打fat
> > jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
> >
> > On Fri, Aug 14, 2020 at 7:13 PM 赵一旦  wrote:
> >
> > > 代码如下:
> > > // tEnv;
> > > tEnv.sqlUpdate("create table dr1(  " +
> > > "  cid STRING,  " +
> > > "  server_time BIGINT,  " +
> > > "  d MAP,  " +
> > > "  process_time AS PROCTIME(),  " +
> > > "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time /
> 1000)),
> > > " +
> > > "  WATERMARK FOR event_time AS event_time - INTERVAL '60'
> SECOND
> > > " +
> > > ") WITH (  " +
> > > "  'update-mode' = 'append',  " +
> > > "  'connector.type' = 'kafka',  " +
> > > "  'connector.version' = 'universal',  " +
> > > "  'connector.topic' = 'antibot_dr1',  " +
> > > "  'connector.startup-mode' = 'latest-offset',  " +
> > > "  'connector.properties.zookeeper.connect' =
> > > 'yq01-sw-xxx03.yq01:8681',  " +
> > > "  'connector.properties.bootstrap.servers' =
> > > 'yq01-sw-xxx03.yq01:8192',  " +
> > > "  'format.type' = 'json'  " +
> > > ")");
> > > Table t1 = tEnv.sqlQuery("select * from dr1");
> > >
> > > 我打包会把flink-json打包进去,最终结果包是test.jar。
> > >
> > > test.jar是个fat jar,相关依赖都有了。
> > >
> > > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
> > >
> > > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> > > Could not find a suitable table factory for
> > > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> > > the classpath.
> > >
> > > 可是我flink-json.jar都打包进去了,居然还是报错。。。
> > >
> > > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
> > >
> > > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
> > >
> > > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
> > >
> >
> >
> > --
> > Best regards!
> > Rui Li
> >
>


-- 
Best regards!
Rui Li


Re: FlinkSQL tableEnv dependency problem

2020-08-17 Thread 赵一旦
Could anyone help take a look at how to solve this? I run Flink SQL programmatically through the Table API, and the problem is about the dependencies (flink-json etc.) once they are shaded into one fat jar.

赵一旦  于2020年8月17日周一 下午5:00写道:

> @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
>
> 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?
>
>
> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。
>
> Rui Li  于2020年8月17日周一 下午3:46写道:
>
>> 可能是打fat
>> jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
>>
>> On Fri, Aug 14, 2020 at 7:13 PM 赵一旦  wrote:
>>
>> > 代码如下:
>> > // tEnv;
>> > tEnv.sqlUpdate("create table dr1(  " +
>> > "  cid STRING,  " +
>> > "  server_time BIGINT,  " +
>> > "  d MAP,  " +
>> > "  process_time AS PROCTIME(),  " +
>> > "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time /
>> 1000)),
>> > " +
>> > "  WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
>> > " +
>> > ") WITH (  " +
>> > "  'update-mode' = 'append',  " +
>> > "  'connector.type' = 'kafka',  " +
>> > "  'connector.version' = 'universal',  " +
>> > "  'connector.topic' = 'antibot_dr1',  " +
>> > "  'connector.startup-mode' = 'latest-offset',  " +
>> > "  'connector.properties.zookeeper.connect' =
>> > 'yq01-sw-xxx03.yq01:8681',  " +
>> > "  'connector.properties.bootstrap.servers' =
>> > 'yq01-sw-xxx03.yq01:8192',  " +
>> > "  'format.type' = 'json'  " +
>> > ")");
>> > Table t1 = tEnv.sqlQuery("select * from dr1");
>> >
>> > 我打包会把flink-json打包进去,最终结果包是test.jar。
>> >
>> > test.jar是个fat jar,相关依赖都有了。
>> >
>> > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
>> >
>> > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>> > Could not find a suitable table factory for
>> > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
>> > the classpath.
>> >
>> > 可是我flink-json.jar都打包进去了,居然还是报错。。。
>> >
>> > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
>> >
>> > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
>> >
>> > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
>> >
>>
>>
>> --
>> Best regards!
>> Rui Li
>>
>


Re: PyFlink writing to ES

2020-08-17 Thread Xingbo Huang
Hi,

The error message actually already explains the problem with the call: the host method takes three arguments, the first is your hostname, the second is your port, and the third is the protocol you use.
Try it in the following way instead:
.host("es9223.db.58dns.org", 9223, "http")

Best,
Xingbo

guaishushu1...@163.com  于2020年8月17日周一 下午5:12写道:

> PyFlink 从kafka写入ES 抛这个异常,但是host是正确的有哪位知道吗
> File "main-0-8.py", line 74, in 
> .host("http://es9223.db.58dns.org:9223;)
> TypeError: host() missing 2 required positional arguments: 'port' and
> 'protocol'
>
>
>
>
> guaishushu1...@163.com
>


PyFlink writing to ES

2020-08-17 Thread guaishushu1...@163.com
PyFlink writes from Kafka to ES and throws this exception, but the host is correct. Does anyone know why?
File "main-0-8.py", line 74, in 
.host("http://es9223.db.58dns.org:9223")
TypeError: host() missing 2 required positional arguments: 'port' and 'protocol'




guaishushu1...@163.com


Re: FlinkSQL tableEnv dependency problem

2020-08-17 Thread 赵一旦
@RuiLi, you are right. I just checked and that is indeed the case. Since I shade the dependencies, the file is not lost, but the service files of the different connectors and formats overwrite each other.

So that is the problem then. It looks like packages such as flink-json and flink-csv cannot simply be shaded in?

Also, if I wanted a single jar to provide both the flink-json and flink-csv functionality, would that be impossible with the current mechanism as well? (Why rely on a fixed-name file under the services directory to record the Factory implementation classes? That seems to be exactly what causes this problem.)

Rui Li  于2020年8月17日周一 下午3:46写道:

> 可能是打fat
> jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
>
> On Fri, Aug 14, 2020 at 7:13 PM 赵一旦  wrote:
>
> > 代码如下:
> > // tEnv;
> > tEnv.sqlUpdate("create table dr1(  " +
> > "  cid STRING,  " +
> > "  server_time BIGINT,  " +
> > "  d MAP,  " +
> > "  process_time AS PROCTIME(),  " +
> > "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000)),
> > " +
> > "  WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
> > " +
> > ") WITH (  " +
> > "  'update-mode' = 'append',  " +
> > "  'connector.type' = 'kafka',  " +
> > "  'connector.version' = 'universal',  " +
> > "  'connector.topic' = 'antibot_dr1',  " +
> > "  'connector.startup-mode' = 'latest-offset',  " +
> > "  'connector.properties.zookeeper.connect' =
> > 'yq01-sw-xxx03.yq01:8681',  " +
> > "  'connector.properties.bootstrap.servers' =
> > 'yq01-sw-xxx03.yq01:8192',  " +
> > "  'format.type' = 'json'  " +
> > ")");
> > Table t1 = tEnv.sqlQuery("select * from dr1");
> >
> > 我打包会把flink-json打包进去,最终结果包是test.jar。
> >
> > test.jar是个fat jar,相关依赖都有了。
> >
> > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
> >
> > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> > Could not find a suitable table factory for
> > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> > the classpath.
> >
> > 可是我flink-json.jar都打包进去了,居然还是报错。。。
> >
> > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
> >
> > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
> >
> > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
> >
>
>
> --
> Best regards!
> Rui Li
>


[DISCUSS] Removing deprecated methods from DataStream API

2020-08-17 Thread Dawid Wysakowicz
Hi devs and users,

I wanted to ask you what do you think about removing some of the
deprecated APIs around the DataStream API.

The APIs I have in mind are:

  * RuntimeContext#getAllAccumulators (deprecated in 0.10)
  * DataStream#fold and all related classes and methods such as
FoldFunction, FoldingState, FoldingStateDescriptor ... (deprecated
in 1.3/1.4)
  * StreamExecutionEnvironment#setStateBackend(AbstractStateBackend)
(deprecated in 1.5)
  * DataStream#split (deprecated in 1.8)
  * Methods in (Connected)DataStream that specify keys as either indices
or field names such as DataStream#keyBy, DataStream#partitionCustom,
ConnectedStream#keyBy,  (deprecated in 1.11)

I think the first three should be straightforward. They are long
deprecated. The getAccumulators method is not used very often in my
opinion. The same applies to DataStream#fold, which additionally is
not very performant. Lastly, setStateBackend has an alternative that
still works with classes from the AbstractStateBackend hierarchy,
therefore it will still be code compatible. Moreover, if we remove
#setStateBackend(AbstractStateBackend) we will get rid of the warnings
users currently get when setting a state backend, as the correct method
cannot be used without an explicit cast.

As for the DataStream#split I know there were some objections against
removing the #split method in the past. I still believe the output tags
can replace the split method already.
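For reference, a minimal sketch of the side-output pattern that replaces
split()/select() (the tag name and types are illustrative):

final OutputTag<Integer> oddTag = new OutputTag<Integer>("odd") {};

SingleOutputStreamOperator<Integer> evens = input
        .process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                if (value % 2 == 0) {
                    out.collect(value);          // main output
                } else {
                    ctx.output(oddTag, value);   // side output takes the role of split()/select()
                }
            }
        });

DataStream<Integer> odds = evens.getSideOutput(oddTag);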

The only problem with the last set of methods I propose to remove is that
they were deprecated only in the last release, and those methods were only
partially deprecated. Moreover, some of the methods were not deprecated
in ConnectedStreams. Nevertheless I'd still be inclined to remove the
methods in this release.

Let me know what do you think about it.

Best,

Dawid



signature.asc
Description: OpenPGP digital signature


Re: Re: How to guarantee correctness when the dimension table is updated frequently and the state keeps growing

2020-08-17 Thread Jim Chen
The HBase dimension table holds roughly 500 GB of data.

Dream-底限  于2020年8月13日周四 下午12:16写道:

> flink暴漏的lookup
>
> 是支持设置缓存记录条数和缓存时间的吧,把时间和条数设置的小一点或者直接禁用缓存,如果流表数据量不大的话可以不用异步访问,数据量大的话异步访问不加缓存维表存储引擎可能压力过大
>
> Jim Chen  于2020年8月13日周四 上午11:53写道:
>
> > 请问下,如果使用了localcache+asyncIO的方式,缓存一致性,有什么解决的思路吗?维表的状态是频繁更新的
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Why doesn't TableColumn contain the comment

2020-08-17 Thread Shengkai Fang
hi, then please leave a comment on that JIRA and I will assign it to you.

Harold.Miao  于2020年8月17日周一 上午11:26写道:

> Thanks. I'd like to submit this patch.
>
> Shengkai Fang wrote on Fri, Aug 14, 2020, at 4:33 PM:
>
> > hi, I have created an issue [1] to track this; if you are interested, feel free to help fix the bug.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-18958
> >
> > Harold.Miao wrote on Thu, Aug 13, 2020, at 11:08 AM:
> >
> > > hi all
> > > I noticed that the TableColumn class does not carry the column comment, which is a bit inconvenient for development. Could anyone advise? Thanks.
> > >
> > >
> > > --
> > >
> > > Best Regards,
> > > Harold Miao
> > >
> >
>
>
> --
>
> Best Regards,
> Harold Miao
>


Joining a frequently changing HBase dimension table with Flink SQL in a real-time data warehouse

2020-08-17 Thread Jim Chen
Hi everyone:
We are building a real-time data warehouse with Flink SQL. The pipeline roughly joins Kafka streams against an HBase dimension table and writes the results to ClickHouse. The HBase dimension table changes frequently.
The thorny problems we are currently running into:
1. When implementing AsyncTableFunction ourselves for async IO, performance was still not good enough. We then added a local cache, but cache consistency became a problem and we don't know how to solve it.
2. Writes to HBase are batched, so ordering cannot be guaranteed; with a frequently changing dimension table, out-of-order writes produce wrong results.
3. The HBase dimension table may only be updated about 5 seconds later, but by then the corresponding Kafka records have already passed through, so the joined fields are all null.

Are there any good ideas or solutions for the scenarios above?
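
A rough sketch of the local cache plus async IO pattern from item 1, which
bounds how stale a joined value can get rather than making the cache strictly
consistent with HBase. Everything named here (DimClient, lookupAsync, the
60-second bound) is hypothetical:

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class CachedDimLookup extends RichAsyncFunction<String, String> {

    private static final long MAX_AGE_MS = 60_000; // hard bound on staleness (assumed 60s)

    private transient Map<String, CacheEntry> cache;
    private transient DimClient client; // hypothetical async HBase client wrapper

    @Override
    public void open(Configuration parameters) {
        cache = new ConcurrentHashMap<>();
        client = new DimClient();
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        CacheEntry cached = cache.get(key);
        long now = System.currentTimeMillis();
        if (cached != null && now - cached.loadedAt < MAX_AGE_MS) {
            resultFuture.complete(Collections.singleton(cached.value)); // fresh enough, skip the lookup
            return;
        }
        client.lookupAsync(key).whenComplete((value, err) -> {
            if (err != null) {
                resultFuture.completeExceptionally(err);
            } else {
                cache.put(key, new CacheEntry(value, System.currentTimeMillis()));
                resultFuture.complete(Collections.singleton(value));
            }
        });
    }

    private static final class CacheEntry {
        final String value;
        final long loadedAt;

        CacheEntry(String value, long loadedAt) {
            this.value = value;
            this.loadedAt = loadedAt;
        }
    }

    /** Hypothetical stand-in for an asynchronous HBase lookup. */
    private static final class DimClient {
        CompletableFuture<String> lookupAsync(String key) {
            return CompletableFuture.supplyAsync(() -> "value-for-" + key);
        }
    }
}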


Can Flink CDC read sharded databases and tables?

2020-08-17 Thread 18579099...@163.com
As the subject says: I need to aggregate sharded databases and tables as one table (we have 16 databases x 16 tables, 256 physical tables in total, merged into a single logical table). Does Flink currently support this?



18579099...@163.com
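
One commonly suggested direction, sketched under the assumption that the
flink-connector-mysql-cdc source passes its 'database-name' / 'table-name'
options through to Debezium's include lists, which accept regular
expressions; please verify this against the connector version you use. All
names (order_db_*, order_*) are made up:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ShardedCdcSource {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        tEnv.executeSql(
                "CREATE TABLE all_orders (" +
                "  order_id BIGINT," +
                "  user_id STRING," +
                "  state BOOLEAN" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'localhost'," +
                "  'port' = '3306'," +
                "  'username' = 'flink'," +
                "  'password' = 'flink'," +
                // regex patterns matching the 16 sharded databases and 16 sharded tables
                "  'database-name' = 'order_db_[0-9]+'," +
                "  'table-name' = 'order_[0-9]+'" +
                ")");
    }
}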


Re: Tracing and Flink

2020-08-17 Thread bvarga
Hi Aaron,

I've recently been looking at this topic and working on a prototype. The
approach I am trying is "backward tracing", or data provenance tracing,
where we try to explain what inputs and steps have affected the production
of an output record.

Arvid has summarized the most important aspects; my approach to UIDs is as
he described. I would like to add a few thoughts.

- With this backward tracing approach, it is very difficult to do sampling,
as aggregations / multi-input operators can only be traced if all inputs are
also traced. So this is more useful if you need to be able to explain the
origins of all output records.

- As Arvid mentioned, the size of the trace records can become big, and
negatively impact the performance of the pipeline. I'd suggest an approach
where each operator directly outputs its traces to some storage. Each trace
record has a UID. If each trace record contains a list/array of its inputs,
and you use an appropriate storage, you can do recursive lookups based on
the trace UIDs to find a complete trace graph for an output record. You may
even want a separate Flink job that pre-processes and pre-aggregates traces
that belong together (although the lateness / ordering might be difficult to
handle). A minimal sketch of such a trace record is included after this list.

- If you choose this direct-reporting approach, you still need to pass
along the trace UID in the main pipeline, so that the next operator's
produced trace can list it in the inputs.

- If you leave the production of the trace records explicit (as in you have
to construct and collect the trace record manually in each operator), you
can flexibly choose what inputs to include (e.g. for a large aggregation,
you may only want to list some of the aggregated elements as inputs). You
can then also add any additional metadata to help explain a certain step.

- I've looked into adapting this to OpenTracing, but it didn't seem
well-suited for this task. The span-based approach has a parent-child
relationship that doesn't fit the dataflow model too well. In Flink, with
the backward-tracing approach, the "root span" would logically be the output
record, and its children would need to be constructed earlier. I couldn't
find a way to nicely fit this view into the structure of OpenTracing
records.
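
To make the trace-record idea above concrete, here is a minimal sketch; all
class and field names are my own, not taken from an existing library:

import java.io.Serializable;
import java.util.List;
import java.util.UUID;

public class TraceRecord implements Serializable {

    private final String uid;              // identity of this trace record
    private final String operatorName;     // which operator produced it
    private final List<String> inputUids;  // trace UIDs of the records it was derived from
    private final String metadata;         // free-form explanation of the step

    public TraceRecord(String operatorName, List<String> inputUids, String metadata) {
        this.uid = UUID.randomUUID().toString();
        this.operatorName = operatorName;
        this.inputUids = inputUids;
        this.metadata = metadata;
    }

    public String getUid() {
        return uid;
    }

    public String getOperatorName() {
        return operatorName;
    }

    public List<String> getInputUids() {
        return inputUids;
    }

    public String getMetadata() {
        return metadata;
    }
}

Each operator would emit such a record (for example to a side output or
directly to storage) and forward only the uid with the main record, so that
the next operator can list it among its inputUids.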

Let me know your thoughts, I'd be happy to discuss this further.

Regards,

Balazs Varga  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Debugging Flink 1.11 SQL in IDEA: no data and no errors

2020-08-17 Thread DanielGu
hi,
does anyone here have experience debugging Flink 1.11 SQL in IDEA? Any advice is appreciated.
I've been stuck on this lately and can't get past it.
Many thanks
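
For what it's worth, a minimal, fully local Flink 1.11 SQL job that can be
run and debugged directly from the IDE; the datagen source and
TableResult#print make rows visible immediately, which helps narrow down
where the data disappears. All table and field names are made up:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LocalSqlDebug {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        tEnv.executeSql(
                "CREATE TABLE src (" +
                "  id BIGINT," +
                "  name STRING" +
                ") WITH (" +
                "  'connector' = 'datagen'," +
                "  'rows-per-second' = '5'" +
                ")");

        // In 1.11, executeSql submits the query immediately and print() blocks
        // and streams rows, so there is no separate env.execute() to forget.
        tEnv.executeSql("SELECT id, name FROM src").print();
    }
}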



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: FlinkSQL tableEnv dependency problem

2020-08-17 Thread Rui Li
Perhaps the service files were overwritten when the fat
jar was built? Check the files under META-INF/services inside your jar and see whether the json implementation is missing there.
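
One way to check this, sketched under the assumption that the fat jar is on
the classpath (the class name is made up): list every
META-INF/services/org.apache.flink.table.factories.TableFactory file the
classloader can see and print its entries. If the JSON format factory is
missing, the shade step most likely overwrote the service file; merging the
files with the maven-shade-plugin's ServicesResourceTransformer is the usual
fix.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;

public class ServiceFileCheck {
    public static void main(String[] args) throws Exception {
        Enumeration<URL> urls = ServiceFileCheck.class.getClassLoader()
                .getResources("META-INF/services/org.apache.flink.table.factories.TableFactory");
        while (urls.hasMoreElements()) {
            URL url = urls.nextElement();
            System.out.println("Service file: " + url);
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
                // each non-comment line names one factory implementation on the classpath
                reader.lines().forEach(line -> System.out.println("  " + line));
            }
        }
    }
}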

On Fri, Aug 14, 2020 at 7:13 PM 赵一旦  wrote:

> The code is as follows:
> // tEnv;
> tEnv.sqlUpdate("create table dr1(  " +
> "  cid STRING,  " +
> "  server_time BIGINT,  " +
> "  d MAP,  " +
> "  process_time AS PROCTIME(),  " +
> "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000)),
> " +
> "  WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
> " +
> ") WITH (  " +
> "  'update-mode' = 'append',  " +
> "  'connector.type' = 'kafka',  " +
> "  'connector.version' = 'universal',  " +
> "  'connector.topic' = 'antibot_dr1',  " +
> "  'connector.startup-mode' = 'latest-offset',  " +
> "  'connector.properties.zookeeper.connect' =
> 'yq01-sw-xxx03.yq01:8681',  " +
> "  'connector.properties.bootstrap.servers' =
> 'yq01-sw-xxx03.yq01:8192',  " +
> "  'format.type' = 'json'  " +
> ")");
> Table t1 = tEnv.sqlQuery("select * from dr1");
>
> When packaging I bundle flink-json in, and the final artifact is test.jar.
>
> test.jar is a fat jar; all the relevant dependencies are inside it.
>
> Then I run: flink run -c test.SQLWC1 --detached  test.jar and get the error:
>
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> the classpath.
>
> But flink-json.jar is packaged in, and it still fails...
>
> The only workaround is that the machine running the command flink run -c test.SQLWC1 --detached  test.jar
>
> must have flink-json in its local Flink environment. Yet that machine is only used for submission; the cluster that actually runs the job is a different machine.
>
> I don't understand how Flink SQL resolves dependencies here. Is the flink-json bundled in my fat jar simply not considered?
>


-- 
Best regards!
Rui Li


Re: (no subject)

2020-08-17 Thread zhiyezou
Hi




Re: Flink 1.9.1 job submitted with -d (detached mode) fails, but runs fine without -d

2020-08-17 Thread bradyMk
Hello:
I have not tried a newer version, but I don't think this is a version problem, because all of my other Flink jobs run fine with -d; only this one fails, and it also runs fine if I submit it without -d. I find it strange too.



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: (no subject)

2020-08-17 Thread art
hi, thanks for the proposed approach

Yes, day is the order creation time.

May I ask: does your offline job re-correct the full set of order data every time, that is, it corrects history regardless of whether anything has changed?

If so, wouldn't the offline runs be wasted in the vast majority of cases, since the historical data has not changed?

> On Aug 17, 2020, at 1:22 PM, zhiyezou <1530130...@qq.com> wrote:
> 
> HI
> 
> 
> This day should be the order creation time, right?
> 
> 
> I think the problem we ran into is quite similar; see whether our approach helps you.
> 
> 
> First, we restrict the day condition to the last 3 days (select * where day > now-3), and the state TTL is also 3 days, i.e. Flink keeps 3 days of state. That way, even if data older than
> 3 days arrives, it will not update our result table; this solves the problem of wrong updates.
> 
> 
> Then an offline job periodically re-corrects the result data older than 3 days. This guarantees eventual consistency of the data.
> 
> 
> 
> 
> 
> 
> ---- Original message ----
> From: "user-zh"
>
> Date: Monday, Aug 17, 2020, 12:32 PM
> To: "user-zh@flink.apache.org"
> Subject: (no subject)
> 
> 
> 
> hi everyone in the community! I have a use case and would like to ask whether anyone has run into it and whether there is a good solution.
> The scenario: count the number of valid orders in the order table per user and day, while the state of historical orders may be updated at any time (for example, an order from 2 months ago may only now be set to true), so the result cannot be pre-aggregated from history. In SQL it is: select
>  user,day,count(*) from table where state = true group by 
> user,day; I have implemented this with the flink-sql-cdc-connector, but the problem is the state: keyed by the user/day combination,
> the state keeps growing if everything is retained, yet if I set a TTL, then when a historical order changes, the value that finally gets emitted is wrong.
> I hope folks in the community can give me some advice
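
A rough sketch of the bounded-state part of the approach quoted above,
assuming Flink 1.11 with the Blink planner; the datagen table stands in for
the real CDC-backed orders table, and the 3/4-day retention values are only
an example (the offline re-correction of older days has to happen outside
this job):

import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class OrderCountJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // State of a (user, day) group that has seen no updates for 3 days may be dropped;
        // it is guaranteed to be gone after 4 days.
        tEnv.getConfig().setIdleStateRetentionTime(Time.days(3), Time.days(4));

        // datagen stands in for the real CDC source of the orders table.
        tEnv.executeSql(
                "CREATE TABLE orders (" +
                "  `user` STRING," +
                "  `day` STRING," +
                "  state BOOLEAN" +
                ") WITH ('connector' = 'datagen', 'rows-per-second' = '5')");

        Table result = tEnv.sqlQuery(
                "SELECT `user`, `day`, COUNT(*) AS valid_orders " +
                "FROM orders WHERE state = true GROUP BY `user`, `day`");

        // Retract stream: the Boolean flag marks whether a row is an insert or a retraction.
        tEnv.toRetractStream(result, Row.class).print();
        env.execute("order-count-by-user-and-day");
    }
}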