Dataflow runner. On Thu, Nov 19, 2020 at 2:00 PM Brian Hulette <bhule...@google.com> wrote:
> Hm, what runner are you using? It looks like we're trying to encode and > decode the pipeline proto, which isn't possible for a multi-language > pipeline. Are you using a portable runner? > > Brian > > On Thu, Nov 19, 2020 at 10:42 AM Alan Krumholz <alan.krumh...@betterup.co> > wrote: > >> Got it, thanks! >> I was using: >> 'xxxxxx.us-east-1' >> Seems using this instead fixes that problem: >> 'xxxxxx.us-east-1.snowflakecomputing.com' >> >> I'm now hitting a different error though (now in Python): >> >> <ipython-input-42-b27732a0a892> in bq_to_snowflake(bq_table, snow_table, >>> git_branch) >>> 161 ) >>> 162 >>> --> 163 result = pipeline.run() >>> 164 result.wait_until_finish() >>> 165 ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py >>> in run(self, test_runner_api) >>> 512 # When possible, invoke a round trip through the runner API. >>> 513 if test_runner_api and self._verify_runner_api_compatible(): >>> --> 514 return Pipeline.from_runner_api( 515 self.to_runner_api( >>> use_fake_coders=True), >>> 516 self.runner, >>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>> from_runner_api(proto, runner, options, return_context, >>> allow_proto_holders) >>> 890 requirements=proto.requirements) >>> 891 root_transform_id, = proto.root_transform_ids >>> --> 892 p.transforms_stack = [context.transforms.get_by_id( >>> root_transform_id)] >>> 893 # TODO(robertwb): These are only needed to continue construction. >>> Omit? 
>>> 894 p.applied_labels = { >>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>> in get_by_id(self, id) >>> 113 # type: (str) -> PortableObjectT >>> 114 if id not in self._id_to_obj: >>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 >>> self._id_to_proto[id], self._pipeline_context) >>> 117 return self._id_to_obj[id] >>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>> from_runner_api(proto, context) >>> 1277 result.parts = [] >>> 1278 for transform_id in proto.subtransforms: >>> -> 1279 part = context.transforms.get_by_id(transform_id) >>> 1280 part.parent = result >>> 1281 result.parts.append(part) >>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>> in get_by_id(self, id) >>> 113 # type: (str) -> PortableObjectT >>> 114 if id not in self._id_to_obj: >>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 >>> self._id_to_proto[id], self._pipeline_context) >>> 117 return self._id_to_obj[id] >>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>> from_runner_api(proto, context) >>> 1277 result.parts = [] >>> 1278 for transform_id in proto.subtransforms: >>> -> 1279 part = context.transforms.get_by_id(transform_id) >>> 1280 part.parent = result >>> 1281 result.parts.append(part) >>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>> in get_by_id(self, id) >>> 113 # type: (str) -> PortableObjectT >>> 114 if id not in self._id_to_obj: >>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 >>> self._id_to_proto[id], self._pipeline_context) >>> 117 return self._id_to_obj[id] >>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>> from_runner_api(proto, context) >>> 1277 result.parts = [] >>> 1278 for transform_id in proto.subtransforms: >>> -> 1279 part = context.transforms.get_by_id(transform_id) >>> 1280 part.parent = result >>> 1281 
result.parts.append(part) >>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>> in get_by_id(self, id) >>> 113 # type: (str) -> PortableObjectT >>> 114 if id not in self._id_to_obj: >>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 >>> self._id_to_proto[id], self._pipeline_context) >>> 117 return self._id_to_obj[id] >>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>> from_runner_api(proto, context) >>> 1277 result.parts = [] >>> 1278 for transform_id in proto.subtransforms: >>> -> 1279 part = context.transforms.get_by_id(transform_id) >>> 1280 part.parent = result >>> 1281 result.parts.append(part) >>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>> in get_by_id(self, id) >>> 113 # type: (str) -> PortableObjectT >>> 114 if id not in self._id_to_obj: >>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 >>> self._id_to_proto[id], self._pipeline_context) >>> 117 return self._id_to_obj[id] >>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>> from_runner_api(proto, context) >>> 1216 is_python_side_input(side_input_tags[0]) if side_input_tags else >>> False) >>> 1217 >>> -> 1218 transform = ptransform.PTransform.from_runner_api(proto, context >>> ) >>> 1219 if uses_python_sideinput_tags: >>> 1220 # Ordering is important here. >>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py >>> in from_runner_api(cls, proto, context) >>> 688 if proto is None or proto.spec is None or not proto.spec.urn: >>> 689 return None >>> --> 690 parameter_type, constructor = cls._known_urns[proto.spec.urn] >>> 691 >>> 692 try: KeyError: 'beam:transform:write_files:v1' >> >> >> >> I'll keep trying to make this work but sharing it in case you can easily >> see what the problem is >> >> Thanks so much! 
>> >> On Thu, Nov 19, 2020 at 1:30 PM Brian Hulette <bhule...@google.com> >> wrote: >> >>> Hi Alan, >>> Sorry this error message is so verbose. What are you passing for the >>> server_name argument [1]? It looks like that's what the Java stacktrace is >>> complaining about: >>> >>> java.lang.IllegalArgumentException: serverName must be in format >>> <account_name>.snowflakecomputing.com >>> >>> [1] >>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/snowflake.py#L302 >>> >>> On Thu, Nov 19, 2020 at 10:16 AM Alan Krumholz < >>> alan.krumh...@betterup.co> wrote: >>> >>>> I'm trying to replace my custom/problematic snowflake sink with the >>>> new: >>>> https://beam.apache.org/documentation/io/built-in/snowflake/#writing-to-snowflake >>>> >>>> However when I try to run my pipeline (using python) I get this Java >>>> error: >>>> >>>> RuntimeError: java.lang.RuntimeException: Failed to build transform >>>>> beam:external:java:snowflake:write:v1 from spec urn: >>>>> "beam:external:java:snowflake:write:v1" >>>> >>>> >>>> It is hard to understand why it is failing from reading the partial java >>>> error trace I get in the output: >>>> >>>>> at >>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:130) >>>>> at >>>>> org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:357) >>>>> at >>>>> org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:433) >>>>> at >>>>> org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:488) >>>>> at >>>>> org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232) >>>>> at >>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172) >>>>> at >>>>> 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331) >>>>> at >>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817) >>>>> at >>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) >>>>> at >>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) >>>>> at >>>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) >>>>> at >>>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) >>>>> at java.base/java.lang.Thread.run(Thread.java:832) >>>>> Caused by: java.lang.IllegalArgumentException: serverName must be in >>>>> format <account_name>.snowflakecomputing.com >>>>> at >>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141) >>>>> at >>>>> org.apache.beam.sdk.io.snowflake.SnowflakeIO$DataSourceConfiguration.withServerName(SnowflakeIO.java:1700) >>>>> at >>>>> org.apache.beam.sdk.io.snowflake.crosslanguage.CrossLanguageConfiguration.getDataSourceConfiguration(CrossLanguageConfiguration.java:166) >>>>> at >>>>> org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:78) >>>>> at >>>>> org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:34) >>>>> at >>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:125) >>>>> ... 12 more >>>> >>>> >>>> >>>> any clue how I can debug/fix this using python? >>>> >>>> >>>> >>>>