How can I pass that flag using the SDK?
Tried this:

pipeline = beam.Pipeline(options=PipelineOptions(experiments=['use_runner_v2'], ...))

but I'm still getting a similar error:

Traceback (most recent call last)
<ipython-input-69-a936d3a5fa70> in <module>
----> 1 bq_to_snowflake(
      2     'ml-betterup.coach_search.distances',
      3     '',
      4     'master')

<ipython-input-66-2b6bf3c9334b> in bq_to_snowflake(bq_table, snow_table, git_branch)
    138     )
    139
--> 140     result =
    141     result.wait_until_finish()
    142

~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/ in run(self, test_runner_api)
    512       # When possible, invoke a round trip through the runner API.
    513       if test_runner_api and self._verify_runner_api_compatible():
--> 514         return Pipeline.from_runner_api(
    515             self.to_runner_api(use_fake_coders=True),
    516

~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/ in from_runner_api(proto, runner, options, return_context, allow_proto_holders)
    890         requirements=proto.requirements)
    891     root_transform_id, = proto.root_transform_ids
--> 892     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
    893     # TODO(robertwb): These are only needed to continue construction. Omit?
    894     p.applied_labels = {

in get_by_id(self, id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return

~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/ in from_runner_api(proto, context)
   1277 = []
   1278     for transform_id in proto.subtransforms:
-> 1279       part = context.transforms.get_by_id(transform_id)
   1280       part.parent = result
   1281

in get_by_id(self, id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return

~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/ in from_runner_api(proto, context)
   1277 = []
   1278     for transform_id in proto.subtransforms:
-> 1279       part = context.transforms.get_by_id(transform_id)
   1280       part.parent = result
   1281

in get_by_id(self, id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return

~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/ in from_runner_api(proto, context)
   1277 = []
   1278     for transform_id in proto.subtransforms:
-> 1279       part = context.transforms.get_by_id(transform_id)
   1280       part.parent = result
   1281

in get_by_id(self, id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return

~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/ in from_runner_api(proto, context)
   1277 = []
   1278     for transform_id in proto.subtransforms:
-> 1279       part = context.transforms.get_by_id(transform_id)
   1280       part.parent = result
   1281

in get_by_id(self, id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return

~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/ in from_runner_api(proto, context)
   1216         is_python_side_input(side_input_tags[0]) if side_input_tags else False)
   1217
-> 1218     transform = ptransform.PTransform.from_runner_api(proto, context)
   1219     if uses_python_sideinput_tags:
   1220       # Ordering is important here.

in from_runner_api(cls, proto, context)
    688     if proto is None or proto.spec is None or not proto.spec.urn:
    689       return None
--> 690     parameter_type, constructor = cls._known_urns[proto.spec.urn]
    691
    692     try:

KeyError: 'beam:transform:write_files:v1'
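
For what it's worth, the last frame above is a plain dictionary lookup keyed by the transform's URN, so the KeyError seems to mean that nothing is registered on the Python side for 'beam:transform:write_files:v1' when the pipeline is rebuilt from its proto. Here is a minimal standard-library sketch of that failure mode. It is a toy model, not Beam's code: KNOWN_URNS, from_runner_api and try_round_trip are hypothetical stand-ins, and the single registered URN is only there for illustration.

```python
from typing import Callable, Dict, List

# Hypothetical registry: URN -> constructor for a transform this SDK can rebuild.
KNOWN_URNS: Dict[str, Callable[[], str]] = {
    "beam:transform:pardo:v1": lambda: "ParDo",
}


def from_runner_api(urn: str) -> str:
    """Rebuild a transform from its URN; raises KeyError if it is not registered."""
    constructor = KNOWN_URNS[urn]
    return constructor()


def try_round_trip(urns: List[str]) -> List[str]:
    """Return the URNs that cannot be rebuilt from the local registry."""
    missing = []
    for urn in urns:
        try:
            from_runner_api(urn)
        except KeyError:
            missing.append(urn)
    return missing


# The second URN plays the role of a transform defined in another SDK.
unknown = try_round_trip(
    ["beam:transform:pardo:v1", "beam:transform:write_files:v1"]
)
print(unknown)
```

In this toy version the only way to avoid the error is to skip the local rebuild for transforms that come from another SDK, which matches my understanding of the earlier explanation in this thread.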

On Thu, Nov 19, 2020 at 2:18 PM Brian Hulette <> wrote:

> Ah ok, you'll need to use Dataflow runner v2 [1] to run this pipeline (add
> the flag '--experiments=use_runner_v2'). See also [2].
> [1]
> [2]
> On Thu, Nov 19, 2020 at 11:10 AM Alan Krumholz <>
> wrote:
>> Dataflow runner
>> On Thu, Nov 19, 2020 at 2:00 PM Brian Hulette <>
>> wrote:
>>> Hm what runner are you using? It looks like we're trying to encode and
>>> decode the pipeline proto, which isn't possible for a multi-language
>>> pipeline. Are you using a portable runner?
>>> Brian
>>> On Thu, Nov 19, 2020 at 10:42 AM Alan Krumholz <
>>>> wrote:
>>>> got it, thanks!
>>>> I was using:
>>>> ''
>>>> Seems using this instead fixes that problem:
>>>> '
>>>> I'm now hitting a different error though (now in python):
>>>> <ipython-input-42-b27732a0a892> in bq_to_snowflake(bq_table,
>>>>> snow_table, git_branch)
>>>>> 161 )
>>>>> 162
>>>>> --> 163 result =
>>>>> 164 result.wait_until_finish()
>>>>> 165
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/
>>>>> in run(self, test_runner_api)
>>>>> 512 # When possible, invoke a round trip through the runner API.
>>>>> 513 if test_runner_api and self._verify_runner_api_compatible():
>>>>> --> 514 return Pipeline.from_runner_api( 515 self.to_runner_api(
>>>>> use_fake_coders=True),
>>>>> 516 self.runner,
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/
>>>>> in from_runner_api(proto, runner, options, return_context,
>>>>> allow_proto_holders)
>>>>> 890 requirements=proto.requirements)
>>>>> 891 root_transform_id, = proto.root_transform_ids
>>>>> --> 892 p.transforms_stack = [context.transforms.get_by_id(
>>>>> root_transform_id)]
>>>>> 893 # TODO(robertwb): These are only needed to continue construction.
>>>>> Omit?
>>>>> 894 p.applied_labels = {
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/
>>>>> in get_by_id(self, id)
>>>>> 113 # type: (str) -> PortableObjectT
>>>>> 114 if id not in self._id_to_obj:
>>>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116
>>>>> self._id_to_proto[id], self._pipeline_context)
>>>>> 117 return self._id_to_obj[id]
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/
>>>>> in from_runner_api(proto, context)
>>>>> 1277 = []
>>>>> 1278 for transform_id in proto.subtransforms:
>>>>> -> 1279 part = context.transforms.get_by_id(transform_id)
>>>>> 1280 part.parent = result
>>>>> 1281
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/
>>>>> in get_by_id(self, id)
>>>>> 113 # type: (str) -> PortableObjectT
>>>>> 114 if id not in self._id_to_obj:
>>>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116
>>>>> self._id_to_proto[id], self._pipeline_context)
>>>>> 117 return self._id_to_obj[id]
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/
>>>>> in from_runner_api(proto, context)
>>>>> 1277 = []
>>>>> 1278 for transform_id in proto.subtransforms:
>>>>> -> 1279 part = context.transforms.get_by_id(transform_id)
>>>>> 1280 part.parent = result
>>>>> 1281
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/
>>>>> in get_by_id(self, id)
>>>>> 113 # type: (str) -> PortableObjectT
>>>>> 114 if id not in self._id_to_obj:
>>>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116
>>>>> self._id_to_proto[id], self._pipeline_context)
>>>>> 117 return self._id_to_obj[id]
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/
>>>>> in from_runner_api(proto, context)
>>>>> 1277 = []
>>>>> 1278 for transform_id in proto.subtransforms:
>>>>> -> 1279 part = context.transforms.get_by_id(transform_id)
>>>>> 1280 part.parent = result
>>>>> 1281
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/
>>>>> in get_by_id(self, id)
>>>>> 113 # type: (str) -> PortableObjectT
>>>>> 114 if id not in self._id_to_obj:
>>>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116
>>>>> self._id_to_proto[id], self._pipeline_context)
>>>>> 117 return self._id_to_obj[id]
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/
>>>>> in from_runner_api(proto, context)
>>>>> 1277 = []
>>>>> 1278 for transform_id in proto.subtransforms:
>>>>> -> 1279 part = context.transforms.get_by_id(transform_id)
>>>>> 1280 part.parent = result
>>>>> 1281
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/
>>>>> in get_by_id(self, id)
>>>>> 113 # type: (str) -> PortableObjectT
>>>>> 114 if id not in self._id_to_obj:
>>>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116
>>>>> self._id_to_proto[id], self._pipeline_context)
>>>>> 117 return self._id_to_obj[id]
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/
>>>>> in from_runner_api(proto, context)
>>>>> 1216 is_python_side_input(side_input_tags[0]) if side_input_tags else
>>>>> False)
>>>>> 1217
>>>>> -> 1218 transform = ptransform.PTransform.from_runner_api(proto,
>>>>> context)
>>>>> 1219 if uses_python_sideinput_tags:
>>>>> 1220 # Ordering is important here.
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/transforms/
>>>>> in from_runner_api(cls, proto, context)
>>>>> 688 if proto is None or proto.spec is None or not proto.spec.urn:
>>>>> 689 return None
>>>>> --> 690 parameter_type, constructor = cls._known_urns[proto.spec.urn]
>>>>> 691
>>>>> 692 try:
>>>>> KeyError: 'beam:transform:write_files:v1'
>>>> I'll keep trying to make this work but sharing it in case you can
>>>> easily see what the problem is
>>>> Thanks so much!
>>>> On Thu, Nov 19, 2020 at 1:30 PM Brian Hulette <>
>>>> wrote:
>>>>> Hi Alan,
>>>>> Sorry this error message is so verbose. What are you passing for the
>>>>> server_name argument [1]? It looks like that's what the Java stacktrace is
>>>>> complaining about:
>>>>> java.lang.IllegalArgumentException: serverName must be in format
>>>>> <account_name>
>>>>> [1]
>>>>> On Thu, Nov 19, 2020 at 10:16 AM Alan Krumholz <
>>>>>> wrote:
>>>>>> I'm trying to replace my custom/problematic snowflake sink with the
>>>>>> new:
>>>>>> However when I try to run my pipeline  (using python) I get this Java
>>>>>> error:
>>>>>> RuntimeError: java.lang.RuntimeException: Failed to build transform
>>>>>>> beam:external:java:snowflake:write:v1 from spec urn:
>>>>>>> "beam:external:java:snowflake:write:v1"
>>>>>> It is hard to understand why it is failing from reading the partial Java
>>>>>> error trace I get in the output:
>>>>>>> at 
>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(
>>>>>>>         at 
>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(
>>>>>>>         at 
>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService.expand(
>>>>>>>         at 
>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService.expand(
>>>>>>>         at 
>>>>>>> org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(
>>>>>>>         at 
>>>>>>>         at 
>>>>>>>         at 
>>>>>>>         at 
>>>>>>>         at 
>>>>>>>         at 
>>>>>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>>>>>         at 
>>>>>>> java.base/java.util.concurrent.ThreadPoolExecutor$
>>>>>>>         at java.base/
>>>>>>> Caused by: java.lang.IllegalArgumentException: serverName must be in 
>>>>>>> format <account_name>
>>>>>>>         at 
>>>>>>>         at 
>>>>>>>         at 
>>>>>>>         at 
>>>>>>>         at 
>>>>>>>         at 
>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(
>>>>>>>         ... 12 more
>>>>>> Any clue how I can debug/fix this using Python?
