Hi,

Adding flink-connector-jdbc_2.11-1.11.1.jar under flink/lib only guarantees that the class can be found while the job is executing. When you submit from the client, the job is compiled on the client side, and judging from the error it is the client that cannot find the class while compiling the job.

You can try the approach described here: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/dependency_management.html#java-dependency
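For reference, that doc describes pointing `pipeline.jars` at the connector jar in the table config, so the jar is shipped with the job instead of relying on flink/lib. A minimal sketch (the jar paths below are taken from your `ll` output and are just an example; adjust them to your environment):

```python
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# Blink planner in streaming mode, matching a typical Flink 1.11 Table API setup.
env_settings = (EnvironmentSettings.new_instance()
                .in_streaming_mode()
                .use_blink_planner()
                .build())
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

# List the connector jars as "file://" URLs separated by ";".
# They are uploaded with the job and added to the classpath.
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///opt/module/flink-1.11.1/lib/flink-connector-jdbc_2.11-1.11.1.jar;"
    "file:///opt/module/flink-1.11.1/lib/mysql-connector-java-5.1.47.jar")
```

This is a configuration fragment and needs a running PyFlink 1.11 environment; it is not a complete job.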

> On Feb 7, 2021, at 10:50 AM, 陈康 <844256...@qq.com> wrote:
> 
> Hi all, could anyone help with this?
> While running a PyFlink job, I get the error: Could not read the user code wrapper:
> org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat
> ..
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat
> 
> flink-connector-jdbc_2.11-1.11.1.jar has already been added under flink/lib.
> 
> Has anyone run into this? Thanks!
> 
> =============
> 
> [hadoop@hadoop01 lib]$ pwd
> /opt/module/flink-1.11.1/lib
> [hadoop@hadoop01 lib]$ ll
> total 228100
> *-rw-r--r-- 1 hadoop hadoop    196528 2021/02/07 10:26:21
> flink-connector-jdbc_2.11-1.11.1.jar*
> -rw-r--r-- 1 hadoop hadoop     90782 2020/07/15 17:24:31
> flink-csv-1.11.1.jar
> -rw-r--r-- 1 hadoop hadoop 108350618 2020/07/15 17:30:13
> flink-dist_2.11-1.11.1.jar
> -rw-r--r-- 1 hadoop hadoop     94865 2020/07/15 17:24:11
> flink-json-1.11.1.jar
> -rw-r--r-- 1 hadoop hadoop  41507566 2021/02/05 17:05:01
> flink-shaded-hadoop-2-uber-2.7.5-9.0.jar
> -rw-r--r-- 1 hadoop hadoop   7712156 2020/06/18 10:42:32
> flink-shaded-zookeeper-3.4.14.jar
> -rw-r--r-- 1 hadoop hadoop  33327194 2020/07/15 17:28:46
> flink-table_2.11-1.11.1.jar
> -rw-r--r-- 1 hadoop hadoop  37331759 2020/07/15 17:28:54
> flink-table-blink_2.11-1.11.1.jar
> -rw-r--r-- 1 hadoop hadoop   1889063 2021/01/13 17:03:03
> kafka-clients-2.1.0.jar
> -rw-r--r-- 1 hadoop hadoop     67114 2020/04/20 20:47:31
> log4j-1.2-api-2.12.1.jar
> -rw-r--r-- 1 hadoop hadoop    276771 2020/04/20 20:47:31
> log4j-api-2.12.1.jar
> -rw-r--r-- 1 hadoop hadoop   1674433 2020/04/20 20:47:32
> log4j-core-2.12.1.jar
> -rw-r--r-- 1 hadoop hadoop     23518 2020/04/20 20:47:31
> log4j-slf4j-impl-2.12.1.jar
> -rw-r--r-- 1 hadoop hadoop   1007502 2020/12/25 10:51:20
> mysql-connector-java-5.1.47.jar
> 
> 
> 
> --------------
> 
> *[hadoop@hadoop01 pyflink]$ /opt/module/flink-1.11.1/bin/flink run -py
> NtPyFlink.py*
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/opt/module/flink-1.11.1/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/opt/module/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/opt/module/hbase/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type
> [org.apache.logging.slf4j.Log4jLoggerFactory]
> 
> Traceback (most recent call last):
>  File "NtPyFlink.py", line 300, in <module>
>    t_env.execute('重连参数预测')
>  File
> "/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/table/table_environment.py",
> line 1057, in execute
>  File
> "/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
> line 1286, in __call__
>  File
> "/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/util/exceptions.py",
> line 147, in deco
>  File
> "/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
> line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o6.execute.
> : org.apache.flink.util.FlinkException: Failed to execute job '重连参数预测'.
>       at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1823)
>       at
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
>       at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>       at
> org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
>       at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>       at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>       at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>       at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>       at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>       at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to
> submit JobGraph.
>       at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:366)
>       at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>       at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>       at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>       at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>       at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:292)
>       at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>       at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>       at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>       at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>       at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
>       at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>       at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       ... 1 more
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal
> server error., <Exception on server side:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> job.
>       at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:344)
>       at
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>       at
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>       at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>       at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>       at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not
> instantiate JobManager.
>       at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
>       at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>       ... 6 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
> initialize task 'Source: TableSourceScan(table=[[default_catalog,
> default_database, source, project=[colA, colB, colC, colD, colE, colF, colG,
> colH]]], fields=[colA, colB, colC, colD, colE, colF, colG, colH]) ->
> StreamExecPythonCalc -> Sink:
> Sink(table=[default_catalog.default_database.print_table], fields=[colA,
> colB, colC, colD, colE, colF, colG, colH, p])': Loading the input/output
> formats failed: 
>       at
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>       at
> org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269)
>       at
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242)
>       at
> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229)
>       at
> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
>       at
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
>       at
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272)
>       at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>       at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>       at
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
>       at
> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
>       at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
>       ... 7 more
> Caused by: java.lang.Exception: Loading the input/output formats failed: 
>       at
> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:155)
>       at
> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:59)
>       at
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:212)
>       ... 19 more
> Caused by: java.lang.RuntimeException: Deserializing the input/output
> formats failed: Could not read the user code wrapper:
> org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat
>       at
> org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:68)
>       at
> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:152)
>       ... 21 more
> Caused by:
> org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could
> not read the user code wrapper:
> org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat
>       at
> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
>       at
> org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:66)
>       ... 22 more
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat
>       at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>       at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>       at
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
>       at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>       at java.lang.Class.forName0(Native Method)
>       at java.lang.Class.forName(Class.java:348)
>       at
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>       at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826)
>       at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>       at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>       at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>       at java.util.HashMap.readObject(HashMap.java:1404)
>       at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>       at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
>       at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>       at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>       at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>       at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>       at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>       at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>       at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>       at
> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>       ... 23 more
> 
> End of exception on server side>]
>       at
> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
>       at
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
>       at
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
>       at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>       ... 4 more
> 
> org.apache.flink.client.program.ProgramAbortException
>       at 
> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>       at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>       at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>       at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>       at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>       at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>       at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>       at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>       at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
