This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git

commit 7f68b7a299f9d68a5966059220ab152e158a069e
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Tue Feb 15 12:41:34 2022 +0100

    [FLINK-26158] Update java/connected-components example to use playground ingress/egress
---
 java/connected-components/README.md                |  30 ++-
 java/connected-components/docker-compose.yml       |  45 +----
 java/connected-components/module.yaml              |  22 +--
 .../connectedcomponents/ConnectedComponentsFn.java | 204 +++++++++++----------
 .../connectedcomponents/types/EgressRecord.java    |  28 +++
 .../java/connectedcomponents/types/Types.java      |  29 +--
 .../java/connectedcomponents/types/Vertex.java     |  23 ++-
 .../types/VertexComponentChange.java               |  52 +++---
 java/connected-components/vertices.txt             |  12 --
 9 files changed, 224 insertions(+), 221 deletions(-)

diff --git a/java/connected-components/README.md b/java/connected-components/README.md
index 17cfa82..97b3c0c 100644
--- a/java/connected-components/README.md
+++ b/java/connected-components/README.md
@@ -9,8 +9,6 @@ This example works with Docker Compose, and runs a few services that build up an
 - Functions service that runs your functions and expose them through an HTTP endpoint.
 - StateFun runtime processes (a manager plus workers) that will handle ingress, egress, and inter-function messages as well as function state storage in a consistent and fault-tolerant manner.
-- Apache Kafka broker for the application ingress and egress. StateFun currently natively supports AWS Kinesis as well,
- and you can also extend to connect with other systems.

 To motivate this example, we'll implement a [connected components](https://en.wikipedia.org/wiki/Component_(graph_theory) algorithm on top of Stateful Functions. The program has one function - a `ConnectedComponentsFn` that consumes `Vertex` JSON events from an ingress and communicates with its neighbours to find the minimal component id.
@@ -21,7 +19,6 @@ Changes of the component id of a vertex are being output via an egress.
- `src/`, `pom.xml` and `Dockerfile`: These files and directories are the contents of a Java Maven project which builds our functions service, hosting the `ConnectedComponentsFn` behind a HTTP endpoint. Check out the source code under `src/main/java`. The `Dockerfile` is used to build a Docker image for our functions service. -- `vertices.txt`: A file with multiple JSON objects per line; this is used as test events produced to our application ingress. - `module.yaml`: The [Module Specification](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/deployment/module/) file to be mounted to the StateFun runtime process containers. This configures a few things for a StateFun application, such as the service endpoints of the application's functions, as well as definitions of [Ingresses and Egresses](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/io-module/overview/) which the application will use. @@ -40,7 +37,7 @@ First, lets build the example. From this directory, execute: $ docker-compose build ``` -This pulls all the necessary Docker images (StateFun and Kafka), and also builds the functions service image. This can +This pulls all the necessary Docker images (StateFun), and also builds the functions service image. This can take a few minutes as it also needs to build the function's Java project. Afterward the build completes, start running all the services: @@ -51,12 +48,33 @@ $ docker-compose up ## Play around! 
-You can take a look at what messages are being sent to the Kafka egress: +The connected components applications allows you to do the following actions: + +* Add a new vertex to the graph via sending a `Vertex` message to the `vertex` function + +In order to send messages to the Stateful Functions application you can run: + +``` +$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "1", "neighbours": ["2", "3"]}' localhost:8090/connected-components.fns/vertex/1 +$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "2", "neighbours": ["1", "4"]}' localhost:8090/connected-components.fns/vertex/2 +$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "3", "neighbours": ["1"]}' localhost:8090/connected-components.fns/vertex/3 +$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "4", "neighbours": ["2"]}' localhost:8090/connected-components.fns/vertex/4 +``` + +You can take a look at what messages are being sent to the Playground egress: ``` -$ docker-compose exec kafka rpk topic consume connected-component-changes +$ curl -X GET localhost:8091/connected-component-changes ``` +### Messages + +All messages are expected to be encoded as JSON: + +* `Vertex`: `{"vertex_id": "1", "neighbours": ["2", "3"]}`, `vertex_id` is the id of the `vertex` function + +## What's next? + You can also try modifying the function code in the `src/main/java` directory, and do a zero-downtime upgrade of the functions. 
Some ideas you can try out: - Enable the connected component computation for graphs with undirected edges diff --git a/java/connected-components/docker-compose.yml b/java/connected-components/docker-compose.yml index 3bbb2e6..b11ae1c 100644 --- a/java/connected-components/docker-compose.yml +++ b/java/connected-components/docker-compose.yml @@ -35,51 +35,12 @@ services: ############################################################### statefun: - image: apache/flink-statefun-playground:3.2.0 + image: apache/flink-statefun-playground:3.2.0-1.0 ports: - "8081:8081" + - "8090:8090" + - "8091:8091" depends_on: - - kafka - connected-components-functions volumes: - ./module.yaml:/module.yaml - - ############################################################### - # Kafka for ingress and egress - ############################################################### - - kafka: - image: docker.vectorized.io/vectorized/redpanda:v21.8.1 - command: - - redpanda start - - --smp 1 - - --memory 512M - - --overprovisioned - - --set redpanda.default_topic_replications=1 - - --set redpanda.auto_create_topics_enabled=true - - --kafka-addr INSIDE://0.0.0.0:9094,OUTSIDE://0.0.0.0:9092 - - --advertise-kafka-addr INSIDE://kafka:9094,OUTSIDE://kafka:9092 - - --pandaproxy-addr 0.0.0.0:8089 - - --advertise-pandaproxy-addr kafka:8089 - hostname: kafka - ports: - - "8089:8089" - - "9092:9092" - - "9094:9094" - - ############################################################### - # Simple Kafka JSON producer to simulate ingress events - ############################################################### - - vertices-producer: - image: ververica/statefun-playground-producer:latest - depends_on: - - kafka - - statefun - environment: - APP_PATH: /mnt/vertices.txt - APP_KAFKA_HOST: kafka:9092 - APP_KAFKA_TOPIC: vertices - APP_JSON_PATH: vertex_id - volumes: - - ./vertices.txt:/mnt/vertices.txt diff --git a/java/connected-components/module.yaml b/java/connected-components/module.yaml index 2aa144d..3ea162f 100644 
--- a/java/connected-components/module.yaml +++ b/java/connected-components/module.yaml @@ -20,22 +20,12 @@ spec: transport: type: io.statefun.transports.v1/async --- -kind: io.statefun.kafka.v1/ingress +kind: io.statefun.playground.v1/ingress spec: - id: connected-components.io/vertices - address: kafka:9092 - consumerGroupId: connected-components - startupPosition: - type: earliest - topics: - - topic: vertices - valueType: connected-components.types/vertex - targets: - - connected-components.fns/vertex + port: 8090 --- -kind: io.statefun.kafka.v1/egress +kind: io.statefun.playground.v1/egress spec: - id: connected-components.io/connected-component-changes - address: kafka:9092 - deliverySemantic: - type: at-least-once + port: 8091 + topics: + - connected-component-changes diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java index 0b49e6c..a83c1c8 100644 --- a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java +++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java @@ -1,5 +1,12 @@ package org.apache.flink.statefun.playground.java.connectedcomponents; +import static org.apache.flink.statefun.playground.java.connectedcomponents.types.Types.EGRESS_RECORD_JSON_TYPE; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import org.apache.flink.statefun.playground.java.connectedcomponents.types.EgressRecord; import org.apache.flink.statefun.playground.java.connectedcomponents.types.Types; import org.apache.flink.statefun.playground.java.connectedcomponents.types.Vertex; import 
org.apache.flink.statefun.playground.java.connectedcomponents.types.VertexComponentChange; @@ -8,127 +15,138 @@ import org.apache.flink.statefun.sdk.java.StatefulFunction; import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec; import org.apache.flink.statefun.sdk.java.TypeName; import org.apache.flink.statefun.sdk.java.ValueSpec; -import org.apache.flink.statefun.sdk.java.io.KafkaEgressMessage; +import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder; import org.apache.flink.statefun.sdk.java.message.Message; import org.apache.flink.statefun.sdk.java.message.MessageBuilder; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.CompletableFuture; - /** * A stateful function that computes the connected component for a stream of vertices. * - * <p>The underlying algorithm is a form of label propagation and works by recording for every vertex its component id. - * Whenever a vertex is created or its component id changes, it will send this update to all of its neighbours. - * Every neighbour will compare the broadcast component id with its own id. If the id is lower than its own, then - * it will accept this component id and broadcast this change to its neighbours. If the own component id is smaller, - * then it answers to the broadcaster by sending its own component id. + * <p>The underlying algorithm is a form of label propagation and works by recording for every + * vertex its component id. Whenever a vertex is created or its component id changes, it will send + * this update to all of its neighbours. Every neighbour will compare the broadcast component id + * with its own id. If the id is lower than its own, then it will accept this component id and + * broadcast this change to its neighbours. If the own component id is smaller, then it answers to + * the broadcaster by sending its own component id. 
* - * <p>That way, the minimum component id of each connected component will be broadcast throughout the whole - * connected component. Eventually, every vertex will have heard of the minimum component id and have accepted - * it. + * <p>That way, the minimum component id of each connected component will be broadcast throughout + * the whole connected component. Eventually, every vertex will have heard of the minimum component + * id and have accepted it. * - * <p>Every component id change will be output to the {@link #KAFKA_EGRESS} as a connected component change. + * <p>Every component id change will be output to the {@link #PLAYGROUND_EGRESS} as a connected + * component change. * - * @see <a href="https://en.wikipedia.org/wiki/Label_propagation_algorithm">Label propagation algorithm</a> + * @see <a href="https://en.wikipedia.org/wiki/Label_propagation_algorithm">Label propagation + * algorithm</a> */ final class ConnectedComponentsFn implements StatefulFunction { - /** - * The current component id of a vertex. - */ - private static final ValueSpec<Integer> COMPONENT_ID = ValueSpec.named("componentId").withIntType(); + /** The current component id of a vertex. */ + private static final ValueSpec<Integer> COMPONENT_ID = + ValueSpec.named("componentId").withIntType(); - /** - * List of known neighbours of a vertex. - */ - private static final ValueSpec<Set<Integer>> NEIGHBOURS_VALUE = ValueSpec.named("neighbours").withCustomType(Types.NEIGHBOURS_TYPE); + /** List of known neighbours of a vertex. 
*/ + private static final ValueSpec<Set<Integer>> NEIGHBOURS_VALUE = + ValueSpec.named("neighbours").withCustomType(Types.NEIGHBOURS_TYPE); - static final TypeName TYPE_NAME = TypeName.typeNameOf("connected-components.fns", "vertex"); - static final StatefulFunctionSpec SPEC = StatefulFunctionSpec.builder(TYPE_NAME) - .withSupplier(ConnectedComponentsFn::new) - .withValueSpecs(COMPONENT_ID, NEIGHBOURS_VALUE) - .build(); + static final TypeName TYPE_NAME = TypeName.typeNameOf("connected-components.fns", "vertex"); + static final StatefulFunctionSpec SPEC = + StatefulFunctionSpec.builder(TYPE_NAME) + .withSupplier(ConnectedComponentsFn::new) + .withValueSpecs(COMPONENT_ID, NEIGHBOURS_VALUE) + .build(); - static final TypeName KAFKA_EGRESS = TypeName.typeNameOf("connected-components.io", "connected-component-changes"); + static final TypeName PLAYGROUND_EGRESS = TypeName.typeNameOf("io.statefun.playground", "egress"); - @Override - public CompletableFuture<Void> apply(Context context, Message message) { - // initialize a new vertex - if (message.is(Types.VERTEX_INIT_TYPE)) { - final Vertex vertex = message.as(Types.VERTEX_INIT_TYPE); + @Override + public CompletableFuture<Void> apply(Context context, Message message) { + // initialize a new vertex + if (message.is(Types.VERTEX_INIT_TYPE)) { + final Vertex vertex = message.as(Types.VERTEX_INIT_TYPE); - int currentComponentId = context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE); - final Set<Integer> currentNeighbours = getCurrentNeighbours(context); + int currentComponentId = context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE); + final Set<Integer> currentNeighbours = getCurrentNeighbours(context); - if (currentComponentId > vertex.getVertexId()) { - updateComponentId(context, vertex.getVertexId(), vertex.getVertexId()); - currentComponentId = vertex.getVertexId(); - } + if (currentComponentId > vertex.getVertexId()) { + updateComponentId(context, vertex.getVertexId(), vertex.getVertexId()); + 
currentComponentId = vertex.getVertexId(); + } - final HashSet<Integer> neighbourDiff = new HashSet<>(vertex.getNeighbours()); - neighbourDiff.removeAll(currentNeighbours); + final HashSet<Integer> neighbourDiff = new HashSet<>(vertex.getNeighbours()); + neighbourDiff.removeAll(currentNeighbours); - broadcastVertexConnectedComponentChange(context, vertex.getVertexId(), neighbourDiff, currentComponentId); + broadcastVertexConnectedComponentChange( + context, vertex.getVertexId(), neighbourDiff, currentComponentId); - // update the neighbours - neighbourDiff.addAll(currentNeighbours); - context.storage().set(NEIGHBOURS_VALUE, neighbourDiff); - } - // a neighbours component id has changed - else if (message.is(Types.VERTEX_COMPONENT_CHANGE_TYPE)) { - final VertexComponentChange vertexComponentChange = message.as(Types.VERTEX_COMPONENT_CHANGE_TYPE); - final Set<Integer> currentNeighbours = getCurrentNeighbours(context); - - // only process the message if we can reach the source --> connected components with directed edges - if (currentNeighbours.contains(vertexComponentChange.getSource())) { - final int componentIdCandidate = vertexComponentChange.getComponentId(); - final int currentComponentId = context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE); - - if (currentComponentId < componentIdCandidate) { - sendVertexConnectedComponentChange(context, vertexComponentChange.getTarget(), vertexComponentChange.getSource(), currentComponentId); - } else if (currentComponentId > componentIdCandidate) { - updateComponentId(context, vertexComponentChange.getTarget(), componentIdCandidate); - currentNeighbours.remove(vertexComponentChange.getSource()); - broadcastVertexConnectedComponentChange(context, vertexComponentChange.getTarget(), currentNeighbours, componentIdCandidate); - } - } + // update the neighbours + neighbourDiff.addAll(currentNeighbours); + context.storage().set(NEIGHBOURS_VALUE, neighbourDiff); + } + // a neighbours component id has changed + else if 
(message.is(Types.VERTEX_COMPONENT_CHANGE_TYPE)) { + final VertexComponentChange vertexComponentChange = + message.as(Types.VERTEX_COMPONENT_CHANGE_TYPE); + final Set<Integer> currentNeighbours = getCurrentNeighbours(context); + + // only process the message if we can reach the source --> connected components with directed + // edges + if (currentNeighbours.contains(vertexComponentChange.getSource())) { + final int componentIdCandidate = vertexComponentChange.getComponentId(); + final int currentComponentId = + context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE); + + if (currentComponentId < componentIdCandidate) { + sendVertexConnectedComponentChange( + context, + vertexComponentChange.getTarget(), + vertexComponentChange.getSource(), + currentComponentId); + } else if (currentComponentId > componentIdCandidate) { + updateComponentId(context, vertexComponentChange.getTarget(), componentIdCandidate); + currentNeighbours.remove(vertexComponentChange.getSource()); + broadcastVertexConnectedComponentChange( + context, vertexComponentChange.getTarget(), currentNeighbours, componentIdCandidate); } - - return context.done(); + } } - private Set<Integer> getCurrentNeighbours(Context context) { - return context.storage().get(NEIGHBOURS_VALUE).orElse(Collections.emptySet()); - } + return context.done(); + } - private void broadcastVertexConnectedComponentChange(Context context, int source, Iterable<Integer> neighbours, int componentId) { - for (Integer neighbour : neighbours) { - sendVertexConnectedComponentChange(context, source, neighbour, componentId); - } - } + private Set<Integer> getCurrentNeighbours(Context context) { + return context.storage().get(NEIGHBOURS_VALUE).orElse(Collections.emptySet()); + } - private void sendVertexConnectedComponentChange(Context context, int source, int target, int currentComponentId) { - final VertexComponentChange vertexComponentChange = VertexComponentChange.create(source, target, currentComponentId); - 
context.send(MessageBuilder.forAddress(TYPE_NAME, String.valueOf(target)) - .withCustomType( - Types.VERTEX_COMPONENT_CHANGE_TYPE, - vertexComponentChange) - .build()); + private void broadcastVertexConnectedComponentChange( + Context context, int source, Iterable<Integer> neighbours, int componentId) { + for (Integer neighbour : neighbours) { + sendVertexConnectedComponentChange(context, source, neighbour, componentId); } + } + + private void sendVertexConnectedComponentChange( + Context context, int source, int target, int currentComponentId) { + final VertexComponentChange vertexComponentChange = + VertexComponentChange.create(source, target, currentComponentId); + context.send( + MessageBuilder.forAddress(TYPE_NAME, String.valueOf(target)) + .withCustomType(Types.VERTEX_COMPONENT_CHANGE_TYPE, vertexComponentChange) + .build()); + } - private void updateComponentId(Context context, int vertexId, int componentId) { - context.storage().set(COMPONENT_ID, componentId); - outputConnectedComponentChange(context, vertexId, componentId); - } + private void updateComponentId(Context context, int vertexId, int componentId) { + context.storage().set(COMPONENT_ID, componentId); + outputConnectedComponentChange(context, vertexId, componentId); + } - private void outputConnectedComponentChange(Context context, int vertexId, int componentId) { - context.send(KafkaEgressMessage.forEgress(KAFKA_EGRESS) - .withTopic("connected-component-changes") - .withUtf8Key(String.valueOf(vertexId)) - .withUtf8Value(String.format("Vertex %s belongs to component %s.", vertexId, componentId)) + private void outputConnectedComponentChange(Context context, int vertexId, int componentId) { + context.send( + EgressMessageBuilder.forEgress(PLAYGROUND_EGRESS) + .withCustomType( + EGRESS_RECORD_JSON_TYPE, + new EgressRecord( + "connected-component-changes", + String.format("Vertex %s belongs to component %s.", vertexId, componentId))) .build()); - } + } } diff --git 
a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/EgressRecord.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/EgressRecord.java new file mode 100644 index 0000000..0bcf102 --- /dev/null +++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/EgressRecord.java @@ -0,0 +1,28 @@ +package org.apache.flink.statefun.playground.java.connectedcomponents.types; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class EgressRecord { + @JsonProperty("topic") + private String topic; + + @JsonProperty("payload") + private String payload; + + public EgressRecord() { + this(null, null); + } + + public EgressRecord(String topic, String payload) { + this.topic = topic; + this.payload = payload; + } + + public String getTopic() { + return topic; + } + + public String getPayload() { + return payload; + } +} diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Types.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Types.java index 2e7b010..d8ea67a 100644 --- a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Types.java +++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Types.java @@ -1,13 +1,11 @@ package org.apache.flink.statefun.playground.java.connectedcomponents.types; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Set; import org.apache.flink.statefun.sdk.java.TypeName; import org.apache.flink.statefun.sdk.java.types.SimpleType; import org.apache.flink.statefun.sdk.java.types.Type; -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.util.Set; - public final class Types { private Types() {} 
@@ -15,18 +13,14 @@ public final class Types { private static final ObjectMapper JSON_OBJ_MAPPER = new ObjectMapper(); private static final String TYPES_NAMESPACE = "connected-components.types"; - /** - * Type denoting a new vertex coming from the input source. - */ + /** Type denoting a new vertex coming from the input source. */ public static final Type<Vertex> VERTEX_INIT_TYPE = SimpleType.simpleImmutableTypeFrom( TypeName.typeNameOf(TYPES_NAMESPACE, "vertex"), JSON_OBJ_MAPPER::writeValueAsBytes, bytes -> JSON_OBJ_MAPPER.readValue(bytes, Vertex.class)); - /** - * Type denoting a component id change of a vertex. - */ + /** Type denoting a component id change of a vertex. */ public static final Type<VertexComponentChange> VERTEX_COMPONENT_CHANGE_TYPE = SimpleType.simpleImmutableTypeFrom( TypeName.typeNameOf(TYPES_NAMESPACE, "vertexComponentChange"), @@ -34,8 +28,15 @@ public final class Types { bytes -> JSON_OBJ_MAPPER.readValue(bytes, VertexComponentChange.class)); @SuppressWarnings("unchecked") - public static final Type<Set<Integer>> NEIGHBOURS_TYPE = SimpleType.simpleImmutableTypeFrom( - TypeName.typeNameOf(TYPES_NAMESPACE, "neighbours"), - JSON_OBJ_MAPPER::writeValueAsBytes, - bytes -> JSON_OBJ_MAPPER.readValue(bytes, Set.class)); + public static final Type<Set<Integer>> NEIGHBOURS_TYPE = + SimpleType.simpleImmutableTypeFrom( + TypeName.typeNameOf(TYPES_NAMESPACE, "neighbours"), + JSON_OBJ_MAPPER::writeValueAsBytes, + bytes -> JSON_OBJ_MAPPER.readValue(bytes, Set.class)); + + public static final Type<EgressRecord> EGRESS_RECORD_JSON_TYPE = + SimpleType.simpleImmutableTypeFrom( + TypeName.typeNameOf("io.statefun.playground", "EgressRecord"), + JSON_OBJ_MAPPER::writeValueAsBytes, + bytes -> JSON_OBJ_MAPPER.readValue(bytes, EgressRecord.class)); } diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Vertex.java 
b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Vertex.java index b6dd8cd..dd9bc41 100644 --- a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Vertex.java +++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Vertex.java @@ -1,24 +1,23 @@ package org.apache.flink.statefun.playground.java.connectedcomponents.types; import com.fasterxml.jackson.annotation.JsonProperty; - import java.util.List; public class Vertex { - @JsonProperty("vertex_id") - private int vertexId; + @JsonProperty("vertex_id") + private int vertexId; - @JsonProperty("neighbours") - private List<Integer> neighbours; + @JsonProperty("neighbours") + private List<Integer> neighbours; - public Vertex() {} + public Vertex() {} - public int getVertexId() { - return vertexId; - } + public int getVertexId() { + return vertexId; + } - public List<Integer> getNeighbours() { - return neighbours; - } + public List<Integer> getNeighbours() { + return neighbours; + } } diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/VertexComponentChange.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/VertexComponentChange.java index 6875bee..a1c1021 100644 --- a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/VertexComponentChange.java +++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/VertexComponentChange.java @@ -4,38 +4,38 @@ import com.fasterxml.jackson.annotation.JsonProperty; public class VertexComponentChange { - @JsonProperty("source") - private int source; + @JsonProperty("source") + private int source; - @JsonProperty("target") - private int target; + @JsonProperty("target") + 
private int target; - @JsonProperty("component_id") - private int componentId; + @JsonProperty("component_id") + private int componentId; - public VertexComponentChange() { - this(0, 0, 0); - } + public VertexComponentChange() { + this(0, 0, 0); + } - private VertexComponentChange(int source, int target, int componentId) { - this.source = source; - this.target = target; - this.componentId = componentId; - } + private VertexComponentChange(int source, int target, int componentId) { + this.source = source; + this.target = target; + this.componentId = componentId; + } - public int getSource() { - return source; - } + public int getSource() { + return source; + } - public int getTarget() { - return target; - } + public int getTarget() { + return target; + } - public int getComponentId() { - return componentId; - } + public int getComponentId() { + return componentId; + } - public static VertexComponentChange create(int source, int target, int componentId) { - return new VertexComponentChange(source, target, componentId); - } + public static VertexComponentChange create(int source, int target, int componentId) { + return new VertexComponentChange(source, target, componentId); + } } diff --git a/java/connected-components/vertices.txt b/java/connected-components/vertices.txt deleted file mode 100644 index 621b2ad..0000000 --- a/java/connected-components/vertices.txt +++ /dev/null @@ -1,12 +0,0 @@ -{"vertex_id": "1", "neighbours": ["2", "3"]} -{"vertex_id": "2", "neighbours": ["1", "4"]} -{"vertex_id": "3", "neighbours": ["1"]} -{"vertex_id": "4", "neighbours": ["2"]} -{"vertex_id": "5", "neighbours": []} -{"vertex_id": "6", "neighbours": ["7"]} -{"vertex_id": "7", "neighbours": ["6"]} -{"vertex_id": "8", "neighbours": ["9"]} -{"vertex_id": "9", "neighbours": ["8", "10"]} -{"vertex_id": "10", "neighbours": ["9", "11", "12"]} -{"vertex_id": "11", "neighbours": ["10", "12"]} -{"vertex_id": "12", "neighbours": ["10", "11"]} \ No newline at end of file
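
Not part of the commit: the label propagation described in the `ConnectedComponentsFn` Javadoc can be tried out without the StateFun runtime. The following is a minimal single-process sketch, assuming an in-memory queue in place of StateFun messaging and plain maps in place of function state. The class `LabelPropagationSketch`, its `Change` record, and the method names are hypothetical and exist only for illustration; the graph is the one from the deleted `vertices.txt`.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical stand-alone simulation; it does not use the StateFun SDK. */
public class LabelPropagationSketch {

  /** In-memory stand-in for a VertexComponentChange message. */
  private static final class Change {
    final int source;
    final int target;
    final int componentId;

    Change(int source, int target, int componentId) {
      this.source = source;
      this.target = target;
      this.componentId = componentId;
    }
  }

  // Plain maps replace the per-vertex COMPONENT_ID and NEIGHBOURS_VALUE state.
  private final Map<Integer, Integer> componentIds = new HashMap<>();
  private final Map<Integer, Set<Integer>> neighbours = new HashMap<>();
  private final Deque<Change> mailbox = new ArrayDeque<>();

  /** Mirrors the vertex-initialization branch of the function. */
  void addVertex(int vertexId, List<Integer> newNeighbours) {
    int current = componentIds.getOrDefault(vertexId, Integer.MAX_VALUE);
    if (current > vertexId) {
      componentIds.put(vertexId, vertexId);
      current = vertexId;
    }
    Set<Integer> known = neighbours.computeIfAbsent(vertexId, k -> new HashSet<>());
    for (int neighbour : newNeighbours) {
      // Only neighbours that were not known before are notified.
      if (known.add(neighbour)) {
        mailbox.add(new Change(vertexId, neighbour, current));
      }
    }
    drainMailbox();
  }

  /** Mirrors the component-change branch, processing messages until none remain. */
  private void drainMailbox() {
    while (!mailbox.isEmpty()) {
      Change change = mailbox.poll();
      Set<Integer> known = neighbours.getOrDefault(change.target, Collections.emptySet());
      if (!known.contains(change.source)) {
        continue; // the target cannot reach the source, so the message is ignored
      }
      int current = componentIds.getOrDefault(change.target, Integer.MAX_VALUE);
      if (current < change.componentId) {
        // The target knows a smaller id and answers the sender with it.
        mailbox.add(new Change(change.target, change.source, current));
      } else if (current > change.componentId) {
        // The target accepts the smaller id and tells its other neighbours.
        componentIds.put(change.target, change.componentId);
        for (int neighbour : known) {
          if (neighbour != change.source) {
            mailbox.add(new Change(change.target, neighbour, change.componentId));
          }
        }
      }
    }
  }

  /** Adds the vertices in iteration order and returns vertex id -> component id. */
  public static Map<Integer, Integer> computeComponents(Map<Integer, List<Integer>> graph) {
    LabelPropagationSketch sketch = new LabelPropagationSketch();
    graph.forEach(sketch::addVertex);
    return new TreeMap<>(sketch.componentIds);
  }

  public static void main(String[] args) {
    Map<Integer, List<Integer>> graph = new LinkedHashMap<>();
    graph.put(1, Arrays.asList(2, 3));
    graph.put(2, Arrays.asList(1, 4));
    graph.put(3, Arrays.asList(1));
    graph.put(4, Arrays.asList(2));
    graph.put(5, Collections.<Integer>emptyList());
    graph.put(6, Arrays.asList(7));
    graph.put(7, Arrays.asList(6));
    graph.put(8, Arrays.asList(9));
    graph.put(9, Arrays.asList(8, 10));
    graph.put(10, Arrays.asList(9, 11, 12));
    graph.put(11, Arrays.asList(10, 12));
    graph.put(12, Arrays.asList(10, 11));
    // prints {1=1, 2=1, 3=1, 4=1, 5=5, 6=6, 7=6, 8=8, 9=8, 10=8, 11=8, 12=8}
    System.out.println(computeComponents(graph));
  }
}
```

As in the function itself, a change message sent to a vertex that has not been added yet is dropped, because that vertex has no recorded neighbours. The later vertex initialization then triggers the exchange that settles on the minimal component id.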