You have 32 partitions, so reading cannot be distributed to more than 32 parallel tasks. If you have a lot of processing for each record after reading, you can reshuffle the messages before processing them; that way the processing can be distributed to more tasks. Search the Beam lists for previous threads about reshuffle. A minimal sketch is below.
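Something like the following, using Beam's built-in Reshuffle transform (available in 2.9.0). ExpensiveProcessingFn is a hypothetical stand-in for whatever per-record DoFn you run after the read:

==============================================================================
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// readKafkaData is the PCollection<KV<Long, String>> produced by your
// KafkaIO.read() below.

// Reshuffle.viaRandomKey() assigns each element a random key and groups by
// it, which breaks the 1:1 coupling between the 32 Kafka partitions and the
// tasks that run the downstream processing.
PCollection<KV<Long, String>> reshuffled =
    readKafkaData.apply(Reshuffle.viaRandomKey());

// The expensive per-record work now runs after the shuffle, so it is no
// longer limited to 32 parallel tasks. ExpensiveProcessingFn is a
// placeholder for your own DoFn.
reshuffled.apply(ParDo.of(new ExpensiveProcessingFn()));
==============================================================================

Note that the extra shuffle is only worthwhile when the per-record processing is expensive relative to the cost of moving the records between workers.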
On Thu, Jan 24, 2019 at 7:23 PM <linr...@itri.org.tw> wrote:

> Dear all,
>
> I am using the KafkaIO SDK in my project (Beam with the Spark runner).
>
> The task-skew problem is shown in the following figure.
>
> My running environment is:
> OS: Ubuntu 14.04.4 LTS
> Java version: "1.8.0_151"
> Beam version: 2.9.0 (Spark runner, standalone mode)
> Spark version: 2.3.1, standalone mode
> Execution setup:
> Master/driver node: ubuntu7
> Worker nodes: ubuntu8 (4 executors); ubuntu9 (4 executors)
> Total number of executors: 8
>
> Kafka broker: 2.10-0.10.1.1, broker node at ubuntu7
> Kafka client topic: kafkasink32, with 32 partitions
>
> The KafkaIO part of my project is programmed as follows:
>
> ==============================================================================
> Map<String, Object> map = ImmutableMap.<String, Object>builder()
>     .put("group.id", "test-consumer-group")
>     .build();
>
> List<TopicPartition> topicPartitions = new ArrayList<>();
> for (int i = 0; i < 32; i++) {
>   topicPartitions.add(new TopicPartition("kafkasink32", i));
> }
>
> PCollection<KV<Long, String>> readKafkaData = p.apply(KafkaIO.<Long, String>read()
>     .withBootstrapServers("ubuntu7:9092")
>     .updateConsumerProperties(map)
>     .withKeyDeserializer(LongDeserializer.class)
>     .withValueDeserializer(StringDeserializer.class)
>     .withTopicPartitions(topicPartitions)
>     .withoutMetadata());
> ==============================================================================
>
> Here I have two directions for solving this problem:
>
> 1. Using the following feature from Spark Streaming:
> https://jaceklaskowski.gitbooks.io/spark-streaming/spark-streaming-kafka-LocationStrategy.html
> LocationStrategies.PreferConsistent: use in most cases, as it consistently
> distributes partitions across all executors.
>
> If we wanted to use this feature, we have no idea how to set it through
> the KafkaIO SDK.
>
> 2. Setting the related Kafka configurations to perform a consumer
> rebalance: set the consumer group? Set group.id?
>
> If we need to do No. 2, could someone give me some ideas on how to set
> these configurations?
>
> If anyone can provide any direction to help us overcome this problem, we
> would appreciate it.
>
> Thanks.
>
> Sincerely yours,
>
> Rick