[ https://issues.apache.org/jira/browse/FLINK-35542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jan Gurda updated FLINK-35542:
------------------------------
    Affects Version/s: jdbc-3.2.0

> ClassNotFoundException when deserializing CheckpointedOffset
> ------------------------------------------------------------
>
>                 Key: FLINK-35542
>                 URL: https://issues.apache.org/jira/browse/FLINK-35542
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: jdbc-3.2.0, jdbc-3.1.2
>         Environment: Flink 1.19.0
> Flink JDBC Connector 3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca)
> JDK 11 (Temurin)
>            Reporter: Jan Gurda
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: jdbc-3.3.0
>
>
> I use the latest flink-connector-jdbc code from the main branch, which is
> 3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca).
>  
> When jobs are interrupted while reading data from the JDBC source (for
> example, by a TaskManager outage), they cannot recover and fail with the
> following exception:
> {code:java}
> java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.source.split.CheckpointedOffset
>     at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:71)
>     at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:34)
>     at org.apache.flink.connector.base.source.hybrid.HybridSourceSplit.unwrapSplit(HybridSourceSplit.java:122)
>     at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.addSplits(HybridSourceReader.java:158)
>     at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.setCurrentReader(HybridSourceReader.java:247)
>     at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.handleSourceEvents(HybridSourceReader.java:186)
>     at org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:571)
>     at org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:72)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:80)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$22(StreamTask.java:1540)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.source.split.CheckpointedOffset
>     at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
>     at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
>     at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>     at java.base/java.lang.Class.forName0(Native Method)
>     at java.base/java.lang.Class.forName(Unknown Source)
>     at java.base/java.io.ObjectInputStream.resolveClass(Unknown Source)
>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:92)
>     at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
>     at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserializeJdbcSourceSplit(JdbcSourceSplitSerializer.java:109)
>     at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:69)
>     ... 22 more {code}
>  
> In our deployment, we embed the JDBC connector classes in the job JAR file.
> This means that the class
> org.apache.flink.connector.jdbc.source.split.CheckpointedOffset is visible
> only to the _FlinkUserCodeClassLoader_, not to the _AppClassLoader_. I
> believe the problem is in the following code snippet, which uses the class
> loader of the JDK's _DataInputStream_ class:
> {code:java}
> public JdbcSourceSplit deserializeJdbcSourceSplit(DataInputStream in)
>         throws IOException, ClassNotFoundException {
>     // ....
>     // Some lines skipped 
>     CheckpointedOffset chkOffset =
>             InstantiationUtil.deserializeObject(chkOffsetBytes, in.getClass().getClassLoader());
>     return new JdbcSourceSplit(id, sqlTemplate, params, offset, chkOffset);
> } {code}
> If I change it to the following:
> {code:java}
> public JdbcSourceSplit deserializeJdbcSourceSplit(DataInputStream in)
>         throws IOException, ClassNotFoundException {
>     // .... 
>     // Some lines skipped
>     CheckpointedOffset chkOffset =
>             InstantiationUtil.deserializeObject(chkOffsetBytes, CheckpointedOffset.class.getClassLoader());
>     return new JdbcSourceSplit(id, sqlTemplate, params, offset, chkOffset);
> } {code}
> Everything works as expected.
>  
>  
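The following is a minimal, self-contained sketch of the class-loader behavior described above. It rests on two assumptions. First, `Offset` is a hypothetical stand-in for `CheckpointedOffset`. Second, `LoaderAwareObjectInputStream` is a hypothetical approximation of Flink's `InstantiationUtil$ClassLoaderObjectInputStream`: it resolves classes through the given loader when that loader is non-null and otherwise defers to `ObjectInputStream.resolveClass`, which is the fallback path the `Caused by` frames suggest. The sketch shows that a plain `java.io.DataInputStream` reports a `null` class loader (the bootstrap loader). By contrast, `Offset.class.getClassLoader()` returns the loader that actually defined the class, which is the idea behind the proposed fix.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class ClassLoaderDemo {

    /** Hypothetical stand-in for CheckpointedOffset; any Serializable class behaves the same. */
    static final class Offset implements Serializable {
        private static final long serialVersionUID = 1L;
        final int value;

        Offset(int value) {
            this.value = value;
        }
    }

    /**
     * Hypothetical approximation of a loader-aware ObjectInputStream: use the explicit loader
     * when one is given, otherwise fall back to the JDK's default class resolution.
     */
    static final class LoaderAwareObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            if (loader != null) {
                return Class.forName(desc.getName(), false, loader);
            }
            // A null loader (for example, the bootstrap loader of a JDK class) means the JDK
            // picks a loader from the call stack, which may not see classes from the job JAR.
            return super.resolveClass(desc);
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois =
                new LoaderAwareObjectInputStream(new ByteArrayInputStream(bytes), loader)) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(new byte[0]))) {
            // JDK classes such as DataInputStream are defined by the bootstrap loader: prints null.
            System.out.println("DataInputStream loader: " + in.getClass().getClassLoader());
        }
        // The class being deserialized reports the loader that actually defined it.
        System.out.println("Offset loader: " + Offset.class.getClassLoader());

        byte[] bytes = serialize(new Offset(42));
        Offset restored = (Offset) deserialize(bytes, Offset.class.getClassLoader());
        System.out.println("restored value: " + restored.value);
    }
}
```

In this single-JVM demo, the `null` fallback would still find `Offset`, because the application class loader can see it. In the reported deployment, `CheckpointedOffset` exists only in the job JAR, so the fallback lookup through the _AppClassLoader_ fails, as the trace shows.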



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
