[ https://issues.apache.org/jira/browse/IGNITE-16176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17495565#comment-17495565 ]

Mikhail Petrov commented on IGNITE-16176:
-----------------------------------------

TC Run-All extensions - https://ci.ignite.apache.org/project.html?projectId=IgniteExtensions&branch_IgniteExtensions=pull%2F99%2F

> Configurable request timeouts in KafkaToIgniteCdcStreamerApplier and IgniteToKafkaCdcStreamer
> ---------------------------------------------------------------------------------------------
>
>                 Key: IGNITE-16176
>                 URL: https://issues.apache.org/jira/browse/IGNITE-16176
>             Project: Ignite
>          Issue Type: Improvement
>            Reporter: Ilya Shishkov
>            Assignee: Mikhail Petrov
>            Priority: Minor
>              Labels: IEP-59, ise
>
> Currently, KafkaToIgniteCdcStreamerApplier[1] and IgniteToKafkaCdcStreamer[2]
> perform requests with a hard-coded timeout equal to {{DFLT_REQ_TIMEOUT}}:
> {code:title=KafkaToIgniteCdcStreamerApplier}
> /** */
> public static final int DFLT_REQ_TIMEOUT = 3;
> ...
> private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws IgniteCheckedException {
>     ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
>     if (log.isDebugEnabled()) {
>         log.debug(
>             "Polled from consumer [assignments=" + cnsmr.assignment() +
>                 ",rcvdEvts=" + rcvdEvts.addAndGet(recs.count()) + ']'
>         );
>     }
>     apply(F.iterator(recs, this::deserialize, true, rec -> F.isEmpty(caches) || caches.contains(rec.key())));
>     cnsmr.commitSync(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
> }
> {code}
> {code:title=IgniteToKafkaCdcStreamer}
> /** Default kafka request timeout in seconds. */
> public static final int DFLT_REQ_TIMEOUT = 5;
>
> ...
> @Override public boolean onEvents(Iterator<CdcEvent> evts) {
>     List<Future<RecordMetadata>> futs = new ArrayList<>();
>     ...
>     if (!futs.isEmpty()) {
>         try {
>             for (Future<RecordMetadata> fut : futs)
>                 fut.get(DFLT_REQ_TIMEOUT, TimeUnit.SECONDS);
>             msgsSnt.add(futs.size());
>             lastMsgTs.value(System.currentTimeMillis());
>         }
> {code}
> We should have a configurable timeout for requests to Kafka.
> # https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java#L203
> # https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java#L197



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
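
For illustration only, here is a minimal sketch of what "configurable timeout" could look like on the producer side, where the quoted code waits on futures with fut.get(DFLT_REQ_TIMEOUT, TimeUnit.SECONDS). Everything in it is hypothetical and not taken from the actual patch: the class name ConfigurableTimeoutSketch, the setter setRequestTimeout, the helper awaitAll, the choice of milliseconds as the unit, and the 3000 ms default are all assumptions. It uses only JDK classes so it can be run without Kafka on the classpath.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch, not the actual IgniteToKafkaCdcStreamer code. */
public class ConfigurableTimeoutSketch {
    /** Default request timeout in milliseconds (value assumed for illustration). */
    public static final long DFLT_REQ_TIMEOUT_MS = 3_000L;

    /** Request timeout in milliseconds; replaces the hard-coded constant. */
    private long reqTimeoutMs = DFLT_REQ_TIMEOUT_MS;

    /** Hypothetical fluent setter that rejects non-positive values. */
    public ConfigurableTimeoutSketch setRequestTimeout(long reqTimeoutMs) {
        if (reqTimeoutMs <= 0)
            throw new IllegalArgumentException("Request timeout must be positive: " + reqTimeoutMs);

        this.reqTimeoutMs = reqTimeoutMs;

        return this;
    }

    /** @return Configured request timeout in milliseconds. */
    public long getRequestTimeout() {
        return reqTimeoutMs;
    }

    /**
     * Waits for each future in turn, applying the configured timeout to each wait.
     *
     * @return Number of futures that completed.
     */
    public int awaitAll(List<? extends Future<?>> futs)
        throws InterruptedException, ExecutionException, TimeoutException {
        for (Future<?> fut : futs)
            fut.get(reqTimeoutMs, TimeUnit.MILLISECONDS);

        return futs.size();
    }

    public static void main(String[] args) throws Exception {
        ConfigurableTimeoutSketch streamer = new ConfigurableTimeoutSketch().setRequestTimeout(500L);

        // Already completed futures return without waiting.
        System.out.println(streamer.awaitAll(List.of(CompletableFuture.completedFuture("ok"))));

        // A future that never completes runs into the configured timeout.
        try {
            streamer.awaitAll(List.of(new CompletableFuture<String>()));
        }
        catch (TimeoutException e) {
            System.out.println("timed out");
        }
    }
}
```

On the consumer side the same configured value could presumably be passed as Duration.ofMillis(...) to KafkaConsumer#poll(Duration) and KafkaConsumer#commitSync(Duration) in place of Duration.ofSeconds(DFLT_REQ_TIMEOUT). Whether the real change uses seconds or milliseconds, or one property for both classes, is not stated in this thread.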