RE: kafkaIO Consumer Rebalance with Spark Runner

2019-01-30 Thread linrick
Dear Alexey,

I have tried to use the following settings:

Map<String, Object> map = ImmutableMap.<String, Object>builder()
    .put("topic", "kafkasink2")
    .put("group.id", "test-consumer-group")
    .put("partition.assignment.strategy",
        "org.apache.kafka.clients.consumer.RangeAssignor")
    .put("enable.auto.commit", "true")
    .put("auto.offset.reset", "earliest")
    .put("max.poll.records", "10")
    .build();

PCollection<KV<Long, String>> readKafkaData = p.apply(KafkaIO.<Long, String>read()
    .withBootstrapServers(KafkaProperties.KAFKA_SERVER_URL)
    .withTopic("kafkasink2")
    .updateConsumerProperties(map)
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withoutMetadata()
);
The experimental results are shown as the following figure:

[inline image omitted]

As the figure shows, this approach does not work for my project.

When we applied the suggestion from Juan Carlos Garcia,
“You can limit your Spark processing by passing the following option to your
beam pipeline: MaxRecordsPerBatch”,
it did work for us.
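For reference, a minimal sketch of one way this option can be set when the pipeline is
created (the value 100000 is only an illustration, not a recommended number):

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Sketch: cap how many records the Spark runner pulls into each micro-batch.
SparkPipelineOptions options =
    PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setMaxRecordsPerBatch(100000L);
Pipeline p = Pipeline.create(options);

The same option can also be passed as a command-line argument; see the sketch in Juan
Carlos's message further down the thread.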

Thanks for your idea.

Rick

From: Alexey Romanenko [mailto:aromanenko....@gmail.com]
Sent: Wednesday, January 30, 2019 2:10 AM
To: user@beam.apache.org
Subject: Re: kafkaIO Consumer Rebalance with Spark Runner

Rick,

I think “spark.streaming.kafka.maxRatePerPartition” won’t work for you since,
afaik, it’s a configuration option of the Spark Kafka reader and Beam KafkaIO
doesn’t use it (it has its own consumer implementation).
At the same time, if you want to set an option for the Beam KafkaIO consumer config,
then you should use the “updateConsumerProperties()” method.


On 28 Jan 2019, at 10:56, linr...@itri.org.tw wrote:

Dear Raghu,

I added the line “PCollection<KV<Long, String>> reshuffled =
windowKV.apply(Reshuffle.viaRandomKey());” to my program.

I tried to limit the streaming data size to 100,000 records per second to decrease the
processing time.

The following settings are used for my project.

1.  One topic / 2 partitions

2.  Two workers / two executors

3.  The spark-default setting is:
spark.executor.instances=2
spark.executor.cores=4
spark.executor.memory=2048m
spark.default.parallelism=200

spark.streaming.blockInterval=50ms
spark.streaming.kafka.maxRatePerPartition=50,000
spark.streaming.backpressure.enabled=true
spark.streaming.concurrentJobs = 1
spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC
spark.executor.extraJavaOptions=-Xss100M

spark.shuffle.consolidateFiles=true
spark.streaming.unpersist=true
spark.streaming.stopGracefullyOnShutdown=true

I expected the data size to be capped at 100,000.

Here,


However, the data size is always over 100,000, so the
“spark.streaming.kafka.maxRatePerPartition” setting confuses me.

It does not seem to work for me.

Rick

From: Raghu Angadi [mailto:ang...@gmail.com]
Sent: Saturday, January 26, 2019 3:06 AM
To: user@beam.apache.org
Subject: Re: kafkaIO Consumer Rebalance with Spark Runner

You have 32 partitions. Reading cannot be distributed to more than 32 parallel
tasks.
If you have a lot of processing for each record after reading, you can
reshuffle the messages before processing them; that way the processing can be
distributed to more tasks. Search for previous threads about reshuffle in Beam
lists.

On Thu, Jan 24, 2019 at 7:23 PM linr...@itri.org.tw wrote:
Dear all,

I am using the kafkaIO sdk in my project (Beam with Spark runner).

The problem about task skew is shown as the following figure.


My running environment is:
OS: Ubuntu 14.04.4 LTS
The version of related tools is:
java version: "1.8.0_151"
Beam version: 2.9.0 (Spark runner with Standalone mode)
Spark version: 2.3.1 Standalone mode
  Execution condition:
  Master/Driver node: ubuntu7
  Worker nodes: ubuntu8 (4 Executors); ubuntu9 (4 Executors)
The number of executors is 8

Kafka Broker: 2.10-0.10.1.1
  Broker node at ubuntu7
Kafka Client:
The topic: kafkasink32
kafkasink32 Partitions: 32

The programming of my project for kafkaIO SDK is as:
==
Map<String, Object> map = ImmutableMap.<String, Object>builder()
    .put("group.id", "test-consumer-group")
    .build();
List<TopicPartition> topicPartitions = new ArrayList<>();
for (int i = 0; i < 32; i++) {
    topicPartitions.add(new TopicPartition("kafkasink32", i));
}
PCollection<KV<Long, String>> readKafkaData = p.apply(KafkaIO.<Long, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .updateConsumerProperties(map)
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withTopicPartitions(topicPartitions)

Re: kafkaIO Consumer Rebalance with Spark Runner

2019-01-29 Thread Alexey Romanenko
Rick, 

I think “spark.streaming.kafka.maxRatePerPartition” won’t work for you since,
afaik, it’s a configuration option of the Spark Kafka reader and Beam KafkaIO
doesn’t use it (it has its own consumer implementation).
At the same time, if you want to set an option for the Beam KafkaIO consumer config,
then you should use the “updateConsumerProperties()” method.
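A minimal sketch of what this can look like, assuming the goal is to tune a
consumer-level setting; the property "max.poll.records" and its value are only
illustrative, and the broker/topic names are taken from earlier in the thread:

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Sketch: Kafka consumer settings go through updateConsumerProperties(),
// not through spark.streaming.* options, because the Spark runner does not
// use Spark's own Kafka reader. "max.poll.records" is only an example.
KafkaIO.Read<Long, String> read = KafkaIO.<Long, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink2")
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .updateConsumerProperties(ImmutableMap.<String, Object>of("max.poll.records", "500"));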

> On 28 Jan 2019, at 10:56,   wrote:
> 
> Dear Raghu,
>  
> I add the line: “PCollection reshuffled = 
> windowKV.apply(Reshuffle.viaRandomKey());” in my program.
>  
> I tried to control the streaming data size: 100,000/1sec to decrease the 
> processing time.
>  
> The following settings are used for my project.
>  
> 1.  One topic / 2 partitions
> 
> 2.  Two workers / two executors
>  
> 3.  The spark-default setting is:
> spark.executor.instances=2
> spark.executor.cores=4
> spark.executor.memory=2048m
> spark.default.parallelism=200
>  
> spark.streaming.blockInterval=50ms
> spark.streaming.kafka.maxRatePerPartition=50,000
> spark.streaming.backpressure.enabled=true
> spark.streaming.concurrentJobs = 1
> spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC
> spark.executor.extraJavaOptions=-Xss100M
>  
> spark.shuffle.consolidateFiles=true
> spark.streaming.unpersist=true
> spark.streaming.stopGracefullyOnShutdown=true
>  
> I hope that the data size is controlled at 100,000.
>  
> Here,
> 
>  
> The data size is always over 100,000. The setting of 
> “spark.streaming.kafka.maxRatePerPartition” confused me.
>  
> That does not seem to work for me.
>  
> Rick
>  
> From: Raghu Angadi [mailto:ang...@gmail.com]
> Sent: Saturday, January 26, 2019 3:06 AM
> To: user@beam.apache.org
> Subject: Re: kafkaIO Consumer Rebalance with Spark Runner
>  
> You have 32 partitions. Reading can not be distributed to more than 32 
> parallel tasks. 
> If you have a log of processing for each record after reading, you can 
> reshuffle the messages before processing them, that way the processing could 
> be distributed to more tasks. Search for previous threads about reshuffle in 
> Beam lists.
>  
> On Thu, Jan 24, 2019 at 7:23 PM <linr...@itri.org.tw> wrote:
> Dear all,
>  
> I am using the kafkaIO sdk in my project (Beam with Spark runner).
>  
> The problem about task skew is shown as the following figure.
> 
>  
> My running environment is:
> OS: Ubuntn 14.04.4 LTS
> The version of related tools is:
> java version: "1.8.0_151"
> Beam version: 2.9.0 (Spark runner with Standalone mode)
> Spark version: 2.3.1 Standalone mode
>   Execution condition:
>   Master/Driver node: ubuntu7
>   Worker nodes: ubuntu8 (4 Executors); ubuntu9 (4 Executors)
> The number of executors is 8
>  
> Kafka Broker: 2.10-0.10.1.1
>   Broker node at ubuntu7
> Kafka Client:
> The topic: kafkasink32
> kafkasink32 Partitions: 32
>  
> The programming of my project for kafkaIO SDK is as:
> ==
> Map<String, Object> map = ImmutableMap.<String, Object>builder()
>     .put("group.id", "test-consumer-group")
>     .build();
> List<TopicPartition> topicPartitions = new ArrayList<>();
> for (int i = 0; i < 32; i++) {
>     topicPartitions.add(new TopicPartition("kafkasink32", i));
> }
> PCollection<KV<Long, String>> readKafkaData = p.apply(KafkaIO.<Long, String>read()
>     .withBootstrapServers("ubuntu7:9092")
>     .updateConsumerProperties(map)
>     .withKeyDeserializer(LongDeserializer.class)
>     .withValueDeserializer(StringDeserializer.class)
>     .withTopicPartitions(topicPartitions)
>     .withoutMetadata()
> );
> ==
> Here I have two directions to solve this problem:
>  
> 1.  Using the following sdk from spark streaming
> 
> https://jaceklaskowski.gitbooks.io/spark-streaming/spark-streaming-kafka-LocationStrategy.html
> LocationStrategies.PreferConsistent: Use in most cases as it consistently 
> distributes partitions across all executors.
>  
> If we would like to use this feature, we have not idea to set this in kafkaIO 
> SDK.
>  
> 2.  Setting the related configurations of kafka to perform the consumer 
> rebalance
> 
> set consumer group? Set group.id?
> 
>  
> 
> If we need to do No2., could someone give me some ideas to set configurations?

Re: kafkaIO Consumer Rebalance with Spark Runner

2019-01-28 Thread Juan Carlos Garcia
Hi Rick,

You can limit your Spark processing by passing the following option to your
beam pipeline:

MaxRecordsPerBatch

see
https://beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/runners/spark/SparkPipelineOptions.html#getMaxRecordsPerBatch--

Hope it helps.

JC
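For completeness, a hedged sketch of wiring this option up from command-line arguments;
the class name ReadFromKafkaJob is hypothetical and the flag value is a placeholder (any
PipelineOptions property can be passed as --propertyName=value):

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Sketch: launched e.g. with --runner=SparkRunner --maxRecordsPerBatch=100000
public class ReadFromKafkaJob {
  public static void main(String[] args) {
    SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(SparkPipelineOptions.class);
    Pipeline p = Pipeline.create(options);
    // ... attach the KafkaIO read and the rest of the pipeline here ...
    p.run().waitUntilFinish();
  }
}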




On Mon, Jan 28, 2019 at 10:57 AM  wrote:

> Dear Raghu,
>
>
>
> I add the line: “PCollection reshuffled =
> windowKV.apply(Reshuffle.viaRandomKey());” in my program.
>
>
>
> I tried to control the streaming data size: 100,000/1sec to decrease the
> processing time.
>
>
>
> The following settings are used for my project.
>
>
>
> 1.  One topic / 2 partitions
>
> 2.  Two workers / two executors
>
>
>
> 3.  The spark-default setting is:
>
> spark.executor.instances=2
>
> spark.executor.cores=4
>
> spark.executor.memory=2048m
>
> spark.default.parallelism=200
>
>
>
> spark.streaming.blockInterval=50ms
>
> spark.streaming.kafka.maxRatePerPartition=50,000
>
> spark.streaming.backpressure.enabled=true
>
> spark.streaming.concurrentJobs = 1
>
> spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC
>
> spark.executor.extraJavaOptions=-Xss100M
>
>
>
> spark.shuffle.consolidateFiles=true
>
> spark.streaming.unpersist=true
>
> spark.streaming.stopGracefullyOnShutdown=true
>
>
>
> I hope that the data size is controlled at 100,000.
>
>
>
> Here,
>
>
>
> The data size is always over 100,000. The setting of
> “spark.streaming.kafka.maxRatePerPartition” confused me.
>
>
>
> That does not seem to work for me.
>
>
>
> Rick
>
>
>
> From: Raghu Angadi [mailto:ang...@gmail.com]
> Sent: Saturday, January 26, 2019 3:06 AM
> To: user@beam.apache.org
> Subject: Re: kafkaIO Consumer Rebalance with Spark Runner
>
>
>
> You have 32 partitions. Reading can not be distributed to more than 32
> parallel tasks.
>
> If you have a log of processing for each record after reading, you can
> reshuffle the messages before processing them, that way the processing
> could be distributed to more tasks. Search for previous threads about
> reshuffle in Beam lists.
>
>
>
> On Thu, Jan 24, 2019 at 7:23 PM  wrote:
>
> Dear all,
>
>
>
> I am using the kafkaIO sdk in my project (Beam with Spark runner).
>
>
>
> The problem about task skew is shown as the following figure.
>
>
>
> My running environment is:
>
> OS: Ubuntn 14.04.4 LTS
>
> The version of related tools is:
>
> java version: "1.8.0_151"
>
> Beam version: 2.9.0 (Spark runner with Standalone mode)
>
> Spark version: 2.3.1 Standalone mode
>
> Execution condition:
>
> Master/Driver node: ubuntu7
>
> Worker nodes: ubuntu8 (4 Executors); ubuntu9 (4 Executors)
>
> The number of executors is 8
>
>
>
> Kafka Broker: 2.10-0.10.1.1
>
> Broker node at ubuntu7
>
> Kafka Client:
>
> The topic: kafkasink32
>
> kafkasink32 Partitions: 32
>
>
>
> The programming of my project for kafkaIO SDK is as:
>
>
> ==
>
> Map<String, Object> map = ImmutableMap.<String, Object>builder()
>     .put("group.id", "test-consumer-group")
>     .build();
> List<TopicPartition> topicPartitions = new ArrayList<>();
> for (int i = 0; i < 32; i++) {
>     topicPartitions.add(new TopicPartition("kafkasink32", i));
> }
> PCollection<KV<Long, String>> readKafkaData = p.apply(KafkaIO.<Long, String>read()
>     .withBootstrapServers("ubuntu7:9092")
>     .updateConsumerProperties(map)
>     .withKeyDeserializer(LongDeserializer.class)
>     .withValueDeserializer(StringDeserializer.class)
>     .withTopicPartitions(topicPartitions)
>     .withoutMetadata()
> );
>
>
> ==
>
> Here I have two directions to solve this problem:
>
>
>
> 1.  Using the following sdk from spark streaming
>
>
> https://jaceklaskowski.gitbooks.io/spark-streaming/spark-streaming-kafka-LocationStrategy.html
>
> LocationStrategies.PreferConsistent: Use in most cases as it consistently
> distributes partitions across all executors.
>
>
>
> If we would like to use this feature, we have not idea to set this in
> kafkaIO SDK.
>
>
>
> 2.  Setting the related configurations of kafka to perform the
> consumer rebalance
>
> set consumer group? Set group.id?
>
>
>
> If we need to do No2., could someone give me some ideas to set
> configurations?
>
>
>
> If anyone provides any direction to help us to overcome this problem, we
> would appreciate it.
>
>
>
> Thanks.
>
>
>
> Sincerely yours,
>
>
>
> Rick
>
>
>
>
>
> --
> 本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain
> confidential information. Please do not use or disclose it in any way and
> delete it if you are not the intended recipient.
>
>
>
> --
> 本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain
> confidential information. Please do not use or disclose it in any way and
> delete it if you are not the intended recipient.
>


-- 

JC


RE: kafkaIO Consumer Rebalance with Spark Runner

2019-01-28 Thread linrick
Dear Raghu,

I added the line “PCollection<KV<Long, String>> reshuffled =
windowKV.apply(Reshuffle.viaRandomKey());” to my program.

I tried to limit the streaming data size to 100,000 records per second to decrease the
processing time.

The following settings are used for my project.


1.  One topic / 2 partitions
[inline image omitted]

2.  Two workers / two executors


3.  The spark-default setting is:
spark.executor.instances=2
spark.executor.cores=4
spark.executor.memory=2048m
spark.default.parallelism=200

spark.streaming.blockInterval=50ms
spark.streaming.kafka.maxRatePerPartition=50,000
spark.streaming.backpressure.enabled=true
spark.streaming.concurrentJobs = 1
spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC
spark.executor.extraJavaOptions=-Xss100M

spark.shuffle.consolidateFiles=true
spark.streaming.unpersist=true
spark.streaming.stopGracefullyOnShutdown=true

I expected the data size to be capped at 100,000.

Here,
[inline image omitted]

However, the data size is always over 100,000, so the
“spark.streaming.kafka.maxRatePerPartition” setting confuses me.

It does not seem to work for me.

Rick

From: Raghu Angadi [mailto:ang...@gmail.com]
Sent: Saturday, January 26, 2019 3:06 AM
To: user@beam.apache.org
Subject: Re: kafkaIO Consumer Rebalance with Spark Runner

You have 32 partitions. Reading cannot be distributed to more than 32 parallel
tasks.
If you have a lot of processing for each record after reading, you can
reshuffle the messages before processing them; that way the processing can be
distributed to more tasks. Search for previous threads about reshuffle in Beam
lists.

On Thu, Jan 24, 2019 at 7:23 PM linr...@itri.org.tw wrote:
Dear all,

I am using the kafkaIO sdk in my project (Beam with Spark runner).

The problem about task skew is shown as the following figure.
[inline image omitted]

My running environment is:
OS: Ubuntu 14.04.4 LTS
The version of related tools is:
java version: "1.8.0_151"
Beam version: 2.9.0 (Spark runner with Standalone mode)
Spark version: 2.3.1 Standalone mode
  Execution condition:
  Master/Driver node: ubuntu7
  Worker nodes: ubuntu8 (4 Executors); ubuntu9 (4 Executors)
The number of executors is 8

Kafka Broker: 2.10-0.10.1.1
  Broker node at ubuntu7
Kafka Client:
The topic: kafkasink32
kafkasink32 Partitions: 32

The programming of my project for kafkaIO SDK is as:
==
Map<String, Object> map = ImmutableMap.<String, Object>builder()
    .put("group.id", "test-consumer-group")
    .build();
List<TopicPartition> topicPartitions = new ArrayList<>();
for (int i = 0; i < 32; i++) {
    topicPartitions.add(new TopicPartition("kafkasink32", i));
}
PCollection<KV<Long, String>> readKafkaData = p.apply(KafkaIO.<Long, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .updateConsumerProperties(map)
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withTopicPartitions(topicPartitions)
    .withoutMetadata()
);
==
Here I have two directions to solve this problem:


1.  Using the following sdk from spark streaming
https://jaceklaskowski.gitbooks.io/spark-streaming/spark-streaming-kafka-LocationStrategy.html
LocationStrategies.PreferConsistent: Use in most cases as it consistently 
distributes partitions across all executors.

If we would like to use this feature, we have no idea how to set it in the
KafkaIO SDK.


2.  Setting the related configurations of kafka to perform the consumer 
rebalance

set consumer group? Set group.id?


If we need to do No. 2, could someone give me some ideas on how to set the configurations?


If anyone provides any direction to help us to overcome this problem, we would 
appreciate it.


Thanks.

Sincerely yours,

Rick



--
本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain 
confidential information. Please do not use or disclose it in any way and 
delete it if you are not the intended recipient.


--
本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain 
confidential information. Please do not use or disclose it in any way and 
delete it if you are not the intended recipient.


Re: kafkaIO Consumer Rebalance with Spark Runner

2019-01-25 Thread Raghu Angadi
You have 32 partitions. Reading cannot be distributed to more than 32
parallel tasks.
If you have a lot of processing for each record after reading, you can
reshuffle the messages before processing them; that way the processing
can be distributed to more tasks. Search for previous threads about
reshuffle in Beam lists.
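A hedged sketch of what that reshuffle could look like, reusing the readKafkaData
collection defined in the quoted code below; the DoFn body is only a placeholder for
the per-record processing:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Sketch: the read itself stays at 32 parallel tasks (one per partition),
// but reshuffling lets the heavy per-record work spread over more tasks.
PCollection<KV<Long, String>> reshuffled =
    readKafkaData.apply(Reshuffle.viaRandomKey());

reshuffled.apply(ParDo.of(new DoFn<KV<Long, String>, Void>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // placeholder for the expensive per-record processing
  }
}));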

On Thu, Jan 24, 2019 at 7:23 PM  wrote:

> Dear all,
>
>
>
> I am using the kafkaIO sdk in my project (Beam with Spark runner).
>
>
>
> The problem about task skew is shown as the following figure.
>
>
>
> My running environment is:
>
> OS: Ubuntn 14.04.4 LTS
>
> The version of related tools is:
>
> java version: "1.8.0_151"
>
> Beam version: 2.9.0 (Spark runner with Standalone mode)
>
> Spark version: 2.3.1 Standalone mode
>
> Execution condition:
>
> Master/Driver node: ubuntu7
>
> Worker nodes: ubuntu8 (4 Executors); ubuntu9 (4 Executors)
>
> The number of executors is 8
>
>
>
> Kafka Broker: 2.10-0.10.1.1
>
> Broker node at ubuntu7
>
> Kafka Client:
>
> The topic: kafkasink32
>
> kafkasink32 Partitions: 32
>
>
>
> The programming of my project for kafkaIO SDK is as:
>
>
> ==
>
> Map<String, Object> map = ImmutableMap.<String, Object>builder()
>     .put("group.id", "test-consumer-group")
>     .build();
> List<TopicPartition> topicPartitions = new ArrayList<>();
> for (int i = 0; i < 32; i++) {
>     topicPartitions.add(new TopicPartition("kafkasink32", i));
> }
> PCollection<KV<Long, String>> readKafkaData = p.apply(KafkaIO.<Long, String>read()
>     .withBootstrapServers("ubuntu7:9092")
>     .updateConsumerProperties(map)
>     .withKeyDeserializer(LongDeserializer.class)
>     .withValueDeserializer(StringDeserializer.class)
>     .withTopicPartitions(topicPartitions)
>     .withoutMetadata()
> );
>
>
> ==
>
> Here I have two directions to solve this problem:
>
>
>
> 1.  Using the following sdk from spark streaming
>
>
> https://jaceklaskowski.gitbooks.io/spark-streaming/spark-streaming-kafka-LocationStrategy.html
>
> LocationStrategies.PreferConsistent: Use in most cases as it consistently
> distributes partitions across all executors.
>
>
>
> If we would like to use this feature, we have not idea to set this in
> kafkaIO SDK.
>
>
>
> 2.  Setting the related configurations of kafka to perform the
> consumer rebalance
>
> set consumer group? Set group.id?
>
>
>
> If we need to do No2., could someone give me some ideas to set
> configurations?
>
>
>
> If anyone provides any direction to help us to overcome this problem, we
> would appreciate it.
>
>
>
> Thanks.
>
>
>
> Sincerely yours,
>
>
>
> Rick
>
>
>
>
> --
> 本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain
> confidential information. Please do not use or disclose it in any way and
> delete it if you are not the intended recipient.
>


kafkaIO Consumer Rebalance with Spark Runner

2019-01-24 Thread linrick
Dear all,

I am using the kafkaIO sdk in my project (Beam with Spark runner).

The task-skew problem is shown in the following figure.
[inline image omitted]

My running environment is:
OS: Ubuntu 14.04.4 LTS
The version of related tools is:
java version: "1.8.0_151"
Beam version: 2.9.0 (Spark runner with Standalone mode)
Spark version: 2.3.1 Standalone mode
  Execution condition:
  Master/Driver node: ubuntu7
  Worker nodes: ubuntu8 (4 Executors); ubuntu9 (4 Executors)
The number of executors is 8

Kafka Broker: 2.10-0.10.1.1
  Broker node at ubuntu7
Kafka Client:
The topic: kafkasink32
kafkasink32 Partitions: 32

The KafkaIO part of my project is programmed as follows:
==
Map<String, Object> map = ImmutableMap.<String, Object>builder()
    .put("group.id", "test-consumer-group")
    .build();
List<TopicPartition> topicPartitions = new ArrayList<>();
for (int i = 0; i < 32; i++) {
    topicPartitions.add(new TopicPartition("kafkasink32", i));
}
PCollection<KV<Long, String>> readKafkaData = p.apply(KafkaIO.<Long, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .updateConsumerProperties(map)
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withTopicPartitions(topicPartitions)
    .withoutMetadata()
);
==
Here I have two directions to solve this problem:


1.  Using the following sdk from spark streaming
https://jaceklaskowski.gitbooks.io/spark-streaming/spark-streaming-kafka-LocationStrategy.html
LocationStrategies.PreferConsistent: Use in most cases as it consistently 
distributes partitions across all executors.

If we would like to use this feature, we have no idea how to set it in the
KafkaIO SDK.


2.  Setting the related configurations of kafka to perform the consumer 
rebalance

set consumer group? Set group.id?


If we need to do No. 2, could someone give me some ideas on how to set the configurations?


If anyone can provide any direction to help us overcome this problem, we would
appreciate it.


Thanks.

Sincerely yours,

Rick



--
本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain 
confidential information. Please do not use or disclose it in any way and 
delete it if you are not the intended recipient.