Hi Harshit, I should have already replied to you in an earlier thread[1] for the same question. It seems that you have missed that. Please double check if that reply is helpful for you.
Regards, Dian [1] https://lists.apache.org/thread/cm6r569spq67249dxw57q8lxh0mk3f7y On Wed, Apr 27, 2022 at 6:57 PM harshit.varsh...@iktara.ai < harshit.varsh...@iktara.ai> wrote: > Dear Team, > > > > I am new to pyflink and request for your support in issue I am facing with > Pyflink. I am using Pyflink version 1.14.4 & using reference code from > pyflink github. > > > > I am getting following error . > > grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC > that terminated with: > > status = StatusCode.CANCELLED > > details = "Multiplexer hanging up" > > debug_error_string = > "{"created":"@1651051695.104000000","description":"Error received from peer > ipv6:[::1]:64839","file":"src/core/lib/surface/call.cc","file_line":904,"grpc_message":"Multiplexer > hanging up","grpc_status":1}" > > py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute. > > Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed > by NoRestartBackoffTimeStrategy > > Caused by: java.util.concurrent.ExecutionException: > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > > Caused by: > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > > Caused by: java.lang.ClassCastException: [B cannot be cast to > org.apache.flink.types.Row > > > > Below is my code for reference.. > > > > import json > > import logging > > import os > > import sys > > > > from pyflink.common import Types, JsonRowSerializationSchema, Row, > CsvRowSerializationSchema > > from pyflink.datastream import StreamExecutionEnvironment > > from pyflink.datastream.connectors import FlinkKafkaProducer > > import math > > > > > > def show(ds, env): > > ds.print() > > env.execute() > > > > > > def basic_operations(): > > env = StreamExecutionEnvironment.get_execution_environment() > > env.set_parallelism(1) > > kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)), > > 'flink-sql-connector-kafka_2.11-1.14.4.jar') > > env.add_jars("file:///{}".format(kafka_jar)) > > > > ds = env.from_collection( > > collection=[ > > ('user1', 1, 2000), ('user2', 2, 4000), ('user3', 3, 1000), > ('user1', 4, 25000), ('user2', 5, 7000), > > ('user3', 8, 7000), > > ('user1', 12, 2000), ('user2', 13, 4000), ('user3', 15, 1000), > ('user1', 17, 20000), ('user2', 18, 40000), > > ('user3', 20, 10000), ('user1', 21, 2000), ('user2', 22, > 4000), ('user3', 33, 1000), ('user1', 34, 25000), > > ('user2', 35, 7000), ('user3', 38, 7000) > > ], > > type_info=Types.ROW_NAMED(["id", "info", "value"], > [Types.STRING(), Types.INT(), Types.INT()]) > > ) > > ds1 = ds.map(lambda x: x) > > ds1.print() > > > > def update_tel(data): > > # parse the json > > test_data = data.info > > test_data += data.value > > res = Row('x', 'y') > > #return Types.ROW(data.id, test_data) > > return res(data.id, test_data) > > > > # show(ds.map(update_tel).key_by(lambda data: data[0]), env) > > ds = ds.map(update_tel) > > ds.print() > > # ds = ds.map(lambda x: type(x)) > > # ds.print() > > # ds = ds.map(lambda x: Row([x]), > output_type=Types.ROW([Types.STRING(), Types.INT()])) > > # ds.print() > > > > type_info = Types.ROW_NAMED(['x', 'y'], [Types.STRING(), Types.INT()]) > > serialization_schema = > CsvRowSerializationSchema.Builder(type_info).build() > > kafka_producer = FlinkKafkaProducer( > > topic='testing', > > serialization_schema=serialization_schema, > > producer_config={'bootstrap.servers': 'localhost:9093', 'group.id': > 'test_group'} > > ) > > > > ds.add_sink(kafka_producer) > > > > env.execute('basic_operations') > > > > if __name__ == '__main__': > > logging.basicConfig(stream=sys.stdout, level=logging.INFO, > format="%(message)s") > > > > basic_operations() > > > > >