Hmm, but I want single-slot task managers and multiple jobs, so that if one job 
fails it doesn't bring down the whole setup (for example, 30+ parallel consumers). 
What setup would you advise? The job is quite heavy and might bring the VM down 
if run with such concurrency in one JVM.


 >-------- Original message --------
 >From: Gary Yao
 >Subject: Re: all task managers reading from all kafka partitions
 >To: "r. r." <>
 >Sent on: 17.11.2017 22:58

>    Forgot to hit "reply all" in my last email.
>      On Fri, Nov 17, 2017 at 8:26 PM, Gary Yao 
>      <> wrote:
>        Hi Robert,
>         To get your desired behavior, you should start a single job with 
> parallelism set to 4.
>         Flink does not rely on Kafka's consumer groups to distribute the 
> partitions to the parallel subtasks.
>         Instead, Flink does the assignment of partitions itself and also 
> tracks and checkpoints the offsets internally.
>         This is needed to achieve exactly-once semantics.
>         The 
> that you are setting is used for different purposes, e.g., 
> to track the consumer lag of a job.
>         Best,
>         Gary
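Gary's explanation can be illustrated with a small, self-contained sketch (not Flink's actual code): the Kafka source splits the topic's partitions among the parallel subtasks of a single job, and separate jobs do not coordinate with each other. The class PartitionAssignmentSketch and its plain modulo rule are hypothetical and chosen only for illustration; the exact assignment formula differs between Flink versions. With four jobs started via -p1, each job has only subtask 0, which therefore gets all 10 partitions (consistent with the "Consumer subtask 0 will start reading the following 10 partitions" log line quoted in this thread); with one job at parallelism 4, each partition is read by exactly one subtask.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not Flink's source: a Kafka source inside ONE job
// splits the topic's partitions among that job's parallel subtasks.
// Assumption: a plain modulo rule; the real formula varies between Flink versions.
public class PartitionAssignmentSketch {

    /** Partitions that subtask `subtaskIndex` (0-based) of a job with `parallelism` would read. */
    static List<Integer> partitionsFor(int numPartitions, int parallelism, int subtaskIndex) {
        List<Integer> assigned = new ArrayList<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            if (partition % parallelism == subtaskIndex) {
                assigned.add(partition);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        int numPartitions = 10; // TopicToConsume has 10 partitions in this thread

        // Four independent jobs submitted with -p1: each job only has subtask 0,
        // so each job reads all partitions and every message is processed 4 times.
        for (int job = 0; job < 4; job++) {
            System.out.println("job " + job + ", subtask 0 of 1 -> "
                    + partitionsFor(numPartitions, 1, 0));
        }

        // One job with parallelism 4: every partition belongs to exactly one subtask.
        for (int subtask = 0; subtask < 4; subtask++) {
            System.out.println("single job, subtask " + subtask + " of 4 -> "
                    + partitionsFor(numPartitions, 4, subtask));
        }
    }
}
```

Running main prints all ten partitions for each of the four single-subtask jobs, followed by a 3/3/2/2 split for the single job with parallelism 4.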
>           On Fri, Nov 17, 2017 at 7:54 PM, r. r. 
>           <> wrote:
>            Hi,
>            it's Flink 1.3.2, Kafka 
>            I am starting 1 JM and 4 TMs (with 1 task slot each). Then I deploy 
> 4 times (via ./flink run -p1 x.jar), so job parallelism is set to 1.
>            A new thing I just noticed: if I start two kafka-console-consumer 
> instances in parallel to the Flink jobs (with --consumer-property and write a 
> msg to Kafka, then one of the console consumers receives the msg together with 
> both Flink jobs.
>            I thought maybe the Flink consumers didn't receive the group 
> property passed via "flink run .. TopicConsumers", but no, they do belong to 
> the group as well:
>             taskmanager_3  | 2017-11-17 18:29:00,750 INFO 
> org.apache.kafka.clients.consumer.ConsumerConfig              - ConsumerConfig values:
>             taskmanager_3  |     = 5000
>             taskmanager_3  |     auto.offset.reset = latest
>             taskmanager_3  |     bootstrap.servers = [kafka:9092]
>             taskmanager_3  |     check.crcs = true
>             taskmanager_3  |     =
>             taskmanager_3  |     = 540000
>             taskmanager_3  |     = true
>             taskmanager_3  |     exclude.internal.topics = true
>             taskmanager_3  |     fetch.max.bytes = 52428800
>             taskmanager_3  |     = 500
>             taskmanager_3  |     fetch.min.bytes = 1
>             taskmanager_3  |     = TopicConsumers
>             taskmanager_3  |     = 3000
>             taskmanager_3  |     interceptor.classes = null
>             taskmanager_3  |     key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>             taskmanager_3  |     max.partition.fetch.bytes = 1048576
>             taskmanager_3  |     = 300000
>             taskmanager_3  |     max.poll.records = 500
>             taskmanager_3  |     = 300000
>             taskmanager_3  |     metric.reporters = []
>             taskmanager_3  |     metrics.num.samples = 2
>             taskmanager_3  |     metrics.recording.level = INFO
>             taskmanager_3  |     = 30000
>             taskmanager_3  |     partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>             taskmanager_3  |     receive.buffer.bytes = 65536
>             taskmanager_3  |     = 50
>             taskmanager_3  |     = 305000
>             taskmanager_3  |     = 100
>             taskmanager_3  |     sasl.jaas.config = null
>             taskmanager_3  |     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>             taskmanager_3  |     sasl.kerberos.min.time.before.relogin = 60000
>             taskmanager_3  |     = null
>             taskmanager_3  |     sasl.kerberos.ticket.renew.jitter = 0.05
>             taskmanager_3  |     sasl.kerberos.ticket.renew.window.factor = 0.8
>             taskmanager_3  |     sasl.mechanism = GSSAPI
>             taskmanager_3  |     security.protocol = PLAINTEXT
>             taskmanager_3  |     send.buffer.bytes = 131072
>             taskmanager_3  |     = 10000
>             taskmanager_3  |     ssl.cipher.suites = null
>             taskmanager_3  |     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>             taskmanager_3  |     ssl.endpoint.identification.algorithm = null
>             taskmanager_3  |     ssl.key.password = null
>             taskmanager_3  |     ssl.keymanager.algorithm = SunX509
>             taskmanager_3  |     ssl.keystore.location = null
>             taskmanager_3  |     ssl.keystore.password = null
>             taskmanager_3  |     ssl.keystore.type = JKS
>             taskmanager_3  |     ssl.protocol = TLS
>             taskmanager_3  |     ssl.provider = null
>             taskmanager_3  |     = null
>             taskmanager_3  |     ssl.trustmanager.algorithm = PKIX
>             taskmanager_3  |     ssl.truststore.location = null
>             taskmanager_3  |     ssl.truststore.password = null
>             taskmanager_3  |     ssl.truststore.type = JKS
>             taskmanager_3  |     value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>             taskmanager_3  |
>             taskmanager_3  | 2017-11-17 18:29:00,765 WARN  org.apache.kafka.clients.consumer.ConsumerConfig              - The configuration 'topic' was supplied but isn't a known config.
>             taskmanager_3  | 2017-11-17 18:29:00,765 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version : 
>             taskmanager_3  | 2017-11-17 18:29:00,770 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId : e89bffd6b2eff799
>             taskmanager_3  | 2017-11-17 18:29:00,791 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator kafka:9092 (id: 2147482646 rack: null) for group TopicConsumers.
>             I'm running Kafka and the Flink jobs in Docker containers, and the 
> console consumers from localhost.
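The console-consumer observation is consistent with Kafka's own group balancing, which only covers consumers that join the group; per Gary's reply, Flink assigns partitions itself and does not take part in it. As a rough sketch, assuming a single topic and the RangeAssignor shown in the ConsumerConfig dump above (the class RangeAssignmentSketch is hypothetical and re-implements only the per-topic range arithmetic, not Kafka's API), two console consumers in group TopicConsumers would split the 10 partitions 5/5. A message written to one partition therefore reaches exactly one of them, while every Flink job that reads all partitions also receives it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical re-implementation of the per-topic arithmetic of Kafka's range
// assignment (RangeAssignor). Only consumers that joined the group via
// subscription are counted; per the thread, a Flink source is not one of them.
public class RangeAssignmentSketch {

    /** Partitions given to the group member at `memberIndex` (members sorted by id). */
    static List<Integer> rangeFor(int numPartitions, int numMembers, int memberIndex) {
        int perMember = numPartitions / numMembers;
        int extra = numPartitions % numMembers; // the first `extra` members get one more
        int start = perMember * memberIndex + Math.min(memberIndex, extra);
        int length = perMember + (memberIndex < extra ? 1 : 0);
        List<Integer> assigned = new ArrayList<>();
        for (int partition = start; partition < start + length; partition++) {
            assigned.add(partition);
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Two console consumers in group TopicConsumers share the 10 partitions
        // 5/5, so a message written to one partition reaches only one of them.
        for (int member = 0; member < 2; member++) {
            System.out.println("console consumer " + member + " -> " + rangeFor(10, 2, member));
        }
    }
}
```

Running main prints partitions 0 to 4 for the first console consumer and 5 to 9 for the second.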
>                >-------- Original message --------
>                >From: Gary Yao 
>                >Subject: Re: all task managers reading from all kafka partitions
>                >To: "r. r." <>
>                >Sent on: 17.11.2017 20:02
>               >
>               >      Hi Robert,
>               >
>               >      Can you tell us which Flink version you are using? 
>               >      Also, are you starting a single job with parallelism 4 or are you starting several jobs?
>               >
>               >      Thanks!
>               >      Gary
>               >
>               >      On Fri, Nov 17, 2017 at 4:41 PM, r. r. <> wrote:
>               >
>               >       Hi,
>               >
>               >        I have this strange problem: 4 task managers, each with one task slot, attach to the same Kafka topic, which has 10 partitions.
>               >
>               >        When I post a single message to the Kafka topic, it seems that all 4 consumers fetch the message and start processing it (confirmed by the TM logs).
>               >
>               >        If I run  --describe --group TopicConsumers, it says that only one message was posted, to a single partition. The next message would generally go to another partition.
>               >
>               >        In addition, while the Flink jobs are processing the message, I start two and each would get only one message, as expected.
>               >
>               >        On startup, each Flink TM logs something that, to me, reads as 
> if it were going to read from all partitions:
>               >
>               >        2017-11-17 15:03:38,688 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Got 10 
> partitions from these topics: [TopicToConsume]
>               >        2017-11-17 15:03:38,689 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Consumer 
> is going to read the following topics (with number of partitions): 
> TopicToConsume (10),
>               >        2017-11-17 15:03:38,689 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
> Consumer subtask 0 will start reading the following 10 partitions from the 
> committed group offsets in Kafka: 
> [KafkaTopicPartition{topic='TopicToConsume', partition=8}, 
> KafkaTopicPartition{topic='TopicToConsume', partition=9}, 
> KafkaTopicPartition{topic='TopicToConsume', partition=6}, 
> KafkaTopicPartition{topic='TopicToConsume', partition=7}, 
> KafkaTopicPartition{topic='TopicToConsume', partition=4}, 
> KafkaTopicPartition{topic='TopicToConsume', partition=5}, 
> KafkaTopicPartition{topic='TopicToConsume', partition=2}, 
> KafkaTopicPartition{topic='TopicToConsume', partition=3}, 
> KafkaTopicPartition{topic='TopicToConsume', partition=0}, 
> KafkaTopicPartition{topic='TopicToConsume', partition=1}]
>               >        2017-11-17 15:03:38,699 INFO  
> org.apache.kafka.clients.consumer.ConsumerConfig              - 
> ConsumerConfig values:
>               >                = 5000
>               >                auto.offset.reset = latest
>               >
>               >        Any hints?
