Just tried that and still getting this: ---------------------------------------------------------------------------KeyError Traceback (most recent call last)<ipython-input-81-a936d3a5fa70> in <module>----> 1 bq_to_snowflake( 2 'ml-betterup.coach_search.distances', 3 'analytics.ml.coach_search_distances', 4 'master') <ipython-input-78-096896321fde> in bq_to_snowflake(bq_table, snow_table, git_branch) 138 ) 139 --> 140 result = pipeline.run() 141 result.wait_until_finish() 142 ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api) 512 # When possible, invoke a round trip through the runner API. 513 if test_runner_api and self._verify_runner_api_compatible():--> 514 return Pipeline.from_runner_api( 515 self.to_runner_api(use_fake_coders=True), 516 self.runner, ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, runner, options, return_context, allow_proto_holders) 890 requirements=proto.requirements) 891 root_transform_id, = proto.root_transform_ids--> 892 p.transforms_stack = [context.transforms.get_by_id(root_transform_id)] 893 # TODO(robertwb): These are only needed to continue construction. Omit? 
894 p.applied_labels = { ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id) 113 # type: (str) -> PortableObjectT 114 if id not in self._id_to_obj:--> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 self._id_to_proto[id], self._pipeline_context) 117 return self._id_to_obj[id] ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context) 1277 result.parts = [] 1278 for transform_id in proto.subtransforms:-> 1279 part = context.transforms.get_by_id(transform_id) 1280 part.parent = result 1281 result.parts.append(part) ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id) 113 # type: (str) -> PortableObjectT 114 if id not in self._id_to_obj:--> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 self._id_to_proto[id], self._pipeline_context) 117 return self._id_to_obj[id] ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context) 1277 result.parts = [] 1278 for transform_id in proto.subtransforms:-> 1279 part = context.transforms.get_by_id(transform_id) 1280 part.parent = result 1281 result.parts.append(part) ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id) 113 # type: (str) -> PortableObjectT 114 if id not in self._id_to_obj:--> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 self._id_to_proto[id], self._pipeline_context) 117 return self._id_to_obj[id] ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context) 1277 result.parts = [] 1278 for transform_id in proto.subtransforms:-> 1279 part = context.transforms.get_by_id(transform_id) 1280 part.parent = result 1281 result.parts.append(part) ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id) 113 # type: (str) -> PortableObjectT 114 if id 
not in self._id_to_obj:--> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 self._id_to_proto[id], self._pipeline_context) 117 return self._id_to_obj[id] ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context) 1277 result.parts = [] 1278 for transform_id in proto.subtransforms:-> 1279 part = context.transforms.get_by_id(transform_id) 1280 part.parent = result 1281 result.parts.append(part) ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id) 113 # type: (str) -> PortableObjectT 114 if id not in self._id_to_obj:--> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 self._id_to_proto[id], self._pipeline_context) 117 return self._id_to_obj[id] ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context) 1216 is_python_side_input(side_input_tags[0]) if side_input_tags else False) 1217 -> 1218 transform = ptransform.PTransform.from_runner_api(proto, context) 1219 if uses_python_sideinput_tags: 1220 # Ordering is important here. ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py in from_runner_api(cls, proto, context) 688 if proto is None or proto.spec is None or not proto.spec.urn: 689 return None--> 690 parameter_type, constructor = cls._known_urns[proto.spec.urn] 691 692 try: KeyError: 'beam:transform:write_files:v1'
On Fri, Nov 20, 2020 at 11:18 AM Brian Hulette <bhule...@google.com> wrote: > Hm try passing in the args as they would appear in > `sys.argv`, PipelineOptions(['--experiments=use_runner_v2']) > > On Thu, Nov 19, 2020 at 12:14 PM Alan Krumholz <alan.krumh...@betterup.co> > wrote: > >> How can I pass that flag using the SDK? >> Tried this: >> >> pipeline = beam.Pipeline(options=PipelineOptions(experiments= >>> ['use_runner_v2'], ...) >> >> >> but still getting a similar error: >> >> ---------------------------------------------------------------------------KeyError >> Traceback (most recent call >> last)<ipython-input-69-a936d3a5fa70> in <module>----> 1 bq_to_snowflake( >> 2 'ml-betterup.coach_search.distances', 3 >> 'analytics.ml.coach_search_distances', 4 'master') >> <ipython-input-66-2b6bf3c9334b> in bq_to_snowflake(bq_table, snow_table, >> git_branch) 138 ) 139 --> 140 result = pipeline.run() 141 >> result.wait_until_finish() 142 >> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >> run(self, test_runner_api) 512 # When possible, invoke a round trip >> through the runner API. 513 if test_runner_api and >> self._verify_runner_api_compatible():--> 514 return >> Pipeline.from_runner_api( 515 >> self.to_runner_api(use_fake_coders=True), 516 self.runner, >> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >> from_runner_api(proto, runner, options, return_context, allow_proto_holders) >> 890 requirements=proto.requirements) 891 >> root_transform_id, = proto.root_transform_ids--> 892 p.transforms_stack >> = [context.transforms.get_by_id(root_transform_id)] 893 # >> TODO(robertwb): These are only needed to continue construction. Omit? 
894 >> p.applied_labels = { >> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >> in get_by_id(self, id) 113 # type: (str) -> PortableObjectT 114 >> if id not in self._id_to_obj:--> 115 self._id_to_obj[id] = >> self._obj_type.from_runner_api( 116 self._id_to_proto[id], >> self._pipeline_context) 117 return self._id_to_obj[id] >> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >> from_runner_api(proto, context) 1277 result.parts = [] 1278 for >> transform_id in proto.subtransforms:-> 1279 part = >> context.transforms.get_by_id(transform_id) 1280 part.parent = result >> 1281 result.parts.append(part) >> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >> in get_by_id(self, id) 113 # type: (str) -> PortableObjectT 114 >> if id not in self._id_to_obj:--> 115 self._id_to_obj[id] = >> self._obj_type.from_runner_api( 116 self._id_to_proto[id], >> self._pipeline_context) 117 return self._id_to_obj[id] >> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >> from_runner_api(proto, context) 1277 result.parts = [] 1278 for >> transform_id in proto.subtransforms:-> 1279 part = >> context.transforms.get_by_id(transform_id) 1280 part.parent = result >> 1281 result.parts.append(part) >> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >> in get_by_id(self, id) 113 # type: (str) -> PortableObjectT 114 >> if id not in self._id_to_obj:--> 115 self._id_to_obj[id] = >> self._obj_type.from_runner_api( 116 self._id_to_proto[id], >> self._pipeline_context) 117 return self._id_to_obj[id] >> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >> from_runner_api(proto, context) 1277 result.parts = [] 1278 for >> transform_id in proto.subtransforms:-> 1279 part = >> context.transforms.get_by_id(transform_id) 1280 part.parent = result >> 1281 result.parts.append(part) >> 
~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >> in get_by_id(self, id) 113 # type: (str) -> PortableObjectT 114 >> if id not in self._id_to_obj:--> 115 self._id_to_obj[id] = >> self._obj_type.from_runner_api( 116 self._id_to_proto[id], >> self._pipeline_context) 117 return self._id_to_obj[id] >> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >> from_runner_api(proto, context) 1277 result.parts = [] 1278 for >> transform_id in proto.subtransforms:-> 1279 part = >> context.transforms.get_by_id(transform_id) 1280 part.parent = result >> 1281 result.parts.append(part) >> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >> in get_by_id(self, id) 113 # type: (str) -> PortableObjectT 114 >> if id not in self._id_to_obj:--> 115 self._id_to_obj[id] = >> self._obj_type.from_runner_api( 116 self._id_to_proto[id], >> self._pipeline_context) 117 return self._id_to_obj[id] >> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >> from_runner_api(proto, context) 1216 >> is_python_side_input(side_input_tags[0]) if side_input_tags else False) >> 1217 -> 1218 transform = ptransform.PTransform.from_runner_api(proto, >> context) 1219 if uses_python_sideinput_tags: 1220 # Ordering >> is important here. >> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py >> in from_runner_api(cls, proto, context) 688 if proto is None or >> proto.spec is None or not proto.spec.urn: 689 return None--> 690 >> parameter_type, constructor = cls._known_urns[proto.spec.urn] 691 >> 692 try: >> KeyError: 'beam:transform:write_files:v1' >> >> >> >> On Thu, Nov 19, 2020 at 2:18 PM Brian Hulette <bhule...@google.com> >> wrote: >> >>> Ah ok, you'll need to use Dataflow runner v2 [1] to run this pipeline >>> (add the flag '--experiments=use_runner_v2'). See also [2]. 
>>> >>> [1] >>> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#python_11 >>> [2] >>> https://cloud.google.com/blog/products/data-analytics/multi-language-sdks-for-building-cloud-pipelines >>> >>> On Thu, Nov 19, 2020 at 11:10 AM Alan Krumholz < >>> alan.krumh...@betterup.co> wrote: >>> >>>> DataFlow runner >>>> >>>> On Thu, Nov 19, 2020 at 2:00 PM Brian Hulette <bhule...@google.com> >>>> wrote: >>>> >>>>> Hm, what runner are you using? It looks like we're trying to encode and >>>>> decode the pipeline proto, which isn't possible for a multi-language >>>>> pipeline. Are you using a portable runner? >>>>> >>>>> Brian >>>>> >>>>> On Thu, Nov 19, 2020 at 10:42 AM Alan Krumholz < >>>>> alan.krumh...@betterup.co> wrote: >>>>> >>>>>> Got it, thanks! >>>>>> I was using: >>>>>> 'xxxxxx.us-east-1' >>>>>> Seems using this instead fixes that problem: >>>>>> 'xxxxxx.us-east-1.snowflakecomputing.com' >>>>>> >>>>>> I'm now hitting a different error though (now in Python): >>>>>> >>>>>> <ipython-input-42-b27732a0a892> in bq_to_snowflake(bq_table, >>>>>>> snow_table, git_branch) >>>>>>> 161 ) >>>>>>> 162 >>>>>>> --> 163 result = pipeline.run() >>>>>>> 164 result.wait_until_finish() >>>>>>> 165 >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py >>>>>>> in run(self, test_runner_api) >>>>>>> 512 # When possible, invoke a round trip through the runner API. 
>>>>>>> 513 if test_runner_api and self._verify_runner_api_compatible(): >>>>>>> --> 514 return Pipeline.from_runner_api( 515 self.to_runner_api( >>>>>>> use_fake_coders=True), >>>>>>> 516 self.runner, >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py >>>>>>> in from_runner_api(proto, runner, options, return_context, >>>>>>> allow_proto_holders) >>>>>>> 890 requirements=proto.requirements) >>>>>>> 891 root_transform_id, = proto.root_transform_ids >>>>>>> --> 892 p.transforms_stack = [context.transforms.get_by_id( >>>>>>> root_transform_id)] >>>>>>> 893 # TODO(robertwb): These are only needed to continue >>>>>>> construction. Omit? >>>>>>> 894 p.applied_labels = { >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>>>>>> in get_by_id(self, id) >>>>>>> 113 # type: (str) -> PortableObjectT >>>>>>> 114 if id not in self._id_to_obj: >>>>>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 >>>>>>> self._id_to_proto[id], self._pipeline_context) >>>>>>> 117 return self._id_to_obj[id] >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py >>>>>>> in from_runner_api(proto, context) >>>>>>> 1277 result.parts = [] >>>>>>> 1278 for transform_id in proto.subtransforms: >>>>>>> -> 1279 part = context.transforms.get_by_id(transform_id) >>>>>>> 1280 part.parent = result >>>>>>> 1281 result.parts.append(part) >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>>>>>> in get_by_id(self, id) >>>>>>> 113 # type: (str) -> PortableObjectT >>>>>>> 114 if id not in self._id_to_obj: >>>>>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 >>>>>>> self._id_to_proto[id], self._pipeline_context) >>>>>>> 117 return self._id_to_obj[id] >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py >>>>>>> in from_runner_api(proto, context) >>>>>>> 1277 result.parts = [] >>>>>>> 1278 for transform_id in 
proto.subtransforms: >>>>>>> -> 1279 part = context.transforms.get_by_id(transform_id) >>>>>>> 1280 part.parent = result >>>>>>> 1281 result.parts.append(part) >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>>>>>> in get_by_id(self, id) >>>>>>> 113 # type: (str) -> PortableObjectT >>>>>>> 114 if id not in self._id_to_obj: >>>>>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 >>>>>>> self._id_to_proto[id], self._pipeline_context) >>>>>>> 117 return self._id_to_obj[id] >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py >>>>>>> in from_runner_api(proto, context) >>>>>>> 1277 result.parts = [] >>>>>>> 1278 for transform_id in proto.subtransforms: >>>>>>> -> 1279 part = context.transforms.get_by_id(transform_id) >>>>>>> 1280 part.parent = result >>>>>>> 1281 result.parts.append(part) >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>>>>>> in get_by_id(self, id) >>>>>>> 113 # type: (str) -> PortableObjectT >>>>>>> 114 if id not in self._id_to_obj: >>>>>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 >>>>>>> self._id_to_proto[id], self._pipeline_context) >>>>>>> 117 return self._id_to_obj[id] >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py >>>>>>> in from_runner_api(proto, context) >>>>>>> 1277 result.parts = [] >>>>>>> 1278 for transform_id in proto.subtransforms: >>>>>>> -> 1279 part = context.transforms.get_by_id(transform_id) >>>>>>> 1280 part.parent = result >>>>>>> 1281 result.parts.append(part) >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>>>>>> in get_by_id(self, id) >>>>>>> 113 # type: (str) -> PortableObjectT >>>>>>> 114 if id not in self._id_to_obj: >>>>>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 >>>>>>> self._id_to_proto[id], self._pipeline_context) >>>>>>> 117 return self._id_to_obj[id] >>>>>>> 
~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py >>>>>>> in from_runner_api(proto, context) >>>>>>> 1216 is_python_side_input(side_input_tags[0]) if side_input_tags >>>>>>> else False) >>>>>>> 1217 >>>>>>> -> 1218 transform = ptransform.PTransform.from_runner_api(proto, >>>>>>> context) >>>>>>> 1219 if uses_python_sideinput_tags: >>>>>>> 1220 # Ordering is important here. >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py >>>>>>> in from_runner_api(cls, proto, context) >>>>>>> 688 if proto is None or proto.spec is None or not proto.spec.urn: >>>>>>> 689 return None >>>>>>> --> 690 parameter_type, constructor = cls._known_urns[proto.spec.urn >>>>>>> ] >>>>>>> 691 >>>>>>> 692 try: KeyError: 'beam:transform:write_files:v1' >>>>>> >>>>>> >>>>>> >>>>>> I'll keep trying to make this work but sharing it in case you can >>>>>> easily see what the problem is >>>>>> >>>>>> Thanks so much! >>>>>> >>>>>> On Thu, Nov 19, 2020 at 1:30 PM Brian Hulette <bhule...@google.com> >>>>>> wrote: >>>>>> >>>>>>> Hi Alan, >>>>>>> Sorry this error message is so verbose. What are you passing for the >>>>>>> server_name argument [1]? 
It looks like that's what the Java stacktrace >>>>>>> is >>>>>>> complaining about: >>>>>>> >>>>>>> java.lang.IllegalArgumentException: serverName must be in format >>>>>>> <account_name>.snowflakecomputing.com >>>>>>> >>>>>>> [1] >>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/snowflake.py#L302 >>>>>>> >>>>>>> On Thu, Nov 19, 2020 at 10:16 AM Alan Krumholz < >>>>>>> alan.krumh...@betterup.co> wrote: >>>>>>> >>>>>>>> I'm trying to replace my custom/problematic snowflake sink with the >>>>>>>> new: >>>>>>>> https://beam.apache.org/documentation/io/built-in/snowflake/#writing-to-snowflake >>>>>>>> >>>>>>>> However when I try to run my pipeline (using python) I get this >>>>>>>> Java error: >>>>>>>> >>>>>>>> RuntimeError: java.lang.RuntimeException: Failed to build >>>>>>>>> transform beam:external:java:snowflake:write:v1 from spec urn: >>>>>>>>> "beam:external:java:snowflake:write:v1" >>>>>>>> >>>>>>>> >>>>>>>> It is hard to understand why it is failing from reading the partial >>>>>>>> java error trace I get in the output: >>>>>>>> >>>>>>>>> at >>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:130) >>>>>>>>> at >>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:357) >>>>>>>>> at >>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:433) >>>>>>>>> at >>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:488) >>>>>>>>> at >>>>>>>>> org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232) >>>>>>>>> at >>>>>>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172) >>>>>>>>> at >>>>>>>>> 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331) >>>>>>>>> at >>>>>>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817) >>>>>>>>> at >>>>>>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) >>>>>>>>> at >>>>>>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) >>>>>>>>> at >>>>>>>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) >>>>>>>>> at >>>>>>>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) >>>>>>>>> at java.base/java.lang.Thread.run(Thread.java:832) >>>>>>>>> Caused by: java.lang.IllegalArgumentException: serverName must be in >>>>>>>>> format <account_name>.snowflakecomputing.com >>>>>>>>> at >>>>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141) >>>>>>>>> at >>>>>>>>> org.apache.beam.sdk.io.snowflake.SnowflakeIO$DataSourceConfiguration.withServerName(SnowflakeIO.java:1700) >>>>>>>>> at >>>>>>>>> org.apache.beam.sdk.io.snowflake.crosslanguage.CrossLanguageConfiguration.getDataSourceConfiguration(CrossLanguageConfiguration.java:166) >>>>>>>>> at >>>>>>>>> org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:78) >>>>>>>>> at >>>>>>>>> org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:34) >>>>>>>>> at >>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:125) >>>>>>>>> ... 12 more >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> any clue how I can debug/fix this using python? >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>>