I thought there might be some additional logging with the coder id, since the message just said "accumulator coder id %s". It turns out the checkArgument call is simply passed the wrong set of arguments; I put up a fix in [1]. So "external_1HolderCoder" is in fact the coder id, not the transform name. This looks like it could be an issue with component id assignment for cross-language transforms. It could be instructive to look at the final Pipeline proto and/or the protos in the expansion request and response. Do they contain another HolderCoder component with a different id?
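A minimal sketch of that check, assuming you already have the coder ids from the proto in hand. The helper names and the example ids below are hypothetical stand-ins, not anything taken from Beam or from the actual pipeline; only "external_1HolderCoder" comes from the error message.

```python
def coder_ids_matching(coder_ids, fragment):
    """Return the sorted coder ids that contain `fragment`."""
    return sorted(cid for cid in coder_ids if fragment in cid)


def report_missing_coder(coder_ids, wanted_id, fragment):
    """Say whether `wanted_id` is registered and list look-alike ids."""
    ids = set(coder_ids)
    similar = [c for c in coder_ids_matching(ids, fragment) if c != wanted_id]
    return {"present": wanted_id in ids, "similar": similar}


# Hypothetical ids standing in for the keys of `components.coders`.
example_ids = [
    "external_2HolderCoder",
    "ref_Coder_BytesCoder_1",
    "external_1KvCoder",
]

result = report_missing_coder(
    example_ids, "external_1HolderCoder", "HolderCoder")
print(result)
```

With a real pipeline, the ids would presumably come from something like `pipeline.to_runner_api().components.coders`, which is a map keyed by coder id. A result with "present" false and a non-empty "similar" list would point towards the id assignment problem suspected above.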
Regarding the DirectRunner error, it looks like SDF is in fact supported in the FnApiRunner, but there's some kind of issue with the requirements declarations. I bet that if you get past that, you'll run into the same issue as on Flink: it looks to me like something is wrong with the Pipeline proto.

[1] https://github.com/apache/beam/pull/12522

On Tue, Aug 4, 2020 at 11:50 AM Boyuan Zhang <[email protected]> wrote:

> Hi Piotr,
>
> Are you using the beam master head to dev? Can you share your code? The
> x-lang transform can be tested with Flink runner, where SDF is also
> supported, such as
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/flink_runner_test.py#L205-L261
>
> On Tue, Aug 4, 2020 at 9:42 AM Piotr Szuberski <[email protected]> wrote:
>
>> Is there a simple way to register the splittable dofn for cross-language
>> usage? It's a bit a black box to me right now.
>>
>> The most meaningful logs for Flink are the ones I pasted and the
>> following:
>>
>> apache_beam.utils.subprocess_server: INFO: b'[grpc-default-executor-0] WARN org.apache.beam.runners.jobsubmission.InMemoryJobService - Encountered Unexpected Exception during validation'
>> apache_beam.utils.subprocess_server: INFO: b'java.lang.RuntimeException: Failed to validate transform ref_AppliedPTransform_Write to Spanner/Write mutations to Cloud Spanner/Schema View/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)_31'
>>
>> and a shortened oneline message:
>> [...] DEBUG: Stages: ['ref_AppliedPTransform_Generate input/Impulse_3\n
>> Generate input/Impulse:beam:transform:impulse:v1\n must follow: \n
>> downstream_side_inputs: <unknown>', 'ref_AppliedPTransform_Generate
>> input/FlatMap(<lambda at core.py:2826>)_4\n Generate input/FlatMap(<lambda
>> at core.py:2826>):beam:transform:pardo:v1\n must follow: \n
>> downstream_side_inputs: <unknown>', 'ref_AppliedPTransform_Generate
>> input/Map(decode)_6\n [...]
>>
>> On 2020/08/03 23:40:42, Brian Hulette <[email protected]> wrote:
>> > The DirectRunner error looks like it's because the FnApiRunner doesn't
>> > support SDF.
>> >
>> > What is the coder id for the Flink error? It looks like the full stack
>> > trace should contain it.
>> >
>> > On Mon, Aug 3, 2020 at 10:09 AM Piotr Szuberski <[email protected]> wrote:
>> >
>> > > I'm Writing SpannerIO.Write cross-language transform and when I try to run
>> > > it from python I receive errors:
>> > >
>> > > On Flink:
>> > > apache_beam.utils.subprocess_server: INFO: b'Caused by: java.lang.IllegalArgumentException: Transform external_1HolderCoder uses unknown accumulator coder id %s'
>> > > apache_beam.utils.subprocess_server: INFO: b'\tat org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:216)'
>> > > apache_beam.utils.subprocess_server: INFO: b'\tat org.apache.beam.runners.core.construction.graph.PipelineValidator.validateCombine(PipelineValidator.java:273)'
>> > >
>> > > On DirectRunner:
>> > > File "/Users/piotr/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 181, in run_via_runner_api
>> > > self._validate_requirements(pipeline_proto)
>> > > File "/Users/piotr/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 264, in _validate_requirements
>> > > raise ValueError(
>> > > ValueError: Missing requirement declaration: {'beam:requirement:pardo:splittable_dofn:v1'}
>> > >
>> > > I suppose that SpannerIO.Write uses a transform that cannot be translated
>> > > in cross-language usage? I'm not sure whether there is something I can do
>> > > about it.
