Forgot to hit "reply all" in my last email.

On Fri, Nov 17, 2017 at 8:26 PM, Gary Yao <g...@data-artisans.com> wrote:
> Hi Robert,
>
> To get your desired behavior, you should start a single job with
> parallelism set to 4.
>
> Flink does not rely on Kafka's consumer groups to distribute the
> partitions to the parallel subtasks.
> Instead, Flink does the assignment of partitions itself and also tracks
> and checkpoints the offsets internally.
> This is needed to achieve exactly-once semantics.
>
> The group.id that you are setting is used for different purposes, e.g.,
> to track the consumer lag of a job.
>
> Best,
>
> Gary
>
> On Fri, Nov 17, 2017 at 7:54 PM, r. r. <rob...@abv.bg> wrote:
>
>> Hi
>>
>> It's Flink 1.3.2, Kafka 0.10.2.0.
>>
>> I am starting 1 JM and 4 TMs (with 1 task slot each). Then I deploy the
>> job 4 times (via ./flink run -p1 x.jar); job parallelism is set to 1.
>>
>> A new thing I just noticed: if I start two kafka-console-consumers in
>> parallel to the Flink jobs (with --consumer-property
>> group.id=TopicConsumers) and write a msg to Kafka, then one of the
>> console consumers receives the msg together with both Flink jobs.
>>
>> I thought maybe the Flink consumers didn't receive the group property
>> passed via "flink run .. --group.id TopicConsumers", but no - they do
>> belong to the group as well:
>>
>> taskmanager_3 | 2017-11-17 18:29:00,750 INFO  org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values:
>> taskmanager_3 |     auto.commit.interval.ms = 5000
>> taskmanager_3 |     auto.offset.reset = latest
>> taskmanager_3 |     bootstrap.servers = [kafka:9092]
>> taskmanager_3 |     check.crcs = true
>> taskmanager_3 |     client.id =
>> taskmanager_3 |     connections.max.idle.ms = 540000
>> taskmanager_3 |     enable.auto.commit = true
>> taskmanager_3 |     exclude.internal.topics = true
>> taskmanager_3 |     fetch.max.bytes = 52428800
>> taskmanager_3 |     fetch.max.wait.ms = 500
>> taskmanager_3 |     fetch.min.bytes = 1
>> taskmanager_3 |     group.id = TopicConsumers
>> taskmanager_3 |     heartbeat.interval.ms = 3000
>> taskmanager_3 |     interceptor.classes = null
>> taskmanager_3 |     key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>> taskmanager_3 |     max.partition.fetch.bytes = 1048576
>> taskmanager_3 |     max.poll.interval.ms = 300000
>> taskmanager_3 |     max.poll.records = 500
>> taskmanager_3 |     metadata.max.age.ms = 300000
>> taskmanager_3 |     metric.reporters = []
>> taskmanager_3 |     metrics.num.samples = 2
>> taskmanager_3 |     metrics.recording.level = INFO
>> taskmanager_3 |     metrics.sample.window.ms = 30000
>> taskmanager_3 |     partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>> taskmanager_3 |     receive.buffer.bytes = 65536
>> taskmanager_3 |     reconnect.backoff.ms = 50
>> taskmanager_3 |     request.timeout.ms = 305000
>> taskmanager_3 |     retry.backoff.ms = 100
>> taskmanager_3 |     sasl.jaas.config = null
>> taskmanager_3 |     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> taskmanager_3 |     sasl.kerberos.min.time.before.relogin = 60000
>> taskmanager_3 |     sasl.kerberos.service.name = null
>> taskmanager_3 |     sasl.kerberos.ticket.renew.jitter = 0.05
>> taskmanager_3 |     sasl.kerberos.ticket.renew.window.factor = 0.8
>> taskmanager_3 |     sasl.mechanism = GSSAPI
>> taskmanager_3 |     security.protocol = PLAINTEXT
>> taskmanager_3 |     send.buffer.bytes = 131072
>> taskmanager_3 |     session.timeout.ms = 10000
>> taskmanager_3 |     ssl.cipher.suites = null
>> taskmanager_3 |     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> taskmanager_3 |     ssl.endpoint.identification.algorithm = null
>> taskmanager_3 |     ssl.key.password = null
>> taskmanager_3 |     ssl.keymanager.algorithm = SunX509
>> taskmanager_3 |     ssl.keystore.location = null
>> taskmanager_3 |     ssl.keystore.password = null
>> taskmanager_3 |     ssl.keystore.type = JKS
>> taskmanager_3 |     ssl.protocol = TLS
>> taskmanager_3 |     ssl.provider = null
>> taskmanager_3 |     ssl.secure.random.implementation = null
>> taskmanager_3 |     ssl.trustmanager.algorithm = PKIX
>> taskmanager_3 |     ssl.truststore.location = null
>> taskmanager_3 |     ssl.truststore.password = null
>> taskmanager_3 |     ssl.truststore.type = JKS
>> taskmanager_3 |     value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>> taskmanager_3 |
>> taskmanager_3 | 2017-11-17 18:29:00,765 WARN  org.apache.kafka.clients.consumer.ConsumerConfig  - The configuration 'topic' was supplied but isn't a known config.
>> taskmanager_3 | 2017-11-17 18:29:00,765 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version : 0.10.2.1
>> taskmanager_3 | 2017-11-17 18:29:00,770 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId : e89bffd6b2eff799
>> taskmanager_3 | 2017-11-17 18:29:00,791 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator kafka:9092 (id: 2147482646 rack: null) for group TopicConsumers.
>>
>> I'm running Kafka and the Flink jobs in docker containers, the
>> console-consumers from localhost.
>>
>> >-------- Original message --------
>> >From: Gary Yao g...@data-artisans.com
>> >Subject: Re: all task managers reading from all kafka partitions
>> >To: "r. r." <rob...@abv.bg>
>> >Sent on: 17.11.2017 20:02
>> >
>> > Hi Robert,
>> >
>> > Can you tell us which Flink version you are using?
>> >
>> > Also, are you starting a single job with parallelism 4 or are you
>> > starting several jobs?
>> >
>> > Thanks!
>> >
>> > Gary
>> >
>> > On Fri, Nov 17, 2017 at 4:41 PM, r. r. <rob...@abv.bg> wrote:
>> >
>> > Hi
>> >
>> > I have this strange problem: 4 task managers, each with one task
>> > slot, attaching to the same Kafka topic, which has 10 partitions.
>> >
>> > When I post a single message to the Kafka topic, it seems that
>> > all 4 consumers fetch the message and start processing (confirmed by TM
>> > logs).
>> >
>> > If I run kafka-consumer-groups.sh --describe --group
>> > TopicConsumers, it says that only one message was posted to a single
>> > partition. The next message would generally go to another partition.
>> >
>> > In addition, while the Flink jobs are running on the message, I
>> > start two kafka-console-consumer.sh and each would get only one message,
>> > as expected.
>> >
>> > On start, each of the Flink TMs logs something that to me reads
>> > as if it will read from all partitions:
>> >
>> > 2017-11-17 15:03:38,688 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Got 10 partitions from these topics: [TopicToConsume]
>> > 2017-11-17 15:03:38,689 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Consumer is going to read the following topics (with number of partitions): TopicToConsume (10),
>> > 2017-11-17 15:03:38,689 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 will start reading the following 10 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='TopicToConsume', partition=8}, KafkaTopicPartition{topic='TopicToConsume', partition=9}, KafkaTopicPartition{topic='TopicToConsume', partition=6}, KafkaTopicPartition{topic='TopicToConsume', partition=7}, KafkaTopicPartition{topic='TopicToConsume', partition=4}, KafkaTopicPartition{topic='TopicToConsume', partition=5}, KafkaTopicPartition{topic='TopicToConsume', partition=2}, KafkaTopicPartition{topic='TopicToConsume', partition=3}, KafkaTopicPartition{topic='TopicToConsume', partition=0}, KafkaTopicPartition{topic='TopicToConsume', partition=1}]
>> > 2017-11-17 15:03:38,699 INFO  org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values:
>> >     auto.commit.interval.ms = 5000
>> >     auto.offset.reset = latest
>> >
>> > Any hints?
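For reference, a minimal sketch of the setup Gary suggests: one job whose Kafka source runs with parallelism 4, so Flink itself spreads the 10 partitions of TopicToConsume across the 4 subtasks. Topic name, group.id, and bootstrap servers are taken from the logs above; the class name, the SimpleStringSchema, and the print() sink are only placeholders for the real job, which according to the logs uses a byte-array deserialization schema.

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class TopicConsumerJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // One job with four parallel subtasks: Flink assigns the 10 partitions
        // across these subtasks itself; Kafka's group coordinator is not involved.
        env.setParallelism(4);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        // group.id is used for committing offsets back to Kafka (e.g. for lag
        // monitoring), not for distributing partitions to the subtasks.
        props.setProperty("group.id", "TopicConsumers");

        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer09<>("TopicToConsume", new SimpleStringSchema(), props));

        stream.print();  // placeholder for the actual processing

        env.execute("TopicConsumerJob");
    }
}

Submitted once, e.g. via ./flink run -p 4 x.jar, instead of four separate -p1 submissions; the -p flag can also replace the env.setParallelism(4) call above.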