http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java index a7f65f3..7ff4ec6 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -34,8 +35,12 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.queryablestate.UnknownKeyOrNamespace; +import org.apache.flink.configuration.QueryableStateOptions; +import org.apache.flink.queryablestate.UnknownKeyOrNamespaceException; import org.apache.flink.queryablestate.client.QueryableStateClient; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.messages.JobManagerMessages; @@ -53,25 +58,27 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; -import akka.actor.ActorSystem; -import akka.dispatch.Futures; -import akka.dispatch.OnSuccess; -import akka.dispatch.Recover; -import akka.pattern.Patterns; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongArray; +import java.util.function.Supplier; import scala.concurrent.Await; -import scala.concurrent.Future; import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; import scala.reflect.ClassTag$; @@ -84,10 +91,12 @@ import static org.junit.Assert.assertTrue; */ public abstract class AbstractQueryableStateITCase extends TestLogger { - protected static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000, TimeUnit.SECONDS); - private static final FiniteDuration QUERY_RETRY_DELAY = new FiniteDuration(100, TimeUnit.MILLISECONDS); + private static final int NO_OF_RETRIES = 100; + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000L, TimeUnit.SECONDS); + private static final Time QUERY_RETRY_DELAY = Time.milliseconds(100L); - protected static ActorSystem testActorSystem; + private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4); + private final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService); /** * State backend to use. @@ -136,7 +145,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { final Deadline deadline = TEST_TIMEOUT.fromNow(); final int numKeys = 256; - final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); + final QueryableStateClient client = new QueryableStateClient( + "localhost", + cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT)); JobID jobId = null; @@ -150,7 +161,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); DataStream<Tuple2<Integer, Long>> source = env .addSource(new TestKeyRangeSource(numKeys)); @@ -163,15 +174,14 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { final String queryName = "hakuna-matata"; - final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = - source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = 7143749578983540352L; + source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { + private static final long serialVersionUID = 7143749578983540352L; - @Override - public Integer getKey(Tuple2<Integer, Long> value) throws Exception { - return value.f0; - } - }).asQueryableState(queryName, reducingState); + @Override + public Integer getKey(Tuple2<Integer, Long> value) throws Exception { + return value.f0; + } + }).asQueryableState(queryName, reducingState); // Submit the job graph JobGraph jobGraph = env.getStreamGraph().getJobGraph(); @@ -188,19 +198,19 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { while (!allNonZero && deadline.hasTimeLeft()) { allNonZero = true; - final List<Future<Tuple2<Integer, Long>>> futures = new ArrayList<>(numKeys); + final List<CompletableFuture<Tuple2<Integer, Long>>> futures = new ArrayList<>(numKeys); for (int i = 0; i < numKeys; i++) { final int key = i; - if (counts.get(key) > 0) { + if (counts.get(key) > 0L) { // Skip this one continue; } else { allNonZero = false; } - Future<Tuple2<Integer, Long>> result = getKvStateWithRetries( + CompletableFuture<Tuple2<Integer, Long>> result = getKvStateWithRetries( client, jobId, queryName, @@ -208,24 +218,21 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { BasicTypeInfo.INT_TYPE_INFO, reducingState, QUERY_RETRY_DELAY, - false); + false, + executor); - result.onSuccess(new OnSuccess<Tuple2<Integer, Long>>() { - @Override - public void onSuccess(Tuple2<Integer, Long> result) throws Throwable { - counts.set(key, result.f1); - assertEquals("Key mismatch", key, result.f0.intValue()); - } - }, testActorSystem.dispatcher()); + result.thenAccept(res -> { + counts.set(key, res.f1); + assertEquals("Key mismatch", key, res.f0.intValue()); + }); futures.add(result); } - Future<Iterable<Tuple2<Integer, Long>>> futureSequence = Futures.sequence( - futures, - testActorSystem.dispatcher()); - - Await.ready(futureSequence, deadline.timeLeft()); + // wait for all the futures to complete + CompletableFuture + .allOf(futures.toArray(new CompletableFuture<?>[futures.size()])) + .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); } assertTrue("Not all keys are non-zero", allNonZero); @@ -238,15 +245,15 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { } finally { // Free cluster resources if (jobId != null) { - Future<CancellationSuccess> cancellation = cluster + CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster .getLeaderGateway(deadline.timeLeft()) .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)); + .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class))); - Await.ready(cancellation, deadline.timeLeft()); + cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); } - client.shutDown(); + client.shutdown(); } } @@ -274,7 +281,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); DataStream<Tuple2<Integer, Long>> source = env .addSource(new TestKeyRangeSource(numKeys)); @@ -311,22 +318,23 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { JobGraph jobGraph = env.getStreamGraph().getJobGraph(); jobId = jobGraph.getJobID(); - Future<TestingJobManagerMessages.JobStatusIs> failedFuture = cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.FAILED), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)); + CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture = FutureUtils.toJava( + cluster.getLeaderGateway(deadline.timeLeft()) + .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.FAILED), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class))); cluster.submitJobDetached(jobGraph); - TestingJobManagerMessages.JobStatusIs jobStatus = Await.result(failedFuture, deadline.timeLeft()); + TestingJobManagerMessages.JobStatusIs jobStatus = + failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); assertEquals(JobStatus.FAILED, jobStatus.state()); // Get the job and check the cause - JobManagerMessages.JobFound jobFound = Await.result( + JobManagerMessages.JobFound jobFound = FutureUtils.toJava( cluster.getLeaderGateway(deadline.timeLeft()) .ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)), - deadline.timeLeft()); + .mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class))) + .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString(); @@ -338,10 +346,10 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { } finally { // Free cluster resources if (jobId != null) { - Future<CancellationSuccess> cancellation = cluster + scala.concurrent.Future<CancellationSuccess> cancellation = cluster .getLeaderGateway(deadline.timeLeft()) .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)); + .mapTo(ClassTag$.MODULE$.<JobManagerMessages.CancellationSuccess>apply(JobManagerMessages.CancellationSuccess.class)); Await.ready(cancellation, deadline.timeLeft()); } @@ -359,9 +367,11 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { // Config final Deadline deadline = TEST_TIMEOUT.fromNow(); - final int numElements = 1024; + final long numElements = 1024L; - final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); + final QueryableStateClient client = new QueryableStateClient( + "localhost", + cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT)); JobID jobId = null; try { @@ -371,7 +381,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); DataStream<Tuple2<Integer, Long>> source = env .addSource(new TestAscendingValueSource(numElements)); @@ -381,15 +391,14 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { "any", source.getType()); - QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = - source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = 7662520075515707428L; + source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { + private static final long serialVersionUID = 7662520075515707428L; - @Override - public Integer getKey(Tuple2<Integer, Long> value) throws Exception { - return value.f0; - } - }).asQueryableState("hakuna", valueState); + @Override + public Integer getKey(Tuple2<Integer, Long> value) throws Exception { + return value.f0; + } + }).asQueryableState("hakuna", valueState); // Submit the job graph JobGraph jobGraph = env.getStreamGraph().getJobGraph(); @@ -397,22 +406,19 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { cluster.submitJobDetached(jobGraph); - // Now query - long expected = numElements; - - executeQuery(deadline, client, jobId, "hakuna", valueState, expected); + executeQuery(deadline, client, jobId, "hakuna", valueState, numElements); } finally { // Free cluster resources if (jobId != null) { - Future<CancellationSuccess> cancellation = cluster + CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster .getLeaderGateway(deadline.timeLeft()) .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)); + .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class))); - Await.ready(cancellation, deadline.timeLeft()); + cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); } - client.shutDown(); + client.shutdown(); } } @@ -425,9 +431,11 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { // Config final Deadline deadline = TEST_TIMEOUT.fromNow(); - final int numElements = 1024; + final long numElements = 1024L; - final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); + final QueryableStateClient client = new QueryableStateClient( + "localhost", + cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT)); JobID jobId = null; try { @@ -437,7 +445,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); DataStream<Tuple2<Integer, Long>> source = env .addSource(new TestAscendingValueSource(numElements)); @@ -481,15 +489,15 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { } finally { // Free cluster resources if (jobId != null) { - Future<CancellationSuccess> cancellation = cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)); + CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster + .getLeaderGateway(deadline.timeLeft()) + .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class))); - Await.ready(cancellation, deadline.timeLeft()); + cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); } - client.shutDown(); + client.shutdown(); } } @@ -508,23 +516,25 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { for (int key = 0; key < maxParallelism; key++) { boolean success = false; while (deadline.hasTimeLeft() && !success) { - Future<Tuple2<Integer, Long>> future = getKvStateWithRetries(client, - jobId, - queryableStateName, - key, - BasicTypeInfo.INT_TYPE_INFO, - stateDescriptor, - QUERY_RETRY_DELAY, - false); + CompletableFuture<Tuple2<Integer, Long>> future = getKvStateWithRetries( + client, + jobId, + queryableStateName, + key, + BasicTypeInfo.INT_TYPE_INFO, + stateDescriptor, + QUERY_RETRY_DELAY, + false, + executor); - Tuple2<Integer, Long> value = Await.result(future, deadline.timeLeft()); + Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); assertEquals("Key mismatch", key, value.f0.intValue()); if (expected == value.f1) { success = true; } else { // Retry - Thread.sleep(50); + Thread.sleep(50L); } } @@ -554,16 +564,17 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { BasicTypeInfo.INT_TYPE_INFO, valueSerializer, QUERY_RETRY_DELAY, - false); + false, + executor); - Tuple2<Integer, Long> value = Await.result(future, deadline.timeLeft()); + Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); assertEquals("Key mismatch", key, value.f0.intValue()); if (expected == value.f1) { success = true; } else { // Retry - Thread.sleep(50); + Thread.sleep(50L); } } @@ -575,20 +586,21 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { * Tests simple value state queryable state instance with a default value * set. Each source emits (subtaskIndex, 0)..(subtaskIndex, numElements) * tuples, the key is mapped to 1 but key 0 is queried which should throw - * a {@link UnknownKeyOrNamespace} exception. + * a {@link UnknownKeyOrNamespaceException} exception. * - * @throws UnknownKeyOrNamespace thrown due querying a non-existent key + * @throws UnknownKeyOrNamespaceException thrown due querying a non-existent key */ - @Test(expected = UnknownKeyOrNamespace.class) - public void testValueStateDefault() throws - Exception, UnknownKeyOrNamespace { + @Test(expected = UnknownKeyOrNamespaceException.class) + public void testValueStateDefault() throws Throwable { // Config final Deadline deadline = TEST_TIMEOUT.fromNow(); - final int numElements = 1024; + final long numElements = 1024L; - final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); + final QueryableStateClient client = new QueryableStateClient( + "localhost", + cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT)); JobID jobId = null; try { @@ -600,7 +612,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { // don't explicitly check that all slots are available before // submitting. env.setRestartStrategy(RestartStrategies - .fixedDelayRestart(Integer.MAX_VALUE, 1000)); + .fixedDelayRestart(Integer.MAX_VALUE, 1000L)); DataStream<Tuple2<Integer, Long>> source = env .addSource(new TestAscendingValueSource(numElements)); @@ -635,30 +647,37 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { // Now query int key = 0; - Future<Tuple2<Integer, Long>> future = getKvStateWithRetries(client, - jobId, - queryableState.getQueryableStateName(), - key, - BasicTypeInfo.INT_TYPE_INFO, - valueState, - QUERY_RETRY_DELAY, - true); - - Await.result(future, deadline.timeLeft()); + CompletableFuture<Tuple2<Integer, Long>> future = getKvStateWithRetries( + client, + jobId, + queryableState.getQueryableStateName(), + key, + BasicTypeInfo.INT_TYPE_INFO, + valueState, + QUERY_RETRY_DELAY, + true, + executor); + + try { + future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + } catch (ExecutionException | CompletionException e) { + // get() on a completedExceptionally future wraps the + // exception in an ExecutionException. + throw e.getCause(); + } } finally { + // Free cluster resources if (jobId != null) { - Future<CancellationSuccess> cancellation = cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), - deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply( - CancellationSuccess.class)); + CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster + .getLeaderGateway(deadline.timeLeft()) + .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class))); - Await.ready(cancellation, deadline.timeLeft()); + cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); } - client.shutDown(); + client.shutdown(); } } @@ -675,9 +694,11 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { // Config final Deadline deadline = TEST_TIMEOUT.fromNow(); - final int numElements = 1024; + final long numElements = 1024L; - final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); + final QueryableStateClient client = new QueryableStateClient( + "localhost", + cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT)); JobID jobId = null; try { @@ -687,7 +708,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); DataStream<Tuple2<Integer, Long>> source = env .addSource(new TestAscendingValueSource(numElements)); @@ -709,23 +730,21 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { cluster.submitJobDetached(jobGraph); - // Now query - long expected = numElements; - executeQuery(deadline, client, jobId, "matata", - queryableState.getValueSerializer(), expected); + queryableState.getValueSerializer(), numElements); } finally { + // Free cluster resources if (jobId != null) { - Future<CancellationSuccess> cancellation = cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)); + CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava( + cluster.getLeaderGateway(deadline.timeLeft()) + .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class))); - Await.ready(cancellation, deadline.timeLeft()); + cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); } - client.shutDown(); + client.shutdown(); } } @@ -743,7 +762,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { final int numElements = 1024; - final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); + final QueryableStateClient client = new QueryableStateClient( + "localhost", + cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT)); JobID jobId = null; try { @@ -788,21 +809,23 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { for (int key = 0; key < maxParallelism; key++) { boolean success = false; while (deadline.hasTimeLeft() && !success) { - Future<String> future = getKvStateWithRetries(client, + CompletableFuture<String> future = getKvStateWithRetries( + client, jobId, queryableState.getQueryableStateName(), key, BasicTypeInfo.INT_TYPE_INFO, foldingState, QUERY_RETRY_DELAY, - false); + false, + executor); - String value = Await.result(future, deadline.timeLeft()); + String value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); if (expected.equals(value)) { success = true; } else { // Retry - Thread.sleep(50); + Thread.sleep(50L); } } @@ -811,15 +834,15 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { } finally { // Free cluster resources if (jobId != null) { - Future<CancellationSuccess> cancellation = cluster + CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster .getLeaderGateway(deadline.timeLeft()) .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)); + .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class))); - Await.ready(cancellation, deadline.timeLeft()); + cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); } - client.shutDown(); + client.shutdown(); } } @@ -834,9 +857,11 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { // Config final Deadline deadline = TEST_TIMEOUT.fromNow(); - final int numElements = 1024; + final long numElements = 1024L; - final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); + final QueryableStateClient client = new QueryableStateClient( + "localhost", + cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT)); JobID jobId = null; try { @@ -858,15 +883,14 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { new SumReduce(), source.getType()); - QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = - source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = 8470749712274833552L; + source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { + private static final long serialVersionUID = 8470749712274833552L; - @Override - public Integer getKey(Tuple2<Integer, Long> value) throws Exception { - return value.f0; - } - }).asQueryableState("jungle", reducingState); + @Override + public Integer getKey(Tuple2<Integer, Long> value) throws Exception { + return value.f0; + } + }).asQueryableState("jungle", reducingState); // Submit the job graph JobGraph jobGraph = env.getStreamGraph().getJobGraph(); @@ -877,117 +901,24 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { // Wait until job is running // Now query - long expected = numElements * (numElements + 1) / 2; + long expected = numElements * (numElements + 1L) / 2L; executeQuery(deadline, client, jobId, "jungle", reducingState, expected); } finally { // Free cluster resources if (jobId != null) { - Future<CancellationSuccess> cancellation = cluster + CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster .getLeaderGateway(deadline.timeLeft()) .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)); + .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class))); - Await.ready(cancellation, deadline.timeLeft()); + cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); } - client.shutDown(); + client.shutdown(); } } - private static <K, V> Future<V> getKvStateWithRetries( - final QueryableStateClient client, - final JobID jobId, - final String queryName, - final K key, - final TypeInformation<K> keyTypeInfo, - final TypeSerializer<V> valueTypeSerializer, - final FiniteDuration retryDelay, - final boolean failForUnknownKeyOrNamespace) { - - return client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, valueTypeSerializer) - .recoverWith(new Recover<Future<V>>() { - @Override - public Future<V> recover(Throwable failure) throws Throwable { - if (failure instanceof AssertionError) { - return Futures.failed(failure); - } else if (failForUnknownKeyOrNamespace && - (failure instanceof UnknownKeyOrNamespace)) { - return Futures.failed(failure); - } else { - // At startup some failures are expected - // due to races. Make sure that they don't - // fail this test. - return Patterns.after( - retryDelay, - testActorSystem.scheduler(), - testActorSystem.dispatcher(), - new Callable<Future<V>>() { - @Override - public Future<V> call() throws Exception { - return getKvStateWithRetries( - client, - jobId, - queryName, - key, - keyTypeInfo, - valueTypeSerializer, - retryDelay, - failForUnknownKeyOrNamespace); - } - }); - } - } - }, testActorSystem.dispatcher()); - - } - - private static <K, V> Future<V> getKvStateWithRetries( - final QueryableStateClient client, - final JobID jobId, - final String queryName, - final K key, - final TypeInformation<K> keyTypeInfo, - final StateDescriptor<?, V> stateDescriptor, - final FiniteDuration retryDelay, - final boolean failForUnknownKeyOrNamespace) { - - return client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor) - .recoverWith(new Recover<Future<V>>() { - @Override - public Future<V> recover(Throwable failure) throws Throwable { - if (failure instanceof AssertionError) { - return Futures.failed(failure); - } else if (failForUnknownKeyOrNamespace && - (failure instanceof UnknownKeyOrNamespace)) { - return Futures.failed(failure); - } else { - // At startup some failures are expected - // due to races. Make sure that they don't - // fail this test. - return Patterns.after( - retryDelay, - testActorSystem.scheduler(), - testActorSystem.dispatcher(), - new Callable<Future<V>>() { - @Override - public Future<V> call() throws Exception { - return getKvStateWithRetries( - client, - jobId, - queryName, - key, - keyTypeInfo, - stateDescriptor, - retryDelay, - failForUnknownKeyOrNamespace); - } - }); - } - } - }, testActorSystem.dispatcher()); - } - /** * Test source producing (key, 0)..(key, maxValue) with key being the sub * task index. @@ -1030,7 +961,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { while (isRunning) { synchronized (this) { - this.wait(); + wait(); } } } @@ -1040,7 +971,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { isRunning = false; synchronized (this) { - this.notifyAll(); + notifyAll(); } } @@ -1125,4 +1056,105 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { } } + ///// General Utility Methods ////// + + private static <K, V> Future<V> getKvStateWithRetries( + final QueryableStateClient client, + final JobID jobId, + final String queryName, + final K key, + final TypeInformation<K> keyTypeInfo, + final TypeSerializer<V> valueTypeSerializer, + final Time retryDelay, + final boolean failForUnknownKeyOrNamespace, + final ScheduledExecutor executor) { + + return retryWithDelay( + () -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, valueTypeSerializer), + NO_OF_RETRIES, + retryDelay, + executor, + failForUnknownKeyOrNamespace); + } + + private static <K, V> CompletableFuture<V> getKvStateWithRetries( + final QueryableStateClient client, + final JobID jobId, + final String queryName, + final K key, + final TypeInformation<K> keyTypeInfo, + final StateDescriptor<?, V> stateDescriptor, + final Time retryDelay, + final boolean failForUnknownKeyOrNamespace, + final ScheduledExecutor executor) { + return retryWithDelay( + () -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor), + NO_OF_RETRIES, + retryDelay, + executor, + failForUnknownKeyOrNamespace); + } + + private static <T> CompletableFuture<T> retryWithDelay( + final Supplier<CompletableFuture<T>> operation, + final int retries, + final Time retryDelay, + final ScheduledExecutor scheduledExecutor, + final boolean failIfUnknownKeyOrNamespace) { + + final CompletableFuture<T> resultFuture = new CompletableFuture<>(); + + retryWithDelay( + resultFuture, + operation, + retries, + retryDelay, + scheduledExecutor, + failIfUnknownKeyOrNamespace); + + return resultFuture; + } + + public static <T> void retryWithDelay( + final CompletableFuture<T> resultFuture, + final Supplier<CompletableFuture<T>> operation, + final int retries, + final Time retryDelay, + final ScheduledExecutor scheduledExecutor, + final boolean failIfUnknownKeyOrNamespace) { + + if (!resultFuture.isDone()) { + final CompletableFuture<T> operationResultFuture = operation.get(); + operationResultFuture.whenCompleteAsync( + (t, throwable) -> { + if (throwable != null) { + if (throwable.getCause() instanceof CancellationException) { + resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation future was cancelled.", throwable.getCause())); + } else if (throwable.getCause() instanceof AssertionError || + (failIfUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException)) { + resultFuture.completeExceptionally(throwable.getCause()); + } else { + if (retries > 0) { + final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule( + () -> retryWithDelay(resultFuture, operation, retries - 1, retryDelay, scheduledExecutor, failIfUnknownKeyOrNamespace), + retryDelay.toMilliseconds(), + TimeUnit.MILLISECONDS); + + resultFuture.whenComplete( + (innerT, innerThrowable) -> scheduledFuture.cancel(false)); + } else { + resultFuture.completeExceptionally(new FutureUtils.RetryException("Could not complete the operation. Number of retries " + + "has been exhausted.", throwable)); + } + } + } else { + resultFuture.complete(t); + } + }, + scheduledExecutor); + + resultFuture.whenComplete( + (t, throwable) -> operationResultFuture.cancel(false)); + } + } }
http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java index 15a5ff6..a2a9678 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java @@ -22,7 +22,6 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.QueryableStateOptions; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.testingUtils.TestingCluster; @@ -40,7 +39,7 @@ import static org.junit.Assert.fail; public abstract class HAAbstractQueryableStateITCase extends AbstractQueryableStateITCase { private static final int NUM_JMS = 2; - private static final int NUM_TMS = 4; + private static final int NUM_TMS = 1; private static final int NUM_SLOTS_PER_TM = 4; private static TestingServer zkServer; @@ -67,8 +66,6 @@ public abstract class HAAbstractQueryableStateITCase extends AbstractQueryableSt cluster = new TestingCluster(config, false); cluster.start(); - testActorSystem = AkkaUtils.createDefaultActorSystem(); - // verify that we are in HA mode Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.ZOOKEEPER); @@ -85,9 +82,6 @@ public abstract class HAAbstractQueryableStateITCase extends AbstractQueryableSt cluster.awaitTermination(); } - testActorSystem.shutdown(); - testActorSystem.awaitTermination(); - try { zkServer.stop(); zkServer.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java index c52acc8..1173d0d 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java @@ -22,7 +22,6 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.QueryableStateOptions; import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.testingUtils.TestingCluster; @@ -37,7 +36,7 @@ import static org.junit.Assert.fail; */ public abstract class NonHAAbstractQueryableStateITCase extends AbstractQueryableStateITCase { - private static final int NUM_TMS = 2; + private static final int NUM_TMS = 1; private static final int NUM_SLOTS_PER_TM = 4; @BeforeClass @@ -48,14 +47,13 @@ public abstract class NonHAAbstractQueryableStateITCase extends AbstractQueryabl config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1); + config.setInteger(QueryableStateOptions.SERVER_PORT, 9069); config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true); config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1); cluster = new TestingCluster(config, false); cluster.start(true); - testActorSystem = AkkaUtils.createDefaultActorSystem(); - // verify that we are not in HA mode Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.NONE); @@ -73,9 +71,5 @@ public abstract class NonHAAbstractQueryableStateITCase extends AbstractQueryabl e.printStackTrace(); fail(e.getMessage()); } - - if (testActorSystem != null) { - testActorSystem.shutdown(); - } } } http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java deleted file mode 100644 index d9a41a1..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java +++ /dev/null @@ -1,399 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.network; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.queryablestate.UnknownJobManager; -import org.apache.flink.queryablestate.client.AkkaKvStateLocationLookupService; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.akka.FlinkUntypedActor; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; -import org.apache.flink.runtime.query.KvStateLocation; -import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.TestLogger; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.actor.Status; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.util.ArrayDeque; -import java.util.Queue; -import java.util.UUID; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * Tests for {@link AkkaKvStateLocationLookupService}. - */ -public class AkkaKvStateLocationLookupServiceTest extends TestLogger { - - /** The default timeout. */ - private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS); - - /** Test actor system shared between the tests. */ - private static ActorSystem testActorSystem; - - @BeforeClass - public static void setUp() throws Exception { - testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); - } - - @AfterClass - public static void tearDown() throws Exception { - if (testActorSystem != null) { - testActorSystem.shutdown(); - } - } - - /** - * Tests responses if no leader notification has been reported or leadership - * has been lost (leaderAddress = <code>null</code>). - */ - @Test - public void testNoJobManagerRegistered() throws Exception { - TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( - null, - null); - Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>(); - - AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService( - leaderRetrievalService, - testActorSystem, - TIMEOUT, - new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory()); - - lookupService.start(); - - // - // No leader registered initially => fail with UnknownJobManager - // - try { - JobID jobId = new JobID(); - String name = "coffee"; - - Future<KvStateLocation> locationFuture = lookupService.getKvStateLookupInfo(jobId, name); - - Await.result(locationFuture, TIMEOUT); - fail("Did not throw expected Exception"); - } catch (UnknownJobManager ignored) { - // Expected - } - - assertEquals("Received unexpected lookup", 0, received.size()); - - // - // Leader registration => communicate with new leader - // - UUID leaderSessionId = HighAvailabilityServices.DEFAULT_LEADER_ID; - KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "tea"); - - ActorRef testActor = LookupResponseActor.create(received, leaderSessionId, expected); - - String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor); - - // Notify the service about a leader - leaderRetrievalService.notifyListener(testActorAddress, leaderSessionId); - - JobID jobId = new JobID(); - String name = "tea"; - - // Verify that the leader response is handled - KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(jobId, name), TIMEOUT); - assertEquals(expected, location); - - // Verify that the correct message was sent to the leader - assertEquals(1, received.size()); - - verifyLookupMsg(received.poll(), jobId, name); - - // - // Leader loss => fail with UnknownJobManager - // - leaderRetrievalService.notifyListener(null, null); - - try { - Future<KvStateLocation> locationFuture = lookupService - .getKvStateLookupInfo(new JobID(), "coffee"); - - Await.result(locationFuture, TIMEOUT); - fail("Did not throw expected Exception"); - } catch (UnknownJobManager ignored) { - // Expected - } - - // No new messages received - assertEquals(0, received.size()); - } - - /** - * Tests that messages are properly decorated with the leader session ID. - */ - @Test - public void testLeaderSessionIdChange() throws Exception { - TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( - "localhost", - HighAvailabilityServices.DEFAULT_LEADER_ID); - Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>(); - - AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService( - leaderRetrievalService, - testActorSystem, - TIMEOUT, - new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory()); - - lookupService.start(); - - // Create test actors with random leader session IDs - KvStateLocation expected1 = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "salt"); - UUID leaderSessionId1 = UUID.randomUUID(); - ActorRef testActor1 = LookupResponseActor.create(received, leaderSessionId1, expected1); - String testActorAddress1 = AkkaUtils.getAkkaURL(testActorSystem, testActor1); - - KvStateLocation expected2 = new KvStateLocation(new JobID(), new JobVertexID(), 22321, "pepper"); - UUID leaderSessionId2 = UUID.randomUUID(); - ActorRef testActor2 = LookupResponseActor.create(received, leaderSessionId1, expected2); - String testActorAddress2 = AkkaUtils.getAkkaURL(testActorSystem, testActor2); - - JobID jobId = new JobID(); - - // - // Notify about first leader - // - leaderRetrievalService.notifyListener(testActorAddress1, leaderSessionId1); - - KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(jobId, "rock"), TIMEOUT); - assertEquals(expected1, location); - - assertEquals(1, received.size()); - verifyLookupMsg(received.poll(), jobId, "rock"); - - // - // Notify about second leader - // - leaderRetrievalService.notifyListener(testActorAddress2, leaderSessionId2); - - location = Await.result(lookupService.getKvStateLookupInfo(jobId, "roll"), TIMEOUT); - assertEquals(expected2, location); - - assertEquals(1, received.size()); - verifyLookupMsg(received.poll(), jobId, "roll"); - } - - /** - * Tests that lookups are retried when no leader notification is available. - */ - @Test - public void testRetryOnUnknownJobManager() throws Exception { - final Queue<AkkaKvStateLocationLookupService.LookupRetryStrategy> retryStrategies = new LinkedBlockingQueue<>(); - - AkkaKvStateLocationLookupService.LookupRetryStrategyFactory retryStrategy = - new AkkaKvStateLocationLookupService.LookupRetryStrategyFactory() { - @Override - public AkkaKvStateLocationLookupService.LookupRetryStrategy createRetryStrategy() { - return retryStrategies.poll(); - } - }; - - final TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( - null, - null); - - AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService( - leaderRetrievalService, - testActorSystem, - TIMEOUT, - retryStrategy); - - lookupService.start(); - - // - // Test call to retry - // - final AtomicBoolean hasRetried = new AtomicBoolean(); - retryStrategies.add( - new AkkaKvStateLocationLookupService.LookupRetryStrategy() { - @Override - public FiniteDuration getRetryDelay() { - return FiniteDuration.Zero(); - } - - @Override - public boolean tryRetry() { - if (hasRetried.compareAndSet(false, true)) { - return true; - } - return false; - } - }); - - Future<KvStateLocation> locationFuture = lookupService.getKvStateLookupInfo(new JobID(), "yessir"); - - Await.ready(locationFuture, TIMEOUT); - assertTrue("Did not retry ", hasRetried.get()); - - // - // Test leader notification after retry - // - Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>(); - - KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 12122, "garlic"); - ActorRef testActor = LookupResponseActor.create(received, null, expected); - final String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor); - - retryStrategies.add(new AkkaKvStateLocationLookupService.LookupRetryStrategy() { - @Override - public FiniteDuration getRetryDelay() { - return FiniteDuration.apply(100, TimeUnit.MILLISECONDS); - } - - @Override - public boolean tryRetry() { - leaderRetrievalService.notifyListener(testActorAddress, HighAvailabilityServices.DEFAULT_LEADER_ID); - return true; - } - }); - - KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(new JobID(), "yessir"), TIMEOUT); - assertEquals(expected, location); - } - - @Test - public void testUnexpectedResponseType() throws Exception { - TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( - "localhost", - HighAvailabilityServices.DEFAULT_LEADER_ID); - Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>(); - - AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService( - leaderRetrievalService, - testActorSystem, - TIMEOUT, - new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory()); - - lookupService.start(); - - // Create test actors with random leader session IDs - String expected = "unexpected-response-type"; - ActorRef testActor = LookupResponseActor.create(received, null, expected); - String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor); - - leaderRetrievalService.notifyListener(testActorAddress, null); - - try { - Await.result(lookupService.getKvStateLookupInfo(new JobID(), "spicy"), TIMEOUT); - fail("Did not throw expected Exception"); - } catch (Throwable ignored) { - // Expected - } - } - - private static final class LookupResponseActor extends FlinkUntypedActor { - - /** Received lookup messages. */ - private final Queue<LookupKvStateLocation> receivedLookups; - - /** Responses on KvStateMessage.LookupKvStateLocation messages. */ - private final Queue<Object> lookupResponses; - - /** The leader session ID. */ - private UUID leaderSessionId; - - public LookupResponseActor( - Queue<LookupKvStateLocation> receivedLookups, - UUID leaderSessionId, Object... lookupResponses) { - - this.receivedLookups = Preconditions.checkNotNull(receivedLookups, "Received lookups"); - this.leaderSessionId = leaderSessionId; - this.lookupResponses = new ArrayDeque<>(); - - if (lookupResponses != null) { - for (Object resp : lookupResponses) { - this.lookupResponses.add(resp); - } - } - } - - @Override - public void handleMessage(Object message) throws Exception { - if (message instanceof LookupKvStateLocation) { - // Add to received lookups queue - receivedLookups.add((LookupKvStateLocation) message); - - Object msg = lookupResponses.poll(); - if (msg != null) { - if (msg instanceof Throwable) { - sender().tell(new Status.Failure((Throwable) msg), self()); - } else { - sender().tell(new Status.Success(msg), self()); - } - } - } else if (message instanceof UUID) { - this.leaderSessionId = (UUID) message; - } else { - LOG.debug("Received unhandled message: {}", message); - } - } - - @Override - protected UUID getLeaderSessionID() { - return leaderSessionId; - } - - private static ActorRef create( - Queue<LookupKvStateLocation> receivedLookups, - UUID leaderSessionId, - Object... lookupResponses) { - - return testActorSystem.actorOf(Props.create( - LookupResponseActor.class, - receivedLookups, - leaderSessionId, - lookupResponses)); - } - } - - private static void verifyLookupMsg( - LookupKvStateLocation lookUpMsg, - JobID expectedJobId, - String expectedName) { - - assertNotNull(lookUpMsg); - assertEquals(expectedJobId, lookUpMsg.getJobId()); - assertEquals(expectedName, lookUpMsg.getRegistrationName()); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java new file mode 100644 index 0000000..b6f855e --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java @@ -0,0 +1,784 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.queryablestate.messages.KvStateInternalRequest; +import org.apache.flink.queryablestate.messages.KvStateResponse; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.queryablestate.network.messages.MessageType; +import org.apache.flink.queryablestate.server.KvStateServerImpl; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.query.KvStateID; +import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.query.KvStateServerAddress; +import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.util.NetUtils; + +import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; + +import org.junit.AfterClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.ConnectException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for {@link Client}. + */ +public class ClientTest { + + private static final Logger LOG = LoggerFactory.getLogger(ClientTest.class); + + // Thread pool for client bootstrap (shared between tests) + private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup(); + + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS); + + @AfterClass + public static void tearDown() throws Exception { + if (NIO_GROUP != null) { + NIO_GROUP.shutdownGracefully(); + } + } + + /** + * Tests simple queries, of which half succeed and half fail. + */ + @Test + public void testSimpleRequests() throws Exception { + Deadline deadline = TEST_TIMEOUT.fromNow(); + AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); + + MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + Client<KvStateInternalRequest, KvStateResponse> client = null; + Channel serverChannel = null; + + try { + client = new Client<>("Test Client", 1, serializer, stats); + + // Random result + final byte[] expected = new byte[1024]; + ThreadLocalRandom.current().nextBytes(expected); + + final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>(); + final AtomicReference<Channel> channel = new AtomicReference<>(); + + serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + channel.set(ctx.channel()); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + received.add((ByteBuf) msg); + } + }); + + KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel); + + long numQueries = 1024L; + + List<CompletableFuture<KvStateResponse>> futures = new ArrayList<>(); + for (long i = 0L; i < numQueries; i++) { + KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]); + futures.add(client.sendRequest(serverAddress, request)); + } + + // Respond to messages + Exception testException = new RuntimeException("Expected test Exception"); + + for (long i = 0L; i < numQueries; i++) { + ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + assertNotNull("Receive timed out", buf); + + Channel ch = channel.get(); + assertNotNull("Channel not active", ch); + + assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf)); + long requestId = MessageSerializer.getRequestId(buf); + KvStateInternalRequest deserRequest = serializer.deserializeRequest(buf); + + buf.release(); + + if (i % 2L == 0L) { + ByteBuf response = MessageSerializer.serializeResponse( + serverChannel.alloc(), + requestId, + new KvStateResponse(expected)); + + ch.writeAndFlush(response); + } else { + ByteBuf response = MessageSerializer.serializeRequestFailure( + serverChannel.alloc(), + requestId, + testException); + + ch.writeAndFlush(response); + } + } + + for (long i = 0L; i < numQueries; i++) { + + if (i % 2L == 0L) { + KvStateResponse serializedResult = futures.get((int) i).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + assertArrayEquals(expected, serializedResult.getContent()); + } else { + try { + futures.get((int) i).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + fail("Did not throw expected Exception"); + } catch (ExecutionException e) { + + if (!(e.getCause() instanceof RuntimeException)) { + fail("Did not throw expected Exception"); + } + // else expected + } + } + } + + assertEquals(numQueries, stats.getNumRequests()); + long expectedRequests = numQueries / 2L; + + // Counts can take some time to propagate + while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != expectedRequests || + stats.getNumFailed() != expectedRequests)) { + Thread.sleep(100L); + } + + assertEquals(expectedRequests, stats.getNumSuccessful()); + assertEquals(expectedRequests, stats.getNumFailed()); + } finally { + if (client != null) { + client.shutdown(); + } + + if (serverChannel != null) { + serverChannel.close(); + } + + assertEquals("Channel leak", 0L, stats.getNumConnections()); + } + } + + /** + * Tests that a request to an unavailable host is failed with ConnectException. + */ + @Test + public void testRequestUnavailableHost() throws Exception { + Deadline deadline = TEST_TIMEOUT.fromNow(); + AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); + + MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + Client<KvStateInternalRequest, KvStateResponse> client = null; + + try { + client = new Client<>("Test Client", 1, serializer, stats); + + int availablePort = NetUtils.getAvailablePort(); + + KvStateServerAddress serverAddress = new KvStateServerAddress( + InetAddress.getLocalHost(), + availablePort); + + KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]); + CompletableFuture<KvStateResponse> future = client.sendRequest(serverAddress, request); + + try { + future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + fail("Did not throw expected ConnectException"); + } catch (ExecutionException e) { + if (!(e.getCause() instanceof ConnectException)) { + fail("Did not throw expected ConnectException"); + } + // else expected + } + } finally { + if (client != null) { + client.shutdown(); + } + + assertEquals("Channel leak", 0L, stats.getNumConnections()); + } + } + + /** + * Multiple threads concurrently fire queries. + */ + @Test + public void testConcurrentQueries() throws Exception { + Deadline deadline = TEST_TIMEOUT.fromNow(); + AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); + + final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + ExecutorService executor = null; + Client<KvStateInternalRequest, KvStateResponse> client = null; + Channel serverChannel = null; + + final byte[] serializedResult = new byte[1024]; + ThreadLocalRandom.current().nextBytes(serializedResult); + + try { + int numQueryTasks = 4; + final int numQueriesPerTask = 1024; + + executor = Executors.newFixedThreadPool(numQueryTasks); + + client = new Client<>("Test Client", 1, serializer, stats); + + serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ByteBuf buf = (ByteBuf) msg; + assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf)); + long requestId = MessageSerializer.getRequestId(buf); + KvStateInternalRequest request = serializer.deserializeRequest(buf); + + buf.release(); + + KvStateResponse response = new KvStateResponse(serializedResult); + ByteBuf serResponse = MessageSerializer.serializeResponse( + ctx.alloc(), + requestId, + response); + + ctx.channel().writeAndFlush(serResponse); + } + }); + + final KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel); + + final Client<KvStateInternalRequest, KvStateResponse> finalClient = client; + Callable<List<CompletableFuture<KvStateResponse>>> queryTask = () -> { + List<CompletableFuture<KvStateResponse>> results = new ArrayList<>(numQueriesPerTask); + + for (int i = 0; i < numQueriesPerTask; i++) { + KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]); + results.add(finalClient.sendRequest(serverAddress, request)); + } + + return results; + }; + + // Submit query tasks + List<Future<List<CompletableFuture<KvStateResponse>>>> futures = new ArrayList<>(); + for (int i = 0; i < numQueryTasks; i++) { + futures.add(executor.submit(queryTask)); + } + + // Verify results + for (Future<List<CompletableFuture<KvStateResponse>>> future : futures) { + List<CompletableFuture<KvStateResponse>> results = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + for (CompletableFuture<KvStateResponse> result : results) { + KvStateResponse actual = result.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + assertArrayEquals(serializedResult, actual.getContent()); + } + } + + int totalQueries = numQueryTasks * numQueriesPerTask; + + // Counts can take some time to propagate + while (deadline.hasTimeLeft() && stats.getNumSuccessful() != totalQueries) { + Thread.sleep(100L); + } + + assertEquals(totalQueries, stats.getNumRequests()); + assertEquals(totalQueries, stats.getNumSuccessful()); + } finally { + if (executor != null) { + executor.shutdown(); + } + + if (serverChannel != null) { + serverChannel.close(); + } + + if (client != null) { + client.shutdown(); + } + + assertEquals("Channel leak", 0L, stats.getNumConnections()); + } + } + + /** + * Tests that a server failure closes the connection and removes it from + * the established connections. + */ + @Test + public void testFailureClosesChannel() throws Exception { + Deadline deadline = TEST_TIMEOUT.fromNow(); + AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); + + final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + Client<KvStateInternalRequest, KvStateResponse> client = null; + Channel serverChannel = null; + + try { + client = new Client<>("Test Client", 1, serializer, stats); + + final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>(); + final AtomicReference<Channel> channel = new AtomicReference<>(); + + serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + channel.set(ctx.channel()); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + received.add((ByteBuf) msg); + } + }); + + KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel); + + // Requests + List<Future<KvStateResponse>> futures = new ArrayList<>(); + KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]); + + futures.add(client.sendRequest(serverAddress, request)); + futures.add(client.sendRequest(serverAddress, request)); + + ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + assertNotNull("Receive timed out", buf); + buf.release(); + + buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + assertNotNull("Receive timed out", buf); + buf.release(); + + assertEquals(1L, stats.getNumConnections()); + + Channel ch = channel.get(); + assertNotNull("Channel not active", ch); + + // Respond with failure + ch.writeAndFlush(MessageSerializer.serializeServerFailure( + serverChannel.alloc(), + new RuntimeException("Expected test server failure"))); + + try { + futures.remove(0).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + fail("Did not throw expected server failure"); + } catch (ExecutionException e) { + + if (!(e.getCause() instanceof RuntimeException)) { + fail("Did not throw expected Exception"); + } + // Expected + } + + try { + futures.remove(0).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + fail("Did not throw expected server failure"); + } catch (ExecutionException e) { + + if (!(e.getCause() instanceof RuntimeException)) { + fail("Did not throw expected Exception"); + } + // Expected + } + + assertEquals(0L, stats.getNumConnections()); + + // Counts can take some time to propagate + while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 2L)) { + Thread.sleep(100L); + } + + assertEquals(2L, stats.getNumRequests()); + assertEquals(0L, stats.getNumSuccessful()); + assertEquals(2L, stats.getNumFailed()); + } finally { + if (client != null) { + client.shutdown(); + } + + if (serverChannel != null) { + serverChannel.close(); + } + + assertEquals("Channel leak", 0L, stats.getNumConnections()); + } + } + + /** + * Tests that a server channel close, closes the connection and removes it + * from the established connections. + */ + @Test + public void testServerClosesChannel() throws Exception { + Deadline deadline = TEST_TIMEOUT.fromNow(); + AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); + + final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + Client<KvStateInternalRequest, KvStateResponse> client = null; + Channel serverChannel = null; + + try { + client = new Client<>("Test Client", 1, serializer, stats); + + final AtomicBoolean received = new AtomicBoolean(); + final AtomicReference<Channel> channel = new AtomicReference<>(); + + serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + channel.set(ctx.channel()); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + received.set(true); + } + }); + + KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel); + + // Requests + KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]); + Future<KvStateResponse> future = client.sendRequest(serverAddress, request); + + while (!received.get() && deadline.hasTimeLeft()) { + Thread.sleep(50L); + } + assertTrue("Receive timed out", received.get()); + + assertEquals(1, stats.getNumConnections()); + + channel.get().close().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + + try { + future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + fail("Did not throw expected server failure"); + } catch (ExecutionException e) { + if (!(e.getCause() instanceof ClosedChannelException)) { + fail("Did not throw expected Exception"); + } + // Expected + } + + assertEquals(0L, stats.getNumConnections()); + + // Counts can take some time to propagate + while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 1L)) { + Thread.sleep(100L); + } + + assertEquals(1L, stats.getNumRequests()); + assertEquals(0L, stats.getNumSuccessful()); + assertEquals(1L, stats.getNumFailed()); + } finally { + if (client != null) { + client.shutdown(); + } + + if (serverChannel != null) { + serverChannel.close(); + } + + assertEquals("Channel leak", 0L, stats.getNumConnections()); + } + } + + /** + * Tests multiple clients querying multiple servers until 100k queries have + * been processed. At this point, the client is shut down and its verified + * that all ongoing requests are failed. + */ + @Test + public void testClientServerIntegration() throws Exception { + // Config + final int numServers = 2; + final int numServerEventLoopThreads = 2; + final int numServerQueryThreads = 2; + + final int numClientEventLoopThreads = 4; + final int numClientsTasks = 8; + + final int batchSize = 16; + + final int numKeyGroups = 1; + + AbstractStateBackend abstractBackend = new MemoryStateBackend(); + KvStateRegistry dummyRegistry = new KvStateRegistry(); + DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); + dummyEnv.setKvStateRegistry(dummyRegistry); + + AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( + dummyEnv, + new JobID(), + "test_op", + IntSerializer.INSTANCE, + numKeyGroups, + new KeyGroupRange(0, 0), + dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID())); + + final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS); + + AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats(); + + final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + Client<KvStateInternalRequest, KvStateResponse> client = null; + ExecutorService clientTaskExecutor = null; + final KvStateServerImpl[] server = new KvStateServerImpl[numServers]; + + try { + client = new Client<>("Test Client", numClientEventLoopThreads, serializer, clientStats); + clientTaskExecutor = Executors.newFixedThreadPool(numClientsTasks); + + // Create state + ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); + desc.setQueryable("any"); + + // Create servers + KvStateRegistry[] registry = new KvStateRegistry[numServers]; + AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[numServers]; + final KvStateID[] ids = new KvStateID[numServers]; + + for (int i = 0; i < numServers; i++) { + registry[i] = new KvStateRegistry(); + serverStats[i] = new AtomicKvStateRequestStats(); + server[i] = new KvStateServerImpl( + InetAddress.getLocalHost(), + 0, + numServerEventLoopThreads, + numServerQueryThreads, + registry[i], + serverStats[i]); + + server[i].start(); + + backend.setCurrentKey(1010 + i); + + // Value per server + ValueState<Integer> state = backend.getPartitionedState(VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + desc); + + state.update(201 + i); + + // we know it must be a KvStat but this is not exposed to the user via State + InternalKvState<?> kvState = (InternalKvState<?>) state; + + // Register KvState (one state instance for all server) + ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState); + } + + final Client<KvStateInternalRequest, KvStateResponse> finalClient = client; + Callable<Void> queryTask = () -> { + while (true) { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + + // Random server permutation + List<Integer> random = new ArrayList<>(); + for (int j = 0; j < batchSize; j++) { + random.add(j); + } + Collections.shuffle(random); + + // Dispatch queries + List<Future<KvStateResponse>> futures = new ArrayList<>(batchSize); + + for (int j = 0; j < batchSize; j++) { + int targetServer = random.get(j) % numServers; + + byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace( + 1010 + targetServer, + IntSerializer.INSTANCE, + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE); + + KvStateInternalRequest request = new KvStateInternalRequest(ids[targetServer], serializedKeyAndNamespace); + futures.add(finalClient.sendRequest(server[targetServer].getServerAddress(), request)); + } + + // Verify results + for (int j = 0; j < batchSize; j++) { + int targetServer = random.get(j) % numServers; + + Future<KvStateResponse> future = futures.get(j); + byte[] buf = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS).getContent(); + int value = KvStateSerializer.deserializeValue(buf, IntSerializer.INSTANCE); + assertEquals(201L + targetServer, value); + } + } + }; + + // Submit tasks + List<Future<Void>> taskFutures = new ArrayList<>(); + for (int i = 0; i < numClientsTasks; i++) { + taskFutures.add(clientTaskExecutor.submit(queryTask)); + } + + long numRequests; + while ((numRequests = clientStats.getNumRequests()) < 100_000L) { + Thread.sleep(100L); + LOG.info("Number of requests {}/100_000", numRequests); + } + + // Shut down + client.shutdown(); + + for (Future<Void> future : taskFutures) { + try { + future.get(); + fail("Did not throw expected Exception after shut down"); + } catch (ExecutionException t) { + if (t.getCause().getCause() instanceof ClosedChannelException || + t.getCause().getCause() instanceof IllegalStateException) { + // Expected + } else { + t.printStackTrace(); + fail("Failed with unexpected Exception type: " + t.getClass().getName()); + } + } + } + + assertEquals("Connection leak (client)", 0L, clientStats.getNumConnections()); + for (int i = 0; i < numServers; i++) { + boolean success = false; + int numRetries = 0; + while (!success) { + try { + assertEquals("Connection leak (server)", 0L, serverStats[i].getNumConnections()); + success = true; + } catch (Throwable t) { + if (numRetries < 10) { + LOG.info("Retrying connection leak check (server)"); + Thread.sleep((numRetries + 1) * 50L); + numRetries++; + } else { + throw t; + } + } + } + } + } finally { + if (client != null) { + client.shutdown(); + } + + for (int i = 0; i < numServers; i++) { + if (server[i] != null) { + server[i].shutdown(); + } + } + + if (clientTaskExecutor != null) { + clientTaskExecutor.shutdown(); + } + } + } + + // ------------------------------------------------------------------------ + + private Channel createServerChannel(final ChannelHandler... handlers) throws UnknownHostException, InterruptedException { + ServerBootstrap bootstrap = new ServerBootstrap() + // Bind address and port + .localAddress(InetAddress.getLocalHost(), 0) + // NIO server channels + .group(NIO_GROUP) + .channel(NioServerSocketChannel.class) + // See initializer for pipeline details + .childHandler(new ChannelInitializer<SocketChannel>() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline() + .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)) + .addLast(handlers); + } + }); + + return bootstrap.bind().sync().channel(); + } + + private KvStateServerAddress getKvStateServerAddress(Channel serverChannel) { + InetSocketAddress localAddress = (InetSocketAddress) serverChannel.localAddress(); + + return new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java index 0b97bda..cb490aa 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java @@ -18,8 +18,8 @@ package org.apache.flink.queryablestate.network; -import org.apache.flink.queryablestate.client.KvStateClientHandler; -import org.apache.flink.queryablestate.client.KvStateClientHandlerCallback; +import org.apache.flink.queryablestate.messages.KvStateInternalRequest; +import org.apache.flink.queryablestate.messages.KvStateResponse; import org.apache.flink.queryablestate.network.messages.MessageSerializer; import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; @@ -37,7 +37,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; /** - * Tests for {@link KvStateClientHandler}. + * Tests for {@link ClientHandler}. */ public class KvStateClientHandlerTest { @@ -47,28 +47,30 @@ public class KvStateClientHandlerTest { */ @Test public void testReadCallbacksAndBufferRecycling() throws Exception { - KvStateClientHandlerCallback callback = mock(KvStateClientHandlerCallback.class); + final ClientHandlerCallback<KvStateResponse> callback = mock(ClientHandlerCallback.class); - EmbeddedChannel channel = new EmbeddedChannel(new KvStateClientHandler(callback)); + final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + final EmbeddedChannel channel = new EmbeddedChannel(new ClientHandler<>("Test Client", serializer, callback)); + + final byte[] content = new byte[0]; + final KvStateResponse response = new KvStateResponse(content); // // Request success // - ByteBuf buf = MessageSerializer.serializeKvStateRequestResult( - channel.alloc(), - 1222112277, - new byte[0]); + ByteBuf buf = MessageSerializer.serializeResponse(channel.alloc(), 1222112277L, response); buf.skipBytes(4); // skip frame length // Verify callback channel.writeInbound(buf); - verify(callback, times(1)).onRequestResult(eq(1222112277L), any(byte[].class)); + verify(callback, times(1)).onRequestResult(eq(1222112277L), any(KvStateResponse.class)); assertEquals("Buffer not recycled", 0, buf.refCnt()); // // Request failure // - buf = MessageSerializer.serializeKvStateRequestFailure( + buf = MessageSerializer.serializeRequestFailure( channel.alloc(), 1222112278, new RuntimeException("Expected test Exception"));
