[ https://issues.apache.org/jira/browse/BEAM-6218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16721459#comment-16721459 ]
Maximilian Michels commented on BEAM-6218:
------------------------------------------

This looks like an error from the Python SDK harness.

> TensorFlow Model Analysis Fails when using the portable Flink runner
> --------------------------------------------------------------------
>
> Key: BEAM-6218
> URL: https://issues.apache.org/jira/browse/BEAM-6218
> Project: Beam
> Issue Type: Bug
> Components: runner-flink, sdk-py-harness
> Reporter: Andrew Packer
> Priority: Major
>
> Running a simple model analysis pipeline, trying to use the portable flink
> runner running against a local cluster:
> {code:python}
> import apache_beam as beam
> import tensorflow_model_analysis as tfma
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import portable_runner
> def pipeline(root):
>   data_location = './dataset/'
>   data = root | 'ReadData' >> beam.io.ReadFromTFRecord(data_location)
>   results = data | 'ExtractEvaluateAndWriteResults' >> tfma.EvaluateAndWriteResults(
>     eval_saved_model_path='./model/15427633886/',
>     output_path='./output/',
>     display_only_data_location=data_location)
> def run(argv=None):
>   runner = portable_runner.PortableRunner()
>   pipeline_options = PipelineOptions(experiments=['beam_fn_api'],sdk_location='container',job_endpoint='localhost:8099',setup_file='./setup.py')
>   runner.run(pipeline, pipeline_options)
> if __name__ == '__main__':
>   run()
> {code}
> Versions:
> Apache Beam 2.8.0
> TensorFlow Model Analysis: 0.9.2
> Apache Flink: 1.5.3
>
> Stack Trace:
> {code}
> [flink-runner-job-server] ERROR org.apache.beam.runners.flink.FlinkJobInvocation - Error during job invocation BeamApp-apacker-1212082216-2dd571ba_359d85b7-4e08-49f3-bdc7-34cdb0e779bf.
> org.apache.flink.client.program.ProgramInvocationException: Job 22e7e9d229977f3f0518c37f507f5e07 failed.
> at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:452) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427) > at > org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:216) > at > org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:193) > at > org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173) > at > org.apache.beam.runners.flink.FlinkJobInvocation.runPipeline(FlinkJobInvocation.java:121) > at > org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111) > at > org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58) > at > org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > completed with illegal application status: UNKNOWN. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:150) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262) > ... 
13 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.RuntimeException: Error received from SDK harness for instruction > 22: Traceback (most recent call last): > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 131, in _execute > response = task() > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 166, in <lambda> > self._execute(lambda: worker.do_instruction(work), work) > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 212, in do_instruction > request.instruction_id) > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 231, in process_bundle > self.data_channel_factory) > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 343, in __init__ > self.ops = self.create_execution_tree(self.process_bundle_descriptor) > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 385, in create_execution_tree > descriptor.transforms, key=topological_height, reverse=True)]) > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 320, in wrapper > result = cache[args] = func(*args) > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 368, in get_operation > in descriptor.transforms[transform_id].outputs.items() > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 367, in <dictcomp> > for tag, pcoll_id > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 320, in wrapper > result = cache[args] = func(*args) > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 368, in get_operation > in 
descriptor.transforms[transform_id].outputs.items() > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 367, in <dictcomp> > for tag, pcoll_id > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 320, in wrapper > result = cache[args] = func(*args) > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 368, in get_operation > in descriptor.transforms[transform_id].outputs.items() > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 367, in <dictcomp> > for tag, pcoll_id > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 320, in wrapper > result = cache[args] = func(*args) > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 368, in get_operation > in descriptor.transforms[transform_id].outputs.items() > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 367, in <dictcomp> > for tag, pcoll_id > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 320, in wrapper > result = cache[args] = func(*args) > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 368, in get_operation > in descriptor.transforms[transform_id].outputs.items() > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 367, in <dictcomp> > for tag, pcoll_id > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 320, in wrapper > result = cache[args] = func(*args) > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 368, in get_operation > in descriptor.transforms[transform_id].outputs.items() > 
File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 367, in <dictcomp> > for tag, pcoll_id > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 320, in wrapper > result = cache[args] = func(*args) > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 368, in get_operation > in descriptor.transforms[transform_id].outputs.items() > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 367, in <dictcomp> > for tag, pcoll_id > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 320, in wrapper > result = cache[args] = func(*args) > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 371, in get_operation > transform_id, transform_consumers) > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 482, in create_operation > return creator(self, transform_id, transform_proto, payload, consumers) > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 654, in create > serialized_fn, parameter) > File > "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 692, in _create_pardo_operation > dofn_data = pickler.loads(serialized_fn) > File > "/usr/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", > line 246, in loads > return dill.loads(s) > File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 316, in > loads > return load(file, ignore) > File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 304, in > load > obj = pik.load() > File "/usr/local/lib/python2.7/pickle.py", line 864, in load > dispatch[key](self) > File "/usr/local/lib/python2.7/pickle.py", line 1096, in load_global > klass = 
self.find_class(module, name) > File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 465, in > find_class > return StockUnpickler.find_class(self, module, name) > File "/usr/local/lib/python2.7/pickle.py", line 1132, in find_class > klass = getattr(mod, name) > AttributeError: 'module' object has no attribute '_SliceDoFn' > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) > at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) > at > org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:263) > at > org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.$closeResource(FlinkExecutableStageFunction.java:188) > at > org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:188) > at > org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103) > at > org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712) > ... 1 more > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)