http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java deleted file mode 100644 index 3283295..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.queryablestate.state; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.queryablestate.client.state.ImmutableListState; -import org.apache.flink.runtime.state.heap.HeapListState; - -import org.junit.Before; -import org.junit.Test; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import static org.junit.Assert.assertEquals; - -/** - * Tests the {@link ImmutableListState}. - */ -public class ImmutableListStateTest { - - private final ListStateDescriptor<Long> listStateDesc = - new ListStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO); - - private ImmutableListState<Long> listState; - - @Before - public void setUp() throws Exception { - if (!listStateDesc.isSerializerInitialized()) { - listStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); - } - - List<Long> init = new ArrayList<>(); - init.add(42L); - - byte[] serInit = serializeInitValue(init); - listState = ImmutableListState.createState(listStateDesc, serInit); - } - - @Test(expected = UnsupportedOperationException.class) - public void testUpdate() { - List<Long> list = getStateContents(); - assertEquals(1L, list.size()); - - long element = list.get(0); - assertEquals(42L, element); - - listState.add(54L); - } - - @Test(expected = UnsupportedOperationException.class) - public void testClear() { - List<Long> list = getStateContents(); - assertEquals(1L, list.size()); - - long element = list.get(0); - assertEquals(42L, element); - - listState.clear(); - } - - /** - * Copied from {@link HeapListState#getSerializedValue(Object, Object)}. 
- */ - private byte[] serializeInitValue(List<Long> toSerialize) throws IOException { - TypeSerializer<Long> serializer = listStateDesc.getElementSerializer(); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos); - - // write the same as RocksDB writes lists, with one ',' separator - for (int i = 0; i < toSerialize.size(); i++) { - serializer.serialize(toSerialize.get(i), view); - if (i < toSerialize.size() - 1) { - view.writeByte(','); - } - } - view.flush(); - - return baos.toByteArray(); - } - - private List<Long> getStateContents() { - List<Long> list = new ArrayList<>(); - for (Long elem: listState.get()) { - list.add(elem); - } - return list; - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java deleted file mode 100644 index 30a8a50..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.queryablestate.state; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.queryablestate.client.state.ImmutableMapState; -import org.apache.flink.runtime.query.netty.message.KvStateSerializer; - -import org.junit.Before; -import org.junit.Test; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Tests the {@link ImmutableMapState}. - */ -public class ImmutableMapStateTest { - - private final MapStateDescriptor<Long, Long> mapStateDesc = - new MapStateDescriptor<>( - "test", - BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO); - - private ImmutableMapState<Long, Long> mapState; - - @Before - public void setUp() throws Exception { - if (!mapStateDesc.isSerializerInitialized()) { - mapStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); - } - - Map<Long, Long> initMap = new HashMap<>(); - initMap.put(1L, 5L); - initMap.put(2L, 5L); - - byte[] initSer = KvStateSerializer.serializeMap( - initMap.entrySet(), - BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()), - BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig())); - - mapState = ImmutableMapState.createState(mapStateDesc, initSer); - } - - @Test(expected = UnsupportedOperationException.class) - public void testPut() { - assertTrue(mapState.contains(1L)); - long value = mapState.get(1L); - assertEquals(5L, value); - - assertTrue(mapState.contains(2L)); - value = mapState.get(2L); - assertEquals(5L, value); - - mapState.put(2L, 54L); - } - - @Test(expected = UnsupportedOperationException.class) - public void testPutAll() { - assertTrue(mapState.contains(1L)); - long value = mapState.get(1L); - assertEquals(5L, value); - - assertTrue(mapState.contains(2L)); 
- value = mapState.get(2L); - assertEquals(5L, value); - - Map<Long, Long> nMap = new HashMap<>(); - nMap.put(1L, 7L); - nMap.put(2L, 7L); - - mapState.putAll(nMap); - } - - @Test(expected = UnsupportedOperationException.class) - public void testUpdate() { - assertTrue(mapState.contains(1L)); - long value = mapState.get(1L); - assertEquals(5L, value); - - assertTrue(mapState.contains(2L)); - value = mapState.get(2L); - assertEquals(5L, value); - - mapState.put(2L, 54L); - } - - @Test(expected = UnsupportedOperationException.class) - public void testIterator() { - assertTrue(mapState.contains(1L)); - long value = mapState.get(1L); - assertEquals(5L, value); - - assertTrue(mapState.contains(2L)); - value = mapState.get(2L); - assertEquals(5L, value); - - Iterator<Map.Entry<Long, Long>> iterator = mapState.iterator(); - while (iterator.hasNext()) { - iterator.remove(); - } - } - - @Test(expected = UnsupportedOperationException.class) - public void testIterable() { - assertTrue(mapState.contains(1L)); - long value = mapState.get(1L); - assertEquals(5L, value); - - assertTrue(mapState.contains(2L)); - value = mapState.get(2L); - assertEquals(5L, value); - - Iterable<Map.Entry<Long, Long>> iterable = mapState.entries(); - Iterator<Map.Entry<Long, Long>> iterator = iterable.iterator(); - while (iterator.hasNext()) { - assertEquals(5L, (long) iterator.next().getValue()); - iterator.remove(); - } - } - - @Test(expected = UnsupportedOperationException.class) - public void testKeys() { - assertTrue(mapState.contains(1L)); - long value = mapState.get(1L); - assertEquals(5L, value); - - assertTrue(mapState.contains(2L)); - value = mapState.get(2L); - assertEquals(5L, value); - - Iterator<Long> iterator = mapState.keys().iterator(); - while (iterator.hasNext()) { - iterator.remove(); - } - } - - @Test(expected = UnsupportedOperationException.class) - public void testValues() { - assertTrue(mapState.contains(1L)); - long value = mapState.get(1L); - assertEquals(5L, value); - - 
assertTrue(mapState.contains(2L)); - value = mapState.get(2L); - assertEquals(5L, value); - - Iterator<Long> iterator = mapState.values().iterator(); - while (iterator.hasNext()) { - iterator.remove(); - } - } - - @Test(expected = UnsupportedOperationException.class) - public void testClear() { - assertTrue(mapState.contains(1L)); - long value = mapState.get(1L); - assertEquals(5L, value); - - assertTrue(mapState.contains(2L)); - value = mapState.get(2L); - assertEquals(5L, value); - - mapState.clear(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java deleted file mode 100644 index 9b1ecf8..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.state; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.state.ReducingStateDescriptor; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.queryablestate.client.state.ImmutableReducingState; - -import org.junit.Before; -import org.junit.Test; - -import java.nio.ByteBuffer; - -import static org.junit.Assert.assertEquals; - -/** - * Tests the {@link ImmutableReducingState}. - */ -public class ImmutableReducingStateTest { - - private final ReducingStateDescriptor<Long> reducingStateDesc = - new ReducingStateDescriptor<>("test", new SumReduce(), BasicTypeInfo.LONG_TYPE_INFO); - - private ImmutableReducingState<Long> reduceState; - - @Before - public void setUp() throws Exception { - if (!reducingStateDesc.isSerializerInitialized()) { - reducingStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); - } - - reduceState = ImmutableReducingState.createState( - reducingStateDesc, - ByteBuffer.allocate(Long.BYTES).putLong(42L).array() - ); - } - - @Test(expected = UnsupportedOperationException.class) - public void testUpdate() { - long value = reduceState.get(); - assertEquals(42L, value); - - reduceState.add(54L); - } - - @Test(expected = UnsupportedOperationException.class) - public void testClear() { - long value = reduceState.get(); - assertEquals(42L, value); - - reduceState.clear(); - } - - /** - * Test {@link ReduceFunction} summing up its two arguments. 
- */ - private static class SumReduce implements ReduceFunction<Long> { - - private static final long serialVersionUID = 6041237513913189144L; - - @Override - public Long reduce(Long value1, Long value2) throws Exception { - return value1 + value2; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java deleted file mode 100644 index 5f7032d..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.queryablestate.state; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.queryablestate.client.state.ImmutableValueState; - -import org.junit.Before; -import org.junit.Test; - -import java.nio.ByteBuffer; - -import static org.junit.Assert.assertEquals; - -/** - * Tests the {@link ImmutableValueState}. - */ -public class ImmutableValueStateTest { - - private final ValueStateDescriptor<Long> valueStateDesc = - new ValueStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO); - - private ImmutableValueState<Long> valueState; - - @Before - public void setUp() throws Exception { - if (!valueStateDesc.isSerializerInitialized()) { - valueStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); - } - - valueState = ImmutableValueState.createState( - valueStateDesc, - ByteBuffer.allocate(Long.BYTES).putLong(42L).array() - ); - } - - @Test(expected = UnsupportedOperationException.class) - public void testUpdate() { - long value = valueState.value(); - assertEquals(42L, value); - - valueState.update(54L); - } - - @Test(expected = UnsupportedOperationException.class) - public void testClear() { - long value = valueState.value(); - assertEquals(42L, value); - - valueState.clear(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties b/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties deleted file mode 100644 index 10792cd..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,31 +0,0 @@ -# -# Licensed to the 
Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Set root logger level to OFF to not flood build logs -# set manually to INFO for debugging purposes -log4j.rootLogger=OFF, testlogger - -# A1 is set to be a ConsoleAppender. -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR -log4j.logger.org.apache.zookeeper=OFF http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/pom.xml ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/pom.xml b/flink-queryable-state/flink-queryable-state-runtime/pom.xml new file mode 100644 index 0000000..f39498e --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-runtime/pom.xml @@ -0,0 +1,119 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license 
agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-queryable-state</artifactId> + <version>1.4-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-queryable-state-runtime_${scala.binary.version}</artifactId> + <name>flink-queryable-state-runtime</name> + <packaging>jar</packaging> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-queryable-state-client-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <!-- =================================================== + Testing + 
=================================================== --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils-junit</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <version>${curator.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java new file mode 100644 index 0000000..d434336 --- /dev/null +++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.proxy; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.queryablestate.KvStateID; +import org.apache.flink.queryablestate.exceptions.UnknownKvStateIdException; +import org.apache.flink.queryablestate.exceptions.UnknownKvStateKeyGroupLocationException; +import org.apache.flink.queryablestate.messages.KvStateInternalRequest; +import org.apache.flink.queryablestate.messages.KvStateRequest; +import org.apache.flink.queryablestate.messages.KvStateResponse; +import org.apache.flink.queryablestate.network.AbstractServerHandler; +import org.apache.flink.queryablestate.network.Client; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats; +import org.apache.flink.queryablestate.network.stats.KvStateRequestStats; +import 
org.apache.flink.queryablestate.server.KvStateServerImpl; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.query.KvStateClientProxy; +import org.apache.flink.runtime.query.KvStateLocation; +import org.apache.flink.runtime.query.KvStateMessage; +import org.apache.flink.runtime.query.UnknownKvStateLocation; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import scala.concurrent.duration.FiniteDuration; +import scala.reflect.ClassTag$; + +/** + * This handler acts as an internal (to the Flink cluster) client that receives + * the requests from external clients, executes them by contacting the Job Manager (if necessary) and + * the Task Manager holding the requested state, and forwards the answer back to the client. + */ +@Internal +@ChannelHandler.Sharable +public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequest, KvStateResponse> { + + private static final Logger LOG = LoggerFactory.getLogger(KvStateClientProxyHandler.class); + + /** The proxy using this handler. */ + private final KvStateClientProxy proxy; + + /** A cache to hold the location of different states for which we have already seen requests. */ + private final ConcurrentMap<Tuple2<JobID, String>, CompletableFuture<KvStateLocation>> lookupCache = + new ConcurrentHashMap<>(); + + /** + * Network client to forward queries to {@link KvStateServerImpl state server} + * instances inside the cluster. 
+ */ + private final Client<KvStateInternalRequest, KvStateResponse> kvStateClient; + + /** + * Create the handler used by the {@link KvStateClientProxyImpl}. + * + * @param proxy the {@link KvStateClientProxyImpl proxy} using the handler. + * @param queryExecutorThreads the number of threads used to process incoming requests. + * @param serializer the {@link MessageSerializer} used to (de-) serialize the different messages. + * @param stats server statistics collector. + */ + public KvStateClientProxyHandler( + final KvStateClientProxyImpl proxy, + final int queryExecutorThreads, + final MessageSerializer<KvStateRequest, KvStateResponse> serializer, + final KvStateRequestStats stats) { + + super(proxy, serializer, stats); + this.proxy = Preconditions.checkNotNull(proxy); + this.kvStateClient = createInternalClient(queryExecutorThreads); + } + + private static Client<KvStateInternalRequest, KvStateResponse> createInternalClient(int threads) { + final MessageSerializer<KvStateInternalRequest, KvStateResponse> messageSerializer = + new MessageSerializer<>( + new KvStateInternalRequest.KvStateInternalRequestDeserializer(), + new KvStateResponse.KvStateResponseDeserializer()); + + return new Client<>( + "Queryable State Proxy Client", + threads, + messageSerializer, + new DisabledKvStateRequestStats()); + } + + @Override + public CompletableFuture<KvStateResponse> handleRequest( + final long requestId, + final KvStateRequest request) { + CompletableFuture<KvStateResponse> response = new CompletableFuture<>(); + executeActionAsync(response, request, false); + return response; + } + + private void executeActionAsync( + final CompletableFuture<KvStateResponse> result, + final KvStateRequest request, + final boolean update) { + + if (!result.isDone()) { + final CompletableFuture<KvStateResponse> operationFuture = getState(request, update); + operationFuture.whenCompleteAsync( + (t, throwable) -> { + if (throwable != null) { + if (throwable instanceof CancellationException) 
{ + result.completeExceptionally(throwable); + } else if (throwable.getCause() instanceof UnknownKvStateIdException || + throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException || + throwable.getCause() instanceof UnknownKvStateLocation || + throwable.getCause() instanceof ConnectException) { + + // These failures are likely to be caused by out-of-sync + // KvStateLocation. Therefore we retry this query and + // force look up the location. + + executeActionAsync(result, request, true); + } else { + result.completeExceptionally(throwable); + } + } else { + result.complete(t); + } + }, queryExecutor); + + result.whenComplete( + (t, throwable) -> operationFuture.cancel(false)); + } + } + + private CompletableFuture<KvStateResponse> getState( + final KvStateRequest request, + final boolean forceUpdate) { + + return getKvStateLookupInfo(request.getJobId(), request.getStateName(), forceUpdate) + .thenComposeAsync((Function<KvStateLocation, CompletableFuture<KvStateResponse>>) location -> { + final int keyGroupIndex = KeyGroupRangeAssignment.computeKeyGroupForKeyHash( + request.getKeyHashCode(), location.getNumKeyGroups()); + + final InetSocketAddress serverAddress = location.getKvStateServerAddress(keyGroupIndex); + if (serverAddress == null) { + return FutureUtils.getFailedFuture(new UnknownKvStateKeyGroupLocationException(getServerName())); + } else { + // Query server + final KvStateID kvStateId = location.getKvStateID(keyGroupIndex); + final KvStateInternalRequest internalRequest = new KvStateInternalRequest( + kvStateId, request.getSerializedKeyAndNamespace()); + return kvStateClient.sendRequest(serverAddress, internalRequest); + } + }, queryExecutor); + } + + /** + * Lookup the {@link KvStateLocation} for the given job and queryable state name. + * + * <p>The job manager will be queried for the location only if forced or no + * cached location can be found. There are no guarantees about + * + * @param jobId JobID the state instance belongs to. 
+ * @param queryableStateName Name under which the state instance has been published. + * @param forceUpdate Flag to indicate whether to force a update via the lookup service. + * @return Future holding the KvStateLocation + */ + private CompletableFuture<KvStateLocation> getKvStateLookupInfo( + final JobID jobId, + final String queryableStateName, + final boolean forceUpdate) { + + final Tuple2<JobID, String> cacheKey = new Tuple2<>(jobId, queryableStateName); + final CompletableFuture<KvStateLocation> cachedFuture = lookupCache.get(cacheKey); + + if (!forceUpdate && cachedFuture != null && !cachedFuture.isCompletedExceptionally()) { + LOG.debug("Retrieving location for state={} of job={} from the cache.", jobId, queryableStateName); + return cachedFuture; + } + + LOG.debug("Retrieving location for state={} of job={} from the job manager.", jobId, queryableStateName); + + return proxy.getJobManagerFuture().thenComposeAsync( + jobManagerGateway -> { + final Object msg = new KvStateMessage.LookupKvStateLocation(jobId, queryableStateName); + final CompletableFuture<KvStateLocation> locationFuture = FutureUtils.toJava( + jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS)) + .mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))); + + lookupCache.put(cacheKey, locationFuture); + return locationFuture; + }, queryExecutor); + } + + @Override + public void shutdown() { + kvStateClient.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java new file mode 100644 index 0000000..f473443 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.queryablestate.client.proxy; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.queryablestate.exceptions.UnknownJobManagerException; +import org.apache.flink.queryablestate.messages.KvStateRequest; +import org.apache.flink.queryablestate.messages.KvStateResponse; +import org.apache.flink.queryablestate.network.AbstractServerBase; +import org.apache.flink.queryablestate.network.AbstractServerHandler; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.queryablestate.network.stats.KvStateRequestStats; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.query.KvStateClientProxy; +import org.apache.flink.util.Preconditions; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; + +/** + * The default implementation of the {@link KvStateClientProxy}. + */ +@Internal +public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, KvStateResponse> implements KvStateClientProxy { + + private static final CompletableFuture<ActorGateway> UNKNOWN_JOB_MANAGER = + FutureUtils.getFailedFuture(new UnknownJobManagerException()); + + /** Number of threads used to process incoming requests. */ + private final int queryExecutorThreads; + + /** Statistics collector. */ + private final KvStateRequestStats stats; + + private final Object leaderLock = new Object(); + + private CompletableFuture<ActorGateway> jobManagerFuture = UNKNOWN_JOB_MANAGER; + + /** + * Creates the Queryable State Client Proxy. 
+ * + * <p>The server is instantiated using reflection by the + * {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateClientProxy(InetAddress, Iterator, int, int, KvStateRequestStats) + * QueryableStateUtils.createKvStateClientProxy(InetAddress, Iterator, int, int, KvStateRequestStats)}. + * + * <p>The server needs to be started via {@link #start()} in order to bind + * to the configured bind address. + * + * @param bindAddress the address to listen to. + * @param bindPortIterator the port range to try to bind to. + * @param numEventLoopThreads number of event loop threads. + * @param numQueryThreads number of query threads. + * @param stats the statistics collector. + */ + public KvStateClientProxyImpl( + final InetAddress bindAddress, + final Iterator<Integer> bindPortIterator, + final Integer numEventLoopThreads, + final Integer numQueryThreads, + final KvStateRequestStats stats) { + + super("Queryable State Proxy Server", bindAddress, bindPortIterator, numEventLoopThreads, numQueryThreads); + Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads."); + this.queryExecutorThreads = numQueryThreads; + this.stats = Preconditions.checkNotNull(stats); + } + + @Override + public InetSocketAddress getServerAddress() { + return super.getServerAddress(); + } + + @Override + public void start() throws Throwable { + super.start(); + } + + @Override + public void shutdown() { + super.shutdown(); + } + + @Override + public void updateJobManager(CompletableFuture<ActorGateway> leadingJobManager) throws Exception { + synchronized (leaderLock) { + if (leadingJobManager == null) { + jobManagerFuture = UNKNOWN_JOB_MANAGER; + } else { + jobManagerFuture = leadingJobManager; + } + } + } + + @Override + public CompletableFuture<ActorGateway> getJobManagerFuture() { + synchronized (leaderLock) { + return jobManagerFuture; + } + } + + @Override + public AbstractServerHandler<KvStateRequest, KvStateResponse> initializeHandler() { + 
MessageSerializer<KvStateRequest, KvStateResponse> serializer = + new MessageSerializer<>( + new KvStateRequest.KvStateRequestDeserializer(), + new KvStateResponse.KvStateResponseDeserializer()); + return new KvStateClientProxyHandler(this, queryExecutorThreads, serializer, stats); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java new file mode 100644 index 0000000..8c8de59 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.queryablestate.messages; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.queryablestate.KvStateID; +import org.apache.flink.queryablestate.network.messages.MessageBody; +import org.apache.flink.queryablestate.network.messages.MessageDeserializer; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; + +import java.nio.ByteBuffer; + +/** + * The request to be forwarded by the {@link org.apache.flink.runtime.query.KvStateClientProxy + * Queryable State Client Proxy} to the {@link org.apache.flink.runtime.query.KvStateServer State Server} + * of the Task Manager responsible for the requested state. + */ +@Internal +public class KvStateInternalRequest extends MessageBody { + + private final KvStateID kvStateId; + private final byte[] serializedKeyAndNamespace; + + public KvStateInternalRequest( + final KvStateID stateId, + final byte[] serializedKeyAndNamespace) { + + this.kvStateId = Preconditions.checkNotNull(stateId); + this.serializedKeyAndNamespace = Preconditions.checkNotNull(serializedKeyAndNamespace); + } + + public KvStateID getKvStateId() { + return kvStateId; + } + + public byte[] getSerializedKeyAndNamespace() { + return serializedKeyAndNamespace; + } + + @Override + public byte[] serialize() { + + // KvStateId + sizeOf(serializedKeyAndNamespace) + serializedKeyAndNamespace + final int size = KvStateID.SIZE + Integer.BYTES + serializedKeyAndNamespace.length; + + return ByteBuffer.allocate(size) + .putLong(kvStateId.getLowerPart()) + .putLong(kvStateId.getUpperPart()) + .putInt(serializedKeyAndNamespace.length) + .put(serializedKeyAndNamespace) + .array(); + } + + /** + * A {@link MessageDeserializer deserializer} for {@link KvStateInternalRequest}. 
+ */ + public static class KvStateInternalRequestDeserializer implements MessageDeserializer<KvStateInternalRequest> { + + @Override + public KvStateInternalRequest deserializeMessage(ByteBuf buf) { + KvStateID kvStateId = new KvStateID(buf.readLong(), buf.readLong()); + + int length = buf.readInt(); + Preconditions.checkArgument(length >= 0, + "Negative length for key and namespace. " + + "This indicates a serialization error."); + + byte[] serializedKeyAndNamespace = new byte[length]; + if (length > 0) { + buf.readBytes(serializedKeyAndNamespace); + } + return new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java new file mode 100644 index 0000000..476f153 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.server; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException; +import org.apache.flink.queryablestate.exceptions.UnknownKvStateIdException; +import org.apache.flink.queryablestate.messages.KvStateInternalRequest; +import org.apache.flink.queryablestate.messages.KvStateResponse; +import org.apache.flink.queryablestate.network.AbstractServerHandler; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.queryablestate.network.stats.KvStateRequestStats; +import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; + +/** + * This handler dispatches asynchronous tasks, which query {@link InternalKvState} + * instances and write the result to the channel. + * + * <p>The network threads receive the message, deserialize it and dispatch the + * query task. The actual query is handled in a separate thread as it might + * otherwise block the network threads (file I/O etc.). 
+ */ +@Internal +@ChannelHandler.Sharable +public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalRequest, KvStateResponse> { + + private static final Logger LOG = LoggerFactory.getLogger(KvStateServerHandler.class); + + /** KvState registry holding references to the KvState instances. */ + private final KvStateRegistry registry; + + /** + * Create the handler used by the {@link KvStateServerImpl}. + * + * @param server the {@link KvStateServerImpl} using the handler. + * @param kvStateRegistry registry to query. + * @param serializer the {@link MessageSerializer} used to (de-) serialize the different messages. + * @param stats server statistics collector. + */ + public KvStateServerHandler( + final KvStateServerImpl server, + final KvStateRegistry kvStateRegistry, + final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer, + final KvStateRequestStats stats) { + + super(server, serializer, stats); + this.registry = Preconditions.checkNotNull(kvStateRegistry); + } + + @Override + public CompletableFuture<KvStateResponse> handleRequest(final long requestId, final KvStateInternalRequest request) { + final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>(); + + try { + final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId()); + if (kvState == null) { + responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId())); + } else { + byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace(); + + byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace); + if (serializedResult != null) { + responseFuture.complete(new KvStateResponse(serializedResult)); + } else { + responseFuture.completeExceptionally(new UnknownKeyOrNamespaceException(getServerName())); + } + } + return responseFuture; + } catch (Throwable t) { + String errMsg = "Error while processing request with ID " + requestId + + ". 
Caused by: " + ExceptionUtils.stringifyException(t); + responseFuture.completeExceptionally(new RuntimeException(errMsg)); + return responseFuture; + } + } + + @Override + public void shutdown() { + // do nothing + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java new file mode 100644 index 0000000..fe07687 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.queryablestate.server; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.queryablestate.messages.KvStateInternalRequest; +import org.apache.flink.queryablestate.messages.KvStateResponse; +import org.apache.flink.queryablestate.network.AbstractServerBase; +import org.apache.flink.queryablestate.network.AbstractServerHandler; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.queryablestate.network.stats.KvStateRequestStats; +import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.query.KvStateServer; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Iterator; + +/** + * The default implementation of the {@link KvStateServer}. + */ +@Internal +public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest, KvStateResponse> implements KvStateServer { + + private static final Logger LOG = LoggerFactory.getLogger(KvStateServerImpl.class); + + /** The {@link KvStateRegistry} to query for state instances. */ + private final KvStateRegistry kvStateRegistry; + + private final KvStateRequestStats stats; + + private MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer; + + /** + * Creates the state server. + * + * <p>The server is instantiated using reflection by the + * {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateServer(InetAddress, Iterator, int, int, KvStateRegistry, KvStateRequestStats) + * QueryableStateUtils.createKvStateServer(InetAddress, Iterator, int, int, KvStateRegistry, KvStateRequestStats)}. + * + * <p>The server needs to be started via {@link #start()} in order to bind + * to the configured bind address. + * + * @param bindAddress the address to listen to. + * @param bindPortIterator the port range to try to bind to. 
+ * @param numEventLoopThreads number of event loop threads. + * @param numQueryThreads number of query threads. + * @param kvStateRegistry {@link KvStateRegistry} to query for state instances. + * @param stats the statistics collector. + */ + public KvStateServerImpl( + final InetAddress bindAddress, + final Iterator<Integer> bindPortIterator, + final Integer numEventLoopThreads, + final Integer numQueryThreads, + final KvStateRegistry kvStateRegistry, + final KvStateRequestStats stats) { + + super("Queryable State Server", bindAddress, bindPortIterator, numEventLoopThreads, numQueryThreads); + this.stats = Preconditions.checkNotNull(stats); + this.kvStateRegistry = Preconditions.checkNotNull(kvStateRegistry); + } + + @Override + public AbstractServerHandler<KvStateInternalRequest, KvStateResponse> initializeHandler() { + this.serializer = new MessageSerializer<>( + new KvStateInternalRequest.KvStateInternalRequestDeserializer(), + new KvStateResponse.KvStateResponseDeserializer()); + return new KvStateServerHandler(this, kvStateRegistry, serializer, stats); + } + + public MessageSerializer<KvStateInternalRequest, KvStateResponse> getSerializer() { + Preconditions.checkState(serializer != null, "Server " + getServerName() + " has not been started."); + return serializer; + } + + @Override + public void start() throws Throwable { + super.start(); + } + + @Override + public InetSocketAddress getServerAddress() { + return super.getServerAddress(); + } + + @Override + public void shutdown() { + super.shutdown(); + } +}
