I tried ExecutionConfig#disableGenericTypes and got this error when I executed 
my Flink job with StreamExecutionEnvironment#execute (the full stack trace is 
attached):

java.lang.UnsupportedOperationException: Generic types have been disabled in 
the ExecutionConfig and type java.util.List is treated as a generic type.

I suppose there is nothing I can do at this time other than wait for the 
ticket I filed [1] to be resolved. I assume that if generic types are disabled, 
an UnsupportedOperationException will be thrown for every generic type, 
regardless of whether the type is registered with Kryo.

[1] https://issues.apache.org/jira/browse/FLINK-25993

Best regards,
Shane

________________________________
From: Chesnay Schepler <ches...@apache.org>
Sent: February 7, 2022 3:08 AM
To: Shane Bishop <shane.bis...@outlook.com>; user@flink.apache.org 
<user@flink.apache.org>
Subject: Re: Questions about Kryo setRegistrationRequired(false)

There isn't any setting to control setRegistrationRequired().

You can however turn Kryo off via ExecutionConfig#disableGenericTypes, although 
this may require changes to your data types.

I'd recommend filing a ticket.

On 04/02/2022 20:12, Shane Bishop wrote:
Hi all,

TL;DR: I am concerned that kryo.setRegistrationRequired(false) in Apache Flink 
might introduce serialization/deserialization vulnerabilities, and I want to 
better understand the security implications of its use in Flink.

There is an issue on the Kryo GitHub repo regarding type registration:
https://github.com/EsotericSoftware/kryo/issues/398

The "fix" the Kryo developers made was to make setRegistrationRequired(true) 
the default. See the comment on the GitHub issue:
https://github.com/EsotericSoftware/kryo/issues/398#issuecomment-371153541
the commit with this fix:
https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00
and the line in the commit that is the fix:
https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00#diff-6d4638ca49aa0d0d9171ff04a0faa22e241f8320fda4a8a12c95853188d055a0R130

This is not a true fix, as the default can still be overridden. This only sets 
a safe default.
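
To make the distinction concrete, here is a minimal toy model of a "registration required" switch. It is not Kryo's implementation: the class ToyRegistry and its methods are hypothetical names chosen for illustration, and the sketch only assumes that a serializer resolves class names found in its input.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Toy model of a "registration required" switch.
 * NOT Kryo's code; ToyRegistry and its methods are hypothetical.
 */
final class ToyRegistry {
    private final Set<Class<?>> registered = new HashSet<>();

    // Safe default: class names that were never registered are rejected.
    private boolean registrationRequired = true;

    void register(Class<?> type) {
        registered.add(type);
    }

    // The default can be overridden, which is why a safe default alone
    // does not rule out the unsafe configuration.
    void setRegistrationRequired(boolean required) {
        this.registrationRequired = required;
    }

    /** Resolves a class name read from (possibly untrusted) input. */
    Class<?> resolve(String className) {
        for (Class<?> type : registered) {
            if (type.getName().equals(className)) {
                return type;
            }
        }
        if (registrationRequired) {
            throw new IllegalArgumentException(
                    "Class is not registered: " + className);
        }
        try {
            // With registration not required, any class on the classpath
            // that the input names gets loaded.
            return Class.forName(className);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Unknown class: " + className, e);
        }
    }
}
```

With the default left at true, resolve("java.util.ArrayList") throws unless ArrayList was registered first. After setRegistrationRequired(false), the same call loads whatever class the input names, which is the behaviour the question above is concerned about.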

In Flink, the default of true is overridden in the 1.14.3 Flink release. See 
KryoSerializer.java:
https://github.com/apache/flink/blob/release-1.14.3/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L492
and FlinkScalaKryoInstantiator.scala:
https://github.com/apache/flink/blob/release-1.14.3/flink-scala/src/main/scala/org/apache/flink/runtime/types/FlinkScalaKryoInstantiator.scala#L46

I am no Flink contributor, so I might be missing safety mechanisms that are in 
place to prevent the Kryo serialization/deserialization vulnerability even when 
registration required is set to false. Are there any such safety mechanisms in 
place?

Is there anything I can do as a user of Flink to protect myself against this 
Kryo vulnerability?

Best regards,
Shane Bishop

org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
    at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$4(JarRunOverrideHandler.java:247)
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
    ... 6 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:238)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    ... 6 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
    ... 9 more
Caused by: java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:346)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:338)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:346)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:338)
    at org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:947)
    at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:379)
    at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:351)
    at org.apache.flink.streaming.runtime.translators.AbstractOneInputTransformationTranslator.translateInternal(AbstractOneInputTransformationTranslator.java:63)
    at org.apache.flink.streaming.runtime.translators.OneInputTransformationTranslator.translateForStreamingInternal(OneInputTransformationTranslator.java:65)
    at org.apache.flink.streaming.runtime.translators.OneInputTransformationTranslator.translateForStreamingInternal(OneInputTransformationTranslator.java:37)
    at org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:721)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:453)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:303)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2010)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1995)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
    at com.trendmicro.c1ws.tdax.App.main(App.java:69)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 12 more
