Hi Harshit,

I believe I already replied to this same question in an earlier thread [1]; it seems you may have missed it. Please double check whether that reply resolves your issue.
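
For quick reference, this kind of ClassCastException ("[B cannot be cast to org.apache.flink.types.Row") typically indicates that the result of a Python map with no declared output_type is shipped downstream as pickled byte[], so the CsvRowSerializationSchema of the Kafka sink cannot cast it to a Row. A minimal sketch of the change, reusing the field names and types from your own type_info (untested, just to illustrate the idea):

    # declare the result type so the map emits a proper Row instead of pickled bytes
    output_type = Types.ROW_NAMED(['x', 'y'], [Types.STRING(), Types.INT()])

    def update_tel(data):
        # same computation as in your code: y = info + value
        return Row(data.id, data.info + data.value)

    ds = ds.map(update_tel, output_type=output_type)

With the map result declared as the same ROW_NAMED type that the CsvRowSerializationSchema is built on, the sink receives a Row it can serialize. Please still have a look at the earlier thread as well.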

Regards,
Dian

[1] https://lists.apache.org/thread/cm6r569spq67249dxw57q8lxh0mk3f7y


On Wed, Apr 27, 2022 at 6:57 PM harshit.varsh...@iktara.ai <
harshit.varsh...@iktara.ai> wrote:

> Dear Team,
>
>
>
> I am new to PyFlink and would appreciate your support with an issue I am facing. I am using PyFlink 1.14.4 and the reference code from the PyFlink GitHub repository.
>
>
>
> I am getting the following error:
>
> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
>     status = StatusCode.CANCELLED
>     details = "Multiplexer hanging up"
>     debug_error_string = "{"created":"@1651051695.104000000","description":"Error received from peer ipv6:[::1]:64839","file":"src/core/lib/surface/call.cc","file_line":904,"grpc_message":"Multiplexer hanging up","grpc_status":1}"
>
> py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> Caused by: java.lang.ClassCastException: [B cannot be cast to org.apache.flink.types.Row
>
>
>
> Below is my code for reference:
>
>
>
> import json
> import logging
> import os
> import sys
>
> from pyflink.common import Types, JsonRowSerializationSchema, Row, CsvRowSerializationSchema
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors import FlinkKafkaProducer
> import math
>
>
> def show(ds, env):
>     ds.print()
>     env.execute()
>
>
> def basic_operations():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>     kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
>                              'flink-sql-connector-kafka_2.11-1.14.4.jar')
>     env.add_jars("file:///{}".format(kafka_jar))
>
>     ds = env.from_collection(
>         collection=[
>             ('user1', 1, 2000), ('user2', 2, 4000), ('user3', 3, 1000), ('user1', 4, 25000), ('user2', 5, 7000),
>             ('user3', 8, 7000),
>             ('user1', 12, 2000), ('user2', 13, 4000), ('user3', 15, 1000), ('user1', 17, 20000), ('user2', 18, 40000),
>             ('user3', 20, 10000), ('user1', 21, 2000), ('user2', 22, 4000), ('user3', 33, 1000), ('user1', 34, 25000),
>             ('user2', 35, 7000), ('user3', 38, 7000)
>         ],
>         type_info=Types.ROW_NAMED(["id", "info", "value"], [Types.STRING(), Types.INT(), Types.INT()])
>     )
>     ds1 = ds.map(lambda x: x)
>     ds1.print()
>
>     def update_tel(data):
>         # parse the json
>         test_data = data.info
>         test_data += data.value
>         res = Row('x', 'y')
>         # return Types.ROW(data.id, test_data)
>         return res(data.id, test_data)
>
>     # show(ds.map(update_tel).key_by(lambda data: data[0]), env)
>     ds = ds.map(update_tel)
>     ds.print()
>     # ds = ds.map(lambda x: type(x))
>     # ds.print()
>     # ds = ds.map(lambda x: Row([x]), output_type=Types.ROW([Types.STRING(), Types.INT()]))
>     # ds.print()
>
>     type_info = Types.ROW_NAMED(['x', 'y'], [Types.STRING(), Types.INT()])
>     serialization_schema = CsvRowSerializationSchema.Builder(type_info).build()
>     kafka_producer = FlinkKafkaProducer(
>         topic='testing',
>         serialization_schema=serialization_schema,
>         producer_config={'bootstrap.servers': 'localhost:9093', 'group.id': 'test_group'}
>     )
>
>     ds.add_sink(kafka_producer)
>
>     env.execute('basic_operations')
>
>
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>
>     basic_operations()
