This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-3.2 in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git
commit 2c8820df7b54fcc6eb3bb71cd19d6f40a6c726dc Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Tue Feb 15 15:23:59 2022 +0100 [FLINK-26158] Update javascript/greeter example to use playground ingress/egress --- javascript/greeter/README.md | 37 +++++++++++++++++++--------- javascript/greeter/docker-compose.yml | 46 +++-------------------------------- javascript/greeter/functions.js | 23 +++++++++--------- javascript/greeter/input-example.json | 2 -- javascript/greeter/module.yaml | 21 +++++----------- 5 files changed, 46 insertions(+), 83 deletions(-) diff --git a/javascript/greeter/README.md b/javascript/greeter/README.md index ea1fa0a..1e64687 100644 --- a/javascript/greeter/README.md +++ b/javascript/greeter/README.md @@ -5,9 +5,7 @@ This is a simple example of a stateful functions application implemented in `Jav In this example, we imagine a service that computes personalized greetings. Our service, consist out of the following components: -* `kafka ingress` - This component forwards messages produced to the `names` kafka topic, -to the `person` stateful function. Messages produced to this topic has the following -schema `{ "name" : "bob"}`. +* `playground ingress` - Ingestion point for messages. Messages are sent to the specified target function. * `person` - This function is triggered by the ingress defined above. This function keeps track of the number of visits, and triggers the next functions: @@ -15,7 +13,7 @@ This function keeps track of the number of visits, and triggers the next functio * `greeter` - This function, computes a personalized greeting, based on the name and the number of visits of that user. The output of that computation is forward to a Kafka egress defined below. -* `kafka egress` - This wraps a Kafka producer that emits `utf-8` greetings to the `greetings` Kafka topic. +* `playground egress` - Queryable endpoint that collects the emitted greetings in the `greetings` topic. The greeting is `utf-8` encoded. ![Flow](arch.png "Flow") @@ -23,25 +21,40 @@ of visits of that user. The output of that computation is forward to a Kafka egr ## Running the example ``` -docker-compose build -docker-compose up +$ docker-compose build +$ docker-compose up ``` -To observe the customized greeting, as they appear in the `greetings` Kafka topic, run in a separate terminal: +## Play around! + +The greeter application allows you to do the following actions: + +* Create a greeting for a user via sending a `GreetRequest` message to the `person` function + +In order to send messages to the Stateful Functions application you can run: ``` -docker-compose exec kafka rpk topic consume greetings +$ curl -X PUT -H "Content-Type: application/vnd.example/GreetRequest" -d '{"name": "Bob"}' localhost:8090/example/person/Bob ``` -Try adding few more input lines to [input-example.json](input-example.json), and restart -the producer service. +To consume the customized greeting, as they appear in the `greetings` playground topic, run in a separate terminal: ``` -docker-compose restart producer +$ curl -X GET localhost:8091/greetings ``` +### Messages + +The messages are expected to be encoded as JSON. + +* `GreetRequest`: `{"name": "Bob"}`, `name` is the id of the `person` function + +## What's next? + Feeling curious? add the following log to the `person` function at [functions.js](functions.js): -```console.log(`Hello there ${context.self.id}!`);```. +``` +console.log(`Hello there ${context.self.id}!`); +``` Then, rebuild and restart only the `functions` service. diff --git a/javascript/greeter/docker-compose.yml b/javascript/greeter/docker-compose.yml index 8e4e8a3..2350ca2 100644 --- a/javascript/greeter/docker-compose.yml +++ b/javascript/greeter/docker-compose.yml @@ -34,52 +34,12 @@ services: ############################################################### statefun: - image: apache/flink-statefun-playground:3.2.0 + image: apache/flink-statefun-playground:3.2.0-1.0 ports: - "8081:8081" + - "8090:8090" + - "8091:8091" depends_on: - - kafka - functions volumes: - ./module.yaml:/module.yaml - - ############################################################### - # Kafka for ingress and egress - ############################################################### - - kafka: - image: docker.vectorized.io/vectorized/redpanda:v21.8.1 - command: - - redpanda start - - --smp 1 - - --memory 512M - - --overprovisioned - - --set redpanda.default_topic_replications=1 - - --set redpanda.auto_create_topics_enabled=true - - --kafka-addr INSIDE://0.0.0.0:9094,OUTSIDE://0.0.0.0:9092 - - --advertise-kafka-addr INSIDE://kafka:9094,OUTSIDE://kafka:9092 - - --pandaproxy-addr 0.0.0.0:8089 - - --advertise-pandaproxy-addr kafka:8089 - hostname: kafka - ports: - - "8089:8089" - - "9092:9092" - - "9094:9094" - - ############################################################### - # Simple Kafka JSON producer to simulate ingress events - ############################################################### - - producer: - image: ververica/statefun-playground-producer:latest - depends_on: - - kafka - - statefun - environment: - APP_PATH: /mnt/input-example.json - APP_KAFKA_HOST: kafka:9092 - APP_KAFKA_TOPIC: names - APP_JSON_PATH: name - APP_DELAY_SECONDS: 1 - volumes: - - ./input-example.json:/mnt/input-example.json diff --git a/javascript/greeter/functions.js b/javascript/greeter/functions.js index c6bc9a2..14a0caf 100644 --- a/javascript/greeter/functions.js +++ b/javascript/greeter/functions.js @@ -19,14 +19,14 @@ const http = require("http"); -const {StateFun, Message, Context, messageBuilder, kafkaEgressMessage} = require("apache-flink-statefun"); +const {StateFun, Message, Context, messageBuilder, egressMessageBuilder} = require("apache-flink-statefun"); // ------------------------------------------------------------------------------------------------------ // Greeter // ------------------------------------------------------------------------------------------------------ const GreetRequestType = StateFun.jsonType("example/GreetRequest"); - +const EgressRecordType = StateFun.jsonType("io.statefun.playground/EgressRecord") /** * A Stateful function that represents a person. @@ -42,7 +42,7 @@ async function person(context, message) { context.storage.visits = visits // enrich the request with the number of vists. - let request = message.as(GreetRequestType) + let request = message.as(GreetRequestType) request.visits = visits // next, we will forward a message to a special greeter function, @@ -63,12 +63,16 @@ async function greeter(context, message) { const visits = request.visits; const greeting = await compute_fancy_greeting(person_name, visits); - - context.send(kafkaEgressMessage({ - typename: "example/greets", + const egressRecord = { topic: "greetings", - key: person_name, - value: greeting})); + payload: greeting, + } + + context.send(egressMessageBuilder({ + typename: "io.statefun.playground/egress", + value: egressRecord, + valueType: EgressRecordType, + })); } @@ -115,6 +119,3 @@ statefun.bind({ // ------------------------------------------------------------------------------------------------------ http.createServer(statefun.handler()).listen(8000); - - - diff --git a/javascript/greeter/input-example.json b/javascript/greeter/input-example.json deleted file mode 100644 index ad72aa8..0000000 --- a/javascript/greeter/input-example.json +++ /dev/null @@ -1,2 +0,0 @@ -{"name" : "Bob"} -{"name" : "Joe"} diff --git a/javascript/greeter/module.yaml b/javascript/greeter/module.yaml index 5c4d214..1fa4f9a 100644 --- a/javascript/greeter/module.yaml +++ b/javascript/greeter/module.yaml @@ -20,21 +20,12 @@ spec: # transport: # type: io.statefun.transports.v1/async --- -kind: io.statefun.kafka.v1/ingress +kind: io.statefun.playground.v1/ingress spec: - id: example/names - address: kafka:9092 - consumerGroupId: my-group-id - topics: - - topic: names - valueType: example/GreetRequest - targets: - - example/person + port: 8090 --- -kind: io.statefun.kafka.v1/egress +kind: io.statefun.playground.v1/egress spec: - id: example/greets - address: kafka:9092 - deliverySemantic: - type: exactly-once - transactionTimeoutMillis: 100000 + port: 8091 + topics: + - greetings