Remove diamond operator usage so the client module compiles with JDK 1.6
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/8d781757 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/8d781757 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/8d781757 Branch: refs/heads/release-4.1.0-incubating Commit: 8d781757dfb56ebd95b0494e8e2c87422c22d767 Parents: e57f9ac Author: dongeforever <zhendongli...@yeah.net> Authored: Sat May 27 14:09:08 2017 +0800 Committer: dongeforever <zhendongli...@yeah.net> Committed: Sat May 27 14:09:08 2017 +0800 ---------------------------------------------------------------------- .../AllocateMessageQueueConsistentHash.java | 8 +++--- .../AllocateMessageQueueConsitentHashTest.java | 27 ++++++++++---------- .../consistenthash/ConsistentHashRouter.java | 4 +-- .../org/apache/rocketmq/store/CommitLog.java | 4 +-- 4 files changed, 22 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8d781757/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java index 77198b7..09d940a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java @@ -76,19 +76,19 @@ public class AllocateMessageQueueConsistentHash implements AllocateMessageQueue } - Collection<ClientNode> cidNodes = new ArrayList<>(); + Collection<ClientNode> cidNodes = new ArrayList<ClientNode>(); for (String cid : 
cidAll) { cidNodes.add(new ClientNode(cid)); } final ConsistentHashRouter<ClientNode> router; //for building hash ring if (customHashFunction != null) { - router = new ConsistentHashRouter<>(cidNodes, virtualNodeCnt, customHashFunction); + router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction); } else { - router = new ConsistentHashRouter<>(cidNodes, virtualNodeCnt); + router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt); } - List<MessageQueue> results = new ArrayList<>(); + List<MessageQueue> results = new ArrayList<MessageQueue>(); for (MessageQueue mq : mqAll) { ClientNode clientNode = router.routeNode(mq.toString()); if (clientNode != null && currentCID.equals(clientNode.getKey())) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8d781757/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java ---------------------------------------------------------------------- diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java index fc7ab9f..e9e5db7 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Random; import java.util.TreeMap; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.junit.Assert; import org.junit.Before; @@ -113,13 +114,13 @@ public class AllocateMessageQueueConsitentHashTest { //System.out.println("mqAll:" + mqAll.toString()); List<String> cidAll = 
createConsumerIdList(consumerSize); - List<MessageQueue> allocatedResAll = new ArrayList<>(); + List<MessageQueue> allocatedResAll = new ArrayList<MessageQueue>(); - Map<MessageQueue, String> allocateToAllOrigin = new TreeMap<>(); + Map<MessageQueue, String> allocateToAllOrigin = new TreeMap<MessageQueue, String>(); //test allocate all { - List<String> cidBegin = new ArrayList<>(cidAll); + List<String> cidBegin = new ArrayList<String>(cidAll); //System.out.println("cidAll:" + cidBegin.toString()); for (String cid : cidBegin) { @@ -135,13 +136,13 @@ public class AllocateMessageQueueConsitentHashTest { verifyAllocateAll(cidBegin,mqAll, allocatedResAll)); } - Map<MessageQueue, String> allocateToAllAfterRemoveOne = new TreeMap<>(); - List<String> cidAfterRemoveOne = new ArrayList<>(cidAll); + Map<MessageQueue, String> allocateToAllAfterRemoveOne = new TreeMap<MessageQueue, String>(); + List<String> cidAfterRemoveOne = new ArrayList<String>(cidAll); //test allocate remove one cid { String removeCID = cidAfterRemoveOne.remove(0); //System.out.println("removing one cid "+removeCID); - List<MessageQueue> mqShouldOnlyChanged = new ArrayList<>(); + List<MessageQueue> mqShouldOnlyChanged = new ArrayList<MessageQueue>(); Iterator<Map.Entry<MessageQueue, String>> it = allocateToAllOrigin.entrySet().iterator(); while (it.hasNext()) { Map.Entry<MessageQueue, String> entry = it.next(); @@ -151,7 +152,7 @@ public class AllocateMessageQueueConsitentHashTest { } //System.out.println("cidAll:" + cidAfterRemoveOne.toString()); - List<MessageQueue> allocatedResAllAfterRemove = new ArrayList<>(); + List<MessageQueue> allocatedResAllAfterRemove = new ArrayList<MessageQueue>(); for (String cid : cidAfterRemoveOne) { List<MessageQueue> rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidAfterRemoveOne); allocatedResAllAfterRemove.addAll(rs); @@ -166,16 +167,16 @@ public class AllocateMessageQueueConsitentHashTest { verifyAfterRemove(allocateToAllOrigin, 
allocateToAllAfterRemoveOne, removeCID); } - List<String> cidAfterAdd = new ArrayList<>(cidAfterRemoveOne); + List<String> cidAfterAdd = new ArrayList<String>(cidAfterRemoveOne); //test allocate add one more cid { String newCid = CID_PREFIX+"NEW"; //System.out.println("add one more cid "+newCid); cidAfterAdd.add(newCid); - List<MessageQueue> mqShouldOnlyChanged = new ArrayList<>(); + List<MessageQueue> mqShouldOnlyChanged = new ArrayList<MessageQueue>(); //System.out.println("cidAll:" + cidAfterAdd.toString()); - List<MessageQueue> allocatedResAllAfterAdd = new ArrayList<>(); - Map<MessageQueue, String> allocateToAll3 = new TreeMap<>(); + List<MessageQueue> allocatedResAllAfterAdd = new ArrayList<MessageQueue>(); + Map<MessageQueue, String> allocateToAll3 = new TreeMap<MessageQueue, String>(); for (String cid : cidAfterAdd) { List<MessageQueue> rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidAfterAdd); allocatedResAllAfterAdd.addAll(rs); @@ -225,7 +226,7 @@ public class AllocateMessageQueueConsitentHashTest { } private List<String> createConsumerIdList(int size) { - List<String> consumerIdList = new ArrayList<>(size); + List<String> consumerIdList = new ArrayList<String>(size); for (int i = 0; i < size; i++) { consumerIdList.add(CID_PREFIX + String.valueOf(i)); } @@ -233,7 +234,7 @@ public class AllocateMessageQueueConsitentHashTest { } private List<MessageQueue> createMessageQueueList(int size) { - List<MessageQueue> messageQueueList = new ArrayList<>(size); + List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(size); for (int i = 0; i < size; i++) { MessageQueue mq = new MessageQueue(topic, "brokerName", i); messageQueueList.add(mq); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8d781757/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java ---------------------------------------------------------------------- diff --git 
a/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java index 8606c43..a6fce51 100644 --- a/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java +++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java @@ -30,7 +30,7 @@ import java.util.TreeMap; * @param <T> */ public class ConsistentHashRouter<T extends Node> { - private final SortedMap<Long, VirtualNode<T>> ring = new TreeMap<>(); + private final SortedMap<Long, VirtualNode<T>> ring = new TreeMap<Long, VirtualNode<T>>(); private final HashFunction hashFunction; public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount) { @@ -64,7 +64,7 @@ public class ConsistentHashRouter<T extends Node> { if (vNodeCount < 0) throw new IllegalArgumentException("illegal virtual node counts :" + vNodeCount); int existingReplicas = getExistingReplicas(pNode); for (int i = 0; i < vNodeCount; i++) { - VirtualNode<T> vNode = new VirtualNode<>(pNode, i + existingReplicas); + VirtualNode<T> vNode = new VirtualNode<T>(pNode, i + existingReplicas); ring.put(hashFunction.hash(vNode.getKey()), vNode); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8d781757/store/src/main/java/org/apache/rocketmq/store/CommitLog.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 7b29263..b44211c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -722,7 +722,7 @@ public class CommitLog { messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); - lockForPutMessage(); //spin... 
+ putMessageLock.lock(); try { long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); this.beginTimeInLock = beginLockTimestamp; @@ -771,7 +771,7 @@ public class CommitLog { eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; beginTimeInLock = 0; } finally { - releasePutMessageLock(); + putMessageLock.unlock(); }