Ah OK, you'll need to use Dataflow Runner v2 [1] to run this pipeline (add
the flag '--experiments=use_runner_v2'); the classic runner's round trip of
the pipeline proto is what fails for a multi-language pipeline like this
one. See also [2].
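
For example, here is a minimal sketch of setting the flag when building the
pipeline options in Python (the other options stand in for whatever you
already pass):

  from apache_beam.options.pipeline_options import PipelineOptions

  options = PipelineOptions(
      runner='DataflowRunner',
      experiments=['use_runner_v2'],  # opt in to Dataflow Runner v2
      # plus your usual project/region/temp_location options
  )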

[1]
https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#python_11
[2]
https://cloud.google.com/blog/products/data-analytics/multi-language-sdks-for-building-cloud-pipelines

On Thu, Nov 19, 2020 at 11:10 AM Alan Krumholz <alan.krumh...@betterup.co>
wrote:

> Dataflow runner
>
> On Thu, Nov 19, 2020 at 2:00 PM Brian Hulette <bhule...@google.com> wrote:
>
>> Hm what runner are you using? It looks like we're trying to encode and
>> decode the pipeline proto, which isn't possible for a multi-language
>> pipeline. Are you using a portable runner?
>>
>> Brian
>>
>> On Thu, Nov 19, 2020 at 10:42 AM Alan Krumholz <alan.krumh...@betterup.co>
>> wrote:
>>
>>> got it, thanks!
>>> I was using:
>>> 'xxxxxx.us-east-1'
>>> Using this instead seems to fix that problem:
>>> 'xxxxxx.us-east-1.snowflakecomputing.com'
>>>
>>> I'm now hitting a different error though (now in Python):
>>>
>>>> <ipython-input-42-b27732a0a892> in bq_to_snowflake(bq_table, snow_table, git_branch)
>>>>     161 )
>>>>     162
>>>> --> 163 result = pipeline.run()
>>>>     164 result.wait_until_finish()
>>>>     165
>>>>
>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
>>>>     512   # When possible, invoke a round trip through the runner API.
>>>>     513   if test_runner_api and self._verify_runner_api_compatible():
>>>> --> 514     return Pipeline.from_runner_api(
>>>>     515         self.to_runner_api(use_fake_coders=True),
>>>>     516         self.runner,
>>>>
>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, runner, options, return_context, allow_proto_holders)
>>>>     890       requirements=proto.requirements)
>>>>     891   root_transform_id, = proto.root_transform_ids
>>>> --> 892   p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>>     893   # TODO(robertwb): These are only needed to continue construction. Omit?
>>>>     894   p.applied_labels = {
>>>>
>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>>>     113   # type: (str) -> PortableObjectT
>>>>     114   if id not in self._id_to_obj:
>>>> --> 115     self._id_to_obj[id] = self._obj_type.from_runner_api(
>>>>     116         self._id_to_proto[id], self._pipeline_context)
>>>>     117   return self._id_to_obj[id]
>>>>
>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>>>    1277   result.parts = []
>>>>    1278   for transform_id in proto.subtransforms:
>>>> --> 1279     part = context.transforms.get_by_id(transform_id)
>>>>    1280     part.parent = result
>>>>    1281     result.parts.append(part)
>>>>
>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>>>     113   # type: (str) -> PortableObjectT
>>>>     114   if id not in self._id_to_obj:
>>>> --> 115     self._id_to_obj[id] = self._obj_type.from_runner_api(
>>>>     116         self._id_to_proto[id], self._pipeline_context)
>>>>     117   return self._id_to_obj[id]
>>>>
>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>>>    1277   result.parts = []
>>>>    1278   for transform_id in proto.subtransforms:
>>>> --> 1279     part = context.transforms.get_by_id(transform_id)
>>>>    1280     part.parent = result
>>>>    1281     result.parts.append(part)
>>>>
>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>>>     113   # type: (str) -> PortableObjectT
>>>>     114   if id not in self._id_to_obj:
>>>> --> 115     self._id_to_obj[id] = self._obj_type.from_runner_api(
>>>>     116         self._id_to_proto[id], self._pipeline_context)
>>>>     117   return self._id_to_obj[id]
>>>>
>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>>>    1277   result.parts = []
>>>>    1278   for transform_id in proto.subtransforms:
>>>> --> 1279     part = context.transforms.get_by_id(transform_id)
>>>>    1280     part.parent = result
>>>>    1281     result.parts.append(part)
>>>>
>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>>>     113   # type: (str) -> PortableObjectT
>>>>     114   if id not in self._id_to_obj:
>>>> --> 115     self._id_to_obj[id] = self._obj_type.from_runner_api(
>>>>     116         self._id_to_proto[id], self._pipeline_context)
>>>>     117   return self._id_to_obj[id]
>>>>
>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>>>    1277   result.parts = []
>>>>    1278   for transform_id in proto.subtransforms:
>>>> --> 1279     part = context.transforms.get_by_id(transform_id)
>>>>    1280     part.parent = result
>>>>    1281     result.parts.append(part)
>>>>
>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>>>     113   # type: (str) -> PortableObjectT
>>>>     114   if id not in self._id_to_obj:
>>>> --> 115     self._id_to_obj[id] = self._obj_type.from_runner_api(
>>>>     116         self._id_to_proto[id], self._pipeline_context)
>>>>     117   return self._id_to_obj[id]
>>>>
>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>>>    1216       is_python_side_input(side_input_tags[0]) if side_input_tags else False)
>>>>    1217
>>>> --> 1218   transform = ptransform.PTransform.from_runner_api(proto, context)
>>>>    1219   if uses_python_sideinput_tags:
>>>>    1220     # Ordering is important here.
>>>>
>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py in from_runner_api(cls, proto, context)
>>>>     688   if proto is None or proto.spec is None or not proto.spec.urn:
>>>>     689     return None
>>>> --> 690   parameter_type, constructor = cls._known_urns[proto.spec.urn]
>>>>     691
>>>>     692   try:
>>>>
>>>> KeyError: 'beam:transform:write_files:v1'
>>>
>>>
>>>
>>> I'll keep trying to make this work, but I'm sharing it in case you can
>>> easily see what the problem is.
>>>
>>> Thanks so much!
>>>
>>> On Thu, Nov 19, 2020 at 1:30 PM Brian Hulette <bhule...@google.com>
>>> wrote:
>>>
>>>> Hi Alan,
>>>> Sorry this error message is so verbose. What are you passing for the
>>>> server_name argument [1]? It looks like that's what the Java stack
>>>> trace is complaining about:
>>>>
>>>> java.lang.IllegalArgumentException: serverName must be in format
>>>> <account_name>.snowflakecomputing.com
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/snowflake.py#L302
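>>>>
>>>> If it helps, here is a minimal sketch of passing it (argument names as
>>>> in the built-in connector; all the values below, including my_mapper,
>>>> are placeholders):
>>>>
>>>>   from apache_beam.io.snowflake import WriteToSnowflake
>>>>
>>>>   rows | WriteToSnowflake(
>>>>       # full hostname, not just the account identifier
>>>>       server_name='<account_name>.snowflakecomputing.com',
>>>>       username='user', password='pass',
>>>>       database='DB', schema='SCHEMA', table='TABLE',
>>>>       staging_bucket_name='my-bucket',
>>>>       storage_integration_name='my_integration',
>>>>       user_data_mapper=my_mapper)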
>>>>
>>>> On Thu, Nov 19, 2020 at 10:16 AM Alan Krumholz <
>>>> alan.krumh...@betterup.co> wrote:
>>>>
>>>>> I'm trying to replace my custom/problematic snowflake sink with the
>>>>> new:
>>>>> https://beam.apache.org/documentation/io/built-in/snowflake/#writing-to-snowflake
>>>>>
>>>>> However, when I try to run my pipeline (using Python) I get this Java
>>>>> error:
>>>>>
>>>>>> RuntimeError: java.lang.RuntimeException: Failed to build transform
>>>>>> beam:external:java:snowflake:write:v1 from spec
>>>>>> urn: "beam:external:java:snowflake:write:v1"
>>>>>
>>>>>
>>>>> It is hard to understand why it is failing from the partial Java stack
>>>>> trace I get in the output:
>>>>>
>>>>>> at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:130)
>>>>>> at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:357)
>>>>>> at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:433)
>>>>>> at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:488)
>>>>>> at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232)
>>>>>> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
>>>>>> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>>>>>> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
>>>>>> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>>>>>> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>>>>>> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>>>>>> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
>>>>>> at java.base/java.lang.Thread.run(Thread.java:832)
>>>>>> Caused by: java.lang.IllegalArgumentException: serverName must be in
>>>>>> format <account_name>.snowflakecomputing.com
>>>>>> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141)
>>>>>> at org.apache.beam.sdk.io.snowflake.SnowflakeIO$DataSourceConfiguration.withServerName(SnowflakeIO.java:1700)
>>>>>> at org.apache.beam.sdk.io.snowflake.crosslanguage.CrossLanguageConfiguration.getDataSourceConfiguration(CrossLanguageConfiguration.java:166)
>>>>>> at org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:78)
>>>>>> at org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:34)
>>>>>> at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:125)
>>>>>> ... 12 more
>>>>>
>>>>>
>>>>>
>>>>> Any clue how I can debug/fix this using Python?
