We are working on this. ETA is the end of this quarter. Created https://issues.apache.org/jira/browse/BEAM-11332 for tracking.
Thanks,
Cham

On Mon, Nov 23, 2020 at 5:42 AM Alan Krumholz <alan.krumh...@betterup.co> wrote:

Hi Cham,
I'm guessing this means I won't be able to use Snowflake IO with Python on Dataflow until then? Is there a timeline for that work to be completed?

Thank you so much for your help

On Fri, Nov 20, 2020 at 9:33 PM Chamikara Jayalath <chamik...@google.com> wrote:

This is because Python does not understand the Java-specific "beam:transform:write_files:v1" transform. Hopefully this is one of those issues that will get resolved when we update Dataflow to directly consume portable protos (we are working on this now).

Thanks,
Cham

On Fri, Nov 20, 2020 at 11:14 AM Brian Hulette <bhule...@google.com> wrote:

+Chamikara Jayalath <chamik...@google.com> any idea why this is still doing a runner API round trip and failing? It's a multi-language pipeline, and Alan has it configured to run on Dataflow runner v2.

On Fri, Nov 20, 2020 at 10:36 AM Alan Krumholz <alan.krumh...@betterup.co> wrote:

Just tried that and still getting this:

    ---------------------------------------------------------------------------
    KeyError                                  Traceback (most recent call last)
    <ipython-input-81-a936d3a5fa70> in <module>
    ----> 1 bq_to_snowflake(
          2     'ml-betterup.coach_search.distances',
          3     'analytics.ml.coach_search_distances',
          4     'master')

    <ipython-input-78-096896321fde> in bq_to_snowflake(bq_table, snow_table, git_branch)
        138 )
        139
    --> 140 result = pipeline.run()
        141 result.wait_until_finish()
        142

    ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
        512     # When possible, invoke a round trip through the runner API.
        513     if test_runner_api and self._verify_runner_api_compatible():
    --> 514       return Pipeline.from_runner_api(
        515           self.to_runner_api(use_fake_coders=True),
        516           self.runner,

    ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, runner, options, return_context, allow_proto_holders)
        890         requirements=proto.requirements)
        891     root_transform_id, = proto.root_transform_ids
    --> 892     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
        893     # TODO(robertwb): These are only needed to continue construction. Omit?
        894     p.applied_labels = {

    ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
        113     # type: (str) -> PortableObjectT
        114     if id not in self._id_to_obj:
    --> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
        116           self._id_to_proto[id], self._pipeline_context)
        117     return self._id_to_obj[id]

    ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
       1277     result.parts = []
       1278     for transform_id in proto.subtransforms:
    -> 1279       part = context.transforms.get_by_id(transform_id)
       1280       part.parent = result
       1281       result.parts.append(part)

    [the two frames above repeat three more times as the transform tree is walked]

    ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
       1216         is_python_side_input(side_input_tags[0]) if side_input_tags else False)
       1217
    -> 1218     transform = ptransform.PTransform.from_runner_api(proto, context)
       1219     if uses_python_sideinput_tags:
       1220       # Ordering is important here.

    ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py in from_runner_api(cls, proto, context)
        688     if proto is None or proto.spec is None or not proto.spec.urn:
        689       return None
    --> 690     parameter_type, constructor = cls._known_urns[proto.spec.urn]
        691
        692     try:

    KeyError: 'beam:transform:write_files:v1'

On Fri, Nov 20, 2020 at 11:18 AM Brian Hulette <bhule...@google.com> wrote:

Hm, try passing in the args as they would appear in `sys.argv`: PipelineOptions(['--experiments=use_runner_v2'])
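For reference, a minimal sketch of the two ways of constructing PipelineOptions (the keyword form tried below, and the sys.argv-style form suggested above):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Keyword form: option values are passed as Python keyword arguments.
    options_kwargs = PipelineOptions(experiments=['use_runner_v2'])

    # Command-line form: flags are passed exactly as they would appear
    # in sys.argv, as a list of strings.
    options_argv = PipelineOptions(['--experiments=use_runner_v2'])

    pipeline = beam.Pipeline(options=options_argv)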
On Thu, Nov 19, 2020 at 12:14 PM Alan Krumholz <alan.krumh...@betterup.co> wrote:

How can I pass that flag using the SDK? Tried this:

    pipeline = beam.Pipeline(options=PipelineOptions(experiments=['use_runner_v2'], ...))

but still getting a similar error:

    ---------------------------------------------------------------------------
    KeyError                                  Traceback (most recent call last)
    <ipython-input-69-a936d3a5fa70> in <module>
    ----> 1 bq_to_snowflake(
          2     'ml-betterup.coach_search.distances',
          3     'analytics.ml.coach_search_distances',
          4     'master')

    <ipython-input-66-2b6bf3c9334b> in bq_to_snowflake(bq_table, snow_table, git_branch)
        138 )
        139
    --> 140 result = pipeline.run()
        141 result.wait_until_finish()
        142

    [the remaining frames are identical to the traceback above]

    KeyError: 'beam:transform:write_files:v1'

On Thu, Nov 19, 2020 at 2:18 PM Brian Hulette <bhule...@google.com> wrote:

Ah ok, you'll need to use Dataflow runner v2 [1] to run this pipeline (add the flag '--experiments=use_runner_v2'). See also [2].

[1] https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#python_11
[2] https://cloud.google.com/blog/products/data-analytics/multi-language-sdks-for-building-cloud-pipelines
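As a rough sketch, a multi-language pipeline submitted to Dataflow runner v2 from Python might be configured like this (the project, region, and bucket values are illustrative placeholders, not taken from this thread):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        '--runner=DataflowRunner',
        '--project=my-project',                 # placeholder
        '--region=us-central1',                 # placeholder
        '--temp_location=gs://my-bucket/tmp',   # placeholder
        '--experiments=use_runner_v2',          # needed for multi-language pipelines
    ])

    pipeline = beam.Pipeline(options=options)
    # ... apply transforms ...
    result = pipeline.run()
    result.wait_until_finish()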
On Thu, Nov 19, 2020 at 11:10 AM Alan Krumholz <alan.krumh...@betterup.co> wrote:

Dataflow runner

On Thu, Nov 19, 2020 at 2:00 PM Brian Hulette <bhule...@google.com> wrote:

Hm, what runner are you using? It looks like we're trying to encode and decode the pipeline proto, which isn't possible for a multi-language pipeline. Are you using a portable runner?

Brian

On Thu, Nov 19, 2020 at 10:42 AM Alan Krumholz <alan.krumh...@betterup.co> wrote:

got it, thanks!
I was using:

    'xxxxxx.us-east-1'

Seems using this instead fixes that problem:

    'xxxxxx.us-east-1.snowflakecomputing.com'

I'm now hitting a different error though (now in Python):

    <ipython-input-42-b27732a0a892> in bq_to_snowflake(bq_table, snow_table, git_branch)
        161 )
        162
    --> 163 result = pipeline.run()
        164 result.wait_until_finish()
        165

    ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
        512     # When possible, invoke a round trip through the runner API.
        513     if test_runner_api and self._verify_runner_api_compatible():
    --> 514       return Pipeline.from_runner_api(
        515           self.to_runner_api(use_fake_coders=True),
        516           self.runner,

    [the remaining frames are identical to the tracebacks above]

    KeyError: 'beam:transform:write_files:v1'

I'll keep trying to make this work, but sharing it in case you can easily see what the problem is.

Thanks so much!

On Thu, Nov 19, 2020 at 1:30 PM Brian Hulette <bhule...@google.com> wrote:

Hi Alan,
Sorry this error message is so verbose. What are you passing for the server_name argument [1]? It looks like that's what the Java stack trace is complaining about:

    java.lang.IllegalArgumentException: serverName must be in format <account_name>.snowflakecomputing.com

[1] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/snowflake.py#L302
On Thu, Nov 19, 2020 at 10:16 AM Alan Krumholz <alan.krumh...@betterup.co> wrote:

I'm trying to replace my custom/problematic Snowflake sink with the new one:
https://beam.apache.org/documentation/io/built-in/snowflake/#writing-to-snowflake

However, when I try to run my pipeline (using Python) I get this Java error:

    RuntimeError: java.lang.RuntimeException: Failed to build transform beam:external:java:snowflake:write:v1 from spec urn: "beam:external:java:snowflake:write:v1"

It is hard to understand why it is failing from reading the partial Java error trace I get in the output:

        at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:130)
        at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:357)
        at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:433)
        at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:488)
        at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
        at java.base/java.lang.Thread.run(Thread.java:832)
    Caused by: java.lang.IllegalArgumentException: serverName must be in format <account_name>.snowflakecomputing.com
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141)
        at org.apache.beam.sdk.io.snowflake.SnowflakeIO$DataSourceConfiguration.withServerName(SnowflakeIO.java:1700)
        at org.apache.beam.sdk.io.snowflake.crosslanguage.CrossLanguageConfiguration.getDataSourceConfiguration(CrossLanguageConfiguration.java:166)
        at org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:78)
        at org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:34)
        at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:125)
        ... 12 more

Any clue how I can debug/fix this using Python?
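For completeness, the fix that surfaced later in the thread was the server_name format: it must be the full <account>.<region>.snowflakecomputing.com host, not just <account>.<region>. Below is a minimal, hedged sketch of a write configured that way; the credentials, database, schema, table, bucket, integration, and data mapper are illustrative placeholders, and the authoritative argument list is in the snowflake.py file linked above:

    import apache_beam as beam
    from apache_beam.io.snowflake import WriteToSnowflake
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(['--experiments=use_runner_v2'])  # plus Dataflow flags

    with beam.Pipeline(options=options) as p:
        (p
         | beam.Create([{'id': 1, 'name': 'example'}])    # placeholder data
         | WriteToSnowflake(
             # Full host, not just '<account>.<region>':
             server_name='xxxxxx.us-east-1.snowflakecomputing.com',
             username='my_user',                          # placeholder
             password='my_password',                      # placeholder
             database='analytics',                        # placeholder
             schema='ml',                                 # placeholder
             table='coach_search_distances',              # placeholder
             staging_bucket_name='gs://my-bucket/',       # placeholder GCS bucket
             storage_integration_name='my_integration',   # placeholder
             create_disposition='CREATE_IF_NEEDED',       # assumed string values per
             write_disposition='APPEND',                  #   the module docstring; a
                                                          #   table_schema is also needed
                                                          #   if the table doesn't exist
             # Maps each element to the list of column values to stage as CSV.
             user_data_mapper=lambda row: [str(row['id']), row['name']],
         ))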