Re: Fail to deploy flink on k8s in minikube

2020-01-12 文章 刘建刚
Thank you for your help.

Yang Wang  于2020年1月13日周一 下午12:53写道:

> Hi, Jiangang
>
> Glad to hear that you are looking to run Flink on Kubernetes.
>
> It just because you are using the new Kubernetes version.The
> extensions/v1beta1
> has been removed since v1.16. Please use apps/v1 instead. The apps/v1 is
> introduced
> from v1.9.0. I will create a ticket fix the documentation.
>
> Before release-1.10, you could use standalone per-job[1] or standalone
> session[2] cluster on
> K8s. There are some existing K8s operators to manage the application
> lifecycle(e.g. google flink-on-k8s-operator[3],
> lyft flink-k8s-operator[4]).
>
> Running Flink native on K8s is supported from 1.10. You could find it here
> [5]. It aims at to make
> Flink users more convenient to deploy Flink workloads on K8s cluster.
> However, we only support
> session cluster now. The per-job mode is in development.
>
> [1]
> https://github.com/apache/flink/blob/release-1.9/flink-container/kubernetes/README.md#deploy-flink-job-cluster
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#flink-session-cluster-on-kubernetes
>
> http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/
> [3] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
> [4] https://github.com/lyft/flinkk8soperator
> [5]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
>
> Best,
> Yang
>
> 刘建刚  于2020年1月13日周一 下午12:14写道:
>
>>   I fail to deploy flink on k8s referring to
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html
>>   When I run the command 'kubectl create -f
>> jobmanager-deployment.yaml', following error is reported:
>> [image: image.png]
>>   I am new to k8s. Our team want to deploy flink on k8s. Can anyone
>> help me resolve this issue? Can anyone give me some tutorial about k8s and
>> flink in product? Thank you very much.
>>
>


Fail to deploy flink on k8s in minikube

2020-01-12 文章 刘建刚
  I fail to deploy flink on k8s referring to
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html
  When I run the command 'kubectl create -f jobmanager-deployment.yaml',
following error is reported:
[image: image.png]
  I am new to k8s. Our team want to deploy flink on k8s. Can anyone
help me resolve this issue? Can anyone give me some tutorial about k8s and
flink in product? Thank you very much.


Re: 取消关注

2020-01-12 文章 Leonard Xu
Hi, Sun
是指取消订阅邮件吗?
可以发送任意内容的邮件到  user-zh-unsubscr...@flink.apache.org 
  取消订阅来自 user-zh@flink.apache.org 
 邮件组的邮件

邮件组的订阅管理,可以参考[1]
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list 



> 在 2020年1月13日,11:33,sun <1392427...@qq.com> 写道:
> 
> 请问怎么取消关注



Re: 取消关注

2020-01-12 文章 Jiawei Wu
这个页面里有如何unsubscribe:  https://flink.apache.org/community.html

On Mon, Jan 13, 2020 at 11:33 AM sun <1392427...@qq.com> wrote:

> 请问怎么取消关注


????????

2020-01-12 文章 sun


Re: 疑似ParquetTableSource Filter Pushdown bug

2020-01-12 文章 jun su
已经创建issue:  https://issues.apache.org/jira/browse/FLINK-15563

Kurt Young  于2020年1月8日周三 下午5:33写道:

> 如果是优化器一直卡住不能退出,那基本肯定是BUG了。请开一个issue把这些信息上传上去吧,我们会调查一下是什么问题导致的。
>
> Best,
> Kurt
>
>
> On Wed, Jan 8, 2020 at 5:12 PM jun su  wrote:
>
> > 添加代码文字:
> >
> > def main(args: Array[String]): Unit = {
> >
> > val env = StreamExecutionEnvironment.getExecutionEnvironment
> > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> > val tableEnv = StreamTableEnvironment.create(env)
> >
> > val schema =
> "{\"type\":\"record\",\"name\":\"root\",\"fields\":[{\"name\":\"log_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"log_from\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ip\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"data_source\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"is_scan\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"result\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timelong\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"is_sec\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"time_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"device\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timestamp_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"occur_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"row_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"
> > val parquetTableSource: ParquetTableSource = ParquetTableSource
> > .builder
> > .forParquetSchema(new
> org.apache.parquet.avro.AvroSchemaConverter().convert(
> > org.apache.avro.Schema.parse(schema, true)))
> > .path("/Users/sujun/Documents/tmp/login_data")
> > .build
> >
> > tableEnv.registerTableSource("source",parquetTableSource)
> >
> >
> > val t1 = tableEnv.sqlQuery("select log_id,city from source where
> city = '274' ")
> > tableEnv.registerTable("t1",t1)
> >
> > val t4 = tableEnv.sqlQuery("select * from t1 where
> log_id='5927070661978133'")
> > t1.toAppendStream[Row].print()
> >
> > env.execute()
> >
> > }
> >
> >
> > jun su  于2020年1月8日周三 下午4:59写道:
> >
> >> 你好:
> >>我在使用ParquetTableSource时, 发现一些问题, 疑似是ParquetTableSource Filter
> >> Pushdown的Bug, 以下是代码和描述:
> >>
> >> [image: 1578473593933.jpg]
> >>
> >> debug发现,
> >> 代码卡在了: org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp方法,
> while
> >> true循环一直出不来, 知道整合程序OOM
> >>
> >> [image: 1.jpg]
> >>
> >> 将ParquetTableSource的filter pushdown代码去掉后 , 主程序可以执行.
> >> 怀疑是calcite的优化器在迭代找代价最小的plan时一直无法退出导致的
> >>
> >
>


?????? Re: flink????kafka????????????kafka??????????????????

2020-01-12 文章 Evan
??kafka??Offset??
kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zkhost:2181 
--group ${group.id} --topic ${topic_name}
zkhost ??group.id??topic_name

Group           Topic        
                  Pid Offset  
        logSize         Lag  
           Owner
test            dy_event      
                 0  
 8115733         10658588      
  2542855         none
test            dy_event      
                 1  
 8114221         10658585      
  2544364         none
test            dy_event      
                 2  
 8115173         10658587      
  2543414         none
test            dy_event      
                 3  
 8115127         10658585      
  2543458         none
test            dy_event      
                 4  
 8115160         10658587      
  2543427         none



pid Offset??




--  --
??: "Benchao Li"https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration

wqpapa https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
> >
> >wqpapa 

Re: flink是否可以通过代码设置hadoop的配置文件目录

2020-01-12 文章 LakeShen
提交任务的时候,系统变量指定 HADOOP_CONF_DIR 即可

祝好,
沈磊

tison  于2020年1月9日周四 下午3:57写道:

> 建议通过 HADOOP_HOME 或 HADOOP_CONF_DIR 环境配置,Flink 有一个 fallback 的加载优先级
>
> 1. HADOOP_HOME
> 2. Configuration 也就是 fs.hdfs.hadoopconf
> 3. HADOOP_CONF_DIR
>
> 其中 Configuration 的方式是已废弃的
>
> Best,
> tison.
>
>
> LJY  于2020年1月9日周四 下午3:52写道:
>
> > 各位好:
> >
> > 目前hadoop的配置文件是在 fs.hdfs.hadoopconf 设置。
> >
> > 用户是否能够不启用配置文件中的fs.hdfs.hadoopconf,通过代码手动设置hadoop的目录。
>


Re: 流处理任务失败该如何追回之前的数据

2020-01-12 文章 LakeShen
1. Checkpoint 的状态,默认停止后会自行删除,所以如果你重启, Checkpoint 的状态如果没有特殊指定保留,应该就不见了,你无法从
Checkpoint 的状态恢复。
2. 使用Savepoint 可以,启动的时候,记得指定状态恢复的目录
3. 你使用的是事件时间,时间的具体指来源于你的记录中的某个时间字段,所以即使你重启,不会影响窗口类操作。

Px New <15701181132mr@gmail.com> 于2020年1月9日周四 下午6:50写道:

> rollback 后
> taskManager 会去获取持久化存储的snapshot , Source 也会回放到 做CheckPoint 时的那个点上
> 不论你使用的是是什么时间吧 -
>
>
> Dian Fu  于2019年11月14日周四 下午1:14写道:
>
> > 如果使用的event
> > time,watermark是根据event计算出来的,和系统时间没有关系,所以从最后一次checkpoint恢复即可。为什么你会觉得有问题?
> >
> > > 在 2019年11月13日,下午8:29,柯桂强  写道:
> > >
> > >
> >
> 我现在有一个流处理任务失败了,并且保留了checkpoint或者savepoint,我希望从最后一次checkpoint恢复,但是任务使用的是事件时间,超过窗口的数据就会被丢弃,我想到一个方法是,重启之前的数据通过批处理完成然后跑流处理,想问问大家这个方案是否可行,但是感觉如何限定批处理的范围并且和之后的流处理完美拼接是一个比较难的问题
> >
> >
>


Re: checkpoint、state

2020-01-12 文章 LakeShen
状态先存到本地 比直接存到 HDFS 速度更快,对于 RocksDBStateBackend,状态会先存到本地,然后异步存储到 HDFS。

Px New <15701181132mr@gmail.com> 于2020年1月10日周五 上午10:27写道:

> Yes, CheckPoint 是一个封装后的Meta信息 而这个被封装的Meta信息是有所有Operator 的state 的组成了
>
> hahaha sc  于2019年11月29日周五 下午4:12写道:
>
> >
> >
> flink的每条数据既然都做了checkpoint,做成全局分布式一致性快照,那还需要本地state干啥呢?是否可以理解成,本地state是一致性快照的一部分而已?
> >   昨天看了 社区的直播回放,听PMC的介绍,好像不是一回事。
> >
>


Re: 关于flink集群中调用dubbo服务的咨询

2020-01-12 文章 LakeShen
Flink 任务可以调 Dubbo ,dubbo 相关初始化可以在 你的 main 方法中初始化,然后具体接口的初始化,
在你的函数中的 open 方法中进行初始化,你自己需要处理好 Dubbo 相关异常,比如超时异常。 Dubbo 的话,
会对任务处理的吞吐量有一定影响,毕竟从请求 Dubbo 到 返回数据,会消耗一定时间。

祝好,
沈磊

Leonard Xu  于2020年1月10日周五 上午11:50写道:

> Hi, 依辰
>
> 对dubbo不太熟,你邮件里的图片挂了,可以通过图床工具发送下图片链接。
>
> Best,
> Leonard
>
> > 在 2020年1月10日,11:12,依辰 <431737...@qq.com> 写道:
> >
> > Hi All,
> > 目前我这里有个需求是想通过flink集群,消费mq消息,然后调用发送push的dubbo服务,实现push分发的功能。
> > 网上关于flink接入spring框架的资料没有找到太多,更别提dubbo服务调用相关的了(也可能是我查询资料的方式有限)。
> >
>  
> 期望哪位有经验的朋友能够给予指点,我自己其实已经实现了一个测试demo,但还是期望能有更多相关资料作为参考,主要是担心资源消耗和性能、安全方面会发生问题,因为本身也是刚接触flink,了解有限。
> > 方便的话可以提供一些实例代码或资料链接。
> >
>  感谢flink社区的各位朋友
> >
> > ps:下图是当前的实现方式,总觉得太简陋了,还缺少close时对spring资源的释放操作
> >
> >
>
>


Re: Re: flink消费kafka数据,如何从kafka端获取消费偏移信息

2020-01-12 文章 Benchao Li
Flink也会commit offset到Kafka的[1],当然在开checkpoint和不开checkpoint的时候表现有些不同。

offset
只要是commit到了kafka,查看offset的方式我理解跟用其他方式commit的offset的查阅方式应该没有区别的。如果你看不到flink消费的topic对应的offset,可能需要check下是什么原因没有commit
offset.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration

wqpapa  于2020年1月12日周日 下午9:34写道:

> 感谢回复!主要想了解下如何从kafka端后台命令方式获取对应消费组的偏移信息。之前通过普通的java代码消费kafka数据,通过kafka-consumer-groups.sh
> --describe可获取到消费偏移信息,但通过flink消费,不知道要怎么在kafka端获取偏移信息?
>
>
>
>
>
>
>
>
>
> 在 2020-01-12 21:17:40,"Benchao Li"  写道:
> >setStartFromGroupOffsets
>
> >说的是从kafka里面保存的offset开始消费,flink并没有保证kafka一定会有这个consumer的offset,如果没有的话,会按照kafka
> >consumer的配置'auto.offset.reset'来设置这些partition的offset。
> >
> >具体情况可以参考文档[1].
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
> >
> >wqpapa  于2020年1月12日周日 下午9:09写道:
> >
> >> flink通过FlinkKafkaConsumer消费kafka主题,设置group.id
> >> 按setStartFromGroupOffsets取偏移,如何在kafka端获取对应group.id的偏移信息?
> >> kafka为0.9版本,在zk那边找不到对应的group.id;通过kafka-consumer-groups.sh --describe
> >> --group ,也取不到。请问下要怎么获取啊?
> >
> >
> >
> >--
> >
> >Benchao Li
> >School of Electronics Engineering and Computer Science, Peking University
> >Tel:+86-15650713730
> >Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re:Re: flink消费kafka数据,如何从kafka端获取消费偏移信息

2020-01-12 文章 wqpapa
感谢回复!主要想了解下如何从kafka端后台命令方式获取对应消费组的偏移信息。之前通过普通的java代码消费kafka数据,通过kafka-consumer-groups.sh
 --describe可获取到消费偏移信息,但通过flink消费,不知道要怎么在kafka端获取偏移信息?









在 2020-01-12 21:17:40,"Benchao Li"  写道:
>setStartFromGroupOffsets
>说的是从kafka里面保存的offset开始消费,flink并没有保证kafka一定会有这个consumer的offset,如果没有的话,会按照kafka
>consumer的配置'auto.offset.reset'来设置这些partition的offset。
>
>具体情况可以参考文档[1].
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>
>wqpapa  于2020年1月12日周日 下午9:09写道:
>
>> flink通过FlinkKafkaConsumer消费kafka主题,设置group.id
>> 按setStartFromGroupOffsets取偏移,如何在kafka端获取对应group.id的偏移信息?
>> kafka为0.9版本,在zk那边找不到对应的group.id;通过kafka-consumer-groups.sh --describe
>> --group ,也取不到。请问下要怎么获取啊?
>
>
>
>-- 
>
>Benchao Li
>School of Electronics Engineering and Computer Science, Peking University
>Tel:+86-15650713730
>Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: flink消费kafka数据,如何从kafka端获取消费偏移信息

2020-01-12 文章 Benchao Li
setStartFromGroupOffsets
说的是从kafka里面保存的offset开始消费,flink并没有保证kafka一定会有这个consumer的offset,如果没有的话,会按照kafka
consumer的配置'auto.offset.reset'来设置这些partition的offset。

具体情况可以参考文档[1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-start-position-configuration

wqpapa  于2020年1月12日周日 下午9:09写道:

> flink通过FlinkKafkaConsumer消费kafka主题,设置group.id
> 按setStartFromGroupOffsets取偏移,如何在kafka端获取对应group.id的偏移信息?
> kafka为0.9版本,在zk那边找不到对应的group.id;通过kafka-consumer-groups.sh --describe
> --group ,也取不到。请问下要怎么获取啊?



-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


flink消费kafka数据,如何从kafka端获取消费偏移信息

2020-01-12 文章 wqpapa
flink通过FlinkKafkaConsumer消费kafka主题,设置group.id按setStartFromGroupOffsets取偏移,如何在kafka端获取对应group.id的偏移信息?
kafka为0.9版本,在zk那边找不到对应的group.id;通过kafka-consumer-groups.sh --describe  --group 
,也取不到。请问下要怎么获取啊?