This is an automated email from the ASF dual-hosted git repository.
valdar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new de36a0a Some follow-up improvements for #969 and #202.
de36a0a is described below
commit de36a0a2a63ffd947b19bcf3951e782105c7fc5e
Author: Andrea Tarocchi <[email protected]>
AuthorDate: Mon Mar 8 15:57:09 2021 +0100
Some follow-up improvements for #969 and #202.
---
.../camel/kafkaconnector/CamelSourceTask.java | 24 ++++++++++++----------
1 file changed, 13 insertions(+), 11 deletions(-)
diff --git
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 51b055d..00ce145 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -16,7 +16,6 @@
*/
package org.apache.camel.kafkaconnector;
-import java.io.IOException;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.ArrayList;
@@ -219,11 +218,6 @@ public class CamelSourceTask extends SourceTask {
StreamCache sc = (StreamCache) messageBodyValue;
// reset to be sure that the cache is ready to be used before
sending it in the record (could be useful for SMTs)
sc.reset();
- try {
- messageBodyValue = sc.copy(exchange);
- } catch (IOException e) {
- e.printStackTrace();
- }
}
for (String singleTopic : topics) {
CamelSourceRecord camelRecord = new
CamelSourceRecord(sourcePartition, sourceOffset, singleTopic, null,
messageKeySchema,
@@ -256,15 +250,23 @@ public class CamelSourceTask extends SourceTask {
}
@Override
- public void commitRecord(SourceRecord record, RecordMetadata metadata)
throws InterruptedException {
+ public void commitRecord(SourceRecord record, RecordMetadata metadata) {
+ LOG.debug("Committing record: {} with metadata: {}", record, metadata);
///XXX: this should be a safe cast please see:
https://issues.apache.org/jira/browse/KAFKA-12391
Integer claimCheck = ((CamelSourceRecord)record).getClaimCheck();
LOG.debug("Committing record with claim check number: {}", claimCheck);
Exchange correlatedExchange = exchangesWaitingForAck[claimCheck];
- exchangesWaitingForAck[claimCheck] = null;
- freeSlots.add(claimCheck);
- UnitOfWorkHelper.doneSynchronizations(correlatedExchange,
correlatedExchange.adapt(ExtendedExchange.class).handoverCompletions(), LOG);
- LOG.debug("Record with claim check number: {} committed.", claimCheck);
+ try {
+ UnitOfWorkHelper.doneSynchronizations(correlatedExchange,
correlatedExchange.adapt(ExtendedExchange.class).handoverCompletions(), LOG);
+ LOG.debug("Record with claim check number: {} committed.",
claimCheck);
+ } catch (Throwable t) {
+ LOG.error("Exception during Unit Of Work completion: {} caused by:
{}", t.getMessage(), t.getCause());
+ throw new RuntimeException(t);
+ } finally {
+ exchangesWaitingForAck[claimCheck] = null;
+ freeSlots.add(claimCheck);
+ LOG.debug("Claim check number: {} freed.", claimCheck);
+ }
}
@Override