Repository: incubator-rocketmq-externals
Updated Branches:
  refs/heads/master 42fc22353 -> 7ceba1b23


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/RocketMqUtils.scala
----------------------------------------------------------------------
diff --git a/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/RocketMqUtils.scala b/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/RocketMqUtils.scala
new file mode 100644
index 0000000..a1b3f9d
--- /dev/null
+++ b/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/RocketMqUtils.scala
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spark
+
+import java.util.Properties
+import java.{lang => jl, util => ju}
+
+import org.apache.commons.lang.StringUtils
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer
+import org.apache.rocketmq.common.message.{Message, MessageExt, MessageQueue}
+import org.apache.spark.SparkContext
+import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.api.java.{JavaInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.InputDStream
+import org.apache.spark.streaming.{MQPullInputDStream, RocketMqRDD, StreamingContext}
+import org.apache.rocketmq.spark.streaming.{ReliableRocketMQReceiver, RocketMQReceiver}
+import org.apache.spark.storage.StorageLevel
+
+object RocketMqUtils {
+
+  /**
+    * Scala constructor for a batch-oriented interface for consuming from RocketMq.
+    * Starting and ending offsets are specified in advance,
+    * so that you can control exactly-once semantics.
+    * @param sc SparkContext
+    * @param groupId the consumer group id that identifies this consumer to RocketMq
+    * @param offsetRanges offset ranges that define the RocketMq data belonging to this RDD
+    * @param optionParams optional configs, see [[RocketMQConfig]] for more details.
+    * @param locationStrategy map from TopicQueueId to preferred host for processing that partition.
+    * In most cases, use [[LocationStrategy.PreferConsistent]]
+    * @return RDD[MessageExt]
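+    *
+    * A minimal usage sketch (illustrative only: the topic, broker name, group id and
+    * offsets below are placeholder values, not part of this API):
+    * {{{
+    * import java.{util => ju}
+    * val ranges = new ju.HashMap[TopicQueueId, Array[OffsetRange]]()
+    * ranges.put(new TopicQueueId("myTopic", 0),
+    *   Array(OffsetRange("myTopic", 0, "broker-a", 0L, 100L)))
+    * val rdd = RocketMqUtils.createRDD(sc, "myGroup", ranges)
+    * }}}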
+    */
+  def createRDD(
+       sc: SparkContext,
+       groupId: String,
+       offsetRanges: ju.Map[TopicQueueId, Array[OffsetRange]],
+       optionParams: ju.Map[String, String] = new ju.HashMap,
+       locationStrategy: LocationStrategy = PreferConsistent
+     ): RDD[MessageExt] = {
+
+    val preferredHosts = locationStrategy match {
+      case PreferConsistent => ju.Collections.emptyMap[TopicQueueId, String]()
+      case PreferFixed(hostMap) => hostMap
+    }
+    new RocketMqRDD(sc, groupId, optionParams, offsetRanges, preferredHosts, false)
+  }
+
+  /**
+    * Java constructor for a batch-oriented interface for consuming from RocketMq.
+    * Starting and ending offsets are specified in advance,
+    * so that you can control exactly-once semantics.
+    * @param jsc JavaSparkContext
+    * @param groupId the consumer group id that identifies this consumer to RocketMq
+    * @param offsetRanges offset ranges that define the RocketMq data belonging to this RDD
+    * @param optionParams optional configs, see [[RocketMQConfig]] for more details.
+    * @param locationStrategy map from TopicQueueId to preferred host for processing that partition.
+    * In most cases, use [[LocationStrategy.PreferConsistent]]
+    * @return JavaRDD[MessageExt]
+    */
+  def createJavaRDD(
+       jsc: JavaSparkContext,
+       groupId: String,
+       offsetRanges: ju.Map[TopicQueueId, Array[OffsetRange]],
+       optionParams: ju.Map[String, String] = new ju.HashMap,
+       locationStrategy: LocationStrategy = PreferConsistent
+     ): JavaRDD[MessageExt] = {
+    new JavaRDD(createRDD(jsc.sc, groupId, offsetRanges, optionParams, locationStrategy))
+  }
+
+  /**
+    * Scala constructor for a RocketMq DStream
+    * @param ssc StreamingContext
+    * @param groupId the consumer group id that identifies this consumer to RocketMq
+    * @param topics the RocketMq topics to consume from
+    * @param consumerStrategy the strategy for the starting offset. In most cases, pass in [[ConsumerStrategy.lastest]];
+    *   see [[ConsumerStrategy]] for more details
+    * @param autoCommit whether to commit the offset to the RocketMq server automatically or not. If the user
+    *                   implements [[OffsetCommitCallback]], autoCommit must be set to false
+    * @param forceSpecial Generally, if the RocketMq server has a checkpoint for the [[MessageQueue]], the consumer
+    *  will consume from the checkpoint regardless of whether we specify an offset. But if forceSpecial is true,
+    *  RocketMq will start consuming from the specified available offset in any case.
+    * @param failOnDataLoss Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical,
+    * the user must make sure all messages in a topic have been processed before deleting the topic.
+    * @param locationStrategy map from TopicQueueId to preferred host for processing that partition.
+    * In most cases, use [[LocationStrategy.PreferConsistent]]
+    * @param optionParams optional configs, see [[RocketMQConfig]] for more details.
+    * @return InputDStream[MessageExt]
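+    *
+    * A minimal usage sketch (illustrative only; the group id and topic are
+    * placeholder values):
+    * {{{
+    * val topics = ju.Arrays.asList("myTopic")
+    * val stream = RocketMqUtils.createMQPullStream(ssc, "myGroup", topics,
+    *   ConsumerStrategy.lastest, autoCommit = true, forceSpecial = false, failOnDataLoss = false)
+    * stream.map(msg => new String(msg.getBody)).print()
+    * }}}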
+    */
+  def createMQPullStream(
+       ssc: StreamingContext,
+       groupId: String,
+       topics: ju.Collection[jl.String],
+       consumerStrategy: ConsumerStrategy,
+       autoCommit: Boolean,
+       forceSpecial: Boolean,
+       failOnDataLoss: Boolean,
+       locationStrategy: LocationStrategy = PreferConsistent,
+       optionParams: ju.Map[String, String] = new ju.HashMap
+     ): InputDStream[MessageExt] = {
+
+    new MQPullInputDStream(ssc, groupId, topics, optionParams, locationStrategy, consumerStrategy, autoCommit, forceSpecial,
+      failOnDataLoss)
+  }
+
+  def createMQPullStream(
+        ssc: StreamingContext,
+        groupId: String,
+        topic: String,
+        consumerStrategy: ConsumerStrategy,
+        autoCommit: Boolean,
+        forceSpecial: Boolean,
+        failOnDataLoss: Boolean,
+        optionParams: ju.Map[String, String]
+      ): InputDStream[MessageExt] = {
+    val topics = new ju.ArrayList[String]()
+    topics.add(topic)
+    new MQPullInputDStream(ssc, groupId, topics, optionParams, PreferConsistent, consumerStrategy, autoCommit, forceSpecial,
+      failOnDataLoss)
+  }
+
+  /**
+    * Java constructor for a RocketMq DStream
+    * @param ssc JavaStreamingContext
+    * @param groupId the consumer group id that identifies this consumer to RocketMq
+    * @param topics the RocketMq topics to consume from
+    * @param consumerStrategy the strategy for the starting offset. In most cases, pass in [[ConsumerStrategy.lastest]];
+    *   see [[ConsumerStrategy]] for more details
+    * @param autoCommit whether to commit the offset to the RocketMq server automatically or not. If the user
+    *                   implements [[OffsetCommitCallback]], autoCommit must be set to false
+    * @param forceSpecial Generally, if the RocketMq server has a checkpoint for the [[MessageQueue]], the consumer
+    *  will consume from the checkpoint regardless of whether we specify an offset. But if forceSpecial is true,
+    *  RocketMq will start consuming from the specified available offset in any case.
+    * @param failOnDataLoss Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical,
+    * the user must make sure all messages in a topic have been processed before deleting the topic.
+    * @param locationStrategy map from TopicQueueId to preferred host for processing that partition.
+    * In most cases, use [[LocationStrategy.PreferConsistent]]
+    * @param optionParams optional configs, see [[RocketMQConfig]] for more details.
+    * @return JavaInputDStream[MessageExt]
+    */
+  def createJavaMQPullStream(
+     ssc: JavaStreamingContext,
+     groupId: String,
+     topics: ju.Collection[jl.String],
+     consumerStrategy: ConsumerStrategy,
+     autoCommit: Boolean,
+     forceSpecial: Boolean,
+     failOnDataLoss: Boolean,
+     locationStrategy: LocationStrategy = PreferConsistent,
+     optionParams: ju.Map[String, String] = new ju.HashMap
+    ): JavaInputDStream[MessageExt] = {
+    val inputDStream = createMQPullStream(ssc.ssc, groupId, topics, consumerStrategy,
+      autoCommit, forceSpecial, failOnDataLoss, locationStrategy, optionParams)
+    new JavaInputDStream(inputDStream)
+  }
+
+  def createJavaMQPullStream(
+    ssc: JavaStreamingContext,
+    groupId: String,
+    topics: ju.Collection[jl.String],
+    consumerStrategy: ConsumerStrategy,
+    autoCommit: Boolean,
+    forceSpecial: Boolean,
+    failOnDataLoss: Boolean): JavaInputDStream[MessageExt] = {
+    val inputDStream = createMQPullStream(ssc.ssc, groupId, topics, consumerStrategy,
+      autoCommit, forceSpecial, failOnDataLoss)
+    new JavaInputDStream(inputDStream)
+  }
+
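+  /**
+    * Creates and starts a [[DefaultMQPullConsumer]] for the given consumer group,
+    * applying the pull timeout, instance name and name-server address from
+    * optionParams when they are present.
+    */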
+  def mkPullConsumerInstance(groupId: String, optionParams: ju.Map[String, String], instance: String): DefaultMQPullConsumer = {
+    val consumer = new DefaultMQPullConsumer(groupId)
+    if (optionParams.containsKey(RocketMQConfig.PULL_TIMEOUT_MS))
+      consumer.setConsumerTimeoutMillisWhenSuspend(optionParams.get(RocketMQConfig.PULL_TIMEOUT_MS).toLong)
+    if (!StringUtils.isBlank(instance))
+      consumer.setInstanceName(instance)
+    if (optionParams.containsKey(RocketMQConfig.NAME_SERVER_ADDR))
+      consumer.setNamesrvAddr(optionParams.get(RocketMQConfig.NAME_SERVER_ADDR))
+
+    consumer.start()
+    consumer.setOffsetStore(consumer.getDefaultMQPullConsumerImpl.getOffsetStore)
+    consumer
+  }
+
+  /**
+    * Creates a Java push-mode unreliable DStream
+    * @param jssc JavaStreamingContext
+    * @param properties the RocketMq consumer configs, see [[RocketMQConfig]] for more details
+    * @param level the storage level for the received messages
+    * @return JavaInputDStream[Message]
+    */
+  def createJavaMQPushStream(jssc: JavaStreamingContext, properties: Properties, level: StorageLevel): JavaInputDStream[Message] =
+    createJavaMQPushStream(jssc, properties, level, false)
+
+  /**
+    * Creates a Java push-mode reliable DStream
+    * @param jssc JavaStreamingContext
+    * @param properties the RocketMq consumer configs, see [[RocketMQConfig]] for more details
+    * @param level the storage level for the received messages
+    * @return JavaInputDStream[Message]
+    */
+  def createJavaReliableMQPushStream(jssc: JavaStreamingContext, properties: Properties, level: StorageLevel): JavaInputDStream[Message] =
+    createJavaMQPushStream(jssc, properties, level, true)
+
+  /**
+    * Creates a Java push-mode DStream
+    * @param jssc JavaStreamingContext
+    * @param properties the RocketMq consumer configs, see [[RocketMQConfig]] for more details
+    * @param level the storage level for the received messages
+    * @param reliable whether to create a reliable (ack-based) receiver or an unreliable one
+    * @return JavaInputDStream[Message]
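+    *
+    * A minimal configuration sketch, mirroring the settings used by the tests in this
+    * patch (the address, group and topic are placeholder values; jssc is an existing
+    * JavaStreamingContext):
+    * {{{
+    * val properties = new Properties()
+    * properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "localhost:9876")
+    * properties.setProperty(RocketMQConfig.CONSUMER_GROUP, "myGroup")
+    * properties.setProperty(RocketMQConfig.CONSUMER_TOPIC, "myTopic")
+    * val ds = RocketMqUtils.createJavaMQPushStream(jssc, properties, StorageLevel.MEMORY_ONLY, reliable = true)
+    * }}}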
+    */
+  def createJavaMQPushStream(jssc: JavaStreamingContext, properties: Properties, level: StorageLevel, reliable: Boolean): JavaInputDStream[Message] = {
+    if (jssc == null || properties == null || level == null) return null
+    val receiver = if (reliable) new ReliableRocketMQReceiver(properties, level) else new RocketMQReceiver(properties, level)
+    jssc.receiverStream(receiver)
+  }
+
+  def getInteger(props: Properties, key: String, defaultValue: Int): Int =
+    props.getProperty(key, String.valueOf(defaultValue)).toInt
+
+  def getBoolean(props: Properties, key: String, defaultValue: Boolean): Boolean =
+    props.getProperty(key, String.valueOf(defaultValue)).toBoolean
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/streaming/MQPullInputDStream.scala
----------------------------------------------------------------------
diff --git a/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/streaming/MQPullInputDStream.scala b/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/streaming/MQPullInputDStream.scala
new file mode 100644
index 0000000..10b0408
--- /dev/null
+++ b/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/streaming/MQPullInputDStream.scala
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
+import java.{lang => jl, util => ju}
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType
+import org.apache.rocketmq.common.MixAll
+import org.apache.rocketmq.common.message.{MessageExt, MessageQueue}
+import org.apache.rocketmq.spark.{ConsumerStrategy, _}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.{DStream, DStreamCheckpointData, InputDStream}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.util.ThreadUtils
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+  * A DStream where each given RocketMq topic/queueId corresponds to an RDD partition.
+  * The configuration pull.max.speed.per.partition gives the maximum number
+  * of messages per second that each '''partition''' will accept.
+  * @param groupId the consumer group id that identifies this consumer to RocketMq
+  * @param topics the RocketMq topics to consume from
+  * @param locationStrategy the location strategy. In most cases, pass in [[LocationStrategy.PreferConsistent]];
+  *   see [[LocationStrategy]] for more details.
+  * @param consumerStrategy the strategy for the starting offset. In most cases, pass in [[ConsumerStrategy.lastest]];
+  *   see [[ConsumerStrategy]] for more details
+  * @param autoCommit whether to commit the offset to the RocketMq server automatically or not
+  * @param forceSpecial Generally, if the RocketMq server has a checkpoint for the [[MessageQueue]], the consumer
+  *  will consume from the checkpoint regardless of whether we specify an offset. But if forceSpecial is true,
+  *  RocketMq will start consuming from the specified available offset in any case.
+  * @param failOnDataLoss Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical,
+  * the user must make sure all messages in a topic have been processed before deleting the topic.
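+  *
+  * A configuration sketch (illustrative value) for capping each partition at 1000
+  * messages per second via the option map passed to this stream:
+  * {{{
+  * val optionParams = new java.util.HashMap[String, String]()
+  * optionParams.put("pull.max.speed.per.partition", "1000")
+  * }}}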
+  */
+class MQPullInputDStream(
+    _ssc: StreamingContext,
+    groupId: String,
+    topics: ju.Collection[jl.String],
+    optionParams: ju.Map[String, String],
+    locationStrategy: LocationStrategy,
+    consumerStrategy: ConsumerStrategy,
+    autoCommit: Boolean,
+    forceSpecial: Boolean,
+    failOnDataLoss: Boolean
+  ) extends InputDStream[MessageExt](_ssc) with CanCommitOffsets {
+
+  private var currentOffsets = mutable.Map[TopicQueueId, Map[String, Long]]()
+
+  private val commitQueue = new ConcurrentLinkedQueue[OffsetRange]
+
+  private val commitCallback = new AtomicReference[OffsetCommitCallback]
+
+  private val maxRateLimitPerPartition =
+    optionParams.getOrDefault(RocketMQConfig.MAX_PULL_SPEED_PER_PARTITION, "-1").toInt
+  
+  @transient private var kc: DefaultMQPullConsumer = null
+
+  /**
+    * starts a timer thread to periodically persist the OffsetStore
+    */
+  @transient private val scheduledExecutorService = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+    "Driver-Commit-Thread")
+
+  private def consumer(): DefaultMQPullConsumer = this.synchronized {
+    if (null == kc) {
+      kc = RocketMqUtils.mkPullConsumerInstance(groupId, optionParams, "driver")
+      val messageQueues = fetchSubscribeMessageQueues(topics)
+      val iter = messageQueues.iterator
+      while (iter.hasNext){
+        val messageQueue = iter.next
+        val offset = computePullFromWhere(messageQueue)
+        val topicQueueId = new TopicQueueId(messageQueue.getTopic, messageQueue.getQueueId)
+        if (!currentOffsets.contains(topicQueueId)) {
+          currentOffsets += topicQueueId -> Map(messageQueue.getBrokerName -> offset)
+        } else {
+          if (!currentOffsets(topicQueueId).contains(messageQueue.getBrokerName)) {
+            currentOffsets(topicQueueId) += messageQueue.getBrokerName -> offset
+          }
+        }
+      }
+
+      // timer persist
+      this.scheduledExecutorService.scheduleAtFixedRate(
+        new Runnable() {
+          def run() {
+            try {
+              val messageQueueSet = new ju.HashSet[MessageQueue]
+              val iter = topics.iterator
+              while (iter.hasNext) {
+                messageQueueSet.addAll(kc.fetchSubscribeMessageQueues(iter.next))
+              }
+              // persist the freshly fetched queues rather than the set captured at startup
+              kc.getOffsetStore.persistAll(messageQueueSet)
+            } catch {
+              case e: Exception =>
+                log.error("ScheduledTask persistAllConsumerOffset exception", e)
+            }
+          }
+        }, 1000 * 10, 1000 * 5, TimeUnit.MILLISECONDS)
+    }
+    kc
+  }
+
+  private def fetchSubscribeMessageQueues(topics: ju.Collection[jl.String]): ju.HashSet[MessageQueue] = {
+    val messageQueueSet = new ju.HashSet[MessageQueue]
+
+    val iter = topics.iterator
+    while (iter.hasNext){
+      messageQueueSet.addAll(kc.fetchSubscribeMessageQueues(iter.next))
+    }
+    messageQueueSet
+  }
+
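+  /**
+    * Resolves the offset at which to start pulling for one [[MessageQueue]], based on
+    * the configured [[ConsumerStrategy]] and any checkpoint already in the offset store.
+    */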
+  private def computePullFromWhere(mq: MessageQueue): Long = {
+    var result = -1L
+    val offsetStore = kc.getOffsetStore
+    val minOffset = kc.minOffset(mq)
+    val checkpointOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE)
+
+    consumerStrategy match {
+      case LatestStrategy => {
+        if (checkpointOffset >= 0) {
+          //consider the checkpoint offset first
+          if (checkpointOffset < minOffset) {
+            reportDataLoss(s"MessageQueue $mq's checkpointOffset 
$checkpointOffset is smaller than minOffset $minOffset")
+            result = kc.maxOffset(mq)
+          } else {
+            result = checkpointOffset
+          }
+        } else {
+          // First start,no offset
+          if (mq.getTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            result = 0
+          } else {
+            result = kc.maxOffset(mq)
+          }
+        }
+      }
+      case EarliestStrategy => {
+        if (checkpointOffset >= 0) {
+          //consider the checkpoint offset first
+          if (checkpointOffset < minOffset) {
+            reportDataLoss(s"MessageQueue $mq's checkpointOffset 
$checkpointOffset is smaller than minOffset $minOffset")
+            result = minOffset
+          } else {
+            result = checkpointOffset
+          }
+        } else {
+          // First start,no offset
+          result = minOffset
+        }
+      }
+      case SpecificOffsetStrategy(queueToOffset) => {
+
+        val specificOffset = queueToOffset.get(mq)
+
+        if (checkpointOffset >= 0 && !forceSpecial) {
+          if (checkpointOffset < minOffset) {
+            reportDataLoss(s"MessageQueue $mq's checkpointOffset 
$checkpointOffset is smaller than minOffset $minOffset")
+            result = minOffset
+          } else {
+            result = checkpointOffset
+          }
+        } else {
+          specificOffset match {
+            case Some(ConsumerStrategy.LATEST) => {
+              result = kc.maxOffset(mq)
+            }
+            case Some(ConsumerStrategy.EARLIEST) => {
+              result = kc.minOffset(mq)
+            }
+            case Some(offset) => {
+              if (offset < minOffset) {
+                reportDataLoss(s"MessageQueue $mq's specific offset $offset is 
smaller than minOffset $minOffset")
+                result = minOffset
+              } else {
+                result = offset
+              }
+            }
+            case None => {
+              if (checkpointOffset >= 0) {
+                //consider the checkpoint offset first
+                if (checkpointOffset < minOffset) {
+                  reportDataLoss(s"MessageQueue $mq's checkpointOffset 
$checkpointOffset is smaller than minOffset $minOffset")
+                  result = minOffset
+                } else {
+                  result = checkpointOffset
+                }
+              } else {
+                logWarning(s"MessageQueue $mq's specific offset and 
checkpointOffset are none, then use the minOffset")
+                result = kc.minOffset(mq)
+              }
+            }
+          }
+        }
+      }
+    }
+    result
+  }
+
+  private def firstConsumerOffset(mq: MessageQueue): Long = {
+    val offsetStore = kc.getOffsetStore
+    val lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE)
+    val minOffset = kc.minOffset(mq)
+    if (lastOffset < minOffset) {
+      reportDataLoss(s"MessageQueue $mq's checkpoint offset $lastOffset is 
smaller than minOffset $minOffset")
+      minOffset
+    } else {
+      lastOffset
+    }
+  }
+
+
+  override def persist(newLevel: StorageLevel): DStream[MessageExt] = {
+    logError("rocketmq MessageExt is not serializable. " +
+      "Use .map to extract fields before calling .persist or .window")
+    super.persist(newLevel)
+  }
+
+  protected def getPreferredHosts: ju.Map[TopicQueueId, String] = {
+    locationStrategy match {
+      case PreferConsistent => ju.Collections.emptyMap[TopicQueueId, String]()
+      case PreferFixed(hostMap) => hostMap
+    }
+  }
+
+  // Keep this consistent with how other streams are named (e.g. "Flume 
polling stream [2]")
+  private[streaming] override def name: String = s"RocketMq polling stream 
[$id]"
+
+  protected[streaming] override val checkpointData =
+    new MQInputDStreamCheckpointData
+
+
+  /**
+    * Asynchronously maintains & sends new rate limits to the receiver through 
the receiver tracker.
+    */
+  override protected[streaming] val rateController: Option[RateController] = {
+    if (RateController.isBackPressureEnabled(_ssc.conf)) {
+      Some(new DirectMQRateController(id,
+        RateEstimator.create(_ssc.conf, context.graph.batchDuration)))
+    } else {
+      None
+    }
+  }
+
+  /**
+    * Calculates the theoretical per-partition until-offset limits for the next batch.
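+    *
+    * A worked example (illustrative numbers): with an estimated rate of 100 msg/s,
+    * a 2-second batch, and two partitions lagging 300 and 100 messages, the rate is
+    * split by lag share into 75 and 25 msg/s, so the per-batch caps are 150 and 50
+    * messages respectively.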
+    */
+  private def maxMessagesPerPartition(
+     offsets: Map[TopicQueueId, Map[String, Long]]): Option[Map[TopicQueueId, Map[String, Long]]] = {
+    val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
+
+    var lagPerPartition = Map[TopicQueueId, Long]()
+    var totalLag = 0L
+    val lagPerPartitionPerQueue = offsets.map{ case (tp, value) =>
+      val partitionTotal = value.map{ case (name, maxOffset) =>
+        val count = Math.max(maxOffset - currentOffsets(tp)(name), 0)
+        totalLag += count
+        (name, count)
+      }
+      lagPerPartition += tp -> partitionTotal.values.sum
+      tp -> partitionTotal
+    }
+
+    val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) 
match {
+      case Some(rate) =>
+        lagPerPartitionPerQueue.map { case (tp, queues) =>
+          val backPressRate = Math.round(lagPerPartition(tp) / totalLag.toFloat * rate)
+          val partitionMessages = if (maxRateLimitPerPartition > 0) {
+            Math.min(backPressRate, maxRateLimitPerPartition)
+          } else backPressRate
+          tp -> queues.map{ case (name, count) =>
+            (name, Math.ceil(count / lagPerPartition(tp).toFloat * partitionMessages))
+          }
+        }
+      case None =>
+
+        lagPerPartitionPerQueue.map { case (tp, queues) =>
+          val partitionMessages = maxRateLimitPerPartition
+          tp -> queues.map{ case (name, count) =>
+            (name, Math.ceil(count / lagPerPartition(tp).toFloat * partitionMessages))
+          }
+        }
+    }
+
+    if (effectiveRateLimitPerPartition.flatMap(_._2).map(_._2).sum > 0) {
+      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
+      Some(effectiveRateLimitPerPartition.map {
+        case (tp, limit) => tp -> limit.map{ case (name, count) =>
+          name -> (count * secsPerBatch).toLong
+        }
+      })
+    } else {
+      None
+    }
+  }
+
+
+  /**
+    * Returns the latest (highest) available offsets, taking new partitions into account.
+    */
+  protected def latestOffsets(): Map[TopicQueueId, Map[String, Long]] = {
+    val c = consumer
+
+    val messageQueues = fetchSubscribeMessageQueues(topics)
+
+    var maxOffsets = Map[TopicQueueId, Map[String, Long]]()
+
+    val lastTopicQueues = currentOffsets.keySet
+    val fetchTopicQueues = mutable.Set[TopicQueueId]()
+    val iter = messageQueues.iterator
+    while (iter.hasNext) {
+      val messageQueue = iter.next
+      logDebug(s"${messageQueue.toString} min: ${c.minOffset(messageQueue)}  
max: ${c.maxOffset(messageQueue)}")
+      val topicQueueId = new TopicQueueId(messageQueue.getTopic, 
messageQueue.getQueueId)
+      fetchTopicQueues.add(topicQueueId)
+      if (!currentOffsets.contains(topicQueueId)) {
+        currentOffsets += topicQueueId -> Map(messageQueue.getBrokerName -> firstConsumerOffset(messageQueue))
+      } else {
+        if (!currentOffsets(topicQueueId).contains(messageQueue.getBrokerName))
+          currentOffsets(topicQueueId) += messageQueue.getBrokerName -> firstConsumerOffset(messageQueue)
+      }
+      if (!maxOffsets.contains(topicQueueId)) {
+        maxOffsets += topicQueueId -> Map(messageQueue.getBrokerName -> c.maxOffset(messageQueue))
+      } else {
+        if (!maxOffsets(topicQueueId).contains(messageQueue.getBrokerName)) {
+          val tempMap = maxOffsets(topicQueueId) + (messageQueue.getBrokerName -> c.maxOffset(messageQueue))
+          maxOffsets += topicQueueId -> tempMap
+        }
+      }
+    }
+
+    val deletedPartitions = lastTopicQueues.diff(fetchTopicQueues)
+    if (deletedPartitions.size > 0){
+      reportDataLoss(
+        s"Cannot find offsets of ${deletedPartitions}. Some data may have been 
missed")
+    }
+    maxOffsets
+  }
+
+  /**
+    * limits the maximum number of messages per partition
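+    * For example (illustrative numbers): if a queue's current offset is 100, the rate
+    * limit allows 50 more messages, and the latest available offset is 200, the
+    * clamped until-offset is min(100 + 50, 200) = 150.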
+    */
+  protected def clamp(offsets: Map[TopicQueueId, Map[String, Long]]): Map[TopicQueueId, Map[String, Long]] = {
+    maxMessagesPerPartition(offsets).map { mmp =>
+      mmp.map { case (tp, partitionsOffsets) =>
+        tp -> partitionsOffsets.map{case (name, messages) =>
+          name -> Math.min(currentOffsets(tp)(name) + messages, 
offsets(tp)(name))}
+      }
+    }.getOrElse(offsets)
+  }
+
+
+  override def compute(validTime: Time): Option[RocketMqRDD] = {
+
+    val untilOffsets = clamp(latestOffsets())
+
+    val offsetRangeRdd: ju.Map[TopicQueueId, Array[OffsetRange]] = new ju.HashMap()
+    untilOffsets.foreach { case (tp, uo) =>
+      val values = uo.map { case (name, until) =>
+        val fo = currentOffsets(tp)(name)
+        OffsetRange(tp.topic, tp.queueId, name, fo, until)
+      }.toArray
+      offsetRangeRdd.put(tp, values)
+    }
+
+    val rdd = new RocketMqRDD(
+      context.sparkContext, groupId, optionParams, offsetRangeRdd, getPreferredHosts, true)
+
+    // Report the record number and metadata of this batch interval to InputInfoTracker.
+    val description = offsetRangeRdd.asScala.flatMap{ case (tp, arrayRange) =>
+      // Don't display empty ranges.
+      arrayRange
+    }.filter { offsetRange =>
+      offsetRange.fromOffset != offsetRange.untilOffset
+    }.map { offsetRange =>
+      s"topic: ${offsetRange.topic}\tqueueId: ${offsetRange.queueId}\t" +
+        s"brokerName: ${offsetRange.brokerName}\t" +
+        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
+    }.mkString("\n")
+    // Attach this batch's offset ranges and a human-readable description as metadata
+    val metadata = Map(
+      "offsets" -> offsetRangeRdd,
+      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
+    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
+    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
+
+    currentOffsets = collection.mutable.Map() ++ untilOffsets
+
+    if (autoCommit) {
+      currentOffsets.foreach { case (tp, uo) =>
+        uo.map { case (name, until) =>
+          val offset = currentOffsets(tp)(name) - 1
+          val mq = new MessageQueue(tp.topic, name, tp.queueId)
+          kc.updateConsumeOffset(mq, offset)
+        }
+      }
+    } else {
+      commitAll()
+    }
+    Some(rdd)
+  }
+
+  private def reportDataLoss(message: String): Unit = {
+    if (failOnDataLoss) {
+      throw new IllegalStateException(message)
+    } else {
+      logWarning(message)
+    }
+  }
+
+  /**
+    * Queue up offset ranges for commit to RocketMq at a future time. Threadsafe.
+    * @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
+    */
+  def commitAsync(offsetRanges: ju.Map[TopicQueueId, Array[OffsetRange]]): Unit = {
+    commitAsync(offsetRanges, null)
+  }
+
+  /**
+    * Queue up offset ranges for commit to RocketMq at a future time. Threadsafe.
+    * @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
+    * @param callback Only the most recently provided callback will be used at commit.
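+    *
+    * A usage sketch (illustrative; assumes the stream was created with autoCommit = false):
+    * {{{
+    * stream.foreachRDD { rdd =>
+    *   val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+    *   stream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
+    * }
+    * }}}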
+    */
+  def commitAsync(offsetRanges: ju.Map[TopicQueueId, Array[OffsetRange]], callback: OffsetCommitCallback): Unit = {
+    commitCallback.set(callback)
+    offsetRanges.values.asScala.foreach{ value =>
+      commitQueue.addAll(ju.Arrays.asList(value: _*))
+    }
+  }
+
+  protected def commitAll(): Unit = {
+    val m = new ju.HashMap[MessageQueue, jl.Long]
+    var osr = commitQueue.poll()
+    try {
+      while (null != osr) {
+        // the ending offset is exclusive, so commit untilOffset - 1
+        val mq = new MessageQueue(osr.topic, osr.brokerName, osr.queueId)
+        kc.updateConsumeOffset(mq, osr.untilOffset - 1)
+        m.put(mq, osr.untilOffset - 1)
+        osr = commitQueue.poll()
+      }
+      if (commitCallback.get != null) {
+        commitCallback.get.onComplete(m, null)
+      }
+    } catch {
+      case e: Exception => {
+        if (commitCallback.get != null)
+          commitCallback.get.onComplete(m, e)
+      }
+    }
+  }
+
+
+  override def start(): Unit = {
+    consumer
+  }
+
+  override def stop(): Unit = this.synchronized {
+    if (kc != null) {
+      kc.shutdown()
+    }
+  }
+
+  private[streaming]
+  class MQInputDStreamCheckpointData extends DStreamCheckpointData(this) {
+    def batchForTime: mutable.HashMap[Time, mutable.HashMap[TopicQueueId, Array[(String, Int, String, Long, Long)]]] = {
+      data.asInstanceOf[mutable.HashMap[Time, mutable.HashMap[TopicQueueId, Array[OffsetRange.OffsetRangeTuple]]]]
+    }
+
+    override def update(time: Time): Unit = {
+      batchForTime.clear()
+      generatedRDDs.foreach { kv =>
+        val values = new mutable.HashMap[TopicQueueId, Array[OffsetRange.OffsetRangeTuple]]
+        kv._2.asInstanceOf[RocketMqRDD].offsetRanges.asScala.foreach{ case (k, v) =>
+          values.put(k, v.map(_.toTuple))
+        }
+        batchForTime += kv._1 ->values
+      }
+    }
+
+    override def cleanup(time: Time): Unit = { }
+
+    override def restore(): Unit = {
+      batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
+        logInfo(s"Restoring RocketMqRDD for time $t $b")
+
+        val offsetRanges = new ju.HashMap[TopicQueueId, Array[OffsetRange]]()
+        b.foreach{ case (i, j) =>
+          offsetRanges.put(i, j.map(OffsetRange(_)))
+        }
+
+        generatedRDDs += t -> new RocketMqRDD(
+          context.sparkContext,
+          groupId,
+          optionParams,
+          offsetRanges,
+          getPreferredHosts,
+          // during restore, it's possible the same partition will be consumed from multiple
+          // threads, so don't use the cache
+          false
+        )
+      }
+    }
+  }
+
+  /**
+    * A RateController to retrieve the rate from RateEstimator.
+    */
+  private class DirectMQRateController(id: Int, estimator: RateEstimator)
+    extends RateController(id, estimator) {
+    override def publish(rate: Long): Unit = ()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/streaming/RocketMqRDD.scala
----------------------------------------------------------------------
diff --git a/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/streaming/RocketMqRDD.scala b/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/streaming/RocketMqRDD.scala
new file mode 100644
index 0000000..078a3c1
--- /dev/null
+++ b/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/streaming/RocketMqRDD.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import org.apache.rocketmq.common.message.MessageExt
+import org.apache.rocketmq.spark._
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import java.{util => ju}
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+
+/**
+  * A batch-oriented interface for consuming from RocketMq.
+  * Starting and ending offsets are specified in advance,
+  * so that you can control exactly-once semantics.
+  * @param groupId the consumer group id that identifies this consumer to RocketMq
+  * @param optionParams the configs
+  * @param offsetRanges offset ranges that define the RocketMq data belonging to this RDD
+  * @param preferredHosts map from TopicQueueId to preferred host for processing that partition.
+  * In most cases, use [[LocationStrategy.PreferConsistent]]
+  * @param useConsumerCache whether to use a consumer from a per-jvm cache
+  */
+class RocketMqRDD (
+      sc: SparkContext,
+      val groupId: String,
+      val optionParams: ju.Map[String, String],
+      val offsetRanges: ju.Map[TopicQueueId, Array[OffsetRange]],
+      val preferredHosts: ju.Map[TopicQueueId, String],
+      val useConsumerCache: Boolean
+    ) extends RDD[MessageExt](sc, Nil) with HasOffsetRanges {
+
+  private val cacheInitialCapacity =
+    optionParams.getOrDefault(RocketMQConfig.PULL_CONSUMER_CACHE_INIT_CAPACITY, "16").toInt
+  private val cacheMaxCapacity =
+    optionParams.getOrDefault(RocketMQConfig.PULL_CONSUMER_CACHE_MAX_CAPACITY, "64").toInt
+  private val cacheLoadFactor =
+    optionParams.getOrDefault(RocketMQConfig.PULL_CONSUMER_CACHE_LOAD_FACTOR, "0.75").toFloat
+
+  override def persist(newLevel: StorageLevel): this.type = {
+    super.persist(newLevel)
+  }
+
+  override def getPartitions: Array[Partition] = {
+    offsetRanges.asScala.toArray.zipWithIndex.map{ case ((first, second), i) =>
+      new RocketMqRDDPartition(i, first.topic, first.queueId, second)
+    }.toArray
+  }
+
+  override def count(): Long = offsetRanges.asScala.map(_._2.map(_.count).sum).sum
+
+  override def countApprox(
+      timeout: Long,
+      confidence: Double = 0.95
+    ): PartialResult[BoundedDouble] = {
+    val c = count
+    new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
+  }
+
+  override def isEmpty(): Boolean = count == 0L
+
+  override def take(num: Int): Array[MessageExt] = {
+    val nonEmptyPartitions = this.partitions
+      .map(_.asInstanceOf[RocketMqRDDPartition])
+      .filter(_.count > 0)
+
+    if (num < 1 || nonEmptyPartitions.isEmpty) {
+      return new Array[MessageExt](0)
+    }
+
+    // Determine in advance how many messages need to be taken from each partition
+    val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
+      val remain = num - result.values.sum
+      if (remain > 0) {
+        val taken = Math.min(remain, part.count)
+        result + (part.index -> taken.toInt)
+      } else {
+        result
+      }
+    }
+
+    val buf = new ArrayBuffer[MessageExt]
+    val res = context.runJob(
+      this,
+      (tc: TaskContext, it: Iterator[MessageExt]) =>
+        it.take(parts(tc.partitionId)).toArray, parts.keys.toArray
+    )
+    res.foreach(buf ++= _)
+    buf.toArray
+  }
+
+  private def executors(): Array[ExecutorCacheTaskLocation] = {
+    val bm = sparkContext.env.blockManager
+    bm.master.getPeers(bm.blockManagerId).toArray
+      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
+      .sortWith(compareExecutors)
+  }
+
+  private def compareExecutors(
+     a: ExecutorCacheTaskLocation,
+     b: ExecutorCacheTaskLocation): Boolean =
+    if (a.host == b.host) {
+      a.executorId > b.executorId
+    } else {
+      a.host > b.host
+    }
+
+  /**
+    * Non-negative modulus, from java 8 math
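+    * For example, floorMod(-3, 5) == 2, whereas -3 % 5 == -3.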
+    */
+  private def floorMod(a: Int, b: Int): Int = ((a % b) + b) % b
+
+  protected override def getPreferredLocations(thePart: Partition): Seq[String] = {
+    // The intention is best-effort consistent executor for a given topic partition,
+    // so that caching consumers can be effective.
+    val part = thePart.asInstanceOf[RocketMqRDDPartition]
+    val allExecs = executors()
+    val tp = part.topicQueueId()
+    val prefHost = preferredHosts.get(tp)
+    val prefExecs = if (null == prefHost) allExecs else allExecs.filter(_.host == prefHost)
+    val execs = if (prefExecs.isEmpty) allExecs else prefExecs
+    if (execs.isEmpty) {
+      Seq()
+    } else {
+      // execs is sorted, tp.hashCode depends only on topic and partition, so consistent index
+      val index = this.floorMod(tp.hashCode, execs.length)
+      val chosen = execs(index)
+      Seq(chosen.toString)
+    }
+  }
+
+  private def errBeginAfterEnd(part: RocketMqRDDPartition): String =
+    s"Beginning offset is after the ending offset 
${part.partitionOffsetRanges.mkString(",")} " +
+      s"for topic ${part.topic} partition ${part.index}. " +
+      "You either provided an invalid fromOffset, or the Kafka topic has been 
damaged"
+
+  override def compute(thePart: Partition, context: TaskContext): Iterator[MessageExt] = {
+    val part = thePart.asInstanceOf[RocketMqRDDPartition]
+    val count = part.count()
+    assert(count >= 0, errBeginAfterEnd(part))
+    if (count == 0) {
+      logInfo(s"Beginning offset is the same as ending offset " +
+        s"skipping ${part.topic} ${part.queueId}")
+      Iterator.empty
+    } else {
+      new RocketMqRDDIterator(part, context)
+    }
+  }
+
+
+  /**
+    * An iterator that fetches messages directly from RocketMq for the offsets in a partition.
+    * Uses a cached consumer where possible to take advantage of prefetching
+    */
+  private class RocketMqRDDIterator(
+    part: RocketMqRDDPartition,
+    context: TaskContext) extends Iterator[MessageExt] {
+
+    logDebug(s"Computing topic ${part.topic}, queueId ${part.queueId} " +
+      s"offsets ${part.partitionOffsetRanges.mkString(",")}")
+
+    context.addTaskCompletionListener{ context => closeIfNeeded() }
+
+
+    val consumer = if (useConsumerCache) {
+      CachedMQConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
+      if (context.attemptNumber > 5) {
+        // just in case the prior attempt failures were cache related
+        CachedMQConsumer.remove(groupId, part.topic, part.queueId, part.brokerNames)
+      }
+      CachedMQConsumer.getOrCreate(groupId, part.topic, part.queueId, part.brokerNames, optionParams)
+    } else {
+      CachedMQConsumer.getUncached(groupId, part.topic, part.queueId, part.brokerNames, optionParams)
+    }
+
+    var logicTotalOffset = 0
+    val totalSum = part.partitionOffsetRanges.map(_.count).sum
+    var index = 0
+    var requestOffset = part.partitionOffsetRanges.apply(index).fromOffset
+
+    def closeIfNeeded(): Unit = {
+      if (!useConsumerCache && consumer != null) {
+        consumer.client.shutdown
+      }
+    }
+
+    override def hasNext(): Boolean = {
+      totalSum > logicTotalOffset
+    }
+
+    override def next(): MessageExt = {
+      assert(hasNext(), "Can't call getNext() once untilOffset has been 
reached")
+      val queueRange = part.partitionOffsetRanges.apply(index)
+      val r = consumer.get(queueRange.brokerName, requestOffset)
+      if (queueRange.untilOffset > (requestOffset + 1))
+        requestOffset +=1
+      else {
+        index +=1
+        if (part.partitionOffsetRanges.length > index)
+          requestOffset = part.partitionOffsetRanges.apply(index).fromOffset
+      }
+      logicTotalOffset += 1
+      r
+    }
+  }
+
+  private[RocketMqRDD]
+  type OffsetRangeTuple = (String, Int)
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/RocketMQServerMock.java
----------------------------------------------------------------------
diff --git a/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/RocketMQServerMock.java b/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/RocketMQServerMock.java
new file mode 100644
index 0000000..50d46ad
--- /dev/null
+++ b/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/RocketMQServerMock.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.spark;
+
+import org.apache.commons.lang.time.DateFormatUtils;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+import java.util.Date;
+import java.util.UUID;
+
+public class RocketMQServerMock {
+
+    private NamesrvController namesrvController;
+    private BrokerController brokerController;
+    private String nameserverAddr = "localhost:9876";
+    private String producerGroup = UUID.randomUUID().toString();
+
+
+    public void startupServer() throws Exception{
+        //start nameserver
+        startNamesrv();
+        //start broker
+        startBroker();
+    }
+
+    public void shutdownServer() {
+        if (brokerController != null) {
+            brokerController.shutdown();
+        }
+
+        if (namesrvController != null) {
+            namesrvController.shutdown();
+        }
+    }
+
+    public String getNameServerAddr() {
+        return nameserverAddr;
+    }
+
+    private void startNamesrv() throws Exception {
+        NamesrvConfig namesrvConfig = new NamesrvConfig();
+        NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        nettyServerConfig.setListenPort(9876);
+
+        namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
+        boolean initResult = namesrvController.initialize();
+        if (!initResult) {
+            namesrvController.shutdown();
+            throw new Exception("Namesvr init failure!");
+        }
+        namesrvController.start();
+    }
+
+    private void startBroker() throws Exception {
+        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
+
+        BrokerConfig brokerConfig = new BrokerConfig();
+        brokerConfig.setNamesrvAddr(nameserverAddr);
+        brokerConfig.setBrokerId(MixAll.MASTER_ID);
+        NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        nettyServerConfig.setListenPort(10911);
+        NettyClientConfig nettyClientConfig = new NettyClientConfig();
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+
+        brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig);
+        boolean initResult = brokerController.initialize();
+        if (!initResult) {
+            brokerController.shutdown();
+            throw new Exception("Broker init failure!");
+        }
+        brokerController.start();
+    }
+
+    public void prepareDataTo(String topic, int times) throws Exception {
+        // publish test message
+        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
+        producer.setNamesrvAddr(nameserverAddr);
+
+        String sendMsg = "\"Hello Rocket\"" + "," + DateFormatUtils.format(new 
Date(), "yyyy-MM-DD hh:mm:ss");
+
+        try {
+            producer.start();
+            for (int i = 0; i < times; i++) {
+                producer.send(new Message(topic, sendMsg.getBytes("UTF-8")));
+            }
+        } catch (Exception e) {
+            throw new MQClientException("Failed to publish messages", e);
+        } finally {
+            producer.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/MessageRetryManagerTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/MessageRetryManagerTest.java b/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/MessageRetryManagerTest.java
new file mode 100644
index 0000000..93029fc
--- /dev/null
+++ b/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/MessageRetryManagerTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.spark.streaming;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class MessageRetryManagerTest {
+    MessageRetryManager messageRetryManager;
+    Map<String,MessageSet> cache;
+    BlockingQueue<MessageSet> queue;
+
+    @Before
+    public void prepare() {
+        cache = new ConcurrentHashMap<>(10);
+        queue = new LinkedBlockingDeque<>(10);
+        int maxRetry = 3;
+        int ttl = 2000;
+        messageRetryManager = new DefaultMessageRetryManager(queue, maxRetry, ttl);
+        ((DefaultMessageRetryManager)messageRetryManager).setCache(cache);
+    }
+
+    @Test
+    public void testRetryLogics() {
+        List<MessageExt> data = new ArrayList<>();
+        //ack
+        MessageSet messageSet = new MessageSet(data);
+        messageRetryManager.mark(messageSet);
+        assertEquals(1, cache.size());
+        assertTrue(cache.containsKey(messageSet.getId()));
+
+        messageRetryManager.ack(messageSet.getId());
+        assertEquals(0, cache.size());
+        assertFalse(cache.containsKey(messageSet.getId()));
+
+
+        //fail need retry: retries < maxRetry
+        messageSet = new MessageSet(data);
+        messageRetryManager.mark(messageSet);
+        assertEquals(1, cache.size());
+        assertTrue(cache.containsKey(messageSet.getId()));
+
+        messageRetryManager.fail(messageSet.getId());
+        assertEquals(0, cache.size());
+        assertFalse(cache.containsKey(messageSet.getId()));
+        assertEquals(1, messageSet.getRetries());
+        assertEquals(1, queue.size());
+        assertEquals(messageSet, queue.poll());
+
+
+        //fail need not retry: retries >= maxRetry
+        messageSet = new MessageSet(data);
+        messageRetryManager.mark(messageSet);
+        messageRetryManager.fail(messageSet.getId());
+        assertEquals(0, cache.size());
+        assertFalse(cache.containsKey(messageSet.getId()));
+
+        messageRetryManager.mark(messageSet);
+        messageRetryManager.fail(messageSet.getId());
+        assertEquals(2, messageSet.getRetries());
+        messageRetryManager.mark(messageSet);
+        messageRetryManager.fail(messageSet.getId());
+        assertEquals(3, messageSet.getRetries());
+
+        assertFalse(messageRetryManager.needRetry(messageSet));
+        messageRetryManager.mark(messageSet);
+        messageRetryManager.fail(messageSet.getId());
+        assertEquals(0, cache.size());
+        assertEquals(3, queue.size());
+        assertEquals(messageSet, queue.poll());
+
+
+        //fail: no ack/fail received in ttl
+        messageSet = new MessageSet(data);
+        messageRetryManager.mark(messageSet);
+        try {
+            Thread.sleep(10000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        assertEquals(0, cache.size());
+        assertFalse(cache.containsKey(messageSet.getId()));
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/ReliableRocketMQReceiverTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/ReliableRocketMQReceiverTest.java b/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/ReliableRocketMQReceiverTest.java
new file mode 100644
index 0000000..69a4c76
--- /dev/null
+++ b/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/ReliableRocketMQReceiverTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.spark.streaming;
+
+import org.apache.rocketmq.spark.RocketMQConfig;
+import org.apache.rocketmq.spark.RocketMQServerMock;
+import org.apache.rocketmq.spark.RocketMqUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Properties;
+
+public class ReliableRocketMQReceiverTest {
+    private static RocketMQServerMock mockServer = new RocketMQServerMock();
+
+    private static final String NAMESERVER_ADDR = mockServer.getNameServerAddr();
+    private static final String CONSUMER_GROUP = "wordcount2";
+    private static final String CONSUMER_TOPIC = "wordcountsource";
+
+    @BeforeClass
+    public static void start() throws Exception {
+        mockServer.startupServer();
+
+        Thread.sleep(2000);
+
+        //prepare data
+        mockServer.prepareDataTo(CONSUMER_TOPIC, 5);
+    }
+
+    @AfterClass
+    public static void stop() {
+        mockServer.shutdownServer();
+    }
+
+    @Test
+    public void testReliableRocketMQReceiver() {
+        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
+        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
+        Properties properties = new Properties();
+        properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, NAMESERVER_ADDR);
+        properties.setProperty(RocketMQConfig.CONSUMER_GROUP, CONSUMER_GROUP);
+        properties.setProperty(RocketMQConfig.CONSUMER_TOPIC, CONSUMER_TOPIC);
+        JavaInputDStream ds = RocketMqUtils.createJavaReliableMQPushStream(jssc, properties, StorageLevel.MEMORY_ONLY());
+        ds.print();
+        jssc.start();
+        try {
+            jssc.awaitTerminationOrTimeout(60000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/RocketMQReceiverTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/RocketMQReceiverTest.java b/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/RocketMQReceiverTest.java
new file mode 100644
index 0000000..89694a4
--- /dev/null
+++ b/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/RocketMQReceiverTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.spark.streaming;
+
+import org.apache.rocketmq.spark.RocketMQConfig;
+import org.apache.rocketmq.spark.RocketMQServerMock;
+import org.apache.rocketmq.spark.RocketMqUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Properties;
+
+public class RocketMQReceiverTest {
+    private static RocketMQServerMock mockServer = new RocketMQServerMock();
+
+    private static final String NAMESERVER_ADDR = mockServer.getNameServerAddr();
+    private static final String CONSUMER_GROUP = "wordcount";
+    private static final String CONSUMER_TOPIC = "wordcountsource";
+
+    @BeforeClass
+    public static void start() throws Exception {
+        mockServer.startupServer();
+
+        Thread.sleep(2000);
+
+        // prepare data
+        mockServer.prepareDataTo(CONSUMER_TOPIC, 5);
+    }
+
+    @AfterClass
+    public static void stop() {
+        mockServer.shutdownServer();
+    }
+
+    @Test
+    public void testRocketMQReceiver() {
+        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
+        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
+        Properties properties = new Properties();
+        properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, NAMESERVER_ADDR);
+        properties.setProperty(RocketMQConfig.CONSUMER_GROUP, CONSUMER_GROUP);
+        properties.setProperty(RocketMQConfig.CONSUMER_TOPIC, CONSUMER_TOPIC);
+        JavaInputDStream ds = RocketMqUtils.createJavaMQPushStream(jssc, properties, StorageLevel.MEMORY_ONLY());
+        ds.print();
+        jssc.start();
+        try {
+            jssc.awaitTerminationOrTimeout(60000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+}
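
Note that this plain receiver, unlike the reliable variant tested above (which, as the name suggests, is expected to acknowledge consumption only after Spark has stored the block), has no such acknowledgement path, so messages buffered in the receiver can be lost if an executor dies. A hedged sketch of the usual mitigation, enabling checkpointing together with Spark's receiver write-ahead log (the checkpoint path below is a placeholder):

    SparkConf conf = new SparkConf()
            .setMaster("local[2]")
            .setAppName("NetworkWordCount")
            // persist received blocks to reliable storage before processing them
            .set("spark.streaming.receiver.writeAheadLog.enable", "true");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
    jssc.checkpoint("/tmp/rocketmq-spark-checkpoint"); // placeholder directory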

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/RocketMqUtilsTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/RocketMqUtilsTest.java b/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/RocketMqUtilsTest.java
new file mode 100644
index 0000000..5a00c73
--- /dev/null
+++ b/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/RocketMqUtilsTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.spark.streaming;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spark.ConsumerStrategy;
+import org.apache.rocketmq.spark.HasOffsetRanges;
+import org.apache.rocketmq.spark.LocationStrategy;
+import org.apache.rocketmq.spark.OffsetRange;
+import org.apache.rocketmq.spark.RocketMQConfig;
+import org.apache.rocketmq.spark.RocketMQServerMock;
+import org.apache.rocketmq.spark.RocketMqUtils;
+import org.apache.rocketmq.spark.TopicQueueId;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class RocketMqUtilsTest implements Serializable {
+
+    private static RocketMQServerMock mockServer = new RocketMQServerMock();
+
+    private static String NAME_SERVER = mockServer.getNameServerAddr();
+
+    private static String TOPIC_DEFAULT = UUID.randomUUID().toString();
+
+    private static int MESSAGE_NUM = 100;
+
+    @BeforeClass
+    public static void start() throws Exception {
+        mockServer.startupServer();
+
+        Thread.sleep(2000);
+
+        // produce messages
+        mockServer.prepareDataTo(TOPIC_DEFAULT, MESSAGE_NUM);
+    }
+
+    @AfterClass
+    public static void stop() {
+        mockServer.shutdownServer();
+    }
+
+    @Test
+    public void testConsumer() throws MQBrokerException, MQClientException, InterruptedException, UnsupportedEncodingException {
+
+        // start up spark
+        Map<String, String> optionParams = new HashMap<>();
+        optionParams.put(RocketMQConfig.NAME_SERVER_ADDR, NAME_SERVER);
+        SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver").setMaster("local[*]");
+        JavaStreamingContext sc = new JavaStreamingContext(sparkConf, new Duration(1000));
+        List<String> topics = new ArrayList<>();
+        topics.add(TOPIC_DEFAULT);
+
+        LocationStrategy locationStrategy = LocationStrategy.PreferConsistent();
+
+        JavaInputDStream<MessageExt> stream = RocketMqUtils.createJavaMQPullStream(sc, UUID.randomUUID().toString(),
+                topics, ConsumerStrategy.earliest(), false, false, false, locationStrategy, optionParams);
+
+
+        final Set<MessageExt> result = Collections.synchronizedSet(new HashSet<MessageExt>());
+
+        stream.foreachRDD(new VoidFunction<JavaRDD<MessageExt>>() {
+            @Override
+            public void call(JavaRDD<MessageExt> messageExtJavaRDD) throws Exception {
+                result.addAll(messageExtJavaRDD.collect());
+            }
+        });
+
+        sc.start();
+
+        long startTime = System.currentTimeMillis();
+        boolean matches = false;
+        while (!matches && System.currentTimeMillis() - startTime < 20000) {
+            matches = MESSAGE_NUM == result.size();
+            Thread.sleep(50);
+        }
+        sc.stop();
+    }
+
+    @Test
+    public void testGetOffsets() throws MQBrokerException, MQClientException, InterruptedException, UnsupportedEncodingException {
+
+        Map<String, String> optionParams = new HashMap<>();
+        optionParams.put(RocketMQConfig.NAME_SERVER_ADDR, NAME_SERVER);
+        SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver").setMaster("local[*]");
+        JavaStreamingContext sc = new JavaStreamingContext(sparkConf, new Duration(1000));
+        List<String> topics = new ArrayList<>();
+        topics.add(TOPIC_DEFAULT);
+
+        LocationStrategy locationStrategy = LocationStrategy.PreferConsistent();
+
+        JavaInputDStream<MessageExt> dStream = RocketMqUtils.createJavaMQPullStream(sc, UUID.randomUUID().toString(),
+                topics, ConsumerStrategy.earliest(), false, false, false, locationStrategy, optionParams);
+
+        // hold a reference to the current offset ranges, so it can be used downstream
+        final AtomicReference<Map<TopicQueueId, OffsetRange[]>> offsetRanges = new AtomicReference<>();
+
+        final Set<MessageExt> result = Collections.synchronizedSet(new HashSet<MessageExt>());
+
+        dStream.transform(new Function<JavaRDD<MessageExt>, JavaRDD<MessageExt>>() {
+            @Override
+            public JavaRDD<MessageExt> call(JavaRDD<MessageExt> v1) throws Exception {
+                Map<TopicQueueId, OffsetRange[]> offsets = ((HasOffsetRanges) v1.rdd()).offsetRanges();
+                offsetRanges.set(offsets);
+                return v1;
+            }
+        }).foreachRDD(new VoidFunction<JavaRDD<MessageExt>>() {
+            @Override
+            public void call(JavaRDD<MessageExt> messageExtJavaRDD) throws Exception {
+                result.addAll(messageExtJavaRDD.collect());
+            }
+        });
+
+        sc.start();
+
+        long startTime = System.currentTimeMillis();
+        boolean matches = false;
+        while (!matches && System.currentTimeMillis() - startTime < 20000) {
+            matches = MESSAGE_NUM == result.size();
+            Thread.sleep(50);
+        }
+        sc.stop();
+
+
+    }
+}
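
Two observations on the tests above. First, the polling loops time out silently: if fewer than MESSAGE_NUM messages arrive within 20 seconds, the tests still pass. A sketch of an explicit check that could follow each loop (plain JUnit, via import org.junit.Assert):

    Assert.assertEquals(MESSAGE_NUM, result.size());

Second, the anonymous Function/VoidFunction classes can be collapsed to Java 8 lambdas, since Spark's Java function interfaces are single-method; an equivalent sketch of the transform/foreachRDD pair:

    dStream.transform(rdd -> {
        // capture the offset ranges computed for this batch
        offsetRanges.set(((HasOffsetRanges) rdd.rdd()).offsetRanges());
        return rdd;
    }).foreachRDD(rdd -> result.addAll(rdd.collect()));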

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/style/copyright/Apache.xml
----------------------------------------------------------------------
diff --git a/rocketmq-spark/style/copyright/Apache.xml b/rocketmq-spark/style/copyright/Apache.xml
new file mode 100644
index 0000000..2db86d0
--- /dev/null
+++ b/rocketmq-spark/style/copyright/Apache.xml
@@ -0,0 +1,24 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<component name="CopyrightManager">
+    <copyright>
+        <option name="myName" value="Apache"/>
+        <option name="notice"
+                value="Licensed to the Apache Software Foundation (ASF) under 
one or more&#10;contributor license agreements.  See the NOTICE file 
distributed with&#10;this work for additional information regarding copyright 
ownership.&#10;The ASF licenses this file to You under the Apache License, 
Version 2.0&#10;(the &quot;License&quot;); you may not use this file except in 
compliance with&#10;the License.  You may obtain a copy of the License 
at&#10;&#10;    http://www.apache.org/licenses/LICENSE-2.0&#10;&#10;Unless 
required by applicable law or agreed to in writing, software&#10;distributed 
under the License is distributed on an &quot;AS IS&quot; BASIS,&#10;WITHOUT 
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.&#10;See the 
License for the specific language governing permissions and&#10;limitations 
under the License."/>
+    </copyright>
+</component>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/style/copyright/profiles_settings.xml
----------------------------------------------------------------------
diff --git a/rocketmq-spark/style/copyright/profiles_settings.xml b/rocketmq-spark/style/copyright/profiles_settings.xml
new file mode 100644
index 0000000..4c0e521
--- /dev/null
+++ b/rocketmq-spark/style/copyright/profiles_settings.xml
@@ -0,0 +1,64 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<component name="CopyrightManager">
+    <settings default="Apache">
+        <module2copyright>
+            <element module="All" copyright="Apache"/>
+        </module2copyright>
+        <LanguageOptions name="GSP">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="HTML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="JAVA">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="addBlankAfter" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="JSP">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="JSPX">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="MXML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="Properties">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="block" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="SPI">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="block" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="XML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="__TEMPLATE__">
+            <option name="separateBefore" value="true"/>
+            <option name="lenBefore" value="1"/>
+        </LanguageOptions>
+    </settings>
+</component>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/style/rmq_checkstyle.xml
----------------------------------------------------------------------
diff --git a/rocketmq-spark/style/rmq_checkstyle.xml b/rocketmq-spark/style/rmq_checkstyle.xml
new file mode 100644
index 0000000..e3155cc
--- /dev/null
+++ b/rocketmq-spark/style/rmq_checkstyle.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<!DOCTYPE module PUBLIC
+    "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+    "http://www.puppycrawl.com/dtds/configuration_1_3.dtd";>
+<!--Refer http://checkstyle.sourceforge.net/reports/google-java-style.html#s2.2-file-encoding -->
+<module name="Checker">
+
+    <property name="localeLanguage" value="en"/>
+
+    <!--To configure the check to report on the first instance in each file-->
+    <module name="FileTabCharacter"/>
+
+    <!-- header -->
+    <module name="RegexpHeader">
+        <property name="header" value="/\*\nLicensed to the Apache Software 
Foundation*"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="System\.out\.println"/>
+        <property name="message" value="Prohibit invoking System.out.println 
in source code !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="//FIXME"/>
+        <property name="message" value="Recommended fix FIXME task !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="//TODO"/>
+        <property name="message" value="Recommended fix TODO task !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="@alibaba"/>
+        <property name="message" value="Recommended remove @alibaba keyword!"/>
+    </module>
+    <module name="RegexpSingleline">
+        <property name="format" value="@taobao"/>
+        <property name="message" value="Recommended remove @taobao keyword!"/>
+    </module>
+    <module name="RegexpSingleline">
+        <property name="format" value="@author"/>
+        <property name="message" value="Recommended remove @author tag in 
javadoc!"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format"
+                  
value=".*[\u3400-\u4DB5\u4E00-\u9FA5\u9FA6-\u9FBB\uF900-\uFA2D\uFA30-\uFA6A\uFA70-\uFAD9\uFF00-\uFFEF\u2E80-\u2EFF\u3000-\u303F\u31C0-\u31EF]+.*"/>
+        <property name="message" value="Not allow chinese character !"/>
+    </module>
+
+    <module name="FileLength">
+        <property name="max" value="3000"/>
+    </module>
+
+    <module name="TreeWalker">
+
+        <module name="UnusedImports">
+            <property name="processJavadoc" value="true"/>
+        </module>
+        <module name="RedundantImport"/>
+
+        <!--<module name="IllegalImport" />-->
+
+        <!--Checks that classes that override equals() also override hashCode()-->
+        <module name="EqualsHashCode"/>
+        <!--Checks for over-complicated boolean expressions. Currently finds code like if (b == true), b || true, !false, etc.-->
+        <module name="SimplifyBooleanExpression"/>
+        <module name="OneStatementPerLine"/>
+        <module name="UnnecessaryParentheses"/>
+        <!--Checks for over-complicated boolean return statements-->
+        <module name="SimplifyBooleanReturn"/>
+
+        <!--Checks that the default is after all the cases in a switch statement-->
+        <module name="DefaultComesLast"/>
+        <!--Detects empty statements (standalone ";" semicolon)-->
+        <module name="EmptyStatement"/>
+        <!--Checks that long constants are defined with an upper ell-->
+        <module name="UpperEll"/>
+        <module name="ConstantName">
+            <property name="format" 
value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/>
+        </module>
+        <!--Checks that local, non-final variable names conform to a format specified by the format property-->
+        <module name="LocalVariableName"/>
+        <!--Validates identifiers for local, final variables, including catch parameters-->
+        <module name="LocalFinalVariableName"/>
+        <!--Validates identifiers for non-static fields-->
+        <module name="MemberName"/>
+        <!--Validates identifiers for class type parameters-->
+        <module name="ClassTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <!--Validates identifiers for method type parameters-->
+        <module name="MethodTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <module name="PackageName"/>
+        <module name="ParameterName"/>
+        <module name="StaticVariableName"/>
+        <module name="TypeName"/>
+        <!--Checks that there are no import statements that use the * notation-->
+        <module name="AvoidStarImport"/>
+
+        <!--whitespace-->
+        <module name="GenericWhitespace"/>
+        <module name="NoWhitespaceBefore"/>
+        <module name="WhitespaceAfter"/>
+        <module name="NoWhitespaceAfter"/>
+        <module name="WhitespaceAround">
+            <property name="allowEmptyConstructors" value="true"/>
+            <property name="allowEmptyMethods" value="true"/>
+        </module>
+        <module name="Indentation"/>
+        <module name="MethodParamPad"/>
+        <module name="ParenPad"/>
+        <module name="TypecastParenPad"/>
+    </module>
+</module>
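
This ruleset only takes effect if the build points Checkstyle at it; presumably the module's pom.xml references style/rmq_checkstyle.xml through the maven-checkstyle-plugin configLocation setting, though that wiring is not part of this commit.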

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/style/rmq_codeStyle.xml
----------------------------------------------------------------------
diff --git a/rocketmq-spark/style/rmq_codeStyle.xml b/rocketmq-spark/style/rmq_codeStyle.xml
new file mode 100644
index 0000000..cd95ee6
--- /dev/null
+++ b/rocketmq-spark/style/rmq_codeStyle.xml
@@ -0,0 +1,157 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<code_scheme name="rocketmq">
+    <option name="USE_SAME_INDENTS" value="true"/>
+    <option name="IGNORE_SAME_INDENTS_FOR_LANGUAGES" value="true"/>
+    <option name="OTHER_INDENT_OPTIONS">
+        <value>
+            <option name="INDENT_SIZE" value="4"/>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+            <option name="TAB_SIZE" value="4"/>
+            <option name="USE_TAB_CHARACTER" value="false"/>
+            <option name="SMART_TABS" value="false"/>
+            <option name="LABEL_INDENT_SIZE" value="0"/>
+            <option name="LABEL_INDENT_ABSOLUTE" value="false"/>
+            <option name="USE_RELATIVE_INDENTS" value="false"/>
+        </value>
+    </option>
+    <option name="PREFER_LONGER_NAMES" value="false"/>
+    <option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/>
+    <option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/>
+    <option name="PACKAGES_TO_USE_IMPORT_ON_DEMAND">
+        <value/>
+    </option>
+    <option name="IMPORT_LAYOUT_TABLE">
+        <value>
+            <package name="" withSubpackages="true" static="false"/>
+            <emptyLine/>
+            <package name="" withSubpackages="true" static="true"/>
+        </value>
+    </option>
+    <option name="JD_ALIGN_PARAM_COMMENTS" value="false"/>
+    <option name="JD_ALIGN_EXCEPTION_COMMENTS" value="false"/>
+    <option name="JD_P_AT_EMPTY_LINES" value="false"/>
+    <option name="JD_KEEP_INVALID_TAGS" value="false"/>
+    <option name="JD_DO_NOT_WRAP_ONE_LINE_COMMENTS" value="true"/>
+    <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+    <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+    <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+    <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+    <option name="ELSE_ON_NEW_LINE" value="true"/>
+    <option name="WHILE_ON_NEW_LINE" value="true"/>
+    <option name="CATCH_ON_NEW_LINE" value="true"/>
+    <option name="FINALLY_ON_NEW_LINE" value="true"/>
+    <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+    <option name="ALIGN_MULTILINE_FOR" value="false"/>
+    <option name="SPACE_AFTER_TYPE_CAST" value="false"/>
+    <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
+    <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+    <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/>
+    <option name="LABELED_STATEMENT_WRAP" value="1"/>
+    <option name="WRAP_COMMENTS" value="true"/>
+    <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+    <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+    <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+    <JavaCodeStyleSettings>
+        <option name="CLASS_NAMES_IN_JAVADOC" value="3"/>
+    </JavaCodeStyleSettings>
+    <XML>
+        <option name="XML_LEGACY_SETTINGS_IMPORTED" value="true"/>
+    </XML>
+    <ADDITIONAL_INDENT_OPTIONS fileType="haml">
+        <option name="INDENT_SIZE" value="2"/>
+    </ADDITIONAL_INDENT_OPTIONS>
+    <codeStyleSettings language="Groovy">
+        <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="ELSE_ON_NEW_LINE" value="true"/>
+        <option name="CATCH_ON_NEW_LINE" value="true"/>
+        <option name="FINALLY_ON_NEW_LINE" value="true"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="SPACE_AFTER_TYPE_CAST" value="false"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="HOCON">
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+    </codeStyleSettings>
+    <codeStyleSettings language="JAVA">
+        <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="ELSE_ON_NEW_LINE" value="true"/>
+        <option name="WHILE_ON_NEW_LINE" value="true"/>
+        <option name="CATCH_ON_NEW_LINE" value="true"/>
+        <option name="FINALLY_ON_NEW_LINE" value="true"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="SPACE_AFTER_TYPE_CAST" value="false"/>
+        <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/>
+        <option name="LABELED_STATEMENT_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="JSON">
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+    </codeStyleSettings>
+    <codeStyleSettings language="Scala">
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="ELSE_ON_NEW_LINE" value="true"/>
+        <option name="WHILE_ON_NEW_LINE" value="true"/>
+        <option name="CATCH_ON_NEW_LINE" value="true"/>
+        <option name="FINALLY_ON_NEW_LINE" value="true"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="INDENT_SIZE" value="4"/>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+            <option name="TAB_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="XML">
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+</code_scheme>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-storm/README.md
----------------------------------------------------------------------
diff --git a/rocketmq-storm/README.md b/rocketmq-storm/README.md
new file mode 100644
index 0000000..cdd23d8
--- /dev/null
+++ b/rocketmq-storm/README.md
@@ -0,0 +1,2 @@
+# RocketMQ Storm Integration
+
