(forwarding this to user@ as it is better suited there)

Hi Sunil,

With remote functions (using the Python SDK), messages sent to and from them
must be Protobuf messages.
This is a requirement because remote functions can be written in any
language, and we use Protobuf as the means for cross-language messaging.
If you define Kafka ingresses in a remote module (via textual YAML
module configs), then records from the Kafka ingress are routed directly
to the remote functions, and therefore they must be Protobuf messages as
well.

With embedded functions (using the current Java SDK), what you are
trying to do is possible.
When using the Java SDK, the Kafka ingress allows you to provide a
`KafkaIngressDeserializer` [1], in which you can convert the bytes in Kafka
into any type you intend to use for messaging within the StateFun
application. That is where you can convert your JSON records.
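
For example, a deserializer along these lines would do. This is just a rough
sketch, not a definitive implementation: I'm assuming the `MyRow` message from
your earlier mail below is compiled to a Java class, that Jackson is on the
classpath, and that only the `id` field needs extracting.

import java.io.IOException;

import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

/** Converts raw JSON Kafka records into MyRow Protobuf messages. */
final class JsonToMyRowDeserializer implements KafkaIngressDeserializer<MyRow> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public MyRow deserialize(ConsumerRecord<byte[], byte[]> input) {
        try {
            JsonNode json = MAPPER.readTree(input.value());
            // extract only the fields you need; unknown JSON fields are ignored
            return MyRow.newBuilder()
                .setId(json.get("id").asText())
                .build();
        } catch (IOException e) {
            throw new IllegalStateException("Malformed JSON record", e);
        }
    }
}

You would then register it on the ingress via `KafkaIngressBuilder#withDeserializer`.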

If you still want to write your main application logic in Python while the
input and output messages in Kafka are required to be JSON,
what you can currently do is use a mix of a remote module [2] containing the
application logic as Python functions,
and a separate embedded module [3] containing the Java Kafka ingress and
egresses.
Concretely, your two modules would contain:

Remote module:
- Your Python functions implementing the main business logic.

Embedded module:
- Java Kafka ingress with a deserializer that converts JSON to Protobuf
messages. Here you have the freedom to extract only the fields that you
need.
- A Java router [4] that routes those converted messages to the remote
functions by their logical addresses (the router, egress serializer, and
forwarding function are sketched below).
- A Java Kafka egress with a serializer that converts Protobuf messages from
the remote functions into JSON Kafka records.
- A Java function that simply forwards input messages to the Kafka
egress. If the remote functions need to write JSON messages to Kafka, they
send a Protobuf message to this function.
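
To make the embedded side more concrete, here is a rough sketch of the
router, the egress serializer, and the forwarding function mentioned above.
The function type, egress identifier, and topic name (`example/tokenizer`,
`example/json-out`, `output-topic`) are placeholders to replace with your own,
and I'm again assuming a compiled `MyRow` class plus `protobuf-java-util` for
the JSON printer.

import java.nio.charset.StandardCharsets;

import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.Router;
import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;

/** Routes each converted message to the remote function instance keyed by its id. */
final class MyRowRouter implements Router<MyRow> {

    // must match the function type declared in the remote module's YAML
    private static final FunctionType REMOTE_FN = new FunctionType("example", "tokenizer");

    @Override
    public void route(MyRow message, Downstream<MyRow> downstream) {
        downstream.forward(REMOTE_FN, message.getId(), message);
    }
}

/** Serializes outgoing Protobuf messages back into JSON Kafka records. */
final class MyRowJsonSerializer implements KafkaEgressSerializer<MyRow> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(MyRow message) {
        try {
            byte[] key = message.getId().getBytes(StandardCharsets.UTF_8);
            byte[] value = JsonFormat.printer().print(message).getBytes(StandardCharsets.UTF_8);
            // "output-topic" is a placeholder topic name
            return new ProducerRecord<>("output-topic", key, value);
        } catch (InvalidProtocolBufferException e) {
            throw new IllegalStateException("Failed to write record as JSON", e);
        }
    }
}

/** Simply forwards whatever the remote functions send to the Kafka egress. */
final class KafkaForwardingFunction implements StatefulFunction {

    static final EgressIdentifier<MyRow> JSON_EGRESS =
        new EgressIdentifier<>("example", "json-out", MyRow.class);

    @Override
    public void invoke(Context context, Object input) {
        context.send(JSON_EGRESS, (MyRow) input);
    }
}

The egress itself would then be defined with `KafkaEgressBuilder`, passing
`MyRowJsonSerializer` via `withSerializer`.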


Hope this helps.
Note that the egress side of things could definitely be easier (without the
extra forwarding through a Java function) if the Python SDK's
`kafka_egress_record` method allowed supplying arbitrary bytes.
Then you would be able to write JSON messages to Kafka directly from the
Python functions.
This isn't supported yet, but technically it is quite easy to
achieve. I've just filed an issue for this [5], in case you'd like to follow
that.

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/io-module/apache-kafka.html#kafka-deserializer
[2] https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/sdk/modules.html#remote-module
[3] https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/sdk/modules.html#embedded-module
[4] https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/io-module/index.html#router
[5] https://issues.apache.org/jira/browse/FLINK-18340

On Wed, Jun 17, 2020 at 9:25 AM Sunil <sunilsattir...@gmail.com> wrote:

> Checking to see if this is currently possible:
> Read json data from kafka topic => process using statefun => write out to
> kafka in json format.
>
> I could have a separate process to read the source json data and convert it
> to protobuf in another kafka topic, but that sounds inefficient.
> e.g.
> Read json data from kafka topic => convert json to protobuf => process
> using statefun => write out to kafka in protobuf format => convert protobuf
> to json message
>
> Appreciate any advice on how to process json messages using statefun.
> Also, if this is not possible in the current python sdk, can I do that using
> the java/scala sdk?
>
> Thanks.
>
> On 2020/06/15 15:34:39, Sunil Sattiraju <sunilsattir...@gmail.com> wrote:
> > Thanks Igal,
> > I don't have control over the data source inside kafka (the current kafka
> topic contains either json or avro formats only; I am trying to reproduce
> this scenario using my test data generator).
> >
> > Is it possible to convert the json to proto at the receiving end of the
> statefun application?
> >
> > On 2020/06/15 14:51:01, Igal Shilman <i...@ververica.com> wrote:
> > > Hi,
> > >
> > > The values must be validly encoded Protobuf messages [1], while in your
> > > attached code snippet you are sending utf-8 encoded JSON strings.
> > > You can take a look at this example with a generator that produces
> > > Protobuf messages [2][3]
> > >
> > > [1] https://developers.google.com/protocol-buffers/docs/pythontutorial
> > > [2]
> > > https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/generator/event-generator.py#L37
> > > [3]
> > > https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/greeter/messages.proto#L25
> > >
> > > On Mon, Jun 15, 2020 at 4:25 PM Sunil Sattiraju <sunilsattir...@gmail.com>
> > > wrote:
> > >
> > > > Hi, based on the example from
> > > > https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-python-greeter-example
> > > > I am trying to ingest json data in kafka, but am unable to achieve this
> > > > based on the examples.
> > > >
> > > > event-generator.py
> > > >
> > > > import json
> > > > import sys
> > > > import time
> > > >
> > > > from kafka import KafkaProducer
> > > >
> > > > def produce():
> > > >     # template of a request; random_requests_dict() (defined elsewhere)
> > > >     # yields dicts of this shape, and KAFKA_BROKER is defined elsewhere
> > > >     request = {}
> > > >     request['id'] = "abc-123"
> > > >     request['field1'] = "field1-1"
> > > >     request['field2'] = "field2-2"
> > > >     request['field3'] = "field3-3"
> > > >     if len(sys.argv) == 2:
> > > >         delay_seconds = int(sys.argv[1])
> > > >     else:
> > > >         delay_seconds = 1
> > > >     producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
> > > >     for request in random_requests_dict():
> > > >         producer.send(topic='test-topic',
> > > >                       value=json.dumps(request).encode('utf-8'))
> > > >         producer.flush()
> > > >         time.sleep(delay_seconds)
> > > >
> > > > Below is the proto definition of the json data (I don't always know
> > > > all the fields, but I know the id field definitely exists).
> > > > message.proto
> > > >
> > > > syntax = "proto3";
> > > >
> > > > import "google/protobuf/struct.proto";
> > > >
> > > > message MyRow {
> > > >     string id = 1;
> > > >     google.protobuf.Struct message = 2;
> > > > }
> > > >
> > > > Below is the greeter that receives the data, tokenizer.py (same as
> > > > greeter.py, but saving the state of id instead of counting).
> > > >
> > > >
> > > > @app.route('/statefun', methods=['POST'])
> > > > def handle():
> > > >     my_row = MyRow()
> > > >     data = my_row.ParseFromString(request.data)  # Is this the right way
> > > >     # to do it?
> > > >     response_data = handler(request.data)
> > > >     response = make_response(response_data)
> > > >     response.headers.set('Content-Type', 'application/octet-stream')
> > > >     return response
> > > >
> > > >
> > > > But below is the error message. I am a newbie with proto and appreciate
> > > > any help.
> > > >
> > > > 11:55:17,996 tokenizer ERROR Exception on /statefun [POST]
> > > > Traceback (most recent call last):
> > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
> > > >     response = self.full_dispatch_request()
> > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
> > > >     rv = self.handle_user_exception(e)
> > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
> > > >     reraise(exc_type, exc_value, tb)
> > > >   File "/usr/local/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
> > > >     raise value
> > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
> > > >     rv = self.dispatch_request()
> > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
> > > >     return self.view_functions[rule.endpoint](**req.view_args)
> > > >   File "/app/tokenizer.py", line 101, in handle
> > > >     response_data = handler(data)
> > > >   File "/usr/local/lib/python3.8/site-packages/statefun/request_reply.py", line 38, in __call__
> > > >     request.ParseFromString(request_bytes)
> > > >   File "/usr/local/lib/python3.8/site-packages/google/protobuf/message.py", line 199, in ParseFromString
> > > >     return self.MergeFromString(serialized)
> > > >   File "/usr/local/lib/python3.8/site-packages/google/protobuf/internal/python_message.py", line 1131, in MergeFromString
> > > >     serialized = memoryview(serialized)
> > > > TypeError: memoryview: a bytes-like object is required, not 'int'
> > > >
> > > >
> > >
> >
>
