Stephan Ewen created FLINK-23301:
------------------------------------

             Summary: StateFun HTTP Ingress
                 Key: FLINK-23301
                 URL: https://issues.apache.org/jira/browse/FLINK-23301
             Project: Flink
          Issue Type: Sub-task
          Components: Stateful Functions
            Reporter: Stephan Ewen


The HTTP ingress would start an HTTP Server at a specified port.

The HTTP server would only handle _POST_ requests. The target function is 
identified by the path to which the request is made, and the message content is 
the body of the POST request.

The following example would send an empty message to the function with the 
address {{namespace='example', type='greeter', id='Igal'}}.

{code}
curl -X POST http://statefun-ingress:5555/in/example/greeter/Igal

POST /in/example/greeter/Igal HTTP/1.1
Content-Length: 0
{code}
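A minimal sketch of how the ingress might map the request path to a target address, assuming the configured path prefix (default {{in}}) is followed by exactly three non-empty segments. {{Address}} and {{IngressPaths.parse}} are hypothetical names for illustration, not existing StateFun classes.

{code:java}
import java.util.Optional;

/** Hypothetical target address: namespace/type/id, mirroring a StateFun function address. */
record Address(String namespace, String type, String id) {}

final class IngressPaths {
  private IngressPaths() {}

  /**
   * Parses "/<prefix>/<namespace>/<type>/<id>" into an Address.
   * Returns empty if the prefix does not match or the segment count is wrong.
   */
  static Optional<Address> parse(String path, String prefix) {
    String trimmed = path.startsWith("/") ? path.substring(1) : path;
    // A limit of 5 keeps trailing empty segments and puts any extra segments
    // into a fifth element, so both cases can be rejected below.
    String[] parts = trimmed.split("/", 5);
    if (parts.length != 4 || !parts[0].equals(prefix)) {
      return Optional.empty();
    }
    for (String part : parts) {
      if (part.isEmpty()) {
        return Optional.empty(); // rejects paths like /in//greeter/Igal
      }
    }
    return Optional.of(new Address(parts[1], parts[2], parts[3]));
  }
}
{code}

For example, {{IngressPaths.parse("/in/example/greeter/Igal", "in")}} yields the address {{namespace='example', type='greeter', id='Igal'}}, while a path with too few or too many segments yields an empty result that the server could answer with an error status.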

The example below would send a message of type {{io.statefun.types/string}} to the 
function with the address {{namespace='example', type='greeter', id='Elisa'}} 
and the message content {{"\{numTimesToGreet: 5}"}}.

{code}
curl -X POST -H "Content-Type: text/plain; charset=UTF-8" \
  -d '{numTimesToGreet: 5}' http://statefun-ingress:5555/in/example/greeter/Elisa

POST /in/example/greeter/Elisa HTTP/1.1
Content-Type: text/plain; charset=UTF-8
Content-Length: 20

{numTimesToGreet: 5}
{code}


h3. Data Types

The content type (MIME type) specified in the {{Content-Type}} header of the HTTP 
request will be mapped directly to the StateFun type of the message.
For example, {{Content-Type: io.statefun.types/int}} will set the type of the 
message to {{io.statefun.types/int}}.

As a special case, we map the content type {{text/plain}} to 
{{io.statefun.types/string}}, so that simple cases and examples work more 
seamlessly.
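The mapping rule above could be sketched as follows. The class name {{ContentTypes}} is hypothetical, and two details are assumptions not settled in this proposal: MIME parameters such as {{; charset=UTF-8}} are stripped before mapping, and a request without a {{Content-Type}} header (like the empty-message example) yields no type at all.

{code:java}
import java.util.Optional;

/** Hypothetical helper that maps a Content-Type header to a StateFun type name. */
final class ContentTypes {
  /** Type name that text/plain maps to, as proposed above. */
  static final String STRING_TYPE = "io.statefun.types/string";

  private ContentTypes() {}

  /**
   * Returns the StateFun type name for a Content-Type header value, or empty if the
   * header is missing. MIME parameters are dropped (an assumption of this sketch).
   */
  static Optional<String> toTypeName(String contentType) {
    if (contentType == null || contentType.isBlank()) {
      return Optional.empty();
    }
    // Keep only the media type, dropping anything after the first ';'.
    String mediaType = contentType.split(";", 2)[0].trim();
    if (mediaType.isEmpty()) {
      return Optional.empty();
    }
    // Special case: plain text becomes the built-in string type.
    if (mediaType.equalsIgnoreCase("text/plain")) {
      return Optional.of(STRING_TYPE);
    }
    // All other media types are used verbatim as the type name.
    return Optional.of(mediaType);
  }
}
{code}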

The following examples would send a message to a function that expects the 
Protobuf-encoded type {{Greeting}}, registered in StateFun as 
{{example/greeting}}.

{code}
> curl -X POST -H "Content-Type: text/plain; charset=UTF-8" \
    -d '{numTimesToGreet: 5}'

> echo 'numTimesToGreet: 5' \
    | ./protoc --encode=Greeting example_types.proto \
    | curl -X POST -H "Content-Type: example/greeting" \
        --data-binary @- http://statefun-ingress:5555/in/example/greeter/Bobby
{code}


h3. Sender and Responses

We want to support round-trip-style interactions, meaning posting a request and 
receiving a response.
Given the asynchronous messaging nature of StateFun, this will not necessarily 
happen within a single HTTP request that immediately returns the corresponding 
response. Instead, a client may issue (POST) a request to the HTTP ingress and 
then poll (GET) for the response from an associated HTTP egress.

To support this kind of pattern, the HTTP ingress will assign a random request 
correlation ID and return it in the HTTP response.
Furthermore, the ingress will optionally set the {{sender()}} field of the 
created message to reference a configured associated egress.

The ingress config would add an entry referencing the egress (like 
{{paired_egress_name: httpout}}).

{code}
> curl -X POST -i http://statefun-ingress:5555/in/example/greeter/Igal
POST /in/example/greeter/Igal HTTP/1.1
Content-Length: 0

HTTP/1.1 200 OK
StateFun-Request-Correlation-ID: 8acb377c-fc5e-4bdb-b2cc-eddb5992b7b5
Content-Length: 0
{code}

The created message would have no body, but would have 
{{sender() = egress/httpout/8acb377c-fc5e-4bdb-b2cc-eddb5992b7b5}}.
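A sketch of the ingress side, assuming the {{egress/<egress name>/<correlation ID>}} address form from the example above. {{Address}} and {{CorrelatedRequest}} are hypothetical names; only the header name is taken from the example response.

{code:java}
import java.util.Optional;
import java.util.UUID;

/** Hypothetical address type; "egress" as namespace follows the proposed scheme. */
record Address(String namespace, String type, String id) {
  @Override
  public String toString() {
    return namespace + "/" + type + "/" + id;
  }
}

/** Per-request metadata the ingress would attach before forwarding the message. */
record CorrelatedRequest(String correlationId, Optional<Address> sender) {
  /** Header name as used in the example response above. */
  static final String CORRELATION_HEADER = "StateFun-Request-Correlation-ID";

  static CorrelatedRequest create(Optional<String> pairedEgressName) {
    // A random UUID serves as the correlation ID returned to the client.
    String correlationId = UUID.randomUUID().toString();
    // If an egress is paired, the sender points at it and carries the ID in its id field.
    Optional<Address> sender =
        pairedEgressName.map(name -> new Address("egress", name, correlationId));
    return new CorrelatedRequest(correlationId, sender);
  }
}
{code}

With the JDK's built-in {{com.sun.net.httpserver}} server, for instance, a handler could return the ID via {{exchange.getResponseHeaders().set(CorrelatedRequest.CORRELATION_HEADER, request.correlationId())}} before sending the response headers.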

_Note: We would need to extend the message address scheme to be able to 
reference egresses.
The egress itself can take the correlation ID from the ID part of the address, 
because the HTTP egress doesn't use that field (in fact, no egress currently 
interprets the ID)._
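On the egress side, the correlation ID can be read back from the ID part of the target address. A minimal sketch, assuming a hypothetical in-memory store that a GET handler polls by correlation ID; {{PendingReplies}} is an illustrative name, and the store has no expiry or size bound, which a real egress would need.

{code:java}
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical address type, as in the ingress sketch. */
record Address(String namespace, String type, String id) {}

/** Hypothetical reply store that a polling GET handler could read from. */
final class PendingReplies {
  private final Map<String, byte[]> replies = new ConcurrentHashMap<>();

  /** Called by the egress: the id part of the target address is the correlation ID. */
  void onEgressMessage(Address target, byte[] payload) {
    replies.put(target.id(), payload);
  }

  /** Called by the GET handler; removes the reply once it has been handed out. */
  Optional<byte[]> poll(String correlationId) {
    return Optional.ofNullable(replies.remove(correlationId));
  }
}
{code}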

h3. Singleton Instantiation

To avoid port conflicts, the server needs to be instantiated as a singleton per JVM.
This can be achieved by using a statically referenced context to hold the 
instantiated server and a reference to the
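A minimal sketch of such a statically referenced holder, assuming reference counting so that the last source to release it closes the server. {{SharedServer}} is a hypothetical name, and the factory stands in for whatever actually starts the HTTP server. Static fields are scoped to the classloader that loaded the class, so the holder is shared by all sources loaded through the same (user-code) classloader rather than strictly by the whole JVM.

{code:java}
import java.util.function.Supplier;

/** Hypothetical holder: first acquire() creates the server, last release() closes it. */
final class SharedServer {
  private static final Object LOCK = new Object();
  private static AutoCloseable instance;
  private static int refCount;

  private SharedServer() {}

  static AutoCloseable acquire(Supplier<? extends AutoCloseable> factory) {
    synchronized (LOCK) {
      if (instance == null) {
        instance = factory.get(); // e.g. bind and start the HTTP server
      }
      refCount++;
      return instance;
    }
  }

  static void release() {
    synchronized (LOCK) {
      if (refCount == 0) {
        throw new IllegalStateException("release() without matching acquire()");
      }
      if (--refCount == 0) {
        AutoCloseable toClose = instance;
        instance = null;
        try {
          toClose.close(); // frees the port for the next instantiation
        } catch (Exception e) {
          throw new IllegalStateException("failed to close server", e);
        }
      }
    }
  }
}
{code}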

In the future, we can look into extending this to avoid setup/teardown when 
operators are cancelled for recovery.
The server would then live as long as the StateFun application (job) lives (or 
more precisely, as long as the slot lives, which is the duration for which the 
TaskManager is associated with the StateFun deployment, typically the 
TaskManager's entire lifetime).

To achieve that, we would tear down the server in a [shutdown hook of the 
user-code 
classloader|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L145].
Rather than having the first source set up the server, the first source would 
only register its output as the stream to which the server pushes messages.
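The hand-off between a long-lived server and the sources of successive attempts could look roughly like this sketch. {{IngressOutput}} is a hypothetical name; tearing down the server itself would happen in the classloader hook linked above and is not shown.

{code:java}
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical hand-off point between the shared HTTP server and the running source. */
final class IngressOutput<T> {
  private final AtomicReference<Consumer<T>> output = new AtomicReference<>();

  /** Returns true if this source's output is now the target for the server. */
  boolean registerIfAbsent(Consumer<T> sourceOutput) {
    return output.compareAndSet(null, sourceOutput);
  }

  /** Clears the output only if it is still the one this source registered. */
  void unregister(Consumer<T> sourceOutput) {
    output.compareAndSet(sourceOutput, null);
  }

  /** Called by the server per request; false means no source is attached (e.g. during recovery). */
  boolean push(T message) {
    Consumer<T> current = output.get();
    if (current == null) {
      return false;
    }
    current.accept(message);
    return true;
  }
}
{code}

In an actual Flink source, the registered consumer would most likely enqueue messages for the source thread to emit rather than emit them directly from the server's request thread.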


h3. Configuration parameters

- Bind host (default {{0.0.0.0}})
- Bind port (default {{5555}})
- Path (default {{in}}), the first path segment in the URL {{http(s)://<host>:<port>/<path>/<namespace>/<type>/<id>}}
- Paired egress name, setting the egress that replies should go to
- SSL settings for the connection, similar to those for Flink's REST endpoint SSL support. See the Flink docs for details: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/security/security-ssl/#rest-endpoints-external-connectivity
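The parameters above could be collected in a small configuration object like the following sketch. {{HttpIngressConfig}} and its methods are hypothetical, the defaults are the ones listed, and the SSL settings are left out.

{code:java}
import java.util.Optional;

/** Hypothetical ingress configuration mirroring the parameters listed above (no SSL). */
record HttpIngressConfig(
    String bindHost, int bindPort, String path, Optional<String> pairedEgressName) {

  static final String DEFAULT_BIND_HOST = "0.0.0.0";
  static final int DEFAULT_BIND_PORT = 5555;
  static final String DEFAULT_PATH = "in";

  /** Configuration with all defaults and no paired egress. */
  static HttpIngressConfig defaults() {
    return new HttpIngressConfig(
        DEFAULT_BIND_HOST, DEFAULT_BIND_PORT, DEFAULT_PATH, Optional.empty());
  }

  /** Copy of this configuration with replies routed to the given egress. */
  HttpIngressConfig withPairedEgress(String egressName) {
    return new HttpIngressConfig(bindHost, bindPort, path, Optional.of(egressName));
  }

  /** URL a client would POST to, e.g. http://statefun-ingress:5555/in/example/greeter/Igal. */
  String targetUrl(String scheme, String host, String namespace, String type, String id) {
    return scheme + "://" + host + ":" + bindPort + "/" + path + "/"
        + namespace + "/" + type + "/" + id;
  }
}
{code}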



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
