Hi Miguel,

I suspect it's due to the timestamps in your topic A, which are earlier
than those in topic B. Note that Kafka Streams tries to synchronize joined
topics by processing the records with the smallest timestamps first, so if
topic A's messages have smaller timestamps, they will be selected ahead of
topic B's.
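As a rough illustration, here is a toy Java model of that rule (Java 16+ for records). It assumes each input partition is a timestamp-ordered queue and ignores fetching, buffering and max.task.idle.ms. The class and method names are hypothetical and are not Kafka Streams internals:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model (hypothetical names, not Kafka Streams internals): among the
// non-empty input queues, always process the head record with the smallest
// timestamp. Ties go to queue a.
class TimestampChooser {
    record Rec(String topic, long timestamp) {}

    static List<String> processOrder(Deque<Rec> a, Deque<Rec> b) {
        List<String> order = new ArrayList<>();
        while (!a.isEmpty() || !b.isEmpty()) {
            Deque<Rec> next;
            if (a.isEmpty()) {
                next = b;
            } else if (b.isEmpty()) {
                next = a;
            } else {
                next = a.peekFirst().timestamp() <= b.peekFirst().timestamp() ? a : b;
            }
            Rec r = next.pollFirst();
            order.add(r.topic() + "@" + r.timestamp());
        }
        return order;
    }

    public static void main(String[] args) {
        // Topic A's records carry older timestamps than topic B's.
        Deque<Rec> a = new ArrayDeque<>(List.of(new Rec("A", 1), new Rec("A", 2), new Rec("A", 3)));
        Deque<Rec> b = new ArrayDeque<>(List.of(new Rec("B", 10), new Rec("B", 11)));
        System.out.println(processOrder(a, b)); // prints [A@1, A@2, A@3, B@10, B@11]
    }
}
```

When all of topic A's timestamps are older, every A record is processed before any B record, which matches the one-topic-then-the-other pattern in your description.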

The reason routing topic A through a repartition topic alleviates the
problem is that the first topology would reset the timestamps on the
repartition topic to values closer to the processing time, and hence closer
to the timestamps of topic B's messages.
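To illustrate the effect, here is a toy Java model (Java 16+) that compares the processing order before and after topic A's records are re-stamped. The restamp helper and its base/step values are hypothetical stand-ins for processing-time timestamps. Sorting all records by timestamp is used as a shortcut for repeatedly taking the smallest head record, which is equivalent only when each input is already in timestamp order:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Toy model: the join's input order before and after topic A's records are
// re-stamped on the way through a repartition topic (hypothetical values).
class RestampDemo {
    record Rec(String topic, long timestamp) {}

    // Stable sort by timestamp; matches "smallest head first" when each list
    // is already in timestamp order (ties go to list a, which comes first).
    static List<String> order(List<Rec> a, List<Rec> b) {
        return Stream.concat(a.stream(), b.stream())
                .sorted(Comparator.comparingLong(Rec::timestamp))
                .map(r -> r.topic() + "@" + r.timestamp())
                .collect(Collectors.toList());
    }

    // Hypothetical re-stamping: the i-th record gets time base + i * step.
    static List<Rec> restamp(List<Rec> recs, long base, long step) {
        List<Rec> out = new ArrayList<>();
        for (int i = 0; i < recs.size(); i++) {
            out.add(new Rec(recs.get(i).topic(), base + i * step));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> a = List.of(new Rec("A", 1), new Rec("A", 2));
        List<Rec> b = List.of(new Rec("B", 100), new Rec("B", 101), new Rec("B", 102));
        System.out.println(order(a, b));                  // prints [A@1, A@2, B@100, B@101, B@102]
        System.out.println(order(restamp(a, 100, 2), b)); // prints [A@100, B@100, B@101, A@102, B@102]
    }
}
```

With the original timestamps topic A is drained first; once its records carry times near topic B's, the two topics interleave.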


Guozhang

On Mon, Jan 10, 2022 at 10:05 AM Miguel González <miguel.gonza...@klar.mx>
wrote:

> Hello
>
> We are consuming two topics (A and B) and joining them, but I have noticed
> that no matter what I do, topic A gets consumed first in a batch and then
> topic B. Increasing *num.stream.threads* only makes topic A's records get
> processed faster. Topic B has many more messages than topic A.
>
>
> Here are my settings:
>
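> import java.util.HashMap;
> import java.util.Map;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsConfig;
>
> // streamingAppName, bootStrapServers and stateDir are defined elsewhere
> Map<String, Object> config = new HashMap<>();
> config.put(StreamsConfig.APPLICATION_ID_CONFIG, streamingAppName);
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
> config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
> // EXACTLY_ONCE is deprecated since Kafka 3.0 in favor of EXACTLY_ONCE_V2
> config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
> // a single StreamThread runs all tasks of this instance
> config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
> config.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO");
> config.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);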
>
>
> For now, we have split the topology into two topologies, one thread per
> topic, which has somewhat alleviated the problem.
>
> For the first topology (topic A), we updated the settings as follows to
> limit the number of messages fetched. Since that topology repartitions
> topic A, the second topology joins against the repartition topic:
>
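> import org.apache.kafka.streams.StreamsConfig;
> import static org.apache.kafka.clients.consumer.ConsumerConfig.FETCH_MAX_BYTES_CONFIG;
> import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG;
> import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
>
> // properties: the java.util.Properties of the first (topic A) application
> properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, streamingAppName + "-reKey-app");
> // cap the topic A consumer at 20 records per poll, roughly 30 KiB per fetch
> properties.setProperty(consumerPrefix(MAX_POLL_RECORDS_CONFIG), "20");
> properties.setProperty(consumerPrefix(FETCH_MAX_BYTES_CONFIG), Integer.toString(30 * 1024));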
>
>
>
> Is there a better way?
>
> I was encouraged to use *max.task.idle.ms* to keep one topic's processing
> from running much faster than the other's, but I'm not sure it will help
> with my issue.
>
>
> I'm not really sure how StreamThreads and tasks work.
>
> Any help is appreciated.
>


-- 
-- Guozhang
