[ https://issues.apache.org/jira/browse/BEAM-6294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738470#comment-16738470 ]

Thomas Weise commented on BEAM-6294:
------------------------------------

The following error shows up shortly after the job enters running status:

{code:java}
TimerException{java.lang.RuntimeException: Failed to finish remote bundle}
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to finish remote bundle
        at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:559)
        at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:87)
        at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:677)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:671)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:377)
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
        ... 7 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 37: Traceback (most recent call last):
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 148, in _execute
    response = task()
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 183, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in do_instruction
    request.instruction_id)
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 272, in process_bundle
    bundle_processor.process_bundle(instruction_id)
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 489, in process_bundle
    ].process_encoded(data.data)
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 125, in process_encoded
    input_stream, True)
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/coders/coder_impl.py", line 945, in decode_from_stream
    value = self._value_coder.decode_from_stream(in_stream, nested)
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/coders/coder_impl.py", line 990, in decode_from_stream
    return self._value_coder.decode(in_stream.read(value_length))
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/coders/coder_impl.py", line 162, in decode
    return self.decode_from_stream(create_InputStream(encoded), False)
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/coders/coder_impl.py", line 385, in decode_from_stream
    raise ValueError('Unknown type tag %x' % t)
ValueError: Unknown type tag 78

        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
        at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:263)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:556)
        ... 13 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 37: Traceback (most recent call last):
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 148, in _execute
    response = task()
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 183, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in do_instruction
    request.instruction_id)
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 272, in process_bundle
    bundle_processor.process_bundle(instruction_id)
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 489, in process_bundle
    ].process_encoded(data.data)
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 125, in process_encoded
    input_stream, True)
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/coders/coder_impl.py", line 945, in decode_from_stream
    value = self._value_coder.decode_from_stream(in_stream, nested)
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/coders/coder_impl.py", line 990, in decode_from_stream
    return self._value_coder.decode(in_stream.read(value_length))
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/coders/coder_impl.py", line 162, in decode
    return self.decode_from_stream(create_InputStream(encoded), False)
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/coders/coder_impl.py", line 385, in decode_from_stream
    raise ValueError('Unknown type tag %x' % t)
ValueError: Unknown type tag 78

        at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
        at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        ... 3 more{code}
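
A note on reading the final error: the traceback shows the message is built with 'Unknown type tag %x' % t, so the "78" is hexadecimal rather than decimal. The sketch below only illustrates that arithmetic; the helper names are hypothetical and not part of Beam, and it makes no claim about which coder produced the byte.

```python
# Hypothetical helpers for reading the message; not Beam APIs.

PREFIX = 'Unknown type tag '


def format_unknown_tag(tag):
    """Build the message the same way the traceback line does (%x is hex)."""
    return 'Unknown type tag %x' % tag


def tag_from_message(message):
    """Recover the integer tag by parsing the trailing hex digits."""
    return int(message[len(PREFIX):], 16)


tag = tag_from_message('Unknown type tag 78')
print(tag)       # 120
print(chr(tag))  # x
```

In other words, the decoder read the byte 0x78 (decimal 120, ASCII "x") where it expected a type tag it knows.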

> Use Flink's redistribute for reshuffle.
> ---------------------------------------
>
>                 Key: BEAM-6294
>                 URL: https://issues.apache.org/jira/browse/BEAM-6294
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-flink, sdk-py-core
>            Reporter: Robert Bradshaw
>            Assignee: Robert Bradshaw
>            Priority: Major
>             Fix For: 2.10.0
>
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Python needs to publish the URN over the FnAPI which is pretty easy, but 
> Flink also needs to ensure that the composite structure does not get fused. 
> Unlike with GBK, we can't assume all runners implement this as a primitive. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
