Stephan Ewen created FLINK-23301:
------------------------------------

             Summary: StateFun HTTP Ingress
                 Key: FLINK-23301
                 URL: https://issues.apache.org/jira/browse/FLINK-23301
             Project: Flink
          Issue Type: Sub-task
          Components: Stateful Functions
            Reporter: Stephan Ewen

The HTTP ingress would start an HTTP server at a specified port. The HTTP server would only handle _POST_ requests.

The target function is represented by the path to which the request is made; the message content is the body of the POST request.

The following example would send an empty message to the function with the address {{namespace='example', type='greeter', id='Igal'}}.

{code}
curl -X POST http://statefun-ingress:5555/in/example/greeter/Igal

POST /in/example/greeter/Igal HTTP/1.1
Content-Length: 0
{code}

The example below would send a message of type {{io.statefun.types/string}} with the contents {{"{numTimesToGreet: 5}"}} to the function with the address {{namespace='example', type='greeter', id='Elisa'}}.

{code}
curl -X POST -H "Content-Type: text/plain; charset=UTF-8" -d "{numTimesToGreet: 5}" http://statefun-ingress:5555/in/example/greeter/Elisa

POST /in/example/greeter/Elisa HTTP/1.1
Content-Type: text/plain; charset=UTF-8
Content-Length: 20

{numTimesToGreet: 5}
{code}

h3. Data Types

The content type (MIME type) specified in the header of the HTTP request will be mapped directly to the StateFun type. For example, {{Content-Type: io.statefun.types/int}} will set the type of the message to {{io.statefun.types/int}}.

As a special case, we map the content type {{text/plain}} to {{io.statefun.types/string}}, to make simple cases and examples work more seamlessly.

The following examples would send a message to a function that expects a ProtoBuf-encoded type {{Greeting}}, registered in StateFun as {{example/greeting}}.

{code}
> curl -X POST -H "Content-Type: text/plain; charset=UTF-8" -d "{numTimesToGreet: 5}"

# Pipe the encoded bytes straight into curl: passing binary data through a
# shell variable and echo would append a newline and can corrupt the payload.
> echo 'numTimesToGreet: 5' | ./protoc --encode Greeting example_types.proto | curl -X POST -H "Content-Type: example/greeting" --data-binary @- http://statefun-ingress:5555/in/example/greeter/Bobby
{code}

h3. Sender and Responses

We want to support round-trip-style interactions, meaning posting a request and receiving a response. Given the asynchronous messaging nature of StateFun, this does not necessarily happen within a single HTTP request that immediately returns the corresponding response. Instead, it can consist of issuing (POST) a request to the HTTP ingress and polling (GET) for the response from an associated HTTP egress.

To support this kind of pattern, the HTTP ingress will assign a random request correlation ID and return it in the HTTP response. Furthermore, the ingress will optionally set the {{sender()}} field of the created message to reference a configured associated egress. The ingress config would add an entry referencing the egress (like {{'paired_egress_name: httpout'}}).

{code}
> curl -X POST -i http://statefun-ingress:5555/in/example/greeter/Igal

POST /in/example/greeter/Igal HTTP/1.1
Content-Length: 0

HTTP/1.1 200 OK
StateFun-Request-Correlation-ID: 8acb377c-fc5e-4bdb-b2cc-eddb5992b7b5
Content-Length: 0
{code}

The created message would have no body, but would have {{sender() = egress/httpout/8acb377c-fc5e-4bdb-b2cc-eddb5992b7b5}}.

_Note: We would need to extend the message address scheme to be able to reference egresses. The egress itself can grab the correlation ID from the ID part of the address, because the HTTP egress doesn't use that field (in fact, no egress currently interprets the ID)._

h3. Singleton Instantiation

To avoid port conflicts, we need to do a singleton instantiation per JVM. This can be achieved by using a statically referenced context to hold the instantiated server and a reference to the

In the future, we can look into extending this to avoid setup/teardown when operators are cancelled for recovery. The server would then live as long as the StateFun application (job) lives (or more precisely, as long as the slot lives, which is the duration that the TaskManager is associated with the StateFun deployment - typically the entire lifetime).
To achieve that, we would tear down the server in a [shutdown hook of the user-code classloader](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L145). Instead of letting the first source set up the server, the first source would register its output as the stream for the server to push messages to.

h3. Configuration parameters

- Bind host (default 0.0.0.0)
- Bind port (default 5555)
- Path (default "in") (for the path in the URL {{http(s)://<host>:<port>/<path>/<namespace>/<type>/<name>}})
- Egress pair name, for setting the egress that replies should go to.
- To set up SSL for the connection, we add similar settings as for Flink's REST endpoint SSL support. See the Flink docs for details: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/security/security-ssl/#rest-endpoints-external-connectivity
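
For illustration only, the request routing and type mapping proposed in this issue could be sketched roughly as follows. This is not taken from the StateFun code base: the names `Address`, `parse_target`, and `map_content_type` are hypothetical, and ignoring media-type parameters such as `charset` is an assumption. The sketch also assumes that function IDs contain no `/` and need no URL decoding.

```python
from typing import NamedTuple


class Address(NamedTuple):
    """Target function address, i.e. {namespace, type, id}."""
    namespace: str
    type: str
    id: str


# Special case from the proposal: text/plain is treated as a StateFun string.
STRING_TYPE = "io.statefun.types/string"


def parse_target(request_path: str, path_prefix: str = "in") -> Address:
    """Map /<path>/<namespace>/<type>/<id> to a function address."""
    parts = request_path.strip("/").split("/")
    if len(parts) != 4 or parts[0] != path_prefix:
        raise ValueError(f"unexpected ingress path: {request_path!r}")
    _, namespace, fn_type, fn_id = parts
    return Address(namespace, fn_type, fn_id)


def map_content_type(content_type: str) -> str:
    """Map a Content-Type header value to a StateFun type name."""
    # Assumption: parameters such as "; charset=UTF-8" are ignored.
    media_type = content_type.split(";", 1)[0].strip()
    if media_type == "text/plain":
        return STRING_TYPE
    # Every other content type is used as the message type unchanged.
    return media_type


print(parse_target("/in/example/greeter/Elisa"))
print(map_content_type("text/plain; charset=UTF-8"))
print(map_content_type("example/greeting"))
```

A real ingress would additionally have to reject non-POST requests and decide how to treat a missing `Content-Type` header; neither is covered here.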