Re: FlinkKafkaConsumer问题

2020-09-22 文章 赵一旦
这个问题和flink关系不大。Kafka本身就是这么个特点,指定group,如果是订阅方式,会是你想象的那样,分享消息。但,如果是通过assign方式指定了消费哪个分区,则不受到group中消费者共享消息的限制。

SmileSmile  于2020年9月5日周六 下午4:51写道:

> hi,这种现象是在开checkpoint才出现的吗,还是没有开启也会?
>
> Best
>
>
>
>
> | |
> a511955993
> |
> |
> 邮箱:a511955...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年09月04日 14:11,op 写道:
> 大概懂了 感谢
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> taochangl...@163.com;
> 发送时间:2020年9月4日(星期五) 中午11:54
> 收件人:"user-zh" acqua@gmail.com;
>
> 主题:Re: FlinkKafkaConsumer问题
>
>
>
>
> 为了保证exactly-once,flink自己通过barrier来实现checkpoint,包括barrier的传递等等,所以flink在kafkaconsumer的基础之上,封装了一层语义保障。
>
> 在 2020/9/4 10:34, Shuiqiang Chen 写道:
>
>  Hi,
>  为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source 算子维护当前算子所消费的
> partition 消费 offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint
> 恢复中从上次 commit 的位点开始消费,保证 exactly-once. 如果用 Kafka 消费组管理,那么
> FlinkKafkaConsumer 内各个并发实例所分配的 partition 将由 Kafka 的消费组管理,且 offset 也由 Kafka
> 消费组管理者记录,Flink 无法维护这些信息。
> 
>  在 2020年9月4日,上午10:25,op <520075...@qq.com 写道:
> 
> 
> 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?nbsp;
> 
> 
>  --nbsp;原始邮件nbsp;--
> 
> 发件人:
> "user-zh"
>   发送时间:nbsp;2020年9月3日(星期四) 晚上6:09
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: FlinkKafkaConsumer问题
> 
> 
> 
>  Hi op,
> 
>  在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic
> 的所有 partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit
> 到 Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka
> 服务端的一个角色。
> 
>  另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id
> commit offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset
> 开始消费。
> 
>  gt; 在 2020年9月3日,下午3:03,op <520075...@qq.comgt; 写道:
>  gt;
>  gt; amp;nbsp; amp;nbsp; hi,amp;nbsp;
> amp;nbsp; 我对FlinkKafkaConsumer的实现有点迷惑,amp;nbsp; amp;nbsp;
> 这有两个相同代码的程序:
>  gt; //---
>  gt; val bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment
>  gt; Env.setRestartStrategy(RestartStrategies.noRestart())
>  gt; val consumerProps = new Properties()
>  gt; consumerProps.put("bootstrap.servers", brokers)
>  gt; consumerProps.put("group.id", "test1234")
>  gt;
>  gt; val consumer = new FlinkKafkaConsumer[String](topic,new
> KafkaStringSchema,consumerProps).setStartFromLatest()
>  gt; Env.addSource(consumer).print()
>  gt;
> Env.execute()//---我同时启动这两个程序,他们连接相同的集群的topic,
> group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer
> group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11
> 谢谢


Re: FlinkKafkaConsumer问题

2020-09-03 文章 taochanglian

为了保证exactly-once,flink自己通过barrier来实现checkpoint,包括barrier的传递等等,所以flink在kafkaconsumer的基础之上,封装了一层语义保障。

在 2020/9/4 10:34, Shuiqiang Chen 写道:


Hi,
为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source  算子维护当前算子所消费的 partition 消费 
offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint 恢复中从上次 commit 
的位点开始消费,保证 exactly-once.  如果用 Kafka 消费组管理,那么 FlinkKafkaConsumer 内各个并发实例所分配的 
partition 将由 Kafka 的消费组管理,且 offset 也由 Kafka 消费组管理者记录,Flink 无法维护这些信息。


在 2020年9月4日,上午10:25,op <520075...@qq.com> 写道:

谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?


--原始邮件--
发件人: 
   "user-zh"   
 



Re: FlinkKafkaConsumer问题

2020-09-03 文章 Shuiqiang Chen
Hi,
为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source  算子维护当前算子所消费的 partition 消费 
offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint 恢复中从上次 commit 
的位点开始消费,保证 exactly-once.  如果用 Kafka 消费组管理,那么 FlinkKafkaConsumer 内各个并发实例所分配的 
partition 将由 Kafka 的消费组管理,且 offset 也由 Kafka 消费组管理者记录,Flink 无法维护这些信息。

> 在 2020年9月4日,上午10:25,op <520075...@qq.com> 写道:
> 
> 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?
> 
> 
> --原始邮件--
> 发件人:  
>   "user-zh"   
>  
>  发送时间:2020年9月3日(星期四) 晚上6:09
> 收件人:"user-zh" 
> 主题:Re: FlinkKafkaConsumer问题
> 
> 
> 
> Hi op,
> 
> 在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有 
> partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到 
> Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。
> 
> 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit 
> offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。
> 
>  在 2020年9月3日,下午3:03,op <520075...@qq.com 写道:
>  
>  nbsp; nbsp; hi,nbsp; nbsp; 
> 我对FlinkKafkaConsumer的实现有点迷惑,nbsp; nbsp; 这有两个相同代码的程序:
>  //---
>  val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>  Env.setRestartStrategy(RestartStrategies.noRestart())
>  val consumerProps = new Properties()
>  consumerProps.put("bootstrap.servers", brokers)
>  consumerProps.put("group.id", "test1234")
>  
>  val consumer = new FlinkKafkaConsumer[String](topic,new 
> KafkaStringSchema,consumerProps).setStartFromLatest()
>  Env.addSource(consumer).print()
>  
> Env.execute()//---我同时启动这两个程序,他们连接相同的集群的topic,group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka
>  的consumer 
> group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11
>  谢谢



Re: FlinkKafkaConsumer问题

2020-09-03 文章 lec ssmi
是flink对kafka的消费,是自己管理offset,用low-level api去寻址,而不是用group.id来管理offset这种high-level
api。是这个意思吧。

op <520075...@qq.com> 于2020年9月4日周五 上午10:25写道:

>
> 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> acqua@gmail.com;
> 发送时间:2020年9月3日(星期四) 晚上6:09
> 收件人:"user-zh"
> 主题:Re: FlinkKafkaConsumer问题
>
>
>
> Hi op,
>
> 在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有
> partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到
> Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。
>
> 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit
> offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。
>
>  在 2020年9月3日,下午3:03,op <520075...@qq.com 写道:
> 
>  nbsp; nbsp; hi,nbsp; nbsp;
> 我对FlinkKafkaConsumer的实现有点迷惑,nbsp; nbsp; 这有两个相同代码的程序:
>  //---
>  val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>  Env.setRestartStrategy(RestartStrategies.noRestart())
>  val consumerProps = new Properties()
>  consumerProps.put("bootstrap.servers", brokers)
>  consumerProps.put("group.id", "test1234")
> 
>  val consumer = new FlinkKafkaConsumer[String](topic,new
> KafkaStringSchema,consumerProps).setStartFromLatest()
>  Env.addSource(consumer).print()
> 
> Env.execute()//---我同时启动这两个程序,他们连接相同的集群的topic,
> group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer
> group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11
> 谢谢


Re: FlinkKafkaConsumer问题

2020-09-03 文章 Shuiqiang Chen
Hi op,

在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有 partition 
信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到 
Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。

另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit offset 
到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。

> 在 2020年9月3日,下午3:03,op <520075...@qq.com> 写道:
> 
>   hi,  我对FlinkKafkaConsumer的实现有点迷惑,  
> 这有两个相同代码的程序:
> //---
> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
> Env.setRestartStrategy(RestartStrategies.noRestart())
> val consumerProps = new Properties()
> consumerProps.put("bootstrap.servers", brokers)
> consumerProps.put("group.id", "test1234")
> 
> val consumer = new FlinkKafkaConsumer[String](topic,new 
> KafkaStringSchema,consumerProps).setStartFromLatest()
> Env.addSource(consumer).print()
> Env.execute()//---我同时启动这两个程序,他们连接相同的集群的topic,group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka
>  的consumer 
> group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11
>  谢谢