http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java new file mode 100644 index 0000000..84b6c2d --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.rocketmq.promise; + +public enum FutureState { + /** + * the task is doing + **/ + DOING(0), + /** + * the task is done + **/ + DONE(1), + /** + * ths task is cancelled + **/ + CANCELLED(2); + + public final int value; + + private FutureState(int value) { + this.value = value; + } + + public boolean isCancelledState() { + return this == CANCELLED; + } + + public boolean isDoneState() { + return this == DONE; + } + + public boolean isDoingState() { + return this == DOING; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java new file mode 100644 index 0000000..104d3d9 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.utils; + +import io.openmessaging.KeyValue; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.log.ClientLogger; +import org.slf4j.Logger; + +public final class BeanUtils { + final static Logger log = ClientLogger.getLog(); + + /** + * Maps primitive {@code Class}es to their corresponding wrapper {@code Class}. + */ + private static Map<Class<?>, Class<?>> primitiveWrapperMap = new HashMap<Class<?>, Class<?>>(); + + static { + primitiveWrapperMap.put(Boolean.TYPE, Boolean.class); + primitiveWrapperMap.put(Byte.TYPE, Byte.class); + primitiveWrapperMap.put(Character.TYPE, Character.class); + primitiveWrapperMap.put(Short.TYPE, Short.class); + primitiveWrapperMap.put(Integer.TYPE, Integer.class); + primitiveWrapperMap.put(Long.TYPE, Long.class); + primitiveWrapperMap.put(Double.TYPE, Double.class); + primitiveWrapperMap.put(Float.TYPE, Float.class); + primitiveWrapperMap.put(Void.TYPE, Void.TYPE); + } + + private static Map<Class<?>, Class<?>> wrapperMap = new HashMap<Class<?>, Class<?>>(); + + static { + for (final Class<?> primitiveClass : primitiveWrapperMap.keySet()) { + final Class<?> wrapperClass = primitiveWrapperMap.get(primitiveClass); + if (!primitiveClass.equals(wrapperClass)) { + wrapperMap.put(wrapperClass, primitiveClass); + } + } + wrapperMap.put(String.class, String.class); + } + + /** + * <p>Populate the JavaBeans properties of the specified bean, based on + * the specified name/value pairs. This method uses Java reflection APIs + * to identify corresponding "property setter" method names, and deals + * with setter arguments of type <Code>String</Code>, <Code>boolean</Code>, + * <Code>int</Code>, <Code>long</Code>, <Code>float</Code>, and + * <Code>double</Code>.</p> + * + * <p>The particular setter method to be called for each property is + * determined using the usual JavaBeans introspection mechanisms. Thus, + * you may identify custom setter methods using a BeanInfo class that is + * associated with the class of the bean itself. If no such BeanInfo + * class is available, the standard method name conversion ("set" plus + * the capitalized name of the property in question) is used.</p> + * + * <p><strong>NOTE</strong>: It is contrary to the JavaBeans Specification + * to have more than one setter method (with different argument + * signatures) for the same property.</p> + * + * @param clazz JavaBean class whose properties are being populated + * @param properties Map keyed by property name, with the corresponding (String or String[]) value(s) to be set + * @param <T> Class type + * @return Class instance + */ + public static <T> T populate(final Properties properties, final Class<T> clazz) { + T obj = null; + try { + obj = clazz.newInstance(); + return populate(properties, obj); + } catch (Throwable e) { + log.warn("Error occurs !", e); + } + return obj; + } + + public static <T> T populate(final KeyValue properties, final Class<T> clazz) { + T obj = null; + try { + obj = clazz.newInstance(); + return populate(properties, obj); + } catch (Throwable e) { + log.warn("Error occurs !", e); + } + return obj; + } + + public static Class<?> getMethodClass(Class<?> clazz, String methodName) { + Method[] methods = clazz.getMethods(); + for (Method method : methods) { + if (method.getName().equalsIgnoreCase(methodName)) { + return method.getParameterTypes()[0]; + } + } + return null; + } + + public static void setProperties(Class<?> clazz, Object obj, String methodName, + Object value) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + Class<?> parameterClass = getMethodClass(clazz, methodName); + Method setterMethod = clazz.getMethod(methodName, parameterClass); + if (parameterClass == Boolean.TYPE) { + setterMethod.invoke(obj, Boolean.valueOf(value.toString())); + } else if (parameterClass == Integer.TYPE) { + setterMethod.invoke(obj, Integer.valueOf(value.toString())); + } else if (parameterClass == Double.TYPE) { + setterMethod.invoke(obj, Double.valueOf(value.toString())); + } else if (parameterClass == Float.TYPE) { + setterMethod.invoke(obj, Float.valueOf(value.toString())); + } else if (parameterClass == Long.TYPE) { + setterMethod.invoke(obj, Long.valueOf(value.toString())); + } else + setterMethod.invoke(obj, value); + } + + public static <T> T populate(final Properties properties, final T obj) { + Class<?> clazz = obj.getClass(); + try { + + Set<Map.Entry<Object, Object>> entries = properties.entrySet(); + for (Map.Entry<Object, Object> entry : entries) { + String entryKey = entry.getKey().toString(); + String[] keyGroup = entryKey.split("\\."); + for (int i = 0; i < keyGroup.length; i++) { + keyGroup[i] = keyGroup[i].toLowerCase(); + keyGroup[i] = StringUtils.capitalize(keyGroup[i]); + } + String beanFieldNameWithCapitalization = StringUtils.join(keyGroup); + try { + setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, entry.getValue()); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) { + //ignored... + } + } + } catch (RuntimeException e) { + log.warn("Error occurs !", e); + } + return obj; + } + + public static <T> T populate(final KeyValue properties, final T obj) { + Class<?> clazz = obj.getClass(); + try { + + final Set<String> keySet = properties.keySet(); + for (String key : keySet) { + String[] keyGroup = key.split("\\."); + for (int i = 0; i < keyGroup.length; i++) { + keyGroup[i] = keyGroup[i].toLowerCase(); + keyGroup[i] = StringUtils.capitalize(keyGroup[i]); + } + String beanFieldNameWithCapitalization = StringUtils.join(keyGroup); + try { + setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, properties.getString(key)); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) { + //ignored... + } + } + } catch (RuntimeException e) { + log.warn("Error occurs !", e); + } + return obj; + } +} + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java new file mode 100644 index 0000000..60c8408 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.utils; + +import io.openmessaging.BytesMessage; +import io.openmessaging.KeyValue; +import io.openmessaging.MessageHeader; +import io.openmessaging.OMS; +import io.openmessaging.SendResult; +import io.openmessaging.rocketmq.domain.BytesMessageImpl; +import io.openmessaging.rocketmq.domain.NonStandardKeys; +import io.openmessaging.rocketmq.domain.SendResultImpl; +import java.lang.reflect.Field; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageAccessor; + +public class OMSUtil { + + /** + * Builds a OMS client instance name. + * + * @return a unique instance name + */ + public static String buildInstanceName() { + return Integer.toString(UtilAll.getPid()) + "%OpenMessaging" + "%" + System.nanoTime(); + } + + public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) { + org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message(); + rmqMessage.setBody(omsMessage.getBody()); + + KeyValue headers = omsMessage.headers(); + KeyValue properties = omsMessage.properties(); + + //All destinations in RocketMQ use Topic + if (headers.containsKey(MessageHeader.TOPIC)) { + rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC)); + rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); + } else { + rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE)); + rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE"); + } + + for (String key : properties.keySet()) { + MessageAccessor.putProperty(rmqMessage, key, properties.getString(key)); + } + + //Headers has a high priority + for (String key : headers.keySet()) { + MessageAccessor.putProperty(rmqMessage, key, headers.getString(key)); + } + + return rmqMessage; + } + + public static BytesMessage msgConvert(org.apache.rocketmq.common.message.MessageExt rmqMsg) { + BytesMessage omsMsg = new BytesMessageImpl(); + omsMsg.setBody(rmqMsg.getBody()); + + KeyValue headers = omsMsg.headers(); + KeyValue properties = omsMsg.properties(); + + final Set<Map.Entry<String, String>> entries = rmqMsg.getProperties().entrySet(); + + for (final Map.Entry<String, String> entry : entries) { + if (isOMSHeader(entry.getKey())) { + headers.put(entry.getKey(), entry.getValue()); + } else { + properties.put(entry.getKey(), entry.getValue()); + } + } + + omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId()); + if (!rmqMsg.getProperties().containsKey(NonStandardKeys.MESSAGE_DESTINATION) || + rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC")) { + omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic()); + } else { + omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic()); + } + omsMsg.putHeaders(MessageHeader.SEARCH_KEY, rmqMsg.getKeys()); + omsMsg.putHeaders(MessageHeader.BORN_HOST, String.valueOf(rmqMsg.getBornHost())); + omsMsg.putHeaders(MessageHeader.BORN_TIMESTAMP, rmqMsg.getBornTimestamp()); + omsMsg.putHeaders(MessageHeader.STORE_HOST, String.valueOf(rmqMsg.getStoreHost())); + omsMsg.putHeaders(MessageHeader.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp()); + return omsMsg; + } + + public static boolean isOMSHeader(String value) { + for (Field field : MessageHeader.class.getDeclaredFields()) { + try { + if (field.get(MessageHeader.class).equals(value)) { + return true; + } + } catch (IllegalAccessException e) { + return false; + } + } + return false; + } + + /** + * Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult. + */ + public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.SendResult rmqResult) { + assert rmqResult.getSendStatus().equals(SendStatus.SEND_OK); + return new SendResultImpl(rmqResult.getMsgId(), OMS.newKeyValue()); + } + + public static KeyValue buildKeyValue(KeyValue... keyValues) { + KeyValue keyValue = OMS.newKeyValue(); + for (KeyValue properties : keyValues) { + for (String key : properties.keySet()) { + keyValue.put(key, properties.getString(key)); + } + } + return keyValue; + } + + /** + * Returns an iterator that cycles indefinitely over the elements of {@code Iterable}. + */ + public static <T> Iterator<T> cycle(final Iterable<T> iterable) { + return new Iterator<T>() { + Iterator<T> iterator = new Iterator<T>() { + @Override + public synchronized boolean hasNext() { + return false; + } + + @Override + public synchronized T next() { + throw new NoSuchElementException(); + } + + @Override + public synchronized void remove() { + //Ignore + } + }; + + @Override + public synchronized boolean hasNext() { + return iterator.hasNext() || iterable.iterator().hasNext(); + } + + @Override + public synchronized T next() { + if (!iterator.hasNext()) { + iterator = iterable.iterator(); + if (!iterator.hasNext()) { + throw new NoSuchElementException(); + } + } + return iterator.next(); + } + + @Override + public synchronized void remove() { + iterator.remove(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java new file mode 100644 index 0000000..ae4d3ed --- /dev/null +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.consumer; + +import io.openmessaging.rocketmq.config.ClientConfig; +import io.openmessaging.rocketmq.domain.ConsumeRequest; +import io.openmessaging.rocketmq.domain.NonStandardKeys; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class LocalMessageCacheTest { + private LocalMessageCache localMessageCache; + @Mock + private DefaultMQPullConsumer rocketmqPullConsume; + @Mock + private ConsumeRequest consumeRequest; + + @Before + public void init() { + ClientConfig clientConfig = new ClientConfig(); + clientConfig.setRmqPullMessageBatchNums(512); + clientConfig.setRmqPullMessageCacheCapacity(1024); + localMessageCache = new LocalMessageCache(rocketmqPullConsume, clientConfig); + } + + @Test + public void testNextPullBatchNums() throws Exception { + assertThat(localMessageCache.nextPullBatchNums()).isEqualTo(512); + for (int i = 0; i < 513; i++) { + localMessageCache.submitConsumeRequest(consumeRequest); + } + assertThat(localMessageCache.nextPullBatchNums()).isEqualTo(511); + } + + @Test + public void testNextPullOffset() throws Exception { + MessageQueue messageQueue = new MessageQueue(); + when(rocketmqPullConsume.fetchConsumeOffset(any(MessageQueue.class), anyBoolean())) + .thenReturn(123L); + assertThat(localMessageCache.nextPullOffset(new MessageQueue())).isEqualTo(123L); + } + + @Test + public void testUpdatePullOffset() throws Exception { + MessageQueue messageQueue = new MessageQueue(); + localMessageCache.updatePullOffset(messageQueue, 124L); + assertThat(localMessageCache.nextPullOffset(messageQueue)).isEqualTo(124L); + } + + @Test + public void testSubmitConsumeRequest() throws Exception { + byte [] body = new byte[]{'1', '2', '3'}; + MessageExt consumedMsg = new MessageExt(); + consumedMsg.setMsgId("NewMsgId"); + consumedMsg.setBody(body); + consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); + consumedMsg.setTopic("HELLO_QUEUE"); + + when(consumeRequest.getMessageExt()).thenReturn(consumedMsg); + localMessageCache.submitConsumeRequest(consumeRequest); + assertThat(localMessageCache.poll()).isEqualTo(consumedMsg); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java new file mode 100644 index 0000000..277a5c6 --- /dev/null +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.consumer; + +import io.openmessaging.BytesMessage; +import io.openmessaging.Message; +import io.openmessaging.MessageHeader; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.MessagingAccessPointFactory; +import io.openmessaging.OMS; +import io.openmessaging.PropertyKeys; +import io.openmessaging.PullConsumer; +import io.openmessaging.rocketmq.config.ClientConfig; +import io.openmessaging.rocketmq.domain.NonStandardKeys; +import java.lang.reflect.Field; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.common.message.MessageExt; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class PullConsumerImplTest { + private PullConsumer consumer; + private String queueName = "HELLO_QUEUE"; + + @Mock + private DefaultMQPullConsumer rocketmqPullConsumer; + private LocalMessageCache localMessageCache = + spy(new LocalMessageCache(rocketmqPullConsumer, new ClientConfig())); + + @Before + public void init() throws NoSuchFieldException, IllegalAccessException { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + consumer = messagingAccessPoint.createPullConsumer(queueName, + OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup")); + + Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer"); + field.setAccessible(true); + field.set(consumer, rocketmqPullConsumer); //Replace + + field = PullConsumerImpl.class.getDeclaredField("localMessageCache"); + field.setAccessible(true); + field.set(consumer, localMessageCache); + + messagingAccessPoint.startup(); + consumer.startup(); + } + + @Test + public void testPoll() { + final byte[] testBody = new byte[] {'a', 'b'}; + MessageExt consumedMsg = new MessageExt(); + consumedMsg.setMsgId("NewMsgId"); + consumedMsg.setBody(testBody); + consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); + consumedMsg.setTopic(queueName); + + when(localMessageCache.poll()).thenReturn(consumedMsg); + + Message message = consumer.poll(); + assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId"); + assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody); + } + + @Test + public void testPoll_WithTimeout() { + //There is a default timeout value, @see ClientConfig#omsOperationTimeout. + Message message = consumer.poll(); + assertThat(message).isNull(); + + message = consumer.poll(OMS.newKeyValue().put(PropertyKeys.OPERATION_TIMEOUT, 100)); + assertThat(message).isNull(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java new file mode 100644 index 0000000..882e57e --- /dev/null +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.consumer; + +import io.openmessaging.BytesMessage; +import io.openmessaging.Message; +import io.openmessaging.MessageHeader; +import io.openmessaging.MessageListener; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.MessagingAccessPointFactory; +import io.openmessaging.OMS; +import io.openmessaging.PushConsumer; +import io.openmessaging.ReceivedMessageContext; +import io.openmessaging.rocketmq.domain.NonStandardKeys; +import java.lang.reflect.Field; +import java.util.Collections; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class PushConsumerImplTest { + private PushConsumer consumer; + + @Mock + private DefaultMQPushConsumer rocketmqPushConsumer; + + @Before + public void init() throws NoSuchFieldException, IllegalAccessException { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + consumer = messagingAccessPoint.createPushConsumer( + OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup")); + + Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer"); + field.setAccessible(true); + DefaultMQPushConsumer innerConsumer = (DefaultMQPushConsumer) field.get(consumer); + field.set(consumer, rocketmqPushConsumer); //Replace + + when(rocketmqPushConsumer.getMessageListener()).thenReturn(innerConsumer.getMessageListener()); + messagingAccessPoint.startup(); + consumer.startup(); + } + + @Test + public void testConsumeMessage() { + final byte[] testBody = new byte[] {'a', 'b'}; + + MessageExt consumedMsg = new MessageExt(); + consumedMsg.setMsgId("NewMsgId"); + consumedMsg.setBody(testBody); + consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); + consumedMsg.setTopic("HELLO_QUEUE"); + consumer.attachQueue("HELLO_QUEUE", new MessageListener() { + @Override + public void onMessage(final Message message, final ReceivedMessageContext context) { + assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId"); + assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody); + context.ack(); + } + }); + ((MessageListenerConcurrently) rocketmqPushConsumer + .getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg), null); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java new file mode 100644 index 0000000..1db80c3 --- /dev/null +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.producer; + +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.MessagingAccessPointFactory; +import io.openmessaging.Producer; +import io.openmessaging.exception.OMSRuntimeException; +import java.lang.reflect.Field; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class ProducerImplTest { + private Producer producer; + + @Mock + private DefaultMQProducer rocketmqProducer; + + @Before + public void init() throws NoSuchFieldException, IllegalAccessException { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + producer = messagingAccessPoint.createProducer(); + + Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer"); + field.setAccessible(true); + field.set(producer, rocketmqProducer); + + messagingAccessPoint.startup(); + producer.startup(); + } + + @Test + public void testSend_OK() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + SendResult sendResult = new SendResult(); + sendResult.setMsgId("TestMsgID"); + sendResult.setSendStatus(SendStatus.SEND_OK); + when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult); + io.openmessaging.SendResult omsResult = + producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'})); + + assertThat(omsResult.messageId()).isEqualTo("TestMsgID"); + } + + @Test + public void testSend_Not_OK() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + SendResult sendResult = new SendResult(); + sendResult.setSendStatus(SendStatus.FLUSH_DISK_TIMEOUT); + + when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult); + try { + producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'})); + failBecauseExceptionWasNotThrown(OMSRuntimeException.class); + } catch (Exception e) { + assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed."); + } + } + + @Test + public void testSend_WithException() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + when(rocketmqProducer.send(any(Message.class), anyLong())).thenThrow(MQClientException.class); + try { + producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'})); + failBecauseExceptionWasNotThrown(OMSRuntimeException.class); + } catch (Exception e) { + assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed."); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java new file mode 100644 index 0000000..823fe01 --- /dev/null +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.producer; + +import io.openmessaging.BytesMessage; +import io.openmessaging.MessageHeader; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.MessagingAccessPointFactory; +import io.openmessaging.SequenceProducer; +import java.lang.reflect.Field; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class SequenceProducerImplTest { + + private SequenceProducer producer; + + @Mock + private DefaultMQProducer rocketmqProducer; + + @Before + public void init() throws NoSuchFieldException, IllegalAccessException { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + producer = messagingAccessPoint.createSequenceProducer(); + + Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer"); + field.setAccessible(true); + field.set(producer, rocketmqProducer); + + messagingAccessPoint.startup(); + producer.startup(); + } + + @Test + public void testSend_WithCommit() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + SendResult sendResult = new SendResult(); + sendResult.setMsgId("TestMsgID"); + sendResult.setSendStatus(SendStatus.SEND_OK); + when(rocketmqProducer.send(ArgumentMatchers.<Message>anyList())).thenReturn(sendResult); + when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024); + final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}); + producer.send(message); + producer.commit(); + assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("TestMsgID"); + } + + @Test + public void testRollback() { + when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024); + final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}); + producer.send(message); + producer.rollback(); + producer.commit(); //Commit nothing. + assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo(null); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java new file mode 100644 index 0000000..2240ff2 --- /dev/null +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.promise; + +import io.openmessaging.Promise; +import io.openmessaging.PromiseListener; +import io.openmessaging.exception.OMSRuntimeException; +import org.junit.Before; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; + +public class DefaultPromiseTest { + private Promise<String> promise; + + @Before + public void init() { + promise = new DefaultPromise<>(); + } + + @Test + public void testIsCancelled() throws Exception { + assertThat(promise.isCancelled()).isEqualTo(false); + } + + @Test + public void testIsDone() throws Exception { + assertThat(promise.isDone()).isEqualTo(false); + promise.set("Done"); + assertThat(promise.isDone()).isEqualTo(true); + } + + @Test + public void testGet() throws Exception { + promise.set("Done"); + assertThat(promise.get()).isEqualTo("Done"); + } + + @Test + public void testGet_WithTimeout() throws Exception { + try { + promise.get(100); + failBecauseExceptionWasNotThrown(OMSRuntimeException.class); + } catch (OMSRuntimeException e) { + assertThat(e).hasMessageContaining("Get request result is timeout or interrupted"); + } + } + + @Test + public void testAddListener() throws Exception { + promise.addListener(new PromiseListener<String>() { + @Override + public void operationCompleted(final Promise<String> promise) { + assertThat(promise.get()).isEqualTo("Done"); + } + + @Override + public void operationFailed(final Promise<String> promise) { + + } + }); + promise.set("Done"); + } + + @Test + public void testAddListener_ListenerAfterSet() throws Exception { + promise.set("Done"); + promise.addListener(new PromiseListener<String>() { + @Override + public void operationCompleted(final Promise<String> promise) { + assertThat(promise.get()).isEqualTo("Done"); + } + + @Override + public void operationFailed(final Promise<String> promise) { + + } + }); + } + + @Test + public void testAddListener_WithException_ListenerAfterSet() throws Exception { + final Throwable exception = new OMSRuntimeException("-1", "Test Error"); + promise.setFailure(exception); + promise.addListener(new PromiseListener<String>() { + @Override + public void operationCompleted(final Promise<String> promise) { + } + + @Override + public void operationFailed(final Promise<String> promise) { + assertThat(promise.getThrowable()).isEqualTo(exception); + } + }); + } + + @Test + public void testAddListener_WithException() throws Exception { + final Throwable exception = new OMSRuntimeException("-1", "Test Error"); + promise.addListener(new PromiseListener<String>() { + @Override + public void operationCompleted(final Promise<String> promise) { + } + + @Override + public void operationFailed(final Promise<String> promise) { + assertThat(promise.getThrowable()).isEqualTo(exception); + } + }); + promise.setFailure(exception); + } + + @Test + public void getThrowable() throws Exception { + assertThat(promise.getThrowable()).isNull(); + Throwable exception = new OMSRuntimeException("-1", "Test Error"); + promise.setFailure(exception); + assertThat(promise.getThrowable()).isEqualTo(exception); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java new file mode 100644 index 0000000..71ca11c --- /dev/null +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.utils; + +import io.openmessaging.KeyValue; +import io.openmessaging.OMS; +import io.openmessaging.rocketmq.config.ClientConfig; +import io.openmessaging.rocketmq.domain.NonStandardKeys; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class BeanUtilsTest { + private KeyValue properties = OMS.newKeyValue(); + + public static class CustomizedConfig extends ClientConfig { + final static String STRING_TEST = "string.test"; + String stringTest = "foobar"; + + final static String DOUBLE_TEST = "double.test"; + double doubleTest = 123.0; + + final static String LONG_TEST = "long.test"; + long longTest = 123L; + + String getStringTest() { + return stringTest; + } + + public void setStringTest(String stringTest) { + this.stringTest = stringTest; + } + + double getDoubleTest() { + return doubleTest; + } + + public void setDoubleTest(final double doubleTest) { + this.doubleTest = doubleTest; + } + + long getLongTest() { + return longTest; + } + + public void setLongTest(final long longTest) { + this.longTest = longTest; + } + + CustomizedConfig() { + } + } + + @Before + public void init() { + properties.put(NonStandardKeys.MAX_REDELIVERY_TIMES, 120); + properties.put(CustomizedConfig.STRING_TEST, "kaka"); + properties.put(NonStandardKeys.CONSUMER_GROUP, "Default_Consumer_Group"); + properties.put(NonStandardKeys.MESSAGE_CONSUME_TIMEOUT, 101); + + properties.put(CustomizedConfig.LONG_TEST, 1234567890L); + properties.put(CustomizedConfig.DOUBLE_TEST, 10.234); + } + + @Test + public void testPopulate() { + CustomizedConfig config = BeanUtils.populate(properties, CustomizedConfig.class); + + //RemotingConfig config = BeanUtils.populate(properties, RemotingConfig.class); + Assert.assertEquals(config.getRmqMaxRedeliveryTimes(), 120); + Assert.assertEquals(config.getStringTest(), "kaka"); + Assert.assertEquals(config.getRmqConsumerGroup(), "Default_Consumer_Group"); + Assert.assertEquals(config.getRmqMessageConsumeTimeout(), 101); + Assert.assertEquals(config.getLongTest(), 1234567890L); + Assert.assertEquals(config.getDoubleTest(), 10.234, 0.000001); + } + + @Test + public void testPopulate_ExistObj() { + CustomizedConfig config = new CustomizedConfig(); + config.setOmsConsumerId("NewConsumerId"); + + Assert.assertEquals(config.getOmsConsumerId(), "NewConsumerId"); + + config = BeanUtils.populate(properties, config); + + //RemotingConfig config = BeanUtils.populate(properties, RemotingConfig.class); + Assert.assertEquals(config.getRmqMaxRedeliveryTimes(), 120); + Assert.assertEquals(config.getStringTest(), "kaka"); + Assert.assertEquals(config.getRmqConsumerGroup(), "Default_Consumer_Group"); + Assert.assertEquals(config.getRmqMessageConsumeTimeout(), 101); + Assert.assertEquals(config.getLongTest(), 1234567890L); + Assert.assertEquals(config.getDoubleTest(), 10.234, 0.000001); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 05ead63..25e4c84 100644 --- a/pom.xml +++ b/pom.xml @@ -181,6 +181,7 @@ <module>filter</module> <module>test</module> <module>distribution</module> + <module>openmessaging</module> </modules> <build> @@ -617,6 +618,11 @@ <artifactId>guava</artifactId> <version>19.0</version> </dependency> + <dependency> + <groupId>io.openmessaging</groupId> + <artifactId>openmessaging-api</artifactId> + <version>0.1.0-alpha</version> + </dependency> </dependencies> </dependencyManagement> </project>