Gary, thanks a lot!
I completely forgot that parallelism extends over all slots visible to the 
JobManager!
So adding e.g. -p4 to 'flink run' approach should suit my use case just fine, I 
believe.
I'll look deeper into failure recovery with this scheme

Have a great weekend!
-Robert








 >-------- Оригинално писмо --------

 >От: Gary Yao g...@data-artisans.com

 >Относно: Re: all task managers reading from all kafka partitions

 >До: "r. r." <rob...@abv.bg>

 >Изпратено на: 18.11.2017 11:28



 
> 
 
>  
 
>  
 
>   
 
>    
 
>     
 
>      Hi Robert,
 
>     
 
>     
 
>      
 
>     
 
>     
 
>      Running a single job does not mean that you are limited to a single JVM.
 
>     
 
>     
 
>      
 
>     
 
>     
 
>      For example, a job with parallelism 4 by default requires 4 task slots 
> to run.
 
>     
 
>     
 
>      You can provision 4 single slot TaskMangers on different hosts to 
> connect to the
 
>     
 
>     
 
>      same JobManager. The JobManager can then take your job and distribute the
 
>     
 
>     
 
>      execution on the 4 slots. To learn more about the distributed runtime
 
>     
 
>     
 
>      environment:
 
>     
 
>     
 
>      
 
>     
 
>     
 
>        
 
>      
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/concepts/runtime.html
 
>     
 
>     
 
>      
 
>     
 
>     
 
>      Regarding your concerns about job failures, a failure in the JobManager 
> or one
 
>     
 
>     
 
>      of the TaskManagers can bring your job down but Flink has built-in
 
>     
 
>     
 
>      fault-tolerance on different levels. You may want to read up on the 
> following
 
>     
 
>     
 
>      topics:
 
>     
 
>     
 
>      
 
>     
 
>     
 
>      - Data Streaming Fault Tolerance: 
 
>      
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/stream_checkpointing.html
 
>     
 
>     
 
>      - Restart Strategies: 
 
>      
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/restart_strategies.html
 
>     
 
>     
 
>      - JobManager High Availability: 
 
>      
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html
 
>     
 
>     
 
>      
 
>     
 
>     
 
>      Let me know if you have further questions.
 
>     
 
>     
 
>      
 
>     
 
>     
 
>      Best,
 
>     
 
>     
 
>      
 
>     
 
>     
 
>      Gary
 
>     
 
>    
 
>    
 
>     
 
>     
 
>      On Fri, Nov 17, 2017 at 11:11 PM, r. r. 
 
>      <rob...@abv.bg> wrote:
 
>      
 
>      
 
>       Hmm, but I want single slot task managers and multiple jobs so that if 
> one job fails it doesn't bring the whole setup (for example 30+ parallel 
> consumers) down.
 
>        What setup would you advise? The job is quite heavy and might bring 
> the VM down if run with such concurency in one JVM.
 
>        
 
>        Thanks!
 
>        
 
>               >-------- Оригинално писмо --------   >От: Gary Yao 
> g...@data-artisans.com   >Относно: Re: all task managers reading from all 
> kafka partitions   >До: "r. r." <rob...@abv.bg>   >Изпратено на: 17.11.2017 
> 22:58
 
>        
 
>       
 
>        
 
>          
 
>          
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >    Forgot to hit "reply all" in my last email.
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >      On Fri, Nov 17, 2017 at 8:26 PM, Gary Yao
 
>          
 
>          >      <
 
>         g...@data-artisans.com> wrote:
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >        Hi Robert,
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >         To get your desired behavior, you should start a single 
> job with parallelism set to 4.
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >         Flink does not rely on Kafka's consumer groups to 
> distribute the partitions to the parallel subtasks.
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >         Instead, Flink does the assignment of partitions itself 
> and also tracks and checkpoints the offsets internally.
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >         This is needed to achieve exactly-once semantics.
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >         The
 
>          
 
>          >         
 
>         group.id that you are setting is used for different purposes, e.g., 
> to track the consumer lag of a job.
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >         Best,
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >         Gary
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >           On Fri, Nov 17, 2017 at 7:54 PM, r. r.
 
>          
 
>          >           <
 
>         rob...@abv.bg> wrote:
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >            Hi    it's Flink 1.3.2, Kafka 0.10.2.0  I am starting 1 
> JM and 4 TM (with 1 task slot each). Then I deploy 4 times (via ./flink run 
> -p1 x.jar), job parallelism is set to 1.    A new thing I just noticed: if I 
> start in parallel to the Flink jobs two  kafka-console-consumer (with 
> --consumer-property 
 
>         group.id=TopicConsumers) and write a msg to Kafka, then one of the 
> console consumers receives the msg together with both Flink jobs.  I though 
> maybe the Flink consumers didn't receive the group property passed via "flink 
> run .. --
 
>         group.id TopicConsumers", but no - they do belong to the group as 
> well:    taskmanager_3  | 2017-11-17 18:29:00,750 INFO 
 
>          
 
>          >             org.apache.kafka.clients.consumer.ConsumerConfig       
>        -
 
>          
 
>          >             ConsumerConfig values:
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |    
 
>          
 
>          >              
 
>         auto.commit.interval.ms = 5000
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     auto.offset.reset = latest
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     bootstrap.servers = [kafka:9092]
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     check.crcs = true
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |    
 
>          
 
>          >              
 
>         client.id =
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |    
 
>          
 
>          >              
 
>         connections.max.idle.ms = 540000
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     enable.auto.commit = true
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     exclude.internal.topics = true
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     fetch.max.bytes = 52428800
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |    
 
>          
 
>          >              
 
>         fetch.max.wait.ms = 500
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     fetch.min.bytes = 1
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |    
 
>          
 
>          >              
 
>         group.id = TopicConsumers
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |    
 
>          
 
>          >              
 
>         heartbeat.interval.ms = 3000
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     interceptor.classes = null
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     max.partition.fetch.bytes = 
> 1048576
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |    
 
>          
 
>          >              
 
>         max.poll.interval.ms = 300000
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     max.poll.records = 500
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |    
 
>          
 
>          >              
 
>         metadata.max.age.ms = 300000
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     metric.reporters = []
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     metrics.num.samples = 2
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     metrics.recording.level = INFO
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |    
 
>          
 
>          >              
 
>         metrics.sample.window.ms = 30000
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     partition.assignment.strategy = 
> [class org.apache.kafka.clients.consumer.RangeAssignor]
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     receive.buffer.bytes = 65536
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |    
 
>          
 
>          >              
 
>         reconnect.backoff.ms = 50
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |    
 
>          
 
>          >              
 
>         request.timeout.ms = 305000
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |    
 
>          
 
>          >              
 
>         retry.backoff.ms = 100
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     sasl.jaas.config = null
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     sasl.kerberos.kinit.cmd = 
> /usr/bin/kinit
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     
> sasl.kerberos.min.time.before.relogin = 60000
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |    
 
>          
 
>          >              
 
>         sasl.kerberos.service.name = null
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     
> sasl.kerberos.ticket.renew.jitter = 0.05
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     
> sasl.kerberos.ticket.renew.window.factor = 0.8
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     sasl.mechanism = GSSAPI
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     security.protocol = PLAINTEXT
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     send.buffer.bytes = 131072
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |    
 
>          
 
>          >              
 
>         session.timeout.ms = 10000
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     ssl.cipher.suites = null
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     ssl.enabled.protocols = 
> [TLSv1.2, TLSv1.1, TLSv1]
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |    
 
>          
 
>          >              ssl.endpoint.identification.algorithm = null
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     ssl.key.password = null
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     ssl.keymanager.algorithm = 
> SunX509
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     ssl.keystore.location = null
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     ssl.keystore.password = null
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     ssl.keystore.type = JKS
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     ssl.protocol = TLS
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     ssl.provider = null
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     
> ssl.secure.random.implementation = null
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     ssl.trustmanager.algorithm = 
> PKIX
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     ssl.truststore.location = null
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     ssl.truststore.password = null
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     ssl.truststore.type = JKS
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |     value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  |
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  | 2017-11-17 18:29:00,765 WARN 
 
>          
 
>          >               org.apache.kafka.clients.consumer.ConsumerConfig     
>          - The
 
>          
 
>          >               configuration 'topic' was supplied but isn't a known 
> config.
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  | 2017-11-17 18:29:00,765 INFO 
 
>          
 
>          >               org.apache.kafka.common.utils.AppInfoParser          
>          - Kafka
 
>          
 
>          >               version : 0.10.2.1
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  | 2017-11-17 18:29:00,770 INFO 
 
>          
 
>          >               org.apache.kafka.common.utils.AppInfoParser          
>          - Kafka
 
>          
 
>          >               commitId : e89bffd6b2eff799
 
>          
 
>          >
 
>          
 
>          >               taskmanager_3  | 2017-11-17 18:29:00,791 INFO 
 
>          
 
>          >               
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  -
 
>          
 
>          >               Discovered coordinator kafka:9092 (id:
 
>          
 
>          >              2147482646 rack: null) for group
 
>          
 
>          >               TopicConsumers.
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >               I'm running Kafka and Flink jobs in docker 
> containers, the console-consumers from localhost
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >                >-------- Оригинално писмо --------
 
>          
 
>          >
 
>          
 
>          >                >От: Gary Yao
 
>          
 
>          >              
 
>         g...@data-artisans.com
 
>          
 
>          >
 
>          
 
>          >                >Относно: Re: all task managers reading from all 
> kafka partitions
 
>          
 
>          >
 
>          
 
>          >                >До: "r. r." <
 
>          
 
>          >              
 
>         rob...@abv.bg>
 
>          
 
>          >
 
>          
 
>          >                >Изпратено на: 17.11.2017 20:02
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >              >      Hi Robert,
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >      Can you tell us which Flink version you are 
> using? 
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >      Also, are you starting a single job with 
> parallelism 4 or are you starting several jobs?
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >      Thanks!
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >      Gary
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >      On Fri, Nov 17, 2017 at 4:41 PM, r. r.
 
>          
 
>          >
 
>          
 
>          >               >      <
 
>          
 
>          >              
 
>         rob...@abv.bg> wrote:
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >       Hi
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >        I have this strange problem: 4 task 
> managers each with one task slot, attaching to the same Kafka topic which has 
> 10 partitions.
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >        When I post a single message to the Kafka 
> topic it seems that all 4 consumers fetch the message and start processing 
> (confirmed by TM logs).
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >        If I run kafka-consumer-groups.sh  
> --describe --group TopicConsumers it says that only one message was posted to 
> a single partition. Next message would generally go to another partition.
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >        In addition, while the Flink jobs are 
> running on the message, I start two kafka-console-consumer.sh and each would 
> get only one message, as expected.
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >        On start each of the Flink TM would post 
> something that to me reads as if it would read from all partitions:
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >        2017-11-17 15:03:38,688 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Got 10 
> partitions from these topics: [TopicToConsume]
 
>          
 
>          >
 
>          
 
>          >               >        2017-11-17 15:03:38,689 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Consumer 
> is going to read the following topics (with number of partitions): 
> TopicToConsume (10),
 
>          
 
>          >
 
>          
 
>          >               >        2017-11-17 15:03:38,689 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
> Consumer subtask 0 will start reading the following 10 partitions from the 
> committed group offsets in Kafka: 
> [KafkaTopicPartition{topic='TopicToConsume', partition=8}, 
> KafkaTopicPartition{topic='TopicToConsume', partition=9}, 
> KafkaTopicPartition{topic='TopicToConsume', partition=6}, 
> KafkaTopicPartition{topic='TopicToConsume', partition=7}, 
> KafkaTopicPartition{topic='TopicToConsume', partition=4}, 
> KafkaTopicPartition{topic='TopicToConsume', partition=5}, 
> KafkaTopicPartition{topic='TopicToConsume', partition=2}, 
> KafkaTopicPartition{topic='TopicToConsume', partition=3}, 
> KafkaTopicPartition{topic='TopicToConsume', partition=0}, 
> KafkaTopicPartition{topic='TopicToConsume', partition=1}]
 
>          
 
>          >
 
>          
 
>          >               >        2017-11-17 15:03:38,699 INFO  
> org.apache.kafka.clients.consumer.ConsumerConfig              - 
> ConsumerConfig values:
 
>          
 
>          >
 
>          
 
>          >               >               
 
>          
 
>          >
 
>          
 
>          >               >       
 
>          
 
>          >              
 
>         auto.commit.interval.ms = 5000
 
>          
 
>          >
 
>          
 
>          >               >                auto.offset.reset = latest
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >        Any hints?
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >               >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>          >
 
>          
 
>        
 
>       
 
>      
 
>     
 
>     
 
>    
 
>    
 
>  

Reply via email to