[ 
https://issues.apache.org/jira/browse/IGNITE-16176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17495565#comment-17495565
 ] 

Mikhail Petrov commented on IGNITE-16176:
-----------------------------------------

TC Run-All extensions - 
https://ci.ignite.apache.org/project.html?projectId=IgniteExtensions&branch_IgniteExtensions=pull%2F99%2F

> Configurable request timeouts in KafkaToIgniteCdcStreamerApplier and 
> IgniteToKafkaCdcStreamer
> ---------------------------------------------------------------------------------------------
>
>                 Key: IGNITE-16176
>                 URL: https://issues.apache.org/jira/browse/IGNITE-16176
>             Project: Ignite
>          Issue Type: Improvement
>            Reporter: Ilya Shishkov
>            Assignee: Mikhail Petrov
>            Priority: Minor
>              Labels: IEP-59, ise
>
> Now KafkaToIgniteCdcStreamerApplier[1] and IgniteToKafkaCdcStreamer[2] 
> perform requests with a hard-coded timeout equal to {{DFLT_REQ_TIMEOUT}}:
> {code:title=KafkaToIgniteCdcStreamerApplier}
>     /** */
>     public static final int DFLT_REQ_TIMEOUT = 3;
>     ...
>     private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws 
> IgniteCheckedException {
>         ConsumerRecords<Integer, byte[]> recs = 
> cnsmr.poll(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
>         if (log.isDebugEnabled()) {
>             log.debug(
>                 "Polled from consumer [assignments=" + cnsmr.assignment() + 
> ",rcvdEvts=" + rcvdEvts.addAndGet(recs.count()) + ']'
>             );
>         }
>         apply(F.iterator(recs, this::deserialize, true, rec -> 
> F.isEmpty(caches) || caches.contains(rec.key())));
>         cnsmr.commitSync(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
>     }
> {code}
> {code:title=IgniteToKafkaCdcStreamer}
>     /** Default kafka request timeout in seconds. */
>     public static final int DFLT_REQ_TIMEOUT = 5;
>     
>     ...
>     @Override public boolean onEvents(Iterator<CdcEvent> evts) {
>         List<Future<RecordMetadata>> futs = new ArrayList<>();
>     ...
>         if (!futs.isEmpty()) {
>             try {
>                 for (Future<RecordMetadata> fut : futs)
>                     fut.get(DFLT_REQ_TIMEOUT, TimeUnit.SECONDS);
>                 msgsSnt.add(futs.size());
>                 lastMsgTs.value(System.currentTimeMillis());
>             }
> {code}
> We should have configurable timeout for requests to the Kafka.
> # 
> https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java#L203
> # 
> https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java#L197



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to