Hi Robert,

1) Do you mean that when submitting the same job multiple times, it succeeds 
sometimes and hangs sometimes, or does it only hang for some specific job?
2) Which deployment mode do you use?
3) Is it possible to dump the stack trace? It would help us understand 
what's happening.
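For question 3, if a JDK is available on the TaskManager host or container, something like the following could capture a thread dump while the job hangs (the process match pattern and output path are assumptions):

```shell
# Locate the TaskManager JVM and write a thread dump (assumes jstack from a
# JDK is on the PATH of the TaskManager host/container).
TM_PID=$(pgrep -f org.apache.flink.runtime.taskexecutor.TaskManagerRunner | head -n 1)
if [ -n "$TM_PID" ]; then
  jstack "$TM_PID" > /tmp/taskmanager-thread-dump.txt
  echo "thread dump written to /tmp/taskmanager-thread-dump.txt"
else
  echo "no TaskManager JVM found; skipping thread dump"
fi
```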

Thanks,
Dian

> On Mar 16, 2021, at 11:51 PM, Robert Cullen <cinquate...@gmail.com> wrote:
> 
> Thanks All,
> 
> I've added Python and PyFlink to the TM image, which fixed the problem.  Now, 
> however, submitting a Python script to the cluster only succeeds sporadically; 
> sometimes it completes, but most of the time it just hangs.  Not sure what is 
> causing this.
> 
> On Mon, Mar 15, 2021 at 9:47 PM Xingbo Huang <hxbks...@gmail.com> wrote:
> Hi,
> 
> From the error message, I think the problem is that there is no Python 
> interpreter on your TaskManager machine. You need to install a Python 3.5+ 
> interpreter on the TM machine, and that Python environment needs to have 
> PyFlink installed (pip install apache-flink). For details, you can refer to 
> the documentation[1].
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/installation.html
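> 
> For a containerized TaskManager, the fix can be baked into the image. A 
> minimal sketch (the base image tag and PyFlink version are assumptions; the 
> PyFlink version should match your Flink cluster version):
> 
> ```dockerfile
> FROM flink:1.12.2-scala_2.12
> # Install a Python 3 interpreter and expose it as "python", the executable
> # name the error message shows PyFlink trying to run.
> RUN apt-get update -y && \
>     apt-get install -y python3 python3-pip && \
>     ln -s /usr/bin/python3 /usr/bin/python && \
>     rm -rf /var/lib/apt/lists/*
> # Install PyFlink into that interpreter on the TaskManager.
> RUN pip3 install apache-flink==1.12.2
> ```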
> 
> Best,
> Xingbo
> 
> Robert Cullen <cinquate...@gmail.com> wrote on Tue, Mar 16, 2021 at 2:58 AM:
> Okay, I added the jars and fixed that exception. However, I have a new 
> exception that is harder to decipher:
> 
> 2021-03-15 14:46:20
> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
>     at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
>     at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
>     at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
>     at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
>     at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
>     at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
>     at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
>     at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:198)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: error=2, No such file or directory
>     at java.lang.UNIXProcess.forkAndExec(Native Method)
>     at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
>     at java.lang.ProcessImpl.start(ProcessImpl.java:134)
>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
>     ... 20 more
> 
> On Mon, Mar 15, 2021 at 10:49 AM Robert Metzger <rmetz...@apache.org> wrote:
> Hey,
> are you sure the class is in the lib/ folder of all machines / instances, and 
> you've restarted Flink after adding the files to lib/ ?
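> 
> A quick way to check on each node (the path assumes the default /opt/flink 
> layout):
> 
> ```shell
> # List any kafka connector jar on this node; warn if none is present.
> ls /opt/flink/lib 2>/dev/null | grep -i kafka \
>   || echo "kafka connector jar missing from /opt/flink/lib"
> ```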
> 
> On Mon, Mar 15, 2021 at 3:42 PM Robert Cullen <cinquate...@gmail.com> wrote:
> Shuiqiang,
> 
> I added the flink-connector-kafka_2.12 jar to the /opt/flink/lib directory.
> 
> When submitting this job to my flink cluster I’m getting this stack trace at 
> runtime:
> 
> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>     at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
> ClassLoader info: URL ClassLoader:
> Class not resolvable through given classloader.
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:348)
>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
>     ... 9 more
> 
> On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <acqua....@gmail.com> wrote:
> Hi Robert,
> 
> You can refer to 
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
> for the whole example.
> 
> Best,
> Shuiqiang
> 
> Robert Cullen <cinquate...@gmail.com> wrote on Sat, Mar 13, 2021 at 4:01 AM:
> Shuiqiang, can you include the import statements? Thanks.
> 
> On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <acqua....@gmail.com> wrote:
> Hi Robert,
> 
> The Kafka connector has been provided in the Python DataStream API since 
> release 1.12.0. The documentation for it is still lacking; we will fill it in 
> soon.
> 
> The following code shows how to create a FlinkKafkaConsumer and a 
> FlinkKafkaProducer (the imports shown are for the release-1.12 API):
> ```
> from pyflink.common.serialization import JsonRowDeserializationSchema, SimpleStringSchema
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
> from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
> 
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 
> # define the schema of the message from kafka; here the data is in json format.
> type_info = Types.ROW_NAMED(
>     ['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
>     [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(), Types.INT()])
> json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()
> 
> # define the kafka connection properties.
> kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}
> 
> # create the KafkaConsumer and KafkaProducer with the specified topic name,
> # serialization/deserialization schema and properties.
> kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
> kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)
> 
> # set the kafka source to consume data from the earliest offset.
> kafka_consumer.set_start_from_earliest()
> 
> # create a DataStream from the kafka consumer source.
> ds = env.add_source(kafka_consumer)
> 
> result_stream = ...
> 
> # write the result into kafka via the kafka producer sink.
> result_stream.add_sink(kafka_producer)
> ```
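> 
> For reference, a message on the source topic would need to be a JSON object 
> with the five fields declared in the Row schema above; the concrete values 
> here are made up:
> 
> ```python
> import json
> 
> # A sample payload matching the ROW_NAMED schema declared for the consumer
> # (field values are hypothetical).
> message = {
>     "createTime": 1615852800000,  # epoch millis -> Types.LONG()
>     "orderId": 1001,              # Types.LONG()
>     "payAmount": 49.99,           # Types.DOUBLE()
>     "payPlatform": 1,             # Types.INT()
>     "provinceId": 32,             # Types.INT()
> }
> encoded = json.dumps(message)
> print(encoded)
> ```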
> 
> Best,
> Shuiqiang
> 
> Robert Cullen <cinquate...@gmail.com> wrote on Sat, Mar 13, 2021 at 12:56 AM:
> I’ve scoured the web looking for an example of using a Kafka source for a 
> DataStream in Python. Can someone finish this example?
> 
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> ds = env.from_collection( KAFKA_SOURCE )
> ...
> -- 
> Robert Cullen
> 240-475-4490
> 
