[ 
https://issues.apache.org/jira/browse/BEAM-6218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16721459#comment-16721459
 ] 

Maximilian Michels commented on BEAM-6218:
------------------------------------------

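This looks like an error from the Python SDK harness: it fails while unpickling a DoFn because the class _SliceDoFn cannot be found in the module that is supposed to define it.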
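The last frames of the quoted traceback show where it breaks. Beam's `pickler.loads` passes the serialized DoFn to dill, dill's `find_class` defers to the standard unpickler, and `getattr(mod, name)` fails for `_SliceDoFn`. Pickle records a class only by its module path and name, so the harness looks up `_SliceDoFn` in whatever version of that module is installed in its own environment. The sketch below reproduces this failure mode using only the standard library. The module name `fake_slicer` is hypothetical, and deleting the attribute simulates a worker whose installed library does not define the class. That scenario is an assumption for illustration, not a confirmed diagnosis of this issue. On Python 3 the message is worded differently from the Python 2.7 trace quoted below, but the exception type is the same.

```python
import pickle
import sys
import types

# Hypothetical stand-in for a library module (illustrative name only).
fake_slicer = types.ModuleType("fake_slicer")
sys.modules["fake_slicer"] = fake_slicer


class _SliceDoFn(object):
    """Stand-in for a DoFn class that lives in a library module."""


# Make the class look as if it were defined in fake_slicer, so pickle
# records it as a reference to fake_slicer._SliceDoFn.
_SliceDoFn.__module__ = "fake_slicer"
_SliceDoFn.__qualname__ = "_SliceDoFn"
fake_slicer._SliceDoFn = _SliceDoFn

# "Submission side": pickle stores the module path and class name, not the code.
payload = pickle.dumps(_SliceDoFn())

# "Worker side": simulate an installed module that lacks the class.
del fake_slicer._SliceDoFn

try:
    pickle.loads(payload)
except AttributeError as exc:
    error = exc
else:
    error = None

print(type(error).__name__, error)
```

Because only the reference is serialized, the pickled payload itself is fine. The failure depends entirely on what the unpickling environment can import.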

> TensorFlow Model Analysis Fails when using the portable Flink runner
> --------------------------------------------------------------------
>
>                 Key: BEAM-6218
>                 URL: https://issues.apache.org/jira/browse/BEAM-6218
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink, sdk-py-harness
>            Reporter: Andrew Packer
>            Priority: Major
>
> Running a simple model analysis pipeline with the portable Flink runner against a local cluster:
> {code:python}
> import apache_beam as beam
> import tensorflow_model_analysis as tfma
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import portable_runner
>
>
> def pipeline(root):
>   # Read serialized examples from TFRecord files and run the TFMA evaluation.
>   data_location = './dataset/'
>   data = root | 'ReadData' >> beam.io.ReadFromTFRecord(data_location)
>   results = data | 'ExtractEvaluateAndWriteResults' >> tfma.EvaluateAndWriteResults(
>       eval_saved_model_path='./model/15427633886/',
>       output_path='./output/',
>       display_only_data_location=data_location)
>
>
> def run(argv=None):
>   # Submit to the Flink job server at localhost:8099. sdk_location='container'
>   # uses the Beam SDK installed in the harness container; setup.py stages
>   # additional packages for the workers.
>   runner = portable_runner.PortableRunner()
>   pipeline_options = PipelineOptions(
>       experiments=['beam_fn_api'],
>       sdk_location='container',
>       job_endpoint='localhost:8099',
>       setup_file='./setup.py')
>   runner.run(pipeline, pipeline_options)
>
>
> if __name__ == '__main__':
>   run()
> {code}
> Versions:
> Apache Beam 2.8.0
> TensorFlow Model Analysis: 0.9.2
> Apache Flink: 1.5.3
>  
> Stack Trace:
> {code}
> [flink-runner-job-server] ERROR org.apache.beam.runners.flink.FlinkJobInvocation - Error during job invocation BeamApp-apacker-1212082216-2dd571ba_359d85b7-4e08-49f3-bdc7-34cdb0e779bf.
> org.apache.flink.client.program.ProgramInvocationException: Job 22e7e9d229977f3f0518c37f507f5e07 failed.
>         at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
>         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:452)
>         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>         at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:216)
>         at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:193)
>         at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
>         at org.apache.beam.runners.flink.FlinkJobInvocation.runPipeline(FlinkJobInvocation.java:121)
>         at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
>         at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
>         at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job completed with illegal application status: UNKNOWN.
>         at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:150)
>         at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
>         ... 13 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 22: Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 131, in _execute
>     response = task()
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 166, in <lambda>
>     self._execute(lambda: worker.do_instruction(work), work)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 212, in do_instruction
>     request.instruction_id)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 231, in process_bundle
>     self.data_channel_factory)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 343, in __init__
>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 385, in create_execution_tree
>     descriptor.transforms, key=topological_height, reverse=True)])
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 320, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 368, in get_operation
>     in descriptor.transforms[transform_id].outputs.items()
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 367, in <dictcomp>
>     for tag, pcoll_id
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 320, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 368, in get_operation
>     in descriptor.transforms[transform_id].outputs.items()
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 367, in <dictcomp>
>     for tag, pcoll_id
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 320, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 368, in get_operation
>     in descriptor.transforms[transform_id].outputs.items()
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 367, in <dictcomp>
>     for tag, pcoll_id
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 320, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 368, in get_operation
>     in descriptor.transforms[transform_id].outputs.items()
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 367, in <dictcomp>
>     for tag, pcoll_id
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 320, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 368, in get_operation
>     in descriptor.transforms[transform_id].outputs.items()
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 367, in <dictcomp>
>     for tag, pcoll_id
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 320, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 368, in get_operation
>     in descriptor.transforms[transform_id].outputs.items()
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 367, in <dictcomp>
>     for tag, pcoll_id
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 320, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 368, in get_operation
>     in descriptor.transforms[transform_id].outputs.items()
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 367, in <dictcomp>
>     for tag, pcoll_id
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 320, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 371, in get_operation
>     transform_id, transform_consumers)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 482, in create_operation
>     return creator(self, transform_id, transform_proto, payload, consumers)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 654, in create
>     serialized_fn, parameter)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 692, in _create_pardo_operation
>     dofn_data = pickler.loads(serialized_fn)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 246, in loads
>     return dill.loads(s)
>   File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 316, in loads
>     return load(file, ignore)
>   File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 304, in load
>     obj = pik.load()
>   File "/usr/local/lib/python2.7/pickle.py", line 864, in load
>     dispatch[key](self)
>   File "/usr/local/lib/python2.7/pickle.py", line 1096, in load_global
>     klass = self.find_class(module, name)
>   File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 465, in find_class
>     return StockUnpickler.find_class(self, module, name)
>   File "/usr/local/lib/python2.7/pickle.py", line 1132, in find_class
>     klass = getattr(mod, name)
> AttributeError: 'module' object has no attribute '_SliceDoFn'
>         at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>         at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>         at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
>         at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:263)
>         at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.$closeResource(FlinkExecutableStageFunction.java:188)
>         at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:188)
>         at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
>         ... 1 more
> {code}
>  
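Pickle resolves `_SliceDoFn` inside the SDK harness's own environment, not the environment that submitted the job. A reasonable first check is therefore whether the harness container has the same Beam, TensorFlow Model Analysis and dill versions as the submitting environment. This is an assumption about where to look, not something established in this thread. The sketch below prints installed versions with `importlib.metadata` (Python 3.8+). On the Python 2.7 setup in this report, `pkg_resources.get_distribution(name).version` from setuptools would serve the same purpose. The distribution names in `PACKAGES` are assumed PyPI names.

```python
from importlib import metadata

# Assumed PyPI distribution names to compare across environments.
PACKAGES = ["apache-beam", "tensorflow-model-analysis", "dill"]


def installed_versions(packages):
    """Return {distribution name: version string, or None if not installed}."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    for name, version in installed_versions(PACKAGES).items():
        print(f"{name}: {version or 'not installed'}")
```

Run the same script with the submitting interpreter and with the container's interpreter, then compare the output line by line.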



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
