Thanks Igal, I don't have control over the data source inside Kafka (the current Kafka topic contains only JSON or Avro formats; I am trying to reproduce this scenario using my test data generator).
Is it possible to convert the JSON to Protobuf at the receiving end of the StateFun application?

On 2020/06/15 14:51:01, Igal Shilman <i...@ververica.com> wrote:
> Hi,
>
> The values must be valid encoded Protobuf messages [1], while in your
> attached code snippet you are sending utf-8 encoded JSON strings.
> You can take a look at this example with a generator that produces Protobuf
> messages [2][3]
>
> [1] https://developers.google.com/protocol-buffers/docs/pythontutorial
> [2] https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/generator/event-generator.py#L37
> [3] https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/greeter/messages.proto#L25
>
> On Mon, Jun 15, 2020 at 4:25 PM Sunil Sattiraju <sunilsattir...@gmail.com>
> wrote:
>
> > Hi, Based on the example from
> > https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-python-greeter-example
> > I am trying to ingest JSON data in Kafka, but I am unable to get it working
> > based on the examples.
> >
> > event-generator.py
> >
> > def produce():
> >     request = {}
> >     request['id'] = "abc-123"
> >     request['field1'] = "field1-1"
> >     request['field2'] = "field2-2"
> >     request['field3'] = "field3-3"
> >     if len(sys.argv) == 2:
> >         delay_seconds = int(sys.argv[1])
> >     else:
> >         delay_seconds = 1
> >     producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
> >     for request in random_requests_dict():
> >         producer.send(topic='test-topic',
> >                       value=json.dumps(request).encode('utf-8'))
> >         producer.flush()
> >         time.sleep(delay_seconds)
> >
> > Below is the Protobuf definition of the JSON data (I don't always know all
> > the fields, but I know the id field definitely exists):
> > message.proto
> >
> > message MyRow {
> >     string id = 1;
> >     google.protobuf.Struct message = 2;
> > }
> >
> > Below is the greeter that receives the data:
> > tokenizer.py (same as greeter.py, but saving the state of id instead of
> > counting)
> >
> > @app.route('/statefun', methods=['POST'])
> > def handle():
> >     my_row = MyRow()
> >     data = my_row.ParseFromString(request.data)  # Is this the right way to do it?
> >     response_data = handler(request.data)
> >     response = make_response(response_data)
> >     response.headers.set('Content-Type', 'application/octet-stream')
> >     return response
> >
> > But below is the error message.
> > I am a newbie with Protobuf and would appreciate any help.
> >
> > 11:55:17,996 tokenizer ERROR Exception on /statefun [POST]
> > Traceback (most recent call last):
> >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
> >     response = self.full_dispatch_request()
> >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
> >     rv = self.handle_user_exception(e)
> >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
> >     reraise(exc_type, exc_value, tb)
> >   File "/usr/local/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
> >     raise value
> >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
> >     rv = self.dispatch_request()
> >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
> >     return self.view_functions[rule.endpoint](**req.view_args)
> >   File "/app/tokenizer.py", line 101, in handle
> >     response_data = handler(data)
> >   File "/usr/local/lib/python3.8/site-packages/statefun/request_reply.py", line 38, in __call__
> >     request.ParseFromString(request_bytes)
> >   File "/usr/local/lib/python3.8/site-packages/google/protobuf/message.py", line 199, in ParseFromString
> >     return self.MergeFromString(serialized)
> >   File "/usr/local/lib/python3.8/site-packages/google/protobuf/internal/python_message.py", line 1131, in MergeFromString
> >     serialized = memoryview(serialized)
> > TypeError: memoryview: a bytes-like object is required, not 'int'
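Because the values must be Protobuf-encoded, one option (an assumption, not something confirmed in this thread) is a small bridge, for example in the test data generator or in a separate consume-and-republish step, that converts each JSON value into Protobuf bytes before StateFun reads the topic. The minimal sketch below uses the well-known type `google.protobuf.Struct` through `json_format.Parse`, which fits the case where not all fields are known in advance and needs no generated classes. The function name `json_to_kafka_record` is hypothetical, and it assumes every JSON object carries a string `id` field, which is used as the record key.

```python
from typing import Tuple

from google.protobuf import json_format, struct_pb2


def json_to_kafka_record(raw_value: bytes) -> Tuple[bytes, bytes]:
    """Convert a UTF-8 JSON object into a (key, value) pair of bytes.

    The value is the same object re-encoded as a google.protobuf.Struct,
    i.e. a Protobuf binary message instead of a UTF-8 JSON string.
    Hypothetical helper; assumes a string field named "id" is present.
    """
    payload = struct_pb2.Struct()
    # json_format.Parse knows the JSON mapping of well-known types, so a
    # JSON object maps directly onto the entries of Struct.fields.
    json_format.Parse(raw_value.decode("utf-8"), payload)

    # Use the id as the Kafka key; reject records without a string id.
    if "id" not in payload.fields or not payload.fields["id"].string_value:
        raise ValueError("JSON record has no non-empty string 'id' field")
    key = payload.fields["id"].string_value.encode("utf-8")

    # SerializeToString produces the binary Protobuf encoding.
    return key, payload.SerializeToString()
```

In the generator from the question, the returned pair could be sent with `producer.send(topic='test-topic', key=key, value=value)`. If the ingress is declared with the `MyRow` type instead, the parsed Struct would go into the generated message (for example `row.id = ...` and `row.message.CopyFrom(payload)`) before calling `SerializeToString()`; the generated module name depends on how `protoc` was run.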
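On the receiving side, the traceback points at the `handle()` function in the question: `ParseFromString` fills the message in place and returns an `int` (the number of bytes read), so `data` is an integer, and when it is passed to `handler` the SDK's own `ParseFromString` call fails inside `memoryview`. The HTTP body is also the SDK's request envelope (the traceback shows `request_reply.py` parsing it), not a `MyRow`. In the linked greeter example the Flask route passes `request.data` to `handler` unchanged, so the extra parsing step can most likely be dropped. The sketch below shows only the parse-in-place pattern and the conversion back to a Python dict with `json_format.MessageToDict`, using `Struct` as a stand-in for the payload; the helper names are hypothetical.

```python
from google.protobuf import json_format, struct_pb2


def decode_payload(data: bytes) -> struct_pb2.Struct:
    """Parse Protobuf-encoded bytes into a new Struct and return the message."""
    payload = struct_pb2.Struct()
    # ParseFromString mutates `payload` in place. Its return value is the
    # number of bytes read (an int), not a message, so it is not kept.
    payload.ParseFromString(data)
    return payload


def payload_to_dict(payload: struct_pb2.Struct) -> dict:
    """Convert a Struct back into a plain, JSON-compatible Python dict."""
    return json_format.MessageToDict(payload)
```

If the StateFun function is bound with a `MyRow` type hint (the way the greeter example binds its function with `GreetRequest`), the equivalent conversion inside the function would be `json_format.MessageToDict(my_row.message)`.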