Re: KAFKA SINK ERROR IN Flink-(version 1.14.4)

2022-04-23 Thread Dian Fu
Hi Harshit,

Could you try updating the following line `ds = ds.map(lambda x:
','.join([str(value) for value in x]))` as follows:
`ds = ds.map(lambda x: ','.join([str(value) for value in x]),
output_type=Types.STRING())`

The reason is that if the output type is not specified, the output will be
serialized using Pickle and so it will arrive as a byte array. This works
well for intermediate transformations, as the output of the upstream
operation will also be deserialized using Pickle. However, when the output
needs to be written to a sink, the output type of the upstream operation
must be specified. See [1] for more details.
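
To illustrate the byte-array point with plain Python (a standalone sketch,
not PyFlink itself): Pickle output is always `bytes`, never `str`, which is
why the Java side sees `[B` (a byte array) instead of a `java.lang.String`.

```python
import pickle

record = "user1,1,2000"

# Pickle always produces bytes, never str. When PyFlink falls back to
# Pickle serialization for an operator's output, the Java side therefore
# receives a byte array ([B), which SimpleStringSchema cannot cast to
# java.lang.String -- hence the ClassCastException in the sink.
pickled = pickle.dumps(record)
print(type(pickled).__name__)  # bytes
```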

Regards,
Dian

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/data_types/#passing-python-records-to-java-operations

On Sat, Apr 23, 2022 at 1:46 PM harshit.varsh...@iktara.ai <
harshit.varsh...@iktara.ai> wrote:

> Dear Team,
>
>
>
> I am new to PyFlink and request your support with an issue I am facing.
> I am using PyFlink version 1.14.4 and reference code from the PyFlink
> getting started pages.
>
> I am getting the following error:
>
> py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
>
> : org.apache.flink.runtime.client.JobExecutionException: Job execution
> failed.
>
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
> by NoRestartBackoffTimeStrategy
>
> Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException:
> Caught exception while processing timer.
>
> Caused by:
> TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator}
>
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
>
> Caused by: java.lang.ClassCastException: [B cannot be cast to
> java.lang.String
>
>
>
>
>
> Below is my code for reference:
>
> import os
>
> from pyflink.common import SimpleStringSchema, Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
>
>
> def main():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     # the sql connector for kafka is used here as it's a fat jar and
>     # could avoid dependency issues
>     env.set_parallelism(1)
>     kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
>                              'flink-sql-connector-kafka_2.11-1.14.4.jar')
>     env.add_jars("file:///{}".format(kafka_jar))
>     deserialization_schema = SimpleStringSchema()
>
>     # Test for kafka consumer
>     kafka_consumer = FlinkKafkaConsumer(
>         topics='test',
>         deserialization_schema=deserialization_schema,
>         properties={'bootstrap.servers': 'localhost:9093'})
>
>     ds = env.add_source(kafka_consumer)
>     # DATA USED IN KAFKA IS LIKE ('user1', 1, 2000)
>     ds = ds.map(lambda x: eval(x))
>     ds = ds.map(lambda x: ','.join([str(value) for value in x]))
>
>     # ds.print()
>
>     kafka_producer = FlinkKafkaProducer(
>         topic='testresult',
>         serialization_schema=SimpleStringSchema(),
>         producer_config={'bootstrap.servers': 'localhost:9093',
>                          'group.id': 'fraud_test'})
>
>     ds.add_sink(kafka_producer)
>     env.execute('main')
>
>
> if __name__ == '__main__':
>     main()
>
>
>
> Thanks and Regards,
>
> Harshit


KAFKA SINK ERROR IN Flink-(version 1.14.4)

2022-04-22 Thread harshit.varsh...@iktara.ai
Dear Team,

 

I am new to PyFlink and request your support with an issue I am facing.
I am using PyFlink version 1.14.4 and reference code from the PyFlink
getting started pages.

I am getting the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.

: org.apache.flink.runtime.client.JobExecutionException: Job execution
failed.

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy

Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException:
Caught exception while processing timer.

Caused by:
TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator}

Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator

Caused by: java.lang.ClassCastException: [B cannot be cast to
java.lang.String

 

 

Below is my code for reference:

import os

from pyflink.common import SimpleStringSchema, Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer


def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    # the sql connector for kafka is used here as it's a fat jar and
    # could avoid dependency issues
    env.set_parallelism(1)
    kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                             'flink-sql-connector-kafka_2.11-1.14.4.jar')
    env.add_jars("file:///{}".format(kafka_jar))
    deserialization_schema = SimpleStringSchema()

    # Test for kafka consumer
    kafka_consumer = FlinkKafkaConsumer(
        topics='test',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': 'localhost:9093'})

    ds = env.add_source(kafka_consumer)
    # DATA USED IN KAFKA IS LIKE ('user1', 1, 2000)
    ds = ds.map(lambda x: eval(x))
    ds = ds.map(lambda x: ','.join([str(value) for value in x]))

    # ds.print()

    kafka_producer = FlinkKafkaProducer(
        topic='testresult',
        serialization_schema=SimpleStringSchema(),
        producer_config={'bootstrap.servers': 'localhost:9093',
                         'group.id': 'fraud_test'})

    ds.add_sink(kafka_producer)
    env.execute('main')


if __name__ == '__main__':
    main()

 

Thanks and Regards,

Harshit
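
For completeness, the two map steps from the job above can be exercised in
plain Python to show what the chain produces for a sample record (a sketch;
the sample record and `ast.literal_eval` in place of `eval` are this
editor's assumptions — in the actual job, the second map additionally needs
`output_type=Types.STRING()`, as Dian's reply explains).

```python
import ast

# Sample record as the job would read it from Kafka (a string).
raw = "('user1', 1, 2000)"

# First map: parse the literal into a tuple. ast.literal_eval is a
# safer stand-in for the eval() used in the job.
parsed = ast.literal_eval(raw)

# Second map: join the fields into a comma-separated string. In the
# PyFlink job this map must also declare output_type=Types.STRING()
# so the sink receives a java.lang.String, not a pickled byte array.
joined = ','.join(str(value) for value in parsed)
print(joined)  # user1,1,2000
```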