Hi Joe,

I've pinged Chesnay for you; please wait for his reply.

Thanks, vino.

On Wed, Aug 15, 2018 at 7:16 AM, Joe Malt <jm...@yelp.com> wrote:

> Hi,
>
> I'm trying to write to a Kafka stream in a Flink job using the new Python
> streaming API.
>
> My program looks like this:
>
> def main(factory):
>
>     props = Properties()
>     props.setProperty("bootstrap.servers", configs['kafkaBroker'])
>
>     consumer = FlinkKafkaConsumer010([configs['kafkaReadTopic']],
>                                      SimpleStringSchema(), props)
>     producer = FlinkKafkaProducer010(configs['kafkaWriteTopic'],
>                                      SimpleStringSchema(), props)
>
>     env = factory.get_execution_environment()
>
>     stream = env.add_java_source(consumer)
>
>     stream.output() # this works (prints to a .out file)
>     stream.add_sink(producer) # producing to this causes the exception
>
>     env.execute()
>
> I'm getting a ClassCastException when trying to output to the
> FlinkKafkaProducer:
>
> java.lang.ClassCastException: org.python.core.PyUnicode cannot be cast to java.lang.String
>       at org.apache.flink.api.common.serialization.SimpleStringSchema.serialize(SimpleStringSchema.java:36)
>       at org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper.serializeValue(KeyedSerializationSchemaWrapper.java:46)
>       at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:355)
>       at org.apache.flink.streaming.python.api.functions.PythonSinkFunction.invoke(PythonSinkFunction.java:48)
>       at org.apache.flink.streaming.python.api.functions.PythonSinkFunction.invoke(PythonSinkFunction.java:37)
>       at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>
>
> It seems that the Python string isn't getting converted to a
> java.lang.String, which should happen automatically in Jython.
>
> I've tried adding a MapFunction that maps each input to String(input), where
> String is the constructor for java.lang.String. This made no difference;
> I get the same error.
>
> Any ideas?
>
> Thanks,
>
> Joe Malt
>
> Software Engineering Intern
> Yelp
>
