Thanks Gordon.
Really appreciate your detailed response and this definitely helps.


On 2020/06/17 04:45:11, "Tzu-Li (Gordon) Tai" <tzuli...@apache.org> wrote: 
> (forwarding this to user@ as it is better suited there)
> 
> Hi Sunil,
> 
> With remote functions (using the Python SDK), messages sent to / from them
> must be Protobuf messages.
> This is a requirement since remote functions can be written in any
> language, and we use Protobuf as a means for cross-language messaging.
> If you are defining Kafka ingresses in a remote module (via textual YAML
> module configs), then records in the Kafka ingress will be directly routed
> to the remote functions, and therefore they are required to be Protobuf
> messages as well.
> 
> With embedded functions (using the current Java SDK), what you are
> trying to do is possible.
> When using the Java SDK, the Kafka ingress allows providing a
> `KafkaIngressDeserializer` [1], where you can convert the bytes in Kafka
> into any type you intend for messaging within the StateFun application. So
> there, you can convert your JSON records.
> 
> If you still want to write your main application logic in Python, but the
> input and output messages in Kafka are required to be JSON,
> what you can currently do is have a mix of remote module [2] containing the
> application logic as Python functions,
> and a separate embedded module [3] containing the Java Kafka ingress and
> egresses.
> So, concretely, your 2 modules will contain:
> 
> Remote module:
> - Your Python functions implementing the main business logic.
> 
> Embedded module:
> - Java Kafka ingress with deserializer that converts JSON to Protobuf
> messages. Here you have the freedom to extract only the fields that you
> need.
> - A Java router [4] that routes those converted messages to the remote
> functions, by their logical address.
> - A Java Kafka egress with serializer that converts Protobuf messages from
> remote functions into JSON Kafka records.
> - A Java function that simply forwards input messages to the Kafka
> egress. If the remote functions need to write JSON messages to Kafka, they
> send a Protobuf message to this function.
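[As a reader's note: the Java deserializer/serializer classes themselves aren't shown in the thread, so here is a rough, language-neutral sketch of the conversion logic that pair would implement, written as plain stdlib Python. All names (`deserialize`, `serialize`, the field layout) are made up for illustration; the real implementation uses the Java SDK interfaces linked below ([1], [4]).]

```python
import json

def deserialize(kafka_value: bytes) -> dict:
    # Ingress side: JSON bytes from Kafka -> message, keeping the known
    # 'id' field and bundling everything else as a free-form payload
    # (mirroring the MyRow { id, Struct } shape discussed further down).
    record = json.loads(kafka_value.decode("utf-8"))
    return {"id": record["id"],
            "payload": {k: v for k, v in record.items() if k != "id"}}

def serialize(message: dict) -> bytes:
    # Egress side: message -> JSON bytes for the output Kafka topic.
    flat = {"id": message["id"], **message["payload"]}
    return json.dumps(flat).encode("utf-8")

msg = deserialize(b'{"id": "abc-123", "field1": "field1-1"}')
out = serialize(msg)
```

[The point being: only the `id` field needs to be known up front; everything else can ride along untyped until a function actually needs it.]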
> 
> 
> Hope this helps.
> Note that the egress side of things can definitely be easier (without the
> extra forwarding through a Java function) if the Python SDK's
> `kafka_egress_record` method allows supplying arbitrary bytes.
> Then you would be able to already write to Kafka JSON messages in the
> Python functions.
> This however isn't supported yet, but technically it is quite easy to
> achieve. I've just filed an issue for this [5], in case you'd like to
> follow that.
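[For anyone following FLINK-18340: a hypothetical sketch of what that change would enable, i.e. handing raw JSON bytes straight to the egress record from a Python function. The `kafka_egress_record` below is a stand-in I wrote for illustration, NOT the current SDK method, which does not accept arbitrary bytes yet.]

```python
import json

def kafka_egress_record(topic: str, key: str, value: bytes) -> dict:
    # Stand-in that just captures what would be written to Kafka.
    return {"topic": topic, "key": key, "value": value}

result = {"id": "abc-123", "field1": "field1-1"}
record = kafka_egress_record(topic="out-topic",
                             key=result["id"],
                             value=json.dumps(result).encode("utf-8"))
```

[With that in place, the Java forwarding function on the egress side would no longer be needed.]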
> 
> Cheers,
> Gordon
> 
> [1]
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/io-module/apache-kafka.html#kafka-deserializer
> [2]
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/sdk/modules.html#remote-module
> [3]
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/sdk/modules.html#embedded-module
> [4]
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/io-module/index.html#router
> [5] https://issues.apache.org/jira/browse/FLINK-18340
> 
> On Wed, Jun 17, 2020 at 9:25 AM Sunil <sunilsattir...@gmail.com> wrote:
> 
> > Checking to see if this is possible currently:
> > Read json data from kafka topic => process using statefun => write out to
> > kafka in json format.
> >
> > I could have a separate process to read the source json data and convert
> > it to protobuf into another kafka topic, but that sounds inefficient.
> > e.g.
> > Read json data from kafka topic => convert json to protobuf => process
> > using statefun => write out to kafka in protobuf format => convert
> > protobuf to json message
> >
> > Appreciate any advice on how to process json messages using statefun.
> > Also, if this is not possible in the current python sdk, can I do that
> > using the java/scala sdk?
> >
> > Thanks.
> >
> > On 2020/06/15 15:34:39, Sunil Sattiraju <sunilsattir...@gmail.com> wrote:
> > > Thanks Igal,
> > > I don't have control over the data source inside kafka ( current kafka
> > topic contains either json or avro formats only, i am trying to reproduce
> > this scenario using my test data generator ).
> > >
> > > Is it possible to convert the json to proto at the receiving end of the
> > statefun application?
> > >
> > > On 2020/06/15 14:51:01, Igal Shilman <i...@ververica.com> wrote:
> > > > Hi,
> > > >
> > > > The values must be valid encoded Protobuf messages [1], while in your
> > > > attached code snippet you are sending utf-8 encoded JSON strings.
> > > > You can take a look at this example with a generator that produces
> > > > Protobuf messages [2][3]
> > > >
> > > > [1] https://developers.google.com/protocol-buffers/docs/pythontutorial
> > > > [2]
> > https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/generator/event-generator.py#L37
> > > > [3]
> > https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/greeter/messages.proto#L25
> > > >
> > > > On Mon, Jun 15, 2020 at 4:25 PM Sunil Sattiraju <
> > sunilsattir...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi, Based on the example from
> > > > >
> > https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-python-greeter-example
> > > > > I am trying to ingest json data in kafka, but I am unable to
> > > > > achieve this based on the examples.
> > > > >
> > > > > event-generator.py
> > > > >
> > > > > def produce():
> > > > >     # example record shape yielded by random_requests_dict():
> > > > >     #   {"id": "abc-123", "field1": "field1-1",
> > > > >     #    "field2": "field2-2", "field3": "field3-3"}
> > > > >     if len(sys.argv) == 2:
> > > > >         delay_seconds = int(sys.argv[1])
> > > > >     else:
> > > > >         delay_seconds = 1
> > > > >     producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
> > > > >     for request in random_requests_dict():
> > > > >         producer.send(topic='test-topic',
> > > > >                       value=json.dumps(request).encode('utf-8'))
> > > > >         producer.flush()
> > > > >         time.sleep(delay_seconds)
> > > > >
> > > > > Below is the proto definition of the json data ( i don't always know
> > > > > all the fields, but i know the id field definitely exists)
> > > > > message.proto
> > > > >
> > > > > message MyRow {
> > > > >     string id = 1;
> > > > >     google.protobuf.Struct message = 2;
> > > > > }
> > > > >
> > > > > Below is the greeter that receives the data:
> > > > > tokenizer.py ( same as greeter.py, saving state of id instead of
> > > > > counting )
> > > > >
> > > > >
> > > > > @app.route('/statefun', methods=['POST'])
> > > > > def handle():
> > > > >     my_row = MyRow()
> > > > >     data = my_row.ParseFromString(request.data)  # Is this the
> > > > >     # right way to do it?
> > > > >     response_data = handler(request.data)
> > > > >     response = make_response(response_data)
> > > > >     response.headers.set('Content-Type', 'application/octet-stream')
> > > > >     return response
> > > > >
> > > > >
> > > > > but, below is the error message. I am a newbie with proto and
> > appreciate
> > > > > any help
> > > > >
> > > > > 11:55:17,996 tokenizer ERROR Exception on /statefun [POST]
> > > > > Traceback (most recent call last):
> > > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
> > > > >     response = self.full_dispatch_request()
> > > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
> > > > >     rv = self.handle_user_exception(e)
> > > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
> > > > >     reraise(exc_type, exc_value, tb)
> > > > >   File "/usr/local/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
> > > > >     raise value
> > > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
> > > > >     rv = self.dispatch_request()
> > > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
> > > > >     return self.view_functions[rule.endpoint](**req.view_args)
> > > > >   File "/app/tokenizer.py", line 101, in handle
> > > > >     response_data = handler(data)
> > > > >   File "/usr/local/lib/python3.8/site-packages/statefun/request_reply.py", line 38, in __call__
> > > > >     request.ParseFromString(request_bytes)
> > > > >   File "/usr/local/lib/python3.8/site-packages/google/protobuf/message.py", line 199, in ParseFromString
> > > > >     return self.MergeFromString(serialized)
> > > > >   File "/usr/local/lib/python3.8/site-packages/google/protobuf/internal/python_message.py", line 1131, in MergeFromString
> > > > >     serialized = memoryview(serialized)
> > > > > TypeError: memoryview: a bytes-like object is required, not 'int'
> > > > >
> > > > >
> > > >
> > >
> >
> 
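[One side note on the traceback at the bottom of the thread, since the symptom is easy to misread: `ParseFromString` parses the message in place and returns an int (the number of bytes consumed), so `data = my_row.ParseFromString(request.data)` binds `data` to that int rather than to the message, and the traceback indeed shows `handler(data)` receiving it. A minimal stdlib reproduction, with a hypothetical stub standing in for protobuf:]

```python
def parse_from_string_stub(data: bytes) -> int:
    # Stand-in mimicking protobuf's ParseFromString contract: the message
    # is parsed in place and the method returns the byte count, not the
    # parsed message.
    return len(data)

data = parse_from_string_stub(b'{"id": "abc-123"}')
try:
    memoryview(data)  # what the SDK effectively does downstream
    got_type_error = False
except TypeError:
    got_type_error = True  # "a bytes-like object is required, not 'int'"
```

[That said, the underlying fix is still the one given above: the Kafka values must be valid Protobuf, not utf-8 JSON.]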
