This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9fb24648e1 [INLONG-11892][Sort] SortStandalone support Negative
Acknowledgment mechanism for delivery failures (#11896)
9fb24648e1 is described below
commit 9fb24648e12d1983f15db94fd2302d8db8a9fea4
Author: ChunLiang Lu <[email protected]>
AuthorDate: Wed Jun 18 11:29:04 2025 +0800
[INLONG-11892][Sort] SortStandalone support Negative Acknowledgment
mechanism for delivery failures (#11896)
* [INLONG-11892][Sort] SortStandalone support Negative Acknowledgment
mechanism for delivery failures
* set parameter
---
.../org/apache/inlong/sdk/sort/api/SortClient.java | 3 ++
.../inlong/sdk/sort/api/SortClientConfig.java | 18 ++++++++
.../apache/inlong/sdk/sort/api/TopicFetcher.java | 7 +++
.../fetcher/kafka/KafkaMultiTopicsFetcher.java | 12 ++++++
.../fetcher/kafka/KafkaSingleTopicFetcher.java | 12 ++++++
.../fetcher/pulsar/PulsarMultiTopicsFetcher.java | 12 ++++++
.../fetcher/pulsar/PulsarSingleTopicFetcher.java | 50 ++++++++++++++++++++++
.../sort/fetcher/tube/TubeSingleTopicFetcher.java | 12 ++++++
.../inlong/sdk/sort/impl/SortClientImpl.java | 15 +++++++
.../inlong/sdk/sort/impl/SortClientImplV2.java | 15 +++++++
.../config/holder/CommonPropertiesHolder.java | 28 ++++++++++++
.../standalone/channel/CacheMessageRecord.java | 13 ++++++
.../sort/standalone/channel/ProfileEvent.java | 37 ++++++++++++++++
.../sink/elasticsearch/EsCallbackListener.java | 16 +++++--
.../sink/kafka/KafkaProducerCluster.java | 15 +++++--
.../standalone/source/sortsdk/SortSdkSource.java | 1 +
16 files changed, 260 insertions(+), 6 deletions(-)
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClient.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClient.java
index cb3af2f95d..3161fd4101 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClient.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClient.java
@@ -27,4 +27,7 @@ public abstract class SortClient {
public abstract boolean close();
public abstract SortClientConfig getConfig();
+
+ public abstract void negativeAck(String msgKey, String msgOffset)
+ throws Exception;
}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
index a354b1aced..cfbef57fbd 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
@@ -78,6 +78,8 @@ public class SortClientConfig implements Serializable {
private int threadPoolSize = 50;
+ private int sendFailPauseConsumerMinutes = 10;
+
public SortClientConfig(
String sortTaskId,
String sortClusterName,
@@ -547,4 +549,20 @@ public class SortClientConfig implements Serializable {
return subset;
}
+ /**
+ * get sendFailPauseConsumerMinutes
+ * @return the sendFailPauseConsumerMinutes
+ */
+ public int getSendFailPauseConsumerMinutes() {
+ return sendFailPauseConsumerMinutes;
+ }
+
+ /**
+ * set sendFailPauseConsumerMinutes
+ * @param sendFailPauseConsumerMinutes the sendFailPauseConsumerMinutes to
set
+ */
+ public void setSendFailPauseConsumerMinutes(int
sendFailPauseConsumerMinutes) {
+ this.sendFailPauseConsumerMinutes = sendFailPauseConsumerMinutes;
+ }
+
}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java
index 682e678de0..8a762289df 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java
@@ -108,4 +108,11 @@ public interface TopicFetcher {
* @return The result of update.
*/
boolean updateTopics(List<InLongTopic> topics);
+
+ /**
+ * NegativeAck message by the given msgOffset.
+ * @param msgOffset Offset of message.
+ * @throws Exception
+ */
+ void negativeAck(String msgOffset) throws Exception;
}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java
index a24c270389..48aa9f9d4e 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java
@@ -395,4 +395,16 @@ public class KafkaMultiTopicsFetcher extends
MultiTopicsFetcher {
}
}
}
+
+ /**
+ * negativeAck Offset
+ *
+ * @param msgOffset String
+ */
+ @Override
+ public void negativeAck(String msgOffset) throws Exception {
+ this.sleepTime =
TimeUnit.MILLISECONDS.convert(context.getConfig().getSendFailPauseConsumerMinutes(),
+ TimeUnit.MINUTES);
+ LOGGER.error("negativeAck,sleep {} minutes.", this.sleepTime);
+ }
}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
index 1c081a64ef..9ab2a0ebf8 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
@@ -310,4 +310,16 @@ public class KafkaSingleTopicFetcher extends
SingleTopicFetcher {
}
}
}
+
+ /**
+ * negativeAck Offset
+ *
+ * @param msgOffset String
+ */
+ @Override
+ public void negativeAck(String msgOffset) throws Exception {
+ this.sleepTime =
TimeUnit.MILLISECONDS.convert(context.getConfig().getSendFailPauseConsumerMinutes(),
+ TimeUnit.MINUTES);
+ LOGGER.error("negativeAck,topic:{}, sleep {} minutes.",
this.topic.getTopicKey(), this.sleepTime);
+ }
}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java
index a346591e18..30601978e4 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java
@@ -424,4 +424,16 @@ public class PulsarMultiTopicsFetcher extends
MultiTopicsFetcher {
}
}
}
+
+ /**
+ * negativeAck Offset
+ *
+ * @param msgOffset String
+ */
+ @Override
+ public void negativeAck(String msgOffset) throws Exception {
+ this.sleepTime =
TimeUnit.MILLISECONDS.convert(context.getConfig().getSendFailPauseConsumerMinutes(),
+ TimeUnit.MINUTES);
+ LOGGER.error("negativeAck,sleep {} minutes.", this.sleepTime);
+ }
}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
index 91a4dcf5f1..6d7b8ab1e1 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
@@ -261,6 +261,7 @@ public class PulsarSingleTopicFetcher extends
SingleTopicFetcher {
if (sleepTime > 0) {
TimeUnit.MILLISECONDS.sleep(sleepTime);
+ consumer.resume();
}
context.acquireRequestPermit();
@@ -330,4 +331,53 @@ public class PulsarSingleTopicFetcher extends
SingleTopicFetcher {
}
}
}
+
+ /**
+ * negativeAck Offset
+ *
+ * @param msgOffset String
+ */
+ @Override
+ public void negativeAck(String msgOffset) throws Exception {
+ if (StringUtils.isEmpty(msgOffset)) {
+ return;
+ }
+ try {
+ if (consumer == null) {
+ context.addAckFail(topic, -1);
+ LOGGER.error("consumer == null {}", topic);
+ return;
+ }
+ MessageId messageId = offsetCache.get(msgOffset);
+ if (messageId == null) {
+ context.addAckFail(topic, -1);
+ LOGGER.error("messageId == null {}", topic);
+ return;
+ }
+ consumer.negativeAcknowledge(messageId);
+ offsetCache.remove(msgOffset);
+ context.addAckFail(topic, -1);
+ this.sleepTime =
TimeUnit.MILLISECONDS.convert(context.getConfig().getSendFailPauseConsumerMinutes(),
+ TimeUnit.MINUTES);
+ LOGGER.error("negativeAck,topic:{}, sleep {} minutes.",
this.topic.getTopicKey(), this.sleepTime);
+ this.clearConsumer();
+ } catch (Exception e) {
+ context.addAckFail(topic, -1);
+ LOGGER.error(e.getMessage(), e);
+ throw e;
+ }
+ }
+
+ private void clearConsumer() {
+ try {
+ consumer.pause();
+ Message<byte[]> message = consumer.receive(1,
TimeUnit.MILLISECONDS);
+ while (message != null) {
+ consumer.negativeAcknowledge(message);
+ message = consumer.receive(1, TimeUnit.MILLISECONDS);
+ }
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
index ae30d1c3d2..05427fc6a5 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
@@ -289,4 +289,16 @@ public class TubeSingleTopicFetcher extends
SingleTopicFetcher {
}
}
}
+
+ /**
+ * negativeAck Offset
+ *
+ * @param msgOffset String
+ */
+ @Override
+ public void negativeAck(String msgOffset) throws Exception {
+ this.sleepTime =
TimeUnit.MILLISECONDS.convert(context.getConfig().getSendFailPauseConsumerMinutes(),
+ TimeUnit.MINUTES);
+ LOG.error("negativeAck,topic:{}, sleep {} minutes.",
this.topic.getTopicKey(), this.sleepTime);
+ }
}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java
index dd412913da..34a0eead69 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java
@@ -152,4 +152,19 @@ public class SortClientImpl extends SortClient {
return false;
}
}
+
+ /**
+ * negativeAck offset to msgKey
+ *
+ * @param msgKey String
+ * @param msgOffset String
+ * @throws Exception
+ */
+ @Override
+ public void negativeAck(String msgKey, String msgOffset)
+ throws Exception {
+ logger.debug("negativeAck:{} offset:{}", msgKey, msgOffset);
+ TopicFetcher topicFetcher = getFetcher(msgKey);
+ topicFetcher.negativeAck(msgOffset);
+ }
}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
index da90b64643..e9e90763d1 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
@@ -150,4 +150,19 @@ public class SortClientImplV2 extends SortClient {
return false;
}
}
+
+ /**
+ * negativeAck offset to msgKey
+ *
+ * @param msgKey String
+ * @param msgOffset String
+ * @throws Exception
+ */
+ @Override
+ public void negativeAck(String msgKey, String msgOffset)
+ throws Exception {
+ logger.debug("negativeAck:{} offset:{}", msgKey, msgOffset);
+ TopicFetcher topicFetcher = getFetcher(msgKey);
+ topicFetcher.negativeAck(msgOffset);
+ }
}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
index 3cd897a56c..9bc8bb3a15 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
/**
*
@@ -42,12 +43,20 @@ public class CommonPropertiesHolder {
public static final String KEY_SORT_SOURCE_ACKPOLICY =
"sortSource.ackPolicy";
public static final String KEY_USE_UNIFIED_CONFIGURATION =
"useUnifiedConfiguration";
+ public static final String KEY_MAX_SENDFAIL_TIMES = "maxSendFailTimes";
+ public static final int DEFAULT_MAX_SENDFAIL_TIMES = 0;
+ public static final String KEY_SENDFAIL_PAUSE_CONSUMER_MIN =
"sendFailPauseConsumerMin";
+ public static final int DEFAULT_SENDFAIL_PAUSE_CONSUMER_MIN = 10;
+
private static Map<String, String> props;
private static Context context;
private static long auditFormatInterval = 60000L;
private static AckPolicy ackPolicy;
+ private static AtomicInteger maxSendFailTimes = new AtomicInteger(-1);
+ private static AtomicInteger sendFailPauseConsumerMinutes = new
AtomicInteger(-1);
+
/**
* init
*/
@@ -232,4 +241,23 @@ public class CommonPropertiesHolder {
return getBoolean(KEY_USE_UNIFIED_CONFIGURATION, false);
}
+ public static int getMaxSendFailTimes() {
+ int result = maxSendFailTimes.get();
+ if (result >= 0) {
+ return result;
+ }
+ int newResult = getInteger(KEY_MAX_SENDFAIL_TIMES,
DEFAULT_MAX_SENDFAIL_TIMES);
+ maxSendFailTimes.compareAndSet(result, newResult);
+ return newResult;
+ }
+
+ public static int getSendFailPauseConsumerMinutes() {
+ int result = sendFailPauseConsumerMinutes.get();
+ if (result >= 0) {
+ return result;
+ }
+ int newResult = getInteger(KEY_SENDFAIL_PAUSE_CONSUMER_MIN,
DEFAULT_SENDFAIL_PAUSE_CONSUMER_MIN);
+ sendFailPauseConsumerMinutes.compareAndSet(result, newResult);
+ return newResult;
+ }
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java
index c49402714c..11d4022421 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java
@@ -73,6 +73,19 @@ public class CacheMessageRecord {
return 0;
}
+ /**
+ * negativeAck
+ */
+ public void negativeAck() {
+ if (client != null) {
+ try {
+ client.negativeAck(msgKey, offset);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ }
+
/**
* ackMessage
* @param ackToken ackToken
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java
index fd1ac34ed2..6c5fe79b26 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java
@@ -25,6 +25,8 @@ import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.event.SimpleEvent;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
/**
*
@@ -42,6 +44,9 @@ public class ProfileEvent extends SimpleEvent {
private CacheMessageRecord cacheRecord;
private final int ackToken;
+ private final ConcurrentHashMap<String, String> headers = new
ConcurrentHashMap<>();
+ private final AtomicInteger sendedTime = new AtomicInteger(0);
+
/**
* Constructor
* @param headers
@@ -50,6 +55,7 @@ public class ProfileEvent extends SimpleEvent {
public ProfileEvent(Map<String, String> headers, byte[] body) {
super.setHeaders(headers);
super.setBody(body);
+ this.headers.putAll(headers);
this.inlongGroupId = headers.get(Constants.INLONG_GROUP_ID);
this.inlongStreamId = headers.get(Constants.INLONG_STREAM_ID);
this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
@@ -68,6 +74,7 @@ public class ProfileEvent extends SimpleEvent {
public ProfileEvent(InLongMessage sdkMessage, CacheMessageRecord
cacheRecord) {
super.setHeaders(sdkMessage.getParams());
super.setBody(sdkMessage.getBody());
+ this.headers.putAll(sdkMessage.getParams());
this.inlongGroupId = sdkMessage.getInlongGroupId();
this.inlongStreamId = sdkMessage.getInlongStreamId();
this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
@@ -78,6 +85,15 @@ public class ProfileEvent extends SimpleEvent {
this.ackToken = cacheRecord.getToken();
}
+ /**
+ * get headers
+ * @return the headers
+ */
+ @Override
+ public Map<String, String> getHeaders() {
+ return this.headers;
+ }
+
/**
* get inlongGroupId
*
@@ -148,4 +164,25 @@ public class ProfileEvent extends SimpleEvent {
cacheRecord.ackMessage(ackToken);
}
}
+
+ /**
+ * negativeAck
+ */
+ public void negativeAck() {
+ if (cacheRecord != null) {
+ cacheRecord.negativeAck();
+ }
+ }
+
+ /**
+ * get sendedTime
+ * @return the sendedTime
+ */
+ public int getSendedTime() {
+ return sendedTime.get();
+ }
+
+ public void incrementSendedTime() {
+ this.sendedTime.incrementAndGet();
+ }
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java
index a54c184204..7acc483351 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sort.standalone.sink.elasticsearch;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.elasticsearch.action.DocWriteRequest;
@@ -85,7 +86,12 @@ public class EsCallbackListener implements
BulkProcessor.Listener {
// is fail
if (responseItem.isFailed()) {
context.addSendResultMetric(event, context.getTaskName(),
false, sendTime);
- context.backDispatchQueue(requestItem);
+ event.incrementSendedTime();
+ if (event.getSendedTime() <=
CommonPropertiesHolder.getMaxSendFailTimes()) {
+ context.backDispatchQueue(requestItem);
+ } else {
+ event.negativeAck();
+ }
} else {
context.addSendResultMetric(event, context.getTaskName(),
true, sendTime);
context.releaseDispatchQueue(requestItem);
@@ -115,8 +121,12 @@ public class EsCallbackListener implements
BulkProcessor.Listener {
ProfileEvent event = requestItem.getEvent();
long sendTime = requestItem.getSendTime();
context.addSendResultMetric(event, context.getTaskName(), false,
sendTime);
- context.backDispatchQueue(requestItem);
+ event.incrementSendedTime();
+ if (event.getSendedTime() <=
CommonPropertiesHolder.getMaxSendFailTimes()) {
+ context.backDispatchQueue(requestItem);
+ } else {
+ event.negativeAck();
+ }
}
}
-
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
index 72b8ccc5e4..d10d32d02e 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
@@ -230,7 +230,7 @@ public class KafkaProducerCluster implements LifecycleAware
{
tx.commit();
profileEvent.ack();
} else {
- tx.rollback();
+ this.exceptionProcess(profileEvent, tx);
}
} else if (ex instanceof
UnknownTopicOrPartitionException
|| !(ex instanceof RetriableException)) {
@@ -238,7 +238,7 @@ public class KafkaProducerCluster implements LifecycleAware
{
tx.commit();
profileEvent.ack();
} else {
- tx.rollback();
+ this.exceptionProcess(profileEvent, tx);
}
LOG.error(String.format("send failed, topic is
%s", topic), ex);
sinkContext.addSendResultMetric(profileEvent,
topic, false, sendTime);
@@ -247,7 +247,7 @@ public class KafkaProducerCluster implements LifecycleAware
{
});
return true;
} catch (Exception e) {
- tx.rollback();
+ this.exceptionProcess(profileEvent, tx);
tx.close();
LOG.error(e.getMessage(), e);
sinkContext.addSendResultMetric(profileEvent, topic, false,
sendTime);
@@ -255,4 +255,13 @@ public class KafkaProducerCluster implements
LifecycleAware {
}
}
+ private void exceptionProcess(ProfileEvent profileEvent, Transaction tx) {
+ profileEvent.incrementSendedTime();
+ if (profileEvent.getSendedTime() <=
CommonPropertiesHolder.getMaxSendFailTimes()) {
+ tx.rollback();
+ } else {
+ tx.commit();
+ profileEvent.negativeAck();
+ }
+ }
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
index ac4726f241..409f2ebe90 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -185,6 +185,7 @@ public final class SortSdkSource extends AbstractSource
clientConfig.setCallback(callback);
Map<String, String> sortSdkParams =
this.getSortClientConfigParameters();
clientConfig.setParameters(sortSdkParams);
+
clientConfig.setSendFailPauseConsumerMinutes(CommonPropertiesHolder.getSendFailPauseConsumerMinutes());
// create SortClient
String configType = CommonPropertiesHolder