[ https://issues.apache.org/jira/browse/FLINK-39150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Przybylski updated FLINK-39150:
-------------------------------------
    Description: 
Flink fails to start a job when the Table API's join is used on a {{Table}} that references a custom user type, such as a POJO or a custom {{TypeSerializer}} attached to a native Java type's {{TypeInformation}}.

Example:
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Table table1 = tEnv.fromDataStream(
        env.fromData(Row.of(1))
                .returns(ROW_NAMED(new String[]{"id"}, INT))
);

// Note: for this to fail, TestClass must *not* be in the org.apache.flink namespace.
Table table2 = tEnv.fromDataStream(
        env.fromData(Row.of(1, new TestClass()))
                .returns(ROW_NAMED(
                        new String[]{"id2", "value"},
                        INT,
                        new PojoTypeInfo<>(TestClass.class, new ArrayList<>())))
);

tEnv.toDataStream(table1.leftOuterJoin(table2, $("id").isEqual($("id2"))))
        .sinkTo(new DiscardingSink<>());{code}
Error from logs:
{code:java}
ERROR org.apache.flink.runtime.scheduler.adaptivebatch.DefaultAdaptiveExecutionHandler [] - Failed to handle job event ExecutionJobVertexFinishedEvent{vertexId=cbc357ccb763df2852fee8c4fc7d55f2, resultInfos={52fee8c522470986cbc357ccdd2d92bc=org.apache.flink.runtime.scheduler.adaptivebatch.AllToAllBlockingResultInfo@30d3445e}}.
java.lang.RuntimeException: Failed to deserialize AdaptiveJoin instance. Please check whether the flink-table-planner-loader.jar is in the classpath.
    at org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinOperatorFactory.lazyInitialize(AdaptiveJoinOperatorFactory.java:123) ~[classes/:?]
    at org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinOperatorFactory.checkAndLazyInitialize(AdaptiveJoinOperatorFactory.java:90) ~[classes/:?]
    at org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinOperatorFactory.getJoinType(AdaptiveJoinOperatorFactory.java:72) ~[classes/:?]
    at org.apache.flink.table.runtime.strategy.AdaptiveBroadcastJoinOptimizationStrategy.tryOptimizeAdaptiveJoin(AdaptiveBroadcastJoinOptimizationStrategy.java:105) ~[classes/:?]
    at org.apache.flink.table.runtime.strategy.BaseAdaptiveJoinOperatorOptimizationStrategy.visitDownstreamAdaptiveJoinNode(BaseAdaptiveJoinOperatorOptimizationStrategy.java:90) ~[classes/:?]
    at org.apache.flink.table.runtime.strategy.AdaptiveBroadcastJoinOptimizationStrategy.onOperatorsFinished(AdaptiveBroadcastJoinOptimizationStrategy.java:74) ~[classes/:?]
    at org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphOptimizer.onOperatorsFinished(StreamGraphOptimizer.java:72) ~[classes/:?]
    at org.apache.flink.runtime.scheduler.adaptivebatch.DefaultAdaptiveExecutionHandler.tryOptimizeStreamGraph(DefaultAdaptiveExecutionHandler.java:118) ~[classes/:?]{code}
This worked in Flink 1.20 but started failing in 2.0, when type information began to be serialized as part of {{AdaptiveJoinOperatorFactory}}. That serialized state can be deserialized only with classes provided by the {{SubmoduleClassLoader}} (FLINK-36634), which does not see user classes outside the org.apache.flink namespace.
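For context, the failure mode can be reproduced outside Flink with plain Java serialization. The sketch below is purely illustrative (all class names in it are hypothetical stand-ins, not Flink internals): an object graph referencing a user class serializes fine, but deserialization fails as soon as the class-resolving side cannot see that class, which is the situation a submodule-scoped classloader creates for user types.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical demo of classloader-scoped deserialization, not Flink code.
public class RestrictedDeserDemo {

    // Stand-in for a user POJO living outside the org.apache.flink namespace.
    static class UserPojo implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    // ObjectInputStream that resolves classes the way a restricted,
    // submodule-style loader would: only JDK classes are visible.
    static class RestrictedObjectInputStream extends ObjectInputStream {
        RestrictedObjectInputStream(InputStream in) throws IOException {
            super(in);
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            if (!desc.getName().startsWith("java.")) {
                // Mirrors the user class being absent from the restricted
                // loader's classpath.
                throw new ClassNotFoundException(desc.getName());
            }
            return super.resolveClass(desc);
        }
    }

    /** Serializes a user object, then deserializes it through the restricted stream. */
    static String roundTrip() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new UserPojo()); // writing succeeds: the writer sees the class
        }
        try (ObjectInputStream in = new RestrictedObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            in.readObject();
            return "deserialized";
        } catch (ClassNotFoundException e) {
            // Same failure mode that surfaces above as
            // "Failed to deserialize AdaptiveJoin instance".
            return "ClassNotFoundException";
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip());
    }
}
```

The asymmetry is the point: the object is written with a classloader that knows the user type, but read back with one that does not, so the error appears only at deserialization time.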


> Join operator crashes jobs when using custom types or custom type serializers
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-39150
>                 URL: https://issues.apache.org/jira/browse/FLINK-39150
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 2.0.0, 2.2.0
>            Reporter: Piotr Przybylski
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
