Unable to serialize org.apache.kafka.common.config.types.Password
Hi team,

I am passing security-enabled Kafka consumer properties to FlinkKafkaConsumer but keep getting java.io.NotSerializableException. What is the best way to handle this?

I use Flink 1.7.1, and here is the consumer property that produces the exception:

    props.put(SaslConfigs.SASL_JAAS_CONFIG, new Password("LoginModule required subject=\"test\" secret=\"test\";"));

Stack trace:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaConsumerBase is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1471)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1415)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1397)
    at org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob.main(KafkaEventsGeneratorJob.java:69)
Caused by: java.io.NotSerializableException: org.apache.kafka.common.config.types.Password
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at java.util.Hashtable.writeObject(Hashtable.java:1157)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
    ... 5 more
Re: Unable to serialize org.apache.kafka.common.config.types.Password
The exception is clear: the SourceFunction must be serializable, and Password is not serializable. You can set the Kafka consumer property as a plain String instead:

    props.put(SaslConfigs.SASL_JAAS_CONFIG, "LoginModule required subject=\"test\" secret=\"test\";");

The String value will be parsed into a Password object (see the method org.apache.kafka.common.config.ConfigDef.parseType).

Regards,
Dian

> On 25 Dec 2018, at 11:04 PM, tao xiao wrote:
>
> Hi team,
>
> I am passing a security enabled kafka consumer properties to FlinkKafkaConsumer but keep getting this error java.io.NotSerializableException? what is the best way to handle this?
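
For anyone who wants to see the difference without Kafka or Flink on the classpath, here is a minimal sketch using only the JDK. It assumes the failure comes from plain Java serialization of the Properties object, which is what the Hashtable.writeObject frame in the stack trace suggests. The Secret class is a hypothetical stand-in for org.apache.kafka.common.config.types.Password (a wrapper that does not implement Serializable), the class and method names are made up for illustration, and "sasl.jaas.config" is the string behind SaslConfigs.SASL_JAAS_CONFIG.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Properties;

public class PropsSerializationCheck {

    // Hypothetical stand-in for Kafka's Password type: a plain wrapper
    // that deliberately does NOT implement java.io.Serializable.
    static final class Secret {
        private final String value;

        Secret(String value) {
            this.value = value;
        }

        String value() {
            return value;
        }
    }

    // Returns true if the properties survive Java serialization,
    // false if a key or value is not serializable.
    static boolean isSerializable(Properties props) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(props);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            // Not expected with an in-memory stream; surface it unchanged.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String jaas = "LoginModule required subject=\"test\" secret=\"test\";";

        // Mirrors the failing setup: the value is a non-serializable wrapper.
        Properties wrapped = new Properties();
        wrapped.put("sasl.jaas.config", new Secret(jaas));

        // Mirrors the suggested fix: the value is a plain String.
        Properties plain = new Properties();
        plain.put("sasl.jaas.config", jaas);

        System.out.println("wrapped value serializable: " + isSerializable(wrapped)); // false
        System.out.println("plain String serializable: " + isSerializable(plain));    // true
    }
}
```

The same kind of check can be run against the real consumer Properties before handing them to FlinkKafkaConsumer, to catch any other non-serializable values early.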
Re: Unable to serialize org.apache.kafka.common.config.types.Password
Thanks, it works.

On Wed, 26 Dec 2018 at 10:07 fudian.fd wrote:

> The exception is very clear that the SourceFunction should be serializable. Password is not serializable. You can try to set the kafka consumer properties such as this:
>
> props.put(SaslConfigs.SASL_JAAS_CONFIG, "LoginModule required subject=\"test\" secret=\"test\";");