http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/Mutex.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/Mutex.cpp b/rocketmq-client4cpp/src/kpr/Mutex.cpp deleted file mode 100755 index f98282c..0000000 --- a/rocketmq-client4cpp/src/kpr/Mutex.cpp +++ /dev/null @@ -1,296 +0,0 @@ -/** - * Copyright (C) 2013 kangliqiang ,kang...@163.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#include "Mutex.h" - -#include <pthread.h> -#include <unistd.h> -#include <stdio.h> -#include <errno.h> -#include <time.h> - - -namespace kpr -{ -Mutex::Mutex() -{ - ::pthread_mutex_init(&m_mutex, NULL); -} - -Mutex::~Mutex() -{ - ::pthread_mutex_destroy(&m_mutex); -} - -void Mutex::Lock() const -{ - ::pthread_mutex_lock(&m_mutex); -} - -bool Mutex::TryLock() const -{ - int ret = ::pthread_mutex_trylock(&m_mutex); - return (ret == 0); -} - -bool Mutex::TryLock(int timeout) const -{ - struct timespec ts; - clock_gettime(CLOCK_REALTIME, &ts); - ts.tv_sec += (timeout/1000); - ts.tv_nsec += (timeout%1000) * 1000 * 1000; - - int ret = ::pthread_mutex_timedlock(&m_mutex, &ts); - return (ret == 0); -} - - -void Mutex::Unlock() const -{ - ::pthread_mutex_unlock(&m_mutex); -} - -//*********** -//RWMutex -//*************** -RWMutex::RWMutex() -{ - ::pthread_rwlock_init(&m_mutex, NULL); -} - -RWMutex::~RWMutex() -{ - ::pthread_rwlock_destroy(&m_mutex); -} - -void RWMutex::ReadLock() const -{ - ::pthread_rwlock_rdlock(&m_mutex); -} - -void RWMutex::WriteLock() const -{ - ::pthread_rwlock_wrlock(&m_mutex); -} - -bool RWMutex::TryReadLock() const -{ - int ret = ::pthread_rwlock_tryrdlock(&m_mutex); - return (ret == 0); -} - -bool RWMutex::TryReadLock(int timeout) const -{ - struct timespec ts; - clock_gettime(CLOCK_REALTIME, &ts); - ts.tv_sec += (timeout/1000); - ts.tv_nsec += (timeout%1000) * 1000 * 1000; - - int ret = ::pthread_rwlock_timedrdlock(&m_mutex, &ts); - return (ret == 0); -} - -bool RWMutex::TryWriteLock() const -{ - int ret = ::pthread_rwlock_trywrlock(&m_mutex); - return (ret == 0); -} - -bool RWMutex::TryWriteLock(int timeout) const -{ - struct timespec ts; - clock_gettime(CLOCK_REALTIME, &ts); - ts.tv_sec += (timeout/1000); - ts.tv_nsec += (timeout%1000) * 1000 * 1000; - - int ret = ::pthread_rwlock_timedwrlock(&m_mutex, &ts); - return (ret == 0); -} - - -void RWMutex::Unlock() const -{ - ::pthread_rwlock_unlock(&m_mutex); -} - - -//*********** 
-//RecursiveMutex -//*************** -RecursiveMutex::RecursiveMutex() - : m_count(0), - m_owner(ThreadId()) -{ - ::pthread_mutex_init(&m_mutex, NULL); -} - -RecursiveMutex::~RecursiveMutex() -{ - ::pthread_mutex_destroy(&m_mutex); -} - -bool RecursiveMutex::Lock()const -{ - return ((RecursiveMutex*)this)->lock(1); -} - -bool RecursiveMutex::Unlock()const -{ - return ((RecursiveMutex*)this)->unlock(); -} - -bool RecursiveMutex::TryLock()const -{ - return ((RecursiveMutex*)this)->tryLock(); -} - -ThreadId RecursiveMutex::GetOwner()const -{ - m_internal.Lock(); - ThreadId id; - if (m_count > 0) - { - id = m_owner; - } - m_internal.Unlock(); - - return id; -} - -bool RecursiveMutex::lock(int count) -{ - bool rc = false; - bool obtained = false; - - while (!obtained) - { - m_internal.Lock(); - - if (m_count == 0) - { - m_count = count; - m_owner = ThreadId::GetCurrentThreadId(); - obtained = true; - rc = true; - - try - { - ::pthread_mutex_lock(&m_mutex); - } - catch (...) - { - try - { - m_internal.Unlock(); - } - catch (...) - { - } - throw; - } - } - else if (m_owner == ThreadId::GetCurrentThreadId()) - { - m_count += count; - obtained = true; - } - - m_internal.Unlock(); - - if (!obtained) - { - ::pthread_mutex_lock(&m_mutex); - ::pthread_mutex_unlock(&m_mutex); - } - } - - return rc; -} - -bool RecursiveMutex::tryLock() -{ - bool obtained = false; - - m_internal.Lock(); - - if (m_count == 0) - { - m_count = 1; - m_owner = ThreadId::GetCurrentThreadId(); - obtained = true; - - try - { - ::pthread_mutex_lock(&m_mutex); - } - catch (...) - { - try - { - m_internal.Unlock(); - } - catch (...) 
- { - } - throw; - } - } - else if (m_owner == ThreadId::GetCurrentThreadId()) - { - ++m_count; - obtained = true; - } - - m_internal.Unlock(); - - return obtained; -} - -bool RecursiveMutex::unlock() -{ - bool rc; - m_internal.Lock(); - - if (--m_count == 0) - { - m_owner = ThreadId(); - - ::pthread_mutex_unlock(&m_mutex); - - rc = true; - } - else - { - rc = false; - } - - m_internal.Unlock(); - - return rc; -} - -unsigned int RecursiveMutex::reset4Condvar() -{ - m_internal.Lock(); - - unsigned int count = m_count; - m_count = 0; - m_owner = ThreadId(); - - m_internal.Unlock(); - - return count; -} -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/Mutex.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/Mutex.h b/rocketmq-client4cpp/src/kpr/Mutex.h deleted file mode 100755 index fc3498f..0000000 --- a/rocketmq-client4cpp/src/kpr/Mutex.h +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Copyright (C) 2013 kangliqiang ,kang...@163.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#ifndef __KPR_MUTEX_H__ -#define __KPR_MUTEX_H__ - -#include "KPRTypes.h" -#include <errno.h> - -namespace kpr -{ -class Mutex -{ -public: - Mutex(); - ~Mutex(); - - void Lock()const; - void Unlock()const; - bool TryLock()const; - bool TryLock(int timeout) const; - - ThreadId GetOwner()const; - -private: - Mutex(const Mutex&); - const Mutex& operator=(const Mutex&); - - mutable pthread_mutex_t m_mutex; - friend class Condition; -}; - -class RWMutex -{ -public: - RWMutex(); - ~RWMutex(); - - void ReadLock()const; - void WriteLock()const; - bool TryReadLock()const; - bool TryReadLock(int timeout) const; - bool TryWriteLock()const; - bool TryWriteLock(int timeout)const; - void Unlock()const; - - ThreadId GetOwner()const; - -private: - RWMutex(const RWMutex&); - const RWMutex& operator=(const RWMutex&); - - mutable pthread_rwlock_t m_mutex; - friend class Condition; -}; - -class RecursiveMutex -{ -public: - RecursiveMutex(); - ~RecursiveMutex(); - - bool Lock()const; - bool Unlock()const; - bool TryLock()const; - - ThreadId GetOwner()const; - - unsigned int GetCount()const - { - return m_count; - } - -private: - RecursiveMutex(const RecursiveMutex&); - - const RecursiveMutex& operator=(const RecursiveMutex&); - - bool lock(int count); - bool tryLock(); - bool unlock(); - - unsigned int reset4Condvar(); - -private: - pthread_mutex_t m_mutex; - Mutex m_internal; - mutable unsigned int m_count; - mutable ThreadId m_owner; - - friend class Condition; - friend class ConditionHelper; -}; -} -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/RefHandle.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/RefHandle.h b/rocketmq-client4cpp/src/kpr/RefHandle.h deleted file mode 100644 index fd7d741..0000000 --- a/rocketmq-client4cpp/src/kpr/RefHandle.h +++ /dev/null @@ -1,328 +0,0 @@ -/** - * Copyright (C) 2013 kangliqiang ,kang...@163.com - * - * 
Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __KPR_REFHANDLET_H__ -#define __KPR_REFHANDLET_H__ - -#include "KPRTypes.h" -#include "AtomicValue.h" -#include "Exception.h" - -namespace kpr -{ - -class RefCount -{ -public: - RefCount& operator=(const RefCount&) - { - return *this; - } - - void incRef() - { - m_refCount++; - } - - void decRef() - { - if (--m_refCount == 0 && !m_noDelete) - { - m_noDelete = true; - delete this; - } - } - - int getRef() const - { - return m_refCount.get(); - } - - void setNoDelete(bool b) - { - m_noDelete = b; - } - -protected: - RefCount() - : m_refCount(0), m_noDelete(false) - { - } - - RefCount(const RefCount&) - : m_refCount(0), m_noDelete(false) - { - } - - virtual ~RefCount() - { - } - -protected: - AtomicInteger m_refCount; - bool m_noDelete; -}; - - - -template <class T> -class RefHandleT -{ -public: - RefHandleT(T* p = 0) - { - m_ptr = p; - - if (m_ptr) - { - m_ptr->incRef(); - } - } - - template<typename Y> - RefHandleT(const RefHandleT<Y>& v) - { - m_ptr = v.m_ptr; - - if (m_ptr) - { - m_ptr->incRef(); - } - } - - RefHandleT(const RefHandleT& v) - { - m_ptr = v.m_ptr; - - if (m_ptr) - { - m_ptr->incRef(); - } - } - - ~RefHandleT() - { - if (m_ptr) - { - m_ptr->decRef(); - } - } - - RefHandleT<T>& operator=(T* p) - { - if (m_ptr != p) - { - if (p) - { - p->incRef(); - } - - T* ptr = m_ptr; - m_ptr = p; - - if (ptr) - { - ptr->decRef(); - } - } - - return *this; - } - - template<typename Y> - RefHandleT<T>& 
operator=(const RefHandleT<Y>& v) - { - if (m_ptr != v.m_ptr) - { - if (v.m_ptr) - { - v.m_ptr->incRef(); - } - - T* ptr = m_ptr; - m_ptr = v.m_ptr; - - if (ptr) - { - ptr->decRef(); - } - } - - return *this; - } - - RefHandleT<T>& operator=(const RefHandleT<T>& v) - { - if (m_ptr != v.m_ptr) - { - if (v.m_ptr) - { - v.m_ptr->incRef(); - } - - T* ptr = m_ptr; - m_ptr = v.m_ptr; - - if (ptr) - { - ptr->decRef(); - } - } - - return *this; - } - - T* operator->() const - { - if (!m_ptr) - { - THROW_EXCEPTION(RefHandleNullException, "autoptr null handle error", -1); - } - - return m_ptr; - } - - T& operator*() const - { - if (!m_ptr) - { - THROW_EXCEPTION(RefHandleNullException, "autoptr null handle error", -1); - } - - return *m_ptr; - } - - operator T* () const - { - return m_ptr; - } - - T* ptr() const - { - return m_ptr; - } - - T* retn() - { - T* p = m_ptr; - m_ptr = 0; - - return p; - } - - bool operator==(const RefHandleT<T>& v) const - { - return m_ptr == v.m_ptr; - } - - bool operator==(T* p) const - { - return m_ptr == p; - } - - bool operator!=(const RefHandleT<T>& v) const - { - return m_ptr != v.m_ptr; - } - - bool operator!=(T* p) const - { - return m_ptr != p; - } - - bool operator!() const - { - return m_ptr == 0; - } - - operator bool() const - { - return m_ptr != 0; - } - - void swap(RefHandleT& other) - { - std::swap(m_ptr, other._ptr); - } - - template<class Y> - static RefHandleT dynamicCast(const RefHandleT<Y>& r) - { - return RefHandleT(dynamic_cast<T*>(r._ptr)); - } - - template<class Y> - static RefHandleT dynamicCast(Y* p) - { - return RefHandleT(dynamic_cast<T*>(p)); - } - -public: - T* m_ptr; -}; - - -template<typename T, typename U> -inline bool operator==(const RefHandleT<T>& lhs, const RefHandleT<U>& rhs) -{ - T* l = lhs.ptr(); - U* r = rhs.ptr(); - if(l && r) - { - return *l == *r; - } - else - { - return !l && !r; - } -} - - -template<typename T, typename U> -inline bool operator!=(const RefHandleT<T>& lhs, const RefHandleT<U>& rhs) -{ 
- T* l = lhs.ptr(); - U* r = rhs.ptr(); - if(l && r) - { - return *l != *r; - } - else - { - return l || r; - } -} - - -template<typename T, typename U> -inline bool operator<(const RefHandleT<T>& lhs, const RefHandleT<U>& rhs) -{ - T* l = lhs.ptr(); - U* r = rhs.ptr(); - if(l && r) - { - return *l < *r; - } - else - { - return !l && r; - } -} - -} - -#define DECLAREVAR(T) typedef kpr::RefHandleT<T> T ## Ptr; - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/ScopedLock.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/ScopedLock.h b/rocketmq-client4cpp/src/kpr/ScopedLock.h deleted file mode 100755 index 6ff9dd1..0000000 --- a/rocketmq-client4cpp/src/kpr/ScopedLock.h +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Copyright (C) 2013 kangliqiang ,kang...@163.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
//  */
#ifndef __KPR_SCOPEDLOCK_H__
#define __KPR_SCOPEDLOCK_H__

namespace kpr
{
/**
 * Scope guard for any type offering const Lock()/Unlock():
 * locks on construction, unlocks on destruction.
 */
template <class MutexT>
class ScopedLock
{
public:
    ScopedLock(const MutexT& mutex)
        : m_lock(mutex)
    {
        m_lock.Lock();
    }

    ~ScopedLock()
    {
        m_lock.Unlock();
    }

private:
    const MutexT& m_lock;
};


/**
 * Scope guard taking the shared (read) side of a lock type offering const
 * ReadLock()/Unlock().
 */
template <class MutexT>
class ScopedRLock
{
public:
    ScopedRLock(const MutexT& mutex)
        : m_lock(mutex),
          m_held(false)
    {
        m_lock.ReadLock();
        m_held = true;
    }

    ~ScopedRLock()
    {
        if (!m_held)
        {
            return;
        }

        m_lock.Unlock();
    }

private:
    const MutexT& m_lock;
    mutable bool m_held;
};


/**
 * Scope guard taking the exclusive (write) side of a lock type offering const
 * WriteLock()/Unlock().
 */
template <class MutexT>
class ScopedWLock
{
public:
    ScopedWLock(const MutexT& mutex)
        : m_lock(mutex),
          m_held(false)
    {
        m_lock.WriteLock();
        m_held = true;
    }

    ~ScopedWLock()
    {
        if (!m_held)
        {
            return;
        }

        m_lock.Unlock();
    }

private:
    const MutexT& m_lock;
    mutable bool m_held;
};
}

#endif
// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/Semaphore.cpp
// ----------------------------------------------------------------------
// diff --git a/rocketmq-client4cpp/src/kpr/Semaphore.cpp b/rocketmq-client4cpp/src/kpr/Semaphore.cpp
// deleted file mode 100755
// index 59a0eef..0000000
// --- a/rocketmq-client4cpp/src/kpr/Semaphore.cpp
// +++ /dev/null
// @@ -1,73 +0,0 @@
// /**
//  * Copyright (C) 2013 kangliqiang ,kang...@163.com
//  *
//  * Licensed under the Apache License, Version 2.0 (the "License");
//  * you may not use this file except in compliance with the License.
//  * You may obtain a copy of the License at
//  *
//  * http://www.apache.org/licenses/LICENSE-2.0
//  *
//  * Unless required by applicable law or agreed to in writing, software
//  * distributed under the License is distributed on an "AS IS" BASIS,
//  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  * See the License for the specific language governing permissions and
//  * limitations under the License.
- */ - -#include "Semaphore.h" - -#include <unistd.h> -#include <sys/time.h> -#include "KPRUtil.h" - -namespace kpr -{ -Semaphore::Semaphore(long initial_count) -{ - sem_init(&m_sem, 0, initial_count); -} - -Semaphore::~Semaphore() -{ - sem_destroy(&m_sem); -} - -int Semaphore::GetValue() -{ - int value = 0; - int rc = sem_getvalue(&m_sem, &value); - if (rc < 0) - { - return rc; - } - return value; -} - -bool Semaphore::Wait() -{ - int rc; - rc = sem_wait(&m_sem); - return !rc; -} - -bool Semaphore::Wait(long timeout) -{ - int rc; - if (timeout < 0) - { - rc = sem_wait(&m_sem); - } - else - { - struct timespec abstime = KPRUtil::CalcAbsTime(timeout); - rc = sem_timedwait(&m_sem, &abstime); - } - - return !rc; -} - -void Semaphore::Release(int count) -{ - sem_post(&m_sem); -} -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/Semaphore.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/Semaphore.h b/rocketmq-client4cpp/src/kpr/Semaphore.h deleted file mode 100755 index 2a1af7f..0000000 --- a/rocketmq-client4cpp/src/kpr/Semaphore.h +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Copyright (C) 2013 kangliqiang ,kang...@163.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#ifndef __KPR_SEMAPHORE_H__ -#define __KPR_SEMAPHORE_H__ - -#include "KPRTypes.h" -#include <errno.h> - -namespace kpr -{ - -class Semaphore -{ -public: - Semaphore(long initial_count = 0); - ~Semaphore(); - - int GetValue(); - bool Wait(); - bool Wait(long timeout); - - void Release(int count = 1); - -private: - sem_t m_sem; -}; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/Thread.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/Thread.cpp b/rocketmq-client4cpp/src/kpr/Thread.cpp deleted file mode 100755 index d80819b..0000000 --- a/rocketmq-client4cpp/src/kpr/Thread.cpp +++ /dev/null @@ -1,191 +0,0 @@ -/** - * Copyright (C) 2013 kangliqiang ,kang...@163.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "Thread.h" - -#include <string.h> -#include <stdlib.h> -#include <stdio.h> -#include <errno.h> -#include <assert.h> -#include <unistd.h> -#include <sys/types.h> -#include <signal.h> - -#include "ScopedLock.h" -#include "Exception.h" - -//for log -#include "RocketMQClient.h" - -namespace kpr -{ -kpr::AtomicInteger Thread::s_threadNumber = 0; - -void* Thread::ThreadRoute(void* pArg) -{ - Thread* tv = ((Thread*)pArg); - - try - { - tv->Startup(); - } - catch (...) - { - } - - try - { - tv->Cleanup(); - } - catch (...) 
- { - } - - return 0; -} - -Thread::Thread(const char* name) -{ - m_started = false; - m_threadId = ThreadId(); - m_threadNumber = s_threadNumber++; - - SetName(name); -} - -Thread::~Thread() -{ - try - { - } - catch (...) - { - } -} - -void Thread::SetName(const char* name) -{ - ScopedLock<Mutex> guard(m_mutex); - - if (name == NULL) - { - snprintf(m_name, sizeof(m_name), "Thread-%u", m_threadNumber); - } - else - { - snprintf(m_name, sizeof(m_name), "%s", name); - } -} - -const char* Thread::GetName() const -{ - ScopedLock<Mutex> guard(m_mutex); - return m_name; -} - -void Thread::Start() -{ - ScopedLock<Mutex> guard(m_mutex); - - if (m_started) - { - return; - } - - pthread_attr_t attr; - int retcode = 0; - retcode = pthread_attr_init(&attr); - if (retcode != 0) - { - THROW_EXCEPTION(SystemCallException, "pthread_attr_init failed!", errno) - } - - pthread_t id; - retcode = pthread_create(&id, &attr, ThreadRoute, (void*)this); - if (retcode != 0) - { - THROW_EXCEPTION(SystemCallException, "pthread_create error", errno) - } - - m_threadId = id; - pthread_attr_destroy(&attr); - m_started = true; - RMQ_DEBUG("thread[%s][%ld] start successfully", m_name, (long)id); -} - -void Thread::Run() -{ - //TODO support runable -} - -bool Thread::IsAlive() const -{ - if (m_started) - { - int retcode = pthread_kill(m_threadId, 0); - return (retcode == ESRCH); - } - - return false; -} - -void Thread::Join() -{ - if (m_started) - { - pthread_join(m_threadId, NULL); - } -} - -void Thread::Sleep(long millis, int nanos) -{ - assert(millis >= 0 && nanos >= 0 && nanos < 999999); - struct timespec tv; - tv.tv_sec = millis / 1000; - tv.tv_nsec = (millis % 1000) * 1000000 + nanos; - nanosleep(&tv, 0); -} - -void Thread::Yield() -{ - pthread_yield(); -} - -ThreadId Thread::GetId() const -{ - ScopedLock<Mutex> guard(m_mutex); - return m_threadId; -} - -void Thread::Startup() -{ - try - { - RMQ_INFO("thread[%s] started", GetName()); - Run(); - } - catch (...) 
- { - } -} - -void Thread::Cleanup() -{ - RMQ_INFO("thread[%s] end", GetName()); -} - -} - http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/Thread.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/Thread.h b/rocketmq-client4cpp/src/kpr/Thread.h deleted file mode 100755 index ef2590e..0000000 --- a/rocketmq-client4cpp/src/kpr/Thread.h +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Copyright (C) 2013 kangliqiang ,kang...@163.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#ifndef __KPR_THREAD_H__ -#define __KPR_THREAD_H__ - -#include "KPRTypes.h" -#include "RefHandle.h" -#include "Mutex.h" - -#ifdef Yield -#undef Yield -#endif - -namespace kpr -{ -class Thread : public virtual kpr::RefCount -{ -public: - Thread(const char* name = NULL); - virtual ~Thread(); - - virtual void Run(); - void Start(); - bool IsAlive() const; - void Join(); - ThreadId GetId() const; - - void SetName(const char*); - const char* GetName() const; - - void Startup(); - void Cleanup(); - - static void Sleep(long millis, int nano = 0); - static void Yield(); - -private: - Thread(const Thread&); - const Thread& operator=(const Thread&); - static void* ThreadRoute(void* pArg); - -private: - ThreadId m_threadId; - unsigned int m_threadNumber; - char m_name[128]; - bool m_started; - Mutex m_mutex; - - static kpr::AtomicInteger s_threadNumber; -}; -typedef kpr::RefHandleT<Thread> ThreadPtr; - -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/ThreadLocal.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/ThreadLocal.cpp b/rocketmq-client4cpp/src/kpr/ThreadLocal.cpp deleted file mode 100755 index 32cba5b..0000000 --- a/rocketmq-client4cpp/src/kpr/ThreadLocal.cpp +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Copyright (C) 2013 kangliqiang ,kang...@163.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#include "ThreadLocal.h" - -#include <errno.h> - -#include "Exception.h" - -namespace kpr -{ -ThreadLocal::ThreadLocal() - : m_Key(0) -{ - int retcode = 0; - - retcode = pthread_key_create(&m_Key, 0); - if (retcode != 0) - { - THROW_EXCEPTION(SystemCallException, "pthread_key_create error", errno); - } -} - -ThreadLocal::~ThreadLocal() -{ - pthread_key_delete(m_Key); -} - -void* ThreadLocal::GetValue() -{ - void* v; - v = pthread_getspecific(m_Key); - return v; -} - -void ThreadLocal::SetValue(void* value) -{ - int retcode = pthread_setspecific(m_Key, value); - if (retcode != 0) - { - THROW_EXCEPTION(SystemCallException, "pthread_setspecific error", errno); - } -} -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/ThreadLocal.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/ThreadLocal.h b/rocketmq-client4cpp/src/kpr/ThreadLocal.h deleted file mode 100644 index 9ec8f43..0000000 --- a/rocketmq-client4cpp/src/kpr/ThreadLocal.h +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Copyright (C) 2013 kangliqiang ,kang...@163.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#ifndef __KPR_THREADLOCAL_H__ -#define __KPR_THREADLOCAL_H__ - -#include "KPRTypes.h" - -namespace kpr -{ -class ThreadLocal -{ -public: - ThreadLocal(); - virtual ~ThreadLocal(); - - void* GetValue(); - void SetValue(void* value); - -private: - ThreadKey m_Key; -}; -}; - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/ThreadPool.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/ThreadPool.cpp b/rocketmq-client4cpp/src/kpr/ThreadPool.cpp deleted file mode 100755 index 32557a8..0000000 --- a/rocketmq-client4cpp/src/kpr/ThreadPool.cpp +++ /dev/null @@ -1,418 +0,0 @@ -/** - * Copyright (C) 2013 kangliqiang ,kang...@163.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#include "ThreadPool.h" - -#include "RocketMQClient.h" -#include "ScopedLock.h" -#include "KPRUtil.h" - -namespace kpr -{ -ThreadPoolWorker:: ThreadPoolWorker(ThreadPool* pThreadPool, const char* strName) - : kpr::Thread(strName), - m_pThreadPool(pThreadPool), - m_canWork(false), - m_isWaiting(false), - m_stop(false), - m_idleTime(0), - m_idle(true) -{ - -} - -bool ThreadPoolWorker::IsIdle() -{ - kpr::ScopedLock<kpr::Monitor> lock(*this); - return m_idle; -} - -void ThreadPoolWorker:: SetIdle(bool idle) -{ - kpr::ScopedLock<kpr::Monitor> lock(*this); - m_idle = idle; - m_idleTime = 0; -} - -int ThreadPoolWorker::IdleTime(int idleTime) -{ - if (m_idle) - { - m_idleTime += idleTime; - } - else - { - m_idleTime = 0; - } - - return m_idleTime; -} - -void ThreadPoolWorker::Run() -{ - while (!m_stop) - { - SetIdle(true); - { - kpr::ScopedLock<kpr::Monitor> lock(*this); - while (!m_canWork) - { - try - { - m_isWaiting = true; - Wait(); - m_isWaiting = false; - } - catch (...) - { - } - } - - m_canWork = false; - } - - while (!m_stop) - { - ThreadPoolWorkPtr request = m_pThreadPool->GetWork(this); - if ((ThreadPoolWork*)(NULL) == request) - { - break; - } - - SetIdle(false); - - try - { - request->Do(); - } - catch(...) 
- { - RMQ_ERROR("thead[%s] doWork exception", GetName()); - } - - //delete request; - request = NULL; - } - - if (m_stop || m_pThreadPool->IsDestroy()) - { - break; - } - } - - m_pThreadPool ->RemoveThread(this); - m_pThreadPool = NULL; -} - -void ThreadPoolWorker::WakeUp() -{ - SetIdle(false); - kpr::ScopedLock<kpr::Monitor> lock(*this); - m_canWork = true; - Notify(); -} - -void ThreadPoolWorker::Stop() -{ - kpr::ScopedLock<kpr::Monitor> lock(*this); - m_canWork = true; - m_stop = true; - Notify(); -} - -bool ThreadPoolWorker:: IsWaiting() -{ - kpr::ScopedLock<kpr::Monitor> lock(*this); - return m_isWaiting; -} - -ThreadPool::ThreadPool(const char* name, - int count, - int minCount, - int maxCount, - int step, - int maxIdleTime, - int checkldleThreadsInterval) -{ - if (name == NULL) - { - snprintf(m_name, sizeof(m_name), "ThreadPool"); - } - else - { - snprintf(m_name, sizeof(m_name), "%s", name); - } - - m_destroy = false; - m_minCount = minCount; - m_maxCount = maxCount; - m_maxIdleTime = maxIdleTime; - m_count = 0; - m_step = step; - m_index = 0; - - m_lastRemoveIdleThreadsTime = KPRUtil::GetCurrentTimeMillis(); - - if (m_minCount <= 0) - { - m_minCount = MIN_THREAD_COUNT; - } - - if (m_maxCount < 0) - { - m_maxCount = MAX_THREAD_COUNT; - } - - if (m_maxIdleTime < 0) - { - m_maxIdleTime = MAX_IDLE_THREAD_TIME; - } - - if (m_maxCount != 0 && m_maxCount < m_minCount) - { - m_minCount = MIN_THREAD_COUNT; - } - - if ((m_maxCount != 0 && count > m_maxCount) || count < m_minCount) - { - count = m_minCount; - } - - if (checkldleThreadsInterval < 0) - { - checkldleThreadsInterval = CHECK_IDLE_THREADS_INTERVAL; - } - - AddThreads(count); - - char manager_name[32]; - snprintf(manager_name, sizeof(manager_name), "%s-manager", m_name); - m_manager = new ThreadPoolManage(manager_name, this, checkldleThreadsInterval); - m_manager->Start(); -} - -ThreadPool::~ThreadPool() -{ - Destroy(); -} - -void ThreadPool::AddThreads(int count) -{ - char threadName[256]; - - for (int i = 
0; i < count; ++i) - { - snprintf(threadName, sizeof(threadName), "%s-Worker%d", m_name, m_index); - - try - { - ThreadPoolWorkerPtr worker = new ThreadPoolWorker(this, threadName); - worker->Start(); - - m_workers.push_back(worker); - while (!worker->IsWaiting()) - { - kpr::Thread::Sleep(0, 100000); - } - - m_index++; - m_count++; - } - catch (...) - { - RMQ_ERROR("ThreadPool thead[%s] new exception", threadName); - } - } -} - -void ThreadPool::Destroy() -{ - std::list<ThreadPoolWorkerPtr> workers; - { - kpr::ScopedLock<kpr::Monitor> lock(*this); - if (m_destroy) - { - return; - } - - m_destroy = true; - - std::list<ThreadPoolWorkerPtr>::iterator iter; - for (iter = m_workers.begin(); iter != m_workers.end(); iter++) - { - workers.push_back(*iter); - (*iter)->Stop(); - } - } - - m_manager->Stop(); - m_manager->Join(); - - std::list<ThreadPoolWorkerPtr>::iterator itThread; - for (itThread = workers.begin(); itThread != workers.end(); itThread++) - { - (*itThread)->Join(); - } - m_works.clear(); -} - -int ThreadPool::AddWork(ThreadPoolWorkPtr pWork) -{ - kpr::ScopedLock<kpr::Monitor> lock(*this); - if (m_destroy) - { - return -1; - } - - m_works.push_back(pWork); - - if (!WakeOneThread()) - { - if (0 == m_maxCount || m_count < m_maxCount) - { - int step = m_step; - - if (0 < m_maxCount && m_count + m_step > m_maxCount) - { - step = m_maxCount - m_count; - } - - AddThreads(step); - WakeOneThread(); - } - } - - return 0; -} - -ThreadPoolWorkPtr ThreadPool::GetWork(ThreadPoolWorker* pWorker) -{ - kpr::ScopedLock<kpr::Monitor> lock(*this); - ThreadPoolWorkPtr result = NULL; - - if (!m_destroy && !m_works.empty()) - { - result = m_works.front(); - m_works.pop_front(); - } - - return result; -} - -bool ThreadPool::IsDestroy() -{ - kpr::ScopedLock<kpr::Monitor> lock(*this); - return m_destroy; -} - -void ThreadPool::RemoveThread(ThreadPoolWorker* workerThread) -{ - kpr::ScopedLock<kpr::Monitor> lock(*this); - - std::list<ThreadPoolWorkerPtr>::iterator it = 
m_workers.begin(); - - for (; it != m_workers.end(); it++) - { - if ((*it) == workerThread) - { - m_workers.erase(it); - m_count--; - break; - } - } -} - -void ThreadPool::RemoveIdleThreads() -{ - kpr::ScopedLock<kpr::Monitor> lock(*this); - - if (m_maxIdleTime == 0) - { - return; - } - - unsigned long long time = KPRUtil::GetCurrentTimeMillis(); - int interval = (int)(time - m_lastRemoveIdleThreadsTime); - m_lastRemoveIdleThreadsTime = time; - - std::list<ThreadPoolWorkerPtr>::iterator it = m_workers.begin(); - int size = (int)m_workers.size(); - while (size > m_minCount && it != m_workers.end()) - { - if ((*it)->IdleTime(interval) > m_maxIdleTime) - { - (*it)->Stop(); - size--; - } - - it++; - } -} - -bool ThreadPool::WakeOneThread() -{ - std::list<ThreadPoolWorkerPtr>::iterator it = m_workers.begin(); - for (; it != m_workers.end(); it++) - { - if ((*it)->IsIdle()) - { - (*it)->WakeUp(); - return true; - } - } - - return false; -} - -ThreadPoolManage::ThreadPoolManage(const char* name, ThreadPool* pThreadPool, int checkldleThreadsInterval) - : kpr::Thread(name), - m_pThreadPool(pThreadPool), - m_stop(false), - m_checkIdleThreadsInterval(checkldleThreadsInterval) -{ -} - -ThreadPoolManage::~ThreadPoolManage() -{ -} - -void ThreadPoolManage::Stop() -{ - kpr::ScopedLock<kpr::Monitor> lock(*this); - m_stop = true; - Notify(); -} - -void ThreadPoolManage::Run() -{ - while (!m_stop) - { - { - kpr::ScopedLock<kpr::Monitor> lock(*this); - if (!m_stop) - { - Wait(m_checkIdleThreadsInterval); - } - - if (m_stop) - { - break; - } - } - - m_pThreadPool->RemoveIdleThreads(); - } -} -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/ThreadPool.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/ThreadPool.h b/rocketmq-client4cpp/src/kpr/ThreadPool.h deleted file mode 100755 index 2c7e3ff..0000000 --- a/rocketmq-client4cpp/src/kpr/ThreadPool.h +++ 
/dev/null @@ -1,124 +0,0 @@ -/** - * Copyright (C) 2013 kangliqiang ,kang...@163.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __KPR_THREADPOOL_H__ -#define __KPR_THREADPOOL_H__ - -#include<time.h> -#include <assert.h> -#include <list> -#include "Mutex.h" -#include "Thread.h" -#include "Monitor.h" - -#include "ThreadPoolWork.h" - -namespace kpr -{ -const int MAX_THREAD_COUNT = 300; -const int MIN_THREAD_COUNT = 1; -//const int MAX_IDLE_THREAD_TIME = 600000; -const int MAX_IDLE_THREAD_TIME = 0; -const int THREAD_STEP = 10; -const int CHECK_IDLE_THREADS_INTERVAL = 30000; - -class ThreadPool; -class ThreadPoolWorker : public kpr::Thread, public kpr::Monitor -{ -public: - ThreadPoolWorker(ThreadPool* pThreadPool, const char* strName); - - virtual void Run(); - void WakeUp(); - void Stop(); - bool IsWaiting(); - bool IsIdle(); - void SetIdle(bool idle); - int IdleTime(int idleTime); - -private: - ThreadPool* m_pThreadPool; - bool m_canWork; - bool m_isWaiting; - bool m_stop; - int m_idleTime; - bool m_idle; -}; -typedef kpr::RefHandleT<ThreadPoolWorker> ThreadPoolWorkerPtr; - -class ThreadPoolManage : public kpr::Thread, public kpr::Monitor -{ -public: - ThreadPoolManage(const char* name, ThreadPool* pThreadPool, int nCheckldleThreadsInterval); - - ~ThreadPoolManage(); - virtual void Run(); - void Stop(); - -private: - ThreadPool* m_pThreadPool; - bool m_stop; - int m_checkIdleThreadsInterval; -}; -typedef kpr::RefHandleT<ThreadPoolManage> 
ThreadPoolManagePtr; - - -class ThreadPool : public kpr::RefCount, public kpr::Monitor -{ -public: - ThreadPool(const char* name, - int initCount, - int minCount, - int maxCount, - int step = THREAD_STEP, - int maxIdleTime = MAX_IDLE_THREAD_TIME, - int checkldleThreadsInterval = CHECK_IDLE_THREADS_INTERVAL); - - ~ThreadPool(); - void Destroy(); - - int AddWork(ThreadPoolWorkPtr pWork); - ThreadPoolWorkPtr GetWork(ThreadPoolWorker* pWorker); - - void RemoveIdleThreads(); - void RemoveThread(ThreadPoolWorker* pWorker); - - bool WakeOneThread(); - bool IsDestroy(); - -private: - void AddThreads(int count); - -private: - bool m_destroy; - int m_minCount; - int m_maxCount; - int m_maxIdleTime; - int m_count; - int m_step; - - char m_name[128]; - unsigned int m_index; - unsigned long long m_lastRemoveIdleThreadsTime; - - ThreadPoolManagePtr m_manager; - std::list<ThreadPoolWorkPtr> m_works; - std::list<ThreadPoolWorkerPtr> m_workers; -}; - -typedef kpr::RefHandleT<ThreadPool> ThreadPoolPtr; - -} -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/ThreadPoolWork.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/ThreadPoolWork.h b/rocketmq-client4cpp/src/kpr/ThreadPoolWork.h deleted file mode 100644 index 30dfe6c..0000000 --- a/rocketmq-client4cpp/src/kpr/ThreadPoolWork.h +++ /dev/null @@ -1,34 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-* See the License for the specific language governing permissions and -* limitations under the License. -*/ -#ifndef __THREADPOOLWORK_H__ -#define __THREADPOOLWORK_H__ - -#include "RefHandle.h" - -namespace kpr -{ - -class ThreadPoolWork : public kpr::RefCount -{ -public: - virtual ~ThreadPoolWork() {} - virtual void Do() = 0; -}; -typedef kpr::RefHandleT<ThreadPoolWork> ThreadPoolWorkPtr; - -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/TimerTaskManager.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/TimerTaskManager.cpp b/rocketmq-client4cpp/src/kpr/TimerTaskManager.cpp deleted file mode 100755 index 42ef672..0000000 --- a/rocketmq-client4cpp/src/kpr/TimerTaskManager.cpp +++ /dev/null @@ -1,91 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -#include "TimerTaskManager.h" -#include "ThreadPool.h" -#include "ScopedLock.h" - -namespace kpr -{ -TimerTaskManager::TimerTaskManager() -{ -} - -TimerTaskManager::~TimerTaskManager() -{ -} - -int TimerTaskManager::Init(int maxThreadCount, int checklnteval) -{ - try - { - m_pThreadPool = new ThreadPool("TimerThreadPool", 5, 5, maxThreadCount); - m_timerThread = new TimerThread("TimerThread", checklnteval); - m_timerThread->Start(); - } - catch (...) 
- { - return -1; - } - - return 0; -} - -unsigned int TimerTaskManager::RegisterTimer(unsigned int initialDelay, unsigned int elapse, TimerTaskPtr pTask) -{ - unsigned int id = m_timerThread->RegisterTimer(initialDelay, elapse, this, true); - - kpr::ScopedLock<kpr::Mutex> lock(m_mutex); - m_timerTasks[id] = pTask; - - return id; -} - -bool TimerTaskManager::UnRegisterTimer(unsigned int timerId) -{ - bool ret = m_timerThread->UnRegisterTimer(timerId); - - kpr::ScopedLock<kpr::Mutex> lock(m_mutex); - m_timerTasks.erase(timerId); - - return ret; -} - -bool TimerTaskManager::ResetTimer(unsigned int timerId) -{ - return m_timerThread->ResetTimer(timerId); -} - -void TimerTaskManager::OnTimeOut(unsigned int timerId) -{ - kpr::ScopedLock<kpr::Mutex> lock(m_mutex); - std::map<unsigned int, TimerTaskPtr>::iterator it = m_timerTasks.find(timerId); - if (it != m_timerTasks.end()) - { - if (!it->second->IsProcessing()) - { - it->second->SetProcessing(true); - m_pThreadPool->AddWork((it->second).ptr()); - } - } -} - -void TimerTaskManager::Stop() -{ - m_timerThread->Stop(); - m_timerThread->Join(); - m_pThreadPool->Destroy(); -} -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/TimerTaskManager.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/TimerTaskManager.h b/rocketmq-client4cpp/src/kpr/TimerTaskManager.h deleted file mode 100755 index b9cc2e0..0000000 --- a/rocketmq-client4cpp/src/kpr/TimerTaskManager.h +++ /dev/null @@ -1,95 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. 
-* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -#ifndef __KPR_TIMERTASKMANAGER_H__ -#define __KPR_TIMERTASKMANAGER_H__ - -#include <list> -#include <map> - -#include "RocketMQClient.h" -#include "TimerThread.h" -#include "ThreadPool.h" -#include "ThreadPoolWork.h" - -namespace kpr -{ - -class TimerTask : public kpr::ThreadPoolWork -{ -public: - TimerTask() - : m_isProcessing(false) - { - } - - virtual ~TimerTask() - { - } - - virtual void Do() - { - try - { - DoTask(); - } - catch(...) - { - RMQ_ERROR("TimerTask exception"); - } - m_isProcessing = false; - } - - bool IsProcessing() - { - return m_isProcessing; - } - - void SetProcessing(bool isProcessing) - { - m_isProcessing = isProcessing; - } - - virtual void DoTask() = 0; - -private: - bool m_isProcessing; -}; -typedef kpr::RefHandleT<TimerTask> TimerTaskPtr; - - -class TimerTaskManager : public TimerHandler -{ -public: - TimerTaskManager(); - virtual ~TimerTaskManager(); - - int Init(int maxThreadCount, int checklnteval); - unsigned int RegisterTimer(unsigned int initialDelay, unsigned int elapse, TimerTaskPtr pTask); - bool UnRegisterTimer(unsigned int timerId); - bool ResetTimer(unsigned int timerId); - void Stop(); - - virtual void OnTimeOut(unsigned int timerId); - -private: - std::map<unsigned int, TimerTaskPtr> m_timerTasks; - kpr::Mutex m_mutex; - TimerThreadPtr m_timerThread; - kpr::ThreadPoolPtr m_pThreadPool; -}; - -} -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/TimerThread.cpp ---------------------------------------------------------------------- diff --git 
a/rocketmq-client4cpp/src/kpr/TimerThread.cpp b/rocketmq-client4cpp/src/kpr/TimerThread.cpp deleted file mode 100755 index b127074..0000000 --- a/rocketmq-client4cpp/src/kpr/TimerThread.cpp +++ /dev/null @@ -1,186 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -#include "TimerThread.h" -#include "KPRUtil.h" -#include "ScopedLock.h" - -namespace kpr -{ -unsigned int TimerThread::s_nextTimerID = 0; - -TimerThread::TimerThread(const char* name, unsigned int checklnterval) - : kpr::Thread(name), m_closed(false), m_checkInterval(checklnterval) -{ -} - -TimerThread::~TimerThread() -{ -} - -void TimerThread::Run() -{ - unsigned long long lastCheckTime = KPRUtil::GetCurrentTimeMillis(); - unsigned long long currentCheckTime = lastCheckTime; - - while (!m_closed) - { - currentCheckTime = KPRUtil::GetCurrentTimeMillis(); - unsigned int elapse = (unsigned int)(currentCheckTime - lastCheckTime); - - std::list<TimerInfo> timeList; - - CheckTimeOut(elapse, timeList); - - if (!timeList.empty()) - { - std::list<TimerInfo>::iterator it = timeList.begin(); - for (; it != timeList.end(); it++) - { - try - { - it->pTimerHandler->OnTimeOut(it->id); - } - catch(...) 
- { - RMQ_ERROR("TimerThread[%s] OnTimeOut exception", GetName()); - } - } - } - - unsigned long long checkEndTime = KPRUtil::GetCurrentTimeMillis(); - int sleepTime = m_checkInterval - (int)(checkEndTime - currentCheckTime); - if (sleepTime < 0) - { - sleepTime = 0; - } - - lastCheckTime = currentCheckTime; - - try - { - kpr::ScopedLock<kpr::Monitor> lock(*this); - Wait(sleepTime); - } - catch (...) - { - } - } -} - -void TimerThread::Stop() -{ - m_closed = true; - kpr::ScopedLock<kpr::Monitor> lock(*this); - Notify(); -} - -unsigned int TimerThread::RegisterTimer(unsigned int initialDelay, unsigned int elapse, TimerHandler* pHandler, bool persistent) -{ - TimerInfo info; - info.elapse = elapse; - info.outTime = elapse - initialDelay; - info.pTimerHandler = pHandler; - info.persistent = persistent; - - kpr::ScopedLock<kpr::Mutex> lock(m_mutex); - info.id = GetNextTimerID(); - m_timers[info.id] = info; - - return info.id; -} - -bool TimerThread::UnRegisterTimer(unsigned int timerId) -{ - bool result = false; - kpr::ScopedLock<kpr::Mutex> lock(m_mutex); - std::map<unsigned int, TimerInfo>::iterator it = m_timers.find(timerId); - if (it != m_timers.end()) - { - m_timers.erase(it); - result = true; - } - - return result; -} - -bool TimerThread::ResetTimer(unsigned int timerId) -{ - bool result = false; - kpr::ScopedLock<kpr::Mutex> lock(m_mutex); - std::map<unsigned int, TimerInfo>::iterator it = m_timers.find(timerId); - if (it != m_timers.end()) - { - if (it->second.persistent) - { - it->second.outTime = it->second.elapse; - } - else - { - it->second.outTime = 0; - } - - result = true; - } - - return result; -} - -bool TimerThread::CheckTimeOut(unsigned int elapse, std::list<TimerInfo>& timerList) -{ - bool result = false; - timerList.clear(); - - kpr::ScopedLock<kpr::Mutex> lock(m_mutex); - if (!m_timers.empty()) - { - std::map<unsigned int, TimerInfo>::iterator it = m_timers.begin(); - while (it != m_timers.end()) - { - it->second.outTime += elapse; - - if 
(it->second.outTime >= int(it->second.elapse)) - { - timerList.push_back(it->second); - - if (it->second.persistent) - { - it->second.outTime = 0; - ++it; - } - else - { - std::map<unsigned int, TimerInfo>::iterator it1 = it; - ++it; - m_timers.erase(it1); - } - } - else - { - ++it; - } - } - - result = true; - } - - return result; -} - -unsigned int TimerThread::GetNextTimerID() -{ - return ++s_nextTimerID; -} -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/TimerThread.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/TimerThread.h b/rocketmq-client4cpp/src/kpr/TimerThread.h deleted file mode 100755 index 7e02a79..0000000 --- a/rocketmq-client4cpp/src/kpr/TimerThread.h +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Copyright (C) 2013 kangliqiang ,kang...@163.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#ifndef __KPR_TIMERTHREAD_H__ -#define __KPR_TIMERTHREAD_H__ - -#include <list> -#include <map> - -#include "RocketMQClient.h" -#include "Thread.h" -#include "Mutex.h" -#include "Monitor.h" - -namespace kpr -{ -class TimerHandler -{ -public: - TimerHandler() - { - } - - virtual ~TimerHandler() - { - } - - virtual void OnTimeOut(unsigned int timerID) = 0; -}; - -typedef struct tagTimerlnfo -{ - unsigned int id; - unsigned int elapse; - int outTime; - bool persistent; - TimerHandler* pTimerHandler; -} TimerInfo; - - -class TimerThread : public kpr::Thread, public kpr::Monitor -{ -public: - TimerThread(const char* name, unsigned int checklnterval); - virtual ~TimerThread(); - virtual void Run(); - virtual void Stop(); - - virtual unsigned int RegisterTimer(unsigned int initialDelay, unsigned int elapse, TimerHandler* pHandler, bool persistent = true); - virtual bool UnRegisterTimer(unsigned int timerId); - virtual bool ResetTimer(unsigned int timerId); - -private: - bool CheckTimeOut(unsigned int elapse, std::list<TimerInfo>& timerList); - static unsigned int GetNextTimerID(); - -private: - static unsigned int s_nextTimerID; - std::map<unsigned int, TimerInfo> m_timers; - kpr::Mutex m_mutex; - bool m_closed; - unsigned int m_checkInterval; -}; -typedef kpr::RefHandleT<TimerThread> TimerThreadPtr; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/message/Message.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/message/Message.cpp b/rocketmq-client4cpp/src/message/Message.cpp deleted file mode 100755 index db88c3e..0000000 --- a/rocketmq-client4cpp/src/message/Message.cpp +++ /dev/null @@ -1,379 +0,0 @@ -/** - * Copyright (C) 2013 kangliqiang ,kang...@163.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "Message.h" - -#include <string.h> -#include <stdlib.h> -#include <stdio.h> -#include "UtilAll.h" - - -namespace rmq -{ - -const std::string Message::PROPERTY_KEYS = "KEYS"; -const std::string Message::PROPERTY_TAGS = "TAGS"; -const std::string Message::PROPERTY_WAIT_STORE_MSG_OK = "WAIT"; -const std::string Message::PROPERTY_DELAY_TIME_LEVEL = "DELAY"; -const std::string Message::PROPERTY_RETRY_TOPIC = "RETRY_TOPIC"; -const std::string Message::PROPERTY_REAL_TOPIC = "REAL_TOPIC"; -const std::string Message::PROPERTY_REAL_QUEUE_ID = "REAL_QID"; -const std::string Message::PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG"; -const std::string Message::PROPERTY_PRODUCER_GROUP = "PGROUP"; -const std::string Message::PROPERTY_MIN_OFFSET = "MIN_OFFSET"; -const std::string Message::PROPERTY_MAX_OFFSET = "MAX_OFFSET"; -const std::string Message::PROPERTY_BUYER_ID = "BUYER_ID"; -const std::string Message::PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID"; -const std::string Message::PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG"; -const std::string Message::PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG"; -const std::string Message::PROPERTY_MQ2_FLAG = "MQ2_FLAG"; -const std::string Message::PROPERTY_RECONSUME_TIME = "RECONSUME_TIME"; -const std::string Message::PROPERTY_MSG_REGION = "MSG_REGION"; -const std::string Message::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY"; -const std::string Message::PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES"; -const std::string Message::PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME"; -const std::string 
Message::KEY_SEPARATOR = " "; - -Message::Message() -{ - Init("", "", "", 0, NULL, 0, true); -} - -Message::Message(const std::string& topic, const char* body, int len) -{ - Init(topic, "", "", 0, body, len, true); -} - -Message::Message(const std::string& topic, const std::string& tags, const char* body, int len) -{ - Init(topic, tags, "", 0, body, len, true); -} - -Message::Message(const std::string& topic, const std::string& tags, const std::string& keys, const char* body, int len) -{ - Init(topic, tags, keys, 0, body, len, true); -} - -Message::Message(const std::string& topic, - const std::string& tags, - const std::string& keys, - const int flag, - const char* body, - int len, - bool waitStoreMsgOK) -{ - Init(topic, tags, keys, flag, body, len, waitStoreMsgOK); -} - -Message::~Message() -{ - if (m_body) - { - free(m_body); - m_body = NULL; - m_bodyLen = 0; - } - - if (m_compressBody) - { - free(m_compressBody); - m_compressBody = NULL; - m_compressBodyLen = 0; - } -} - -Message::Message(const Message& other) -{ - m_body = (char*)malloc(other.m_bodyLen); - m_bodyLen = other.m_bodyLen; - memcpy(m_body, other.m_body, other.m_bodyLen); - - m_compressBody = NULL; - m_compressBodyLen = 0; - - m_topic = other.m_topic; - m_flag = other.m_flag; - m_properties = other.m_properties; -} - -Message& Message::operator=(const Message& other) -{ - if (this != &other) - { - if (m_body) - { - free(m_body); - m_body = NULL; - m_bodyLen = 0; - } - - if (m_compressBody) - { - free(m_compressBody); - m_compressBody = NULL; - m_compressBodyLen = 0; - } - - m_body = (char*)malloc(other.m_bodyLen);; - m_bodyLen = other.m_bodyLen; - memcpy(m_body, other.m_body, other.m_bodyLen); - - m_topic = other.m_topic; - m_flag = other.m_flag; - m_properties = other.m_properties; - } - - return *this; -} - -void Message::clearProperty(const std::string& name) -{ - m_properties.erase(name); -} - -void Message::putProperty(const std::string& name, const std::string& value) -{ - m_properties[name] = 
value; -} - -std::string Message::getProperty(const std::string& name) -{ - std::map<std::string, std::string>::const_iterator it = m_properties.find(name); - return (it == m_properties.end()) ? "" : it->second; -} - -std::string Message::getTopic()const -{ - return m_topic; -} - -void Message::setTopic(const std::string& topic) -{ - m_topic = topic; -} - -std::string Message::getTags() -{ - return getProperty(PROPERTY_TAGS); -} - -void Message::setTags(const std::string& tags) -{ - putProperty(PROPERTY_TAGS, tags); -} - -std::string Message::getKeys() -{ - return getProperty(PROPERTY_KEYS); -} - -void Message::setKeys(const std::string& keys) -{ - putProperty(PROPERTY_KEYS, keys); -} - -void Message::setKeys(const std::list<std::string> keys) -{ - if (keys.empty()) - { - return; - } - - std::list<std::string>::const_iterator it = keys.begin(); - std::string str; - str += *it; - it++; - - for (; it != keys.end(); it++) - { - str += KEY_SEPARATOR; - str += *it; - } - - setKeys(str); -} - -int Message::getDelayTimeLevel() -{ - std::string tmp = getProperty(PROPERTY_DELAY_TIME_LEVEL); - if (!tmp.empty()) - { - return atoi(tmp.c_str()); - } - - return 0; -} - -void Message::setDelayTimeLevel(int level) -{ - char tmp[16]; - snprintf(tmp, sizeof(tmp), "%d", level); - - putProperty(PROPERTY_DELAY_TIME_LEVEL, tmp); -} - -bool Message::isWaitStoreMsgOK() -{ - std::string tmp = getProperty(PROPERTY_WAIT_STORE_MSG_OK); - if (tmp.empty()) - { - return true; - } - else - { - return (tmp == "true") ? 
true : false; - } -} - -void Message::setWaitStoreMsgOK(bool waitStoreMsgOK) -{ - if (waitStoreMsgOK) - { - putProperty(PROPERTY_WAIT_STORE_MSG_OK, "true"); - } - else - { - putProperty(PROPERTY_WAIT_STORE_MSG_OK, "false"); - } -} - -int Message::getFlag() -{ - return m_flag; -} - -void Message::setFlag(int flag) -{ - m_flag = flag; -} - -const char* Message::getBody()const -{ - return m_body; -} - -int Message::getBodyLen()const -{ - return m_bodyLen; -} - -void Message::setBody(const char* body, int len) -{ - if (len > 0) - { - if (m_body) - { - free(m_body); - m_body = NULL; - m_bodyLen = 0; - } - - m_body = (char*)malloc(len); - m_bodyLen = len; - memcpy(m_body, body, len); - } -} - -bool Message::tryToCompress(int compressLevel) -{ - if (m_body != NULL) - { - if (m_compressBody) - { - free(m_compressBody); - m_compressBody = NULL; - m_compressBodyLen = 0; - } - - unsigned char* pOut; - int outLen = 0; - if (UtilAll::compress(m_body, m_bodyLen, &pOut, &outLen, compressLevel)) - { - m_compressBody = (char*)pOut; - m_compressBodyLen = outLen; - return true; - } - } - - return false; -} - - -const char* Message::getCompressBody() const -{ - return m_compressBody; -} - -int Message::getCompressBodyLen() const -{ - return m_compressBodyLen; -} - - - -std::map<std::string, std::string>& Message::getProperties() -{ - return m_properties; -} - -void Message::setProperties(const std::map<std::string, std::string>& properties) -{ - m_properties = properties; -} - -void Message::Init(const std::string& topic, const std::string& tags, const std::string& keys, const int flag, const char* body, int len, bool waitStoreMsgOK) -{ - m_topic = topic; - m_flag = flag; - - m_body = NULL; - m_bodyLen = len; - - m_compressBody = NULL; - m_compressBodyLen = 0; - - if (len > 0) - { - m_body = (char*)malloc(len); - memcpy(m_body, body, len); - } - - if (tags.length() > 0) - { - setTags(tags); - } - - if (keys.length() > 0) - { - setKeys(keys); - } - - setWaitStoreMsgOK(waitStoreMsgOK); 
-} - -std::string Message::toString() const -{ - std::stringstream ss; - ss << "{m_topic=" << m_topic - << ",m_flag=" << m_flag - << ",properties=" << UtilAll::toString(m_properties) - << ",m_bodyLen=" << m_bodyLen - << "}"; - return ss.str(); -} - - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/message/MessageDecoder.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/message/MessageDecoder.cpp b/rocketmq-client4cpp/src/message/MessageDecoder.cpp deleted file mode 100755 index 338121e..0000000 --- a/rocketmq-client4cpp/src/message/MessageDecoder.cpp +++ /dev/null @@ -1,366 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ -#include "MessageDecoder.h" - -#include <string.h> -#include <stdio.h> -#include <stdlib.h> -#include <sstream> -#include "MessageExt.h" -#include "MessageSysFlag.h" -#include "UtilAll.h" - -namespace rmq -{ - -const char MessageDecoder::NAME_VALUE_SEPARATOR = 1; -const char MessageDecoder::PROPERTY_SEPARATOR = 2; -const int MessageDecoder::MSG_ID_LENGTH = 8 + 8; - -int MessageDecoder::MessageMagicCodePostion = 4; -int MessageDecoder::MessageFlagPostion = 16; -int MessageDecoder::MessagePhysicOffsetPostion = 28; -int MessageDecoder::MessageStoreTimestampPostion = 56; - -std::string MessageDecoder::createMessageId(sockaddr& addr, long long offset) -{ - struct sockaddr_in sa; - memcpy(&sa, &addr, sizeof(sockaddr)); - sa.sin_family = AF_INET; - - int port = ntohs(sa.sin_port); - port = htonl(port); - int ip = sa.sin_addr.s_addr; - - unsigned char* buf = new unsigned char[MSG_ID_LENGTH]; - offset = h2nll(offset); - memcpy(buf, &ip, 4); - memcpy(buf + 4, &port, 4); - memcpy(buf + 8, &offset, 8); - - char* str = new char[2 * MSG_ID_LENGTH + 1]; - memset(str, 0, 2 * MSG_ID_LENGTH + 1); - - for (int i = 0; i < MSG_ID_LENGTH; i++) - { - char tmp[3]; - tmp[2] = 0; - - snprintf(tmp, sizeof(tmp), "%02X", buf[i]); - strncat(str, tmp, sizeof(tmp)); - } - - std::string ret = str; - - delete[] buf; - delete[] str; - - return ret; -} - -MessageId MessageDecoder::decodeMessageId(const std::string& msgId) -{ - std::string ipstr = msgId.substr(0, 8); - std::string portstr = msgId.substr(8, 8); - std::string offsetstr = msgId.substr(16); - - char* end; - int ipint = strtoul(ipstr.c_str(), &end, 16); - int portint = strtoul(portstr.c_str(), &end, 16); - - long long offset = UtilAll::hexstr2ull(offsetstr.c_str()); - - offset = n2hll(offset); - - portint = ntohl(portint); - short port = portint; - - struct sockaddr_in sa; - sa.sin_family = AF_INET; - sa.sin_port = htons(port); - sa.sin_addr.s_addr = ipint; - - sockaddr addr; - memcpy(&addr, &sa, sizeof(sockaddr)); - - MessageId 
id(addr, offset); - - return id; -} - -MessageExt* MessageDecoder::decode(const char* pData, int len, int& offset) -{ - return decode(pData, len, offset, true); -} - -MessageExt* MessageDecoder::decode(const char* pData, int len, int& offset, bool readBody) -{ - MessageExt* msgExt = NULL; - - try - { - msgExt = new MessageExt(); - - // 1 TOTALSIZE - int storeSize; - memcpy(&storeSize, pData, 4); - storeSize = ntohl(storeSize); - - msgExt->setStoreSize(storeSize); - - // 2 MAGICCODE sizeof(int) - - // 3 BODYCRC - int bodyCRC; - memcpy(&bodyCRC, pData + 2 * sizeof(int), 4); - bodyCRC = ntohl(bodyCRC); - msgExt->setBodyCRC(bodyCRC); - - // 4 QUEUEID - int queueId; - memcpy(&queueId, pData + 3 * sizeof(int), 4); - queueId = ntohl(queueId); - msgExt->setQueueId(queueId); - - // 5 FLAG - int flag ; - - memcpy(&flag, pData + 4 * sizeof(int), 4); - flag = ntohl(flag); - - msgExt->setFlag(flag); - - // 6 QUEUEOFFSET - long long queueOffset; - memcpy(&queueOffset, pData + 5 * sizeof(int), 8); - queueOffset = n2hll(queueOffset); - msgExt->setQueueOffset(queueOffset); - - // 7 PHYSICALOFFSET - long long physicOffset; - - memcpy(&physicOffset, pData + 7 * sizeof(int), 8); - physicOffset = n2hll(physicOffset); - msgExt->setCommitLogOffset(physicOffset); - - // 8 SYSFLAG - int sysFlag; - - memcpy(&sysFlag, pData + 9 * sizeof(int), 4); - sysFlag = ntohl(sysFlag); - msgExt->setSysFlag(sysFlag); - - // 9 BORNTIMESTAMP - long long bornTimeStamp; - memcpy(&bornTimeStamp, pData + 10 * sizeof(int), 8); - bornTimeStamp = n2hll(bornTimeStamp); - - msgExt->setBornTimestamp(bornTimeStamp); - - // 10 BORNHOST - int bornHost;//c0 a8 00 68 192.168.0.104 c0 a8 00 68 00 00 c4 04 - memcpy(&bornHost, pData + 12 * sizeof(int), 4); - - int port; - memcpy(&port, pData + 13 * sizeof(int), 4); - port = ntohl(port); - - struct sockaddr_in sa; - sa.sin_family = AF_INET; - sa.sin_port = htons(port); - sa.sin_addr.s_addr = bornHost; - - sockaddr bornAddr; - memcpy(&bornAddr, &sa, sizeof(sockaddr)); - 
msgExt->setBornHost(bornAddr); - - // 11 STORETIMESTAMP - long long storeTimestamp; - memcpy(&storeTimestamp, pData + 14 * sizeof(int), 8); - storeTimestamp = n2hll(storeTimestamp); - msgExt->setStoreTimestamp(storeTimestamp); - - // 12 STOREHOST - int storeHost; - memcpy(&storeHost, pData + 16 * sizeof(int), 4); - memcpy(&port, pData + 17 * sizeof(int), 4); - port = ntohl(port); - - sa.sin_family = AF_INET; - sa.sin_port = htons(port); - sa.sin_addr.s_addr = storeHost; - - sockaddr storeAddr; - memcpy(&storeAddr, &sa, sizeof(sockaddr)); - - msgExt->setStoreHost(storeAddr); - - // 13 RECONSUMETIMES - int reconsumeTimes; - memcpy(&reconsumeTimes, pData + 18 * sizeof(int), 4); - reconsumeTimes = ntohl(reconsumeTimes); - msgExt->setReconsumeTimes(reconsumeTimes); - - // 14 Prepared Transaction Offset - long long preparedTransactionOffset; - memcpy(&preparedTransactionOffset, pData + 19 * sizeof(int), 8); - preparedTransactionOffset = n2hll(preparedTransactionOffset); - msgExt->setPreparedTransactionOffset(preparedTransactionOffset); - - // 15 BODY - int bodyLen = 0; - memcpy(&bodyLen, pData + 21 * sizeof(int), 4); - bodyLen = ntohl(bodyLen); - - if (bodyLen > 0) - { - if (readBody) - { - const char* body = pData + 22 * sizeof(int); - int newBodyLen = bodyLen; - - // uncompress body - if ((sysFlag & MessageSysFlag::CompressedFlag) == MessageSysFlag::CompressedFlag) - { - unsigned char* pOut; - int outLen; - - if (UtilAll::decompress(body, bodyLen, &pOut, &outLen)) - { - msgExt->setBody((char*)pOut, outLen); - free(pOut); - } - else - { - msgExt->setBody(body, newBodyLen); - } - } - else - { - msgExt->setBody(body, newBodyLen); - } - } - else - { - - } - } - - // 16 TOPIC - int topicLen = *(pData + 22 * sizeof(int) + bodyLen); - - char* tmp = new char[topicLen + 1]; - - memcpy(tmp, pData + 22 * sizeof(int) + bodyLen + 1, topicLen); - tmp[topicLen] = 0; - std::string topic = tmp; - - delete[] tmp; - - msgExt->setTopic(topic); - - // 17 properties - short 
propertiesLength; - memcpy(&propertiesLength, pData + 22 * sizeof(int) + bodyLen + 1 + topicLen, 2); - propertiesLength = ntohs(propertiesLength); - - if (propertiesLength > 0) - { - char* properties = new char[propertiesLength + 1]; - memcpy(properties, pData + 22 * sizeof(int) + bodyLen + 1 + topicLen + 2, propertiesLength); - properties[propertiesLength] = 0; - std::string propertiesString = properties; - std::map<std::string, std::string> map; - string2messageProperties(map, propertiesString); - msgExt->setProperties(map); - delete[] properties; - } - - offset = 22 * sizeof(int) + bodyLen + 1 + topicLen + 2 + propertiesLength; - - // ��ϢID - std::string msgId = createMessageId(storeAddr, physicOffset); - msgExt->setMsgId(msgId); - - return msgExt; - } - catch (...) - { - RMQ_ERROR("decode exception"); - if (msgExt) - { - delete msgExt; - msgExt = NULL; - } - } - - return NULL; -} - -std::list<MessageExt*> MessageDecoder::decodes(const char* pData, int len) -{ - return decodes(pData, len, true); -} - -std::list<MessageExt*> MessageDecoder::decodes(const char* pData, int len, bool readBody) -{ - std::list<MessageExt*> list; - - int offset = 0; - while (offset < len) - { - int tmp; - MessageExt* msg = decode(pData + offset, len, tmp); - list.push_back(msg); - offset += tmp; - } - - return list; -} - -std::string MessageDecoder::messageProperties2String(const std::map<std::string, std::string>& properties) -{ - std::stringstream ss; - - std::map<std::string, std::string>::const_iterator it = properties.begin(); - - for (; it != properties.end(); it++) - { - ss << it->first << NAME_VALUE_SEPARATOR << it->second << PROPERTY_SEPARATOR; - } - - return ss.str(); -} - -void MessageDecoder::string2messageProperties(std::map<std::string, std::string>& properties, - std::string& propertiesString) -{ - std::vector<std::string> out; - UtilAll::Split(out, propertiesString, PROPERTY_SEPARATOR); - - for (size_t i = 0; i < out.size(); i++) - { - std::vector<std::string> outValue; 
- UtilAll::Split(outValue, out[i], NAME_VALUE_SEPARATOR); - - if (outValue.size() == 2) - { - properties[outValue[0]] = outValue[1]; - } - } -} - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/message/MessageDecoder.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/message/MessageDecoder.h b/rocketmq-client4cpp/src/message/MessageDecoder.h deleted file mode 100755 index a5f24ed..0000000 --- a/rocketmq-client4cpp/src/message/MessageDecoder.h +++ /dev/null @@ -1,64 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ -#ifndef __MESSAGEDECODER_H__ -#define __MESSAGEDECODER_H__ - -#include <string> -#include <list> -#include <map> - -#include "SocketUtil.h" -#include "MessageId.h" - -namespace rmq -{ - class MessageExt; - class UnknownHostException; - - /** - * Message decoder - * - */ - class MessageDecoder - { - public: - static std::string createMessageId(sockaddr& addr, long long offset); - static MessageId decodeMessageId(const std::string& msgId); - - static MessageExt* decode(const char* pData, int len, int& offset); - static MessageExt* decode(const char* pData, int len, int& offset, bool readBody); - - static std::list<MessageExt*> decodes(const char* pData, int len); - static std::list<MessageExt*> decodes(const char* pData, int len, bool readBody); - - static std::string messageProperties2String(const std::map<std::string, std::string>& properties); - static void string2messageProperties(std::map<std::string, std::string>& properties, - std::string& propertiesString); - - public: - static const char NAME_VALUE_SEPARATOR; - static const char PROPERTY_SEPARATOR; - - static const int MSG_ID_LENGTH; - - static int MessageMagicCodePostion; - static int MessageFlagPostion; - static int MessagePhysicOffsetPostion; - static int MessageStoreTimestampPostion; - }; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/message/MessageExt.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/message/MessageExt.cpp b/rocketmq-client4cpp/src/message/MessageExt.cpp deleted file mode 100755 index 35479ce..0000000 --- a/rocketmq-client4cpp/src/message/MessageExt.cpp +++ /dev/null @@ -1,244 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. 
-* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -#include "MessageExt.h" - -#include <sstream> -#include "MessageSysFlag.h" -#include "SocketUtil.h" - -namespace rmq -{ - -MessageExt::MessageExt() - : m_queueOffset(0), - m_commitLogOffset(0), - m_bornTimestamp(0), - m_storeTimestamp(0), - m_preparedTransactionOffset(0), - m_queueId(0), - m_storeSize(0), - m_sysFlag(0), - m_bodyCRC(0), - m_reconsumeTimes(3), - m_msgId("") -{ -} - -MessageExt::MessageExt(int queueId, - long long bornTimestamp, - sockaddr bornHost, - long long storeTimestamp, - sockaddr storeHost, - std::string msgId) - : m_queueOffset(0), - m_commitLogOffset(0), - m_bornTimestamp(bornTimestamp), - m_storeTimestamp(storeTimestamp), - m_preparedTransactionOffset(0), - m_queueId(queueId), - m_storeSize(0), - m_sysFlag(0), - m_bodyCRC(0), - m_reconsumeTimes(3), - m_bornHost(bornHost), - m_storeHost(storeHost), - m_msgId(msgId) -{ - -} - -MessageExt::~MessageExt() -{ - -} - -int MessageExt::getQueueId() -{ - return m_queueId; -} - -void MessageExt::setQueueId(int queueId) -{ - m_queueId = queueId; -} - -long long MessageExt::getBornTimestamp() -{ - return m_bornTimestamp; -} - -void MessageExt::setBornTimestamp(long long bornTimestamp) -{ - m_bornTimestamp = bornTimestamp; -} - -sockaddr MessageExt::getBornHost() -{ - return m_bornHost; -} - -std::string MessageExt::getBornHostString() -{ - return socketAddress2String(m_bornHost); -} - -std::string MessageExt::getBornHostNameString() -{ - return getHostName(m_bornHost); -} - -void MessageExt::setBornHost(const sockaddr& bornHost) -{ - m_bornHost = bornHost; -} - -long long 
MessageExt::getStoreTimestamp() -{ - return m_storeTimestamp; -} - -void MessageExt::setStoreTimestamp(long long storeTimestamp) -{ - m_storeTimestamp = storeTimestamp; -} - -sockaddr MessageExt::getStoreHost() -{ - return m_storeHost; -} - -std::string MessageExt::getStoreHostString() -{ - return socketAddress2String(m_storeHost); -} - -void MessageExt::setStoreHost(const sockaddr& storeHost) -{ - m_storeHost = storeHost; -} - -std::string MessageExt::getMsgId() -{ - return m_msgId; -} - -void MessageExt::setMsgId(const std::string& msgId) -{ - m_msgId = msgId; -} - -int MessageExt::getSysFlag() -{ - return m_sysFlag; -} - -void MessageExt::setSysFlag(int sysFlag) -{ - m_sysFlag = sysFlag; -} - -int MessageExt::getBodyCRC() -{ - return m_bodyCRC; -} - -void MessageExt::setBodyCRC(int bodyCRC) -{ - m_bodyCRC = bodyCRC; -} - -long long MessageExt::getQueueOffset() -{ - return m_queueOffset; -} - -void MessageExt::setQueueOffset(long long queueOffset) -{ - m_queueOffset = queueOffset; -} - -long long MessageExt::getCommitLogOffset() -{ - return m_commitLogOffset; -} - -void MessageExt::setCommitLogOffset(long long physicOffset) -{ - m_commitLogOffset = physicOffset; -} - -int MessageExt::getStoreSize() -{ - return m_storeSize; -} - -void MessageExt::setStoreSize(int storeSize) -{ - m_storeSize = storeSize; -} - -TopicFilterType MessageExt::parseTopicFilterType(int sysFlag) -{ - if ((sysFlag & MessageSysFlag::MultiTagsFlag) == MessageSysFlag::MultiTagsFlag) - { - return MULTI_TAG; - } - - return SINGLE_TAG; -} - -int MessageExt::getReconsumeTimes() -{ - return m_reconsumeTimes; -} - -void MessageExt::setReconsumeTimes(int reconsumeTimes) -{ - m_reconsumeTimes = reconsumeTimes; -} - -long long MessageExt::getPreparedTransactionOffset() -{ - return m_preparedTransactionOffset; -} - -void MessageExt::setPreparedTransactionOffset(long long preparedTransactionOffset) -{ - m_preparedTransactionOffset = preparedTransactionOffset; -} - -std::string MessageExt::toString() 
const -{ - std::stringstream ss; - ss << "{msgId=" << m_msgId - << ",queueId=" << m_queueId - << ",storeSize=" << m_storeSize - << ",sysFlag=" << m_sysFlag - << ",queueOffset=" << m_queueOffset - << ",commitLogOffset=" << m_commitLogOffset - << ",preparedTransactionOffset=" << m_preparedTransactionOffset - << ",bornTimestamp=" << m_bornTimestamp - << ",bornHost=" << socketAddress2String(m_bornHost) - << ",storeHost=" << socketAddress2String(m_storeHost) - << ",storeTimestamp=" << m_storeTimestamp - << ",reconsumeTimes=" << m_reconsumeTimes - << ",bodyCRC=" << m_bodyCRC - << ",Message=" << Message::toString() - << "}"; - return ss.str(); -} - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/message/MessageId.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/message/MessageId.h b/rocketmq-client4cpp/src/message/MessageId.h deleted file mode 100644 index 5237f8d..0000000 --- a/rocketmq-client4cpp/src/message/MessageId.h +++ /dev/null @@ -1,59 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -#ifndef __MESSAGEID_H__ -#define __MESSAGEID_H__ - -#include "SocketUtil.h" - -namespace rmq -{ - class MessageId - { - public: - MessageId(sockaddr address, long long offset) - : m_address(address), m_offset(offset) - { - - } - - sockaddr getAddress() - { - return m_address; - } - - void setAddress(sockaddr address) - { - m_address = address; - } - - long long getOffset() - { - return m_offset; - } - - void setOffset(long long offset) - { - m_offset = offset; - } - - private: - sockaddr m_address; - long long m_offset; - }; -} - -#endif