FlinkKafkaConsumer????

2020-09-03 Thread op
    hi,    FlinkKafkaConsumer    //--- val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment Env.setRestartStrategy(RestartStrategies.noRestart()) val consumerProps = new Properties() consumerProps.put("bootstrap.ser

????: ????????????????????????????????????????????

2020-09-03 Thread samuel....@ubtrobot.com
?? ?? ??1.11flink sql??,??streaming api kafka,eventtime,stream??table,sql,??kafka topic,flink webui watermarks No Watermark,,kafka topic??,wat

??????checkpoint??????state

2020-09-03 Thread sun
??2?? 1checkpointchk-  ??chk-1??chk-2 , 2checkpoint??state?? ?? final StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.cr

??????checkpoint??????state

2020-09-03 Thread sun
??2?? 1checkpointchk-  ??chk-1??chk-2 , 2checkpoint??state?? ?? final StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.cr

??????checkpoint??????state

2020-09-03 Thread sun
??2?? 1checkpointchk-  ??chk-1??chk-2 , 2checkpoint??state?? ?? final StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.cr

Re:无法从checkpoint中恢复state

2020-09-03 Thread 程龙
再启动服务的时候 需要指定checkpoint回复地址,你这里只是指定了做checkpint地址 在 2020-09-03 16:03:41,"sun" <1392427...@qq.com> 写道: >你好,我有2个问题 > >1:每次重启服务,checkpoint的目录中chk-  总是从chk-1开始,chk-2 ,没有从上次的编号开始 > >2:重启服务后,没有从checkpoint中恢复state的数据 > >下面是我的配置,我是在本地调试的,单机 > > > >final StreamExecutionEnvironment s

pyflink1.11.1连接hive问题

2020-09-03 Thread 程龙
使用hivecatalog连接 抱一下错误 flink py4j.protocol.Py4JJavaError: An error occurred while calling o10.registerCatalog. : java.lang.NullPointerException

????????????checkpoint??????state

2020-09-03 Thread sun
--  -- ??: "user-zh" <13162790...@163.com

??????checkpoint??????state

2020-09-03 Thread sun
??2?? 1checkpointchk-  ??chk-1??chk-2 , 2checkpoint??state?? ?? final StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.cr

Re: pyflink1.11.1连接hive问题

2020-09-03 Thread Dian Fu
有更完整的log吗? > 在 2020年9月3日,下午4:12,程龙 <13162790...@163.com> 写道: > > 使用hivecatalog连接 抱一下错误 > > > > > flink py4j.protocol.Py4JJavaError: An error occurred while calling > o10.registerCatalog. : java.lang.NullPointerException

Re: Flink on k8s

2020-09-03 Thread Yang Wang
需要你发一下TaskManager的log,这样才能方便排查问题 你可以使用kubectl logs来直接查看log 另外你是用的官方文档[1]里面的yaml来启动的吗,E2E测试(minikube版本是v1.8.2)都是正常运行的,我在真实的K8s集群测试也没问题 [1]. https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html#deploy-flink-cluster-on-kubernetes Best, Yang superainbower 于2020年9

Re:Re: pyflink1.11.1连接hive问题

2020-09-03 Thread 程龙
完整日志乳如下: Traceback (most recent call last): File "/Users/bjhl/PycharmProjects/flink-example/com.baijiahulian/connecthive/HiveTest.py", line 26, in t_env.register_catalog("default", hive_catalog) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/py

Re: Flink 1.10.1 cdh-hdfs启用HA时,flink job提交报错

2020-09-03 Thread Storm☀️
问题找到了; hdfs-site.xml配置文件冲突导致的。 原因:通过-yt上传了 外部集群的hdfs-site.xml文件。 flink10初始化taskmanager读取 hdfs-site.xml配置的时候被外部的hdfs-site.xml文件干扰。 此问题终结。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Flink SQL 任务乱码问题

2020-09-03 Thread LakeShen
Hi 社区, 我的一个 Flink SQL 任务,版本为 1.10,代码中使用了中文值,具体如下: select xxx, case when a = 'a' then '你好' when a = 'b' then '你好呀' end as va from xxx ; 然后会把这个结果输出,最后发现 va 的值乱码了,也就是中文乱码。 目前有什么比较好的解决方法吗。 Best, LakeShen

Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

2020-09-03 Thread Yun Tang
Hi 我觉得这个不是root cause,实际上 transient ListState 是一种正确的用法,因为state应该是在函数open方法里面进行初始化,所以transient 修饰即可。 麻烦把这个list state的初始化以及使用方法的代码都贴出来吧。 祝好 唐云 From: Liu Rising Sent: Thursday, September 3, 2020 12:26 To: user-zh@flink.apache.org Subject: Re: 从Savepoint/Checkpoint

Re: Flink如何实现至多一次(At Most Once)

2020-09-03 Thread Paul Lam
如果每次都从最新的数据开始读的话,关掉 checkpoint 是可以达到 At Most Once。 另外建议还要看看 sink 有没有自动重试机制,可能造成数据重复。 Best, Paul Lam > 2020年9月2日 19:16,Tianwang Li 写道: > > 我们有一些场景,对实时性要求高,同时对数据重复会有比较大大影响。 > 我想关闭checkpoint,这样是不是能不能保证“至多一次” (At Most Once) ? > 这里会不会有什么坑? > 另外:我们允许丢失数据。 > > > -- > ***

Re: 无法从checkpoint中恢复state

2020-09-03 Thread Congxian Qiu
Hi 从 retain checkpoint 恢复可以参考文档[1] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/checkpoints.html#%E4%BB%8E%E4%BF%9D%E7%95%99%E7%9A%84-checkpoint-%E4%B8%AD%E6%81%A2%E5%A4%8D%E7%8A%B6%E6%80%81 Best, Congxian sun <1392427...@qq.com> 于2020年9月3日周四 下午4:14写道: > 你好,

Re: FlinkKafkaConsumer问题

2020-09-03 Thread Shuiqiang Chen
Hi op, 在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有 partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到 Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id comm

Flink动态CEP该怎么做?

2020-09-03 Thread Jim Chen
Hi, 我们打算用flink来做规则匹配,现在打算用CEP来做。但是发现flink 不支持动态CEP,网上百度了下,类似于滴滴那种方式,改动太大。没有能力能做,所以,问下大家,有没有什么思路,简单点的

flink sql多层view嵌套,字段not found

2020-09-03 Thread Lin Hou
Hi, 请教一个通过sql-client提交flink sql遇到的关于嵌套view,当嵌套第二层时,查询时会报找不到字段的问题。 元数据已经建好,简述如下: 1.建嵌套的view: create temporary view temp_app_impression_5min as select argService as arg_service, timeLocal as time_local, mid as mid, vipruid as vipruid, activity as activity, LOWER(activityProperty)

Re: pyflink1.11.1连接hive问题

2020-09-03 Thread Dian Fu
你看看log文件里,有没有其他log,log文件位置:/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyflink/log/ 从你贴的异常来看,感觉像是连HiveMetastore出问题了,没有连上,可以看看log文件里,能不能看到具体原因。 > 在 2020年9月3日,下午4:37,程龙 <13162790...@163.com> 写道: > > 完整日志乳如下: > > > > Traceback (most recent call last): > > File

pyflink-udf 问题反馈

2020-09-03 Thread whh_960101
您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid input_type:input_type should be DataType but contain RowField(RECID, VARCHAR) 我的pyflink版本:1.11.1

Re: pyflink-udf 问题反馈

2020-09-03 Thread Xingbo Huang
Hi, input_types定义的是每一个列的具体类型。 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你 正确的写法是 input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()] 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的) input_types=DataTypes.Row([DataTypes.FIELD("a", DataTypes.STRING()), DataTypes.FIELD("b", DataTypes.STRING(

回复:pyflink-udf 问题反馈

2020-09-03 Thread whh_960101
我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法 input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()] 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗 或者正确写法是什么样的,感谢解答! | | whh_960101 | | 邮箱:whh_960...@163.com | 签名由 网易邮箱大师 定制 在2020年09月03日 21:14,Xingbo Huang

Re: pyflink-udf 问题反馈

2020-09-03 Thread Xingbo Huang
Hi, 我觉得你从头详细描述一下你的表结构。 比如,我的表包含三个字段a,b,c,他们的类型都是varchar的, 然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。 使用就是table.select("udf(b)")。详细的你可以去查阅文档[1] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-fun

Re:Re: pyflink-udf 问题反馈

2020-09-03 Thread whh_960101
您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值 udf定义如下: @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=Da

?????? FlinkKafkaConsumer????

2020-09-03 Thread op
FlinkKafkaConsumerKafkaConsumer??flinkkafka  --  -- ??: "user-zh"

Re: FlinkKafkaConsumer问题

2020-09-03 Thread lec ssmi
是flink对kafka的消费,是自己管理offset,用low-level api去寻址,而不是用group.id来管理offset这种high-level api。是这个意思吧。 op <520075...@qq.com> 于2020年9月4日周五 上午10:25写道: > > 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?  > > > -- 原始邮件 -- > 发件人: >

Re: FlinkKafkaConsumer问题

2020-09-03 Thread Shuiqiang Chen
Hi, 为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source 算子维护当前算子所消费的 partition 消费 offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint 恢复中从上次 commit 的位点开始消费,保证 exactly-once. 如果用 Kafka 消费组管理,那么 FlinkKafkaConsumer 内各个并发实例所分配的 partition 将由 Kafka 的消费组管理,且 offset 也由 Kafka 消费组管理者记录,Flink 无法维护这些信息。

Re: Re: pyflink-udf 问题反馈

2020-09-03 Thread Xingbo Huang
Hi, 你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事 Best, Xingbo whh_960101 于2020年9月4日周五 上午9:26写道: > > 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataType

flink读取kafka遇到kafka.consumer:type=app-info,id=1

2020-09-03 Thread 宁吉浩
使用flink1.9.3 , kafka 1.0.0 flink使用正则读取kafka的多个topic , 每个topic均为3个分区,flink这边并行度是3(1也尝试过),均出现了如下问题 有没有人遇到过这个问题呢?如何解决的 报错如下所示: javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=1 at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) at com.sun.jm

Re: Flink SQL 任务乱码问题

2020-09-03 Thread Danny Chan
SQL 文本是什么编码 ?有尝试过 UTF8 编码 ? Best, Danny Chan 在 2020年9月3日 +0800 PM5:04,LakeShen ,写道: > Hi 社区, > > 我的一个 Flink SQL 任务,版本为 1.10,代码中使用了中文值,具体如下: > > select xxx, case when a = 'a' then '你好' when a = 'b' then '你好呀' end as va > from xxx ; > > 然后会把这个结果输出,最后发现 va 的值乱码了,也就是中文乱码。 > > 目前有什么比较好的解决方法吗。 > > Best,

Re: flink sql多层view嵌套,字段not found

2020-09-03 Thread Danny Chan
这是一个已知问题,社区版本已经修复了 [1],不过还有一个后续 PR https://github.com/apache/flink/pull/13293,待 merge [1] https://issues.apache.org/jira/browse/FLINK-18750 Best, Danny Chan 在 2020年9月3日 +0800 PM6:41,Lin Hou ,写道: > Hi, > > 请教一个通过sql-client提交flink sql遇到的关于嵌套view,当嵌套第二层时,查询时会报找不到字段的问题。 > 元数据已经建好,简述如下: > > 1.建嵌套的vie

flink读取kafka遇到kafka.consumer:type=app-info,id=1

2020-09-03 Thread 宁吉浩
把正则读取kafka-topic的代码进行了修改,然后问题消失了,贴一下读取的代码给大家看看 code: FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>( Pattern.compile(readTopic), // 正则读取 new SimpleStringSchema(), // 序列化 properties); kafkaConsumer.setStartFromLatest(); 实在想不清楚这里会有什么问题? --以下为原始问

Re: Flink如何实现至多一次(At Most Once)

2020-09-03 Thread Yun Tang
Hi 如果是完全依赖source的offset管理,可以达到类似 at most once 的语义。 社区其实也有更完备的checkpoint at most once 的实现讨论,已经抄送了相关的开发人员 @yuanmei.w...@gmail.com 祝好 唐云 From: Paul Lam Sent: Thursday, September 3, 2020 17:28 To: user-zh Subject: Re: Flink如何实现至多一次(A

Re: flink on k8s 如果jobmanager 所在pod重启后job失败如何处理

2020-09-03 Thread Yun Tang
Hi Please use English to ask questions in user mailing list. I have added this thread to user-zh mailing list, if you would like to reply this thread again, please remove user mailing list in senders. When talking about the question how to handle job manager failure in k8s, you could consider

Re: FlinkKafkaConsumer问题

2020-09-03 Thread taochanglian
为了保证exactly-once,flink自己通过barrier来实现checkpoint,包括barrier的传递等等,所以flink在kafkaconsumer的基础之上,封装了一层语义保障。 在 2020/9/4 10:34, Shuiqiang Chen 写道: Hi, 为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source 算子维护当前算子所消费的 partition 消费 offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint 恢复中从上次 commit 的位点开始消

Re: 回复:请指教一个关于时间窗的问题,非常感谢!

2020-09-03 Thread Benchao Li
如果你有多个partition,并且有一些partition没有数据,这个时候的确是会存在这个问题。 要处理这种情况,可以了解下idle source[1] [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#dealing-with-idle-sources samuel@ubtrobot.com 于2020年9月3日周四 下午3:41写道: > 补充一下环境信息: > > 有点类似以下问题: > 在1.11版本测试flink sql时

flink on k8s 如果jobmanager 所在pod重启后job失败如何处理

2020-09-03 Thread dty...@163.com
hi: 请教一个问题。在使用k8s 部署的flink 集群,如果jobmanger 重启后,1)job所在的jar包会清除,jobmanager 找不到这个job的jar 包,2)正在运行的job也会取消,重启后的jobmanager 如何找到之前运行的job dty...@163.com

?????? FlinkKafkaConsumer????

2020-09-03 Thread op
--  -- ??: "user-zh"

FlinkSQL如何处理上游的表结构变更

2020-09-03 Thread xiao cai
Hi all: flink version : 1.11.0 场景:上游的数据来自binlog,当发生表结构变更时,希望能够实时的变动flink内部表的schema,但是目前来看,表的schema都是create table时写死的,有什么办法可以处理这种场景呢

Re: Re:Re: 回复:flink1.9.1采用Prometheus Pushgateway监控,当任务被kill掉,但grafana还是可以监控到数据

2020-09-03 Thread bradyMk
好的,谢谢您,我看一下~ - Best Wishes -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re: Re: pyflink-udf 问题反馈

2020-09-03 Thread whh_960101
您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗 我的udf输出了一个numpy.array(dtype = str), result_type设的是DataTypes.ARRAY(DataTypes.STRING()) 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING())) 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容 请问这个问题该怎么解决?

?????? ????savepoint

2020-09-03 Thread x
--  -- ??: "x"