odbozhou commented on a change in pull request #1240: [ISSUE #1199] Implement
the 1.0.0 openmessaging new consumer API for rocketmq oms module
URL: https://github.com/apache/rocketmq/pull/1240#discussion_r293643948
##########
File path:
openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
##########
@@ -57,77 +59,34 @@ public Producer createProducer() {
return null;
}
- @Override public Consumer createConsumer() {
- String consumerId =
accessPointProperties.getString(NonStandardKeys.CONSUMER_ID);
- String[] nsStrArr = consumerId.split("_");
- if (nsStrArr.length < 2) {
- return new PushConsumerImpl(accessPointProperties);
- }
- if (NonStandardKeys.PULL_CONSUMER.equals(nsStrArr[0])) {
- return new PullConsumerImpl(accessPointProperties);
- }
+ @Override public PushConsumer createPushConsumer() {
return new PushConsumerImpl(accessPointProperties);
}
- @Override
- public ResourceManager resourceManager() {
- DefaultResourceManager resourceManager = new DefaultResourceManager();
- return resourceManager;
- }
-
- @Override public MessageFactory messageFactory() {
- return null;
+ @Override public PullConsumer createPullConsumer() {
+ return new PullConsumerImpl(accessPointProperties);
}
- class DefaultResourceManager implements ResourceManager {
-
- @Override
- public void createNamespace(String nsName) {
- accessPointProperties.put(NonStandardKeys.CONSUMER_ID, nsName);
- }
-
- @Override
- public void deleteNamespace(String nsName) {
- accessPointProperties.put(NonStandardKeys.CONSUMER_ID, null);
- }
-
- @Override
- public void switchNamespace(String targetNamespace) {
- accessPointProperties.put(NonStandardKeys.CONSUMER_ID,
targetNamespace);
+ @Override public PushConsumer createPushConsumer(KeyValue attributes) {
+ for (String key : attributes.keySet()) {
+ accessPointProperties.put(key, attributes.getString(key));
}
+ return new PushConsumerImpl(accessPointProperties);
+ }
- @Override
- public Set<String> listNamespaces() {
- return new HashSet<String>() {
- {
-
add(accessPointProperties.getString(NonStandardKeys.CONSUMER_ID));
- }
- };
- }
-
- @Override
- public void createQueue(String queueName) {
-
- }
-
- @Override
- public void deleteQueue(String queueName) {
-
- }
-
- @Override
- public Set<String> listQueues(String nsName) {
- return null;
- }
-
- @Override
- public void filter(String queueName, String filterString) {
-
+ @Override public PullConsumer createPullConsumer(KeyValue attributes) {
+ for (String key : attributes.keySet()) {
+ accessPointProperties.put(key, attributes.getString(key));
}
+ return new PullConsumerImpl(accessPointProperties);
+ }
- @Override
- public void routing(String sourceQueue, String targetQueue) {
+ @Override
+ public ResourceManager resourceManager() {
Review comment:
The current PR is too big, and resourceManager will be implemented in the
next pr
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services