[ https://issues.apache.org/jira/browse/FLINK-35542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jan Gurda updated FLINK-35542:
------------------------------
    Affects Version/s: jdbc-3.2.0

> ClassNotFoundException when deserializing CheckpointedOffset
> ------------------------------------------------------------
>
>                 Key: FLINK-35542
>                 URL: https://issues.apache.org/jira/browse/FLINK-35542
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: jdbc-3.2.0, jdbc-3.1.2
>         Environment: Flink 1.19.0
> Flink JDBC Connector 3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca)
> JDK 11 (Temurin)
>            Reporter: Jan Gurda
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: jdbc-3.3.0
>
>
> I use the latest flink-connector-jdbc code from the main branch, which is currently 3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca).
>
> When a job is interrupted while reading data from the JDBC source (for example, by a TaskManager outage), it cannot recover because of the following exception:
> {code:java}
> java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.source.split.CheckpointedOffset
>     at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:71)
>     at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:34)
>     at org.apache.flink.connector.base.source.hybrid.HybridSourceSplit.unwrapSplit(HybridSourceSplit.java:122)
>     at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.addSplits(HybridSourceReader.java:158)
>     at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.setCurrentReader(HybridSourceReader.java:247)
>     at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.handleSourceEvents(HybridSourceReader.java:186)
>     at org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:571)
>     at org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:72)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:80)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$22(StreamTask.java:1540)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.source.split.CheckpointedOffset
>     at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
>     at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
>     at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>     at java.base/java.lang.Class.forName0(Native Method)
>     at java.base/java.lang.Class.forName(Unknown Source)
>     at java.base/java.io.ObjectInputStream.resolveClass(Unknown Source)
>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:92)
>     at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
>     at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserializeJdbcSourceSplit(JdbcSourceSplitSerializer.java:109)
>     at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:69)
>     ... 22 more
> {code}
>
> In our deployment, we embed the JDBC connector classes into the job JAR file. This means that the class org.apache.flink.connector.jdbc.source.split.CheckpointedOffset is visible only to the _FlinkUserCodeClassLoader_, not to the _AppClassLoader_. I believe the problem is in the following code snippet, which deserializes the offset with the class loader of the JDK's _DataInputStream_ class ({{in.getClass().getClassLoader()}}) instead of a loader that can see the connector classes:
> {code:java}
> public JdbcSourceSplit deserializeJdbcSourceSplit(DataInputStream in)
>         throws IOException, ClassNotFoundException {
>     // ....
>     // Some lines skipped
>     CheckpointedOffset chkOffset =
>             InstantiationUtil.deserializeObject(chkOffsetBytes, in.getClass().getClassLoader());
>     return new JdbcSourceSplit(id, sqlTemplate, params, offset, chkOffset);
> }
> {code}
> If I change it to the following:
> {code:java}
> public JdbcSourceSplit deserializeJdbcSourceSplit(DataInputStream in)
>         throws IOException, ClassNotFoundException {
>     // ....
>     // Some lines skipped
>     CheckpointedOffset chkOffset =
>             InstantiationUtil.deserializeObject(
>                     chkOffsetBytes, CheckpointedOffset.class.getClassLoader());
>     return new JdbcSourceSplit(id, sqlTemplate, params, offset, chkOffset);
> }
> {code}
> Everything works as expected.
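The class-loader mechanics described in this report can be illustrated with a minimal, self-contained sketch in plain Java (no Flink dependencies). {{DemoOffset}} and {{LoaderAwareObjectInputStream}} are hypothetical stand-ins, loosely modeled on {{CheckpointedOffset}} and on Flink's {{InstantiationUtil$ClassLoaderObjectInputStream}} from the stack trace; they are not the connector's or Flink's actual code. The sketch shows that a JDK class such as {{java.io.DataInputStream}} reports {{null}} (the bootstrap loader) from {{getClass().getClassLoader()}}, while a class literal reports the loader that defined that class, and that the {{ObjectInputStream}} resolves classes against whatever loader it is handed. Because everything here lives on a single class path, the sketch does not reproduce the {{ClassNotFoundException}} itself; that requires the class to be visible only to a child loader such as _FlinkUserCodeClassLoader_.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

class ClassLoaderDemo {
    // Java-serialize any Serializable object to a byte array.
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    // Deserialize, resolving every class descriptor against the given loader.
    static Object deserialize(byte[] bytes, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois =
                new LoaderAwareObjectInputStream(new ByteArrayInputStream(bytes), loader)) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(new byte[0]));
        // java.io.DataInputStream is defined by the bootstrap loader, so this prints "null".
        System.out.println("in.getClass().getClassLoader() = " + in.getClass().getClassLoader());
        // A class literal reports the loader that actually defined the class.
        System.out.println("DemoOffset.class.getClassLoader() = " + DemoOffset.class.getClassLoader());

        byte[] bytes = serialize(new DemoOffset(42, 3));
        // Analogous to passing CheckpointedOffset.class.getClassLoader() in the proposed fix.
        DemoOffset restored = (DemoOffset) deserialize(bytes, DemoOffset.class.getClassLoader());
        System.out.println("restored offset=" + restored.offset + ", split=" + restored.splitIndex);
    }
}

// Hypothetical stand-in for CheckpointedOffset; not the connector's class.
class DemoOffset implements Serializable {
    private static final long serialVersionUID = 1L;
    final int offset;
    final int splitIndex;

    DemoOffset(int offset, int splitIndex) {
        this.offset = offset;
        this.splitIndex = splitIndex;
    }
}

// Hypothetical ObjectInputStream that looks classes up in an explicit loader.
class LoaderAwareObjectInputStream extends ObjectInputStream {
    private final ClassLoader loader;

    LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
        super(in); // reads the stream header only; no classes are resolved yet
        this.loader = loader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        if (loader == null) {
            // No explicit loader: defer to the JDK's default lookup.
            return super.resolveClass(desc);
        }
        // initialize=false: load the class without running its static initializers.
        return Class.forName(desc.getName(), false, loader);
    }
}
```

The design point is that a class literal such as {{CheckpointedOffset.class}} is resolved through the loader that defined the surrounding serializer class, so when the connector is bundled in the job JAR it points at the user-code loader, whereas the loader of the stream's runtime class depends only on where that stream class came from.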