Thanks Gordon. Really appreciate your detailed response; this definitely helps.
On 2020/06/17 04:45:11, "Tzu-Li (Gordon) Tai" <tzuli...@apache.org> wrote:
> (forwarding this to user@ as it is more suited to be located there)
>
> Hi Sunil,
>
> With remote functions (using the Python SDK), messages sent to / from them
> must be Protobuf messages.
> This is a requirement since remote functions can be written in any
> language, and we use Protobuf as a means for cross-language messaging.
> If you are defining Kafka ingresses in a remote module (via textual YAML
> module configs), then records in the Kafka ingress will be directly routed
> to the remote functions, and therefore they are required to be Protobuf
> messages as well.
>
> With embedded functions (using the current Java SDK), what you are trying
> to do is possible.
> When using the Java SDK, the Kafka ingress allows providing a
> `KafkaIngressDeserializer` [1], where you can convert the bytes in Kafka
> into any type you intend for messaging within the StateFun application.
> There, you can convert your JSON records.
>
> If you still want to write your main application logic in Python, but the
> input and output messages in Kafka are required to be JSON, what you can
> currently do is have a mix of a remote module [2] containing the
> application logic as Python functions, and a separate embedded module [3]
> containing the Java Kafka ingress and egresses.
> So, concretely, your 2 modules will contain:
>
> Remote module:
> - Your Python functions implementing the main business logic.
>
> Embedded module:
> - A Java Kafka ingress with a deserializer that converts JSON to Protobuf
> messages. Here you have the freedom to extract only the fields that you
> need.
> - A Java router [4] that routes those converted messages to the remote
> functions, by their logical address.
> - A Java Kafka egress with a serializer that converts Protobuf messages
> from remote functions into JSON Kafka records.
> - A Java function that simply forwards input messages to the Kafka egress.
> If the remote functions need to write JSON messages to Kafka, they send a
> Protobuf message to this function.
>
> Hope this helps.
> Note that the egress side of things can definitely be easier (without the
> extra forwarding through a Java function) if the Python SDK's
> `kafka_egress_record` method allowed supplying arbitrary bytes.
> Then you would already be able to write JSON messages to Kafka from the
> Python functions.
> This however isn't supported yet, but technically it is quite easy to
> achieve. I've just filed an issue for this [5], in case you'd like to
> follow that.
>
> Cheers,
> Gordon
>
> [1] https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/io-module/apache-kafka.html#kafka-deserializer
> [2] https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/sdk/modules.html#remote-module
> [3] https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/sdk/modules.html#embedded-module
> [4] https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/io-module/index.html#router
> [5] https://issues.apache.org/jira/browse/FLINK-18340
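For reference, the ingress-side conversion described above might look roughly
like the sketch below. It is only a sketch, not the actual implementation: it
assumes the `KafkaIngressDeserializer` interface from the StateFun 2.1 Java
SDK (Gordon's link [1] above), that the MyRow message from the message.proto
quoted further down has been compiled for Java, and the class name is made up.
It parses the whole JSON record into a google.protobuf.Struct, pulls out the
id, and wraps both in a MyRow.

    import com.google.protobuf.InvalidProtocolBufferException;
    import com.google.protobuf.Struct;
    import com.google.protobuf.util.JsonFormat;
    import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    import java.nio.charset.StandardCharsets;

    // Sketch: turn JSON Kafka records into MyRow Protobuf messages for routing to remote functions.
    public final class JsonToMyRowDeserializer implements KafkaIngressDeserializer<MyRow> {

        @Override
        public MyRow deserialize(ConsumerRecord<byte[], byte[]> input) {
            String json = new String(input.value(), StandardCharsets.UTF_8);
            try {
                // A Struct can hold the free-form part of the JSON object,
                // since the full schema isn't known up front.
                Struct.Builder struct = Struct.newBuilder();
                JsonFormat.parser().merge(json, struct);
                String id = struct.getFieldsOrThrow("id").getStringValue();
                return MyRow.newBuilder().setId(id).setMessage(struct.build()).build();
            } catch (InvalidProtocolBufferException e) {
                throw new RuntimeException("Failed to parse JSON record into MyRow", e);
            }
        }
    }

The same JsonFormat approach could be reused in reverse inside the egress
serializer, turning a MyRow back into a JSON string for the output topic.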
> On Wed, Jun 17, 2020 at 9:25 AM Sunil <sunilsattir...@gmail.com> wrote:
> >
> > Checking to see if this is possible currently:
> > read json data from kafka topic => process using statefun => write out
> > to kafka in json format.
> >
> > I could have a separate process to read the source json data and convert
> > it to protobuf into another kafka topic, but that sounds inefficient, e.g.:
> > read json data from kafka topic => convert json to protobuf => process
> > using statefun => write out to kafka in protobuf format => convert
> > protobuf to json message.
> >
> > Appreciate any advice on how to process json messages using statefun.
> > Also, if this is not possible in the current python sdk, can I do that
> > using the java/scala sdk?
> >
> > Thanks.
> >
> > On 2020/06/15 15:34:39, Sunil Sattiraju <sunilsattir...@gmail.com> wrote:
> > > Thanks Igal,
> > > I don't have control over the data source inside kafka (the current
> > > kafka topic contains either json or avro formats only; I am trying to
> > > reproduce this scenario using my test data generator).
> > >
> > > Is it possible to convert the json to proto at the receiving end of
> > > the statefun application?
> > >
> > > On 2020/06/15 14:51:01, Igal Shilman <i...@ververica.com> wrote:
> > > > Hi,
> > > >
> > > > The values must be valid encoded Protobuf messages [1], while in your
> > > > attached code snippet you are sending utf-8 encoded JSON strings.
> > > > You can take a look at this example with a generator that produces
> > > > Protobuf messages [2][3].
> > > >
> > > > [1] https://developers.google.com/protocol-buffers/docs/pythontutorial
> > > > [2] https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/generator/event-generator.py#L37
> > > > [3] https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/greeter/messages.proto#L25
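For the test data generator, the change Igal describes could look something
like the sketch below. It rests on a few assumptions: message.proto (quoted
further down) has been compiled with protoc into a message_pb2 module,
kafka-python is used as in the original generator, the broker address and
topic name are placeholders, and the helper name is made up. The point is
simply that the Kafka value becomes a serialized MyRow instead of a utf-8
JSON string.

    from google.protobuf import json_format
    from kafka import KafkaProducer

    # Assumes message_pb2 was generated from message.proto with protoc.
    from message_pb2 import MyRow

    KAFKA_BROKER = "localhost:9092"  # placeholder; use the broker from the original generator


    def to_protobuf_bytes(record: dict) -> bytes:
        """Wrap a JSON-style dict into a MyRow message and serialize it to bytes."""
        row = MyRow()
        row.id = record["id"]
        # The remaining free-form fields go into the google.protobuf.Struct field.
        json_format.ParseDict({k: v for k, v in record.items() if k != "id"}, row.message)
        return row.SerializeToString()


    def produce():
        producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
        record = {"id": "abc-123", "field1": "field1-1",
                  "field2": "field2-2", "field3": "field3-3"}
        # The value is now a Protobuf-encoded MyRow, not a JSON string.
        producer.send(topic='test-topic', value=to_protobuf_bytes(record))
        producer.flush()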
> > > > On Mon, Jun 15, 2020 at 4:25 PM Sunil Sattiraju <sunilsattir...@gmail.com> wrote:
> > > > >
> > > > > Hi, based on the example from
> > > > > https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-python-greeter-example
> > > > > I am trying to ingest json data from kafka, but I am unable to
> > > > > achieve this based on the examples.
> > > > >
> > > > > event-generator.py
> > > > >
> > > > > def produce():
> > > > >     request = {}
> > > > >     request['id'] = "abc-123"
> > > > >     request['field1'] = "field1-1"
> > > > >     request['field2'] = "field2-2"
> > > > >     request['field3'] = "field3-3"
> > > > >     if len(sys.argv) == 2:
> > > > >         delay_seconds = int(sys.argv[1])
> > > > >     else:
> > > > >         delay_seconds = 1
> > > > >     producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
> > > > >     for request in random_requests_dict():
> > > > >         producer.send(topic='test-topic',
> > > > >                       value=json.dumps(request).encode('utf-8'))
> > > > >         producer.flush()
> > > > >         time.sleep(delay_seconds)
> > > > >
> > > > > Below is the proto definition of the json data (I don't always know
> > > > > all the fields, but I know the id field definitely exists).
> > > > > message.proto
> > > > >
> > > > > message MyRow {
> > > > >     string id = 1;
> > > > >     google.protobuf.Struct message = 2;
> > > > > }
> > > > >
> > > > > Below is the greeter that receives the data.
> > > > > tokenizer.py (same as greeter.py, but saving the state of id instead
> > > > > of counting)
> > > > >
> > > > > @app.route('/statefun', methods=['POST'])
> > > > > def handle():
> > > > >     my_row = MyRow()
> > > > >     data = my_row.ParseFromString(request.data)  # Is this the right way to do it?
> > > > >     response_data = handler(request.data)
> > > > >     response = make_response(response_data)
> > > > >     response.headers.set('Content-Type', 'application/octet-stream')
> > > > >     return response
> > > > >
> > > > > But below is the error message. I am a newbie with proto and
> > > > > appreciate any help.
> > > > >
> > > > > 11:55:17,996 tokenizer ERROR Exception on /statefun [POST]
> > > > > Traceback (most recent call last):
> > > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
> > > > >     response = self.full_dispatch_request()
> > > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
> > > > >     rv = self.handle_user_exception(e)
> > > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
> > > > >     reraise(exc_type, exc_value, tb)
> > > > >   File "/usr/local/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
> > > > >     raise value
> > > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
> > > > >     rv = self.dispatch_request()
> > > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
> > > > >     return self.view_functions[rule.endpoint](**req.view_args)
> > > > >   File "/app/tokenizer.py", line 101, in handle
> > > > >     response_data = handler(data)
> > > > >   File "/usr/local/lib/python3.8/site-packages/statefun/request_reply.py", line 38, in __call__
> > > > >     request.ParseFromString(request_bytes)
> > > > >   File "/usr/local/lib/python3.8/site-packages/google/protobuf/message.py", line 199, in ParseFromString
> > > > >     return self.MergeFromString(serialized)
> > > > >   File "/usr/local/lib/python3.8/site-packages/google/protobuf/internal/python_message.py", line 1131, in MergeFromString
> > > > >     serialized = memoryview(serialized)
> > > > > TypeError: memoryview: a bytes-like object is required, not 'int'
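On the TypeError itself: ParseFromString returns the number of bytes it
consumed (an int), and the traceback shows that integer (`data`) being handed
to the StateFun handler, which expects the raw request bytes. More generally,
the body posted to /statefun is the SDK's own request envelope rather than a
bare MyRow, so the Flask view should pass request.data through untouched and
let the bound function receive the MyRow. A minimal sketch following the
greeter example's pattern is below; it assumes the statefun 2.x Python SDK
(StatefulFunctions / RequestReplyHandler, as already used in the posted
tokenizer.py), and the function typename, the message_pb2 module name, and
the body of tokenize are placeholders.

    from flask import Flask, request, make_response
    from statefun import StatefulFunctions, RequestReplyHandler

    # Assumes message_pb2 was generated from message.proto with protoc.
    from message_pb2 import MyRow

    app = Flask(__name__)
    functions = StatefulFunctions()


    @functions.bind("example/tokenizer")
    def tokenize(context, row: MyRow):
        # The SDK parses the Protobuf payload and hands the function a MyRow,
        # as in the greeter example; no manual ParseFromString is needed here.
        ...  # business logic on row (e.g. track the id in state)


    handler = RequestReplyHandler(functions)


    @app.route('/statefun', methods=['POST'])
    def handle():
        # Pass the raw request bytes straight to the handler; it parses its own envelope.
        response_data = handler(request.data)
        response = make_response(response_data)
        response.headers.set('Content-Type', 'application/octet-stream')
        return response

This still requires the Kafka values themselves to be valid Protobuf (or the
Java ingress/deserializer route from Gordon's reply), as Igal pointed out
above.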