This is an automated email from the ASF dual-hosted git repository.
shenlin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
The following commit(s) were added to refs/heads/main by this push:
new c32edbe update rmqEventSubscriber usage
c32edbe is described below
commit c32edbe8c9c7bd9910efdc1ec325dcb58f790fd9
Author: changfeng <[email protected]>
AuthorDate: Sun Jun 25 11:11:20 2023 +0800
update rmqEventSubscriber usage
---
.../rocketmq/runtimer/RocketMQEventSubscriber.java | 23 +++---
.../rocketmq/runtimer/consumer/ClientConfig.java | 24 +++---
.../runtimer/consumer/ExponentialRetryPolicy.java | 86 ----------------------
.../runtimer/consumer/LitePullConsumerImpl.java | 62 +++++++---------
.../runtimer/consumer/LocalMessageCache.java | 4 +-
.../rocketmq/runtimer/consumer/RetryPolicy.java | 26 -------
6 files changed, 55 insertions(+), 170 deletions(-)
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
index df9cda7..bd081c5 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
@@ -33,9 +33,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
-import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.utils.NetworkUtil;
import
org.apache.rocketmq.eventbridge.adapter.runtime.boot.listener.EventSubscriber;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread;
import
org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.SubscribeRunnerKeys;
@@ -93,7 +91,7 @@ public class RocketMQEventSubscriber extends EventSubscriber {
private static final String SEMICOLON = ";";
- private static final String SYS_DEFAULT_GROUP =
"event-bridge-default-group";
+ private static final String DEFAULT_GROUP_PREFIX = "event-bridge-group";
public static final String QUEUE_OFFSET = "queueOffset";
public static final String MSG_ID = "msgId";
@@ -176,7 +174,6 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
ClientConfig clientConfig = new ClientConfig();
Properties properties =
PropertiesLoaderUtils.loadAllProperties("runtime.properties");
String namesrvAddr =
properties.getProperty("rocketmq.namesrvAddr");
- String consumerGroup =
properties.getProperty("rocketmq.consumerGroup");
pullTimeOut =
Integer.valueOf(properties.getProperty("rocketmq.consumer.pullTimeOut"));
pullBatchSize =
Integer.valueOf(properties.getProperty("rocketmq.consumer.pullBatchSize"));
String accessChannel =
properties.getProperty("rocketmq.accessChannel");
@@ -188,8 +185,6 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
String socks5Endpoint =
properties.getProperty("rocketmq.consumer.socks5Endpoint");
clientConfig.setNameSrvAddr(namesrvAddr);
- clientConfig.setConsumerGroup(StringUtils.isBlank(consumerGroup) ?
- createGroupName(SYS_DEFAULT_GROUP) : consumerGroup);
clientConfig.setAccessChannel(AccessChannel.CLOUD.name().equals(accessChannel) ?
AccessChannel.CLOUD : AccessChannel.LOCAL);
clientConfig.setNamespace(namespace);
@@ -238,7 +233,10 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
public LitePullConsumer initLitePullConsumer(SubscribeRunnerKeys
subscribeRunnerKeys) {
String topic = getTopicName(subscribeRunnerKeys);
RPCHook rpcHook = this.sessionCredentials != null ? new
AclClientRPCHook(this.sessionCredentials) : null;
- LitePullConsumerImpl pullConsumer = new
LitePullConsumerImpl(this.clientConfig, rpcHook);
+ ClientConfig consumerConfig =
ClientConfig.cloneConfig(this.clientConfig);
+ String groupName = createGroupName(subscribeRunnerKeys);
+ consumerConfig.setConsumerGroup(groupName);
+ LitePullConsumerImpl pullConsumer = new
LitePullConsumerImpl(consumerConfig, rpcHook);
if (StringUtils.isNotBlank(this.socksProxy)) {
pullConsumer.setSockProxyJson(this.socksProxy);
}
@@ -256,12 +254,11 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
return
eventDataRepository.getTopicNameWithOutCache(subscribeRunnerKeys.getAccountId(),
subscribeRunnerKeys.getEventBusName());
}
- private String createGroupName(String prefix) {
+ private String createGroupName(SubscribeRunnerKeys subscribeRunnerKeys) {
StringBuilder sb = new StringBuilder();
- sb.append(prefix).append("-");
- sb.append(NetworkUtil.getLocalAddress()).append("-");
- sb.append(UtilAll.getPid()).append("-");
- sb.append(System.nanoTime());
+ sb.append(DEFAULT_GROUP_PREFIX).append("-");
+ sb.append(subscribeRunnerKeys.getAccountId()).append("-");
+ sb.append(subscribeRunnerKeys.getRunnerName());
return sb.toString().replace(".", "-");
}
@@ -354,7 +351,7 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
messageBuffer.put(message);
}
} catch (Exception exception) {
- logger.error(getServiceName() + " - event bus pull record
exception, stackTrace - ", exception);
+ logger.error(getServiceName() + " -
RocketMQEventSubscriber pull record exception, stackTrace - ", exception);
}
}
}
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java
index 02655ba..b37b851 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java
@@ -36,7 +36,6 @@ public class ClientConfig {
// All the offsets will be committed in the commit thread if enable this
flag.
// To avoid too many rpc calls, disable it and rely on the inner offset
automatic commit mechanism
private boolean commitSync = false;
- private String routeHookEndpoint;
private AccessChannel accessChannel;
public int getRmqPullMessageCacheCapacity() {
@@ -112,14 +111,6 @@ public class ClientConfig {
this.commitSync = commitSync;
}
- public String getRouteHookEndpoint() {
- return routeHookEndpoint;
- }
-
- public void setRouteHookEndpoint(final String routeHookEndpoint) {
- this.routeHookEndpoint = routeHookEndpoint;
- }
-
public AccessChannel getAccessChannel() {
return accessChannel;
}
@@ -127,4 +118,19 @@ public class ClientConfig {
public void setAccessChannel(AccessChannel accessChannel) {
this.accessChannel = accessChannel;
}
+
+
+ public static ClientConfig cloneConfig(ClientConfig clientConfig) {
+ ClientConfig newConfig = new ClientConfig();
+
newConfig.setRmqPullMessageBatchNums(clientConfig.getRmqPullMessageBatchNums());
+ newConfig.setConsumeFromWhere(clientConfig.getConsumeFromWhere());
+ newConfig.setConsumeTimestamp(clientConfig.getConsumeTimestamp());
+ newConfig.setNameSrvAddr(clientConfig.getNameSrvAddr());
+ newConfig.setNamespace(clientConfig.getNamespace());
+ newConfig.setConsumerGroup(clientConfig.getConsumerGroup());
+ newConfig.setPullInterval(clientConfig.getPullInterval());
+ newConfig.setCommitSync(clientConfig.isCommitSync());
+ newConfig.setAccessChannel(clientConfig.getAccessChannel());
+ return newConfig;
+ }
}
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ExponentialRetryPolicy.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ExponentialRetryPolicy.java
deleted file mode 100644
index 58ab4b8..0000000
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ExponentialRetryPolicy.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.rocketmq.eventbridge.adapter.storage.rocketmq.runtimer.consumer;
-
-import com.google.common.base.MoreObjects;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:11 上午
- */
-public class ExponentialRetryPolicy implements RetryPolicy {
- private long initial = TimeUnit.SECONDS.toMillis(5);
- private long max = TimeUnit.HOURS.toMillis(2);
- private long multiplier = 2;
- private int retryCount = 0;
-
- public ExponentialRetryPolicy() {
- }
-
- public ExponentialRetryPolicy(long initial, long max, long multiplier) {
- this.initial = initial;
- this.max = max;
- this.multiplier = multiplier;
- }
-
- public long getInitial() {
- return initial;
- }
-
- public void setInitial(long initial) {
- this.initial = initial;
- }
-
- public long getMax() {
- return max;
- }
-
- public void setMax(long max) {
- this.max = max;
- }
-
- public long getMultiplier() {
- return multiplier;
- }
-
- public void setMultiplier(long multiplier) {
- this.multiplier = multiplier;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("initial", initial)
- .add("max", max)
- .add("multiplier", multiplier)
- .toString();
- }
-
- @Override
- public long nextDelayDuration() {
- if (retryCount < 0) {
- retryCount = 0;
- }
- if (retryCount > 32) {
- retryCount = 32;
- }
- return Math.min(max, initial * (long) Math.pow(multiplier,
retryCount++));
- }
-}
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java
index db4843c..7eda3d9 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java
@@ -24,6 +24,7 @@ import
org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
+import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.common.ServiceState;
@@ -31,6 +32,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +44,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -55,14 +56,12 @@ public class LitePullConsumerImpl implements
LitePullConsumer {
private static final Logger log =
LoggerFactory.getLogger(LitePullConsumerImpl.class);
private final DefaultMQPullConsumer rocketmqPullConsumer;
private final LocalMessageCache localMessageCache;
- private final ConcurrentHashMap<MessageQueue, ExponentialRetryPolicy>
retryPolicies;
private final ClientConfig clientConfig;
private final Map<MessageQueue, ProcessQueue> runningQueueMap = new
ConcurrentHashMap<>();
private final ScheduledExecutorService scheduleService = new
ScheduledThreadPoolExecutor(1,
ThreadUtils.newThreadFactory("PullConsumerScheduleService",
false));
- private final ExecutorService executorService =
Executors.newSingleThreadExecutor(
- ThreadUtils.newThreadFactory("PullConsumerExecutorService",
false));
-
+ private static final Long PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL =
30L;
+ private static final Long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION =
TimeUnit.SECONDS.toMillis(3);
private static final String DEFAULT_INSTANCE_NAME =
"EventBridge_Consumer_INSTANCE";
public LitePullConsumerImpl(final ClientConfig clientConfig, final RPCHook
rpcHook) {
@@ -78,7 +77,6 @@ public class LitePullConsumerImpl implements LitePullConsumer
{
rocketmqPullConsumer.setNamespace(clientConfig.getNamespace());
}
localMessageCache = new LocalMessageCache(rocketmqPullConsumer,
clientConfig);
- retryPolicies = new ConcurrentHashMap<>();
}
@Override
@@ -90,7 +88,6 @@ public class LitePullConsumerImpl implements LitePullConsumer
{
@Override
public void shutdown() {
rocketmqPullConsumer.shutdown();
- shutdownThreadPool(executorService);
shutdownThreadPool(scheduleService);
}
@@ -116,7 +113,6 @@ public class LitePullConsumerImpl implements
LitePullConsumer {
public void messageQueueChanged(String topic, Set<MessageQueue>
mqAll, Set<MessageQueue> mqDivided) {
submitPullTask(topic, tag, mqDivided);
localMessageCache.shrinkPullOffsetTable(mqDivided);
- retryPolicies.entrySet().removeIf(next ->
!mqDivided.contains(next.getKey()));
log.info("Load balance result of topic {} changed, mqAll {},
mqDivided {}.", topic, mqAll, mqDivided);
}
});
@@ -148,15 +144,6 @@ public class LitePullConsumerImpl implements
LitePullConsumer {
rocketmqPullConsumer.setSocksProxyConfig(proxyJson);
}
- private RetryPolicy getRetryPolicy(MessageQueue messageQueue) {
- return retryPolicies.computeIfAbsent(messageQueue, queue ->
- new ExponentialRetryPolicy(Duration.ofMillis(100).toMillis(),
Duration.ofMinutes(10).toMillis(), 2));
- }
-
- private void removeRetryPolicy(MessageQueue messageQueue) {
- retryPolicies.remove(messageQueue);
- }
-
private void submitPullTask(String topic, String tag, Set<MessageQueue>
assignedQueues) {
Set<MessageQueue> runningQueues = runningQueueMap.keySet();
for (MessageQueue runningQueue : runningQueues) {
@@ -178,7 +165,7 @@ public class LitePullConsumerImpl implements
LitePullConsumer {
if (runningQueueMap.putIfAbsent(messageQueue, processQueue) ==
null) {
try {
PullTask pullTask = new PullTask(messageQueue, tag);
- executorService.submit(pullTask);
+ pullImmediately(pullTask);
log.info("Submit pullTask:{}", messageQueue);
} catch (Exception e) {
log.error("Failed submit pullTask:{}, {}, wait next
balancing", topic, messageQueue, e);
@@ -195,6 +182,15 @@ public class LitePullConsumerImpl implements
LitePullConsumer {
}
+ void pullImmediately(PullTask pullTask) {
+ scheduleService.schedule(new Runnable() {
+ @Override
+ public void run() {
+ pullTask.run();
+ }
+ }, 0, TimeUnit.MILLISECONDS);
+ }
+
void pullLater(PullTask pullTask, long delay, TimeUnit unit) {
scheduleService.schedule(new Runnable() {
@Override
@@ -205,7 +201,6 @@ public class LitePullConsumerImpl implements
LitePullConsumer {
}
class PullTask implements Runnable {
- private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
private final String tag;
private final MessageQueue messageQueue;
@@ -232,10 +227,9 @@ public class LitePullConsumerImpl implements
LitePullConsumer {
int batchNums = localMessageCache.nextPullBatchNums();
// If batchNums is zero, an exception will be thrown and then
trigger a delay
if (batchNums <= 0) {
- final int delayTimeMillis = (int)
getRetryPolicy(messageQueue).nextDelayDuration();
log.warn("Local cache is full, delay the pull task {} ms
for message queue {}",
- delayTimeMillis, messageQueue);
- pullLater(PullTask.this, delayTimeMillis,
TimeUnit.MILLISECONDS);
+ PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION,
messageQueue);
+ pullLater(PullTask.this,
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, TimeUnit.MILLISECONDS);
}
rocketmqPullConsumer.pullBlockIfNotFound(this.messageQueue,
this.tag, offset, batchNums, new PullCallback() {
@@ -257,28 +251,24 @@ public class LitePullConsumerImpl implements
LitePullConsumer {
localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt,
messageQueue, pq));
}
localMessageCache.updatePullOffset(messageQueue,
pullResult.getNextBeginOffset());
- removeRetryPolicy(messageQueue);
- executorService.submit(PullTask.this);
+ pullImmediately(PullTask.this);
} else {
localMessageCache.removePullOffset(messageQueue);
log.info("ProcessQueue {} dropped,
discard the pulled message.", messageQueue);
- removeRetryPolicy(messageQueue);
}
break;
case OFFSET_ILLEGAL:
- final int delayTimeMillis = (int)
getRetryPolicy(messageQueue).nextDelayDuration();
log.warn("The pull request offset is
illegal, offset is {}, message queue is {}, " +
"pull result is {}, delay
{} ms for next pull",
- offset, messageQueue, pullResult,
delayTimeMillis);
+ offset, messageQueue, pullResult,
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
localMessageCache.updatePullOffset(messageQueue,
pullResult.getNextBeginOffset());
- pullLater(PullTask.this, delayTimeMillis,
TimeUnit.MILLISECONDS);
+ pullLater(PullTask.this,
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, TimeUnit.MILLISECONDS);
break;
case NO_NEW_MSG:
case NO_MATCHED_MSG:
log.info("No NEW_MSG or MATCHED_MSG for
mq:{}, pull again.", messageQueue);
localMessageCache.updatePullOffset(messageQueue,
pullResult.getNextBeginOffset());
- removeRetryPolicy(messageQueue);
- executorService.submit(PullTask.this);
+ pullImmediately(PullTask.this);
break;
default:
log.warn("Failed to process pullResult,
mq:{} {}", messageQueue, pullResult);
@@ -292,17 +282,21 @@ public class LitePullConsumerImpl implements
LitePullConsumer {
@Override
public void onException(Throwable e) {
- final int delayTimeMillis = (int)
getRetryPolicy(messageQueue).nextDelayDuration();
+ long delayTimeMillis = 0L;
+ if (e instanceof MQBrokerException &&
((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
+ delayTimeMillis =
PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL;
+ } else {
+ delayTimeMillis =
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION;
+ }
log.error("Exception happens when pull message
process, delay {} ms for message queue {}",
delayTimeMillis, messageQueue, e);
pullLater(PullTask.this, delayTimeMillis,
TimeUnit.MILLISECONDS);
}
});
} catch (Throwable t) {
- final int delayTimeMillis = (int)
getRetryPolicy(messageQueue).nextDelayDuration();
log.error("Error occurs when pull message process, delay {} ms
for message queue {}",
- delayTimeMillis, messageQueue, t);
- pullLater(PullTask.this, delayTimeMillis,
TimeUnit.MILLISECONDS);
+ PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, messageQueue, t);
+ pullLater(PullTask.this, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION,
TimeUnit.MILLISECONDS);
}
}
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java
index a352791..b4cf7a7 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java
@@ -73,7 +73,7 @@ public class LocalMessageCache {
outerException.set(new
RuntimeException("fetchConsumeOffsetFromBroker exception, please check rocketmq
client for more details"));
return null;
}
- if (offset == -1) {
+ if (offset == -1 || offset == 0) {
// Follow the CONSUME_FROM_WHERE to compute next pull
offset
// But note that if broker thrown any unexpected runtime
exception may cause offset rollback.
// We don't handle this risk because of MetaQ doesn't have
any rpc hook
@@ -90,7 +90,7 @@ public class LocalMessageCache {
break;
}
}
- if (offset >= 0) {
+ if (offset > 0) {
// Got an offset from offset store
return offset;
}
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/RetryPolicy.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/RetryPolicy.java
deleted file mode 100644
index 6da2a81..0000000
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/RetryPolicy.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.rocketmq.eventbridge.adapter.storage.rocketmq.runtimer.consumer;
-
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:10 上午
- */
-public interface RetryPolicy {
- long nextDelayDuration();
-}