Re: snowflake io in python
Thank you!

On Mon, Nov 23, 2020 at 10:53 PM Chamikara Jayalath wrote:
> We are working on this. ETA is the end of this quarter. Created
> https://issues.apache.org/jira/browse/BEAM-11332 for tracking.
>
> Thanks,
> Cham
>
> On Mon, Nov 23, 2020 at 5:42 AM Alan Krumholz wrote:
>
>> Hi Cham,
>> I'm guessing this means I won't be able to use Snowflake IO with python
>> on dataflow until then?
>> Is there a timeline for that work to be completed?
>>
>> Thank you so much for your help
>>
>> On Fri, Nov 20, 2020 at 9:33 PM Chamikara Jayalath wrote:
>>
>>> This is because Python does not understand that Java specific
>>> "beam:transform:write_files:v1" transform. Hopefully this is one of those
>>> issues that will get resolved when we update Dataflow to directly consume
>>> portable protos (we are working on this now).
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Fri, Nov 20, 2020 at 11:14 AM Brian Hulette wrote:
>>>
>>>> +Chamikara Jayalath any idea why this is still
>>>> doing a runner api roundtrip and failing? It's a multi-language pipeline,
>>>> and Alan has it configured to run on Dataflow runner V2.
>>>>
>>>> On Fri, Nov 20, 2020 at 10:36 AM Alan Krumholz <alan.krumh...@betterup.co> wrote:
>>>>
>>>>> Just tried that and still getting this:
>>>>>
>>>>> ---
>>>>> KeyError                                  Traceback (most recent call last)
>>>>> in
>>>>> ----> 1 bq_to_snowflake(
>>>>>       2     'ml-betterup.coach_search.distances',
>>>>>       3     'analytics.ml.coach_search_distances',
>>>>>       4     'master')
>>>>>
>>>>> in bq_to_snowflake(bq_table, snow_table, git_branch)
>>>>>     138 )
>>>>>     139
>>>>> --> 140 result = pipeline.run()
>>>>>     141 result.wait_until_finish()
>>>>>     142
>>>>>
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
>>>>>     512   # When possible, invoke a round trip through the runner API.
>>>>>     513   if test_runner_api and self._verify_runner_api_compatible():
>>>>> --> 514     return Pipeline.from_runner_api(
>>>>>     515         self.to_runner_api(use_fake_coders=True),
>>>>>     516         self.runner,
>>>>>
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, runner, options, return_context, allow_proto_holders)
>>>>>     890       requirements=proto.requirements)
>>>>>     891   root_transform_id, = proto.root_transform_ids
>>>>> --> 892   p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>>>     893   # TODO(robertwb): These are only needed to continue construction. Omit?
>>>>>     894   p.applied_labels = {
>>>>>
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>>>>     113   # type: (str) -> PortableObjectT
>>>>>     114   if id not in self._id_to_obj:
>>>>> --> 115     self._id_to_obj[id] = self._obj_type.from_runner_api(
>>>>>     116         self._id_to_proto[id], self._pipeline_context)
>>>>>     117   return self._id_to_obj[id]
>>>>>
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>>>>    1277   result.parts = []
>>>>>    1278   for transform_id in proto.subtransforms:
>>>>> -> 1279     part = context.transforms.get_by_id(transform_id)
>>>>>    1280     part.parent = result
>>>>>    1281     result.parts.append(part)
>>>>>
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>>>>     113   # type: (str) -> PortableObjectT
>>>>>     114   if id not in self._id_to_obj:
>>>>> --> 115     self._id_to_obj[id] = self._obj_type.from_runner_api(
>>>>>     116         self._id_to_proto[id], self._pipeline_context)
>>>>>     117   return self._id_to_obj[id]
>>>>>
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>>>>    1277   result.parts = []
>>>>>    1278   for transform_id in proto
Re: snowflake io in python
Hi Cham,
I'm guessing this means I won't be able to use Snowflake IO with python on dataflow until then?
Is there a timeline for that work to be completed?

Thank you so much for your help

On Fri, Nov 20, 2020 at 9:33 PM Chamikara Jayalath wrote:
> This is because Python does not understand that Java specific
> "beam:transform:write_files:v1" transform. Hopefully this is one of those
> issues that will get resolved when we update Dataflow to directly consume
> portable protos (we are working on this now).
>
> Thanks,
> Cham
>
> On Fri, Nov 20, 2020 at 11:14 AM Brian Hulette wrote:
>
>> +Chamikara Jayalath any idea why this is still
>> doing a runner api roundtrip and failing? It's a multi-language pipeline,
>> and Alan has it configured to run on Dataflow runner V2.
>>
>> On Fri, Nov 20, 2020 at 10:36 AM Alan Krumholz wrote:
>>
>>> Just tried that and still getting this:
>>>
>>> ---
>>> KeyError                                  Traceback (most recent call last)
>>> in
>>> ----> 1 bq_to_snowflake(
>>>       2     'ml-betterup.coach_search.distances',
>>>       3     'analytics.ml.coach_search_distances',
>>>       4     'master')
>>>
>>> in bq_to_snowflake(bq_table, snow_table, git_branch)
>>>     138 )
>>>     139
>>> --> 140 result = pipeline.run()
>>>     141 result.wait_until_finish()
>>>     142
>>>
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
>>>     512   # When possible, invoke a round trip through the runner API.
>>>     513   if test_runner_api and self._verify_runner_api_compatible():
>>> --> 514     return Pipeline.from_runner_api(
>>>     515         self.to_runner_api(use_fake_coders=True),
>>>     516         self.runner,
>>>
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, runner, options, return_context, allow_proto_holders)
>>>     890       requirements=proto.requirements)
>>>     891   root_transform_id, = proto.root_transform_ids
>>> --> 892   p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>     893   # TODO(robertwb): These are only needed to continue construction. Omit?
>>>     894   p.applied_labels = {
>>>
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>>     113   # type: (str) -> PortableObjectT
>>>     114   if id not in self._id_to_obj:
>>> --> 115     self._id_to_obj[id] = self._obj_type.from_runner_api(
>>>     116         self._id_to_proto[id], self._pipeline_context)
>>>     117   return self._id_to_obj[id]
>>>
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>>    1277   result.parts = []
>>>    1278   for transform_id in proto.subtransforms:
>>> -> 1279     part = context.transforms.get_by_id(transform_id)
>>>    1280     part.parent = result
>>>    1281     result.parts.append(part)
>>>
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>>     113   # type: (str) -> PortableObjectT
>>>     114   if id not in self._id_to_obj:
>>> --> 115     self._id_to_obj[id] = self._obj_type.from_runner_api(
>>>     116         self._id_to_proto[id], self._pipeline_context)
>>>     117   return self._id_to_obj[id]
>>>
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>>    1277   result.parts = []
>>>    1278   for transform_id in proto.subtransforms:
>>> -> 1279     part = context.transforms.get_by_id(tra
Re: snowflake io in python
._known_urns[proto.spec.urn]
    691
    692 try:

KeyError: 'beam:transform:write_files:v1'

On Fri, Nov 20, 2020 at 11:18 AM Brian Hulette wrote:
> Hm try passing in the args as they would appear in
> `sys.argv`, PipelineOptions(['--experiments=use_runner_v2'])
>
> On Thu, Nov 19, 2020 at 12:14 PM Alan Krumholz wrote:
>
>> How can I pass that flag using the SDK?
>> Tried this:
>>
>>> pipeline = beam.Pipeline(options=PipelineOptions(experiments=['use_runner_v2'], ...)
>>
>> but still getting a similar error:
>>
>> ---
>> KeyError                                  Traceback (most recent call last)
>> in
>> ----> 1 bq_to_snowflake(
>>       2     'ml-betterup.coach_search.distances',
>>       3     'analytics.ml.coach_search_distances',
>>       4     'master')
>>
>> in bq_to_snowflake(bq_table, snow_table, git_branch)
>>     138 )
>>     139
>> --> 140 result = pipeline.run()
>>     141 result.wait_until_finish()
>>     142
>>
>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
>>     512   # When possible, invoke a round trip through the runner API.
>>     513   if test_runner_api and self._verify_runner_api_compatible():
>> --> 514     return Pipeline.from_runner_api(
>>     515         self.to_runner_api(use_fake_coders=True),
>>     516         self.runner,
>>
>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, runner, options, return_context, allow_proto_holders)
>>     890       requirements=proto.requirements)
>>     891   root_transform_id, = proto.root_transform_ids
>> --> 892   p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>     893   # TODO(robertwb): These are only needed to continue construction. Omit?
>>     894   p.applied_labels = {
>>
>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>     113   # type: (str) -> PortableObjectT
>>     114   if id not in self._id_to_obj:
>> --> 115     self._id_to_obj[id] = self._obj_type.from_runner_api(
>>     116         self._id_to_proto[id], self._pipeline_context)
>>     117   return self._id_to_obj[id]
>>
>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>    1277   result.parts = []
>>    1278   for transform_id in proto.subtransforms:
>> -> 1279     part = context.transforms.get_by_id(transform_id)
>>    1280     part.parent = result
>>    1281     result.parts.append(part)
>>
>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>     113   # type: (str) -> PortableObjectT
>>     114   if id not in self._id_to_obj:
>> --> 115     self._id_to_obj[id] = self._obj_type.from_runner_api(
>>     116         self._id_to_proto[id], self._pipeline_context)
>>     117   return self._id_to_obj[id]
>>
>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>    1277   result.parts = []
>>    1278   for transform_id in proto.subtransforms:
>> -> 1279     part = context.transforms.get_by_id(transform_id)
>>    1280     part.parent = result
>>    1281     result.parts.append(part)
>>
>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>     113   # type: (str) -> PortableObjectT
>>     114   if id not in self._id_to_obj:
>> --> 115     self._id_to_obj[id] = self._obj_type.from_runner_api(
>>     116         self._id_to_proto[id], self._pipeline_context)
>>     117   return self._id_to_obj[id]
>>
>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, conte
Re: snowflake io in python
is None or proto.spec is None or not proto.spec.urn:
    689     return None
--> 690 parameter_type, constructor = cls._known_urns[proto.spec.urn]
    691
    692 try:

KeyError: 'beam:transform:write_files:v1'

On Thu, Nov 19, 2020 at 2:18 PM Brian Hulette wrote:
> Ah ok, you'll need to use Dataflow runner v2 [1] to run this pipeline (add
> the flag '--experiments=use_runner_v2'). See also [2].
>
> [1] https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#python_11
> [2] https://cloud.google.com/blog/products/data-analytics/multi-language-sdks-for-building-cloud-pipelines
>
> On Thu, Nov 19, 2020 at 11:10 AM Alan Krumholz wrote:
>
>> DataFlow runner
>>
>> On Thu, Nov 19, 2020 at 2:00 PM Brian Hulette wrote:
>>
>>> Hm what runner are you using? It looks like we're trying to encode and
>>> decode the pipeline proto, which isn't possible for a multi-language
>>> pipeline. Are you using a portable runner?
>>>
>>> Brian
>>>
>>> On Thu, Nov 19, 2020 at 10:42 AM Alan Krumholz <alan.krumh...@betterup.co> wrote:
>>>
>>>> got it, thanks!
>>>> I was using:
>>>> 'xx.us-east-1'
>>>> Seems using this instead fixes that problem:
>>>> 'xx.us-east-1.snowflakecomputing.com'
>>>>
>>>> I'm now hitting a different error though (now in python):
>>>>
>>>>> in bq_to_snowflake(bq_table, snow_table, git_branch)
>>>>>     161 )
>>>>>     162
>>>>> --> 163 result = pipeline.run()
>>>>>     164 result.wait_until_finish()
>>>>>     165
>>>>>
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
>>>>>     512   # When possible, invoke a round trip through the runner API.
>>>>>     513   if test_runner_api and self._verify_runner_api_compatible():
>>>>> --> 514     return Pipeline.from_runner_api(
>>>>>     515         self.to_runner_api(use_fake_coders=True),
>>>>>     516         self.runner,
>>>>>
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, runner, options, return_context, allow_proto_holders)
>>>>>     890       requirements=proto.requirements)
>>>>>     891   root_transform_id, = proto.root_transform_ids
>>>>> --> 892   p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>>>     893   # TODO(robertwb): These are only needed to continue construction. Omit?
>>>>>     894   p.applied_labels = {
>>>>>
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>>>>     113   # type: (str) -> PortableObjectT
>>>>>     114   if id not in self._id_to_obj:
>>>>> --> 115     self._id_to_obj[id] = self._obj_type.from_runner_api(
>>>>>     116         self._id_to_proto[id], self._pipeline_context)
>>>>>     117   return self._id_to_obj[id]
>>>>>
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>>>>    1277   result.parts = []
>>>>>    1278   for transform_id in proto.subtransforms:
>>>>> -> 1279     part = context.transforms.get_by_id(transform_id)
>>>>>    1280     part.parent = result
>>>>>    1281     result.parts.append(part)
>>>>>
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>>>>     113   # type: (str) -> PortableObjectT
>>>>>     114   if id not i
Re: snowflake io in python
DataFlow runner

On Thu, Nov 19, 2020 at 2:00 PM Brian Hulette wrote:
> Hm what runner are you using? It looks like we're trying to encode and
> decode the pipeline proto, which isn't possible for a multi-language
> pipeline. Are you using a portable runner?
>
> Brian
>
> On Thu, Nov 19, 2020 at 10:42 AM Alan Krumholz wrote:
>
>> got it, thanks!
>> I was using:
>> 'xx.us-east-1'
>> Seems using this instead fixes that problem:
>> 'xx.us-east-1.snowflakecomputing.com'
>>
>> I'm now hitting a different error though (now in python):
>>
>>> in bq_to_snowflake(bq_table, snow_table, git_branch)
>>>     161 )
>>>     162
>>> --> 163 result = pipeline.run()
>>>     164 result.wait_until_finish()
>>>     165
>>>
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
>>>     512   # When possible, invoke a round trip through the runner API.
>>>     513   if test_runner_api and self._verify_runner_api_compatible():
>>> --> 514     return Pipeline.from_runner_api(
>>>     515         self.to_runner_api(use_fake_coders=True),
>>>     516         self.runner,
>>>
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, runner, options, return_context, allow_proto_holders)
>>>     890       requirements=proto.requirements)
>>>     891   root_transform_id, = proto.root_transform_ids
>>> --> 892   p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>     893   # TODO(robertwb): These are only needed to continue construction. Omit?
>>> 894 p.applied_labels = { >>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>> in get_by_id(self, id) >>> 113 # type: (str) -> PortableObjectT >>> 114 if id not in self._id_to_obj: >>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 >>> self._id_to_proto[id], self._pipeline_context) >>> 117 return self._id_to_obj[id] >>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>> from_runner_api(proto, context) >>> 1277 result.parts = [] >>> 1278 for transform_id in proto.subtransforms: >>> -> 1279 part = context.transforms.get_by_id(transform_id) >>> 1280 part.parent = result >>> 1281 result.parts.append(part) >>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>> in get_by_id(self, id) >>> 113 # type: (str) -> PortableObjectT >>> 114 if id not in self._id_to_obj: >>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 >>> self._id_to_proto[id], self._pipeline_context) >>> 117 return self._id_to_obj[id] >>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>> from_runner_api(proto, context) >>> 1277 result.parts = [] >>> 1278 for transform_id in proto.subtransforms: >>> -> 1279 part = context.transforms.get_by_id(transform_id) >>> 1280 part.parent = result >>> 1281 result.parts.append(part) >>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>> in get_by_id(self, id) >>> 113 # type: (str) -> PortableObjectT >>> 114 if id not in self._id_to_obj: >>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 >>> self._id_to_proto[id], self._pipeline_context) >>> 117 return self._id_to_obj[id] >>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>> from_runner_api(proto, context) >>> 1277 result.parts = [] >>> 1278 for transform_id in proto.subtransforms: >>> -> 1279 part = context.transforms.get_by_id(transform_id) >>> 1280 part.parent = result >>> 1281 
result.parts.append(part) >>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>> in get_by_id(self, id) >>> 113 # type: (str) -> PortableObjectT >>> 114 if id not in self._id_to_obj: >>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116 >>> self._id_to_proto[id], self._pipeline_context) >>> 117 return self._id_to_obj[id] >>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>> from_runner_api(proto, context) >>> 1277 result.parts = [] >>> 1278 for transform_id in proto.subtransforms: >>> -> 1279 part = context.transforms.get_by_id(transform_id) >>> 1280 part.parent = result >>> 1281
Re: snowflake io in python
    2 try:

KeyError: 'beam:transform:write_files:v1'

I'll keep trying to make this work but sharing it in case you can easily see what the problem is.

Thanks so much!

On Thu, Nov 19, 2020 at 1:30 PM Brian Hulette wrote:
> Hi Alan,
> Sorry this error message is so verbose. What are you passing for the
> server_name argument [1]? It looks like that's what the Java stacktrace is
> complaining about:
>
> java.lang.IllegalArgumentException: serverName must be in format
> .snowflakecomputing.com
>
> [1] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/snowflake.py#L302
>
> On Thu, Nov 19, 2020 at 10:16 AM Alan Krumholz wrote:
>
>> I'm trying to replace my custom/problematic snowflake sink with the new:
>> https://beam.apache.org/documentation/io/built-in/snowflake/#writing-to-snowflake
>>
>> However when I try to run my pipeline (using python) I get this Java error:
>>
>>> RuntimeError: java.lang.RuntimeException: Failed to build transform
>>> beam:external:java:snowflake:write:v1 from spec urn:
>>> "beam:external:java:snowflake:write:v1"
>>
>> It is hard to understand why it is failing from reading the partial java
>> error trace I get in the output:
>>
>>> at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:130)
>>> at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:357)
>>> at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:433)
>>> at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:488)
>>> at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232)
>>> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
>>> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>>> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
>>> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>>> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>>> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>>> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
>>> at java.base/java.lang.Thread.run(Thread.java:832)
>>> Caused by: java.lang.IllegalArgumentException: serverName must be in format
>>> .snowflakecomputing.com
>>> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141)
>>> at org.apache.beam.sdk.io.snowflake.SnowflakeIO$DataSourceConfiguration.withServerName(SnowflakeIO.java:1700)
>>> at org.apache.beam.sdk.io.snowflake.crosslanguage.CrossLanguageConfiguration.getDataSourceConfiguration(CrossLanguageConfiguration.java:166)
>>> at org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:78)
>>> at org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:34)
>>> at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:125)
>>> ... 12 more
>>
>> any clue how I can debug/fix this using python?
snowflake io in python
I'm trying to replace my custom/problematic snowflake sink with the new:
https://beam.apache.org/documentation/io/built-in/snowflake/#writing-to-snowflake

However when I try to run my pipeline (using python) I get this Java error:

> RuntimeError: java.lang.RuntimeException: Failed to build transform
> beam:external:java:snowflake:write:v1 from spec urn:
> "beam:external:java:snowflake:write:v1"

It is hard to understand why it is failing from reading the partial java error trace I get in the output:

> at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:130)
> at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:357)
> at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:433)
> at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:488)
> at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232)
> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
> at java.base/java.lang.Thread.run(Thread.java:832)
> Caused by: java.lang.IllegalArgumentException: serverName must be in format
> .snowflakecomputing.com
> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141)
> at org.apache.beam.sdk.io.snowflake.SnowflakeIO$DataSourceConfiguration.withServerName(SnowflakeIO.java:1700)
> at org.apache.beam.sdk.io.snowflake.crosslanguage.CrossLanguageConfiguration.getDataSourceConfiguration(CrossLanguageConfiguration.java:166)
> at org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:78)
> at org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:34)
> at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:125)
> ... 12 more

any clue how I can debug/fix this using python?
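The `Caused by` line is the real signal: SnowflakeIO's `withServerName` rejects a bare account identifier such as `xx.us-east-1`. A small local check catches this before the expansion service does. The helper below is our own, not part of the Beam API, and only approximates the Java precondition:

```python
def looks_like_snowflake_server_name(server_name: str) -> bool:
    """Approximate SnowflakeIO's withServerName precondition: the value
    must be the full host name ending in .snowflakecomputing.com, not
    just the account identifier."""
    return server_name.endswith('.snowflakecomputing.com')

# The bare account form fails the check; the full host name passes.
print(looks_like_snowflake_server_name('xx.us-east-1'))                          # False
print(looks_like_snowflake_server_name('xx.us-east-1.snowflakecomputing.com'))  # True
```

Validating the value on the Python side gives a readable error instead of the cross-language stack trace above.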
Re: Dataflow isn't parallelizing
This seems to work! Thanks so much Eugene and Luke! On Fri, Sep 11, 2020 at 11:33 AM Luke Cwik wrote: > Inserting the Reshuffle is the easiest answer to test that parallelization > starts happening. > > If the performance is good but you're materializing too much data at the > shuffle boundary you'll want to convert your high fanout function (?Read > from Snowflake?) into a splittable DoFn. > > On Fri, Sep 11, 2020 at 9:56 AM Eugene Kirpichov > wrote: > >> Hi, >> >> Most likely this is because of fusion - see >> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#fusion-optimization >> . You need to insert a Reshuffle.viaRandomKey(), most likely after the >> first step. >> >> On Fri, Sep 11, 2020 at 9:41 AM Alan Krumholz >> wrote: >> >>> Hi DataFlow team, >>> I have a simple pipeline that I'm trying to speed up using DataFlow: >>> >>> [image: image.png] >>> >>> As you can see the bottleneck is the "transcribe mp3" step. I was hoping >>> DataFlow would be able to run many of these in parallel to speed up the >>> total execution time. >>> >>> However it seems it doesn't do that... and instead keeps executing it >>> all independent inputs sequentially >>> Even when I tried to force it to start with many workers it rapidly >>> shuts down most of them and only keeps one alive and doesn't ever seem to >>> parallelize this step :( >>> >>> Any advice on what else to try to make it do this? >>> >>> Thanks so much! >>> >> >> >> -- >> Eugene Kirpichov >> http://www.linkedin.com/in/eugenekirpichov >> >
Dataflow isn't parallelizing
Hi DataFlow team,
I have a simple pipeline that I'm trying to speed up using DataFlow:

[image: image.png]

As you can see, the bottleneck is the "transcribe mp3" step. I was hoping DataFlow would be able to run many of these in parallel to speed up the total execution time.

However, it seems it doesn't do that, and instead keeps executing all independent inputs sequentially. Even when I tried to force it to start with many workers, it rapidly shuts down most of them, keeps only one alive, and never seems to parallelize this step :(

Any advice on what else to try to make it do this?

Thanks so much!
Re: Installing ffmpeg on a python dataflow job
Hi Luke, I copied the setup.py file from the docs without modifying it much and it worked now. Thank you On Thu, Sep 10, 2020 at 11:12 AM Luke Cwik wrote: > Did ffmpeg install change $PATH? (this may not be visible to the current > process) > Have you tried the full path to the executable? > > On Wed, Sep 9, 2020 at 1:48 PM Alan Krumholz > wrote: > >> Hi DataFlow team, >> >> We are trying to use ffmpeg to process some video data using dataflow. >> In order to do this we need the worker nodes to have ffmpeg installed. >> >> After reading Beam docs I created a setup.py file for my job like this: >> >> #!/usr/bin/python >> import subprocess >> from distutils.command.build import build as _build >> import setuptools >> >> class build(_build): >> sub_commands = _build.sub_commands + [('CustomCommands', None)] >> >> class CustomCommands(setuptools.Command): >> def initialize_options(self): >> pass >> >> def finalize_options(self): >> pass >> >> def RunCustomCommand(self, command_list): >> p = subprocess.Popen( >> command_list, >> stdin=subprocess.PIPE, >> stdout=subprocess.PIPE, >> stderr=subprocess.STDOUT) >> stdout_data, _ = p.communicate() >> if p.returncode != 0: >> raise RuntimeError( >> 'Command %s failed: exit code: %s' % ( >> command_list, p.returncode)) >> >> def run(self): >> for command in CUSTOM_COMMANDS: >> self.RunCustomCommand(command) >> >> CUSTOM_COMMANDS = [ >> ['apt-get', 'update'], >> ['apt-get', 'install', '-y', 'ffmpeg']] >> REQUIRED_PACKAGES = [ >> 'boto3==1.11.17', >> 'ffmpeg-python==0.2.0', >> 'google-cloud-storage==1.31.0'] >> setuptools.setup( >> name='DataflowJob', >> version='0.1', >> install_requires=REQUIRED_PACKAGES, >> packages=setuptools.find_packages(), >> mdclass={ >> 'build': build, >> 'CustomCommands': CustomCommands}) >> >> However, when I run the job I still get an error saying that ffmpeg is >> not installed: "No such file or directory: 'ffmpeg'" >> >> Any clue what am I doing wrong? >> >> Thanks so much! >> >>
Installing ffmpeg on a python dataflow job
Hi DataFlow team,

We are trying to use ffmpeg to process some video data using dataflow. In order to do this we need the worker nodes to have ffmpeg installed.

After reading Beam docs I created a setup.py file for my job like this:

#!/usr/bin/python
import subprocess
from distutils.command.build import build as _build
import setuptools

class build(_build):
    sub_commands = _build.sub_commands + [('CustomCommands', None)]

class CustomCommands(setuptools.Command):
    def initialize_options(self):
        pass

    def finalize_options(self):
        pass

    def RunCustomCommand(self, command_list):
        p = subprocess.Popen(
            command_list,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT)
        stdout_data, _ = p.communicate()
        if p.returncode != 0:
            raise RuntimeError(
                'Command %s failed: exit code: %s' % (
                    command_list, p.returncode))

    def run(self):
        for command in CUSTOM_COMMANDS:
            self.RunCustomCommand(command)

CUSTOM_COMMANDS = [
    ['apt-get', 'update'],
    ['apt-get', 'install', '-y', 'ffmpeg']]
REQUIRED_PACKAGES = [
    'boto3==1.11.17',
    'ffmpeg-python==0.2.0',
    'google-cloud-storage==1.31.0']
setuptools.setup(
    name='DataflowJob',
    version='0.1',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    mdclass={
        'build': build,
        'CustomCommands': CustomCommands})

However, when I run the job I still get an error saying that ffmpeg is not installed: "No such file or directory: 'ffmpeg'"

Any clue what am I doing wrong?

Thanks so much!
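One detail worth flagging in the snippet above: `setuptools.setup` expects the command-hook mapping under the keyword `cmdclass`, but the snippet passes `mdclass`, which setuptools only warns about and then ignores — so the custom apt-get steps would silently never run. A corrected sketch, with the `Popen` dance replaced by `subprocess.run(check=True)`; a real setup.py would end by actually calling `setuptools.setup(**SETUP_KWARGS)`:

```python
import subprocess

import setuptools
from distutils.command.build import build as _build

# Commands to run on each worker before the job's own code starts.
CUSTOM_COMMANDS = [
    ['apt-get', 'update'],
    ['apt-get', 'install', '-y', 'ffmpeg'],
]


class build(_build):
    # Chain the custom step into the standard build sequence.
    sub_commands = _build.sub_commands + [('CustomCommands', None)]


class CustomCommands(setuptools.Command):
    def initialize_options(self):
        pass

    def finalize_options(self):
        pass

    def run(self):
        for command in CUSTOM_COMMANDS:
            # check=True raises CalledProcessError on a non-zero exit code.
            subprocess.run(command, check=True)


SETUP_KWARGS = dict(
    name='DataflowJob',
    version='0.1',
    packages=setuptools.find_packages(),
    # The key must be `cmdclass` -- an unknown option like `mdclass` is
    # ignored, which would leave ffmpeg uninstalled on the workers.
    cmdclass={'build': build, 'CustomCommands': CustomCommands},
)
```

This matches the structure of the example in the Beam docs, which is consistent with "copied the setup.py file from the docs and it worked" being the fix.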
Re: daily dataflow job failing today
makes sense. I'll add this workaround for now. Thanks so much for your help! On Wed, Feb 12, 2020 at 10:33 AM Valentyn Tymofieiev wrote: > Alan, Dataflow workers preinstall Beam SDK dependencies, including (a > working version) of avro-python3. So after reading your email once again, I > think in your case you were not able to install Beam SDK locally. So a > workaround for you would be to `pip install avro-python3==1.9.1` or `pip > install pycodestyle` before installing Beam, until AVRO-2737 is resolved. > > > On Wed, Feb 12, 2020 at 10:21 AM Valentyn Tymofieiev > wrote: > >> Ah, there's already https://issues.apache.org/jira/browse/AVRO-2737 and >> it received attention. >> >> On Wed, Feb 12, 2020 at 10:19 AM Valentyn Tymofieiev >> wrote: >> >>> Opened https://issues.apache.org/jira/browse/AVRO-2738 >>> >>> On Wed, Feb 12, 2020 at 10:14 AM Valentyn Tymofieiev < >>> valen...@google.com> wrote: >>> >>>> Here's a short repro: >>>> >>>> :~$ docker run -it --entrypoint=/bin/bash python:3.7-stretch >>>> root@04b45a100d16:/# pip install avro-python3 >>>> Collecting avro-python3 >>>> Downloading avro-python3-1.9.2.tar.gz (37 kB) >>>> ERROR: Command errored out with exit status 1: >>>> command: /usr/local/bin/python -c 'import sys, setuptools, >>>> tokenize; sys.argv[0] = >>>> '"'"'/tmp/pip-install-mmy4vspt/avro-python3/setup.py'"'"'; >>>> __file__='"'"'/tmp/pip-install-mmy4vspt/avro-python3/setup.py'"'"';f=getattr(tokenize, >>>> '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', >>>> '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' >>>> egg_info --egg-base /tmp/pip-install-mmy4vspt/avro-python3/pip-egg-info >>>> cwd: /tmp/pip-install-mmy4vspt/avro-python3/ >>>> Complete output (5 lines): >>>> Traceback (most recent call last): >>>> File "", line 1, in >>>> File "/tmp/pip-install-mmy4vspt/avro-python3/setup.py", line 41, >>>> in >>>> import pycodestyle >>>> ModuleNotFoundError: No module named 'pycodestyle' >>>> >>>> ERROR: Command 
errored out with exit status 1: python setup.py egg_info >>>> Check the logs for full command output. >>>> root@04b45a100d16:/# >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> On Wed, Feb 12, 2020 at 10:14 AM Valentyn Tymofieiev < >>>> valen...@google.com> wrote: >>>> >>>>> Yes, it is a bug in the recent Avro release. We should report it >>>>> to the Avro maintainers. The workaround is to downgrade avro-python3 to >>>>> 1.9.1, for example via requirements.txt. >>>>> >>>>> On Wed, Feb 12, 2020 at 10:06 AM Steve Niemitz >>>>> wrote: >>>>> >>>>>> avro-python3 1.9.2 was released on pypi 4 hours ago, and >>>>>> added pycodestyle as a dependency, probably related? >>>>>> >>>>>> On Wed, Feb 12, 2020 at 1:03 PM Luke Cwik wrote: >>>>>> >>>>>>> +dev >>>>>>> >>>>>>> There was recently an update to add autoformatting to the Python >>>>>>> SDK[1]. >>>>>>> >>>>>>> I'm seeing this during testing of a PR as well. >>>>>>> >>>>>>> 1: >>>>>>> https://lists.apache.org/thread.html/448bb5c2d73fbd74eec7aacb5f28fa2f9d791784c2e53a2e3325627a%40%3Cdev.beam.apache.org%3E >>>>>>> >>>>>>> On Wed, Feb 12, 2020 at 9:57 AM Alan Krumholz < >>>>>>> alan.krumh...@betterup.co> wrote: >>>>>>> >>>>>>>> Some more information for this as I still can't get to fix it >>>>>>>> >>>>>>>> This job is triggered using the beam[gcp] python sdk from a >>>>>>>> KubeFlow Pipelines component which runs on top of docker image: >>>>>>>> tensorflow/tensorflow:1.13.1-py3 >>>>>>>> >>>>>>>> I just checked and that image hasn't been updated recently. I a
Re: daily dataflow job failing today
Some more information for this, as I still can't get it fixed.

This job is triggered using the beam[gcp] Python SDK from a Kubeflow Pipelines component which runs on top of the docker image tensorflow/tensorflow:1.13.1-py3.

I just checked and that image hasn't been updated recently. I also redeployed my pipeline to another (older) deployment of KFP and it gives me the same error (which tells me this isn't an internal KFP problem). The exact same pipeline/code running on the exact same image has been running fine for days. Did anything change on the Beam/Dataflow side since yesterday morning?

Thanks for your help! This is a production pipeline that is not running for us :(

On Wed, Feb 12, 2020 at 7:21 AM Alan Krumholz wrote:
> Hi, I have a scheduled daily job that I have been running fine in dataflow for days now.
> We haven't changed anything on this code but this morning's run failed (it couldn't even spin up the job).
> The job submits a setup.py file (that also hasn't changed) but maybe that is causing the problem? (based on the error I'm getting)
>
> Anyone else having the same issue? Or know how to fix it?
> Thanks!
>
> ERROR: Complete output from command python setup.py egg_info:
> ERROR: Traceback (most recent call last):
>   File "<string>", line 1, in <module>
>   File "/tmp/pip-install-42zyi89t/avro-python3/setup.py", line 41, in <module>
>     import pycodestyle
> ImportError: No module named 'pycodestyle'
> ERROR: Command "python setup.py egg_info" failed with error code 1 in /tmp/pip-install-42zyi89t/avro-python3/
> ERROR: Complete output from command python setup.py egg_info:
> ERROR: Traceback (most recent call last):
>   File "<string>", line 1, in <module>
>   File "/tmp/pip-install-wrqytf9a/avro-python3/setup.py", line 41, in <module>
>     import pycodestyle
> ImportError: No module named 'pycodestyle'
> ERROR: Command "python setup.py egg_info" failed with error code 1 in /tmp/pip-install-wrqytf9a/avro-python3/
daily dataflow job failing today
Hi, I have a scheduled daily job that I have been running fine in dataflow for days now.
We haven't changed anything on this code but this morning's run failed (it couldn't even spin up the job).
The job submits a setup.py file (that also hasn't changed) but maybe that is causing the problem? (based on the error I'm getting)

Anyone else having the same issue? Or know how to fix it?
Thanks!

ERROR: Complete output from command python setup.py egg_info:
ERROR: Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/tmp/pip-install-42zyi89t/avro-python3/setup.py", line 41, in <module>
    import pycodestyle
ImportError: No module named 'pycodestyle'
ERROR: Command "python setup.py egg_info" failed with error code 1 in /tmp/pip-install-42zyi89t/avro-python3/
ERROR: Complete output from command python setup.py egg_info:
ERROR: Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/tmp/pip-install-wrqytf9a/avro-python3/setup.py", line 41, in <module>
    import pycodestyle
ImportError: No module named 'pycodestyle'
ERROR: Command "python setup.py egg_info" failed with error code 1 in /tmp/pip-install-wrqytf9a/avro-python3/
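Valentyn's workaround from this thread (downgrade avro-python3 to 1.9.1, for example via requirements.txt, until AVRO-2737 is resolved) can be sketched as a requirements file. This is a hedged sketch, not official guidance; the Beam pin shown is an assumption based on the 2.19.0 release mentioned elsewhere in this archive:

```
# requirements.txt -- sketch of the workaround discussed in this thread:
# pin avro-python3 below the broken 1.9.2 release (whose setup.py imports
# pycodestyle at build time, see AVRO-2737) until a fixed release is out.
avro-python3==1.9.1
apache-beam[gcp]==2.19.0
# Alternative mentioned in the thread: `pip install pycodestyle` first, so
# avro-python3 1.9.2's setup.py can import it.
```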
Re: dataflow job was working fine last night and it isn't now
perfect! thank you! On Fri, Feb 7, 2020 at 10:54 AM Valentyn Tymofieiev wrote: > Thanks for your feedback. We expect that this issue will be fixed in > cloudpickle==1.3.0. Per [1], this release may be available next week. > > After that you can install the fixed version of cloudpickle until the AI > notebook image picks up the new version. > > [1] https://github.com/cloudpipe/cloudpickle/pull/337 > > On Tue, Feb 4, 2020 at 12:44 PM Alan Krumholz > wrote: > >> Seems like the image we use in KFP to orchestrate the job has >> cloudpickle==0.8.1 >> and that one doesn't seem to cause issues. >> I think I'm unblock for now but I'm sure I won't be the last one to try >> to do this using GCP managed notebooks :( >> >> Thanks for all the help! >> >> >> On Tue, Feb 4, 2020 at 12:24 PM Alan Krumholz >> wrote: >> >>> I'm using a managed notebook instance from GCP >>> It seems those already come with cloudpickle==1.2.2 as soon as you >>> provision it. apache-beam[gcp] will then install dill==0.3.1.1 I'm >>> going to try to uninstall cloudpickle before installing apache-beam and see >>> if this fixes the problem >>> >>> Thank you >>> >>> On Tue, Feb 4, 2020 at 11:54 AM Valentyn Tymofieiev >>> wrote: >>> >>>> The fact that you have cloudpickle==1.2.2 further confirms that you >>>> may be hitting the same error as >>>> https://stackoverflow.com/questions/42960637/python-3-5-dill-pickling-unpickling-on-different-servers-keyerror-classtype >>>> . >>>> >>>> Could you try to start over with a clean virtual environment? 
>>>> >>>> On Tue, Feb 4, 2020 at 11:46 AM Alan Krumholz < >>>> alan.krumh...@betterup.co> wrote: >>>> >>>>> Hi Valentyn, >>>>> >>>>> Here is my pip freeze on my machine (note that the error is in >>>>> dataflow, the job runs fine in my machine) >>>>> >>>>> ansiwrap==0.8.4 >>>>> apache-beam==2.19.0 >>>>> arrow==0.15.5 >>>>> asn1crypto==1.3.0 >>>>> astroid==2.3.3 >>>>> astropy==3.2.3 >>>>> attrs==19.3.0 >>>>> avro-python3==1.9.1 >>>>> azure-common==1.1.24 >>>>> azure-storage-blob==2.1.0 >>>>> azure-storage-common==2.1.0 >>>>> backcall==0.1.0 >>>>> bcolz==1.2.1 >>>>> binaryornot==0.4.4 >>>>> bleach==3.1.0 >>>>> boto3==1.11.9 >>>>> botocore==1.14.9 >>>>> cachetools==3.1.1 >>>>> certifi==2019.11.28 >>>>> cffi==1.13.2 >>>>> chardet==3.0.4 >>>>> Click==7.0 >>>>> cloudpickle==1.2.2 >>>>> colorama==0.4.3 >>>>> configparser==4.0.2 >>>>> confuse==1.0.0 >>>>> cookiecutter==1.7.0 >>>>> crcmod==1.7 >>>>> cryptography==2.8 >>>>> cycler==0.10.0 >>>>> daal==2019.0 >>>>> datalab==1.1.5 >>>>> decorator==4.4.1 >>>>> defusedxml==0.6.0 >>>>> dill==0.3.1.1 >>>>> distro==1.0.1 >>>>> docker==4.1.0 >>>>> docopt==0.6.2 >>>>> docutils==0.15.2 >>>>> entrypoints==0.3 >>>>> enum34==1.1.6 >>>>> fairing==0.5.3 >>>>> fastavro==0.21.24 >>>>> fasteners==0.15 >>>>> fsspec==0.6.2 >>>>> future==0.18.2 >>>>> gcsfs==0.6.0 >>>>> gitdb2==2.0.6 >>>>> GitPython==3.0.5 >>>>> google-api-core==1.16.0 >>>>> google-api-python-client==1.7.11 >>>>> google-apitools==0.5.28 >>>>> google-auth==1.11.0 >>>>> google-auth-httplib2==0.0.3 >>>>> google-auth-oauthlib==0.4.1 >>>>> google-cloud-bigquery==1.17.1 >>>>> google-cloud-bigtable==1.0.0 >>>>> google-cloud-core==1.2.0 >>>>> google-cloud-dataproc==0.6.1 >>>>> google-cloud-datastore==1.7.4 >>>>> google-cloud-language==1.3.0 >>>>> google-cloud-logging==1.14.0 >>>>> google-cloud-monitoring==0.31.1 >>>>> google-cloud-pubsub==1.0.2 >>>>> google-cloud-secret-manager==0.1.1 >>>>> google-cloud-spanner==1.13.0 >>>>> google-cloud-storage==1.25.0 >>>>> google-cloud-translate==2.0.0 
>>>>> google-compute-
Re: seems beam.util.GroupIntoBatches is not supported in DataFlow. Any alternative?
OK, seems like beam.BatchElements(max_batch_size=x) will do the trick for me and runs fine in DataFlow! On Wed, Feb 5, 2020 at 7:38 AM Alan Krumholz wrote: > Actually beam.GroupIntoBatches() gives me the same error as > beam.util.GroupIntoBatches() :( > back to square one. > > Any other ideas? > > Thank you! > > > On Wed, Feb 5, 2020 at 7:32 AM Alan Krumholz > wrote: > >> Never mind there seems to be a beam.GroupIntoBatches() that I >> should have originally used instead of beam.util.GroupIntoBatches() >> >> On Wed, Feb 5, 2020 at 7:19 AM Alan Krumholz >> wrote: >> >>> Hello, I'm having issues running beam.util.GroupIntoBatches() in >>> DataFlow. >>> >>> I get the following error message: >>> >>> Exception: Requested execution of a stateful DoFn, but no user state >>>> context is available. This likely means that the current runner does not >>>> support the execution of stateful DoFns >>> >>> >>> Seems to be related to: >>> >>> https://stackoverflow.com/questions/56403572/no-userstate-context-is-available-google-cloud-dataflow >>> >>> Is there another way I can achieve the same using other beam function? >>> >>> I basically want to batch rows into groups of 100 as it is a lot faster >>> to transform all at once than doing it 1 by 1. >>> >>> I also was planning to use this function for a custom snowflake sink (so >>> I could insert many rows at once) >>> >>> I'm sure there must be another way to do this in DataFlow but not sure >>> how? >>> >>> Thanks so much! >>> >>
Re: seems beam.util.GroupIntoBatches is not supported in DataFlow. Any alternative?
Actually beam.GroupIntoBatches() gives me the same error as beam.util.GroupIntoBatches() :( back to square one. Any other ideas? Thank you! On Wed, Feb 5, 2020 at 7:32 AM Alan Krumholz wrote: > Never mind there seems to be a beam.GroupIntoBatches() that I > should have originally used instead of beam.util.GroupIntoBatches() > > On Wed, Feb 5, 2020 at 7:19 AM Alan Krumholz > wrote: > >> Hello, I'm having issues running beam.util.GroupIntoBatches() in DataFlow. >> >> I get the following error message: >> >> Exception: Requested execution of a stateful DoFn, but no user state >>> context is available. This likely means that the current runner does not >>> support the execution of stateful DoFns >> >> >> Seems to be related to: >> >> https://stackoverflow.com/questions/56403572/no-userstate-context-is-available-google-cloud-dataflow >> >> Is there another way I can achieve the same using other beam function? >> >> I basically want to batch rows into groups of 100 as it is a lot faster >> to transform all at once than doing it 1 by 1. >> >> I also was planning to use this function for a custom snowflake sink (so >> I could insert many rows at once) >> >> I'm sure there must be another way to do this in DataFlow but not sure >> how? >> >> Thanks so much! >> >
Re: seems beam.util.GroupIntoBatches is not supported in DataFlow. Any alternative?
Never mind there seems to be a beam.GroupIntoBatches() that I should have originally used instead of beam.util.GroupIntoBatches() On Wed, Feb 5, 2020 at 7:19 AM Alan Krumholz wrote: > Hello, I'm having issues running beam.util.GroupIntoBatches() in DataFlow. > > I get the following error message: > > Exception: Requested execution of a stateful DoFn, but no user state >> context is available. This likely means that the current runner does not >> support the execution of stateful DoFns > > > Seems to be related to: > > https://stackoverflow.com/questions/56403572/no-userstate-context-is-available-google-cloud-dataflow > > Is there another way I can achieve the same using other beam function? > > I basically want to batch rows into groups of 100 as it is a lot faster to > transform all at once than doing it 1 by 1. > > I also was planning to use this function for a custom snowflake sink (so I > could insert many rows at once) > > I'm sure there must be another way to do this in DataFlow but not sure how? > > Thanks so much! >
seems beam.util.GroupIntoBatches is not supported in DataFlow. Any alternative?
Hello, I'm having issues running beam.util.GroupIntoBatches() in DataFlow.

I get the following error message:

    Exception: Requested execution of a stateful DoFn, but no user state
    context is available. This likely means that the current runner does not
    support the execution of stateful DoFns

Seems to be related to:
https://stackoverflow.com/questions/56403572/no-userstate-context-is-available-google-cloud-dataflow

Is there another way I can achieve the same using another beam function? I basically want to batch rows into groups of 100, as it is a lot faster to transform them all at once than one by one.

I was also planning to use this function for a custom snowflake sink (so I could insert many rows at once).

I'm sure there must be another way to do this in DataFlow but I'm not sure how?

Thanks so much!
Re: dataflow job was working fine last night and it isn't now
Seems like the image we use in KFP to orchestrate the job has cloudpickle==0.8.1 and that one doesn't seem to cause issues. I think I'm unblock for now but I'm sure I won't be the last one to try to do this using GCP managed notebooks :( Thanks for all the help! On Tue, Feb 4, 2020 at 12:24 PM Alan Krumholz wrote: > I'm using a managed notebook instance from GCP > It seems those already come with cloudpickle==1.2.2 as soon as you > provision it. apache-beam[gcp] will then install dill==0.3.1.1 I'm going > to try to uninstall cloudpickle before installing apache-beam and see if > this fixes the problem > > Thank you > > On Tue, Feb 4, 2020 at 11:54 AM Valentyn Tymofieiev > wrote: > >> The fact that you have cloudpickle==1.2.2 further confirms that you may >> be hitting the same error as >> https://stackoverflow.com/questions/42960637/python-3-5-dill-pickling-unpickling-on-different-servers-keyerror-classtype >> . >> >> Could you try to start over with a clean virtual environment? >> >> On Tue, Feb 4, 2020 at 11:46 AM Alan Krumholz >> wrote: >> >>> Hi Valentyn, >>> >>> Here is my pip freeze on my machine (note that the error is in dataflow, >>> the job runs fine in my machine) >>> >>> ansiwrap==0.8.4 >>> apache-beam==2.19.0 >>> arrow==0.15.5 >>> asn1crypto==1.3.0 >>> astroid==2.3.3 >>> astropy==3.2.3 >>> attrs==19.3.0 >>> avro-python3==1.9.1 >>> azure-common==1.1.24 >>> azure-storage-blob==2.1.0 >>> azure-storage-common==2.1.0 >>> backcall==0.1.0 >>> bcolz==1.2.1 >>> binaryornot==0.4.4 >>> bleach==3.1.0 >>> boto3==1.11.9 >>> botocore==1.14.9 >>> cachetools==3.1.1 >>> certifi==2019.11.28 >>> cffi==1.13.2 >>> chardet==3.0.4 >>> Click==7.0 >>> cloudpickle==1.2.2 >>> colorama==0.4.3 >>> configparser==4.0.2 >>> confuse==1.0.0 >>> cookiecutter==1.7.0 >>> crcmod==1.7 >>> cryptography==2.8 >>> cycler==0.10.0 >>> daal==2019.0 >>> datalab==1.1.5 >>> decorator==4.4.1 >>> defusedxml==0.6.0 >>> dill==0.3.1.1 >>> distro==1.0.1 >>> docker==4.1.0 >>> docopt==0.6.2 >>> 
docutils==0.15.2 >>> entrypoints==0.3 >>> enum34==1.1.6 >>> fairing==0.5.3 >>> fastavro==0.21.24 >>> fasteners==0.15 >>> fsspec==0.6.2 >>> future==0.18.2 >>> gcsfs==0.6.0 >>> gitdb2==2.0.6 >>> GitPython==3.0.5 >>> google-api-core==1.16.0 >>> google-api-python-client==1.7.11 >>> google-apitools==0.5.28 >>> google-auth==1.11.0 >>> google-auth-httplib2==0.0.3 >>> google-auth-oauthlib==0.4.1 >>> google-cloud-bigquery==1.17.1 >>> google-cloud-bigtable==1.0.0 >>> google-cloud-core==1.2.0 >>> google-cloud-dataproc==0.6.1 >>> google-cloud-datastore==1.7.4 >>> google-cloud-language==1.3.0 >>> google-cloud-logging==1.14.0 >>> google-cloud-monitoring==0.31.1 >>> google-cloud-pubsub==1.0.2 >>> google-cloud-secret-manager==0.1.1 >>> google-cloud-spanner==1.13.0 >>> google-cloud-storage==1.25.0 >>> google-cloud-translate==2.0.0 >>> google-compute-engine==20191210.0 >>> google-resumable-media==0.4.1 >>> googleapis-common-protos==1.51.0 >>> grpc-google-iam-v1==0.12.3 >>> grpcio==1.26.0 >>> h5py==2.10.0 >>> hdfs==2.5.8 >>> html5lib==1.0.1 >>> htmlmin==0.1.12 >>> httplib2==0.12.0 >>> icc-rt==2020.0.133 >>> idna==2.8 >>> ijson==2.6.1 >>> imageio==2.6.1 >>> importlib-metadata==1.4.0 >>> intel-numpy==1.15.1 >>> intel-openmp==2020.0.133 >>> intel-scikit-learn==0.19.2 >>> intel-scipy==1.1.0 >>> ipykernel==5.1.4 >>> ipython==7.9.0 >>> ipython-genutils==0.2.0 >>> ipython-sql==0.3.9 >>> ipywidgets==7.5.1 >>> isort==4.3.21 >>> jedi==0.16.0 >>> Jinja2==2.11.0 >>> jinja2-time==0.2.0 >>> jmespath==0.9.4 >>> joblib==0.14.1 >>> json5==0.8.5 >>> jsonschema==3.2.0 >>> jupyter==1.0.0 >>> jupyter-aihub-deploy-extension==0.1 >>> jupyter-client==5.3.4 >>> jupyter-console==6.1.0 >>> jupyter-contrib-core==0.3.3 >>> jupyter-contrib-nbextensions==0.5.1 >>> jupyter-core==4.6.1 >>> jupyter-highlight-selected-word==0.2.0 >>> jupyter-http-over-ws==0.0.7 >>> jupyter-latex-envs==1.4.6 >>
Re: dataflow job was working fine last night and it isn't now
I'm using a managed notebook instance from GCP It seems those already come with cloudpickle==1.2.2 as soon as you provision it. apache-beam[gcp] will then install dill==0.3.1.1 I'm going to try to uninstall cloudpickle before installing apache-beam and see if this fixes the problem Thank you On Tue, Feb 4, 2020 at 11:54 AM Valentyn Tymofieiev wrote: > The fact that you have cloudpickle==1.2.2 further confirms that you may > be hitting the same error as > https://stackoverflow.com/questions/42960637/python-3-5-dill-pickling-unpickling-on-different-servers-keyerror-classtype > . > > Could you try to start over with a clean virtual environment? > > On Tue, Feb 4, 2020 at 11:46 AM Alan Krumholz > wrote: > >> Hi Valentyn, >> >> Here is my pip freeze on my machine (note that the error is in dataflow, >> the job runs fine in my machine) >> >> ansiwrap==0.8.4 >> apache-beam==2.19.0 >> arrow==0.15.5 >> asn1crypto==1.3.0 >> astroid==2.3.3 >> astropy==3.2.3 >> attrs==19.3.0 >> avro-python3==1.9.1 >> azure-common==1.1.24 >> azure-storage-blob==2.1.0 >> azure-storage-common==2.1.0 >> backcall==0.1.0 >> bcolz==1.2.1 >> binaryornot==0.4.4 >> bleach==3.1.0 >> boto3==1.11.9 >> botocore==1.14.9 >> cachetools==3.1.1 >> certifi==2019.11.28 >> cffi==1.13.2 >> chardet==3.0.4 >> Click==7.0 >> cloudpickle==1.2.2 >> colorama==0.4.3 >> configparser==4.0.2 >> confuse==1.0.0 >> cookiecutter==1.7.0 >> crcmod==1.7 >> cryptography==2.8 >> cycler==0.10.0 >> daal==2019.0 >> datalab==1.1.5 >> decorator==4.4.1 >> defusedxml==0.6.0 >> dill==0.3.1.1 >> distro==1.0.1 >> docker==4.1.0 >> docopt==0.6.2 >> docutils==0.15.2 >> entrypoints==0.3 >> enum34==1.1.6 >> fairing==0.5.3 >> fastavro==0.21.24 >> fasteners==0.15 >> fsspec==0.6.2 >> future==0.18.2 >> gcsfs==0.6.0 >> gitdb2==2.0.6 >> GitPython==3.0.5 >> google-api-core==1.16.0 >> google-api-python-client==1.7.11 >> google-apitools==0.5.28 >> google-auth==1.11.0 >> google-auth-httplib2==0.0.3 >> google-auth-oauthlib==0.4.1 >> 
google-cloud-bigquery==1.17.1 >> google-cloud-bigtable==1.0.0 >> google-cloud-core==1.2.0 >> google-cloud-dataproc==0.6.1 >> google-cloud-datastore==1.7.4 >> google-cloud-language==1.3.0 >> google-cloud-logging==1.14.0 >> google-cloud-monitoring==0.31.1 >> google-cloud-pubsub==1.0.2 >> google-cloud-secret-manager==0.1.1 >> google-cloud-spanner==1.13.0 >> google-cloud-storage==1.25.0 >> google-cloud-translate==2.0.0 >> google-compute-engine==20191210.0 >> google-resumable-media==0.4.1 >> googleapis-common-protos==1.51.0 >> grpc-google-iam-v1==0.12.3 >> grpcio==1.26.0 >> h5py==2.10.0 >> hdfs==2.5.8 >> html5lib==1.0.1 >> htmlmin==0.1.12 >> httplib2==0.12.0 >> icc-rt==2020.0.133 >> idna==2.8 >> ijson==2.6.1 >> imageio==2.6.1 >> importlib-metadata==1.4.0 >> intel-numpy==1.15.1 >> intel-openmp==2020.0.133 >> intel-scikit-learn==0.19.2 >> intel-scipy==1.1.0 >> ipykernel==5.1.4 >> ipython==7.9.0 >> ipython-genutils==0.2.0 >> ipython-sql==0.3.9 >> ipywidgets==7.5.1 >> isort==4.3.21 >> jedi==0.16.0 >> Jinja2==2.11.0 >> jinja2-time==0.2.0 >> jmespath==0.9.4 >> joblib==0.14.1 >> json5==0.8.5 >> jsonschema==3.2.0 >> jupyter==1.0.0 >> jupyter-aihub-deploy-extension==0.1 >> jupyter-client==5.3.4 >> jupyter-console==6.1.0 >> jupyter-contrib-core==0.3.3 >> jupyter-contrib-nbextensions==0.5.1 >> jupyter-core==4.6.1 >> jupyter-highlight-selected-word==0.2.0 >> jupyter-http-over-ws==0.0.7 >> jupyter-latex-envs==1.4.6 >> jupyter-nbextensions-configurator==0.4.1 >> jupyterlab==1.2.6 >> jupyterlab-git==0.9.0 >> jupyterlab-server==1.0.6 >> keyring==10.1 >> keyrings.alt==1.3 >> kiwisolver==1.1.0 >> kubernetes==10.0.1 >> lazy-object-proxy==1.4.3 >> llvmlite==0.31.0 >> lxml==4.4.2 >> Markdown==3.1.1 >> MarkupSafe==1.1.1 >> matplotlib==3.0.3 >> mccabe==0.6.1 >> missingno==0.4.2 >> mistune==0.8.4 >> mkl==2019.0 >> mkl-fft==1.0.6 >> mkl-random==1.0.1.1 >> mock==2.0.0 >> monotonic==1.5 >> more-itertools==8.1.0 >> nbconvert==5.6.1 >> nbdime==1.1.0 >> nbformat==5.0.4 >> networkx==2.4 >> 
nltk==3.4.5 >> notebook==6.0.3 >> numba==0.47.0 >> numpy==1.15.1 >> oauth2client==3.0.0 >> oauthlib==3.1.0
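The root cause in the thread above was a version mismatch in pickling-related packages between the submitting environment (the managed notebook's cloudpickle==1.2.2) and the Dataflow workers. One way to catch that class of problem before submitting is to compare installed versions against a known-good set. A hedged sketch; the pins are taken from this thread, not an official compatibility matrix, and `importlib.metadata` needs Python 3.8+:

```python
# Report pickling-related packages whose installed version differs from a
# known-good pin, before submitting a job to a remote runner.
from importlib.metadata import version, PackageNotFoundError  # Python 3.8+

# Assumption: versions that worked in this thread (KFP image), not a
# Beam-maintained compatibility list.
EXPECTED = {'dill': '0.3.1.1', 'cloudpickle': '0.8.1'}

def check_pickling_deps(expected=EXPECTED):
    """Return {package: (installed_or_None, expected)} for mismatches only."""
    mismatches = {}
    for pkg, want in expected.items():
        try:
            have = version(pkg)
        except PackageNotFoundError:
            have = None
        if have != want:
            mismatches[pkg] = (have, want)
    return mismatches

print(check_pickling_deps())  # empty dict means everything matches the pins
```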
Re: dataflow job was working fine last night and it isn't now
Here is a test job that sometimes fails and sometimes doesn't (but most times it does). There seems to be something stochastic that causes this, as after several tests a couple of them did succeed:

def test_error(bq_table: str) -> str:
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    class GenData(beam.DoFn):
        def process(self, _):
            for _ in range(2):
                yield {'a': 1, 'b': 2}

    def get_bigquery_schema():
        from apache_beam.io.gcp.internal.clients import bigquery
        table_schema = bigquery.TableSchema()
        columns = [
            ["a", "integer", "nullable"],
            ["b", "integer", "nullable"]
        ]
        for column in columns:
            column_schema = bigquery.TableFieldSchema()
            column_schema.name = column[0]
            column_schema.type = column[1]
            column_schema.mode = column[2]
            table_schema.fields.append(column_schema)
        return table_schema

    pipeline = beam.Pipeline(options=PipelineOptions(
        project='my-project',
        temp_location='gs://my-bucket/temp',
        staging_location='gs://my-bucket/staging',
        runner='DataflowRunner'
    ))
    # pipeline = beam.Pipeline()

    (
        pipeline
        | 'Empty start' >> beam.Create([''])
        | 'Generate Data' >> beam.ParDo(GenData())
        # | 'print' >> beam.Map(print)
        | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            project=bq_table.split(':')[0],
            dataset=bq_table.split(':')[1].split('.')[0],
            table=bq_table.split(':')[1].split('.')[1],
            schema=get_bigquery_schema(),
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
    )

    result = pipeline.run()
    result.wait_until_finish()
    return True

test_error(
    bq_table='my-project:my_dataset.my_table'
)

On Tue, Feb 4, 2020 at 10:04 AM Alan Krumholz wrote:
> I tried breaking apart my pipeline. Seems the step that breaks it is:
> beam.io.WriteToBigQuery
>
> Let me see if I can create a self contained example that breaks to share
> with you
>
> Thanks!
>
> On Tue, Feb 4, 2020 at 9:53 AM Pablo Estrada wrote:
>> Hm that's odd. No changes to the pipeline?
Are you able to share some of >> the code? >> >> +Udi Meiri do you have any idea what could be going >> on here? >> >> On Tue, Feb 4, 2020 at 9:25 AM Alan Krumholz >> wrote: >> >>> Hi Pablo, >>> This is strange... it doesn't seem to be the last beam release as last >>> night it was already using 2.19.0 I wonder if it was some release from the >>> DataFlow team (not beam related): >>> Job typeBatch >>> Job status Succeeded >>> SDK version >>> Apache Beam Python 3.5 SDK 2.19.0 >>> Region >>> us-central1 >>> Start timeFebruary 3, 2020 at 9:28:35 PM GMT-8 >>> Elapsed time5 min 11 sec >>> >>> On Tue, Feb 4, 2020 at 9:15 AM Pablo Estrada wrote: >>> >>>> Hi Alan, >>>> could it be that you're picking up the new Apache Beam 2.19.0 release? >>>> Could you try depending on beam 2.18.0 to see if the issue surfaces when >>>> using the new release? >>>> >>>> If something was working and no longer works, it sounds like a bug. >>>> This may have to do with how we pickle (dill / cloudpickle) - see this >>>> question >>>> https://stackoverflow.com/questions/42960637/python-3-5-dill-pickling-unpickling-on-different-servers-keyerror-classtype >>>> Best >>>> -P. >>>> >>>> On Tue, Feb 4, 2020 at 6:22 AM Alan Krumholz >>>> wrote: >>>> >>>>> Hi, >>>>> >>>>> I was running a dataflow job in GCP last night and it was running fine. >>>>> This morning this same exact job is failing with the following error: >>>>> >>>>> Error message from worker: Traceback (most recent call last): File >>>>> "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", >>>>> line 286, in loads return dill.loads(s) File >>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 275, in loads >>>>> return load(file, ignore, **kwds) File >>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 270, in load >>>>> return Unpickler(file, ignore=ignore, **kwds).load() File >>&g
Re: dataflow job was working fine last night and it isn't now
I tried breaking apart my pipeline. Seems the step that breaks it is: beam.io.WriteToBigQuery Let me see if I can create a self contained example that breaks to share with you Thanks! On Tue, Feb 4, 2020 at 9:53 AM Pablo Estrada wrote: > Hm that's odd. No changes to the pipeline? Are you able to share some of > the code? > > +Udi Meiri do you have any idea what could be going on > here? > > On Tue, Feb 4, 2020 at 9:25 AM Alan Krumholz > wrote: > >> Hi Pablo, >> This is strange... it doesn't seem to be the last beam release as last >> night it was already using 2.19.0 I wonder if it was some release from the >> DataFlow team (not beam related): >> Job typeBatch >> Job status Succeeded >> SDK version >> Apache Beam Python 3.5 SDK 2.19.0 >> Region >> us-central1 >> Start timeFebruary 3, 2020 at 9:28:35 PM GMT-8 >> Elapsed time5 min 11 sec >> >> On Tue, Feb 4, 2020 at 9:15 AM Pablo Estrada wrote: >> >>> Hi Alan, >>> could it be that you're picking up the new Apache Beam 2.19.0 release? >>> Could you try depending on beam 2.18.0 to see if the issue surfaces when >>> using the new release? >>> >>> If something was working and no longer works, it sounds like a bug. This >>> may have to do with how we pickle (dill / cloudpickle) - see this question >>> https://stackoverflow.com/questions/42960637/python-3-5-dill-pickling-unpickling-on-different-servers-keyerror-classtype >>> Best >>> -P. >>> >>> On Tue, Feb 4, 2020 at 6:22 AM Alan Krumholz >>> wrote: >>> >>>> Hi, >>>> >>>> I was running a dataflow job in GCP last night and it was running fine. 
>>>> This morning this same exact job is failing with the following error: >>>> >>>> Error message from worker: Traceback (most recent call last): File >>>> "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", >>>> line 286, in loads return dill.loads(s) File >>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 275, in loads >>>> return load(file, ignore, **kwds) File >>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 270, in load >>>> return Unpickler(file, ignore=ignore, **kwds).load() File >>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 472, in load >>>> obj = StockUnpickler.load(self) File >>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in >>>> _load_type return _reverse_typemap[name] KeyError: 'ClassType' During >>>> handling of the above exception, another exception occurred: Traceback >>>> (most recent call last): File >>>> "/usr/local/lib/python3.5/site-packages/dataflow_worker/batchworker.py", >>>> line 648, in do_work work_executor.execute() File >>>> "/usr/local/lib/python3.5/site-packages/dataflow_worker/executor.py", line >>>> 176, in execute op.start() File "apache_beam/runners/worker/operations.py", >>>> line 649, in apache_beam.runners.worker.operations.DoOperation.start File >>>> "apache_beam/runners/worker/operations.py", line 651, in >>>> apache_beam.runners.worker.operations.DoOperation.start File >>>> "apache_beam/runners/worker/operations.py", line 652, in >>>> apache_beam.runners.worker.operations.DoOperation.start File >>>> "apache_beam/runners/worker/operations.py", line 261, in >>>> apache_beam.runners.worker.operations.Operation.start File >>>> "apache_beam/runners/worker/operations.py", line 266, in >>>> apache_beam.runners.worker.operations.Operation.start File >>>> "apache_beam/runners/worker/operations.py", line 597, in >>>> apache_beam.runners.worker.operations.DoOperation.setup File >>>> "apache_beam/runners/worker/operations.py", line 602, in 
>>>> apache_beam.runners.worker.operations.DoOperation.setup File >>>> "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", >>>> line 290, in loads return dill.loads(s) File >>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 275, in loads >>>> return load(file, ignore, **kwds) File >>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 270, in load >>>> return Unpickler(file, ignore=ignore, **kwds).load() File >>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 472, in load >>>> obj = StockUnpickler.load(self) File >>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in >>>> _load_type return _reverse_typemap[name] KeyError: 'ClassType' >>>> >>>> >>>> If I use a local runner it still runs fine. >>>> Anyone else experiencing something similar today? (or know how to fix >>>> this?) >>>> >>>> Thanks! >>>> >>>
Re: dataflow job was working fine last night and it isn't now
Hi Pablo,
This is strange... it doesn't seem to be the latest Beam release, as last night it was already using 2.19.0. I wonder if it was some release from the DataFlow team (not beam related):

Job type: Batch
Job status: Succeeded
SDK version: Apache Beam Python 3.5 SDK 2.19.0
Region: us-central1
Start time: February 3, 2020 at 9:28:35 PM GMT-8
Elapsed time: 5 min 11 sec

On Tue, Feb 4, 2020 at 9:15 AM Pablo Estrada wrote:
> Hi Alan,
> could it be that you're picking up the new Apache Beam 2.19.0 release?
> Could you try depending on beam 2.18.0 to see if the issue surfaces when
> using the new release?
>
> If something was working and no longer works, it sounds like a bug. This
> may have to do with how we pickle (dill / cloudpickle) - see this question
> https://stackoverflow.com/questions/42960637/python-3-5-dill-pickling-unpickling-on-different-servers-keyerror-classtype
> Best
> -P.
>
> On Tue, Feb 4, 2020 at 6:22 AM Alan Krumholz wrote:
>
>> Hi,
>>
>> I was running a dataflow job in GCP last night and it was running fine.
>> This morning this same exact job is failing with the following error:
>>
>> Error message from worker: Traceback (most recent call last):
>>   File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 286, in loads
>>     return dill.loads(s)
>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 275, in loads
>>     return load(file, ignore, **kwds)
>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 270, in load
>>     return Unpickler(file, ignore=ignore, **kwds).load()
>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 472, in load
>>     obj = StockUnpickler.load(self)
>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in _load_type
>>     return _reverse_typemap[name]
>> KeyError: 'ClassType'
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>   File "/usr/local/lib/python3.5/site-packages/dataflow_worker/batchworker.py", line 648, in do_work
>>     work_executor.execute()
>>   File "/usr/local/lib/python3.5/site-packages/dataflow_worker/executor.py", line 176, in execute
>>     op.start()
>>   File "apache_beam/runners/worker/operations.py", line 649, in apache_beam.runners.worker.operations.DoOperation.start
>>   File "apache_beam/runners/worker/operations.py", line 651, in apache_beam.runners.worker.operations.DoOperation.start
>>   File "apache_beam/runners/worker/operations.py", line 652, in apache_beam.runners.worker.operations.DoOperation.start
>>   File "apache_beam/runners/worker/operations.py", line 261, in apache_beam.runners.worker.operations.Operation.start
>>   File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.Operation.start
>>   File "apache_beam/runners/worker/operations.py", line 597, in apache_beam.runners.worker.operations.DoOperation.setup
>>   File "apache_beam/runners/worker/operations.py", line 602, in apache_beam.runners.worker.operations.DoOperation.setup
>>   File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 290, in loads
>>     return dill.loads(s)
>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 275, in loads
>>     return load(file, ignore, **kwds)
>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 270, in load
>>     return Unpickler(file, ignore=ignore, **kwds).load()
>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 472, in load
>>     obj = StockUnpickler.load(self)
>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in _load_type
>>     return _reverse_typemap[name]
>> KeyError: 'ClassType'
>>
>> If I use a local runner it still runs fine.
>> Anyone else experiencing something similar today? (or know how to fix this?)
>>
>> Thanks!
>
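To act on Pablo's suggestion of depending on Beam 2.18.0, one option is a pinned line in the project's requirements (a sketch; the `[gcp]` extra is an assumption based on the Dataflow usage described in this thread):

```text
apache-beam[gcp]==2.18.0
```

Resubmitting the job with this pin would show whether the `KeyError: 'ClassType'` failure is tied to the 2.19.0 release.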
dataflow job was working fine last night and it isn't now
Hi, I was running a dataflow job in GCP last night and it was running fine. This morning this same exact job is failing with the following error:

Error message from worker: Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 286, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in _load_type
    return _reverse_typemap[name]
KeyError: 'ClassType'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/dataflow_worker/batchworker.py", line 648, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.5/site-packages/dataflow_worker/executor.py", line 176, in execute
    op.start()
  File "apache_beam/runners/worker/operations.py", line 649, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 651, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 652, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 261, in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py", line 597, in apache_beam.runners.worker.operations.DoOperation.setup
  File "apache_beam/runners/worker/operations.py", line 602, in apache_beam.runners.worker.operations.DoOperation.setup
  File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 290, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in _load_type
    return _reverse_typemap[name]
KeyError: 'ClassType'

If I use a local runner it still runs fine.
Anyone else experiencing something similar today? (or know how to fix this?)

Thanks!
Local error when using requirements_file for external runner.
I have a simple Python pipeline that uses a publicly available (PyPI) library. I can run my pipeline fine using my local runner. I can also run it fine on the Dataflow runner if I provide a setup_file to the pipeline.

However, when I try to do this using a requirements_file instead of a setup_file (the recommended and cleaner way when the pipeline has only PyPI dependencies) I get an error on my local machine and the job is never submitted to Dataflow.

I did some digging and the problem seems to be that when you use a requirements_file, the Python SDK runs the following command on the local machine before submitting the external job:

python -m pip download --dest /tmp/dataflow-requirements-cache -r /tmp/requirements.txt --exists-action i --no-binary :all:

This command seems to be trying to download all these other libraries (apart from the one in my requirements_file):

azure-common-1.1.24.zip
azure-storage-blob-2.1.0.tar.gz
boto3-1.11.9.tar.gz
botocore-1.14.9.tar.gz
certifi-2019.11.28.tar.gz
cffi-1.13.2.tar.gz
pycryptodomex-3.9.6.tar.gz
pyOpenSSL-19.1.0.tar.gz
pytz-2019.3.tar.gz
requests-2.22.0.tar.gz
urllib3-1.25.8.tar.gz

It downloads some of them fine, but the error seems to come when it tries to build "cryptography":

Collecting azure-common<2.0.0
  Using cached azure-common-1.1.24.zip (18 kB)
  Saved /tmp/dataflow-requirements-cache/azure-common-1.1.24.zip
Collecting azure-storage-blob<12.0.0
  Using cached azure-storage-blob-2.1.0.tar.gz (83 kB)
  Saved /tmp/dataflow-requirements-cache/azure-storage-blob-2.1.0.tar.gz
Collecting boto3<1.12,>=1.4.4
  Using cached boto3-1.11.9.tar.gz (98 kB)
  Saved /tmp/dataflow-requirements-cache/boto3-1.11.9.tar.gz
Collecting botocore<1.15,>=1.5.0
  Using cached botocore-1.14.9.tar.gz (6.1 MB)
  Saved /tmp/dataflow-requirements-cache/botocore-1.14.9.tar.gz
Collecting requests<2.23.0
  Using cached requests-2.22.0.tar.gz (113 kB)
  Saved /tmp/dataflow-requirements-cache/requests-2.22.0.tar.gz
Collecting urllib3<1.26.0,>=1.20
  Using cached urllib3-1.25.8.tar.gz (261 kB)
  Saved /tmp/dataflow-requirements-cache/urllib3-1.25.8.tar.gz
Collecting certifi<2021.0.0
  Using cached certifi-2019.11.28.tar.gz (156 kB)
  Saved /tmp/dataflow-requirements-cache/certifi-2019.11.28.tar.gz
Collecting pytz<2021.0
  Using cached pytz-2019.3.tar.gz (312 kB)
  Saved /tmp/dataflow-requirements-cache/pytz-2019.3.tar.gz
Collecting pycryptodomex!=3.5.0,<4.0.0,>=3.2
  Using cached pycryptodomex-3.9.6.tar.gz (15.5 MB)
  Saved /tmp/dataflow-requirements-cache/pycryptodomex-3.9.6.tar.gz
Collecting pyOpenSSL<21.0.0,>=16.2.0
  Using cached pyOpenSSL-19.1.0.tar.gz (160 kB)
  Saved /tmp/dataflow-requirements-cache/pyOpenSSL-19.1.0.tar.gz
Collecting cffi<1.14,>=1.9
  Using cached cffi-1.13.2.tar.gz (460 kB)
  Saved /tmp/dataflow-requirements-cache/cffi-1.13.2.tar.gz
Collecting cryptography<3.0.0,>=1.8.2
  Using cached cryptography-2.8.tar.gz (504 kB)
  Installing build dependencies ... error
  ERROR: Command errored out with exit status 1:
   command: /opt/conda/bin/python /opt/conda/lib/python3.6/site-packages/pip install --ignore-installed --no-user --prefix /tmp/pip-build-env-wq57ycwk/overlay --no-warn-script-location --no-binary :all: --only-binary :none: -i https://pypi.org/simple -- 'setuptools>=40.6.0' wheel 'cffi>=1.8,!=1.11.3; platform_python_implementation != '"'"'PyPy'"'"''

Has anyone else seen this problem? And is there an easy way to fix it?

Thank you!
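For reference, the failing staging step can be reproduced outside the SDK. This sketch only builds the argument list pip is invoked with, mirroring the command quoted above (the cache path matches the one in the log); it does not run it:

```python
import sys

def pip_download_cmd(requirements_path,
                     cache_dir="/tmp/dataflow-requirements-cache"):
    """Build the pip invocation shown in the SDK log above.

    --no-binary :all: forces every dependency to be fetched as an sdist and
    built from source, which is where cryptography fails on this machine.
    """
    return [
        sys.executable, "-m", "pip", "download",
        "--dest", cache_dir,
        "-r", requirements_path,
        "--exists-action", "i",
        "--no-binary", ":all:",
    ]

cmd = pip_download_cmd("/tmp/requirements.txt")
```

Running `cmd` via `subprocess.run(cmd)` should reproduce the same cryptography build failure independently of Beam, which makes it easier to debug locally (for example, checking whether a compiler and OpenSSL headers are available for the source build).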
Re: snowflake source/sink
Hi Robert, beam.util.GroupIntoBatches(x) works perfectly. Thanks so much!

On Wed, Jan 29, 2020 at 12:58 PM Robert Bradshaw wrote:
> You could use the beam.util.BatchElements transform to batch rows into
> larger chunks.
>
> On Wed, Jan 29, 2020 at 12:01 PM Alan Krumholz wrote:
> >
> > Thanks Brice! I'll look into wrapping the connector.
> >
> > One more question.
> > I'm trying now to develop a sink too. This is what I have:
> >
> > def writeSnowflake(row):
> >     import snowflake.connector
> >     ctx = snowflake.connector.connect(...)
> >     cs = ctx.cursor()
> >     cs.execute(
> >         'INSERT INTO t(a, b) VALUES ({a}, {b})'.format(
> >             a = str(row['a']),
> >             b = str(row['b'])
> >         )
> >     )
> >     return row
> >
> > pipeline = beam.Pipeline(...)
> > p = (
> >     pipeline
> >     | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(...))
> >     | 'Write to SnowFlake' >> beam.Map(writeSnowflake)
> > )
> >
> > This seems to (slowly) work but it feels extremely inefficient to send an INSERT query to the DW for each row in the dataset.
> > Is there an easy way to have the pipeline maybe stack all my data rows into chunks of 1,000 so I can insert these by chunks instead? I'm mostly curious about how to have the pipeline pass 1K rows at a time to "writeSnowflake()" instead of passing one by one.
> > Maybe by using a GroupByKey transformation and using randomly sampled keys to create the chunks of the desired size? (or can you think of a better way to achieve this?)
> >
> > Thank you so much for all your help!
> >
> > On Wed, Jan 29, 2020 at 11:30 AM Brice Giesbrecht wrote:
> >>
> >> Lovely! You absolutely have the right idea.
> >>
> >> Your comments are spot on but the suboptimal solution that works is usually preferable to the optimal one that doesn't (exist).
> >>
> >> I don't have any experience with the Snowflake connector but if it can be kept around and reused and is slow or expensive to create, you may want to consider a wrapper/class to manage the client/connector that you can use across your pipeline steps if needed.
> >>
> >> Happy processing.
> >> -Brice
> >>
> >> On Wed, Jan 29, 2020 at 2:01 PM Alan Krumholz wrote:
> >>>
> >>> Hi Brice,
> >>>
> >>> Thanks so much for your suggestion! I did the following and it seems to work:
> >>>
> >>> class ReadSnowflake(beam.DoFn):
> >>>     def process(self, _):
> >>>         import snowflake.connector
> >>>         ctx = snowflake.connector.connect(...)
> >>>         cs = ctx.cursor()
> >>>         cs.execute('SELECT a, b FROM t')
> >>>         while True:
> >>>             data = cs.fetchmany(1000)
> >>>             if len(data) == 0:
> >>>                 break
> >>>             for d in data:
> >>>                 yield {'a': d[0], 'b': d[1]}
> >>>
> >>> pipeline = beam.Pipeline()
> >>> p = (
> >>>     pipeline
> >>>     | 'Empty start' >> beam.Create([''])
> >>>     | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
> >>>     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))
> >>>
> >>> I know this is not optimal as it is reading sequentially from snowflake (instead of doing it in parallel as I'm sure the BQ source does) but apart from that, do you see any other problems (or possible improvements) with this code?
> >>>
> >>> Thank you so much!
> >>>
> >>> On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht wrote:
> >>>>
> >>>> I am using a pattern which I saw online (but can't seem to locate) which performs i/o in a DoFn using a standard python client. I use this to read and write to Google cloud storage in streaming mode. You could use this idea to perform almost any i/o. Depending on your use case and workflow, this may be an approach you could consider. Shout if you need some boilerplate.
> >>>>
> >>>> It does look like native support is coming and you know it is true as I read it on the internet. :)
> >>>> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
> >>
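Once rows arrive in batches (via GroupIntoBatches or BatchElements as discussed above), the per-batch write can also be collapsed into a single statement. A minimal sketch, independent of Beam and Snowflake (the table and column names are illustrative); using placeholder binding also avoids interpolating row values directly into the SQL string the way writeSnowflake above does:

```python
def chunked(rows, size=1000):
    """Yield successive chunks of at most `size` rows from a list."""
    for i in range(0, len(rows), size):
        yield rows[i:i + size]

def batched_insert_sql(table, columns, batch):
    """Return (sql, params) for one multi-row INSERT with %s placeholders."""
    row_tpl = "(" + ", ".join(["%s"] * len(columns)) + ")"
    sql = "INSERT INTO {} ({}) VALUES {}".format(
        table, ", ".join(columns), ", ".join([row_tpl] * len(batch)))
    # Flatten the dict rows into a positional parameter list for execute()
    params = [row[c] for row in batch for c in columns]
    return sql, params

batch = [{"a": 1, "b": 2}, {"a": 3, "b": 4}]
sql, params = batched_insert_sql("t", ["a", "b"], batch)
```

Inside a batch-consuming DoFn, the write would then be a single `cs.execute(sql, params)` per batch instead of one round trip per row.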
Re: snowflake source/sink
Thanks Brice! I'll look into wrapping the connector.

One more question.
I'm trying now to develop a sink too. This is what I have:

def writeSnowflake(row):
    import snowflake.connector
    ctx = snowflake.connector.connect(...)
    cs = ctx.cursor()
    cs.execute(
        'INSERT INTO t(a, b) VALUES ({a}, {b})'.format(
            a = str(row['a']),
            b = str(row['b'])
        )
    )
    return row

pipeline = beam.Pipeline(...)
p = (
    pipeline
    | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(...))
    | 'Write to SnowFlake' >> beam.Map(writeSnowflake)
)

This seems to (slowly) work but it feels extremely inefficient to send an INSERT query to the DW for each row in the dataset.

Is there an easy way to have the pipeline maybe stack all my data rows into chunks of 1,000 so I can insert these by chunks instead? I'm mostly curious about how to have the pipeline pass 1K rows at a time to "writeSnowflake()" instead of passing one by one. Maybe by using a GroupByKey transformation and using randomly sampled keys to create the chunks of the desired size? (or can you think of a better way to achieve this?)

Thank you so much for all your help!

On Wed, Jan 29, 2020 at 11:30 AM Brice Giesbrecht wrote:
> Lovely! You absolutely have the right idea.
>
> Your comments are spot on but the suboptimal solution that works is usually preferable to the optimal one that doesn't (exist).
>
> I don't have any experience with the Snowflake connector but if it can be kept around and reused and is slow or expensive to create, you may want to consider a wrapper/class to manage the client/connector that you can use across your pipeline steps if needed.
>
> Happy processing.
> -Brice
>
> On Wed, Jan 29, 2020 at 2:01 PM Alan Krumholz wrote:
>
>> Hi Brice,
>>
>> Thanks so much for your suggestion! I did the following and it seems to work:
>>
>> class ReadSnowflake(beam.DoFn):
>>     def process(self, _):
>>         import snowflake.connector
>>         ctx = snowflake.connector.connect(...)
>>         cs = ctx.cursor()
>>         cs.execute('SELECT a, b FROM t')
>>         while True:
>>             data = cs.fetchmany(1000)
>>             if len(data) == 0:
>>                 break
>>             for d in data:
>>                 yield {'a': d[0], 'b': d[1]}
>>
>> pipeline = beam.Pipeline()
>> p = (
>>     pipeline
>>     | 'Empty start' >> beam.Create([''])
>>     | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
>>     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))
>>
>> I know this is not optimal as it is reading sequentially from snowflake (instead of doing it in parallel as I'm sure the BQ source does) but apart from that, do you see any other problems (or possible improvements) with this code?
>>
>> Thank you so much!
>>
>> On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht wrote:
>>
>>> I am using a pattern which I saw online (but can't seem to locate) which performs i/o in a DoFn using a standard python client. I use this to read and write to Google cloud storage in streaming mode. You could use this idea to perform almost any i/o. Depending on your use case and workflow, this may be an approach you could consider. Shout if you need some boilerplate.
>>>
>>> It does look like native support is coming and you know it is true as I read it on the internet. :)
>>> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
>>>
>>> You could also setup an external service or endpoint to perform the query and read the results into your pipeline in a pipeline step similar to the enrichment idea here:
>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>
>>> And you could always write your own connector. Not a task to be taken too lightly but it can be done.
>>>
>>> HTH
>>>
>>> On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz wrote:
>>>
>>>> Thanks for sharing this Erik!
>>>>
>>>> It would be really nice/convenient to have a python option to do something like that. Our ML team is mostly a python shop and we are also using kubeflow pipelines to orchestrate our ML pipelines (mostly using their python sdk to author these).
>>>>
>>>> Please let me know if you can think of any way we could do this with python.
Re: snowflake source/sink
Hi Brice,

Thanks so much for your suggestion! I did the following and it seems to work:

class ReadSnowflake(beam.DoFn):
    def process(self, _):
        import snowflake.connector
        ctx = snowflake.connector.connect(...)
        cs = ctx.cursor()
        cs.execute('SELECT a, b FROM t')
        while True:
            data = cs.fetchmany(1000)
            if len(data) == 0:
                break
            for d in data:
                yield {'a': d[0], 'b': d[1]}

pipeline = beam.Pipeline()
p = (
    pipeline
    | 'Empty start' >> beam.Create([''])
    | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
    | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))

I know this is not optimal as it is reading sequentially from snowflake (instead of doing it in parallel as I'm sure the BQ source does) but apart from that, do you see any other problems (or possible improvements) with this code?

Thank you so much!

On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht wrote:
> I am using a pattern which I saw online (but can't seem to locate) which performs i/o in a DoFn using a standard python client. I use this to read and write to Google cloud storage in streaming mode. You could use this idea to perform almost any i/o. Depending on your use case and workflow, this may be an approach you could consider. Shout if you need some boilerplate.
>
> It does look like native support is coming and you know it is true as I read it on the internet. :)
> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
>
> You could also setup an external service or endpoint to perform the query and read the results into your pipeline in a pipeline step similar to the enrichment idea here:
> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>
> And you could always write your own connector. Not a task to be taken too lightly but it can be done.
>
> HTH
>
> On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz wrote:
>
>> Thanks for sharing this Erik!
>>
>> It would be really nice/convenient to have a python option to do something like that. Our ML team is mostly a python shop and we are also using kubeflow pipelines to orchestrate our ML pipelines (mostly using their python sdk to author these).
>>
>> Please let me know if you can think of any way we could do this with python.
>>
>> Thanks so much!
>>
>> On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <erik.will...@panderasystems.com> wrote:
>>
>>> You can use the JDBC driver. Here's a blog that describes JDBC usage in general:
>>> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
>>>
>>> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz wrote:
>>>
>>>> Hi,
>>>> We are using beam and (dataflow) at my company and would like to use it to read and write data from snowflake.
>>>>
>>>> Does anybody know if there are any source/sink available for snowflake?
>>>>
>>>> If not, what would be the easiest way to create those? (maybe there is something for sqlalchemy that we could leverage for that?)
>>>>
>>>> Thanks so much!
>>>>
>>>> Alan
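The fetchmany loop in ReadSnowflake above can be factored into a standalone generator and exercised without a warehouse connection. A sketch with a stub DB-API cursor (in the real DoFn, `cursor` would be `ctx.cursor()` after `cs.execute(...)`; the names here are illustrative):

```python
def rows_from_cursor(cursor, batch_size=1000, columns=("a", "b")):
    """Stream rows from a DB-API cursor as dicts, batch_size at a time."""
    while True:
        data = cursor.fetchmany(batch_size)
        if not data:
            return
        for d in data:
            yield dict(zip(columns, d))

class StubCursor:
    """Minimal stand-in for a DB-API cursor, for local testing only."""
    def __init__(self, rows):
        self._rows = list(rows)

    def fetchmany(self, n):
        out, self._rows = self._rows[:n], self._rows[n:]
        return out

rows = list(rows_from_cursor(StubCursor([(1, "x"), (2, "y")]), batch_size=1))
```

Keeping the loop in a plain generator makes the pagination logic unit-testable on its own; the DoFn's `process` then reduces to connecting, executing the query, and `yield from rows_from_cursor(cs)`.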
Re: snowflake source/sink
Thanks for sharing this Erik!

It would be really nice/convenient to have a python option to do something like that. Our ML team is mostly a python shop and we are also using kubeflow pipelines to orchestrate our ML pipelines (mostly using their python sdk to author these).

Please let me know if you can think of any way we could do this with python.

Thanks so much!

On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <erik.will...@panderasystems.com> wrote:
> You can use the JDBC driver. Here's a blog that describes JDBC usage in general:
> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
>
> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz wrote:
>
>> Hi,
>> We are using beam and (dataflow) at my company and would like to use it to read and write data from snowflake.
>>
>> Does anybody know if there are any source/sink available for snowflake?
>>
>> If not, what would be the easiest way to create those? (maybe there is something for sqlalchemy that we could leverage for that?)
>>
>> Thanks so much!
>>
>> Alan
snowflake source/sink
Hi,
We are using Beam (and Dataflow) at my company and would like to use it to read and write data from Snowflake.

Does anybody know if there are any source/sink available for Snowflake?

If not, what would be the easiest way to create those? (Maybe there is something for SQLAlchemy that we could leverage for that?)

Thanks so much!

Alan