This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git


The following commit(s) were added to refs/heads/main by this push:
     new c32edbe  update rmqEventSubscriber usage
c32edbe is described below

commit c32edbe8c9c7bd9910efdc1ec325dcb58f790fd9
Author: changfeng <[email protected]>
AuthorDate: Sun Jun 25 11:11:20 2023 +0800

    update rmqEventSubscriber usage
---
 .../rocketmq/runtimer/RocketMQEventSubscriber.java | 23 +++---
 .../rocketmq/runtimer/consumer/ClientConfig.java   | 24 +++---
 .../runtimer/consumer/ExponentialRetryPolicy.java  | 86 ----------------------
 .../runtimer/consumer/LitePullConsumerImpl.java    | 62 +++++++---------
 .../runtimer/consumer/LocalMessageCache.java       |  4 +-
 .../rocketmq/runtimer/consumer/RetryPolicy.java    | 26 -------
 6 files changed, 55 insertions(+), 170 deletions(-)

diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
index df9cda7..bd081c5 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
@@ -33,9 +33,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.AccessChannel;
-import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.utils.NetworkUtil;
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.boot.listener.EventSubscriber;
 import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread;
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.SubscribeRunnerKeys;
@@ -93,7 +91,7 @@ public class RocketMQEventSubscriber extends EventSubscriber {
 
     private static final String SEMICOLON = ";";
 
-    private static final String SYS_DEFAULT_GROUP = "event-bridge-default-group";
+    private static final String DEFAULT_GROUP_PREFIX = "event-bridge-group";
 
     public static final String QUEUE_OFFSET = "queueOffset";
     public static final String MSG_ID = "msgId";
@@ -176,7 +174,6 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
             ClientConfig clientConfig = new ClientConfig();
             Properties properties = 
PropertiesLoaderUtils.loadAllProperties("runtime.properties");
             String namesrvAddr = 
properties.getProperty("rocketmq.namesrvAddr");
-            String consumerGroup = 
properties.getProperty("rocketmq.consumerGroup");
             pullTimeOut = 
Integer.valueOf(properties.getProperty("rocketmq.consumer.pullTimeOut"));
             pullBatchSize = 
Integer.valueOf(properties.getProperty("rocketmq.consumer.pullBatchSize"));
             String accessChannel = 
properties.getProperty("rocketmq.accessChannel");
@@ -188,8 +185,6 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
             String socks5Endpoint = 
properties.getProperty("rocketmq.consumer.socks5Endpoint");
 
             clientConfig.setNameSrvAddr(namesrvAddr);
-            clientConfig.setConsumerGroup(StringUtils.isBlank(consumerGroup) ?
-                    createGroupName(SYS_DEFAULT_GROUP) : consumerGroup);
             
clientConfig.setAccessChannel(AccessChannel.CLOUD.name().equals(accessChannel) ?
                     AccessChannel.CLOUD : AccessChannel.LOCAL);
             clientConfig.setNamespace(namespace);
@@ -238,7 +233,10 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
     public LitePullConsumer initLitePullConsumer(SubscribeRunnerKeys 
subscribeRunnerKeys) {
         String topic = getTopicName(subscribeRunnerKeys);
         RPCHook rpcHook = this.sessionCredentials != null ? new 
AclClientRPCHook(this.sessionCredentials) : null;
-        LitePullConsumerImpl pullConsumer = new 
LitePullConsumerImpl(this.clientConfig, rpcHook);
+        ClientConfig consumerConfig = 
ClientConfig.cloneConfig(this.clientConfig);
+        String groupName = createGroupName(subscribeRunnerKeys);
+        consumerConfig.setConsumerGroup(groupName);
+        LitePullConsumerImpl pullConsumer = new 
LitePullConsumerImpl(consumerConfig, rpcHook);
         if (StringUtils.isNotBlank(this.socksProxy)) {
             pullConsumer.setSockProxyJson(this.socksProxy);
         }
@@ -256,12 +254,11 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
         return 
eventDataRepository.getTopicNameWithOutCache(subscribeRunnerKeys.getAccountId(),
 subscribeRunnerKeys.getEventBusName());
     }
 
-    private String createGroupName(String prefix) {
+    private String createGroupName(SubscribeRunnerKeys subscribeRunnerKeys) {
         StringBuilder sb = new StringBuilder();
-        sb.append(prefix).append("-");
-        sb.append(NetworkUtil.getLocalAddress()).append("-");
-        sb.append(UtilAll.getPid()).append("-");
-        sb.append(System.nanoTime());
+        sb.append(DEFAULT_GROUP_PREFIX).append("-");
+        sb.append(subscribeRunnerKeys.getAccountId()).append("-");
+        sb.append(subscribeRunnerKeys.getRunnerName());
         return sb.toString().replace(".", "-");
     }
 
@@ -354,7 +351,7 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
                         messageBuffer.put(message);
                     }
                 } catch (Exception exception) {
-                    logger.error(getServiceName() + " - event bus pull record exception, stackTrace - ", exception);
+                    logger.error(getServiceName() + " - RocketMQEventSubscriber pull record exception, stackTrace - ", exception);
                 }
             }
         }
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java
index 02655ba..b37b851 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java
@@ -36,7 +36,6 @@ public class ClientConfig {
     // All the offsets will be committed in the commit thread if enable this 
flag.
     // To avoid too many rpc calls, disable it and rely on the inner offset 
automatic commit mechanism
     private boolean commitSync = false;
-    private String routeHookEndpoint;
     private AccessChannel accessChannel;
 
     public int getRmqPullMessageCacheCapacity() {
@@ -112,14 +111,6 @@ public class ClientConfig {
         this.commitSync = commitSync;
     }
 
-    public String getRouteHookEndpoint() {
-        return routeHookEndpoint;
-    }
-
-    public void setRouteHookEndpoint(final String routeHookEndpoint) {
-        this.routeHookEndpoint = routeHookEndpoint;
-    }
-
     public AccessChannel getAccessChannel() {
         return accessChannel;
     }
@@ -127,4 +118,19 @@ public class ClientConfig {
     public void setAccessChannel(AccessChannel accessChannel) {
         this.accessChannel = accessChannel;
     }
+
+
+    public static ClientConfig cloneConfig(ClientConfig clientConfig) {
+        ClientConfig newConfig = new ClientConfig();
+        
newConfig.setRmqPullMessageBatchNums(clientConfig.getRmqPullMessageBatchNums());
+        newConfig.setConsumeFromWhere(clientConfig.getConsumeFromWhere());
+        newConfig.setConsumeTimestamp(clientConfig.getConsumeTimestamp());
+        newConfig.setNameSrvAddr(clientConfig.getNameSrvAddr());
+        newConfig.setNamespace(clientConfig.getNamespace());
+        newConfig.setConsumerGroup(clientConfig.getConsumerGroup());
+        newConfig.setPullInterval(clientConfig.getPullInterval());
+        newConfig.setCommitSync(clientConfig.isCommitSync());
+        newConfig.setAccessChannel(clientConfig.getAccessChannel());
+        return newConfig;
+    }
 }
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ExponentialRetryPolicy.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ExponentialRetryPolicy.java
deleted file mode 100644
index 58ab4b8..0000000
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ExponentialRetryPolicy.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.rocketmq.eventbridge.adapter.storage.rocketmq.runtimer.consumer;
-
-import com.google.common.base.MoreObjects;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:11 上午
- */
-public class ExponentialRetryPolicy implements RetryPolicy {
-    private long initial = TimeUnit.SECONDS.toMillis(5);
-    private long max = TimeUnit.HOURS.toMillis(2);
-    private long multiplier = 2;
-    private int retryCount = 0;
-
-    public ExponentialRetryPolicy() {
-    }
-
-    public ExponentialRetryPolicy(long initial, long max, long multiplier) {
-        this.initial = initial;
-        this.max = max;
-        this.multiplier = multiplier;
-    }
-
-    public long getInitial() {
-        return initial;
-    }
-
-    public void setInitial(long initial) {
-        this.initial = initial;
-    }
-
-    public long getMax() {
-        return max;
-    }
-
-    public void setMax(long max) {
-        this.max = max;
-    }
-
-    public long getMultiplier() {
-        return multiplier;
-    }
-
-    public void setMultiplier(long multiplier) {
-        this.multiplier = multiplier;
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this)
-                .add("initial", initial)
-                .add("max", max)
-                .add("multiplier", multiplier)
-                .toString();
-    }
-
-    @Override
-    public long nextDelayDuration() {
-        if (retryCount < 0) {
-            retryCount = 0;
-        }
-        if (retryCount > 32) {
-            retryCount = 32;
-        }
-        return Math.min(max, initial * (long) Math.pow(multiplier, 
retryCount++));
-    }
-}
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java
index db4843c..7eda3d9 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java
@@ -24,6 +24,7 @@ import 
org.apache.rocketmq.client.consumer.MessageQueueListener;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
 import 
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
+import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
 import org.apache.rocketmq.common.ServiceState;
@@ -31,6 +32,7 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +44,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -55,14 +56,12 @@ public class LitePullConsumerImpl implements 
LitePullConsumer {
     private static final Logger log = 
LoggerFactory.getLogger(LitePullConsumerImpl.class);
     private final DefaultMQPullConsumer rocketmqPullConsumer;
     private final LocalMessageCache localMessageCache;
-    private final ConcurrentHashMap<MessageQueue, ExponentialRetryPolicy> 
retryPolicies;
     private final ClientConfig clientConfig;
     private final Map<MessageQueue, ProcessQueue> runningQueueMap = new 
ConcurrentHashMap<>();
     private final ScheduledExecutorService scheduleService = new 
ScheduledThreadPoolExecutor(1,
             ThreadUtils.newThreadFactory("PullConsumerScheduleService", 
false));
-    private final ExecutorService executorService = 
Executors.newSingleThreadExecutor(
-            ThreadUtils.newThreadFactory("PullConsumerExecutorService", 
false));
-
+    private static final Long PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL = 
30L;
+    private static final Long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 
TimeUnit.SECONDS.toMillis(3);
     private static final String DEFAULT_INSTANCE_NAME = 
"EventBridge_Consumer_INSTANCE";
 
     public LitePullConsumerImpl(final ClientConfig clientConfig, final RPCHook 
rpcHook) {
@@ -78,7 +77,6 @@ public class LitePullConsumerImpl implements LitePullConsumer 
{
             rocketmqPullConsumer.setNamespace(clientConfig.getNamespace());
         }
         localMessageCache = new LocalMessageCache(rocketmqPullConsumer, 
clientConfig);
-        retryPolicies = new ConcurrentHashMap<>();
     }
 
     @Override
@@ -90,7 +88,6 @@ public class LitePullConsumerImpl implements LitePullConsumer 
{
     @Override
     public void shutdown() {
         rocketmqPullConsumer.shutdown();
-        shutdownThreadPool(executorService);
         shutdownThreadPool(scheduleService);
     }
 
@@ -116,7 +113,6 @@ public class LitePullConsumerImpl implements 
LitePullConsumer {
             public void messageQueueChanged(String topic, Set<MessageQueue> 
mqAll, Set<MessageQueue> mqDivided) {
                 submitPullTask(topic, tag, mqDivided);
                 localMessageCache.shrinkPullOffsetTable(mqDivided);
-                retryPolicies.entrySet().removeIf(next -> 
!mqDivided.contains(next.getKey()));
                 log.info("Load balance result of topic {} changed, mqAll {}, 
mqDivided {}.", topic, mqAll, mqDivided);
             }
         });
@@ -148,15 +144,6 @@ public class LitePullConsumerImpl implements 
LitePullConsumer {
         rocketmqPullConsumer.setSocksProxyConfig(proxyJson);
     }
 
-    private RetryPolicy getRetryPolicy(MessageQueue messageQueue) {
-        return retryPolicies.computeIfAbsent(messageQueue, queue ->
-                new ExponentialRetryPolicy(Duration.ofMillis(100).toMillis(), 
Duration.ofMinutes(10).toMillis(), 2));
-    }
-
-    private void removeRetryPolicy(MessageQueue messageQueue) {
-        retryPolicies.remove(messageQueue);
-    }
-
     private void submitPullTask(String topic, String tag, Set<MessageQueue> 
assignedQueues) {
         Set<MessageQueue> runningQueues = runningQueueMap.keySet();
         for (MessageQueue runningQueue : runningQueues) {
@@ -178,7 +165,7 @@ public class LitePullConsumerImpl implements 
LitePullConsumer {
             if (runningQueueMap.putIfAbsent(messageQueue, processQueue) == 
null) {
                 try {
                     PullTask pullTask = new PullTask(messageQueue, tag);
-                    executorService.submit(pullTask);
+                    pullImmediately(pullTask);
                     log.info("Submit pullTask:{}", messageQueue);
                 } catch (Exception e) {
                     log.error("Failed submit pullTask:{}, {}, wait next 
balancing", topic, messageQueue, e);
@@ -195,6 +182,15 @@ public class LitePullConsumerImpl implements 
LitePullConsumer {
 
     }
 
+    void pullImmediately(PullTask pullTask) {
+        scheduleService.schedule(new Runnable() {
+            @Override
+            public void run() {
+                pullTask.run();
+            }
+        }, 0, TimeUnit.MILLISECONDS);
+    }
+
     void pullLater(PullTask pullTask, long delay, TimeUnit unit) {
         scheduleService.schedule(new Runnable() {
             @Override
@@ -205,7 +201,6 @@ public class LitePullConsumerImpl implements 
LitePullConsumer {
     }
 
     class PullTask implements Runnable {
-        private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
 
         private final String tag;
         private final MessageQueue messageQueue;
@@ -232,10 +227,9 @@ public class LitePullConsumerImpl implements 
LitePullConsumer {
                 int batchNums = localMessageCache.nextPullBatchNums();
                 // If batchNums is zero, an exception will be thrown and then 
trigger a delay
                 if (batchNums <= 0) {
-                    final int delayTimeMillis = (int) 
getRetryPolicy(messageQueue).nextDelayDuration();
                     log.warn("Local cache is full, delay the pull task {} ms 
for message queue {}",
-                            delayTimeMillis, messageQueue);
-                    pullLater(PullTask.this, delayTimeMillis, 
TimeUnit.MILLISECONDS);
+                            PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, 
messageQueue);
+                    pullLater(PullTask.this, 
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, TimeUnit.MILLISECONDS);
                 }
 
                 rocketmqPullConsumer.pullBlockIfNotFound(this.messageQueue, 
this.tag, offset, batchNums, new PullCallback() {
@@ -257,28 +251,24 @@ public class LitePullConsumerImpl implements 
LitePullConsumer {
                                             
localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt, 
messageQueue, pq));
                                         }
                                         
localMessageCache.updatePullOffset(messageQueue, 
pullResult.getNextBeginOffset());
-                                        removeRetryPolicy(messageQueue);
-                                        executorService.submit(PullTask.this);
+                                        pullImmediately(PullTask.this);
                                     } else {
                                         
localMessageCache.removePullOffset(messageQueue);
                                         log.info("ProcessQueue {} dropped, 
discard the pulled message.", messageQueue);
-                                        removeRetryPolicy(messageQueue);
                                     }
                                     break;
                                 case OFFSET_ILLEGAL:
-                                    final int delayTimeMillis = (int) 
getRetryPolicy(messageQueue).nextDelayDuration();
                                     log.warn("The pull request offset is 
illegal, offset is {}, message queue is {}, " +
                                                     "pull result is {}, delay 
{} ms for next pull",
-                                            offset, messageQueue, pullResult, 
delayTimeMillis);
+                                            offset, messageQueue, pullResult, 
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                                     
localMessageCache.updatePullOffset(messageQueue, 
pullResult.getNextBeginOffset());
-                                    pullLater(PullTask.this, delayTimeMillis, 
TimeUnit.MILLISECONDS);
+                                    pullLater(PullTask.this, 
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, TimeUnit.MILLISECONDS);
                                     break;
                                 case NO_NEW_MSG:
                                 case NO_MATCHED_MSG:
                                     log.info("No NEW_MSG or MATCHED_MSG for 
mq:{}, pull again.", messageQueue);
                                     
localMessageCache.updatePullOffset(messageQueue, 
pullResult.getNextBeginOffset());
-                                    removeRetryPolicy(messageQueue);
-                                    executorService.submit(PullTask.this);
+                                    pullImmediately(PullTask.this);
                                     break;
                                 default:
                                     log.warn("Failed to process pullResult, 
mq:{} {}", messageQueue, pullResult);
@@ -292,17 +282,21 @@ public class LitePullConsumerImpl implements 
LitePullConsumer {
 
                     @Override
                     public void onException(Throwable e) {
-                        final int delayTimeMillis = (int) 
getRetryPolicy(messageQueue).nextDelayDuration();
+                        long delayTimeMillis = 0L;
+                        if (e instanceof MQBrokerException && 
((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
+                            delayTimeMillis = 
PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL;
+                        } else {
+                            delayTimeMillis = 
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION;
+                        }
                         log.error("Exception happens when pull message 
process, delay {} ms for message queue {}",
                                 delayTimeMillis, messageQueue, e);
                         pullLater(PullTask.this, delayTimeMillis, 
TimeUnit.MILLISECONDS);
                     }
                 });
             } catch (Throwable t) {
-                final int delayTimeMillis = (int) 
getRetryPolicy(messageQueue).nextDelayDuration();
                 log.error("Error occurs when pull message process, delay {} ms 
for message queue {}",
-                        delayTimeMillis, messageQueue, t);
-                pullLater(PullTask.this, delayTimeMillis, 
TimeUnit.MILLISECONDS);
+                        PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, messageQueue, t);
+                pullLater(PullTask.this, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, 
TimeUnit.MILLISECONDS);
             }
         }
 
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java
index a352791..b4cf7a7 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java
@@ -73,7 +73,7 @@ public class LocalMessageCache {
                     outerException.set(new RuntimeException("fetchConsumeOffsetFromBroker exception, please check rocketmq client for more details"));
                     return null;
                 }
-                if (offset == -1) {
+                if (offset == -1 || offset == 0) {
                     // Follow the CONSUME_FROM_WHERE to compute next pull 
offset
                     // But note that if broker thrown any unexpected runtime 
exception may cause offset rollback.
                     // We don't handle this risk because of MetaQ doesn't have 
any rpc hook
@@ -90,7 +90,7 @@ public class LocalMessageCache {
                             break;
                     }
                 }
-                if (offset >= 0) {
+                if (offset > 0) {
                     // Got an offset from offset store
                     return offset;
                 }
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/RetryPolicy.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/RetryPolicy.java
deleted file mode 100644
index 6da2a81..0000000
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/RetryPolicy.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.rocketmq.eventbridge.adapter.storage.rocketmq.runtimer.consumer;
-
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:10 上午
- */
-public interface RetryPolicy {
-    long nextDelayDuration();
-}

Reply via email to