Repository: incubator-rocketmq-externals Updated Branches: refs/heads/master 42fc22353 -> 7ceba1b23
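A minimal Scala sketch of the pull-based consumer API this patch adds (group, topic and name-server values are illustrative; ConsumerStrategy.earliest mirrors its use in the tests below):

    import java.util.{HashMap => JHashMap}
    import org.apache.rocketmq.spark.{ConsumerStrategy, RocketMQConfig, RocketMqUtils}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setMaster("local[2]").setAppName("RocketMqPullExample")
    val ssc = new StreamingContext(conf, Seconds(5))
    val options = new JHashMap[String, String]()
    options.put(RocketMQConfig.NAME_SERVER_ADDR, "localhost:9876")
    // consume from the earliest available offset and commit offsets back automatically
    val stream = RocketMqUtils.createMQPullStream(ssc, "exampleGroup", "exampleTopic",
      ConsumerStrategy.earliest, autoCommit = true, forceSpecial = false,
      failOnDataLoss = false, options)
    stream.map(msg => new String(msg.getBody, "UTF-8")).print()
    ssc.start()
    ssc.awaitTermination()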
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/RocketMqUtils.scala ---------------------------------------------------------------------- diff --git a/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/RocketMqUtils.scala b/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/RocketMqUtils.scala new file mode 100644 index 0000000..a1b3f9d --- /dev/null +++ b/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/RocketMqUtils.scala @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spark + +import java.util.Properties +import java.{lang => jl, util => ju} + +import org.apache.commons.lang.StringUtils +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer +import org.apache.rocketmq.common.message.{Message, MessageExt, MessageQueue} +import org.apache.spark.SparkContext +import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.api.java.{JavaInputDStream, JavaStreamingContext} +import org.apache.spark.streaming.dstream.InputDStream +import org.apache.spark.streaming.{MQPullInputDStream, RocketMqRDD, StreamingContext} +import org.apache.rocketmq.spark.streaming.{ReliableRocketMQReceiver, RocketMQReceiver} +import org.apache.spark.storage.StorageLevel + +object RocketMqUtils { + + /** + * Scala constructor for a batch-oriented interface for consuming from rocketmq. + * Starting and ending offsets are specified in advance, + * so that you can control exactly-once semantics. + * @param sc SparkContext + * @param groupId the consumer group id that identifies this consumer to RocketMq + * @param offsetRanges offset ranges that define the RocketMq data belonging to this RDD + * @param optionParams optional configs, see [[RocketMQConfig]] for more details. + * @param locationStrategy map from TopicQueueId to preferred host for processing that partition. + * In most cases, use [[LocationStrategy.PreferConsistent]] + * @return RDD[MessageExt] + */ + def createRDD( + sc: SparkContext, + groupId: String, + offsetRanges: ju.Map[TopicQueueId, Array[OffsetRange]], + optionParams: ju.Map[String, String] = new ju.HashMap, + locationStrategy: LocationStrategy = PreferConsistent + ): RDD[MessageExt] = { + + val preferredHosts = locationStrategy match { + case PreferConsistent => ju.Collections.emptyMap[TopicQueueId, String]() + case PreferFixed(hostMap) => hostMap + } + new RocketMqRDD(sc, groupId, optionParams, offsetRanges, preferredHosts, false) + } + + /** + * Java constructor for a batch-oriented interface for consuming from rocketmq. + * Starting and ending offsets are specified in advance, + * so that you can control exactly-once semantics.
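+ * A sketch of assembling the offsetRanges argument (topic, queue id, broker name and offsets are illustrative):
+ * {{{
+ * val ranges = new java.util.HashMap[TopicQueueId, Array[OffsetRange]]()
+ * ranges.put(new TopicQueueId("exampleTopic", 0),
+ * Array(OffsetRange("exampleTopic", 0, "broker-a", 0L, 100L)))
+ * }}}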
+ * @param jsc JavaSparkContext + * @param groupId the consumer group id that identifies this consumer to RocketMq + * @param offsetRanges offset ranges that define the RocketMq data belonging to this RDD + * @param optionParams optional configs, see [[RocketMQConfig]] for more details. + * @param locationStrategy map from TopicQueueId to preferred host for processing that partition. + * In most cases, use [[LocationStrategy.PreferConsistent]] + * @return JavaRDD[MessageExt] + */ + def createJavaRDD( + jsc: JavaSparkContext, + groupId: String, + offsetRanges: ju.Map[TopicQueueId, Array[OffsetRange]], + optionParams: ju.Map[String, String] = new ju.HashMap, + locationStrategy: LocationStrategy = PreferConsistent + ): JavaRDD[MessageExt] = { + new JavaRDD(createRDD(jsc.sc, groupId, offsetRanges, optionParams, locationStrategy)) + } + + /** + * Scala constructor for a RocketMq DStream + * @param groupId the consumer group id that identifies this consumer to RocketMq + * @param topics the RocketMq topics to subscribe to + * @param consumerStrategy In most cases, pass in [[ConsumerStrategy.lastest]], + * see [[ConsumerStrategy]] for more details + * @param autoCommit whether to commit the offset to the rocketmq server automatically or not. If the user + * implements [[OffsetCommitCallback]], autoCommit must be set to false + * @param forceSpecial Generally, if the rocketmq server has a checkpoint for the [[MessageQueue]], the consumer + * will consume from the checkpoint regardless of the specified offset. But if forceSpecial is true, + * rocketmq will start consuming from the specified offset in any case, as long as it is available. + * @param failOnDataLoss Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, + * the user must make sure all messages in a topic have been processed before deleting it. + * @param locationStrategy map from TopicQueueId to preferred host for processing that partition. + * In most cases, use [[LocationStrategy.PreferConsistent]] + * @param optionParams optional configs, see [[RocketMQConfig]] for more details. + * @return InputDStream[MessageExt] + */ + def createMQPullStream( + ssc: StreamingContext, + groupId: String, + topics: ju.Collection[jl.String], + consumerStrategy: ConsumerStrategy, + autoCommit: Boolean, + forceSpecial: Boolean, + failOnDataLoss: Boolean, + locationStrategy: LocationStrategy = PreferConsistent, + optionParams: ju.Map[String, String] = new ju.HashMap + ): InputDStream[MessageExt] = { + + new MQPullInputDStream(ssc, groupId, topics, optionParams, locationStrategy, consumerStrategy, autoCommit, forceSpecial, + failOnDataLoss) + } + + def createMQPullStream( + ssc: StreamingContext, + groupId: String, + topic: String, + consumerStrategy: ConsumerStrategy, + autoCommit: Boolean, + forceSpecial: Boolean, + failOnDataLoss: Boolean, + optionParams: ju.Map[String, String] + ): InputDStream[MessageExt] = { + val topics = new ju.ArrayList[String]() + topics.add(topic) + new MQPullInputDStream(ssc, groupId, topics, optionParams, PreferConsistent, consumerStrategy, autoCommit, forceSpecial, + failOnDataLoss) + } + + /** + * Java constructor for a RocketMq DStream + * @param groupId the consumer group id that identifies this consumer to RocketMq + * @param topics the RocketMq topics to subscribe to + * @param consumerStrategy In most cases, pass in [[ConsumerStrategy.lastest]], + * see [[ConsumerStrategy]] for more details + * @param autoCommit whether to commit the offset to the rocketmq server automatically or not.
If the user + * implements [[OffsetCommitCallback]], autoCommit must be set to false + * @param forceSpecial Generally, if the rocketmq server has a checkpoint for the [[MessageQueue]], the consumer + * will consume from the checkpoint regardless of the specified offset. But if forceSpecial is true, + * rocketmq will start consuming from the specified offset in any case, as long as it is available. + * @param failOnDataLoss Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, + * the user must make sure all messages in a topic have been processed before deleting it. + * @param locationStrategy map from TopicQueueId to preferred host for processing that partition. + * In most cases, use [[LocationStrategy.PreferConsistent]] + * @param optionParams optional configs, see [[RocketMQConfig]] for more details. + * @return JavaInputDStream[MessageExt] + */ + def createJavaMQPullStream( + ssc: JavaStreamingContext, + groupId: String, + topics: ju.Collection[jl.String], + consumerStrategy: ConsumerStrategy, + autoCommit: Boolean, + forceSpecial: Boolean, + failOnDataLoss: Boolean, + locationStrategy: LocationStrategy = PreferConsistent, + optionParams: ju.Map[String, String] = new ju.HashMap + ): JavaInputDStream[MessageExt] = { + val inputDStream = createMQPullStream(ssc.ssc, groupId, topics, consumerStrategy, + autoCommit, forceSpecial, failOnDataLoss, locationStrategy, optionParams) + new JavaInputDStream(inputDStream) + } + + def createJavaMQPullStream( + ssc: JavaStreamingContext, + groupId: String, + topics: ju.Collection[jl.String], + consumerStrategy: ConsumerStrategy, + autoCommit: Boolean, + forceSpecial: Boolean, + failOnDataLoss: Boolean): JavaInputDStream[MessageExt] = { + val inputDStream = createMQPullStream(ssc.ssc, groupId, topics, consumerStrategy, + autoCommit, forceSpecial, failOnDataLoss) + new JavaInputDStream(inputDStream) + } + + def mkPullConsumerInstance(groupId: String, optionParams: ju.Map[String, String], instance: String): DefaultMQPullConsumer = { + val consumer = new DefaultMQPullConsumer(groupId) + if (optionParams.containsKey(RocketMQConfig.PULL_TIMEOUT_MS)) + consumer.setConsumerTimeoutMillisWhenSuspend(optionParams.get(RocketMQConfig.PULL_TIMEOUT_MS).toLong) + if (!StringUtils.isBlank(instance)) + consumer.setInstanceName(instance) + if (optionParams.containsKey(RocketMQConfig.NAME_SERVER_ADDR)) + consumer.setNamesrvAddr(optionParams.get(RocketMQConfig.NAME_SERVER_ADDR)) + + consumer.start() + consumer.setOffsetStore(consumer.getDefaultMQPullConsumerImpl.getOffsetStore) + consumer + } + + /** + * Creates a Java push-mode DStream backed by the basic (unreliable) receiver + * @param jssc JavaStreamingContext + * @param properties consumer configs, see [[RocketMQConfig]] for more details + * @param level storage level for the received messages + * @return JavaInputDStream[Message] + */ + def createJavaMQPushStream(jssc: JavaStreamingContext, properties: Properties, level: StorageLevel): JavaInputDStream[Message] = createJavaMQPushStream(jssc, properties, level, false) + + /** + * Creates a Java push-mode DStream backed by the reliable (ack-based) receiver + * @param jssc JavaStreamingContext + * @param properties consumer configs, see [[RocketMQConfig]] for more details + * @param level storage level for the received messages + * @return JavaInputDStream[Message] + */ + def createJavaReliableMQPushStream(jssc: JavaStreamingContext, properties: Properties, level: StorageLevel): JavaInputDStream[Message] = createJavaMQPushStream(jssc, properties, level, true) + + /** + * Creates a Java push-mode DStream + * @param jssc JavaStreamingContext + * @param properties consumer configs, see [[RocketMQConfig]] for more details + * @param level storage level for the received messages + * @param reliable whether to use the reliable (ack-based) receiver + * @return JavaInputDStream[Message] + */ + def createJavaMQPushStream(jssc: JavaStreamingContext, properties: Properties, level: StorageLevel, reliable: Boolean): JavaInputDStream[Message] = { + if (jssc == null || properties
== null || level == null) return null + val receiver = if (reliable) new ReliableRocketMQReceiver(properties, level) else new RocketMQReceiver(properties, level) + jssc.receiverStream(receiver) + } + + def getInteger(props: Properties, key: String, defaultValue: Int): Int = props.getProperty(key, String.valueOf(defaultValue)).toInt + + def getBoolean(props: Properties, key: String, defaultValue: Boolean): Boolean = props.getProperty(key, String.valueOf(defaultValue)).toBoolean +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/streaming/MQPullInputDStream.scala ---------------------------------------------------------------------- diff --git a/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/streaming/MQPullInputDStream.scala b/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/streaming/MQPullInputDStream.scala new file mode 100644 index 0000000..10b0408 --- /dev/null +++ b/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/streaming/MQPullInputDStream.scala @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit} +import java.{lang => jl, util => ju} + +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer +import org.apache.rocketmq.client.consumer.store.ReadOffsetType +import org.apache.rocketmq.common.MixAll +import org.apache.rocketmq.common.message.{MessageExt, MessageQueue} +import org.apache.rocketmq.spark.{ConsumerStrategy, _} +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.dstream.{DStream, DStreamCheckpointData, InputDStream} +import org.apache.spark.streaming.scheduler.rate.RateEstimator +import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo} +import org.apache.spark.util.ThreadUtils + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * A DStream where + * each given RocketMq topic/queueId corresponds to an RDD partition. + * The configuration pull.max.speed.per.partition gives the maximum number + * of messages per second that each '''partition''' will accept. + * @param groupId the consumer group id that identifies this consumer to RocketMq + * @param topics the RocketMq topics to subscribe to + * @param locationStrategy In most cases, pass in [[LocationStrategy.PreferConsistent]], + * see [[LocationStrategy]] for more details.
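+ * (PreferConsistent spreads queues consistently across the available executors; PreferFixed pins queues to the hosts in the supplied map.)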
+ * @param consumerStrategy In most cases, pass in [[ConsumerStrategy.lastest]], + * see [[ConsumerStrategy]] for more details + * @param autoCommit whether to commit the offset to the rocketmq server automatically or not + * @param forceSpecial Generally, if the rocketmq server has a checkpoint for the [[MessageQueue]], the consumer + * will consume from the checkpoint regardless of the specified offset. But if forceSpecial is true, + * rocketmq will start consuming from the specified offset in any case, as long as it is available. + * @param failOnDataLoss Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, + * the user must make sure all messages in a topic have been processed before deleting it. + */ +class MQPullInputDStream( + _ssc: StreamingContext, + groupId: String, + topics: ju.Collection[jl.String], + optionParams: ju.Map[String, String], + locationStrategy: LocationStrategy, + consumerStrategy: ConsumerStrategy, + autoCommit: Boolean, + forceSpecial: Boolean, + failOnDataLoss: Boolean + ) extends InputDStream[MessageExt](_ssc) with CanCommitOffsets { + + private var currentOffsets = mutable.Map[TopicQueueId, Map[String, Long]]() + + private val commitQueue = new ConcurrentLinkedQueue[OffsetRange] + + private val commitCallback = new AtomicReference[OffsetCommitCallback] + + private val maxRateLimitPerPartition = optionParams.getOrDefault(RocketMQConfig.MAX_PULL_SPEED_PER_PARTITION, + "-1").toInt + + @transient private var kc: DefaultMQPullConsumer = null + + /** + * Starts a timer thread that periodically persists the OffsetStore. + */ + @transient private val scheduledExecutorService = ThreadUtils.newDaemonSingleThreadScheduledExecutor( + "Driver-Commit-Thread") + + private def consumer() = this.synchronized { + if (null == kc) { + kc = RocketMqUtils.mkPullConsumerInstance(groupId, optionParams, "driver") + val messageQueues = fetchSubscribeMessageQueues(topics) + val iter = messageQueues.iterator + while (iter.hasNext){ + val messageQueue = iter.next + val offset = computePullFromWhere(messageQueue) + val topicQueueId = new TopicQueueId(messageQueue.getTopic, messageQueue.getQueueId) + if (!currentOffsets.contains(topicQueueId)) { + currentOffsets += topicQueueId -> Map(messageQueue.getBrokerName -> offset) + } else { + if (!currentOffsets(topicQueueId).contains(messageQueue.getBrokerName)){ + currentOffsets(topicQueueId) += messageQueue.getBrokerName -> offset + } + } + } + + // timer persist + this.scheduledExecutorService.scheduleAtFixedRate( + new Runnable() { + def run() { + try { + // re-fetch the subscribed queues and persist their offsets + val messageQueueSet = new ju.HashSet[MessageQueue] + val iter = topics.iterator + while (iter.hasNext){ + messageQueueSet.addAll(kc.fetchSubscribeMessageQueues(iter.next)) + } + kc.getOffsetStore.persistAll(messageQueueSet) + } catch { + case e: Exception => { + log.error("ScheduledTask persistAllConsumerOffset exception", e) + } + } + } + }, 1000 * 10, 1000 * 5, TimeUnit.MILLISECONDS) + } + kc + } + + private def fetchSubscribeMessageQueues(topics : ju.Collection[jl.String]): ju.HashSet[MessageQueue] = { + val messageQueueSet = new ju.HashSet[MessageQueue] + + val iter = topics.iterator + while (iter.hasNext){ + messageQueueSet.addAll(kc.fetchSubscribeMessageQueues(iter.next)) + } + messageQueueSet + } + + private def computePullFromWhere(mq: MessageQueue): Long = { + var result = -1L + val offsetStore = kc.getOffsetStore + val minOffset = kc.minOffset(mq) + val checkpointOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE) + + consumerStrategy match {
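+ // Offset resolution, in brief: a non-negative checkpoint offset normally wins (falling back to the
+ // min/max offset when it is below minOffset); only SpecificOffsetStrategy with forceSpecial = true
+ // bypasses the checkpoint.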
+ case LatestStrategy => { + if (checkpointOffset >= 0) { + //consider the checkpoint offset first + if (checkpointOffset < minOffset) { + reportDataLoss(s"MessageQueue $mq's checkpointOffset $checkpointOffset is smaller than minOffset $minOffset") + result = kc.maxOffset(mq) + } else { + result = checkpointOffset + } + } else { + // First start,no offset + if (mq.getTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + result = 0 + } else { + result = kc.maxOffset(mq) + } + } + } + case EarliestStrategy => { + if (checkpointOffset >= 0) { + //consider the checkpoint offset first + if (checkpointOffset < minOffset) { + reportDataLoss(s"MessageQueue $mq's checkpointOffset $checkpointOffset is smaller than minOffset $minOffset") + result = minOffset + } else { + result = checkpointOffset + } + } else { + // First start,no offset + result = minOffset + } + } + case SpecificOffsetStrategy(queueToOffset) => { + + val specificOffset = queueToOffset.get(mq) + + if (checkpointOffset >= 0 && !forceSpecial) { + if (checkpointOffset < minOffset) { + reportDataLoss(s"MessageQueue $mq's checkpointOffset $checkpointOffset is smaller than minOffset $minOffset") + result = minOffset + } else { + result = checkpointOffset + } + } else { + specificOffset match { + case Some(ConsumerStrategy.LATEST) => { + result = kc.maxOffset(mq) + } + case Some(ConsumerStrategy.EARLIEST) => { + result = kc.minOffset(mq) + } + case Some(offset) => { + if (offset < minOffset) { + reportDataLoss(s"MessageQueue $mq's specific offset $offset is smaller than minOffset $minOffset") + result = minOffset + } else { + result = offset + } + } + case None => { + if (checkpointOffset >= 0) { + //consider the checkpoint offset first + if (checkpointOffset < minOffset) { + reportDataLoss(s"MessageQueue $mq's checkpointOffset $checkpointOffset is smaller than minOffset $minOffset") + result = minOffset + } else { + result = checkpointOffset + } + } else { + logWarning(s"MessageQueue $mq's specific offset and checkpointOffset are none, then use the minOffset") + result = kc.minOffset(mq) + } + } + } + } + } + } + result + } + + private def firstConsumerOffset(mq: MessageQueue): Long = { + val offsetStore = kc.getOffsetStore + val lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE) + val minOffset = kc.minOffset(mq) + if (lastOffset < minOffset) { + reportDataLoss(s"MessageQueue $mq's checkpoint offset $lastOffset is smaller than minOffset $minOffset") + minOffset + } else { + lastOffset + } + } + + + override def persist(newLevel: StorageLevel): DStream[MessageExt] = { + logError("rocketmq MessageExt is not serializable. " + + "Use .map to extract fields before calling .persist or .window") + super.persist(newLevel) + } + + protected def getPreferredHosts: ju.Map[TopicQueueId, String] = { + locationStrategy match { + case PreferConsistent => ju.Collections.emptyMap[TopicQueueId, String]() + case PreferFixed(hostMap) => hostMap + } + } + + // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]") + private[streaming] override def name: String = s"RocketMq polling stream [$id]" + + protected[streaming] override val checkpointData = + new MQInputDStreamCheckpointData + + + /** + * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker. 
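+ * Backpressure is keyed off the standard Spark flag. A sketch of enabling it together with this
+ * stream's optional per-partition cap (sparkConf, optionParams and the value 1000 are illustrative):
+ * {{{
+ * sparkConf.set("spark.streaming.backpressure.enabled", "true")
+ * optionParams.put(RocketMQConfig.MAX_PULL_SPEED_PER_PARTITION, "1000")
+ * }}}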
+ */ + override protected[streaming] val rateController: Option[RateController] = { + if (RateController.isBackPressureEnabled(_ssc.conf)) { + Some(new DirectMQRateController(id, + RateEstimator.create(_ssc.conf, context.graph.batchDuration))) + } else { + None + } + } + + /** + * calculate the until-offset per partition in theory + */ + private def maxMessagesPerPartition( + offsets: Map[TopicQueueId, Map[String, Long]]): Option[Map[TopicQueueId, Map[String, Long]]] = { + val estimatedRateLimit = rateController.map(_.getLatestRate().toInt) + + var lagPerPartition = Map[TopicQueueId, Long]() + var totalLag = 0L + val lagPerPartitionPerQueue = offsets.map{ case (tp, value) => + val partitionTotal = value.map{ case (name, maxOffset) => + var count = Math.max(maxOffset - currentOffsets(tp)(name), 0) + totalLag += count + (name, count) + } + lagPerPartition += tp -> partitionTotal.values.sum + tp -> partitionTotal + } + + val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match { + case Some(rate) => + lagPerPartitionPerQueue.map { case (tp, queues) => + val backPressRate = Math.round(lagPerPartition(tp) / totalLag.toFloat * rate) + val partitionMessages = (if (maxRateLimitPerPartition > 0) { + Math.min(backPressRate, maxRateLimitPerPartition)} else backPressRate) + tp -> queues.map{ case (name, count) => + (name, Math.ceil(count / lagPerPartition(tp).toFloat * partitionMessages)) + } + } + case None => + + lagPerPartitionPerQueue.map { case (tp, queues) => + val partitionMessages = maxRateLimitPerPartition + tp -> queues.map{ case (name, count) => + (name, Math.ceil(count / lagPerPartition(tp).toFloat * partitionMessages)) + } + } + } + + if (effectiveRateLimitPerPartition.flatMap(_._2).map(_._2).sum > 0) { + val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 + Some(effectiveRateLimitPerPartition.map { + case (tp, limit) => tp -> limit.map{ case (name, count) => + name -> (count * secsPerBatch).toLong + } + }) + } else { + None + } + } + + + /** + * Returns the latest (highest) available offsets, taking new partitions into account. 
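+ * Queues seen for the first time are seeded via firstConsumerOffset (the checkpoint, or the min offset on data loss); queues that have disappeared since the last batch are reported through reportDataLoss.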
+ */ + protected def latestOffsets(): Map[TopicQueueId, Map[String, Long]] = { + val c = consumer + + val messageQueues = fetchSubscribeMessageQueues(topics) + + var maxOffsets = Map[TopicQueueId, Map[String, Long]]() + + val lastTopicQueues = currentOffsets.keySet + val fetchTopicQueues = mutable.Set[TopicQueueId]() + val iter = messageQueues.iterator + while (iter.hasNext) { + val messageQueue = iter.next + logDebug(s"${messageQueue.toString} min: ${c.minOffset(messageQueue)} max: ${c.maxOffset(messageQueue)}") + val topicQueueId = new TopicQueueId(messageQueue.getTopic, messageQueue.getQueueId) + fetchTopicQueues.add(topicQueueId) + if (!currentOffsets.contains(topicQueueId)){ + currentOffsets += topicQueueId -> Map(messageQueue.getBrokerName -> firstConsumerOffset(messageQueue)) + }else{ + if (!currentOffsets(topicQueueId).contains(messageQueue.getBrokerName)) + currentOffsets(topicQueueId) += messageQueue.getBrokerName -> firstConsumerOffset(messageQueue) + } + if (!maxOffsets.contains(topicQueueId)) { + maxOffsets += topicQueueId -> Map(messageQueue.getBrokerName -> c.maxOffset(messageQueue)) + }else{ + if (!maxOffsets(topicQueueId).contains(messageQueue.getBrokerName)) { + val tempMap = maxOffsets(topicQueueId) + (messageQueue.getBrokerName -> c.maxOffset(messageQueue)) + maxOffsets += topicQueueId -> tempMap + } + } + } + + val deletedPartitions = lastTopicQueues.diff(fetchTopicQueues) + if (deletedPartitions.size > 0){ + reportDataLoss( + s"Cannot find offsets of ${deletedPartitions}. Some data may have been missed") + } + maxOffsets + } + + /** + * limits the maximum number of messages per partition + */ + protected def clamp(offsets: Map[TopicQueueId, Map[String, Long]]): Map[TopicQueueId, Map[String, Long]] = { + maxMessagesPerPartition(offsets).map { mmp => + mmp.map { case (tp, partitionsOffsets) => + tp -> partitionsOffsets.map{case (name, messages) => + name -> Math.min(currentOffsets(tp)(name) + messages, offsets(tp)(name))} + } + }.getOrElse(offsets) + } + + + override def compute(validTime: Time): Option[RocketMqRDD] = { + + val untilOffsets = clamp(latestOffsets()) + + val offsetRangeRdd: ju.Map[TopicQueueId, Array[OffsetRange]] = new ju.HashMap() + untilOffsets.foreach { case (tp, uo) => + val values = uo.map { case (name, until) => + val fo = currentOffsets(tp)(name) + OffsetRange(tp.topic, tp.queueId, name, fo, until) + }.toArray + offsetRangeRdd.put(tp, values) + } + + val rdd = new RocketMqRDD( + context.sparkContext, groupId, optionParams, offsetRangeRdd, getPreferredHosts, true) + + // Report the record number and metadata of this batch interval to InputInfoTracker. + val description = offsetRangeRdd.asScala.flatMap{ case (tp, arrayRange) => + // Don't display empty ranges. 
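+ // flatten each queue's ranges; ranges that did not advance are dropped by the filter below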
+ arrayRange + }.filter { offsetRange => + offsetRange.fromOffset != offsetRange.untilOffset + }.map { offsetRange => + s"topic: ${offsetRange.topic}\tqueueId: ${offsetRange.queueId}\t" + + s"brokerName: ${offsetRange.brokerName}\t" + + s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}" + }.mkString("\n") + // Copy offsetRanges to immutable.List to prevent from being modified by the user + val metadata = Map( + "offsets" -> offsetRangeRdd, + StreamInputInfo.METADATA_KEY_DESCRIPTION -> description) + val inputInfo = StreamInputInfo(id, rdd.count, metadata) + ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) + + currentOffsets = collection.mutable.Map() ++ untilOffsets + + if (autoCommit) { + currentOffsets.foreach { case (tp, uo) => + uo.map { case (name, until) => + val offset = currentOffsets(tp)(name) - 1 + val mq = new MessageQueue(tp.topic, name, tp.queueId) + kc.updateConsumeOffset(mq, offset) + } + } + } else { + commitAll() + } + Some(rdd) + } + + private def reportDataLoss(message: String): Unit = { + if (failOnDataLoss) { + throw new IllegalStateException(message) + } else { + logWarning(message) + } + } + + /** + * Queue up offset ranges for commit to rocketmq at a future time. Threadsafe. + * @param offsetRanges The maximum untilOffset for a given partition will be used at commit. + */ + def commitAsync(offsetRanges: ju.Map[TopicQueueId, Array[OffsetRange]]): Unit = { + commitAsync(offsetRanges, null) + } + + /** + * Queue up offset ranges for commit to rocketmq at a future time. Threadsafe. + * @param offsetRanges The maximum untilOffset for a given partition will be used at commit. + * @param callback Only the most recently provided callback will be used at commit. + */ + def commitAsync(offsetRanges: ju.Map[TopicQueueId, Array[OffsetRange]], callback: OffsetCommitCallback): Unit = { + commitCallback.set(callback) + offsetRanges.values.asScala.foreach{ value => + commitQueue.addAll(ju.Arrays.asList(value: _*)) + } + } + + protected def commitAll(): Unit = { + val m = new ju.HashMap[MessageQueue, jl.Long] + var osr = commitQueue.poll() + try { + while (null != osr) { + //Exclusive ending offset + val mq = new MessageQueue(osr.topic, osr.brokerName, osr.queueId) + kc.updateConsumeOffset(mq, osr.untilOffset - 1) + m.put(mq, osr.untilOffset - 1) + osr = commitQueue.poll() + } + if (commitCallback.get != null) { + commitCallback.get.onComplete(m, null) + } + } catch { + case e: Exception => { + if (commitCallback.get != null) + commitCallback.get.onComplete(m, e) + } + } + } + + + override def start(): Unit = { + consumer + } + + override def stop(): Unit = this.synchronized { + if (kc != null) { + kc.shutdown() + } + } + + private[streaming] + class MQInputDStreamCheckpointData extends DStreamCheckpointData(this) { + def batchForTime: mutable.HashMap[Time, mutable.HashMap[TopicQueueId, Array[(String, Int, String, Long, Long)]]] = { + data.asInstanceOf[mutable.HashMap[Time, mutable.HashMap[TopicQueueId, Array[OffsetRange.OffsetRangeTuple]]]] + } + + override def update(time: Time): Unit = { + batchForTime.clear() + generatedRDDs.foreach { kv => + val values = new mutable.HashMap[TopicQueueId, Array[OffsetRange.OffsetRangeTuple]] + kv._2.asInstanceOf[RocketMqRDD].offsetRanges.asScala.foreach{ case (k, v) => + values.put(k, v.map(_.toTuple)) + } + batchForTime += kv._1 ->values + } + } + + override def cleanup(time: Time): Unit = { } + + override def restore(): Unit = { + batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) => + 
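+ // t is the batch time; b maps each TopicQueueId to its serialized offset-range tuples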
logInfo(s"Restoring RocketMqRDD for time $t $b") + + val offsetRanges = new ju.HashMap[TopicQueueId, Array[OffsetRange]]() + b.foreach{ case (i, j) => + offsetRanges.put(i, j.map(OffsetRange(_))) + } + + generatedRDDs += t -> new RocketMqRDD( + context.sparkContext, + groupId, + optionParams, + offsetRanges, + getPreferredHosts, + // during restore, it's possible same partition will be consumed from multiple + // threads, so dont use cache + false + ) + } + } + } + + /** + * A RateController to retrieve the rate from RateEstimator. + */ + private class DirectMQRateController(id: Int, estimator: RateEstimator) + extends RateController(id, estimator) { + override def publish(rate: Long): Unit = () + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/streaming/RocketMqRDD.scala ---------------------------------------------------------------------- diff --git a/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/streaming/RocketMqRDD.scala b/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/streaming/RocketMqRDD.scala new file mode 100644 index 0000000..078a3c1 --- /dev/null +++ b/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/streaming/RocketMqRDD.scala @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import org.apache.rocketmq.common.message.MessageExt +import org.apache.rocketmq.spark._ +import org.apache.spark.partial.{BoundedDouble, PartialResult} +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.storage.StorageLevel +import org.apache.spark.{Partition, SparkContext, TaskContext} +import java.{util => ju} +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + + +/** + * A batch-oriented interface for consuming from RocketMq. + * Starting and ending offsets are specified in advance, + * so that you can control exactly-once semantics. + * @param groupId it is for rocketMq for identifying the consumer + * @param optionParams the configs + * @param offsetRanges offset ranges that define the RocketMq data belonging to this RDD + * @param preferredHosts map from TopicQueueId to preferred host for processing that partition. 
+ * In most cases, use [[LocationStrategy.PreferConsistent]] + * @param useConsumerCache useConsumerCache whether to use a consumer from a per-jvm cache + */ +class RocketMqRDD ( + sc: SparkContext, + val groupId: String, + val optionParams: ju.Map[String, String], + val offsetRanges: ju.Map[TopicQueueId, Array[OffsetRange]], + val preferredHosts: ju.Map[TopicQueueId, String], + val useConsumerCache: Boolean + )extends RDD[MessageExt](sc, Nil) with HasOffsetRanges{ + + private val cacheInitialCapacity = + optionParams.getOrDefault(RocketMQConfig.PULL_CONSUMER_CACHE_INIT_CAPACITY, "16").toInt + private val cacheMaxCapacity = + optionParams.getOrDefault(RocketMQConfig.PULL_CONSUMER_CACHE_MAX_CAPACITY, "64").toInt + private val cacheLoadFactor = + optionParams.getOrDefault(RocketMQConfig.PULL_CONSUMER_CACHE_LOAD_FACTOR, "0.75").toFloat + + override def persist(newLevel: StorageLevel): this.type = { + super.persist(newLevel) + } + + override def getPartitions: Array[Partition] = { + offsetRanges.asScala.toArray.zipWithIndex.map{ case ((first, second), i) => + new RocketMqRDDPartition(i, first.topic, first.queueId, second) + }.toArray + } + + override def count(): Long = offsetRanges.asScala.map(_._2.map(_.count).sum).sum + + override def countApprox( + timeout: Long, + confidence: Double = 0.95 + ): PartialResult[BoundedDouble] = { + val c = count + new PartialResult(new BoundedDouble(c, 1.0, c, c), true) + } + + override def isEmpty(): Boolean = count == 0L + + override def take(num: Int): Array[MessageExt] = { + val nonEmptyPartitions = this.partitions + .map(_.asInstanceOf[RocketMqRDDPartition]) + .filter(_.count > 0) + + if (num < 1 || nonEmptyPartitions.isEmpty) { + return new Array[MessageExt](0) + } + + // Determine in advance how many messages need to be taken from each partition + val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) => + val remain = num - result.values.sum + if (remain > 0) { + val taken = Math.min(remain, part.count) + result + (part.index -> taken.toInt) + } else { + result + } + } + + val buf = new ArrayBuffer[MessageExt] + val res = context.runJob( + this, + (tc: TaskContext, it: Iterator[MessageExt]) => + it.take(parts(tc.partitionId)).toArray, parts.keys.toArray + ) + res.foreach(buf ++= _) + buf.toArray + } + + private def executors(): Array[ExecutorCacheTaskLocation] = { + val bm = sparkContext.env.blockManager + bm.master.getPeers(bm.blockManagerId).toArray + .map(x => ExecutorCacheTaskLocation(x.host, x.executorId)) + .sortWith(compareExecutors) + } + + private def compareExecutors( + a: ExecutorCacheTaskLocation, + b: ExecutorCacheTaskLocation): Boolean = + if (a.host == b.host) { + a.executorId > b.executorId + } else { + a.host > b.host + } + + /** + * Non-negative modulus, from java 8 math + */ + private def floorMod(a: Int, b: Int): Int = ((a % b) + b) % b + + protected override def getPreferredLocations(thePart: Partition): Seq[String] = { + // The intention is best-effort consistent executor for a given topic partition, + // so that caching consumers can be effective. 
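+ // e.g. with 3 sorted executors, a queue with tp.hashCode == 7 consistently maps to index floorMod(7, 3) == 1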
+ val part = thePart.asInstanceOf[RocketMqRDDPartition] + val allExecs = executors() + val tp = part.topicQueueId() + val prefHost = preferredHosts.get(tp) + val prefExecs = if (null == prefHost) allExecs else allExecs.filter(_.host == prefHost) + val execs = if (prefExecs.isEmpty) allExecs else prefExecs + if (execs.isEmpty) { + Seq() + } else { + // execs is sorted, tp.hashCode depends only on topic and partition, so consistent index + val index = this.floorMod(tp.hashCode, execs.length) + val chosen = execs(index) + Seq(chosen.toString) + } + } + + private def errBeginAfterEnd(part: RocketMqRDDPartition): String = + s"Beginning offset is after the ending offset ${part.partitionOffsetRanges.mkString(",")} " + + s"for topic ${part.topic} partition ${part.index}. " + + "You either provided an invalid fromOffset, or the Kafka topic has been damaged" + + override def compute(thePart: Partition, context: TaskContext): Iterator[MessageExt] = { + val part = thePart.asInstanceOf[RocketMqRDDPartition] + val count = part.count() + assert(count >= 0, errBeginAfterEnd(part)) + if (count == 0) { + logInfo(s"Beginning offset is the same as ending offset " + + s"skipping ${part.topic} ${part.queueId}") + Iterator.empty + } else { + new RocketMqRDDIterator(part, context) + } + } + + + /** + * An iterator that fetches messages directly from rocketmq for the offsets in partition. + * Uses a cached consumer where possible to take advantage of prefetching + */ + private class RocketMqRDDIterator( + part: RocketMqRDDPartition, + context: TaskContext) extends Iterator[MessageExt] { + + logDebug(s"Computing topic ${part.topic}, queueId ${part.queueId} " + + s"offsets ${part.partitionOffsetRanges.mkString(",")}") + + context.addTaskCompletionListener{ context => closeIfNeeded() } + + + val consumer = if (useConsumerCache) { + CachedMQConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor) + if (context.attemptNumber > 5) { + // just in case the prior attempt failures were cache related + CachedMQConsumer.remove(groupId, part.topic, part.queueId, part.brokerNames) + } + CachedMQConsumer.getOrCreate(groupId, part.topic, part.queueId, part.brokerNames, optionParams) + } else { + CachedMQConsumer.getUncached(groupId, part.topic, part.queueId, part.brokerNames, optionParams) + } + + var logicTotalOffset = 0 + val totalSum = part.partitionOffsetRanges.map(_.count).sum + var index = 0 + var requestOffset = part.partitionOffsetRanges.apply(index).fromOffset + + def closeIfNeeded(): Unit = { + if (!useConsumerCache && consumer != null) { + consumer.client.shutdown + } + } + + override def hasNext(): Boolean = { + totalSum > logicTotalOffset + } + + override def next(): MessageExt = { + assert(hasNext(), "Can't call getNext() once untilOffset has been reached") + val queueRange = part.partitionOffsetRanges.apply(index) + val r = consumer.get(queueRange.brokerName, requestOffset) + if (queueRange.untilOffset > (requestOffset + 1)) + requestOffset +=1 + else { + index +=1 + if (part.partitionOffsetRanges.length > index) + requestOffset = part.partitionOffsetRanges.apply(index).fromOffset + } + logicTotalOffset += 1 + r + } + } + + private[RocketMqRDD] + type OffsetRangeTuple = (String, Int) + + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/RocketMQServerMock.java ---------------------------------------------------------------------- diff --git 
a/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/RocketMQServerMock.java b/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/RocketMQServerMock.java new file mode 100644 index 0000000..50d46ad --- /dev/null +++ b/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/RocketMQServerMock.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.spark; + +import org.apache.commons.lang.time.DateFormatUtils; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MQVersion; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.namesrv.NamesrvConfig; +import org.apache.rocketmq.namesrv.NamesrvController; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.store.config.MessageStoreConfig; + +import java.util.Date; +import java.util.UUID; + +public class RocketMQServerMock { + + private NamesrvController namesrvController; + private BrokerController brokerController; + private String nameserverAddr = "localhost:9876"; + private String producerGroup = UUID.randomUUID().toString(); + + + public void startupServer() throws Exception{ + //start nameserver + startNamesrv(); + //start broker + startBroker(); + } + + public void shutdownServer() { + if (brokerController != null) { + brokerController.shutdown(); + } + + if (namesrvController != null) { + namesrvController.shutdown(); + } + } + + public String getNameServerAddr() { + return nameserverAddr; + } + + private void startNamesrv() throws Exception { + NamesrvConfig namesrvConfig = new NamesrvConfig(); + NettyServerConfig nettyServerConfig = new NettyServerConfig(); + nettyServerConfig.setListenPort(9876); + + namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig); + boolean initResult = namesrvController.initialize(); + if (!initResult) { + namesrvController.shutdown(); + throw new Exception("Namesvr init failure!"); + } + namesrvController.start(); + } + + private void startBroker() throws Exception { + System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); + + BrokerConfig brokerConfig = new BrokerConfig(); + brokerConfig.setNamesrvAddr(nameserverAddr); + brokerConfig.setBrokerId(MixAll.MASTER_ID); + NettyServerConfig nettyServerConfig = new NettyServerConfig(); + nettyServerConfig.setListenPort(10911); + NettyClientConfig 
nettyClientConfig = new NettyClientConfig(); + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + + brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig); + boolean initResult = brokerController.initialize(); + if (!initResult) { + brokerController.shutdown(); + throw new Exception("Broker init failure!"); + } + brokerController.start(); + } + + public void prepareDataTo(String topic, int times) throws Exception { + // publish test message + DefaultMQProducer producer = new DefaultMQProducer(producerGroup); + producer.setNamesrvAddr(nameserverAddr); + + String sendMsg = "\"Hello Rocket\"" + "," + DateFormatUtils.format(new Date(), "yyyy-MM-DD hh:mm:ss"); + + try { + producer.start(); + for (int i = 0; i < times; i++) { + producer.send(new Message(topic, sendMsg.getBytes("UTF-8"))); + } + } catch (Exception e) { + throw new MQClientException("Failed to publish messages", e); + } finally { + producer.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/MessageRetryManagerTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/MessageRetryManagerTest.java b/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/MessageRetryManagerTest.java new file mode 100644 index 0000000..93029fc --- /dev/null +++ b/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/MessageRetryManagerTest.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.spark.streaming; + +import org.apache.rocketmq.common.message.MessageExt; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingDeque; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class MessageRetryManagerTest { + MessageRetryManager messageRetryManager; + Map<String,MessageSet> cache; + BlockingQueue<MessageSet> queue; + + @Before + public void prepare() { + cache = new ConcurrentHashMap<>(10); + queue = new LinkedBlockingDeque<>(10); + int maxRetry = 3; + int ttl = 2000; + messageRetryManager = new DefaultMessageRetryManager(queue, maxRetry, ttl); + ((DefaultMessageRetryManager)messageRetryManager).setCache(cache); + } + + @Test + public void testRetryLogics() { + List<MessageExt> data = new ArrayList<>(); + //ack + MessageSet messageSet = new MessageSet(data); + messageRetryManager.mark(messageSet); + assertEquals(1, cache.size()); + assertTrue(cache.containsKey(messageSet.getId())); + + messageRetryManager.ack(messageSet.getId()); + assertEquals(0, cache.size()); + assertFalse(cache.containsKey(messageSet.getId())); + + + //fail need retry: retries < maxRetry + messageSet = new MessageSet(data); + messageRetryManager.mark(messageSet); + assertEquals(1, cache.size()); + assertTrue(cache.containsKey(messageSet.getId())); + + messageRetryManager.fail(messageSet.getId()); + assertEquals(0, cache.size()); + assertFalse(cache.containsKey(messageSet.getId())); + assertEquals(1, messageSet.getRetries()); + assertEquals(1, queue.size()); + assertEquals(messageSet, queue.poll()); + + + //fail need not retry: retries >= maxRetry + messageSet = new MessageSet(data); + messageRetryManager.mark(messageSet); + messageRetryManager.fail(messageSet.getId()); + assertEquals(0, cache.size()); + assertFalse(cache.containsKey(messageSet.getId())); + + messageRetryManager.mark(messageSet); + messageRetryManager.fail(messageSet.getId()); + assertEquals(2, messageSet.getRetries()); + messageRetryManager.mark(messageSet); + messageRetryManager.fail(messageSet.getId()); + assertEquals(3, messageSet.getRetries()); + + assertFalse(messageRetryManager.needRetry(messageSet)); + messageRetryManager.mark(messageSet); + messageRetryManager.fail(messageSet.getId()); + assertEquals(0, cache.size()); + assertEquals(3, queue.size()); + assertEquals(messageSet, queue.poll()); + + + //fail: no ack/fail received in ttl + messageSet = new MessageSet(data); + messageRetryManager.mark(messageSet); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + assertEquals(0, cache.size()); + assertFalse(cache.containsKey(messageSet.getId())); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/ReliableRocketMQReceiverTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/ReliableRocketMQReceiverTest.java b/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/ReliableRocketMQReceiverTest.java new file mode 100644 index 0000000..69a4c76 --- /dev/null +++ 
b/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/ReliableRocketMQReceiverTest.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.spark.streaming; + +import org.apache.rocketmq.spark.RocketMQConfig; +import org.apache.rocketmq.spark.RocketMQServerMock; +import org.apache.rocketmq.spark.RocketMqUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.Durations; +import org.apache.spark.streaming.api.java.JavaInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Properties; + +public class ReliableRocketMQReceiverTest { + private static RocketMQServerMock mockServer = new RocketMQServerMock(); + + private static final String NAMESERVER_ADDR = mockServer.getNameServerAddr(); + private static final String CONSUMER_GROUP = "wordcount2"; + private static final String CONSUMER_TOPIC = "wordcountsource"; + + @BeforeClass + public static void start() throws Exception { + mockServer.startupServer(); + + Thread.sleep(2000); + + //prepare data + mockServer.prepareDataTo(CONSUMER_TOPIC, 5); + } + + @AfterClass + public static void stop() { + mockServer.shutdownServer(); + } + + @Test + public void testReliableRocketMQReceiver() { + SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount"); + JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); + Properties properties = new Properties(); + properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, NAMESERVER_ADDR); + properties.setProperty(RocketMQConfig.CONSUMER_GROUP, CONSUMER_GROUP); + properties.setProperty(RocketMQConfig.CONSUMER_TOPIC, CONSUMER_TOPIC); + JavaInputDStream ds = RocketMqUtils.createJavaReliableMQPushStream(jssc, properties, StorageLevel.MEMORY_ONLY()); + ds.print(); + jssc.start(); + try { + jssc.awaitTerminationOrTimeout(60000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/RocketMQReceiverTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/RocketMQReceiverTest.java b/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/RocketMQReceiverTest.java new file mode 100644 index 0000000..89694a4 --- /dev/null +++ b/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/RocketMQReceiverTest.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.spark.streaming; + +import org.apache.rocketmq.spark.RocketMQConfig; +import org.apache.rocketmq.spark.RocketMQServerMock; +import org.apache.rocketmq.spark.RocketMqUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.Durations; +import org.apache.spark.streaming.api.java.JavaInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Properties; + +public class RocketMQReceiverTest { + private static RocketMQServerMock mockServer = new RocketMQServerMock(); + + private static final String NAMESERVER_ADDR = mockServer.getNameServerAddr(); + private static final String CONSUMER_GROUP = "wordcount"; + private static final String CONSUMER_TOPIC = "wordcountsource"; + + @BeforeClass + public static void start() throws Exception { + mockServer.startupServer(); + + Thread.sleep(2000); + + //prepare data + mockServer.prepareDataTo(CONSUMER_TOPIC, 5); + } + + @AfterClass + public static void stop() { + mockServer.shutdownServer(); + } + + @Test + public void testRocketMQReceiver() { + SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount"); + JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); + Properties properties = new Properties(); + properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, NAMESERVER_ADDR); + properties.setProperty(RocketMQConfig.CONSUMER_GROUP, CONSUMER_GROUP); + properties.setProperty(RocketMQConfig.CONSUMER_TOPIC, CONSUMER_TOPIC); + JavaInputDStream ds = RocketMqUtils.createJavaMQPushStream(jssc, properties, StorageLevel.MEMORY_ONLY()); + ds.print(); + jssc.start(); + try { + jssc.awaitTerminationOrTimeout(60000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/RocketMqUtilsTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/RocketMqUtilsTest.java b/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/RocketMqUtilsTest.java new file mode 100644 index 0000000..5a00c73 --- /dev/null +++ b/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/RocketMqUtilsTest.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.spark.streaming;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spark.ConsumerStrategy;
+import org.apache.rocketmq.spark.HasOffsetRanges;
+import org.apache.rocketmq.spark.LocationStrategy;
+import org.apache.rocketmq.spark.OffsetRange;
+import org.apache.rocketmq.spark.RocketMQConfig;
+import org.apache.rocketmq.spark.RocketMQServerMock;
+import org.apache.rocketmq.spark.RocketMqUtils;
+import org.apache.rocketmq.spark.TopicQueueId;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class RocketMqUtilsTest implements Serializable {
+
+    private static RocketMQServerMock mockServer = new RocketMQServerMock();
+
+    private static String NAME_SERVER = mockServer.getNameServerAddr();
+
+    private static String TOPIC_DEFAULT = UUID.randomUUID().toString();
+
+    private static int MESSAGE_NUM = 100;
+
+    @BeforeClass
+    public static void start() throws Exception {
+        mockServer.startupServer();
+
+        Thread.sleep(2000);
+
+        // produce messages
+        mockServer.prepareDataTo(TOPIC_DEFAULT, MESSAGE_NUM);
+    }
+
+    @AfterClass
+    public static void stop() {
+        mockServer.shutdownServer();
+    }
+
+    @Test
+    public void testConsumer() throws MQBrokerException, MQClientException, InterruptedException, UnsupportedEncodingException {
+
+        // start up Spark
+        Map<String, String> optionParams = new HashMap<>();
+        optionParams.put(RocketMQConfig.NAME_SERVER_ADDR, NAME_SERVER);
+        SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver").setMaster("local[*]");
+        JavaStreamingContext sc = new JavaStreamingContext(sparkConf, new Duration(1000));
+        List<String> topics = new ArrayList<>();
+        topics.add(TOPIC_DEFAULT);
+
+        LocationStrategy locationStrategy = LocationStrategy.PreferConsistent();
+
+        JavaInputDStream<MessageExt> stream = RocketMqUtils.createJavaMQPullStream(sc, UUID.randomUUID().toString(),
+            topics, ConsumerStrategy.earliest(), false, false, false, locationStrategy, optionParams);
+
+        final Set<MessageExt> result = Collections.synchronizedSet(new HashSet<MessageExt>());
+
+        stream.foreachRDD(new VoidFunction<JavaRDD<MessageExt>>() {
+            @Override
+            public void call(JavaRDD<MessageExt> messageExtJavaRDD) throws Exception {
+                result.addAll(messageExtJavaRDD.collect());
+            }
+        });
+
+        sc.start();
+
+        long startTime = System.currentTimeMillis();
+        boolean matches = false;
+        while (!matches && System.currentTimeMillis() - startTime < 20000) {
+            matches = MESSAGE_NUM == result.size();
+            Thread.sleep(50);
+        }
+        sc.stop();
+
+        assertEquals(MESSAGE_NUM, result.size());
+    }
+
+    @Test
+    public void testGetOffsets() throws MQBrokerException, MQClientException, InterruptedException, UnsupportedEncodingException {
+
+        Map<String, String> optionParams = new HashMap<>();
+        optionParams.put(RocketMQConfig.NAME_SERVER_ADDR, NAME_SERVER);
+        SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver").setMaster("local[*]");
+        JavaStreamingContext sc = new JavaStreamingContext(sparkConf, new Duration(1000));
+        List<String> topics = new ArrayList<>();
+        topics.add(TOPIC_DEFAULT);
+
+        LocationStrategy locationStrategy = LocationStrategy.PreferConsistent();
+
+        JavaInputDStream<MessageExt> dStream = RocketMqUtils.createJavaMQPullStream(sc, UUID.randomUUID().toString(),
+            topics, ConsumerStrategy.earliest(), false, false, false, locationStrategy, optionParams);
+
+        // hold a reference to the current offset ranges, so it can be used downstream
+        final AtomicReference<Map<TopicQueueId, OffsetRange[]>> offsetRanges = new AtomicReference<>();
+
+        final Set<MessageExt> result = Collections.synchronizedSet(new HashSet<MessageExt>());
+
+        dStream.transform(new Function<JavaRDD<MessageExt>, JavaRDD<MessageExt>>() {
+            @Override
+            public JavaRDD<MessageExt> call(JavaRDD<MessageExt> v1) throws Exception {
+                Map<TopicQueueId, OffsetRange[]> offsets = ((HasOffsetRanges) v1.rdd()).offsetRanges();
+                offsetRanges.set(offsets);
+                return v1;
+            }
+        }).foreachRDD(new VoidFunction<JavaRDD<MessageExt>>() {
+            @Override
+            public void call(JavaRDD<MessageExt> messageExtJavaRDD) throws Exception {
+                result.addAll(messageExtJavaRDD.collect());
+            }
+        });
+
+        sc.start();
+
+        long startTime = System.currentTimeMillis();
+        boolean matches = false;
+        while (!matches && System.currentTimeMillis() - startTime < 20000) {
+            matches = MESSAGE_NUM == result.size();
+            Thread.sleep(50);
+        }
+        sc.stop();
+
+        assertEquals(MESSAGE_NUM, result.size());
+        assertFalse(offsetRanges.get().isEmpty());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/style/copyright/Apache.xml
----------------------------------------------------------------------
diff --git a/rocketmq-spark/style/copyright/Apache.xml b/rocketmq-spark/style/copyright/Apache.xml
new file mode 100644
index 0000000..2db86d0
--- /dev/null
+++ b/rocketmq-spark/style/copyright/Apache.xml
@@ -0,0 +1,24 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+ --> + +<component name="CopyrightManager"> + <copyright> + <option name="myName" value="Apache"/> + <option name="notice" + value="Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License."/> + </copyright> +</component> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/style/copyright/profiles_settings.xml ---------------------------------------------------------------------- diff --git a/rocketmq-spark/style/copyright/profiles_settings.xml b/rocketmq-spark/style/copyright/profiles_settings.xml new file mode 100644 index 0000000..4c0e521 --- /dev/null +++ b/rocketmq-spark/style/copyright/profiles_settings.xml @@ -0,0 +1,64 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<component name="CopyrightManager"> + <settings default="Apache"> + <module2copyright> + <element module="All" copyright="Apache"/> + </module2copyright> + <LanguageOptions name="GSP"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="HTML"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="JAVA"> + <option name="fileTypeOverride" value="3"/> + <option name="addBlankAfter" value="false"/> + </LanguageOptions> + <LanguageOptions name="JSP"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="JSPX"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="MXML"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="Properties"> + <option name="fileTypeOverride" value="3"/> + <option name="block" value="false"/> + </LanguageOptions> + <LanguageOptions name="SPI"> + <option name="fileTypeOverride" value="3"/> + <option name="block" value="false"/> + </LanguageOptions> + <LanguageOptions name="XML"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="__TEMPLATE__"> + <option name="separateBefore" value="true"/> + <option name="lenBefore" value="1"/> + </LanguageOptions> + </settings> +</component> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/style/rmq_checkstyle.xml ---------------------------------------------------------------------- diff --git a/rocketmq-spark/style/rmq_checkstyle.xml b/rocketmq-spark/style/rmq_checkstyle.xml new file mode 100644 index 0000000..e3155cc --- /dev/null +++ b/rocketmq-spark/style/rmq_checkstyle.xml @@ -0,0 +1,135 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+  -->
+
+<!DOCTYPE module PUBLIC
+    "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+    "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+<!--See http://checkstyle.sourceforge.net/reports/google-java-style.html#s2.2-file-encoding -->
+<module name="Checker">
+
+    <property name="localeLanguage" value="en"/>
+
+    <!--Reports tab characters; by default only the first instance in each file-->
+    <module name="FileTabCharacter"/>
+
+    <!-- header -->
+    <module name="RegexpHeader">
+        <property name="header" value="/\*\nLicensed to the Apache Software Foundation*"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="System\.out\.println"/>
+        <property name="message" value="Do not invoke System.out.println in source code!"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="//FIXME"/>
+        <property name="message" value="Recommended to fix the FIXME task!"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="//TODO"/>
+        <property name="message" value="Recommended to fix the TODO task!"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="@alibaba"/>
+        <property name="message" value="Recommended to remove the @alibaba keyword!"/>
+    </module>
+    <module name="RegexpSingleline">
+        <property name="format" value="@taobao"/>
+        <property name="message" value="Recommended to remove the @taobao keyword!"/>
+    </module>
+    <module name="RegexpSingleline">
+        <property name="format" value="@author"/>
+        <property name="message" value="Recommended to remove the @author tag in javadoc!"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format"
+            value=".*[\u3400-\u4DB5\u4E00-\u9FA5\u9FA6-\u9FBB\uF900-\uFA2D\uFA30-\uFA6A\uFA70-\uFAD9\uFF00-\uFFEF\u2E80-\u2EFF\u3000-\u303F\u31C0-\u31EF]+.*"/>
+        <property name="message" value="Chinese characters are not allowed!"/>
+    </module>
+
+    <module name="FileLength">
+        <property name="max" value="3000"/>
+    </module>
+
+    <module name="TreeWalker">
+
+        <module name="UnusedImports">
+            <property name="processJavadoc" value="true"/>
+        </module>
+        <module name="RedundantImport"/>
+
+        <!--<module name="IllegalImport" />-->
+
+        <!--Checks that classes that override equals() also override hashCode()-->
+        <module name="EqualsHashCode"/>
+        <!--Checks for over-complicated boolean expressions. Currently finds code like if (b == true), b || true, !false, etc.-->
+        <module name="SimplifyBooleanExpression"/>
+        <module name="OneStatementPerLine"/>
+        <module name="UnnecessaryParentheses"/>
+        <!--Checks for over-complicated boolean return statements.
For example, if (valid()) return false; else return true; can be simplified to return !valid();-->
+        <module name="SimplifyBooleanReturn"/>
+
+        <!--Check that the default is after all the cases in a switch statement-->
+        <module name="DefaultComesLast"/>
+        <!--Detects empty statements (standalone ";" semicolon)-->
+        <module name="EmptyStatement"/>
+        <!--Checks that long constants are defined with an upper ell-->
+        <module name="UpperEll"/>
+        <module name="ConstantName">
+            <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/>
+        </module>
+        <!--Checks that local, non-final variable names conform to a format specified by the format property-->
+        <module name="LocalVariableName"/>
+        <!--Validates identifiers for local, final variables, including catch parameters-->
+        <module name="LocalFinalVariableName"/>
+        <!--Validates identifiers for non-static fields-->
+        <module name="MemberName"/>
+        <!--Validates identifiers for class type parameters-->
+        <module name="ClassTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <!--Validates identifiers for method type parameters-->
+        <module name="MethodTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <module name="PackageName"/>
+        <module name="ParameterName"/>
+        <module name="StaticVariableName"/>
+        <module name="TypeName"/>
+        <!--Checks that there are no import statements that use the * notation-->
+        <module name="AvoidStarImport"/>
+
+        <!--whitespace-->
+        <module name="GenericWhitespace"/>
+        <module name="NoWhitespaceBefore"/>
+        <module name="WhitespaceAfter"/>
+        <module name="NoWhitespaceAfter"/>
+        <module name="WhitespaceAround">
+            <property name="allowEmptyConstructors" value="true"/>
+            <property name="allowEmptyMethods" value="true"/>
+        </module>
+        <module name="Indentation"/>
+        <module name="MethodParamPad"/>
+        <module name="ParenPad"/>
+        <module name="TypecastParenPad"/>
+    </module>
+</module>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/style/rmq_codeStyle.xml
----------------------------------------------------------------------
diff --git a/rocketmq-spark/style/rmq_codeStyle.xml b/rocketmq-spark/style/rmq_codeStyle.xml
new file mode 100644
index 0000000..cd95ee6
--- /dev/null
+++ b/rocketmq-spark/style/rmq_codeStyle.xml
@@ -0,0 +1,157 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+ --> + +<code_scheme name="rocketmq"> + <option name="USE_SAME_INDENTS" value="true"/> + <option name="IGNORE_SAME_INDENTS_FOR_LANGUAGES" value="true"/> + <option name="OTHER_INDENT_OPTIONS"> + <value> + <option name="INDENT_SIZE" value="4"/> + <option name="CONTINUATION_INDENT_SIZE" value="4"/> + <option name="TAB_SIZE" value="4"/> + <option name="USE_TAB_CHARACTER" value="false"/> + <option name="SMART_TABS" value="false"/> + <option name="LABEL_INDENT_SIZE" value="0"/> + <option name="LABEL_INDENT_ABSOLUTE" value="false"/> + <option name="USE_RELATIVE_INDENTS" value="false"/> + </value> + </option> + <option name="PREFER_LONGER_NAMES" value="false"/> + <option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/> + <option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/> + <option name="PACKAGES_TO_USE_IMPORT_ON_DEMAND"> + <value/> + </option> + <option name="IMPORT_LAYOUT_TABLE"> + <value> + <package name="" withSubpackages="true" static="false"/> + <emptyLine/> + <package name="" withSubpackages="true" static="true"/> + </value> + </option> + <option name="JD_ALIGN_PARAM_COMMENTS" value="false"/> + <option name="JD_ALIGN_EXCEPTION_COMMENTS" value="false"/> + <option name="JD_P_AT_EMPTY_LINES" value="false"/> + <option name="JD_KEEP_INVALID_TAGS" value="false"/> + <option name="JD_DO_NOT_WRAP_ONE_LINE_COMMENTS" value="true"/> + <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/> + <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/> + <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/> + <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/> + <option name="ELSE_ON_NEW_LINE" value="true"/> + <option name="WHILE_ON_NEW_LINE" value="true"/> + <option name="CATCH_ON_NEW_LINE" value="true"/> + <option name="FINALLY_ON_NEW_LINE" value="true"/> + <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/> + <option name="ALIGN_MULTILINE_FOR" value="false"/> + <option name="SPACE_AFTER_TYPE_CAST" value="false"/> + <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/> + <option name="METHOD_PARAMETERS_WRAP" value="1"/> + <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/> + <option name="LABELED_STATEMENT_WRAP" value="1"/> + <option name="WRAP_COMMENTS" value="true"/> + <option name="METHOD_ANNOTATION_WRAP" value="1"/> + <option name="CLASS_ANNOTATION_WRAP" value="1"/> + <option name="FIELD_ANNOTATION_WRAP" value="1"/> + <JavaCodeStyleSettings> + <option name="CLASS_NAMES_IN_JAVADOC" value="3"/> + </JavaCodeStyleSettings> + <XML> + <option name="XML_LEGACY_SETTINGS_IMPORTED" value="true"/> + </XML> + <ADDITIONAL_INDENT_OPTIONS fileType="haml"> + <option name="INDENT_SIZE" value="2"/> + </ADDITIONAL_INDENT_OPTIONS> + <codeStyleSettings language="Groovy"> + <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/> + <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/> + <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/> + <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/> + <option name="ELSE_ON_NEW_LINE" value="true"/> + <option name="CATCH_ON_NEW_LINE" value="true"/> + <option name="FINALLY_ON_NEW_LINE" value="true"/> + <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/> + <option name="ALIGN_MULTILINE_FOR" value="false"/> + <option name="SPACE_AFTER_TYPE_CAST" value="false"/> + <option name="METHOD_PARAMETERS_WRAP" value="1"/> + <option name="METHOD_ANNOTATION_WRAP" value="1"/> + <option name="CLASS_ANNOTATION_WRAP" value="1"/> + <option name="FIELD_ANNOTATION_WRAP" value="1"/> + <option 
name="PARENT_SETTINGS_INSTALLED" value="true"/> + <indentOptions> + <option name="CONTINUATION_INDENT_SIZE" value="4"/> + </indentOptions> + </codeStyleSettings> + <codeStyleSettings language="HOCON"> + <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/> + <option name="PARENT_SETTINGS_INSTALLED" value="true"/> + </codeStyleSettings> + <codeStyleSettings language="JAVA"> + <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/> + <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/> + <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/> + <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/> + <option name="ELSE_ON_NEW_LINE" value="true"/> + <option name="WHILE_ON_NEW_LINE" value="true"/> + <option name="CATCH_ON_NEW_LINE" value="true"/> + <option name="FINALLY_ON_NEW_LINE" value="true"/> + <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/> + <option name="ALIGN_MULTILINE_FOR" value="false"/> + <option name="SPACE_AFTER_TYPE_CAST" value="false"/> + <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/> + <option name="METHOD_PARAMETERS_WRAP" value="1"/> + <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/> + <option name="LABELED_STATEMENT_WRAP" value="1"/> + <option name="METHOD_ANNOTATION_WRAP" value="1"/> + <option name="CLASS_ANNOTATION_WRAP" value="1"/> + <option name="FIELD_ANNOTATION_WRAP" value="1"/> + <option name="PARENT_SETTINGS_INSTALLED" value="true"/> + <indentOptions> + <option name="CONTINUATION_INDENT_SIZE" value="4"/> + </indentOptions> + </codeStyleSettings> + <codeStyleSettings language="JSON"> + <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/> + <option name="PARENT_SETTINGS_INSTALLED" value="true"/> + </codeStyleSettings> + <codeStyleSettings language="Scala"> + <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/> + <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/> + <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/> + <option name="ELSE_ON_NEW_LINE" value="true"/> + <option name="WHILE_ON_NEW_LINE" value="true"/> + <option name="CATCH_ON_NEW_LINE" value="true"/> + <option name="FINALLY_ON_NEW_LINE" value="true"/> + <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/> + <option name="ALIGN_MULTILINE_FOR" value="false"/> + <option name="METHOD_PARAMETERS_WRAP" value="1"/> + <option name="METHOD_ANNOTATION_WRAP" value="1"/> + <option name="CLASS_ANNOTATION_WRAP" value="1"/> + <option name="FIELD_ANNOTATION_WRAP" value="1"/> + <option name="PARENT_SETTINGS_INSTALLED" value="true"/> + <indentOptions> + <option name="INDENT_SIZE" value="4"/> + <option name="CONTINUATION_INDENT_SIZE" value="4"/> + <option name="TAB_SIZE" value="4"/> + </indentOptions> + </codeStyleSettings> + <codeStyleSettings language="XML"> + <indentOptions> + <option name="CONTINUATION_INDENT_SIZE" value="4"/> + </indentOptions> + </codeStyleSettings> +</code_scheme> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-storm/README.md ---------------------------------------------------------------------- diff --git a/rocketmq-storm/README.md b/rocketmq-storm/README.md new file mode 100644 index 0000000..cdd23d8 --- /dev/null +++ b/rocketmq-storm/README.md @@ -0,0 +1,2 @@ +# RocketMQ Storm Integration +

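Taken together, RocketMQReceiverTest and RocketMqUtilsTest exercise the two consumption paths added by this commit: the receiver-based push stream configured through Properties, and the pull stream with explicit consumer, location and offset control. For reference, a standalone pull-based consumer assembled from the same calls might look like the sketch below. The group id, topic and name-server address are placeholders, and the three boolean flags are passed as false exactly as in the tests; their semantics are not shown in this diff.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spark.ConsumerStrategy;
import org.apache.rocketmq.spark.LocationStrategy;
import org.apache.rocketmq.spark.RocketMQConfig;
import org.apache.rocketmq.spark.RocketMqUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class RocketMqPullExample {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("RocketMqPullExample").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));

        // placeholder name server address; point this at a real cluster
        Map<String, String> optionParams = new HashMap<>();
        optionParams.put(RocketMQConfig.NAME_SERVER_ADDR, "localhost:9876");

        // same call shape as in RocketMqUtilsTest; "example-group" and
        // "example-topic" are hypothetical names
        JavaInputDStream<MessageExt> stream = RocketMqUtils.createJavaMQPullStream(
            jssc, "example-group", Arrays.asList("example-topic"),
            ConsumerStrategy.earliest(), false, false, false,
            LocationStrategy.PreferConsistent(), optionParams);

        stream.print();

        jssc.start();
        jssc.awaitTermination();
    }
}

Offset ranges for each batch can then be captured with the HasOffsetRanges cast shown in testGetOffsets above.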