http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/sequencer.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/thread/disruptor/sequencer.h b/rocketmq-cpp/src/thread/disruptor/sequencer.h new file mode 100755 index 0000000..98d617f --- /dev/null +++ b/rocketmq-cpp/src/thread/disruptor/sequencer.h @@ -0,0 +1,190 @@ +// Copyright (c) 2011, François Saint-Jacques +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of the disruptor-- nor the +// names of its contributors may be used to endorse or promote products +// derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY +// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ +#ifndef DISRUPTOR_SEQUENCER_H_ // NOLINT +#define DISRUPTOR_SEQUENCER_H_ // NOLINT + +#include <vector> + +#include "batch_descriptor.h" +#include "claim_strategy.h" +#include "interface.h" +#include "sequence_barrier.h" +#include "wait_strategy.h" + +namespace rocketmq { + +// Coordinator for claiming sequences for access to a data structures while +// tracking dependent {@link Sequence}s +class Sequencer: public boost::noncopyable { + public: + // Construct a Sequencer with the selected strategies. + // + // @param buffer_size over which sequences are valid. + // @param claim_strategy_option for those claiming sequences. + // @param wait_strategy_option for those waiting on sequences. + Sequencer(int buffer_size, + ClaimStrategyOption claim_strategy_option, + WaitStrategyOption wait_strategy_option) : + buffer_size_(buffer_size), + claim_strategy_(CreateClaimStrategy(claim_strategy_option, + buffer_size_)), + wait_strategy_(CreateWaitStrategy(wait_strategy_option)) { } + + ~Sequencer() { + delete claim_strategy_; + delete wait_strategy_; + } + + // Set the sequences that will gate publishers to prevent the buffer + // wrapping. + // + // @param sequences to be gated on. + void set_gating_sequences( + const std::vector<Sequence*>& sequences) { + gating_sequences_ = sequences; + } + + // Create a {@link SequenceBarrier} that gates on the cursor and a list of + // {@link Sequence}s. + // + // @param sequences_to_track this barrier will track. + // @return the barrier gated as required. + ProcessingSequenceBarrier* NewBarrier( + const std::vector<Sequence*>& sequences_to_track) { + return new ProcessingSequenceBarrier(wait_strategy_, &cursor_, + sequences_to_track); + } + + // Create a new {@link BatchDescriptor} that is the minimum of the + // requested size and the buffer_size. + // + // @param size for the new batch. + // @return the new {@link BatchDescriptor}. 
+ BatchDescriptor* NewBatchDescriptor(const int& size) { + return new BatchDescriptor(size<buffer_size_?size:buffer_size_); + } + + // The capacity of the data structure to hold entries. + // + // @return capacity of the data structure. + int buffer_size() { return buffer_size_; } + + + // Get the value of the cursor indicating the published sequence. + // + // @return value of the cursor for events that have been published. + int64_t GetCursor() { return cursor_.sequence(); } + + // Has the buffer capacity left to allocate another sequence. This is a + // concurrent method so the response should only be taken as an indication + // of available capacity. + // + // @return true if the buffer has the capacity to allocated another event. + bool HasAvalaibleCapacity() { + return claim_strategy_->HasAvalaibleCapacity(gating_sequences_); + } + + // Claim the next event in sequence for publishing to the {@link RingBuffer}. + // + // @return the claimed sequence. + int64_t Next() { + return claim_strategy_->IncrementAndGet(gating_sequences_); + } + + // Claim the next batch of sequence numbers for publishing. + // + // @param batch_descriptor to be updated for the batch range. + // @return the updated batch_descriptor. + BatchDescriptor* Next(BatchDescriptor* batch_descriptor) { + int64_t sequence = claim_strategy_->IncrementAndGet(batch_descriptor->size(), gating_sequences_); + batch_descriptor->set_end(sequence); + return batch_descriptor; + } + + // Claim a specific sequence when only one publisher is involved. + // + // @param sequence to be claimed. + // @return sequence just claime. + int64_t Claim(const int64_t& sequence) { + claim_strategy_->SetSequence(sequence, gating_sequences_); + return sequence; + } + + // Publish an event and make it visible to {@link EventProcessor}s. + // + // @param sequence to be published. + void Publish(const int64_t& sequence) { + Publish(sequence, 1); + } + + // Publish the batch of events in sequence. 
+ // + // @param sequence to be published. + void Publish(const BatchDescriptor& batch_descriptor) { + Publish(batch_descriptor.end(), batch_descriptor.size()); + } + + // Force the publication of a cursor sequence. + // + // Only use this method when forcing a sequence and you are sure only one + // publisher exists. This will cause the cursor to advance to this + // sequence. + // + // @param sequence to which is to be forced for publication. + void ForcePublish(const int64_t& sequence) { + cursor_.set_sequence(sequence); + wait_strategy_->SignalAllWhenBlocking(); + } + + // TODO(fsaintjacques): This was added to overcome + // NoOpEventProcessor::GetSequence(), this is not a clean solution. + Sequence* GetSequencePtr() { + return &cursor_; + } + + private: + // Helpers + void Publish(const int64_t& sequence, const int64_t& batch_size) { + //LOG_DEBUG("publish sequence:%d", sequence); + claim_strategy_->SerialisePublishing(sequence, cursor_, batch_size); + cursor_.set_sequence(sequence); + wait_strategy_->SignalAllWhenBlocking(); + } + + // Members + const int buffer_size_; + + PaddedSequence cursor_; + std::vector<Sequence*> gating_sequences_; + + ClaimStrategyInterface* claim_strategy_; + WaitStrategyInterface* wait_strategy_; + +}; + +}; // namespace rocketmq + +#endif // DISRUPTOR_RING_BUFFER_H_ NOLINT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/utils.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/thread/disruptor/utils.h b/rocketmq-cpp/src/thread/disruptor/utils.h new file mode 100755 index 0000000..0730093 --- /dev/null +++ b/rocketmq-cpp/src/thread/disruptor/utils.h @@ -0,0 +1,35 @@ +// Copyright (c) 2011, François Saint-Jacques +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of the disruptor-- nor the +// names of its contributors may be used to endorse or promote products +// derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY +// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef DISRUPTOR_UTILS_H_  // NOLINT
#define DISRUPTOR_UTILS_H_  // NOLINT

// From the Google C++ style guide, modified to use C++11 deleted functions.
// A macro to disallow the copy constructor and operator= functions.
//
// Place it inside the class definition:
//
//   class Foo {
//     DISALLOW_COPY_AND_ASSIGN(Foo)
//   };
//
// Each generated declaration is terminated by the macro itself, so a trailing
// semicolon at the use site is optional.
#define DISALLOW_COPY_AND_ASSIGN(TypeName) \
  TypeName(const TypeName&) = delete;      \
  void operator=(const TypeName&) = delete;

#endif  // DISRUPTOR_UTILS_H_ NOLINT

// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/wait_strategy.h
// ----------------------------------------------------------------------
// diff --git a/rocketmq-cpp/src/thread/disruptor/wait_strategy.h b/rocketmq-cpp/src/thread/disruptor/wait_strategy.h
// new file mode 100755
// index 0000000..fb2e58a
// --- /dev/null
// +++ b/rocketmq-cpp/src/thread/disruptor/wait_strategy.h
// @@ -0,0 +1,377 @@
// Copyright (c) 2011, François Saint-Jacques
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of the disruptor-- nor the
// names of its contributors may be used to endorse or promote products
// derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED.
IN NO EVENT SHALL FRANÃOIS SAINT-JACQUES BE LIABLE FOR ANY +// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#ifndef DISRUPTOR_WAITSTRATEGY_H_ // NOLINT +#define DISRUPTOR_WAITSTRATEGY_H_ // NOLINT + +#include <sys/time.h> + +#include <boost/chrono.hpp> +#include <boost/thread.hpp> +#include <vector> + +#include "exceptions.h" +#include "interface.h" +#include "sequence.h" + +namespace rocketmq { + +// Strategy options which are available to those waiting on a +// {@link RingBuffer} +enum WaitStrategyOption { + // This strategy uses a condition variable inside a lock to block the + // event procesor which saves CPU resource at the expense of lock + // contention. + kBlockingStrategy, + // This strategy uses a progressive back off strategy by first spinning, + // then yielding, then sleeping for 1ms period. This is a good strategy + // for burst traffic then quiet periods when latency is not critical. + kSleepingStrategy, + // This strategy calls Thread.yield() in a loop as a waiting strategy + // which reduces contention at the expense of CPU resource. + kYieldingStrategy, + // This strategy call spins in a loop as a waiting strategy which is + // lowest and most consistent latency but ties up a CPU. + kBusySpinStrategy +}; + +// Blocking strategy that uses a lock and condition variable for +// {@link Consumer}s waiting on a barrier. +// This strategy should be used when performance and low-latency are not as +// important as CPU resource. 
+class BlockingStrategy : public WaitStrategyInterface { + public: + BlockingStrategy() {} + + virtual int64_t WaitFor(const std::vector<Sequence*>& dependents, + const Sequence& cursor, + const SequenceBarrierInterface& barrier, + const int64_t& sequence) { + int64_t available_sequence = 0; + // We need to wait. + if ((available_sequence = cursor.sequence()) < sequence) { + // acquire lock + boost::unique_lock<boost::recursive_mutex> ulock(mutex_); + while ((available_sequence = cursor.sequence()) < sequence) { + barrier.CheckAlert(); + consumer_notify_condition_.wait(ulock); + } + } // unlock happens here, on ulock destruction. + + if (0 != dependents.size()) { + while ((available_sequence = GetMinimumSequence(dependents)) < \ + sequence) { + barrier.CheckAlert(); + } + } + + return available_sequence; + } + + virtual int64_t WaitFor(const std::vector<Sequence*>& dependents, + const Sequence& cursor, + const SequenceBarrierInterface& barrier, + const int64_t& sequence, + const int64_t& timeout_micros) { + int64_t available_sequence = 0; + // We have to wait + if ((available_sequence = cursor.sequence()) < sequence) { + boost::unique_lock<boost::recursive_mutex> ulock(mutex_); + while ((available_sequence = cursor.sequence()) < sequence) { + barrier.CheckAlert(); + if (boost::cv_status::timeout == consumer_notify_condition_.wait_for(ulock, + boost::chrono::microseconds(timeout_micros))) + break; + + } + } // unlock happens here, on ulock destruction + + if (0 != dependents.size()) { + while ((available_sequence = GetMinimumSequence(dependents)) \ + < sequence) { + barrier.CheckAlert(); + } + } + + return available_sequence; + } + + virtual void SignalAllWhenBlocking() { + boost::unique_lock<boost::recursive_mutex> ulock(mutex_); + consumer_notify_condition_.notify_all(); + } + + private: + boost::recursive_mutex mutex_; + boost::condition_variable_any consumer_notify_condition_; + +}; + +// Sleeping strategy +class SleepingStrategy : public WaitStrategyInterface { 
+ public: + SleepingStrategy() {} + + virtual int64_t WaitFor(const std::vector<Sequence*>& dependents, + const Sequence& cursor, + const SequenceBarrierInterface& barrier, + const int64_t& sequence) { + int64_t available_sequence = 0; + int counter = kRetries; + + if (0 == dependents.size()) { + while ((available_sequence = cursor.sequence()) < sequence) { + counter = ApplyWaitMethod(barrier, counter); + } + } else { + while ((available_sequence = GetMinimumSequence(dependents)) < \ + sequence) { + counter = ApplyWaitMethod(barrier, counter); + } + } + + return available_sequence; + } + + virtual int64_t WaitFor(const std::vector<Sequence*>& dependents, + const Sequence& cursor, + const SequenceBarrierInterface& barrier, + const int64_t& sequence, + const int64_t& timeout_micros) { + // timing + struct timeval start_time, end_time; + gettimeofday(&start_time, NULL); + int64_t start_micro = start_time.tv_sec*1000000 + start_time.tv_usec; + + int64_t available_sequence = 0; + int counter = kRetries; + + if (0 == dependents.size()) { + while ((available_sequence = cursor.sequence()) < sequence) { + counter = ApplyWaitMethod(barrier, counter); + gettimeofday(&end_time, NULL); + int64_t end_micro = end_time.tv_sec*1000000 + end_time.tv_usec; + if (timeout_micros < (end_micro - start_micro)) + break; + } + } else { + while ((available_sequence = GetMinimumSequence(dependents)) < \ + sequence) { + counter = ApplyWaitMethod(barrier, counter); + gettimeofday(&end_time, NULL); + int64_t end_micro = end_time.tv_sec*1000000 + end_time.tv_usec; + if (timeout_micros < (end_micro - start_micro)) + break; + } + } + + return available_sequence; + } + + virtual void SignalAllWhenBlocking() {} + + static const int kRetries = 200; + + private: + int ApplyWaitMethod(const SequenceBarrierInterface& barrier, int counter) { + barrier.CheckAlert(); + if (counter > 100) { + counter--; + } else if (counter > 0) { + counter--; + boost::this_thread::yield(); + } else { + 
boost::this_thread::sleep_for(boost::chrono::milliseconds(1)); + } + + return counter; + } + +}; + +// Yielding strategy that uses a sleep(0) for {@link EventProcessor}s waiting +// on a barrier. This strategy is a good compromise between performance and +// CPU resource. +class YieldingStrategy : public WaitStrategyInterface { + public: + YieldingStrategy() {} + + virtual int64_t WaitFor(const std::vector<Sequence*>& dependents, + const Sequence& cursor, + const SequenceBarrierInterface& barrier, + const int64_t& sequence) { + int64_t available_sequence = 0; + int counter = kSpinTries; + + if (0 == dependents.size()) { + while ((available_sequence = cursor.sequence()) < sequence) { + counter = ApplyWaitMethod(barrier, counter); + } + } else { + while ((available_sequence = GetMinimumSequence(dependents)) < \ + sequence) { + counter = ApplyWaitMethod(barrier, counter); + } + } + + return available_sequence; + } + + virtual int64_t WaitFor(const std::vector<Sequence*>& dependents, + const Sequence& cursor, + const SequenceBarrierInterface& barrier, + const int64_t& sequence, + const int64_t& timeout_micros) { + struct timeval start_time, end_time; + gettimeofday(&start_time, NULL); + int64_t start_micro = start_time.tv_sec*1000000 + start_time.tv_usec; + + int64_t available_sequence = 0; + int counter = kSpinTries; + + if (0 == dependents.size()) { + while ((available_sequence = cursor.sequence()) < sequence) { + counter = ApplyWaitMethod(barrier, counter); + gettimeofday(&end_time, NULL); + int64_t end_micro = end_time.tv_sec*1000000 + end_time.tv_usec; + if (timeout_micros < (end_micro - start_micro)) + break; + } + } else { + while ((available_sequence = GetMinimumSequence(dependents)) < \ + sequence) { + counter = ApplyWaitMethod(barrier, counter); + gettimeofday(&end_time, NULL); + int64_t end_micro = end_time.tv_sec*1000000 + end_time.tv_usec; + if (timeout_micros < (end_micro - start_micro)) + break; + } + } + + return available_sequence; + } + + virtual void 
SignalAllWhenBlocking() {} + + static const int kSpinTries = 100; + + private: + int ApplyWaitMethod(const SequenceBarrierInterface& barrier, int counter) { + barrier.CheckAlert(); + if (counter == 0) { + boost::this_thread::yield(); + } else { + counter--; + } + + return counter; + } + +}; + + +// Busy Spin strategy that uses a busy spin loop for {@link EventProcessor}s +// waiting on a barrier. +// This strategy will use CPU resource to avoid syscalls which can introduce +// latency jitter. It is best used when threads can be bound to specific +// CPU cores. +class BusySpinStrategy : public WaitStrategyInterface { + public: + BusySpinStrategy() {} + + virtual int64_t WaitFor(const std::vector<Sequence*>& dependents, + const Sequence& cursor, + const SequenceBarrierInterface& barrier, + const int64_t& sequence) { + int64_t available_sequence = 0; + if (0 == dependents.size()) { + while ((available_sequence = cursor.sequence()) < sequence) { + barrier.CheckAlert(); + } + } else { + while ((available_sequence = GetMinimumSequence(dependents)) < \ + sequence) { + barrier.CheckAlert(); + } + } + + return available_sequence; + } + + virtual int64_t WaitFor(const std::vector<Sequence*>& dependents, + const Sequence& cursor, + const SequenceBarrierInterface& barrier, + const int64_t& sequence, + const int64_t& timeout_micros) { + struct timeval start_time, end_time; + gettimeofday(&start_time, NULL); + int64_t start_micro = start_time.tv_sec*1000000 + start_time.tv_usec; + int64_t available_sequence = 0; + + if (0 == dependents.size()) { + while ((available_sequence = cursor.sequence()) < sequence) { + barrier.CheckAlert(); + gettimeofday(&end_time, NULL); + int64_t end_micro = end_time.tv_sec*1000000 + end_time.tv_usec; + if (timeout_micros < (end_micro - start_micro)) + break; + } + } else { + while ((available_sequence = GetMinimumSequence(dependents)) < \ + sequence) { + barrier.CheckAlert(); + gettimeofday(&end_time, NULL); + int64_t end_micro = 
end_time.tv_sec*1000000 + end_time.tv_usec; + if (timeout_micros < (end_micro - start_micro)) + break; + } + } + + return available_sequence; + } + + virtual void SignalAllWhenBlocking() {} + +}; + +WaitStrategyInterface* CreateWaitStrategy(WaitStrategyOption wait_option) { + switch (wait_option) { + case kBlockingStrategy: + return new BlockingStrategy(); + case kSleepingStrategy: + return new SleepingStrategy(); + case kYieldingStrategy: + return new YieldingStrategy(); + case kBusySpinStrategy: + return new BusySpinStrategy(); + default: + return NULL; + } +} + + +}; // namespace rocketmq + +#endif // DISRUPTOR_WAITSTRATEGY_H_ NOLINT http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptorLFQ.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/thread/disruptorLFQ.h b/rocketmq-cpp/src/thread/disruptorLFQ.h new file mode 100644 index 0000000..a05b0cd --- /dev/null +++ b/rocketmq-cpp/src/thread/disruptorLFQ.h @@ -0,0 +1,113 @@ +#ifndef _DISRUPTORLFQ_ +#define _DISRUPTORLFQ_ + +#include <disruptor/event_processor.h> +#include <disruptor/event_publisher.h> +#include <disruptor/exception_handler.h> +#include <disruptor/interface.h> + +#include <boost/asio/io_service.hpp> +#include <boost/bind.hpp> +#include <boost/scoped_ptr.hpp> +#include <boost/thread/thread.hpp> + +namespace rocketmq { +class Task; +class taskEventFactory : public EventFactoryInterface<Task> { + public: + virtual Task* NewInstance(const int& size) const; +}; + +class taskBatchHandler : public EventHandlerInterface<Task> { + public: + taskBatchHandler(int pullMsgThreadPoolNum); + virtual ~taskBatchHandler() {} + + virtual void OnEvent(const int64_t& sequence, const bool& end_of_batch, + Task* event); + virtual void OnStart() {} + virtual void OnShutdown() {} + void runTaskEvent(Task event, int64_t sequence); + void stopIOService(); + + private: + boost::asio::io_service m_ioService; + 
boost::thread_group m_threadpool; + boost::asio::io_service::work m_ioServiceWork; +}; + +class taskEventTranslator : public EventTranslatorInterface<Task> { + public: + taskEventTranslator(Task* event); + virtual ~taskEventTranslator() {} + virtual Task* TranslateTo(const int64_t& sequence, Task* event); + + private: + Task* m_taskEvent; +}; + +class taskExceptionHandler : public ExceptionHandlerInterface<Task> { + public: + virtual void Handle(const std::exception& exception, const int64_t& sequence, + Task* event) {} +}; + +class disruptorLFQ { + public: + disruptorLFQ(int threadCount) { + m_task_factory.reset(new taskEventFactory()); + m_ring_buffer.reset(new RingBuffer<Task>( + m_task_factory.get(), + 1024, // default size is 1024, must be n power of 2 + kSingleThreadedStrategy, + // metaq::kBusySpinStrategy);//load normal, high cpu occupy, and + // smallest consume latency + // metaq::kYieldingStrategy); //load normal, high cpu occupy, and + // smaller consume latency + // metaq::kSleepingStrategy);//load normal, lowest cpu occupy, but + // largest consume latency + kBlockingStrategy)); // load normal, lowest CPU occupy, but + // largest consume latency + + m_sequence_to_track.reset(new std::vector<Sequence*>(0)); + m_sequenceBarrier.reset( + m_ring_buffer->NewBarrier(*(m_sequence_to_track.get()))); + + m_task_handler.reset(new taskBatchHandler(threadCount)); + m_task_exception_handler.reset(new taskExceptionHandler()); + m_processor.reset(new BatchEventProcessor<Task>( + m_ring_buffer.get(), + (SequenceBarrierInterface*)m_sequenceBarrier.get(), + m_task_handler.get(), m_task_exception_handler.get())); + + /* + Publisher will try to publish BUFFER_SIZE + 1 events. + The last event should wait for at least one consume before publishing, thus + preventing an overwrite. + After the single consume, the publisher should resume and publish the last + event. 
+ */ + m_gating_sequences.push_back(m_processor.get()->GetSequence()); + m_ring_buffer->set_gating_sequences( + m_gating_sequences); // prevent overlap, publishEvent will be blocked + // on ring_buffer_->Next(); + + m_publisher.reset(new EventPublisher<Task>(m_ring_buffer.get())); + } + virtual ~disruptorLFQ() {} + + public: + boost::scoped_ptr<taskEventFactory> m_task_factory; + boost::scoped_ptr<taskBatchHandler> m_task_handler; + boost::scoped_ptr<taskExceptionHandler> m_task_exception_handler; + boost::scoped_ptr<std::vector<Sequence*>> m_sequence_to_track; + boost::scoped_ptr<RingBuffer<Task>> m_ring_buffer; + boost::scoped_ptr<ProcessingSequenceBarrier> m_sequenceBarrier; + boost::scoped_ptr<BatchEventProcessor<Task>> m_processor; + boost::scoped_ptr<EventPublisher<Task>> m_publisher; + std::vector<Sequence*> m_gating_sequences; +}; +} +//<!*************************************************************************** + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/task_queue.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/thread/task_queue.cpp b/rocketmq-cpp/src/thread/task_queue.cpp new file mode 100755 index 0000000..510425c --- /dev/null +++ b/rocketmq-cpp/src/thread/task_queue.cpp @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "task_queue.h" +#include <sys/prctl.h> +#include "UtilAll.h" +#include "disruptorLFQ.h" + +namespace rocketmq { +//<!*************************************************************************** +Task* taskEventFactory::NewInstance(const int& size) const { + return new Task[size]; +} + +taskBatchHandler::taskBatchHandler(int pullMsgThreadPoolNum) + : m_ioServiceWork(m_ioService) { + string taskName = UtilAll::getProcessName(); + prctl(PR_SET_NAME, "PullMsgTP", 0, 0, 0); + for (int i = 0; i != pullMsgThreadPoolNum; ++i) { + m_threadpool.create_thread( + boost::bind(&boost::asio::io_service::run, &m_ioService)); + } + prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0); +} + +void taskBatchHandler::OnEvent(const int64_t& sequence, + const bool& end_of_batch, Task* event) { + //cp Task event out, avoid publish event override current Task event + Task currentTask(*event); + m_ioService.post(boost::bind(&taskBatchHandler::runTaskEvent, this, + currentTask, sequence)); +} + +void taskBatchHandler::runTaskEvent(Task event, int64_t sequence) { + // LOG_INFO("processor event sequence:%lld", sequence); + event.run(); +} + +void taskBatchHandler::stopIOService() { + m_ioService.stop(); + m_threadpool.join_all(); +} + +taskEventTranslator::taskEventTranslator(Task* event) : m_taskEvent(event) {} + +Task* taskEventTranslator::TranslateTo(const int64_t& sequence, Task* event) { + // LOG_INFO("publish sequence:%lld, event:%x", sequence, event); + *event = *m_taskEvent; + return event; +}; + 
+//******************************************************************************************8 +TaskQueue::TaskQueue(int threadCount) { + m_flag.store(true, boost::memory_order_release); + m_disruptorLFQ = new disruptorLFQ(threadCount); +} + +TaskQueue::~TaskQueue() { + delete m_disruptorLFQ; + m_disruptorLFQ = NULL; +} + +void TaskQueue::close() { + m_flag.store(false, boost::memory_order_release); + m_disruptorLFQ->m_task_handler->stopIOService(); + m_disruptorLFQ->m_processor->Halt(); +} + +bool TaskQueue::bTaskQueueStatusOK() { + return m_flag.load(boost::memory_order_acquire) == true; +} + +void TaskQueue::produce(const Task& task) { + boost::mutex::scoped_lock lock(m_publishLock); + taskEventTranslator pTranslator(const_cast<Task*>(&task)); + m_disruptorLFQ->m_publisher->PublishEvent(&pTranslator); +} + +int TaskQueue::run() { + while (true) { + m_disruptorLFQ->m_processor->Run(); + if (m_flag.load(boost::memory_order_acquire) == false) { + break; + } + } + return 0; +} + +//<!*************************************************************************** +} //<! end namespace; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/task_queue.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/thread/task_queue.h b/rocketmq-cpp/src/thread/task_queue.h new file mode 100755 index 0000000..e60607c --- /dev/null +++ b/rocketmq-cpp/src/thread/task_queue.h @@ -0,0 +1,679 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *//******************************************************************** +author: qiwei....@alibaba-inc.com +*********************************************************************/ +#ifndef _TASK_QUEUE_I_ +#define _TASK_QUEUE_I_ + +#include <boost/atomic.hpp> +#include <boost/thread/mutex.hpp> +#include <list> +#include <vector> +using namespace std; + +namespace rocketmq { + +//<!*************************************************************************** +typedef void (*taskfunc)(void*); + +//<! æ°æ®å æä½çéå +class ITask_impl { + public: + virtual ~ITask_impl() {} + virtual void run() = 0; + virtual ITask_impl* fork() = 0; +}; + +//<!*************************************************************************** +class Task_impl : public ITask_impl { + public: + Task_impl(taskfunc func, void* arg_) : m_func(func), m_arg(arg_) {} + virtual ~Task_impl() { + m_func = 0; + m_arg = 0; + } + virtual void run() { + if (m_func != 0) m_func(m_arg); + } + virtual ITask_impl* fork() { return new Task_impl(m_func, m_arg); } + + protected: + taskfunc m_func; + void* m_arg; +}; + +//<!*************************************************************************** +//<! æé ITask_implçå类对象æ¶ï¼ä¸ºå ¶èµäºä¸åçæ°æ®åæä½å³å¯ã +//<! 
è¿é使ç¨äºç»åçæ¹å¼å®ç°äºæ¥å£åå®ç°çå离; +//<!*************************************************************************** +struct Task { + static void dumy(void*) {} + + Task(taskfunc f_, void* d_) { m_pTaskImpl = new Task_impl(f_, d_); } + Task(ITask_impl* task_imp_) : m_pTaskImpl(task_imp_) {} + Task(const Task& src_) : m_pTaskImpl(src_.m_pTaskImpl->fork()) {} + Task() { m_pTaskImpl = new Task_impl(&Task::dumy, 0); } + virtual ~Task() { delete m_pTaskImpl; } + Task& operator=(const Task& src_) { + delete m_pTaskImpl; + m_pTaskImpl = src_.m_pTaskImpl->fork(); + return *this; + } + void run() { + if (m_pTaskImpl) m_pTaskImpl->run(); + } + + private: + ITask_impl* m_pTaskImpl; +}; + +//<!*************************************************************************** +class ITaskQueue { + public: + typedef list<Task> TaskList; + + public: + virtual ~ITaskQueue() {} + virtual void close() = 0; + virtual void produce(const Task& task) = 0; + // virtual void multi_produce(const TaskList& tasks) = 0; + // virtual int consume(Task& task) = 0; + // virtual int consume_all(TaskList& tasks) = 0; + virtual int run() = 0; + // virtual int batch_run() = 0; + virtual bool bTaskQueueStatusOK() = 0; +}; + +//<!*************************************************************************** +//<! ç±äºä¸åçæä½åæ°æ®å¯è½éè¦æé ä¸åITask_implåç±»ï¼ +//<! +//æ们éè¦æä¾ä¸äºæ³åå½æ°ï¼è½å¤å°ç¨æ·çæææä½åæ°æ®é½è½è½»æç转æ¢æTask对象ã +//<! 
TaskBinder æä¾ä¸ç³»åçgenå½æ°ï¼è½å¤è½¬æ¢ç¨æ·çæ®éå½æ°åæ°æ®ä¸ºTask对象; +//<!*************************************************************************** +struct TaskBinder { + static Task gen(void (*func)(void*), void* p_) { return Task(func, p_); } + + template <typename RET> + static Task gen(RET (*func)(void)) { + struct lambda { + static void taskfunc(void* p_) { (*(RET(*)(void))p_)(); }; + }; + return Task(lambda::taskfunc, (void*)func); + } + + template <typename FUNCT, typename ARG1> + static Task gen(FUNCT func, ARG1 arg1) { + struct lambda : public ITask_impl { + FUNCT dest_func; + ARG1 arg1; + lambda(FUNCT func, const ARG1& arg1) : dest_func(func), arg1(arg1) {} + virtual void run() { (*dest_func)(arg1); } + virtual ITask_impl* fork() { return new lambda(dest_func, arg1); } + }; + return Task(new lambda(func, arg1)); + } + + template <typename FUNCT, typename ARG1, typename ARG2> + static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2) { + struct lambda : public ITask_impl { + FUNCT dest_func; + ARG1 arg1; + ARG2 arg2; + lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2) + : dest_func(func), arg1(arg1), arg2(arg2) {} + virtual void run() { (*dest_func)(arg1, arg2); } + virtual ITask_impl* fork() { return new lambda(dest_func, arg1, arg2); } + }; + return Task(new lambda(func, arg1, arg2)); + } + + template <typename FUNCT, typename ARG1, typename ARG2, typename ARG3> + static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2, ARG3 arg3) { + struct lambda : public ITask_impl { + FUNCT dest_func; + ARG1 arg1; + ARG2 arg2; + ARG3 arg3; + lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3) + : dest_func(func), arg1(arg1), arg2(arg2), arg3(arg3) {} + virtual void run() { (*dest_func)(arg1, arg2, arg3); } + virtual ITask_impl* fork() { + return new lambda(dest_func, arg1, arg2, arg3); + } + }; + return Task(new lambda(func, arg1, arg2, arg3)); + } + + template <typename FUNCT, typename ARG1, typename ARG2, typename ARG3, + typename ARG4> + static Task 
gen(FUNCT func, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4) { + struct lambda : public ITask_impl { + FUNCT dest_func; + ARG1 arg1; + ARG2 arg2; + ARG3 arg3; + ARG4 arg4; + lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3, + const ARG4& arg4) + : dest_func(func), arg1(arg1), arg2(arg2), arg3(arg3), arg4(arg4) {} + virtual void run() { (*dest_func)(arg1, arg2, arg3, arg4); } + virtual ITask_impl* fork() { + return new lambda(dest_func, arg1, arg2, arg3, arg4); + } + }; + return Task(new lambda(func, arg1, arg2, arg3, arg4)); + } + + template <typename FUNCT, typename ARG1, typename ARG2, typename ARG3, + typename ARG4, typename ARG5> + static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4, + ARG5 arg5) { + struct lambda : public ITask_impl { + FUNCT dest_func; + ARG1 arg1; + ARG2 arg2; + ARG3 arg3; + ARG4 arg4; + ARG5 arg5; + lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3, + const ARG4& arg4, const ARG5& arg5) + : dest_func(func), + arg1(arg1), + arg2(arg2), + arg3(arg3), + arg4(arg4), + arg5(arg5) {} + virtual void run() { (*dest_func)(arg1, arg2, arg3, arg4, arg5); } + virtual ITask_impl* fork() { + return new lambda(dest_func, arg1, arg2, arg3, arg4, arg5); + } + }; + return Task(new lambda(func, arg1, arg2, arg3, arg4, arg5)); + } + + template <typename FUNCT, typename ARG1, typename ARG2, typename ARG3, + typename ARG4, typename ARG5, typename ARG6> + static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4, + ARG5 arg5, ARG6 arg6) { + struct lambda : public ITask_impl { + FUNCT dest_func; + ARG1 arg1; + ARG2 arg2; + ARG3 arg3; + ARG4 arg4; + ARG5 arg5; + ARG6 arg6; + lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3, + const ARG4& arg4, const ARG5& arg5, const ARG6& arg6) + : dest_func(func), + arg1(arg1), + arg2(arg2), + arg3(arg3), + arg4(arg4), + arg5(arg5), + arg6(arg6) {} + virtual void run() { (*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6); } + virtual 
ITask_impl* fork() { + return new lambda(dest_func, arg1, arg2, arg3, arg4, arg5, arg6); + } + }; + return Task(new lambda(func, arg1, arg2, arg3, arg4, arg5, arg6)); + } + + template <typename FUNCT, typename ARG1, typename ARG2, typename ARG3, + typename ARG4, typename ARG5, typename ARG6, typename ARG7> + static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4, + ARG5 arg5, ARG6 arg6, ARG7 arg7) { + struct lambda : public ITask_impl { + FUNCT dest_func; + ARG1 arg1; + ARG2 arg2; + ARG3 arg3; + ARG4 arg4; + ARG5 arg5; + ARG6 arg6; + ARG7 arg7; + lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3, + const ARG4& arg4, const ARG5& arg5, const ARG6& arg6, + const ARG7& arg7) + : dest_func(func), + arg1(arg1), + arg2(arg2), + arg3(arg3), + arg4(arg4), + arg5(arg5), + arg6(arg6), + arg7(arg7) {} + virtual void run() { + (*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6, arg7); + } + virtual ITask_impl* fork() { + return new lambda(dest_func, arg1, arg2, arg3, arg4, arg5, arg6, arg7); + } + }; + return Task(new lambda(func, arg1, arg2, arg3, arg4, arg5, arg6, arg7)); + } + + template <typename FUNCT, typename ARG1, typename ARG2, typename ARG3, + typename ARG4, typename ARG5, typename ARG6, typename ARG7, + typename ARG8> + static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4, + ARG5 arg5, ARG6 arg6, ARG7 arg7, ARG8 arg8) { + struct lambda : public ITask_impl { + FUNCT dest_func; + ARG1 arg1; + ARG2 arg2; + ARG3 arg3; + ARG4 arg4; + ARG5 arg5; + ARG6 arg6; + ARG7 arg7; + ARG8 arg8; + lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3, + const ARG4& arg4, const ARG5& arg5, const ARG6& arg6, + const ARG7& arg7, const ARG8& arg8) + : dest_func(func), + arg1(arg1), + arg2(arg2), + arg3(arg3), + arg4(arg4), + arg5(arg5), + arg6(arg6), + arg7(arg7), + arg8(arg8) {} + virtual void run() { + (*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8); + } + virtual ITask_impl* fork() { + return new 
lambda(dest_func, arg1, arg2, arg3, arg4, arg5, arg6, arg7, + arg8); + } + }; + return Task( + new lambda(func, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8)); + } + + template <typename FUNCT, typename ARG1, typename ARG2, typename ARG3, + typename ARG4, typename ARG5, typename ARG6, typename ARG7, + typename ARG8, typename ARG9> + static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4, + ARG5 arg5, ARG6 arg6, ARG7 arg7, ARG8 arg8, ARG9 arg9) { + struct lambda : public ITask_impl { + FUNCT dest_func; + ARG1 arg1; + ARG2 arg2; + ARG3 arg3; + ARG4 arg4; + ARG5 arg5; + ARG6 arg6; + ARG7 arg7; + ARG8 arg8; + ARG9 arg9; + lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3, + const ARG4& arg4, const ARG5& arg5, const ARG6& arg6, + const ARG7& arg7, const ARG8& arg8, const ARG9& arg9) + : dest_func(func), + arg1(arg1), + arg2(arg2), + arg3(arg3), + arg4(arg4), + arg5(arg5), + arg6(arg6), + arg7(arg7), + arg8(arg8), + arg9(arg9) {} + virtual void run() { + (*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9); + } + virtual ITask_impl* fork() { + return new lambda(dest_func, arg1, arg2, arg3, arg4, arg5, arg6, arg7, + arg8, arg9); + } + }; + return Task( + new lambda(func, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9)); + } + + //<!*************************************************************************** + //<! 
class fuctions;; + //<!*************************************************************************** + template <typename T, typename RET> + static Task gen(RET (T::*func)(void), T* obj) { + struct lambda : public ITask_impl { + RET (T::*dest_func)(void); + T* obj; + lambda(RET (T::*func)(void), T* obj) : dest_func(func), obj(obj) {} + virtual void run() { (obj->*dest_func)(); } + virtual ITask_impl* fork() { return new lambda(dest_func, obj); } + }; + return Task(new lambda(func, obj)); + } + + template <typename T, typename RET, typename FARG1, typename ARG1> + static Task gen(RET (T::*func)(FARG1), T* obj, ARG1 arg1) { + struct lambda : public ITask_impl { + RET (T::*dest_func)(FARG1); + T* obj; + ARG1 arg1; + lambda(RET (T::*pfunc)(FARG1), T* obj, const ARG1& arg1) + : dest_func(pfunc), obj(obj), arg1(arg1) {} + virtual void run() { (obj->*dest_func)(arg1); } + virtual ITask_impl* fork() { return new lambda(dest_func, obj, arg1); } + }; + return Task(new lambda(func, obj, arg1)); + } + + template <typename T, typename RET, typename FARG1, typename FARG2, + typename ARG1, typename ARG2> + static Task gen(RET (T::*func)(FARG1, FARG2), T* obj, ARG1 arg1, ARG2 arg2) { + struct lambda : public ITask_impl { + RET (T::*dest_func)(FARG1, FARG2); + T* obj; + ARG1 arg1; + ARG2 arg2; + lambda(RET (T::*func)(FARG1, FARG2), T* obj, const ARG1& arg1, + const ARG2& arg2) + : dest_func(func), obj(obj), arg1(arg1), arg2(arg2) {} + virtual void run() { (obj->*dest_func)(arg1, arg2); } + virtual ITask_impl* fork() { + return new lambda(dest_func, obj, arg1, arg2); + } + }; + return Task(new lambda(func, obj, arg1, arg2)); + } + + template <typename T, typename RET, typename FARG1, typename FARG2, + typename FARG3, typename ARG1, typename ARG2, typename ARG3> + static Task gen(RET (T::*func)(FARG1, FARG2, FARG3), T* obj, ARG1 arg1, + ARG2 arg2, ARG3 arg3) { + struct lambda : public ITask_impl { + RET (T::*dest_func)(FARG1, FARG2, FARG3); + T* obj; + ARG1 arg1; + ARG2 arg2; + ARG3 
arg3; + lambda(RET (T::*func)(FARG1, FARG2, FARG3), T* obj, const ARG1& arg1, + const ARG2& arg2, const ARG3& arg3) + : dest_func(func), obj(obj), arg1(arg1), arg2(arg2), arg3(arg3) {} + virtual void run() { (obj->*dest_func)(arg1, arg2, arg3); } + virtual ITask_impl* fork() { + return new lambda(dest_func, obj, arg1, arg2, arg3); + } + }; + return Task(new lambda(func, obj, arg1, arg2, arg3)); + } + + template <typename T, typename RET, typename FARG1, typename FARG2, + typename FARG3, typename FARG4, typename ARG1, typename ARG2, + typename ARG3, typename ARG4> + static Task gen(RET (T::*func)(FARG1, FARG2, FARG3, FARG4), T* obj, ARG1 arg1, + ARG2 arg2, ARG3 arg3, ARG4 arg4) { + struct lambda : public ITask_impl { + RET (T::*dest_func)(FARG1, FARG2, FARG3, FARG4); + T* obj; + ARG1 arg1; + ARG2 arg2; + ARG3 arg3; + ARG4 arg4; + lambda(RET (T::*func)(FARG1, FARG2, FARG3, FARG4), T* obj, + const ARG1& arg1, const ARG2& arg2, const ARG3& arg3, + const ARG4& arg4) + : dest_func(func), + obj(obj), + arg1(arg1), + arg2(arg2), + arg3(arg3), + arg4(arg4) {} + virtual void run() { (obj->*dest_func)(arg1, arg2, arg3, arg4); } + virtual ITask_impl* fork() { + return new lambda(dest_func, obj, arg1, arg2, arg3, arg4); + } + }; + return Task(new lambda(func, obj, arg1, arg2, arg3, arg4)); + } + + template <typename T, typename RET, typename FARG1, typename FARG2, + typename FARG3, typename FARG4, typename FARG5, typename ARG1, + typename ARG2, typename ARG3, typename ARG4, typename ARG5> + static Task gen(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5), T* obj, + ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4, ARG5 arg5) { + struct lambda : public ITask_impl { + RET (T::*dest_func)(FARG1, FARG2, FARG3, FARG4, FARG5); + T* obj; + ARG1 arg1; + ARG2 arg2; + ARG3 arg3; + ARG4 arg4; + ARG5 arg5; + lambda(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5), T* obj, + const ARG1& arg1, const ARG2& arg2, const ARG3& arg3, + const ARG4& arg4, const ARG5& arg5) + : dest_func(func), + 
obj(obj), + arg1(arg1), + arg2(arg2), + arg3(arg3), + arg4(arg4), + arg5(arg5) {} + virtual void run() { (obj->*dest_func)(arg1, arg2, arg3, arg4, arg5); } + virtual ITask_impl* fork() { + return new lambda(dest_func, obj, arg1, arg2, arg3, arg4, arg5); + } + }; + return Task(new lambda(func, obj, arg1, arg2, arg3, arg4, arg5)); + } + + template <typename T, typename RET, typename FARG1, typename FARG2, + typename FARG3, typename FARG4, typename FARG5, typename FARG6, + typename ARG1, typename ARG2, typename ARG3, typename ARG4, + typename ARG5, typename ARG6> + static Task gen(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6), + T* obj, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4, ARG5 arg5, + ARG6 arg6) { + struct lambda : public ITask_impl { + RET (T::*dest_func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6); + T* obj; + ARG1 arg1; + ARG2 arg2; + ARG3 arg3; + ARG4 arg4; + ARG5 arg5; + ARG6 arg6; + lambda(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6), T* obj, + const ARG1& arg1, const ARG2& arg2, const ARG3& arg3, + const ARG4& arg4, const ARG5& arg5, const ARG6& arg6) + : dest_func(func), + obj(obj), + arg1(arg1), + arg2(arg2), + arg3(arg3), + arg4(arg4), + arg5(arg5), + arg6(arg6) {} + virtual void run() { + (obj->*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6); + } + virtual ITask_impl* fork() { + return new lambda(dest_func, obj, arg1, arg2, arg3, arg4, arg5, arg6); + } + }; + return Task(new lambda(func, obj, arg1, arg2, arg3, arg4, arg5, arg6)); + } + + template <typename T, typename RET, typename FARG1, typename FARG2, + typename FARG3, typename FARG4, typename FARG5, typename FARG6, + typename FARG7, typename ARG1, typename ARG2, typename ARG3, + typename ARG4, typename ARG5, typename ARG6, typename ARG7> + static Task gen(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6, + FARG7), + T* obj, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4, ARG5 arg5, + ARG6 arg6, ARG7 arg7) { + struct lambda : public ITask_impl { + RET 
(T::*dest_func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6, FARG7); + T* obj; + ARG1 arg1; + ARG2 arg2; + ARG3 arg3; + ARG4 arg4; + ARG5 arg5; + ARG6 arg6; + ARG7 arg7; + lambda(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6, FARG7), + T* obj, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3, + const ARG4& arg4, const ARG5& arg5, const ARG6& arg6, + const ARG7& arg7) + : dest_func(func), + obj(obj), + arg1(arg1), + arg2(arg2), + arg3(arg3), + arg4(arg4), + arg5(arg5), + arg6(arg6), + arg7(arg7) {} + virtual void run() { + (obj->*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6, arg7); + } + virtual ITask_impl* fork() { + return new lambda(dest_func, obj, arg1, arg2, arg3, arg4, arg5, arg6, + arg7); + } + }; + return Task( + new lambda(func, obj, arg1, arg2, arg3, arg4, arg5, arg6, arg7)); + } + + template <typename T, typename RET, typename FARG1, typename FARG2, + typename FARG3, typename FARG4, typename FARG5, typename FARG6, + typename FARG7, typename FARG8, typename ARG1, typename ARG2, + typename ARG3, typename ARG4, typename ARG5, typename ARG6, + typename ARG7, typename ARG8> + static Task gen(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6, + FARG7, FARG8), + T* obj, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4, ARG5 arg5, + ARG6 arg6, ARG7 arg7, ARG8 arg8) { + struct lambda : public ITask_impl { + RET (T::*dest_func) + (FARG1, FARG2, FARG3, FARG4, FARG5, FARG6, FARG7, FARG8); + T* obj; + ARG1 arg1; + ARG2 arg2; + ARG3 arg3; + ARG4 arg4; + ARG5 arg5; + ARG6 arg6; + ARG7 arg7; + ARG8 arg8; + lambda(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6, FARG7, + FARG8), + T* obj, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3, + const ARG4& arg4, const ARG5& arg5, const ARG6& arg6, + const ARG7& arg7, const ARG8& arg8) + : dest_func(func), + obj(obj), + arg1(arg1), + arg2(arg2), + arg3(arg3), + arg4(arg4), + arg5(arg5), + arg6(arg6), + arg7(arg7), + arg8(arg8) {} + virtual void run() { + (obj->*dest_func)(arg1, arg2, arg3, arg4, arg5, 
arg6, arg7, arg8); + } + virtual ITask_impl* fork() { + return new lambda(dest_func, obj, arg1, arg2, arg3, arg4, arg5, arg6, + arg7, arg8); + } + }; + return Task( + new lambda(func, obj, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8)); + } + + template <typename T, typename RET, typename FARG1, typename FARG2, + typename FARG3, typename FARG4, typename FARG5, typename FARG6, + typename FARG7, typename FARG8, typename FARG9, typename ARG1, + typename ARG2, typename ARG3, typename ARG4, typename ARG5, + typename ARG6, typename ARG7, typename ARG8, typename ARG9> + static Task gen(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6, + FARG7, FARG8, FARG9), + T* obj, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4, ARG5 arg5, + ARG6 arg6, ARG7 arg7, ARG8 arg8, ARG9 arg9) { + struct lambda : public ITask_impl { + RET (T::*dest_func) + (FARG1, FARG2, FARG3, FARG4, FARG5, FARG6, FARG7, FARG8, FARG9); + T* obj; + ARG1 arg1; + ARG2 arg2; + ARG3 arg3; + ARG4 arg4; + ARG5 arg5; + ARG6 arg6; + ARG7 arg7; + ARG8 arg8; + ARG9 arg9; + lambda(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6, FARG7, + FARG8, FARG9), + T* obj, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3, + const ARG4& arg4, const ARG5& arg5, const ARG6& arg6, + const ARG7& arg7, const ARG8& arg8, const ARG9& arg9) + : dest_func(func), + obj(obj), + arg1(arg1), + arg2(arg2), + arg3(arg3), + arg4(arg4), + arg5(arg5), + arg6(arg6), + arg7(arg7), + arg8(arg8), + arg9(arg9) {} + virtual void run() { + (obj->*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9); + } + virtual ITask_impl* fork() { + return new lambda(dest_func, obj, arg1, arg2, arg3, arg4, arg5, arg6, + arg7, arg8, arg9); + } + }; + return Task(new lambda(func, obj, arg1, arg2, arg3, arg4, arg5, arg6, arg7, + arg8, arg9)); + } +}; + +//<!*************************************************************************** +class disruptorLFQ; +class TaskQueue : public ITaskQueue { + public: + TaskQueue(int threadCount); + virtual 
~TaskQueue(); + virtual void close(); + virtual void produce(const Task& task); + virtual int run(); + virtual bool bTaskQueueStatusOK(); + + private: + boost::atomic<bool> m_flag; + disruptorLFQ* m_disruptorLFQ; + boost::mutex m_publishLock; +}; + +//<!*************************************************************************** +} //<! end namespace; + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/ClientRemotingProcessor.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/transport/ClientRemotingProcessor.cpp b/rocketmq-cpp/src/transport/ClientRemotingProcessor.cpp new file mode 100644 index 0000000..a1462e6 --- /dev/null +++ b/rocketmq-cpp/src/transport/ClientRemotingProcessor.cpp @@ -0,0 +1,146 @@ +#include "ClientRemotingProcessor.h" +#include "ClientRPCHook.h" +#include "ConsumerRunningInfo.h" +#include "MQClientFactory.h" +#include "UtilAll.h" + +namespace rocketmq { + +ClientRemotingProcessor::ClientRemotingProcessor( + MQClientFactory* mqClientFactory) + : m_mqClientFactory(mqClientFactory) {} + +ClientRemotingProcessor::~ClientRemotingProcessor() {} + +RemotingCommand* ClientRemotingProcessor::processRequest( + const string& addr, RemotingCommand* request) { + LOG_DEBUG("request Command received:processRequest"); + switch (request->getCode()) { + case CHECK_TRANSACTION_STATE: + // return checkTransactionState( request); + break; + case NOTIFY_CONSUMER_IDS_CHANGED: + return notifyConsumerIdsChanged(request); + break; + case RESET_CONSUMER_CLIENT_OFFSET: // oneWayRPC + return resetOffset(request); + case GET_CONSUMER_STATUS_FROM_CLIENT: + // return getConsumeStatus( request); + break; + case GET_CONSUMER_RUNNING_INFO: + return getConsumerRunningInfo(addr, request); + break; + case CONSUME_MESSAGE_DIRECTLY: + // return consumeMessageDirectly( request); + break; + default: + break; + } + return NULL; +} + +RemotingCommand* 
ClientRemotingProcessor::resetOffset( + RemotingCommand* request) { + request->SetExtHeader(request->getCode()); + const MemoryBlock* pbody = request->GetBody(); + if (pbody->getSize()) { + ResetOffsetBody* offsetBody = ResetOffsetBody::Decode(pbody); + ResetOffsetRequestHeader* offsetHeader = + (ResetOffsetRequestHeader*)request->getCommandHeader(); + if (offsetBody) { + m_mqClientFactory->resetOffset(offsetHeader->getGroup(), + offsetHeader->getTopic(), + offsetBody->getOffsetTable()); + } else { + LOG_ERROR( + "resetOffset failed as received data could not be unserialized"); + } + } + return NULL; // as resetOffset is oneWayRPC, do not need return any response +} + +std::map<MQMessageQueue, int64> ResetOffsetBody::getOffsetTable() { + return m_offsetTable; +} + +void ResetOffsetBody::setOffsetTable(MQMessageQueue mq, int64 offset) { + m_offsetTable[mq] = offset; +} + +ResetOffsetBody* ResetOffsetBody::Decode(const MemoryBlock* mem) { + const char* const pData = static_cast<const char*>(mem->getData()); + Json::Reader reader; + Json::Value root; + const char* begin = pData; + const char* end = pData + mem->getSize(); + + if (!reader.parse(begin, end, root, true)) { + LOG_ERROR("ResetOffsetBody::Decode fail"); + return NULL; + } + + ResetOffsetBody* rfb = new ResetOffsetBody(); + Json::Value qds = root["offsetTable"]; + for (unsigned int i = 0; i < qds.size(); i++) { + MQMessageQueue mq; + Json::Value qd = qds[i]; + mq.setBrokerName(qd["brokerName"].asString()); + mq.setQueueId(qd["queueId"].asInt()); + mq.setTopic(qd["topic"].asString()); + int64 offset = qd["offset"].asInt(); + LOG_INFO("ResetOffsetBody brokerName:%s, queueID:%d, topic:%s, offset:%lld", + mq.getBrokerName().c_str(), mq.getQueueId(), mq.getTopic().c_str(), + offset); + rfb->setOffsetTable(mq, offset); + } + return rfb; +} + +RemotingCommand* ClientRemotingProcessor::getConsumerRunningInfo( + const string& addr, RemotingCommand* request) { + request->SetExtHeader(request->getCode()); + 
GetConsumerRunningInfoRequestHeader* requestHeader = + (GetConsumerRunningInfoRequestHeader*)request->getCommandHeader(); + LOG_INFO("getConsumerRunningInfo:%s", + requestHeader->getConsumerGroup().c_str()); + + RemotingCommand* pResponse = new RemotingCommand( + request->getCode(), "CPP", request->getVersion(), request->getOpaque(), + request->getFlag(), request->getRemark(), NULL); + + unique_ptr<ConsumerRunningInfo> runningInfo( + m_mqClientFactory->consumerRunningInfo( + requestHeader->getConsumerGroup())); + if (runningInfo) { + if (requestHeader->isJstackEnable()) { + /*string jstack = UtilAll::jstack(); + consumerRunningInfo->setJstack(jstack);*/ + } + pResponse->setCode(SUCCESS_VALUE); + string body = runningInfo->encode(); + pResponse->SetBody(body.c_str(), body.length()); + pResponse->setMsgBody(body); + } else { + pResponse->setCode(SYSTEM_ERROR); + pResponse->setRemark("The Consumer Group not exist in this consumer"); + } + + SessionCredentials sessionCredentials; + m_mqClientFactory->getSessionCredentialFromConsumer( + requestHeader->getConsumerGroup(), sessionCredentials); + ClientRPCHook rpcHook(sessionCredentials); + rpcHook.doBeforeRequest(addr, *pResponse); + pResponse->Encode(); + return pResponse; +} + +RemotingCommand* ClientRemotingProcessor::notifyConsumerIdsChanged( + RemotingCommand* request) { + request->SetExtHeader(request->getCode()); + NotifyConsumerIdsChangedRequestHeader* requestHeader = + (NotifyConsumerIdsChangedRequestHeader*)request->getCommandHeader(); + LOG_INFO("notifyConsumerIdsChanged:%s", requestHeader->getGroup().c_str()); + m_mqClientFactory->doRebalanceByConsumerGroup(requestHeader->getGroup()); + return NULL; +} +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/ClientRemotingProcessor.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/transport/ClientRemotingProcessor.h 
b/rocketmq-cpp/src/transport/ClientRemotingProcessor.h new file mode 100644 index 0000000..1bb757e --- /dev/null +++ b/rocketmq-cpp/src/transport/ClientRemotingProcessor.h @@ -0,0 +1,39 @@ +#ifndef __CLIENTREMOTINGPROCESSOR_H__ +#define __CLIENTREMOTINGPROCESSOR_H__ + +#include "MQMessageQueue.h" +#include "MQProtos.h" +#include "RemotingCommand.h" + +namespace rocketmq { + +class MQClientFactory; +class ClientRemotingProcessor { + public: + ClientRemotingProcessor(MQClientFactory* mqClientFactory); + virtual ~ClientRemotingProcessor(); + + RemotingCommand* processRequest(const string& addr, RemotingCommand* request); + RemotingCommand* resetOffset(RemotingCommand* request); + RemotingCommand* getConsumerRunningInfo(const string& addr, + RemotingCommand* request); + RemotingCommand* notifyConsumerIdsChanged(RemotingCommand* request); + + private: + MQClientFactory* m_mqClientFactory; +}; + +class ResetOffsetBody { + public: + ResetOffsetBody() {} + virtual ~ResetOffsetBody() { m_offsetTable.clear(); } + void setOffsetTable(MQMessageQueue mq, int64 offset); + std::map<MQMessageQueue, int64> getOffsetTable(); + static ResetOffsetBody* Decode(const MemoryBlock* mem); + + private: + std::map<MQMessageQueue, int64> m_offsetTable; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/ResponseFuture.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/transport/ResponseFuture.cpp b/rocketmq-cpp/src/transport/ResponseFuture.cpp new file mode 100755 index 0000000..05cef84 --- /dev/null +++ b/rocketmq-cpp/src/transport/ResponseFuture.cpp @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "ResponseFuture.h" +#include "Logging.h" +#include "TcpRemotingClient.h" + +namespace rocketmq { +//<!************************************************************************ +ResponseFuture::ResponseFuture(int requestCode, int opaque, + TcpRemotingClient* powner, int64 timeout, + bool bAsync /* = false */, + AsyncCallbackWrap* pcall /* = NULL */) { + m_bAsync.store(bAsync); + m_requestCode = requestCode; + m_opaque = opaque; + m_timeout = timeout; + m_pCallbackWrap = pcall; + m_pResponseCommand = NULL; + m_sendRequestOK = false; + m_beginTimestamp = UtilAll::currentTimeMillis(); + m_asyncCallbackStatus = asyncCallBackStatus_init; + if (getASyncFlag()) { + m_asyncResponse.store(false); + m_syncResponse.store(true); + } else { + m_asyncResponse.store(true); + m_syncResponse.store(false); + } +} + +ResponseFuture::~ResponseFuture() { + deleteAndZero(m_pCallbackWrap); + /* + do not set m_pResponseCommand to NULL when destruct, as m_pResponseCommand + is used by MQClientAPIImpl concurrently, and will be released by producer or + consumer; + m_pResponseCommand = NULL; + */ +} + +void ResponseFuture::releaseThreadCondition() { m_defaultEvent.notify_all(); } + +RemotingCommand* ResponseFuture::waitResponse(int timeoutMillis) { + boost::unique_lock<boost::mutex> lk(m_defaultEventLock); + if (!m_defaultEvent.timed_wait( + lk, boost::posix_time::milliseconds(timeoutMillis))) { + 
LOG_WARN("waitResponse of code:%d with opaque:%d timeout", m_requestCode, + m_opaque); + m_syncResponse.store(true); + } + return m_pResponseCommand; +} + +void ResponseFuture::setResponse(RemotingCommand* pResponseCommand) { + // LOG_DEBUG("setResponse of opaque:%d",m_opaque); + m_pResponseCommand = pResponseCommand; + + if (!getASyncFlag()) { + if (m_syncResponse.load() == false) { + m_defaultEvent.notify_all(); + m_syncResponse.store(true); + } + } +} + +const bool ResponseFuture::getSyncResponseFlag() { + if (m_syncResponse.load() == true) { + return true; + } + return false; +} + +const bool ResponseFuture::getAsyncResponseFlag() { + if (m_asyncResponse.load() == true) { + // LOG_DEBUG("ASYNC flag is TRUE,opaque is:%d",getOpaque() ); + return true; + } + + return false; +} + +void ResponseFuture::setAsyncResponseFlag() { m_asyncResponse.store(true); } + +const bool ResponseFuture::getASyncFlag() { + if (m_bAsync.load() == true) { + // LOG_DEBUG("ASYNC flag is TRUE,opaque is:%d",getOpaque() ); + return true; + } + return false; +} + +bool ResponseFuture::isSendRequestOK() { return m_sendRequestOK; } + +void ResponseFuture::setSendRequestOK(bool sendRequestOK) { + m_sendRequestOK = sendRequestOK; +} + +int ResponseFuture::getOpaque() const { return m_opaque; } + +int ResponseFuture::getRequestCode() const { return m_requestCode; } + +void ResponseFuture::setAsyncCallBackStatus( + asyncCallBackStatus asyncCallbackStatus) { + boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock); + if (m_asyncCallbackStatus == asyncCallBackStatus_init) { + m_asyncCallbackStatus = asyncCallbackStatus; + } +} + +void ResponseFuture::executeInvokeCallback() { + if (m_pCallbackWrap == NULL) { + deleteAndZero(m_pResponseCommand); + return; + } else { + if (m_asyncCallbackStatus == asyncCallBackStatus_response) { + m_pCallbackWrap->operationComplete(this, true); + } else { + if (m_pResponseCommand) + deleteAndZero(m_pResponseCommand); // the responseCommand from + // 
RemotingCommand::Decode(mem) will + // only deleted by operationComplete + // automatically + LOG_WARN( + "timeout and response incoming concurrently of opaque:%d, and " + "executeInvokeCallbackException was called earlier", + m_opaque); + } + } +} + +void ResponseFuture::executeInvokeCallbackException() { + if (m_pCallbackWrap == NULL) { + LOG_ERROR("m_pCallbackWrap is NULL, critical error"); + return; + } else { + if (m_asyncCallbackStatus == asyncCallBackStatus_timeout) { + m_pCallbackWrap->onException(); + } else { + LOG_WARN( + "timeout and response incoming concurrently of opaque:%d, and " + "executeInvokeCallback was called earlier", + m_opaque); + } + } +} + +bool ResponseFuture::isTimeOut() const { + int64 diff = UtilAll::currentTimeMillis() - m_beginTimestamp; + //<!only async; + return m_bAsync.load() == 1 && diff > m_timeout; +} + +RemotingCommand* ResponseFuture::getCommand() const { + return m_pResponseCommand; +} + +AsyncCallbackWrap* ResponseFuture::getAsyncCallbackWrap() { + return m_pCallbackWrap; +} + +//<!************************************************************************ +} //<!end namespace; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/ResponseFuture.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/transport/ResponseFuture.h b/rocketmq-cpp/src/transport/ResponseFuture.h new file mode 100755 index 0000000..92fa772 --- /dev/null +++ b/rocketmq-cpp/src/transport/ResponseFuture.h @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __RESPONSEFUTURE_H__ +#define __RESPONSEFUTURE_H__ +#include <boost/atomic.hpp> +#include <boost/thread/condition_variable.hpp> +#include "AsyncCallbackWrap.h" +#include "RemotingCommand.h" +#include "UtilAll.h" + +namespace rocketmq { + +typedef enum asyncCallBackStatus { + asyncCallBackStatus_init = 0, + asyncCallBackStatus_response = 1, + asyncCallBackStatus_timeout = 2 +} asyncCallBackStatus; + +class TcpRemotingClient; +//<!*************************************************************************** +class ResponseFuture { + public: + ResponseFuture(int requestCode, int opaque, TcpRemotingClient* powner, + int64 timeoutMilliseconds, bool bAsync = false, + AsyncCallbackWrap* pcall = NULL); + virtual ~ResponseFuture(); + void releaseThreadCondition(); + RemotingCommand* waitResponse(int timeoutMillis); + RemotingCommand* getCommand() const; + + void setResponse(RemotingCommand* pResponseCommand); + bool isSendRequestOK(); + void setSendRequestOK(bool sendRequestOK); + int getRequestCode() const; + int getOpaque() const; + + //<!callback; + void executeInvokeCallback(); + void executeInvokeCallbackException(); + bool isTimeOut() const; + // bool isTimeOutMoreThan30s() const; + const bool getASyncFlag(); + void setAsyncResponseFlag(); + const bool getAsyncResponseFlag(); + const bool getSyncResponseFlag(); + AsyncCallbackWrap* getAsyncCallbackWrap(); + void setAsyncCallBackStatus(asyncCallBackStatus asyncCallbackStatus); + + private: + int m_requestCode; + int m_opaque; + bool m_sendRequestOK; + boost::mutex m_defaultEventLock; + 
boost::condition_variable_any m_defaultEvent; + int64 m_beginTimestamp; + int64 m_timeout; // ms + boost::atomic<bool> m_bAsync; + RemotingCommand* m_pResponseCommand; //<!delete outside; + AsyncCallbackWrap* m_pCallbackWrap; + boost::mutex m_asyncCallbackLock; + asyncCallBackStatus m_asyncCallbackStatus; + boost::atomic<bool> m_asyncResponse; + boost::atomic<bool> m_syncResponse; + // TcpRemotingClient* m_tcpRemoteClient; +}; +//<!************************************************************************ +} //<!end namespace; + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/SocketUtil.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/transport/SocketUtil.cpp b/rocketmq-cpp/src/transport/SocketUtil.cpp new file mode 100755 index 0000000..d428f24 --- /dev/null +++ b/rocketmq-cpp/src/transport/SocketUtil.cpp @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "SocketUtil.h" + +namespace rocketmq { +//<!*************************************************************************** +sockaddr IPPort2socketAddress(int host, int port) { + struct sockaddr_in sa; + sa.sin_family = AF_INET; + sa.sin_port = htons((uint16)port); + sa.sin_addr.s_addr = htonl(host); + + sockaddr bornAddr; + memcpy(&bornAddr, &sa, sizeof(sockaddr)); + return bornAddr; +} + +string socketAddress2IPPort(sockaddr addr) { + sockaddr_in sa; + memcpy(&sa, &addr, sizeof(sockaddr)); + + char tmp[32]; + sprintf(tmp, "%s:%d", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port)); + + string ipport = tmp; + return ipport; +} + +void socketAddress2IPPort(sockaddr addr, int& host, int& port) { + struct sockaddr_in sa; + memcpy(&sa, &addr, sizeof(sockaddr)); + + host = ntohl(sa.sin_addr.s_addr); + port = ntohs(sa.sin_port); +} + +string socketAddress2String(sockaddr addr) { + sockaddr_in in; + memcpy(&in, &addr, sizeof(sockaddr)); + + return inet_ntoa(in.sin_addr); +} + +string getHostName(sockaddr addr) { + sockaddr_in in; + memcpy(&in, &addr, sizeof(sockaddr)); + + struct hostent* remoteHost = gethostbyaddr((char*)&(in.sin_addr), 4, AF_INET); + char** alias = remoteHost->h_aliases; + if (*alias != 0) { + return *alias; + } else { + return inet_ntoa(in.sin_addr); + } +} + +uint64 swapll(uint64 v) { +#ifdef ENDIANMODE_BIG + return v; +#else + uint64 ret = ((v << 56) | ((v & 0xff00) << 40) | ((v & 0xff0000) << 24) | + ((v & 0xff000000) << 8) | ((v >> 8) & 0xff000000) | + ((v >> 24) & 0xff0000) | ((v >> 40) & 0xff00) | (v >> 56)); + + return ret; +#endif +} + +uint64 h2nll(uint64 v) { return swapll(v); } + +uint64 n2hll(uint64 v) { return swapll(v); } +} //<!end namespace; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/SocketUtil.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/transport/SocketUtil.h b/rocketmq-cpp/src/transport/SocketUtil.h new 
file mode 100755 index 0000000..7cbba0c --- /dev/null +++ b/rocketmq-cpp/src/transport/SocketUtil.h @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __SOCKETUTIL_H__ +#define __SOCKETUTIL_H__ + +#ifdef WIN32 +#include <WS2tcpip.h> +#include <Windows.h> +#include <Winsock2.h> +#pragma comment(lib, "ws2_32.lib") +#else +#include <arpa/inet.h> +#include <errno.h> +#include <fcntl.h> +#include <net/if.h> +#include <netdb.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <signal.h> +#include <sys/ioctl.h> +#include <sys/select.h> +#include <sys/socket.h> +#include <sys/time.h> +#include <sys/types.h> +#include <unistd.h> +#endif +#include <sys/socket.h> +#include "UtilAll.h" + +namespace rocketmq { +//<!************************************************************************ +/** +* IP:PORT +*/ +sockaddr IPPort2socketAddress(int host, int port); +string socketAddress2IPPort(sockaddr addr); +void socketAddress2IPPort(sockaddr addr, int& host, int& port); + +string socketAddress2String(sockaddr addr); +string getHostName(sockaddr addr); + +uint64 h2nll(uint64 v); +uint64 n2hll(uint64 v); + +//<!************************************************************************ +} //<!end 
namespace; + +#endif