Hi Dian,

we had this discussion in the past. Yes, it might help in certain cases. But on the other hand, leaving it undefined also helps in finding version mismatches when people have misconfigured their dependencies. Different JVM versions should not result in incompatible classes, as the default serialVersionUID is standardized, no?

Regards,
Timo

On 27.07.20 10:53, Dian Fu wrote:
@Wojtek I just found that serialVersionUID is not defined in org.apache.flink.table.types.logical.RowType$RowField, so you have to make sure that the JDK version is the same on the client side and the server side. Could you check that?

@Timo I think we should define the serialVersionUID for all classes that implement Serializable. What do you think?

Regards,
Dian

On 27 Jul 2020, at 4:38 PM, Timo Walther <twal...@apache.org> wrote:

Hi,

the InvalidClassException indicates that you are using different versions of the same class. Are you sure you are using the same Flink minor version (including the Scala suffix) for all dependencies and Kubernetes?
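
One way to sanity-check this is to compare the Flink and Scala versions encoded in the artifact names. The following is only a minimal sketch in Python: it assumes the usual `<artifact>_<scala>-<flink>.jar` jar naming and `flink:<flink>-scala_<scala>` image tags, the helper names are made up for illustration, and it looks at names only, not at the classes inside the jars. A `-SNAPSHOT` build is reported as-is, so it never compares equal to a release.

```python
import re
from typing import Dict, List, NamedTuple, Optional


class FlinkVersion(NamedTuple):
    flink: str  # e.g. "1.11.0" or "1.11-SNAPSHOT"
    scala: str  # e.g. "2.11"


# Hypothetical patterns based on the names that appear in this thread.
_JAR_RE = re.compile(
    r"_(?P<scala>\d+\.\d+)-(?P<flink>\d+\.\d+(?:\.\d+)?(?:-SNAPSHOT)?)\.jar$"
)
_IMAGE_RE = re.compile(
    r"^flink:(?P<flink>\d+\.\d+(?:\.\d+)?)-scala_(?P<scala>\d+\.\d+)$"
)


def from_jar(name: str) -> Optional[FlinkVersion]:
    """Extract the versions from a jar file name, or None if it does not match."""
    m = _JAR_RE.search(name)
    return FlinkVersion(m.group("flink"), m.group("scala")) if m else None


def from_image(tag: str) -> Optional[FlinkVersion]:
    """Extract the versions from a Docker image tag, or None if it does not match."""
    m = _IMAGE_RE.match(tag)
    return FlinkVersion(m.group("flink"), m.group("scala")) if m else None


def mismatches(versions: Dict[str, Optional[FlinkVersion]]) -> List[str]:
    """Return the names whose version differs from the first entry."""
    items = list(versions.items())
    if not items:
        return []
    _, reference = items[0]
    return [name for name, version in items[1:] if version != reference]


found = {
    "image": from_image("flink:1.11.0-scala_2.11"),
    "connector": from_jar("flink-sql-connector-kafka_2.11-1.11.0.jar"),
    "client": from_jar("flink-dist_2.11-1.11-SNAPSHOT.jar"),
}
print(mismatches(found))
```

With the names quoted in this thread, the check flags the locally built client distribution as differing from the cluster image. That alone does not prove it is the cause of the exception.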

Regards,
Timo


On 27.07.20 09:51, Wojciech Korczyński wrote:
Hi,
when I try it locally it runs well. The problem is when I run it using Kubernetes. I don't know how to make Flink and Kubernetes go well together in that case.
Best, Wojtek
On Fri, 24 Jul 2020 at 17:51, Xingbo Huang <hxbks...@gmail.com> wrote:
   Hi Wojciech,
   In many cases, you can first make sure that your code runs correctly in
   local mode, and then submit the job to the cluster for testing. For
   how to add jar packages in local mode, you can refer to the doc [1].
   Besides, you'd better use the blink planner, which is the default
   planner. For how to use the blink planner, you can refer to the doc [2].
   [1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/common_questions.html#adding-jar-files
   [2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#create-a-tableenvironment
   Best,
   Xingbo
   On Fri, 24 Jul 2020 at 9:40 PM, Wojciech Korczyński <wojciech.korczyn...@alphamoon.ai> wrote:
       Hi,
       I've done as you recommended:
       from pyflink.datastream import StreamExecutionEnvironment
       from pyflink.dataset import ExecutionEnvironment
       from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment, ScalarFunction
       from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv
       from pyflink.table.udf import udf

       exec_env = StreamExecutionEnvironment.get_execution_environment()
       t_config = TableConfig()
       t_env = StreamTableEnvironment.create(exec_env, t_config)

       INPUT_TABLE = "my_topic"
       INPUT_TOPIC = "my-topic"
       LOCAL_KAFKA = 'my-cluster-kafka-bootstrap:9092'
       OUTPUT_TABLE = "my_topic_output"
       OUTPUT_TOPIC = "my-topic-output"

       ddl_source = f"""
           CREATE TABLE {INPUT_TABLE} (
               message STRING
           ) WITH (
               'connector' = 'kafka',
               'topic' = '{INPUT_TOPIC}',
               'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
               'format' = 'json'
           )
       """

       ddl_sink = f"""
           CREATE TABLE {OUTPUT_TABLE} (
               message STRING
           ) WITH (
               'connector' = 'kafka',
               'topic' = '{OUTPUT_TOPIC}',
               'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
               'format' = 'json'
           )
       """

       t_env.execute_sql(ddl_source)
       t_env.execute_sql(ddl_sink)

       result = t_env.execute_sql(f"""
           INSERT INTO {OUTPUT_TABLE}
           SELECT message
           FROM {INPUT_TABLE}
       """)
       result.get_job_client().get_job_execution_result().result()
       I think it is correctly written.
       However after deploying that job I'm getting an error:
       wojtek@wojtek-B365M-D3H:~/PycharmProjects/k8s_kafka_demo$ /home/wojtek/flink/build-target/bin/flink run -m $(minikube ip):30081 -py kafka2flink.py -j jar/flink-sql-connector-kafka_2.11-1.11.0.jar
       WARNING: An illegal reflective access operation has occurred
       WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/lib/flink-dist_2.11-1.11-SNAPSHOT.jar) to field java.util.Properties.serialVersionUID
       WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
       WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
       WARNING: All illegal access operations will be denied in a future release
       Job has been submitted with JobID d23fe59415e9c9d79d15a1cf7e5409aa
       Traceback (most recent call last):
          File "kafka2flink.py", line 62, in <module>
            result.get_job_client().get_job_execution_result().result()
          File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/common/completable_future.py", line 78, in result
          File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
          File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
          File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
       py4j.protocol.Py4JJavaError: An error occurred while calling o52.get.
       : java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: d23fe59415e9c9d79d15a1cf7e5409aa)
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
       Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: d23fe59415e9c9d79d15a1cf7e5409aa)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:116)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
       Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:114)
... 18 more
       Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
at sun.reflect.GeneratedMethodAccessor78.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
       Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
       *Caused by: java.io.InvalidClassException:
       org.apache.flink.table.types.logical.RowType$RowField; local
       class incompatible: stream classdesc serialVersionUID =
       3988094341871744603, local class serialVersionUID =
       -7902169369767750595*
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1964)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1830)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2121)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:483)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:441)
at java.util.ArrayList.readObject(ArrayList.java:799)
at sun.reflect.GeneratedMethodAccessor49.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2257)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:613)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.readObject(FlinkKafkaProducer.java:1290)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2257)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:483)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:441)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276)
... 8 more
       org.apache.flink.client.program.ProgramAbortException
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
       I presume that there is a mismatch between the Flink versions,
       because I want to deploy that job on Kubernetes. To create a
       cluster on K8S I used the image *flink:1.11.0-scala_2.11*. The local version of Flink is *release-1.11*.
       What can cause that problem?
       Thanks,
       Wojtek
       On Fri, 24 Jul 2020 at 11:32, Xingbo Huang <hxbks...@gmail.com> wrote:
           Hi Wojtek,
           The following ways of using PyFlink are my personal
           recommendations:
           1. Use DDL [1] to create your source and sink instead of the
           descriptor way, because as of Flink 1.11 there are some
           bugs in the descriptor way.
           2. Use `execute_sql` for a single statement, and use
           `create_statement_set` for multiple DML statements. [2]
           3. Use `execute_insert` for a single sink, and use
           `TableEnvironment#create_statement_set` for multiple sinks.
           4. Use the `from_path` method instead of the `scan` method.
           5. After calling an `execute_*` method, call
           `get_job_client().get_job_execution_result().result()` on the
           returned TableResult, which is the return type of
           execute_insert and execute_sql.
           For common PyFlink questions, you can refer to the doc [3].
           [1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
           [2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#translate-and-execute-a-query
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html
           [3]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/common_questions.html
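
Points 1, 2 and 5 above can be sketched roughly as follows. This is an illustration rather than code from the thread: `kafka_ddl` and `run_inserts` are hypothetical helper names, the connector options are simply the ones used in the DDL quoted elsewhere in this thread, and the pyflink import is kept inside the function so that the DDL helper can be tried without a Flink installation.

```python
from typing import Iterable


def kafka_ddl(table: str, topic: str, servers: str) -> str:
    """Build a CREATE TABLE statement for a JSON-formatted Kafka table."""
    return f"""
CREATE TABLE {table} (
  message STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '{topic}',
  'properties.bootstrap.servers' = '{servers}',
  'format' = 'json'
)
""".strip()


def run_inserts(ddls: Iterable[str], inserts: Iterable[str]) -> None:
    """Create the tables, submit all INSERT statements as one job, and wait."""
    # Imported lazily; assumes PyFlink 1.11 or later is installed.
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    exec_env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(exec_env)

    # Point 1: define sources and sinks with DDL.
    for ddl in ddls:
        t_env.execute_sql(ddl)

    # Point 2: group several DML statements into one statement set.
    statement_set = t_env.create_statement_set()
    for stmt in inserts:
        statement_set.add_insert_sql(stmt)
    result = statement_set.execute()

    # Point 5: block until the job result is available.
    result.get_job_client().get_job_execution_result().result()


print(kafka_ddl("my_topic", "my-topic", "my-cluster-kafka-bootstrap:9092"))
```

For a single INSERT, calling `execute_sql` directly (point 2) is enough; the statement set only matters when several sinks should run in one job.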
           Best,
           Xingbo
           On Fri, 24 Jul 2020 at 4:44 PM, Wojciech Korczyński <wojciech.korczyn...@alphamoon.ai> wrote:
               Hi,
               thank you for your answer, it is very helpful.
               Currently my python program looks like:
               from pyflink.datastream import StreamExecutionEnvironment
               from pyflink.dataset import ExecutionEnvironment
               from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment
               from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv
               exec_env = StreamExecutionEnvironment.get_execution_environment()
               t_config = TableConfig()
               t_env = StreamTableEnvironment.create(exec_env, t_config)
               t_env.connect(Kafka()
                              .version("universal")
                              .topic("my-topic")
                              .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')
                              ) \
                    .in_append_mode() \
                    .with_format(Csv()
                                 .line_delimiter("\r\n") \
                                 .derive_schema()) \
                    .with_schema(Schema()
                                 .field("value", DataTypes.STRING())) \
                    .create_temporary_table('mySource')
               t_env.connect(Kafka()
                              .version("universal")
                              .topic("my-topic-out")
                              .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')
                              ) \
                    .with_format(Csv()
                             .line_delimiter("\r\n") \
                             .derive_schema()) \
                    .with_schema(Schema()
                                 .field("value", DataTypes.STRING())) \
                    .in_append_mode() \
                    .create_temporary_table('mySink')
               t_env.scan('mySource') \
                    .select('"flink_job_" + value') \
                    .insert_into('mySink')
               t_env.execute("tutorial_job")
               I have installed PyFlink 1.11, so the IDE is pointing
               out that the commands connect, scan, insert_into and
               *execute* are deprecated. How should the program be
               written to follow version 1.11 of PyFlink?
               Kind regards,
               Wojtek
               On Fri, 24 Jul 2020 at 04:21, Xingbo Huang <hxbks...@gmail.com> wrote:
                   Hi Wojtek,
                   In Flink 1.11, the methods register_table_source and
                   register_table_sink of ConnectTableDescriptor have
                   been removed. You need to use create_temporary_table
                   (createTemporaryTable in the Java API) instead of
                   these two methods. Besides, it seems that the version
                   of your pyflink is 1.10, but the corresponding flink
                   is 1.11.
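
A quick way to see which PyFlink version pip has installed is sketched below. It assumes Python 3.8+ (for `importlib.metadata`) and that PyFlink was installed from PyPI under its distribution name `apache-flink`; the helper names are made up for illustration. It only inspects the pip-installed package, not a `pyflink.zip` bundled with a Flink distribution.

```python
import re
from importlib import metadata
from typing import Optional


def installed_version(dist: str = "apache-flink") -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(dist)
    except metadata.PackageNotFoundError:
        return None


def same_minor(a: str, b: str) -> bool:
    """Compare only the major.minor part, e.g. '1.11.0' vs '1.11-SNAPSHOT'."""
    def minor(version: str):
        return tuple(re.split(r"[.\-]", version)[:2])
    return minor(a) == minor(b)


pyflink_version = installed_version()
if pyflink_version is None:
    print("apache-flink is not installed in this Python environment")
else:
    # "1.11.0" is the cluster version mentioned in this thread.
    print(pyflink_version, same_minor(pyflink_version, "1.11.0"))
```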
                   Best,
                   Xingbo
                   On Thu, 23 Jul 2020 at 9:01 PM, Wojciech Korczyński <wojciech.korczyn...@alphamoon.ai> wrote:
                       Thank you for your answer.
                       I have replaced that .jar with the universal
                       Kafka version - the links to the other versions are dead.
                       After the attempt of deploying:
                       bin/flink run -py /home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py --jarfile /home/wojtek/Downloads/flink-sql-connector-kafka_2.11-1.11.0.jar
                       another error occurs:
                       Traceback (most recent call last):
                         File "/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py", line 20, in <module>
                           .field("tbd", DataTypes.INT())) \
                       AttributeError: 'StreamTableDescriptor' object has no attribute 'register_table_source'
                       org.apache.flink.client.program.ProgramAbortException
                       at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
                       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                       at java.base/java.lang.reflect.Method.invoke(Method.java:566)
                       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
                       at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
                       at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
                       at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
                       at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
                       at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
                       at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
                       at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
                       at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
                       Maybe the way the Python program is written is
                       incorrect. Could the API it uses be deprecated, given
                       that the installed Flink version is 1.11?
                       Best regards,
                       Wojtek
                       On Thu, 23 Jul 2020 at 12:01, Xingbo Huang <hxbks...@gmail.com> wrote:
                           Hi Wojtek,
                           you need to use the fat jar
                           'flink-sql-connector-kafka_2.11-1.11.0.jar',
                           which you can download from the doc [1].
                           [1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
                           Best,
                           Xingbo
                           On Thu, 23 Jul 2020 at 4:57 PM, Wojciech Korczyński <wojciech.korczyn...@alphamoon.ai> wrote:
                               Hello,
                               I am trying to deploy a Python job with
                               Kafka connector:
                               from pyflink.datastream import StreamExecutionEnvironment
                               from pyflink.dataset import ExecutionEnvironment
                               from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment
                               from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv
                               exec_env = StreamExecutionEnvironment.get_execution_environment()
                               t_config = TableConfig()
                               t_env = StreamTableEnvironment.create(exec_env, t_config)
                               t_env.connect(Kafka()
                                              .version("0.11")
                                              .topic("my-topic")
                                              .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')
                                              ) \
                                    .in_append_mode() \
                                    .with_format(Csv()
                                                 .line_delimiter("\r\n") \
                                                 .derive_schema()) \
                                    .with_schema(Schema()
                                                 .field("tbd", DataTypes.INT())) \
                                    .register_table_source('mySource')
                               t_env.connect(FileSystem().path('../production_data/kafkaoutput')) \
                                    .with_format(OldCsv()
                                                 .field('tbd', DataTypes.INT())) \
                                    .with_schema(Schema()
                                                 .field("tbd", DataTypes.INT())) \
                                    .register_table_sink('mySink')
                               t_env.scan('mySource') \
                                    .select('tbd') \
                                    .where("tbd = 1") \
                                    .insert_into('mySink')
                               t_env.execute("tutorial_job")
                               When I run the deploy command:
                               bin/flink run -py /home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py --jarfile /home/wojtek/flink/flink-connectors/flink-connector-kafka-0.11/target/flink-connector-kafka-0.11_2.11-1.11-SNAPSHOT.jar
                               I get an error:

                               Traceback (most recent call last):
                                 File "/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py", line 9, in <module>
                                   t_env = StreamTableEnvironment.create(exec_env, t_config)
                                 File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1478, in create
                                 File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
                                 File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
                                 File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
                               py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create.
                               : java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase
                                   at java.base/java.lang.ClassLoader.defineClass1(Native Method)
                                   at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017)
                                   at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
                                   at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:550)
                                   at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
                                   at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
                                   at java.base/java.security.AccessController.doPrivileged(Native Method)
                                   at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
                                   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
                                   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
                                   at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
                                   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
                                   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
                                   at java.base/java.lang.Class.forName0(Native Method)
                                   at java.base/java.lang.Class.forName(Class.java:398)
                                   at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1209)
                                   at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1220)
                                   at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1264)
                                   at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1299)
                                   at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1384)
                                   at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
                                   at org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214)
                                   at org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:170)
                                   at org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:125)
                                   at org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:48)
                                   at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.lookupExecutor(StreamTableEnvironmentImpl.java:158)
                                   at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:135)
                                   at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:143)
                                   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                                   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                                   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                                   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
                                   at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
                                   at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
                                   at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
                                   at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
                                   at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
                                   at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
                                   at java.base/java.lang.Thread.run(Thread.java:834)
                               Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase
                                   at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
                                   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
                                   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
                                   at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
                                   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
                                   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
                                   ... 39 more

                               org.apache.flink.client.program.ProgramAbortException
                                   at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
                                   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                                   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                                   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                                   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
                                   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
                                   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
                                   at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
                                   at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
                                   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
                                   at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
                                   at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
                                   at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
                                   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)

                               What is the correct way to deploy a Python
                               job on Flink that uses Kafka? It seems
                               that it cannot resolve the Kafka
                               dependency correctly.
                               I wonder if there is a simpler solution,
                               and whether it matters that I would like
                               to deploy the job on a K8s cluster.
                               Thanks,
                               Wojtek
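
One way to narrow down a NoClassDefFoundError like the one above is to check whether the jar passed via --jarfile actually contains the class named in the error. The following is only a minimal sketch and is not part of the original message: the helper names jar_contains_class and jars_missing_class are hypothetical, and it relies only on the fact that a jar is a zip archive in which class a.b.C is stored as the entry a/b/C.class.

```python
import zipfile


def jar_contains_class(jar_path, class_name):
    """Return True if the jar at jar_path has an entry for the given class.

    Hypothetical helper for illustration; class_name is a fully qualified
    Java class name such as "org.example.Foo".
    """
    # A class a.b.C is stored in the archive as the entry a/b/C.class.
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()


def jars_missing_class(jar_paths, class_name):
    """Return the jars from jar_paths that do not contain class_name."""
    return [path for path in jar_paths if not jar_contains_class(path, class_name)]
```

Called with the connector jar from the command above and the class name org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase, a result of False would mean that this jar does not bundle the class, so it would have to come from another jar on the classpath.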
                       NOTE - Message and the documents attached
                       thereto contain confidential information, which
                       may also be a trade secret or confidential. If
                       you are not the intended recipient of the
                       message, please contact the sender without delay
                       and delete the message from your system.
                       Disclosure, copying, dissemination or
                       publication of this message and information
                       contained therein is prohibited.
                       Alphamoon Sp. z o.o. (Ltd.), ul. Pawła
                       Włodkowica 21/3, 50-072 Wrocław, Poland;
                       Registered under the KRS number 0000621513 to
                       the National Court Register, kept by the
                       District Court for Wrocław-Fabryczna VI Economic
                       Department of the National Court Register,
                       VAT-ID: PL8943079568, REGON 364634116; Share
                       capital: PLN 5.000 fully paid-up.

