This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 734686f [FLINK-12177][coordination] Remove legacy FlinkUntypedActor 734686f is described below commit 734686ff29c520d930f24c72dc069f18fd1674b7 Author: tison <wander4...@gmail.com> AuthorDate: Sat Apr 13 12:17:02 2019 +0800 [FLINK-12177][coordination] Remove legacy FlinkUntypedActor --- .../flink/runtime/akka/FlinkUntypedActor.java | 150 ------------------- .../flink/runtime/akka/FlinkUntypedActorTest.java | 160 --------------------- 2 files changed, 310 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java deleted file mode 100644 index e0279b3..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.akka; - -import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage; -import org.apache.flink.runtime.messages.RequiresLeaderSessionID; - -import akka.actor.UntypedActor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.UUID; - -/** - * Base class for Flink's actors implemented with Java. Actors inheriting from this class - * automatically log received messages when the debug log level is activated. Furthermore, - * they filter out {@link LeaderSessionMessage} with the wrong leader session ID. If a message - * of type {@link RequiresLeaderSessionID} without being wrapped in a LeaderSessionMessage is - * detected, then an Exception is thrown. - * - * <p>In order to implement the actor behavior, an implementing subclass has to override the method - * handleMessage, which defines how messages are processed. Furthermore, the subclass has to provide - * a leader session ID option which is returned by getLeaderSessionID. - */ -public abstract class FlinkUntypedActor extends UntypedActor { - - //CHECKSTYLE.OFF: MemberNameCheck - re-enable after JobManager/TaskManager refactoring in FLIP-6? - protected final Logger LOG = LoggerFactory.getLogger(getClass()); - //CHECKSTYLE.ON: MemberNameCheck - - /** - * This method is called by Akka if a new message has arrived for the actor. It logs the - * processing time of the incoming message if the logging level is set to debug. After logging - * the handleLeaderSessionID method is called. - * - * <p>Important: This method cannot be overridden. The actor specific message handling logic is - * implemented by the method handleMessage. - * - * @param message Incoming message - * @throws Exception - */ - @Override - public final void onReceive(Object message) throws Exception { - if (LOG.isTraceEnabled()) { - LOG.trace("Received message {} at {} from {}.", message, getSelf().path(), getSender()); - - long start = System.nanoTime(); - - handleLeaderSessionID(message); - - long duration = (System.nanoTime() - start) / 1_000_000; - - LOG.trace("Handled message {} in {} ms from {}.", message, duration, getSender()); - } else { - handleLeaderSessionID(message); - } - } - - /** - * This method filters out {@link LeaderSessionMessage} whose leader session ID is not equal - * to the actors leader session ID. If a message of type {@link RequiresLeaderSessionID} - * arrives, then an Exception is thrown, because these messages have to be wrapped in a - * {@link LeaderSessionMessage}. - * - * @param message Incoming message - * @throws Exception - */ - private void handleLeaderSessionID(Object message) throws Exception { - if (message instanceof LeaderSessionMessage) { - LeaderSessionMessage msg = (LeaderSessionMessage) message; - UUID expectedID = getLeaderSessionID(); - UUID actualID = msg.leaderSessionID(); - - if (expectedID != null) { - if (expectedID.equals(actualID)) { - handleMessage(msg.message()); - } else { - handleDiscardedMessage(expectedID, msg); - } - } else { - handleNoLeaderId(msg); - } - } else if (message instanceof RequiresLeaderSessionID) { - throw new Exception("Received a message " + message + " without a leader session " + - "ID, even though the message requires a leader session ID."); - } else { - // call method to handle message - handleMessage(message); - } - } - - private void handleDiscardedMessage(UUID expectedLeaderSessionID, LeaderSessionMessage msg) { - LOG.warn("Discard message {} because the expected leader session ID {} did not " + - "equal the received leader session ID {}.", msg, expectedLeaderSessionID, - msg.leaderSessionID()); - } - - private void handleNoLeaderId(LeaderSessionMessage msg) { - LOG.warn("Discard message {} because there is currently no valid leader id known.", msg); - } - - /** - * This method contains the actor logic which defines how to react to incoming messages. - * - * @param message Incoming message - * @throws Exception - */ - protected abstract void handleMessage(Object message) throws Exception; - - /** - * Returns the current leader session ID associated with this actor. - * @return - */ - protected abstract UUID getLeaderSessionID(); - - /** - * This method should be called for every outgoing message. It wraps messages which require - * a leader session ID (indicated by {@link RequiresLeaderSessionID}) in a - * {@link LeaderSessionMessage} with the actor's leader session ID. - * - * <p>This method can be overridden to implement a different decoration behavior. - * - * @param message Message to be decorated - * @return The decorated message - */ - protected Object decorateMessage(Object message) { - if (message instanceof RequiresLeaderSessionID) { - return new LeaderSessionMessage(getLeaderSessionID(), message); - } else { - return message; - } - } - -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java deleted file mode 100644 index 2d39e0b..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.akka; - -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.messages.RequiresLeaderSessionID; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.util.TestLogger; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Kill; -import akka.actor.Props; -import akka.actor.RobustActorSystem; -import akka.testkit.JavaTestKit; -import akka.testkit.TestActorRef; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.util.UUID; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -/** - * Tests for {@link FlinkUntypedActor}. - */ -public class FlinkUntypedActorTest extends TestLogger { - - private static ActorSystem actorSystem; - - @BeforeClass - public static void setup() { - actorSystem = RobustActorSystem.create("TestingActorSystem", TestingUtils.testConfig()); - } - - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(actorSystem); - } - - /** - * Tests that LeaderSessionMessage messages with a wrong leader session ID are filtered - * out. - */ - @Test - public void testLeaderSessionMessageFilteringOfFlinkUntypedActor() { - final UUID leaderSessionID = UUID.randomUUID(); - final UUID oldSessionID = UUID.randomUUID(); - - TestActorRef<PlainFlinkUntypedActor> actor = null; - - try { - actor = TestActorRef.create( - actorSystem, Props.create(PlainFlinkUntypedActor.class, leaderSessionID)); - - final PlainFlinkUntypedActor underlyingActor = actor.underlyingActor(); - - actor.tell(new JobManagerMessages.LeaderSessionMessage(leaderSessionID, 1), ActorRef.noSender()); - actor.tell(new JobManagerMessages.LeaderSessionMessage(oldSessionID, 2), ActorRef.noSender()); - actor.tell(new JobManagerMessages.LeaderSessionMessage(leaderSessionID, 2), ActorRef.noSender()); - actor.tell(1, ActorRef.noSender()); - - assertEquals(3, underlyingActor.getMessageCounter()); - - } finally { - stopActor(actor); - } - } - - /** - * Tests that an exception is thrown, when the FlinkUntypedActore receives a message which - * extends {@link RequiresLeaderSessionID} and is not wrapped in a LeaderSessionMessage. - */ - @Test - public void testThrowingExceptionWhenReceivingNonWrappedRequiresLeaderSessionIDMessage() { - final UUID leaderSessionID = UUID.randomUUID(); - - TestActorRef<PlainFlinkUntypedActor> actor = null; - - try { - final Props props = Props.create(PlainFlinkUntypedActor.class, leaderSessionID); - actor = TestActorRef.create(actorSystem, props); - - actor.receive(new JobManagerMessages.LeaderSessionMessage(leaderSessionID, 1)); - - try { - actor.receive(new PlainRequiresLeaderSessionID()); - - fail("Expected an exception to be thrown, because a RequiresLeaderSessionID" + - "message was sent without being wrapped in LeaderSessionMessage."); - } catch (Exception e) { - assertEquals("Received a message PlainRequiresLeaderSessionID " + - "without a leader session ID, even though the message requires a " + - "leader session ID.", - e.getMessage()); - } - - } finally { - stopActor(actor); - } - } - - private static void stopActor(ActorRef actor) { - if (actor != null) { - actor.tell(Kill.getInstance(), ActorRef.noSender()); - } - } - - static class PlainFlinkUntypedActor extends FlinkUntypedActor { - - private UUID leaderSessionID; - - private int messageCounter; - - public PlainFlinkUntypedActor(UUID leaderSessionID) { - - this.leaderSessionID = leaderSessionID; - this.messageCounter = 0; - } - - @Override - protected void handleMessage(Object message) throws Exception { - messageCounter++; - } - - @Override - protected UUID getLeaderSessionID() { - return leaderSessionID; - } - - public int getMessageCounter() { - return messageCounter; - } - } - - static class PlainRequiresLeaderSessionID implements RequiresLeaderSessionID { - @Override - public String toString() { - return "PlainRequiresLeaderSessionID"; - } - } -}