Hello

We are consuming two topics (A and B) and joining them, but I have noticed
that no matter what I do, topic A gets consumed first in a batch and then
topic B. Increasing *num.stream.threads* only makes topic A process its
records faster. Topic B has far more messages than topic A.


Here are my settings:

Map<String, Object> config = new HashMap<>();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, streamingAppName);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
config.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO");
config.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);


For now we tried splitting the topology into two topologies so that each
topic gets its own thread, which has somewhat alleviated the problem.

For the first topology, which reads topic A, we updated the settings as
follows to limit the number of messages fetched. Since that topology does a
repartition, the second topology joins against the repartition topic.

properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, streamingAppName + "-reKey-app");
properties.setProperty(consumerPrefix(MAX_POLL_RECORDS_CONFIG), "20");
properties.setProperty(consumerPrefix(FETCH_MAX_BYTES_CONFIG), Integer.toString(30 * 1024));



Is there a better way?

I was encouraged to use *max.task.idle.ms* to avoid one topic's processing
going much faster than the other, but I'm not sure if that will help with
my issue.
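
For reference, here is a minimal sketch of how that setting might be applied.
As I understand it, max.task.idle.ms is the time a task waits for data on an
input partition that currently has nothing buffered before it goes ahead and
processes the others. The sketch uses plain string keys so it runs without the
Kafka jars; StreamsConfig.MAX_TASK_IDLE_MS_CONFIG is the constant for the same
key. The class name, the application id, the broker address and the 500 ms
value are all placeholders, not recommendations.

```java
import java.util.Properties;

class MaxTaskIdleSketch {

    // Builds a Streams configuration with max.task.idle.ms set.
    // All arguments are illustrative; pick the idle time for your own workload.
    static Properties buildConfig(String appId, String bootstrapServers, long maxTaskIdleMs) {
        Properties props = new Properties();
        props.setProperty("application.id", appId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Same key as StreamsConfig.MAX_TASK_IDLE_MS_CONFIG
        props.setProperty("max.task.idle.ms", Long.toString(maxTaskIdleMs));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values, assumed for this example only
        Properties props = buildConfig("join-app", "localhost:9092", 500L);
        System.out.println(props.getProperty("max.task.idle.ms"));
    }
}
```

The resulting Properties object would then be passed to the KafkaStreams
constructor in place of, or merged with, the settings above.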


I'm not really sure about StreamThreads and Tasks and how they work.

Any help is appreciated.
