Forgot to hit "reply all" in my last email.

On Fri, Nov 17, 2017 at 8:26 PM, Gary Yao <g...@data-artisans.com> wrote:

> Hi Robert,
>
> To get your desired behavior, you should start a single job with
> parallelism set to 4.
>
> Flink does not rely on Kafka's consumer groups to distribute the
> partitions to the parallel subtasks.
> Instead, Flink does the assignment of partitions itself and also tracks
> and checkpoints the offsets internally.
> This is needed to achieve exactly-once semantics.
>
> The group.id that you are setting is used for different purposes, e.g.,
> to track the consumer lag of a job.
>
> Best,
>
> Gary
>
> On Fri, Nov 17, 2017 at 7:54 PM, r. r. <rob...@abv.bg> wrote:
>
>> Hi
>>
>>
>>
>> it's Flink 1.3.2, Kafka 0.10.2.0
>>
>> I am starting 1 JM and 4 TM (with 1 task slot each). Then I deploy 4
>> times (via ./flink run -p1 x.jar), job parallelism is set to 1.
>>
>>
>>
>> A new thing I just noticed: if I start in parallel to the Flink jobs two
>>  kafka-console-consumer (with --consumer-property
>> group.id=TopicConsumers) and write a msg to Kafka, then one of the
>> console consumers receives the msg together with both Flink jobs.
>>
>> I though maybe the Flink consumers didn't receive the group property
>> passed via "flink run .. --group.id TopicConsumers", but no - they do
>> belong to the group as well:
>>
>>
>>
>> taskmanager_3  | 2017-11-17 18:29:00,750 INFO
>> org.apache.kafka.clients.consumer.ConsumerConfig              -
>> ConsumerConfig values:
>>
>> taskmanager_3  |     auto.commit.interval.ms = 5000
>>
>> taskmanager_3  |     auto.offset.reset = latest
>>
>> taskmanager_3  |     bootstrap.servers = [kafka:9092]
>>
>> taskmanager_3  |     check.crcs = true
>>
>> taskmanager_3  |     client.id =
>>
>> taskmanager_3  |     connections.max.idle.ms = 540000
>>
>> taskmanager_3  |     enable.auto.commit = true
>>
>> taskmanager_3  |     exclude.internal.topics = true
>>
>> taskmanager_3  |     fetch.max.bytes = 52428800
>>
>> taskmanager_3  |     fetch.max.wait.ms = 500
>>
>> taskmanager_3  |     fetch.min.bytes = 1
>>
>> taskmanager_3  |     group.id = TopicConsumers
>>
>> taskmanager_3  |     heartbeat.interval.ms = 3000
>>
>> taskmanager_3  |     interceptor.classes = null
>>
>> taskmanager_3  |     key.deserializer = class
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>>
>> taskmanager_3  |     max.partition.fetch.bytes = 1048576
>>
>> taskmanager_3  |     max.poll.interval.ms = 300000
>>
>> taskmanager_3  |     max.poll.records = 500
>>
>> taskmanager_3  |     metadata.max.age.ms = 300000
>>
>> taskmanager_3  |     metric.reporters = []
>>
>> taskmanager_3  |     metrics.num.samples = 2
>>
>> taskmanager_3  |     metrics.recording.level = INFO
>>
>> taskmanager_3  |     metrics.sample.window.ms = 30000
>>
>> taskmanager_3  |     partition.assignment.strategy = [class
>> org.apache.kafka.clients.consumer.RangeAssignor]
>>
>> taskmanager_3  |     receive.buffer.bytes = 65536
>>
>> taskmanager_3  |     reconnect.backoff.ms = 50
>>
>> taskmanager_3  |     request.timeout.ms = 305000
>>
>> taskmanager_3  |     retry.backoff.ms = 100
>>
>> taskmanager_3  |     sasl.jaas.config = null
>>
>> taskmanager_3  |     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>
>> taskmanager_3  |     sasl.kerberos.min.time.before.relogin = 60000
>>
>> taskmanager_3  |     sasl.kerberos.service.name = null
>>
>> taskmanager_3  |     sasl.kerberos.ticket.renew.jitter = 0.05
>>
>> taskmanager_3  |     sasl.kerberos.ticket.renew.window.factor = 0.8
>>
>> taskmanager_3  |     sasl.mechanism = GSSAPI
>>
>> taskmanager_3  |     security.protocol = PLAINTEXT
>>
>> taskmanager_3  |     send.buffer.bytes = 131072
>>
>> taskmanager_3  |     session.timeout.ms = 10000
>>
>> taskmanager_3  |     ssl.cipher.suites = null
>>
>> taskmanager_3  |     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>
>> taskmanager_3  |     ssl.endpoint.identification.algorithm = null
>>
>> taskmanager_3  |     ssl.key.password = null
>>
>> taskmanager_3  |     ssl.keymanager.algorithm = SunX509
>>
>> taskmanager_3  |     ssl.keystore.location = null
>>
>> taskmanager_3  |     ssl.keystore.password = null
>>
>> taskmanager_3  |     ssl.keystore.type = JKS
>>
>> taskmanager_3  |     ssl.protocol = TLS
>>
>> taskmanager_3  |     ssl.provider = null
>>
>> taskmanager_3  |     ssl.secure.random.implementation = null
>>
>> taskmanager_3  |     ssl.trustmanager.algorithm = PKIX
>>
>> taskmanager_3  |     ssl.truststore.location = null
>>
>> taskmanager_3  |     ssl.truststore.password = null
>>
>> taskmanager_3  |     ssl.truststore.type = JKS
>>
>> taskmanager_3  |     value.deserializer = class
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>>
>> taskmanager_3  |
>>
>> taskmanager_3  | 2017-11-17 18:29:00,765 WARN
>> org.apache.kafka.clients.consumer.ConsumerConfig              - The
>> configuration 'topic' was supplied but isn't a known config.
>>
>> taskmanager_3  | 2017-11-17 18:29:00,765 INFO
>> org.apache.kafka.common.utils.AppInfoParser                   - Kafka
>> version : 0.10.2.1
>>
>> taskmanager_3  | 2017-11-17 18:29:00,770 INFO
>> org.apache.kafka.common.utils.AppInfoParser                   - Kafka
>> commitId : e89bffd6b2eff799
>>
>> taskmanager_3  | 2017-11-17 18:29:00,791 INFO
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  -
>> Discovered coordinator kafka:9092 (id: 2147482646 <(214)%20748-2646>
>> rack: null) for group
>> TopicConsumers.
>>
>>
>>
>>
>>
>> I'm running Kafka and Flink jobs in docker containers, the
>> console-consumers from localhost
>>
>>
>>
>>
>>
>>
>>
>>  >-------- Оригинално писмо --------
>>
>>  >От: Gary Yao g...@data-artisans.com
>>
>>  >Относно: Re: all task managers reading from all kafka partitions
>>
>>  >До: "r. r." <rob...@abv.bg>
>>
>>  >Изпратено на: 17.11.2017 20:02
>>
>>
>>
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>> >      Hi Robert,
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>> >      Can you tell us which Flink version you are using?
>>
>> >
>>
>> >
>>
>> >      Also, are you starting a single job with parallelism 4 or are you
>> starting several jobs?
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>> >      Thanks!
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>> >      Gary
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>> >      On Fri, Nov 17, 2017 at 4:41 PM, r. r.
>>
>> >      <rob...@abv.bg> wrote:
>>
>> >
>>
>> >
>>
>> >       Hi
>>
>> >
>>
>> >        I have this strange problem: 4 task managers each with one task
>> slot, attaching to the same Kafka topic which has 10 partitions.
>>
>> >
>>
>> >        When I post a single message to the Kafka topic it seems that
>> all 4 consumers fetch the message and start processing (confirmed by TM
>> logs).
>>
>> >
>>
>> >        If I run kafka-consumer-groups.sh  --describe --group
>> TopicConsumers it says that only one message was posted to a single
>> partition. Next message would generally go to another partition.
>>
>> >
>>
>> >        In addition, while the Flink jobs are running on the message, I
>> start two kafka-console-consumer.sh and each would get only one message, as
>> expected.
>>
>> >
>>
>> >        On start each of the Flink TM would post something that to me
>> reads as if it would read from all partitions:
>>
>> >
>>
>> >        2017-11-17 15:03:38,688 INFO  org.apache.flink.streaming.con
>> nectors.kafka.FlinkKafkaConsumer09  - Got 10 partitions from these
>> topics: [TopicToConsume]
>>
>> >        2017-11-17 15:03:38,689 INFO  org.apache.flink.streaming.con
>> nectors.kafka.FlinkKafkaConsumer09  - Consumer is going to read the
>> following topics (with number of partitions): TopicToConsume (10),
>>
>> >        2017-11-17 15:03:38,689 INFO  org.apache.flink.streaming.con
>> nectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 will start
>> reading the following 10 partitions from the committed group offsets in
>> Kafka: [KafkaTopicPartition{topic='TopicToConsume', partition=8},
>> KafkaTopicPartition{topic='TopicToConsume', partition=9},
>> KafkaTopicPartition{topic='TopicToConsume', partition=6},
>> KafkaTopicPartition{topic='TopicToConsume', partition=7},
>> KafkaTopicPartition{topic='TopicToConsume', partition=4},
>> KafkaTopicPartition{topic='TopicToConsume', partition=5},
>> KafkaTopicPartition{topic='TopicToConsume', partition=2},
>> KafkaTopicPartition{topic='TopicToConsume', partition=3},
>> KafkaTopicPartition{topic='TopicToConsume', partition=0},
>> KafkaTopicPartition{topic='TopicToConsume', partition=1}]
>>
>> >        2017-11-17 15:03:38,699 INFO  org.apache.kafka.clients.consu
>> mer.ConsumerConfig              - ConsumerConfig values:
>>
>> >
>>
>> >       auto.commit.interval.ms = 5000
>>
>> >                auto.offset.reset = latest
>>
>> >
>>
>> >
>>
>> >
>>
>> >        Any hints?
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>
>

Reply via email to