[jira] [Updated] (CAMEL-13768) Seek to specific offset and KafkaConsumer access
[ https://issues.apache.org/jira/browse/CAMEL-13768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Otavio Rodolfo Piske updated CAMEL-13768:
-----------------------------------------
    Affects Version/s: 3.x

> Seek to specific offset and KafkaConsumer access
> ------------------------------------------------
>
>                 Key: CAMEL-13768
>                 URL: https://issues.apache.org/jira/browse/CAMEL-13768
>             Project: Camel
>          Issue Type: New Feature
>          Components: camel-kafka
>    Affects Versions: 2.24.1, 3.x
>            Reporter: michael elbaz
>            Assignee: Otavio Rodolfo Piske
>            Priority: Major
>             Fix For: 3.12.0
>
>
> 1. Provide a way to rewind the Kafka offset to a specific offset (improve seekTo?)
> There is currently no way to do this with the camel-kafka component. The main idea is to
> replay older Kafka messages without starting from the beginning.
> For example:
> https://blog.sysco.no/integration/kafka-rewind-consumers-offset/
> {code:java}
> import static java.time.temporal.ChronoUnit.MINUTES;
> import java.time.Duration;
> import java.time.Instant;
> import java.util.HashMap;
> import java.util.Map;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
> import org.apache.kafka.common.TopicPartition;
>
> // "consumer" is an already subscribed KafkaConsumer<String, String> (key/value types assumed)
> boolean flag = true;
> while (true) {
>     ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
>     if (flag) {
>         Map<TopicPartition, Long> query = new HashMap<>();
>         query.put(
>             new TopicPartition("simple-topic-1", 0),
>             Instant.now().minus(10, MINUTES).toEpochMilli());
>         // Get the offset for each partition from the timestamp
>         Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
>         // Rewind each partition with seek(); a null value means no record exists
>         // at or after the timestamp, so that partition is left where it is
>         result.forEach((partition, found) -> {
>             if (found != null) {
>                 consumer.seek(partition, found.offset());
>             }
>         });
>         flag = false;
>     }
>     for (ConsumerRecord<String, String> record : records) {
>         System.out.printf("offset = %d, key = %s, value = %s%n",
>             record.offset(), record.key(), record.value());
>     }
> }
> {code}
> 2. Provide a way to access the KafkaConsumer
> Add a Camel header that holds a reference to the KafkaConsumer, so that a route can make
> Kafka API calls directly. This could work the same way as KafkaManualCommit does today:
> {code:java}
> public void process(Exchange exchange) {
>     KafkaManualCommit manual =
>         exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
>     manual.commitSync();
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
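The rewind in item 1 depends on how a timestamp is mapped to an offset: offsetsForTimes returns, for each partition, the earliest offset whose timestamp is greater than or equal to the requested one, and null when no such record exists. The sketch below models only that lookup rule with the JDK, so it runs without a broker. The class name TimestampSeekSketch, the method offsetForTime, and the array-backed "partition" are hypothetical and are not part of Kafka or Camel. It also assumes timestamps never decrease as offsets grow, which a real topic does not guarantee.

```java
import java.util.OptionalLong;

/** Stdlib-only model of a timestamp-to-offset lookup; this is not the Kafka client API. */
final class TimestampSeekSketch {

    /**
     * Returns the earliest offset whose timestamp is >= targetMillis, or an
     * empty result if every record is older than the target.
     * timestamps[i] is the timestamp of the record at offset i (assumed sorted).
     */
    static OptionalLong offsetForTime(long[] timestamps, long targetMillis) {
        int lo = 0;
        int hi = timestamps.length; // search window is [lo, hi)
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (timestamps[mid] < targetMillis) {
                lo = mid + 1; // record is too old, look to the right
            } else {
                hi = mid;     // candidate found, keep looking for an earlier one
            }
        }
        return lo < timestamps.length ? OptionalLong.of(lo) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 2_000L, 2_000L, 5_000L};
        // Two records share timestamp 2000; the earlier offset is the seek target.
        System.out.println(offsetForTime(timestamps, 2_000L));
        // No record is as recent as 9000, so there is nothing to seek to.
        System.out.println(offsetForTime(timestamps, 9_000L));
    }
}
```

The empty result corresponds to the null map value that a caller has to check before calling seek.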
[jira] [Updated] (CAMEL-13768) Seek to specific offset and KafkaConsumer access
[ https://issues.apache.org/jira/browse/CAMEL-13768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Otavio Rodolfo Piske updated CAMEL-13768:
-----------------------------------------
    Fix Version/s: (was: 3.x) 3.12.0

> Seek to specific offset and KafkaConsumer access
> ------------------------------------------------
>
>                 Key: CAMEL-13768
>                 URL: https://issues.apache.org/jira/browse/CAMEL-13768
>             Project: Camel
>          Issue Type: New Feature
>          Components: camel-kafka
>    Affects Versions: 2.24.1
>            Reporter: michael elbaz
>            Assignee: Otavio Rodolfo Piske
>            Priority: Major
>             Fix For: 3.12.0
[jira] [Updated] (CAMEL-13768) Seek to specific offset and KafkaConsumer access
[ https://issues.apache.org/jira/browse/CAMEL-13768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen updated CAMEL-13768:
--------------------------------
    Fix Version/s: 3.x

> Seek to specific offset and KafkaConsumer access
> ------------------------------------------------
>
>                 Key: CAMEL-13768
>                 URL: https://issues.apache.org/jira/browse/CAMEL-13768
>             Project: Camel
>          Issue Type: New Feature
>          Components: camel-kafka
>    Affects Versions: 2.24.1
>            Reporter: michael elbaz
>            Priority: Minor
>             Fix For: 3.x

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
[jira] [Updated] (CAMEL-13768) Seek to specific offset and KafkaConsumer access
[ https://issues.apache.org/jira/browse/CAMEL-13768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen updated CAMEL-13768:
--------------------------------
    Priority: Major (was: Minor)

> Seek to specific offset and KafkaConsumer access
> ------------------------------------------------
>
>                 Key: CAMEL-13768
>                 URL: https://issues.apache.org/jira/browse/CAMEL-13768
>             Project: Camel
>          Issue Type: New Feature
>          Components: camel-kafka
>    Affects Versions: 2.24.1
>            Reporter: michael elbaz
>            Priority: Major
>             Fix For: 3.x
[jira] [Updated] (CAMEL-13768) Seek to specific offset and KafkaConsumer access
[ https://issues.apache.org/jira/browse/CAMEL-13768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

michael elbaz updated CAMEL-13768:
----------------------------------
    Summary: Seek to specific offset and KafkaConsumer access (was: SeekTo specific offset and KafkaConsumer access)

> Seek to specific offset and KafkaConsumer access
> ------------------------------------------------
>
>                 Key: CAMEL-13768
>                 URL: https://issues.apache.org/jira/browse/CAMEL-13768
>             Project: Camel
>          Issue Type: New Feature
>          Components: camel-kafka
>    Affects Versions: 2.24.1
>            Reporter: michael elbaz
>            Priority: Minor