[ 
https://issues.apache.org/jira/browse/IGNITE-16586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikolay Izhikov reassigned IGNITE-16586:
----------------------------------------

    Assignee:     (was: Mikhail Petrov)

> Provide named parameters for Cdc streamers
> ------------------------------------------
>
>                 Key: IGNITE-16586
>                 URL: https://issues.apache.org/jira/browse/IGNITE-16586
>             Project: Ignite
>          Issue Type: Improvement
>            Reporter: Ilya Shishkov
>            Priority: Minor
>              Labels: IEP-59, ise
>
> Currently, KafkaToIgniteCdcStreamerApplier[1] and IgniteToKafkaCdcStreamer[2] 
> perform requests with a hard-coded timeout equal to {{DFLT_REQ_TIMEOUT}} 
> (3 and 5 seconds, respectively):
> {code:title=KafkaToIgniteCdcStreamerApplier}
>     /** */
>     public static final int DFLT_REQ_TIMEOUT = 3;
>     ...
>     private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws IgniteCheckedException {
>         ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
>         if (log.isDebugEnabled()) {
>             log.debug(
>                 "Polled from consumer [assignments=" + cnsmr.assignment() + ",rcvdEvts=" + rcvdEvts.addAndGet(recs.count()) + ']'
>             );
>         }
>         apply(F.iterator(recs, this::deserialize, true, rec -> F.isEmpty(caches) || caches.contains(rec.key())));
>         cnsmr.commitSync(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
>     }
> {code}
> {code:title=IgniteToKafkaCdcStreamer}
>     /** Default kafka request timeout in seconds. */
>     public static final int DFLT_REQ_TIMEOUT = 5;
>     
>     ...
>     @Override public boolean onEvents(Iterator<CdcEvent> evts) {
>         List<Future<RecordMetadata>> futs = new ArrayList<>();
>     ...
>         if (!futs.isEmpty()) {
>             try {
>                 for (Future<RecordMetadata> fut : futs)
>                     fut.get(DFLT_REQ_TIMEOUT, TimeUnit.SECONDS);
>                 msgsSnt.add(futs.size());
>                 lastMsgTs.value(System.currentTimeMillis());
>             }
> {code}
> The timeout for requests to Kafka should be configurable instead of hard-coded.
> # https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java#L203
> # https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java#L197
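
One way the request above could be addressed is sketched below. This is only an illustration under stated assumptions, not the actual Ignite change: the class name ConfigurableRequestTimeout, the setter setTimeoutMs, and the helper awaitAll are hypothetical and do not exist in ignite-extensions. The sketch keeps a default value but lets the caller override it, and then waits on futures with the configured value, mirroring the fut.get(DFLT_REQ_TIMEOUT, TimeUnit.SECONDS) loop quoted above. Plain java.util.concurrent futures stand in for the Kafka ones so the example stays self-contained.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical holder for a configurable request timeout; not part of Ignite. */
class ConfigurableRequestTimeout {
    /** Default timeout in milliseconds (assumed value, chosen for illustration). */
    static final long DFLT_REQ_TIMEOUT_MS = 3_000L;

    /** Current timeout; starts at the default and can be overridden by the user. */
    private long timeoutMs = DFLT_REQ_TIMEOUT_MS;

    /** Sets the timeout, rejecting non-positive values. Returns this for chaining. */
    ConfigurableRequestTimeout setTimeoutMs(long timeoutMs) {
        if (timeoutMs <= 0)
            throw new IllegalArgumentException("Timeout must be positive: " + timeoutMs);

        this.timeoutMs = timeoutMs;

        return this;
    }

    /** Returns the configured timeout in milliseconds. */
    long getTimeoutMs() {
        return timeoutMs;
    }

    /**
     * Waits for each future using the configured timeout instead of a constant.
     *
     * @return true if every future completed in time, false on timeout or interrupt.
     */
    <T> boolean awaitAll(List<? extends Future<T>> futs) {
        try {
            for (Future<T> fut : futs)
                fut.get(timeoutMs, TimeUnit.MILLISECONDS);

            return true;
        }
        catch (TimeoutException e) {
            return false;
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();

            return false;
        }
        catch (ExecutionException e) {
            throw new IllegalStateException("Request failed", e.getCause());
        }
    }
}
```

With such a holder, a streamer could accept the timeout as a named configuration property and pass it to both the poll/commit calls and the future waits, while existing deployments keep the default.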



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
