Repository: incubator-gearpump Updated Branches: refs/heads/master 0bc6ac375 -> cb6aced50
[GEARPUMP-359] Fix OutputWatermark advancing logic in Subscription Author: huafengw <[email protected]> Closes #234 from huafengw/subs. fix style Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/cb6aced5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/cb6aced5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/cb6aced5 Branch: refs/heads/master Commit: cb6aced500e95b95e4ef0e870db7017d5647fbaa Parents: 0bc6ac3 Author: huafengw <[email protected]> Authored: Fri Oct 27 06:34:50 2017 +0800 Committer: manuzhang <[email protected]> Committed: Fri Oct 27 06:36:10 2017 +0800 ---------------------------------------------------------------------- .../gearpump/streaming/MessageSerializer.scala | 12 ++++-- .../gearpump/streaming/task/Subscription.scala | 14 ++----- .../gearpump/streaming/task/TaskActor.scala | 5 ++- .../streaming/task/TaskControlMessage.scala | 5 ++- .../streaming/MessageSerializerSpec.scala | 6 +-- .../streaming/task/SubscriptionSpec.scala | 42 ++++++++++++-------- .../gearpump/streaming/task/TaskActorSpec.scala | 2 +- 7 files changed, 48 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cb6aced5/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala index 20e2529..10879ac 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala @@ -40,13 +40,14 @@ class TaskIdSerializer extends TaskMessageSerializer[TaskId] { class AckSerializer 
extends TaskMessageSerializer[Ack] { val taskIdSerializer = new TaskIdSerializer - override def getLength(obj: Ack): Int = taskIdSerializer.getLength(obj.taskId) + 8 + override def getLength(obj: Ack): Int = taskIdSerializer.getLength(obj.taskId) + 16 override def write(dataOutput: DataOutput, obj: Ack): Unit = { taskIdSerializer.write(dataOutput, obj.taskId) dataOutput.writeShort(obj.seq) dataOutput.writeShort(obj.actualReceivedNum) dataOutput.writeInt(obj.sessionId) + dataOutput.writeLong(obj.watermark) } override def read(dataInput: DataInput): Ack = { @@ -54,7 +55,8 @@ class AckSerializer extends TaskMessageSerializer[Ack] { val seq = dataInput.readShort() val actualReceivedNum = dataInput.readShort() val sessionId = dataInput.readInt() - Ack(taskId, seq, actualReceivedNum, sessionId) + val watermark = dataInput.readLong() + Ack(taskId, seq, actualReceivedNum, sessionId, watermark) } } @@ -78,19 +80,21 @@ class InitialAckRequestSerializer extends TaskMessageSerializer[InitialAckReques class AckRequestSerializer extends TaskMessageSerializer[AckRequest] { val taskIdSerializer = new TaskIdSerializer - override def getLength(obj: AckRequest): Int = taskIdSerializer.getLength(obj.taskId) + 6 + override def getLength(obj: AckRequest): Int = taskIdSerializer.getLength(obj.taskId) + 14 override def write(dataOutput: DataOutput, obj: AckRequest): Unit = { taskIdSerializer.write(dataOutput, obj.taskId) dataOutput.writeShort(obj.seq) dataOutput.writeInt(obj.sessionId) + dataOutput.writeLong(obj.watermark) } override def read(dataInput: DataInput): AckRequest = { val taskId = taskIdSerializer.read(dataInput) val seq = dataInput.readShort() val sessionId = dataInput.readInt() - AckRequest(taskId, seq, sessionId) + val watermark = dataInput.readLong() + AckRequest(taskId, seq, sessionId, watermark) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cb6aced5/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala 
---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala index 24f1763..ab99323 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala @@ -58,12 +58,9 @@ class Subscription( // Don't worry if this store negative number. We will wrap the Short private val messageCount: Array[Short] = new Array[Short](parallelism) private val pendingMessageCount: Array[Short] = new Array[Short](parallelism) - private val processingWatermarkSince: Array[Short] = new Array[Short](parallelism) private val outputWatermark: Array[MilliSeconds] = Array.fill(parallelism)( Watermark.MIN.toEpochMilli) - private val processingWatermark: Array[MilliSeconds] = Array.fill(parallelism)( - Watermark.MIN.toEpochMilli) private var maxPendingCount: Short = 0 @@ -113,8 +110,6 @@ class Subscription( val targetTask = TaskId(processorId, partition) publisher.transport(msg, targetTask) - this.processingWatermark(partition) = publisher.getProcessingWatermark.toEpochMilli - incrementMessageCount(partition, 1) if (messageCount(partition) % ackOnceEveryMessageCount == 0) { @@ -162,14 +157,12 @@ class Subscription( * @param ack acknowledge message received */ def receiveAck(ack: Ack): Unit = { - val index = ack.taskId.index if (ack.sessionId == sessionId) { if (ack.actualReceivedNum == ack.seq) { - if ((ack.seq - processingWatermarkSince(index)).toShort >= 0) { - outputWatermark(index) = processingWatermark(index) - processingWatermarkSince(index) = messageCount(index) + if (ack.watermark > outputWatermark(index)) { + outputWatermark(index) = ack.watermark } pendingMessageCount(ack.taskId.index) = (messageCount(ack.taskId.index) - ack.seq).toShort @@ -209,7 +202,8 @@ class Subscription( // to throttle the 
number of unacked AckRequest incrementMessageCount(partition, ackOnceEveryMessageCount) val targetTask = TaskId(processorId, partition) - val ackRequest = AckRequest(taskId, messageCount(partition), sessionId) + val processingWatermark = publisher.getProcessingWatermark.toEpochMilli + val ackRequest = AckRequest(taskId, messageCount(partition), sessionId, processingWatermark) publisher.transport(ackRequest, targetTask) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cb6aced5/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala index b43457e..9d9778e 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala @@ -392,7 +392,7 @@ object TaskActor { null } else { receivedMsgCount.put(sessionId, 0) - Ack(task_id, 0, 0, sessionId) + Ack(task_id, 0, 0, sessionId, Watermark.MIN.toEpochMilli) } } @@ -402,7 +402,8 @@ object TaskActor { // Increments more count for each AckRequest // to throttle the number of unacked AckRequest receivedMsgCount.put(sessionId, (receivedMsgCount.get(sessionId) + incrementCount).toShort) - Ack(task_id, ackRequest.seq, receivedMsgCount.get(sessionId), ackRequest.sessionId) + Ack(task_id, ackRequest.seq, receivedMsgCount.get(sessionId), ackRequest.sessionId, + ackRequest.watermark) } else { LOG.error(s"get unknown AckRequest $ackRequest from ${sender.toString()}") null http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cb6aced5/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala ---------------------------------------------------------------------- diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala index 4ba9315..ffa3134 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala @@ -30,7 +30,7 @@ case class InitialAckRequest(taskId: TaskId, sessionId: Int) Here the sessionId filed is used to distinguish messages between different replays after the application restart */ -case class AckRequest(taskId: TaskId, seq: Short, sessionId: Int) +case class AckRequest(taskId: TaskId, seq: Short, sessionId: Int, watermark: Long) /** * Ack back to sender task actor. @@ -38,7 +38,8 @@ case class AckRequest(taskId: TaskId, seq: Short, sessionId: Int) * @param seq The seq field represents the expected number of received messages and the * actualReceivedNum field means the actual received number since start. 
*/ -case class Ack(taskId: TaskId, seq: Short, actualReceivedNum: Short, sessionId: Int) +case class Ack( + taskId: TaskId, seq: Short, actualReceivedNum: Short, sessionId: Int, watermark: Long) sealed trait ClockEvent http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cb6aced5/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala index f6f6af2..1ab2358 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala @@ -17,9 +17,9 @@ */ package org.apache.gearpump.streaming +import org.apache.gearpump.streaming.source.Watermark import org.jboss.netty.buffer.{ChannelBufferOutputStream, ChannelBuffers} import org.scalatest.{Matchers, WordSpec} - import org.apache.gearpump.streaming.task._ import org.apache.gearpump.transport.netty.WrappedChannelBuffer @@ -55,7 +55,7 @@ class MessageSerializerSpec extends WordSpec with Matchers { "AckRequestSerializer" should { "serialize and deserialize AckRequest properly" in { val serializer = new AckRequestSerializer - val ackRequest = AckRequest(TaskId(1, 2), 1000, 1024) + val ackRequest = AckRequest(TaskId(1, 2), 1000, 1024, Watermark.MAX.toEpochMilli) assert(testSerializer(ackRequest, serializer).equals(ackRequest)) } } @@ -71,7 +71,7 @@ class MessageSerializerSpec extends WordSpec with Matchers { "AckSerializer" should { "serialize and deserialize Ack properly" in { val serializer = new AckSerializer - val ack = Ack(TaskId(1, 2), 1024, 1023, 1799) + val ack = Ack(TaskId(1, 2), 1024, 1023, 1799, Watermark.MAX.toEpochMilli) assert(testSerializer(ack, serializer).equals(ack)) } } 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cb6aced5/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala index b05befa..8079928 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala @@ -70,6 +70,7 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { val (subscription, sender) = prepare val msg1 = Message("1", timestamp = Instant.ofEpochMilli(70)) when(sender.getProcessingWatermark).thenReturn(msg1.timestamp) + // Send first message to Task(1, 1) subscription.sendMessage(msg1) verify(sender, times(1)).transport(msg1, TaskId(1, 1)) @@ -77,6 +78,7 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { val msg2 = Message("0", timestamp = Instant.ofEpochMilli(50)) when(sender.getProcessingWatermark).thenReturn(msg2.timestamp) + // Send first message to Task(1, 0) subscription.sendMessage(msg2) verify(sender, times(1)).transport(msg2, TaskId(1, 0)) @@ -85,31 +87,39 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { val initialMinClock = subscription.watermark // Acks initial AckRequest(0) - subscription.receiveAck(Ack(TaskId(1, 1), 0, 0, session)) - subscription.receiveAck(Ack(TaskId(1, 0), 0, 0, session)) + subscription.receiveAck(Ack(TaskId(1, 1), 0, 0, session, Watermark.MIN.toEpochMilli)) + subscription.receiveAck(Ack(TaskId(1, 0), 0, 0, session, Watermark.MIN.toEpochMilli)) - // Sends 100 messages - 100 until 200 foreach { clock => - when(sender.getProcessingWatermark).thenReturn(Instant.ofEpochMilli(clock), - Instant.ofEpochMilli(clock)) + // Sends 98 more messages to 
each downstream task + 100 until 198 foreach { clock => subscription.sendMessage(Message("1", clock)) subscription.sendMessage(Message("2", clock)) } - assert(subscription.watermark == 50) + // Trigger sending AckRequest + val inOrders = org.mockito.Mockito.inOrder(sender, sender) - subscription.receiveAck(Ack(TaskId(1, 1), 100, 100, session)) - subscription.receiveAck(Ack(TaskId(1, 0), 100, 100, session)) + val msg3 = Message("1", Instant.ofEpochMilli(200)) + val expectedAckRequest = AckRequest(taskId, 200, session, 200) + when(sender.getProcessingWatermark).thenReturn(Instant.ofEpochMilli(200)) + subscription.sendMessage(msg3) + inOrders.verify(sender).transport(msg3, TaskId(1, 1)) + inOrders.verify(sender).transport(expectedAckRequest, TaskId(1, 1)) - // Ack received, minClock changed - assert(subscription.watermark > initialMinClock) + val msg4 = Message("2", Instant.ofEpochMilli(200)) + val expectedAckRequest2 = AckRequest(taskId, 200, session, 220) + when(sender.getProcessingWatermark).thenReturn(Instant.ofEpochMilli(220)) + subscription.sendMessage(msg4) + inOrders.verify(sender).transport(msg4, TaskId(1, 0)) + inOrders.verify(sender).transport(expectedAckRequest2, TaskId(1, 0)) + + assert(subscription.watermark == initialMinClock) - // Expects to receive two ackRequest for two downstream tasks - val ackRequestForTask0 = AckRequest(taskId, 200, session) - verify(sender, times(1)).transport(ackRequestForTask0, TaskId(1, 0)) + subscription.receiveAck(Ack(TaskId(1, 1), 200, 200, session, 200)) + subscription.receiveAck(Ack(TaskId(1, 0), 200, 200, session, 220)) - val ackRequestForTask1 = AckRequest(taskId, 200, session) - verify(sender, times(1)).transport(ackRequestForTask1, TaskId(1, 1)) + // Ack received, minClock changed + assert(subscription.watermark == 200) } it should "disallow more message sending if there is no ack back" in { 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cb6aced5/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala index 8deee78..3906b36 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala @@ -80,7 +80,7 @@ class TaskActorSpec extends WordSpec with Matchers with BeforeAndAfterEach with testActor ! StartTask(taskId1) implicit val system = getActorSystem - val ack = Ack(taskId2, 100, 99, testActor.underlyingActor.sessionId) + val ack = Ack(taskId2, 100, 99, testActor.underlyingActor.sessionId, 1024L) EventFilter[MsgLostException](occurrences = 1) intercept { testActor ! ack }
