[ https://issues.apache.org/jira/browse/IGNITE-16586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nikolay Izhikov reassigned IGNITE-16586:
----------------------------------------

    Assignee:     (was: Mikhail Petrov)

> Provide named parameters for Cdc streamers
> ------------------------------------------
>
>                 Key: IGNITE-16586
>                 URL: https://issues.apache.org/jira/browse/IGNITE-16586
>             Project: Ignite
>          Issue Type: Improvement
>            Reporter: Ilya Shishkov
>            Priority: Minor
>              Labels: IEP-59, ise
>
> Currently, KafkaToIgniteCdcStreamerApplier[1] and IgniteToKafkaCdcStreamer[2] perform requests with a hard-coded timeout equal to {{DFLT_REQ_TIMEOUT}}:
> {code:title=KafkaToIgniteCdcStreamerApplier}
>     /** */
>     public static final int DFLT_REQ_TIMEOUT = 3;
>     ...
>     private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws IgniteCheckedException {
>         ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
>
>         if (log.isDebugEnabled()) {
>             log.debug(
>                 "Polled from consumer [assignments=" + cnsmr.assignment() + ",rcvdEvts=" + rcvdEvts.addAndGet(recs.count()) + ']'
>             );
>         }
>
>         apply(F.iterator(recs, this::deserialize, true, rec -> F.isEmpty(caches) || caches.contains(rec.key())));
>
>         cnsmr.commitSync(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
>     }
> {code}
> {code:title=IgniteToKafkaCdcStreamer}
>     /** Default kafka request timeout in seconds. */
>     public static final int DFLT_REQ_TIMEOUT = 5;
>
>     ...
>     @Override public boolean onEvents(Iterator<CdcEvent> evts) {
>         List<Future<RecordMetadata>> futs = new ArrayList<>();
>         ...
>         if (!futs.isEmpty()) {
>             try {
>                 for (Future<RecordMetadata> fut : futs)
>                     fut.get(DFLT_REQ_TIMEOUT, TimeUnit.SECONDS);
>
>                 msgsSnt.add(futs.size());
>
>                 lastMsgTs.value(System.currentTimeMillis());
>             }
> {code}
> We should have a configurable timeout for requests to Kafka.
>
> # https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java#L203
> # https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java#L197



--
This message was sent by Atlassian Jira (v8.20.1#820001)
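
For illustration only, here is a minimal sketch of what IGNITE-16586 asks for: a timeout that is configured rather than taken from a hard-coded constant. It is not the change made in ignite-extensions. The class name `KafkaRequestTimeoutSketch`, the `kafkaReqTimeoutMs` field, the setter, and the 3000 ms default are all hypothetical names and values chosen for this example. It uses only the JDK, so plain `Future` objects stand in for the Kafka producer futures.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch: a configurable Kafka request timeout instead of a hard-coded constant. */
class KafkaRequestTimeoutSketch {
    /** Default timeout in milliseconds (assumed value, used only when nothing is configured). */
    static final long DFLT_KAFKA_REQ_TIMEOUT_MS = 3_000L;

    /** Currently configured timeout in milliseconds. */
    private long kafkaReqTimeoutMs = DFLT_KAFKA_REQ_TIMEOUT_MS;

    /** Sets the timeout; non-positive values are rejected. Returns {@code this} for chaining. */
    KafkaRequestTimeoutSketch setKafkaRequestTimeout(long timeoutMs) {
        if (timeoutMs <= 0)
            throw new IllegalArgumentException("Timeout must be positive: " + timeoutMs);

        kafkaReqTimeoutMs = timeoutMs;

        return this;
    }

    /** Returns the configured timeout in milliseconds. */
    long getKafkaRequestTimeout() {
        return kafkaReqTimeoutMs;
    }

    /** Duration that could be handed to KafkaConsumer#poll(Duration) and #commitSync(Duration). */
    Duration requestTimeout() {
        return Duration.ofMillis(kafkaReqTimeoutMs);
    }

    /** Waits for every future with the configured timeout and returns how many completed. */
    <T> int awaitAll(List<? extends Future<T>> futs) {
        try {
            for (Future<T> fut : futs)
                fut.get(kafkaReqTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            throw new IllegalStateException("Interrupted while waiting for a Kafka request", e);
        }
        catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Kafka request failed or timed out", e);
        }

        return futs.size();
    }

    /** Small usage demo with already-completed futures in place of real producer futures. */
    public static void main(String[] args) {
        KafkaRequestTimeoutSketch cfg = new KafkaRequestTimeoutSketch().setKafkaRequestTimeout(10_000L);

        int sent = cfg.awaitAll(List.of(CompletableFuture.completedFuture("meta")));

        System.out.println("timeout=" + cfg.requestTimeout() + ", sent=" + sent);
    }
}
```

Milliseconds are used here instead of the seconds in the quoted snippets so that a single setting can serve both `Duration`-based and `TimeUnit`-based calls. That is a design choice of this sketch, not something stated in the issue.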