Re: snowflake io in python

2020-11-30 Thread Alan Krumholz
Thank you!

On Mon, Nov 23, 2020 at 10:53 PM Chamikara Jayalath 
wrote:

> We are working on this. ETA is the end of this quarter. Created
> https://issues.apache.org/jira/browse/BEAM-11332 for tracking.
>
> Thanks,
> Cham
>
> On Mon, Nov 23, 2020 at 5:42 AM Alan Krumholz 
> wrote:
>
>> Hi Cham,
>> I'm guessing this means I won't be able to use Snowflake IO with python
>> on dataflow until then?
>> Is there a timeline for that work to be completed?
>>
>> Thank you so much for your help
>>
>>
>>
>> On Fri, Nov 20, 2020 at 9:33 PM Chamikara Jayalath 
>> wrote:
>>
>>> This is because Python does not understand the Java-specific
>>> "beam:transform:write_files:v1" transform. Hopefully this is one of those
>>> issues that will get resolved when we update Dataflow to directly consume
>>> portable protos (we are working on this now).
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Fri, Nov 20, 2020 at 11:14 AM Brian Hulette 
>>> wrote:
>>>
>>>> +Chamikara Jayalath  any idea why this is still
>>>> doing a runner api roundtrip and failing? It's a multi-language pipeline,
>>>> and Alan has it configured to run on Dataflow runner V2.
>>>>
>>>> On Fri, Nov 20, 2020 at 10:36 AM Alan Krumholz <
>>>> alan.krumh...@betterup.co> wrote:
>>>>
>>>>> Just tried that and still getting this:
>>>>>
>>>>> ---------------------------------------------------------------------------
>>>>> KeyError                                  Traceback (most recent call last)
>>>>>  in 
>>>>> ----> 1 bq_to_snowflake(
>>>>>       2     'ml-betterup.coach_search.distances',
>>>>>       3     'analytics.ml.coach_search_distances',
>>>>>       4     'master')
>>>>>
>>>>>  in bq_to_snowflake(bq_table, snow_table, git_branch)
>>>>>     138 )
>>>>>     139
>>>>> --> 140 result = pipeline.run()
>>>>>     141 result.wait_until_finish()
>>>>>     142
>>>>>
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
>>>>>     512   # When possible, invoke a round trip through the runner API.
>>>>>     513   if test_runner_api and self._verify_runner_api_compatible():
>>>>> --> 514     return Pipeline.from_runner_api(
>>>>>     515         self.to_runner_api(use_fake_coders=True),
>>>>>     516         self.runner,
>>>>>
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, runner, options, return_context, allow_proto_holders)
>>>>>     890         requirements=proto.requirements)
>>>>>     891     root_transform_id, = proto.root_transform_ids
>>>>> --> 892     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>>>     893     # TODO(robertwb): These are only needed to continue construction. Omit?
>>>>>     894     p.applied_labels = {
>>>>>
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>>>>     113     # type: (str) -> PortableObjectT
>>>>>     114     if id not in self._id_to_obj:
>>>>> --> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
>>>>>     116           self._id_to_proto[id], self._pipeline_context)
>>>>>     117     return self._id_to_obj[id]
>>>>>
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>>>>    1277     result.parts = []
>>>>>    1278     for transform_id in proto.subtransforms:
>>>>> -> 1279       part = context.transforms.get_by_id(transform_id)
>>>>>    1280       part.parent = result
>>>>>    1281       result.parts.append(part)
>>>>>
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>>>>     113     # type: (str) -> PortableObjectT
>>>>>     114     if id not in self._id_to_obj:
>>>>> --> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
>>>>>     116           self._id_to_proto[id], self._pipeline_context)
>>>>>     117     return self._id_to_obj[id]
>>>>>
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>>>>    1277     result.parts = []
>>>>>    1278     for transform_id in proto

Re: snowflake io in python

2020-11-23 Thread Alan Krumholz
Hi Cham,
I'm guessing this means I won't be able to use Snowflake IO with python on
dataflow until then?
Is there a timeline for that work to be completed?

Thank you so much for your help



On Fri, Nov 20, 2020 at 9:33 PM Chamikara Jayalath 
wrote:

> This is because Python does not understand the Java-specific
> "beam:transform:write_files:v1" transform. Hopefully this is one of those
> issues that will get resolved when we update Dataflow to directly consume
> portable protos (we are working on this now).
>
> Thanks,
> Cham
>
> On Fri, Nov 20, 2020 at 11:14 AM Brian Hulette 
> wrote:
>
>> +Chamikara Jayalath  any idea why this is still
>> doing a runner api roundtrip and failing? It's a multi-language pipeline,
>> and Alan has it configured to run on Dataflow runner V2.
>>
>> On Fri, Nov 20, 2020 at 10:36 AM Alan Krumholz 
>> wrote:
>>
>>> Just tried that and still getting this:
>>>
>>> ---------------------------------------------------------------------------
>>> KeyError                                  Traceback (most recent call last)
>>>  in 
>>> ----> 1 bq_to_snowflake(
>>>       2     'ml-betterup.coach_search.distances',
>>>       3     'analytics.ml.coach_search_distances',
>>>       4     'master')
>>>
>>>  in bq_to_snowflake(bq_table, snow_table, git_branch)
>>>     138 )
>>>     139
>>> --> 140 result = pipeline.run()
>>>     141 result.wait_until_finish()
>>>     142
>>>
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
>>>     512   # When possible, invoke a round trip through the runner API.
>>>     513   if test_runner_api and self._verify_runner_api_compatible():
>>> --> 514     return Pipeline.from_runner_api(
>>>     515         self.to_runner_api(use_fake_coders=True),
>>>     516         self.runner,
>>>
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, runner, options, return_context, allow_proto_holders)
>>>     890         requirements=proto.requirements)
>>>     891     root_transform_id, = proto.root_transform_ids
>>> --> 892     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>     893     # TODO(robertwb): These are only needed to continue construction. Omit?
>>>     894     p.applied_labels = {
>>>
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>>     113     # type: (str) -> PortableObjectT
>>>     114     if id not in self._id_to_obj:
>>> --> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
>>>     116           self._id_to_proto[id], self._pipeline_context)
>>>     117     return self._id_to_obj[id]
>>>
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>>    1277     result.parts = []
>>>    1278     for transform_id in proto.subtransforms:
>>> -> 1279       part = context.transforms.get_by_id(transform_id)
>>>    1280       part.parent = result
>>>    1281       result.parts.append(part)
>>>
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>>     113     # type: (str) -> PortableObjectT
>>>     114     if id not in self._id_to_obj:
>>> --> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
>>>     116           self._id_to_proto[id], self._pipeline_context)
>>>     117     return self._id_to_obj[id]
>>>
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>>    1277     result.parts = []
>>>    1278     for transform_id in proto.subtransforms:
>>> -> 1279       part = context.transforms.get_by_id(tra

Re: snowflake io in python

2020-11-20 Thread Alan Krumholz
._known_urns[proto.spec.urn]
    691
    692 try:
KeyError: 'beam:transform:write_files:v1'



On Fri, Nov 20, 2020 at 11:18 AM Brian Hulette  wrote:

> Hm try passing in the args as they would appear in
> `sys.argv`, PipelineOptions(['--experiments=use_runner_v2'])
>
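A minimal sketch of that suggestion, with the flag passed in sys.argv style (the project, region and bucket names here are placeholders, not values from this thread):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Each flag is one string, exactly as it would appear on the command line.
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',                 # placeholder
    '--region=us-central1',                 # placeholder
    '--temp_location=gs://my-bucket/temp',  # placeholder
    '--experiments=use_runner_v2',
])

pipeline = beam.Pipeline(options=options)
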
> On Thu, Nov 19, 2020 at 12:14 PM Alan Krumholz 
> wrote:
>
>> How can I pass that flag using the SDK?
>> Tried this:
>>
>> pipeline = beam.Pipeline(options=PipelineOptions(experiments=
>>> ['use_runner_v2'], ...)
>>
>>
>> but still getting a similar error:
>>
>> ---------------------------------------------------------------------------
>> KeyError                                  Traceback (most recent call last)
>>  in 
>> ----> 1 bq_to_snowflake(
>>       2     'ml-betterup.coach_search.distances',
>>       3     'analytics.ml.coach_search_distances',
>>       4     'master')
>>
>>  in bq_to_snowflake(bq_table, snow_table, git_branch)
>>     138 )
>>     139
>> --> 140 result = pipeline.run()
>>     141 result.wait_until_finish()
>>     142
>>
>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
>>     512   # When possible, invoke a round trip through the runner API.
>>     513   if test_runner_api and self._verify_runner_api_compatible():
>> --> 514     return Pipeline.from_runner_api(
>>     515         self.to_runner_api(use_fake_coders=True),
>>     516         self.runner,
>>
>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, runner, options, return_context, allow_proto_holders)
>>     890         requirements=proto.requirements)
>>     891     root_transform_id, = proto.root_transform_ids
>> --> 892     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>     893     # TODO(robertwb): These are only needed to continue construction. Omit?
>>     894     p.applied_labels = {
>>
>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>     113     # type: (str) -> PortableObjectT
>>     114     if id not in self._id_to_obj:
>> --> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
>>     116           self._id_to_proto[id], self._pipeline_context)
>>     117     return self._id_to_obj[id]
>>
>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>    1277     result.parts = []
>>    1278     for transform_id in proto.subtransforms:
>> -> 1279       part = context.transforms.get_by_id(transform_id)
>>    1280       part.parent = result
>>    1281       result.parts.append(part)
>>
>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>     113     # type: (str) -> PortableObjectT
>>     114     if id not in self._id_to_obj:
>> --> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
>>     116           self._id_to_proto[id], self._pipeline_context)
>>     117     return self._id_to_obj[id]
>>
>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>    1277     result.parts = []
>>    1278     for transform_id in proto.subtransforms:
>> -> 1279       part = context.transforms.get_by_id(transform_id)
>>    1280       part.parent = result
>>    1281       result.parts.append(part)
>>
>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>     113     # type: (str) -> PortableObjectT
>>     114     if id not in self._id_to_obj:
>> --> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
>>     116           self._id_to_proto[id], self._pipeline_context)
>>     117     return self._id_to_obj[id]
>>
>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>    1277     result.parts = []
>>    1278     for transform_id in proto.subtransforms:
>> -> 1279       part = context.transforms.get_by_id(transform_id)
>>    1280       part.parent = result
>>    1281       result.parts.append(part)
>>
>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>     113     # type: (str) -> PortableObjectT
>>     114     if id not in self._id_to_obj:
>> --> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
>>     116           self._id_to_proto[id], self._pipeline_context)
>>     117     return self._id_to_obj[id]
>>
>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, conte

Re: snowflake io in python

2020-11-19 Thread Alan Krumholz
 is None or proto.spec is None or not proto.spec.urn:
    689       return None
--> 690 parameter_type, constructor = cls._known_urns[proto.spec.urn]
    691
    692 try:
KeyError: 'beam:transform:write_files:v1'



On Thu, Nov 19, 2020 at 2:18 PM Brian Hulette  wrote:

> Ah ok, you'll need to use Dataflow runner v2 [1] to run this pipeline (add
> the flag '--experiments=use_runner_v2'). See also [2].
>
> [1]
> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#python_11
> [2]
> https://cloud.google.com/blog/products/data-analytics/multi-language-sdks-for-building-cloud-pipelines
>
> On Thu, Nov 19, 2020 at 11:10 AM Alan Krumholz 
> wrote:
>
>> DataFlow runner
>>
>> On Thu, Nov 19, 2020 at 2:00 PM Brian Hulette 
>> wrote:
>>
>>> Hm what runner are you using? It looks like we're trying to encode and
>>> decode the pipeline proto, which isn't possible for a multi-language
>>> pipeline. Are you using a portable runner?
>>>
>>> Brian
>>>
>>> On Thu, Nov 19, 2020 at 10:42 AM Alan Krumholz <
>>> alan.krumh...@betterup.co> wrote:
>>>
>>>> got it, thanks!
>>>> I was using:
>>>> 'xx.us-east-1'
>>>> Seems using this instead fixes that problem:
>>>> 'xx.us-east-1.snowflakecomputing.com'
>>>>
>>>> I'm now hitting a different error though (now in python):
>>>>
>>>>  in bq_to_snowflake(bq_table,
>>>>> snow_table, git_branch)
>>>>> 161 )
>>>>> 162
>>>>> --> 163 result = pipeline.run()
>>>>> 164 result.wait_until_finish()
>>>>> 165
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py
>>>>> in run(self, test_runner_api)
>>>>> 512 # When possible, invoke a round trip through the runner API.
>>>>> 513 if test_runner_api and self._verify_runner_api_compatible():
>>>>> --> 514 return Pipeline.from_runner_api( 515 self.to_runner_api(
>>>>> use_fake_coders=True),
>>>>> 516 self.runner,
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py
>>>>> in from_runner_api(proto, runner, options, return_context,
>>>>> allow_proto_holders)
>>>>> 890 requirements=proto.requirements)
>>>>> 891 root_transform_id, = proto.root_transform_ids
>>>>> --> 892 p.transforms_stack = [context.transforms.get_by_id(
>>>>> root_transform_id)]
>>>>> 893 # TODO(robertwb): These are only needed to continue construction.
>>>>> Omit?
>>>>> 894 p.applied_labels = {
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py
>>>>> in get_by_id(self, id)
>>>>> 113 # type: (str) -> PortableObjectT
>>>>> 114 if id not in self._id_to_obj:
>>>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116
>>>>> self._id_to_proto[id], self._pipeline_context)
>>>>> 117 return self._id_to_obj[id]
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py
>>>>> in from_runner_api(proto, context)
>>>>> 1277 result.parts = []
>>>>> 1278 for transform_id in proto.subtransforms:
>>>>> -> 1279 part = context.transforms.get_by_id(transform_id)
>>>>> 1280 part.parent = result
>>>>> 1281 result.parts.append(part)
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py
>>>>> in get_by_id(self, id)
>>>>> 113 # type: (str) -> PortableObjectT
>>>>> 114 if id not in self._id_to_obj:
>>>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116
>>>>> self._id_to_proto[id], self._pipeline_context)
>>>>> 117 return self._id_to_obj[id]
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py
>>>>> in from_runner_api(proto, context)
>>>>> 1277 result.parts = []
>>>>> 1278 for transform_id in proto.subtransforms:
>>>>> -> 1279 part = context.transforms.get_by_id(transform_id)
>>>>> 1280 part.parent = result
>>>>> 1281 result.parts.append(part)
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py
>>>>> in get_by_id(self, id)
>>>>> 113 # type: (str) -> PortableObjectT
>>>>> 114 if id not i

Re: snowflake io in python

2020-11-19 Thread Alan Krumholz
DataFlow runner

On Thu, Nov 19, 2020 at 2:00 PM Brian Hulette  wrote:

> Hm what runner are you using? It looks like we're trying to encode and
> decode the pipeline proto, which isn't possible for a multi-language
> pipeline. Are you using a portable runner?
>
> Brian
>
> On Thu, Nov 19, 2020 at 10:42 AM Alan Krumholz 
> wrote:
>
>> got it, thanks!
>> I was using:
>> 'xx.us-east-1'
>> Seems using this instead fixes that problem:
>> 'xx.us-east-1.snowflakecomputing.com'
>>
>> I'm now hitting a different error though (now in python):
>>
>>  in bq_to_snowflake(bq_table, snow_table,
>>> git_branch)
>>> 161 )
>>> 162
>>> --> 163 result = pipeline.run()
>>> 164 result.wait_until_finish()
>>> 165 ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py
>>> in run(self, test_runner_api)
>>> 512 # When possible, invoke a round trip through the runner API.
>>> 513 if test_runner_api and self._verify_runner_api_compatible():
>>> --> 514 return Pipeline.from_runner_api( 515 self.to_runner_api(
>>> use_fake_coders=True),
>>> 516 self.runner,
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in
>>> from_runner_api(proto, runner, options, return_context,
>>> allow_proto_holders)
>>> 890 requirements=proto.requirements)
>>> 891 root_transform_id, = proto.root_transform_ids
>>> --> 892 p.transforms_stack = [context.transforms.get_by_id(
>>> root_transform_id)]
>>> 893 # TODO(robertwb): These are only needed to continue construction.
>>> Omit?
>>> 894 p.applied_labels = {
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py
>>> in get_by_id(self, id)
>>> 113 # type: (str) -> PortableObjectT
>>> 114 if id not in self._id_to_obj:
>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116
>>> self._id_to_proto[id], self._pipeline_context)
>>> 117 return self._id_to_obj[id]
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in
>>> from_runner_api(proto, context)
>>> 1277 result.parts = []
>>> 1278 for transform_id in proto.subtransforms:
>>> -> 1279 part = context.transforms.get_by_id(transform_id)
>>> 1280 part.parent = result
>>> 1281 result.parts.append(part)
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py
>>> in get_by_id(self, id)
>>> 113 # type: (str) -> PortableObjectT
>>> 114 if id not in self._id_to_obj:
>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116
>>> self._id_to_proto[id], self._pipeline_context)
>>> 117 return self._id_to_obj[id]
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in
>>> from_runner_api(proto, context)
>>> 1277 result.parts = []
>>> 1278 for transform_id in proto.subtransforms:
>>> -> 1279 part = context.transforms.get_by_id(transform_id)
>>> 1280 part.parent = result
>>> 1281 result.parts.append(part)
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py
>>> in get_by_id(self, id)
>>> 113 # type: (str) -> PortableObjectT
>>> 114 if id not in self._id_to_obj:
>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116
>>> self._id_to_proto[id], self._pipeline_context)
>>> 117 return self._id_to_obj[id]
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in
>>> from_runner_api(proto, context)
>>> 1277 result.parts = []
>>> 1278 for transform_id in proto.subtransforms:
>>> -> 1279 part = context.transforms.get_by_id(transform_id)
>>> 1280 part.parent = result
>>> 1281 result.parts.append(part)
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py
>>> in get_by_id(self, id)
>>> 113 # type: (str) -> PortableObjectT
>>> 114 if id not in self._id_to_obj:
>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116
>>> self._id_to_proto[id], self._pipeline_context)
>>> 117 return self._id_to_obj[id]
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in
>>> from_runner_api(proto, context)
>>> 1277 result.parts = []
>>> 1278 for transform_id in proto.subtransforms:
>>> -> 1279 part = context.transforms.get_by_id(transform_id)
>>> 1280 part.parent = result
>>> 1281

Re: snowflake io in python

2020-11-19 Thread Alan Krumholz
2 try:
KeyError: 'beam:transform:write_files:v1'



I'll keep trying to make this work but sharing it in case you can easily
see what the problem is

Thanks so much!

On Thu, Nov 19, 2020 at 1:30 PM Brian Hulette  wrote:

> Hi Alan,
> Sorry this error message is so verbose. What are you passing for the
> server_name argument [1]? It looks like that's what the Java stacktrace is
> complaining about:
>
> java.lang.IllegalArgumentException: serverName must be in format
> .snowflakecomputing.com
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/snowflake.py#L302
>
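The fully qualified host (not just the account/region prefix) is what ends up fixing this later in the thread. A tiny helper, purely illustrative and not part of Beam, that builds the expected value:

def snowflake_server_name(account_and_region: str) -> str:
    """Append the Snowflake domain if it is missing,
    e.g. 'xx.us-east-1' -> 'xx.us-east-1.snowflakecomputing.com'."""
    suffix = '.snowflakecomputing.com'
    if account_and_region.endswith(suffix):
        return account_and_region
    return account_and_region + suffix

print(snowflake_server_name('xx.us-east-1'))  # xx.us-east-1.snowflakecomputing.com
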
> On Thu, Nov 19, 2020 at 10:16 AM Alan Krumholz 
> wrote:
>
>> I'm trying to replace my custom/problematic snowflake sink with the new:
>> https://beam.apache.org/documentation/io/built-in/snowflake/#writing-to-snowflake
>>
>> However, when I try to run my pipeline (using Python) I get this Java
>> error:
>>
>> RuntimeError: java.lang.RuntimeException: Failed to build transform
>>> beam:external:java:snowflake:write:v1 from spec urn:
>>> "beam:external:java:snowflake:write:v1"
>>
>>
>> It is hard to understand why it is failing from reading the partial java 
>> error trace I get in the output:
>>
>>> at 
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:130)
>>> at 
>>> org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:357)
>>> at 
>>> org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:433)
>>> at 
>>> org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:488)
>>> at 
>>> org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232)
>>> at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
>>> at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>>> at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
>>> at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>>> at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>>> at 
>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>>> at 
>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
>>> at java.base/java.lang.Thread.run(Thread.java:832)
>>> Caused by: java.lang.IllegalArgumentException: serverName must be in format 
>>> .snowflakecomputing.com
>>> at 
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141)
>>> at 
>>> org.apache.beam.sdk.io.snowflake.SnowflakeIO$DataSourceConfiguration.withServerName(SnowflakeIO.java:1700)
>>> at 
>>> org.apache.beam.sdk.io.snowflake.crosslanguage.CrossLanguageConfiguration.getDataSourceConfiguration(CrossLanguageConfiguration.java:166)
>>> at 
>>> org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:78)
>>> at 
>>> org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:34)
>>> at 
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:125)
>>> ... 12 more
>>
>>
>>
>> Any clue how I can debug/fix this using Python?
>>
>>
>>
>>


snowflake io in python

2020-11-19 Thread Alan Krumholz
I'm trying to replace my custom/problematic snowflake sink with the new:
https://beam.apache.org/documentation/io/built-in/snowflake/#writing-to-snowflake

However, when I try to run my pipeline (using Python) I get this Java error:

RuntimeError: java.lang.RuntimeException: Failed to build transform
> beam:external:java:snowflake:write:v1 from spec urn:
> "beam:external:java:snowflake:write:v1"


It is hard to understand why it is failing from reading the partial
java error trace I get in the output:

> at 
> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:130)
>   at 
> org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:357)
>   at 
> org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:433)
>   at 
> org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:488)
>   at 
> org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232)
>   at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
>   at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>   at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
>   at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>   at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
>   at java.base/java.lang.Thread.run(Thread.java:832)
> Caused by: java.lang.IllegalArgumentException: serverName must be in format 
> .snowflakecomputing.com
>   at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141)
>   at 
> org.apache.beam.sdk.io.snowflake.SnowflakeIO$DataSourceConfiguration.withServerName(SnowflakeIO.java:1700)
>   at 
> org.apache.beam.sdk.io.snowflake.crosslanguage.CrossLanguageConfiguration.getDataSourceConfiguration(CrossLanguageConfiguration.java:166)
>   at 
> org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:78)
>   at 
> org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:34)
>   at 
> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:125)
>   ... 12 more



Any clue how I can debug/fix this using Python?


Re: Dataflow isn't parallelizing

2020-09-11 Thread Alan Krumholz
This seems to work!


Thanks so much Eugene and Luke!

On Fri, Sep 11, 2020 at 11:33 AM Luke Cwik  wrote:

> Inserting the Reshuffle is the easiest answer to test that parallelization
> starts happening.
>
> If the performance is good but you're materializing too much data at the
> shuffle boundary you'll want to convert your high fanout function (?Read
> from Snowflake?) into a splittable DoFn.
>
> On Fri, Sep 11, 2020 at 9:56 AM Eugene Kirpichov 
> wrote:
>
>> Hi,
>>
>> Most likely this is because of fusion - see
>> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#fusion-optimization
>> . You need to insert a Reshuffle.viaRandomKey(), most likely after the
>> first step.
>>
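In the Python SDK the equivalent transform is beam.Reshuffle(). A minimal sketch of breaking fusion ahead of an expensive per-element step (the step names and DoFn below are placeholders, not the actual pipeline from this thread):

import apache_beam as beam

class TranscribeMp3(beam.DoFn):
    """Placeholder for the expensive per-element step."""
    def process(self, element):
        yield element  # real transcription work would go here

with beam.Pipeline() as p:
    (
        p
        | 'Read file list' >> beam.Create(['a.mp3', 'b.mp3', 'c.mp3'])
        # Reshuffle breaks fusion so the expensive step below can be
        # distributed across many workers instead of staying fused with
        # the single-bundle source.
        | 'Break fusion' >> beam.Reshuffle()
        | 'Transcribe mp3' >> beam.ParDo(TranscribeMp3())
    )
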
>> On Fri, Sep 11, 2020 at 9:41 AM Alan Krumholz 
>> wrote:
>>
>>> Hi DataFlow team,
>>> I have a simple pipeline that I'm trying to speed up using DataFlow:
>>>
>>> [image: image.png]
>>>
>>> As you can see the bottleneck is the "transcribe mp3" step. I was hoping
>>> DataFlow would be able to run many of these in parallel to speed up the
>>> total execution time.
>>>
>>> However, it seems it doesn't do that... and instead keeps executing all the
>>> independent inputs sequentially
>>> Even when I tried to force it to start with many workers it rapidly
>>> shuts down most of them and only keeps one alive and doesn't ever seem to
>>> parallelize this step :(
>>>
>>> Any advice on what else to try to make it do this?
>>>
>>> Thanks so much!
>>>
>>
>>
>> --
>> Eugene Kirpichov
>> http://www.linkedin.com/in/eugenekirpichov
>>
>


Dataflow isn't parallelizing

2020-09-11 Thread Alan Krumholz
Hi DataFlow team,
I have a simple pipeline that I'm trying to speed up using DataFlow:

[image: image.png]

As you can see the bottleneck is the "transcribe mp3" step. I was hoping
DataFlow would be able to run many of these in parallel to speed up the
total execution time.

However, it seems it doesn't do that... and instead keeps executing all the
independent inputs sequentially
Even when I tried to force it to start with many workers it rapidly shuts
down most of them and only keeps one alive and doesn't ever seem to
parallelize this step :(

Any advice on what else to try to make it do this?

Thanks so much!


Re: Installing ffmpeg on a python dataflow job

2020-09-11 Thread Alan Krumholz
Hi Luke,
I copied the setup.py file from the docs without modifying it much and it
worked now.

Thank you
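
For reference, the main difference from the setup.py quoted below seems to be the cmdclass argument (the earlier paste shows mdclass, likely a transcription slip), which is what actually registers the custom apt-get commands. A condensed sketch in the style of the Beam docs example, keeping only the package pins from the thread:

#!/usr/bin/python
"""Condensed, docs-style setup.py sketch: installs ffmpeg on the workers
via custom build commands. Package pins come from the thread; the rest
follows the pattern of the Beam docs example."""
import subprocess
from distutils.command.build import build as _build

import setuptools

CUSTOM_COMMANDS = [
    ['apt-get', 'update'],
    ['apt-get', 'install', '-y', 'ffmpeg'],
]


class build(_build):
    # Run the custom commands as part of the normal build.
    sub_commands = _build.sub_commands + [('CustomCommands', None)]


class CustomCommands(setuptools.Command):
    def initialize_options(self):
        pass

    def finalize_options(self):
        pass

    def run(self):
        for command in CUSTOM_COMMANDS:
            # check=True makes a failed apt-get surface during worker startup.
            subprocess.run(command, check=True)


setuptools.setup(
    name='DataflowJob',
    version='0.1',
    install_requires=[
        'boto3==1.11.17',
        'ffmpeg-python==0.2.0',
        'google-cloud-storage==1.31.0',
    ],
    packages=setuptools.find_packages(),
    # 'cmdclass' (not 'mdclass') is what registers the commands above.
    cmdclass={
        'build': build,
        'CustomCommands': CustomCommands,
    },
)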

On Thu, Sep 10, 2020 at 11:12 AM Luke Cwik  wrote:

> Did ffmpeg install change $PATH? (this may not be visible to the current
> process)
> Have you tried the full path to the executable?
>
> On Wed, Sep 9, 2020 at 1:48 PM Alan Krumholz 
> wrote:
>
>> Hi DataFlow team,
>>
>> We are trying to use ffmpeg to process some video data using dataflow.
>> In order to do this we need the worker nodes to have ffmpeg installed.
>>
>> After reading Beam docs I created a setup.py file for my job like this:
>>
>> #!/usr/bin/python
>> import subprocess
>> from distutils.command.build import build as _build
>> import setuptools
>>
>> class build(_build):
>>     sub_commands = _build.sub_commands + [('CustomCommands', None)]
>>
>> class CustomCommands(setuptools.Command):
>>     def initialize_options(self):
>>         pass
>>
>>     def finalize_options(self):
>>         pass
>>
>>     def RunCustomCommand(self, command_list):
>>         p = subprocess.Popen(
>>             command_list,
>>             stdin=subprocess.PIPE,
>>             stdout=subprocess.PIPE,
>>             stderr=subprocess.STDOUT)
>>         stdout_data, _ = p.communicate()
>>         if p.returncode != 0:
>>             raise RuntimeError(
>>                 'Command %s failed: exit code: %s' % (
>>                     command_list, p.returncode))
>>
>>     def run(self):
>>         for command in CUSTOM_COMMANDS:
>>             self.RunCustomCommand(command)
>>
>> CUSTOM_COMMANDS = [
>>     ['apt-get', 'update'],
>>     ['apt-get', 'install', '-y', 'ffmpeg']]
>> REQUIRED_PACKAGES = [
>>     'boto3==1.11.17',
>>     'ffmpeg-python==0.2.0',
>>     'google-cloud-storage==1.31.0']
>> setuptools.setup(
>>     name='DataflowJob',
>>     version='0.1',
>>     install_requires=REQUIRED_PACKAGES,
>>     packages=setuptools.find_packages(),
>>     mdclass={
>>         'build': build,
>>         'CustomCommands': CustomCommands})
>>
>> However, when I run the job I still get an error saying that ffmpeg is
>> not installed: "No such file or directory: 'ffmpeg'"
>>
>> Any clue what am I doing wrong?
>>
>> Thanks so much!
>>
>>


Installing ffmpeg on a python dataflow job

2020-09-09 Thread Alan Krumholz
Hi DataFlow team,

We are trying to use ffmpeg to process some video data using dataflow.
In order to do this we need the worker nodes to have ffmpeg installed.

After reading Beam docs I created a setup.py file for my job like this:

#!/usr/bin/python
import subprocess
from distutils.command.build import build as _build
import setuptools

class build(_build):
    sub_commands = _build.sub_commands + [('CustomCommands', None)]

class CustomCommands(setuptools.Command):
    def initialize_options(self):
        pass

    def finalize_options(self):
        pass

    def RunCustomCommand(self, command_list):
        p = subprocess.Popen(
            command_list,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT)
        stdout_data, _ = p.communicate()
        if p.returncode != 0:
            raise RuntimeError(
                'Command %s failed: exit code: %s' % (
                    command_list, p.returncode))

    def run(self):
        for command in CUSTOM_COMMANDS:
            self.RunCustomCommand(command)

CUSTOM_COMMANDS = [
    ['apt-get', 'update'],
    ['apt-get', 'install', '-y', 'ffmpeg']]
REQUIRED_PACKAGES = [
    'boto3==1.11.17',
    'ffmpeg-python==0.2.0',
    'google-cloud-storage==1.31.0']
setuptools.setup(
    name='DataflowJob',
    version='0.1',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    mdclass={
        'build': build,
        'CustomCommands': CustomCommands})

However, when I run the job I still get an error saying that ffmpeg is not
installed: "No such file or directory: 'ffmpeg'"

Any clue what am I doing wrong?

Thanks so much!


Re: daily dataflow job failing today

2020-02-12 Thread Alan Krumholz
makes sense. I'll add this workaround for now.
Thanks so much for your help!

On Wed, Feb 12, 2020 at 10:33 AM Valentyn Tymofieiev 
wrote:

> Alan, Dataflow workers preinstall Beam SDK dependencies, including a
> working version of avro-python3. So after reading your email once again, I
> think in your case you were not able to install Beam SDK locally. So a
> workaround for you would be to `pip install avro-python3==1.9.1` or `pip
> install pycodestyle`  before installing Beam, until AVRO-2737 is resolved.
>
>
> On Wed, Feb 12, 2020 at 10:21 AM Valentyn Tymofieiev 
> wrote:
>
>> Ah, there's already https://issues.apache.org/jira/browse/AVRO-2737 and
>> it received attention.
>>
>> On Wed, Feb 12, 2020 at 10:19 AM Valentyn Tymofieiev 
>> wrote:
>>
>>> Opened https://issues.apache.org/jira/browse/AVRO-2738
>>>
>>> On Wed, Feb 12, 2020 at 10:14 AM Valentyn Tymofieiev <
>>> valen...@google.com> wrote:
>>>
>>>> Here's a short repro:
>>>>
>>>> :~$ docker run -it --entrypoint=/bin/bash python:3.7-stretch
>>>> root@04b45a100d16:/# pip install avro-python3
>>>> Collecting avro-python3
>>>>   Downloading avro-python3-1.9.2.tar.gz (37 kB)
>>>> ERROR: Command errored out with exit status 1:
>>>>  command: /usr/local/bin/python -c 'import sys, setuptools,
>>>> tokenize; sys.argv[0] =
>>>> '"'"'/tmp/pip-install-mmy4vspt/avro-python3/setup.py'"'"';
>>>> __file__='"'"'/tmp/pip-install-mmy4vspt/avro-python3/setup.py'"'"';f=getattr(tokenize,
>>>> '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"',
>>>> '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))'
>>>> egg_info --egg-base /tmp/pip-install-mmy4vspt/avro-python3/pip-egg-info
>>>>  cwd: /tmp/pip-install-mmy4vspt/avro-python3/
>>>> Complete output (5 lines):
>>>> Traceback (most recent call last):
>>>>   File "", line 1, in 
>>>>   File "/tmp/pip-install-mmy4vspt/avro-python3/setup.py", line 41,
>>>> in 
>>>> import pycodestyle
>>>> ModuleNotFoundError: No module named 'pycodestyle'
>>>> 
>>>> ERROR: Command errored out with exit status 1: python setup.py egg_info
>>>> Check the logs for full command output.
>>>> root@04b45a100d16:/#
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Feb 12, 2020 at 10:14 AM Valentyn Tymofieiev <
>>>> valen...@google.com> wrote:
>>>>
>>>>> Yes, it is a bug in the recent Avro release. We should report it
>>>>> to the Avro maintainers. The workaround is to downgrade avro-python3 to
>>>>> 1.9.1, for example via requirements.txt.
>>>>>
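A minimal sketch of that workaround on the pipeline side, assuming a requirements.txt next to the job that contains the single pin avro-python3==1.9.1 (all other option values are placeholders):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# requirements.txt (placeholder path) contains:
#     avro-python3==1.9.1
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',                 # placeholder
    '--temp_location=gs://my-bucket/temp',  # placeholder
    '--requirements_file=requirements.txt',
])

pipeline = beam.Pipeline(options=options)
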
>>>>> On Wed, Feb 12, 2020 at 10:06 AM Steve Niemitz 
>>>>> wrote:
>>>>>
>>>>>> avro-python3 1.9.2 was released on pypi 4 hours ago, and
>>>>>> added pycodestyle as a dependency, probably related?
>>>>>>
>>>>>> On Wed, Feb 12, 2020 at 1:03 PM Luke Cwik  wrote:
>>>>>>
>>>>>>> +dev 
>>>>>>>
>>>>>>> There was recently an update to add autoformatting to the Python
>>>>>>> SDK[1].
>>>>>>>
>>>>>>> I'm seeing this during testing of a PR as well.
>>>>>>>
>>>>>>> 1:
>>>>>>> https://lists.apache.org/thread.html/448bb5c2d73fbd74eec7aacb5f28fa2f9d791784c2e53a2e3325627a%40%3Cdev.beam.apache.org%3E
>>>>>>>
>>>>>>> On Wed, Feb 12, 2020 at 9:57 AM Alan Krumholz <
>>>>>>> alan.krumh...@betterup.co> wrote:
>>>>>>>
>>>>>>>> Some more information for this as I still can't get to fix it
>>>>>>>>
>>>>>>>> This job is triggered using the beam[gcp] python sdk from a
>>>>>>>> KubeFlow Pipelines component which runs on top of docker image:
>>>>>>>> tensorflow/tensorflow:1.13.1-py3
>>>>>>>>
>>>>>>>> I just checked and that image hasn't been updated recently. I a

Re: daily dataflow job failing today

2020-02-12 Thread Alan Krumholz
Some more information for this as I still can't get to fix it

This job is triggered using the beam[gcp] python sdk from a KubeFlow
Pipelines component which runs on top of docker image:
tensorflow/tensorflow:1.13.1-py3

I just checked and that image hasn't been updated recently. I also
redeployed my pipeline to another (older) deployment of KFP and it gives me
the same error (which tells me this isn't an internal KFP problem)

The exact same pipeline/code running on the exact same image has been
running fine for days. Did anything changed on the beam/dataflow side since
yesterday morning?

Thanks for your help! this is a production pipeline that is not running for
us :(



On Wed, Feb 12, 2020 at 7:21 AM Alan Krumholz 
wrote:

> Hi, I have a scheduled daily job that I have been running fine in dataflow
> for days now.
> We haven't changed anything in this code but this morning's run failed (it
> couldn't even spin up the job)
> The job submits a setup.py file (that also hasn't changed) but maybe is
> causing the problem? (based on the error I'm getting)
>
> Anyone else having the same issue? or know how to fix it?
> Thanks!
>
> ERROR: Complete output from command python setup.py egg_info:
> 2 ERROR: Traceback (most recent call last):
> 3 File "", line 1, in 
> 4 File "/tmp/pip-install-42zyi89t/avro-python3/setup.py", line 41, in
> 
> 5 import pycodestyle
> 6 ImportError: No module named 'pycodestyle'
> 7 
> 8ERROR: Command "python setup.py egg_info" failed with error code 1 in
> /tmp/pip-install-42zyi89t/avro-python3/
> 9 ERROR: Complete output from command python setup.py egg_info:
> 10 ERROR: Traceback (most recent call last):
> 11 File "", line 1, in 
> 12 File "/tmp/pip-install-wrqytf9a/avro-python3/setup.py", line 41, in
> 
> 13 import pycodestyle
> 14 ImportError: No module named 'pycodestyle'
> 15 
> 16ERROR: Command "python setup.py egg_info" failed with error code 1 in
> /tmp/pip-install-wrqytf9a/avro-python3/
>


daily dataflow job failing today

2020-02-12 Thread Alan Krumholz
Hi, I have a scheduled daily job that I have been running fine in dataflow
for days now.
We haven't changed anything in this code but this morning's run failed (it
couldn't even spin up the job)
The job submits a setup.py file (that also hasn't changed) but maybe is
causing the problem? (based on the error I'm getting)

Anyone else having the same issue? or know how to fix it?
Thanks!

ERROR: Complete output from command python setup.py egg_info:
2 ERROR: Traceback (most recent call last):
3 File "", line 1, in 
4 File "/tmp/pip-install-42zyi89t/avro-python3/setup.py", line 41, in

5 import pycodestyle
6 ImportError: No module named 'pycodestyle'
7 
8ERROR: Command "python setup.py egg_info" failed with error code 1 in
/tmp/pip-install-42zyi89t/avro-python3/
9 ERROR: Complete output from command python setup.py egg_info:
10 ERROR: Traceback (most recent call last):
11 File "", line 1, in 
12 File "/tmp/pip-install-wrqytf9a/avro-python3/setup.py", line 41, in

13 import pycodestyle
14 ImportError: No module named 'pycodestyle'
15 
16ERROR: Command "python setup.py egg_info" failed with error code 1 in
/tmp/pip-install-wrqytf9a/avro-python3/


Re: dataflow job was working fine last night and it isn't now

2020-02-07 Thread Alan Krumholz
perfect! thank you!

On Fri, Feb 7, 2020 at 10:54 AM Valentyn Tymofieiev 
wrote:

> Thanks for your feedback. We expect that this issue will be fixed in
> cloudpickle==1.3.0. Per [1], this release may be available next week.
>
> After that you can install the fixed version of cloudpickle until the AI
> notebook image picks up the new version.
>
> [1] https://github.com/cloudpipe/cloudpickle/pull/337
>
> On Tue, Feb 4, 2020 at 12:44 PM Alan Krumholz 
> wrote:
>
>> Seems like the image we use in KFP to orchestrate the job has 
>> cloudpickle==0.8.1
>> and that one doesn't seem to cause issues.
>> I think I'm unblocked for now but I'm sure I won't be the last one to try
>> to do this using GCP managed notebooks :(
>>
>> Thanks for all the help!
>>
>>
>> On Tue, Feb 4, 2020 at 12:24 PM Alan Krumholz 
>> wrote:
>>
>>> I'm using a managed notebook instance from GCP
>>> It seems those already come with cloudpickle==1.2.2 as soon as you
>>> provision it. apache-beam[gcp] will then install dill==0.3.1.1 I'm
>>> going to try to uninstall cloudpickle before installing apache-beam and see
>>> if this fixes the problem
>>>
>>> Thank you
>>>
>>> On Tue, Feb 4, 2020 at 11:54 AM Valentyn Tymofieiev 
>>> wrote:
>>>
>>>> The fact that you have cloudpickle==1.2.2 further confirms that you
>>>> may be hitting the same error as
>>>> https://stackoverflow.com/questions/42960637/python-3-5-dill-pickling-unpickling-on-different-servers-keyerror-classtype
>>>>  .
>>>>
>>>> Could you try to start over with a clean virtual environment?
>>>>
>>>> On Tue, Feb 4, 2020 at 11:46 AM Alan Krumholz <
>>>> alan.krumh...@betterup.co> wrote:
>>>>
>>>>> Hi Valentyn,
>>>>>
>>>>> Here is my pip freeze on my machine (note that the error is in
>>>>> dataflow, the job runs fine in my machine)
>>>>>
>>>>> ansiwrap==0.8.4
>>>>> apache-beam==2.19.0
>>>>> arrow==0.15.5
>>>>> asn1crypto==1.3.0
>>>>> astroid==2.3.3
>>>>> astropy==3.2.3
>>>>> attrs==19.3.0
>>>>> avro-python3==1.9.1
>>>>> azure-common==1.1.24
>>>>> azure-storage-blob==2.1.0
>>>>> azure-storage-common==2.1.0
>>>>> backcall==0.1.0
>>>>> bcolz==1.2.1
>>>>> binaryornot==0.4.4
>>>>> bleach==3.1.0
>>>>> boto3==1.11.9
>>>>> botocore==1.14.9
>>>>> cachetools==3.1.1
>>>>> certifi==2019.11.28
>>>>> cffi==1.13.2
>>>>> chardet==3.0.4
>>>>> Click==7.0
>>>>> cloudpickle==1.2.2
>>>>> colorama==0.4.3
>>>>> configparser==4.0.2
>>>>> confuse==1.0.0
>>>>> cookiecutter==1.7.0
>>>>> crcmod==1.7
>>>>> cryptography==2.8
>>>>> cycler==0.10.0
>>>>> daal==2019.0
>>>>> datalab==1.1.5
>>>>> decorator==4.4.1
>>>>> defusedxml==0.6.0
>>>>> dill==0.3.1.1
>>>>> distro==1.0.1
>>>>> docker==4.1.0
>>>>> docopt==0.6.2
>>>>> docutils==0.15.2
>>>>> entrypoints==0.3
>>>>> enum34==1.1.6
>>>>> fairing==0.5.3
>>>>> fastavro==0.21.24
>>>>> fasteners==0.15
>>>>> fsspec==0.6.2
>>>>> future==0.18.2
>>>>> gcsfs==0.6.0
>>>>> gitdb2==2.0.6
>>>>> GitPython==3.0.5
>>>>> google-api-core==1.16.0
>>>>> google-api-python-client==1.7.11
>>>>> google-apitools==0.5.28
>>>>> google-auth==1.11.0
>>>>> google-auth-httplib2==0.0.3
>>>>> google-auth-oauthlib==0.4.1
>>>>> google-cloud-bigquery==1.17.1
>>>>> google-cloud-bigtable==1.0.0
>>>>> google-cloud-core==1.2.0
>>>>> google-cloud-dataproc==0.6.1
>>>>> google-cloud-datastore==1.7.4
>>>>> google-cloud-language==1.3.0
>>>>> google-cloud-logging==1.14.0
>>>>> google-cloud-monitoring==0.31.1
>>>>> google-cloud-pubsub==1.0.2
>>>>> google-cloud-secret-manager==0.1.1
>>>>> google-cloud-spanner==1.13.0
>>>>> google-cloud-storage==1.25.0
>>>>> google-cloud-translate==2.0.0
>>>>> google-compute-

Re: seems beam.util.GroupIntoBatches is not supported in DataFlow. Any alternative?

2020-02-05 Thread Alan Krumholz
OK, seems like beam.BatchElements(max_batch_size=x) will do the trick for
me and runs fine in DataFlow!
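
A minimal sketch of BatchElements for the use case described later in the thread (grouping rows into batches of up to 100 so they can be processed per batch rather than one by one); the DoFn and sizes are placeholders:

import apache_beam as beam

class ProcessBatch(beam.DoFn):
    """Placeholder for work that is cheaper per batch than per row."""
    def process(self, batch):
        yield len(batch)  # e.g. insert the whole batch in one call

with beam.Pipeline() as p:
    (
        p
        | beam.Create(range(1000))
        # BatchElements groups elements into lists without requiring
        # stateful DoFn support from the runner.
        | beam.BatchElements(min_batch_size=10, max_batch_size=100)
        | beam.ParDo(ProcessBatch())
        | beam.Map(print)
    )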

On Wed, Feb 5, 2020 at 7:38 AM Alan Krumholz 
wrote:

> Actually beam.GroupIntoBatches() gives me the same error as
> beam.util.GroupIntoBatches() :(
> back to square one.
>
> Any other ideas?
>
> Thank you!
>
>
> On Wed, Feb 5, 2020 at 7:32 AM Alan Krumholz 
> wrote:
>
>> Never mind there seems to be a  beam.GroupIntoBatches()  that I
>> should have originally used instead of beam.util.GroupIntoBatches()
>>
>> On Wed, Feb 5, 2020 at 7:19 AM Alan Krumholz 
>> wrote:
>>
>>> Hello, I'm having issues running beam.util.GroupIntoBatches() in
>>> DataFlow.
>>>
>>> I get the following error message:
>>>
>>> Exception: Requested execution of a stateful DoFn, but no user state
>>>> context is available. This likely means that the current runner does not
>>>> support the execution of stateful DoFns
>>>
>>>
>>> Seems to be related to:
>>>
>>> https://stackoverflow.com/questions/56403572/no-userstate-context-is-available-google-cloud-dataflow
>>>
>>> Is there another way I can achieve the same using other beam function?
>>>
>>> I basically want to batch rows into groups of 100 as it is a lot faster
>>> to transform all at once than doing it 1 by 1.
>>>
>>> I also was planning to use this function for a custom snowflake sink (so
>>> I could insert many rows at once)
>>>
>>> I'm sure there must be another way to do this in DataFlow but not sure
>>> how?
>>>
>>> Thanks so much!
>>>
>>


Re: seems beam.util.GroupIntoBatches is not supported in DataFlow. Any alternative?

2020-02-05 Thread Alan Krumholz
Actually beam.GroupIntoBatches() gives me the same error as
beam.util.GroupIntoBatches() :(
back to square one.

Any other ideas?

Thank you!


On Wed, Feb 5, 2020 at 7:32 AM Alan Krumholz 
wrote:

> Never mind there seems to be a  beam.GroupIntoBatches()  that I
> should have originally used instead of beam.util.GroupIntoBatches()
>
> On Wed, Feb 5, 2020 at 7:19 AM Alan Krumholz 
> wrote:
>
>> Hello, I'm having issues running beam.util.GroupIntoBatches() in DataFlow.
>>
>> I get the following error message:
>>
>> Exception: Requested execution of a stateful DoFn, but no user state
>>> context is available. This likely means that the current runner does not
>>> support the execution of stateful DoFns
>>
>>
>> Seems to be related to:
>>
>> https://stackoverflow.com/questions/56403572/no-userstate-context-is-available-google-cloud-dataflow
>>
>> Is there another way I can achieve the same using other beam function?
>>
>> I basically want to batch rows into groups of 100 as it is a lot faster
>> to transform all at once than doing it 1 by 1.
>>
>> I also was planning to use this function for a custom snowflake sink (so
>> I could insert many rows at once)
>>
>> I'm sure there must be another way to do this in DataFlow but not sure
>> how?
>>
>> Thanks so much!
>>
>


Re: seems beam.util.GroupIntoBatches is not supported in DataFlow. Any alternative?

2020-02-05 Thread Alan Krumholz
Never mind there seems to be a  beam.GroupIntoBatches()  that I should have
originally used instead of beam.util.GroupIntoBatches()

On Wed, Feb 5, 2020 at 7:19 AM Alan Krumholz 
wrote:

> Hello, I'm having issues running beam.util.GroupIntoBatches() in DataFlow.
>
> I get the following error message:
>
> Exception: Requested execution of a stateful DoFn, but no user state
>> context is available. This likely means that the current runner does not
>> support the execution of stateful DoFns
>
>
> Seems to be related to:
>
> https://stackoverflow.com/questions/56403572/no-userstate-context-is-available-google-cloud-dataflow
>
> Is there another way I can achieve the same using other beam function?
>
> I basically want to batch rows into groups of 100 as it is a lot faster to
> transform all at once than doing it 1 by 1.
>
> I also was planning to use this function for a custom snowflake sink (so I
> could insert many rows at once)
>
> I'm sure there must be another way to do this in DataFlow but not sure how?
>
> Thanks so much!
>


seems beam.util.GroupIntoBatches is not supported in DataFlow. Any alternative?

2020-02-05 Thread Alan Krumholz
Hello, I'm having issues running beam.util.GroupIntoBatches() in DataFlow.

I get the following error message:

Exception: Requested execution of a stateful DoFn, but no user state
> context is available. This likely means that the current runner does not
> support the execution of stateful DoFns


Seems to be related to:
https://stackoverflow.com/questions/56403572/no-userstate-context-is-available-google-cloud-dataflow

Is there another way I can achieve the same using other beam function?

I basically want to batch rows into groups of 100 as it is a lot faster to
transform all at once than doing it 1 by 1.

I also was planning to use this function for a custom snowflake sink (so I
could insert many rows at once)

I'm sure there must be another way to do this in DataFlow but not sure how?

Thanks so much!


Re: dataflow job was working fine last night and it isn't now

2020-02-04 Thread Alan Krumholz
Seems like the image we use in KFP to orchestrate the job has
cloudpickle==0.8.1
and that one doesn't seem to cause issues.
I think I'm unblocked for now but I'm sure I won't be the last one to try to
do this using GCP managed notebooks :(

Thanks for all the help!


On Tue, Feb 4, 2020 at 12:24 PM Alan Krumholz 
wrote:

> I'm using a managed notebook instance from GCP
> It seems those already come with cloudpickle==1.2.2 as soon as you
> provision it. apache-beam[gcp] will then install dill==0.3.1.1 I'm going
> to try to uninstall cloudpickle before installing apache-beam and see if
> this fixes the problem
>
> Thank you
>
> On Tue, Feb 4, 2020 at 11:54 AM Valentyn Tymofieiev 
> wrote:
>
>> The fact that you have cloudpickle==1.2.2 further confirms that you may
>> be hitting the same error as
>> https://stackoverflow.com/questions/42960637/python-3-5-dill-pickling-unpickling-on-different-servers-keyerror-classtype
>>  .
>>
>> Could you try to start over with a clean virtual environment?
>>
>> On Tue, Feb 4, 2020 at 11:46 AM Alan Krumholz 
>> wrote:
>>
>>> Hi Valentyn,
>>>
>>> Here is my pip freeze on my machine (note that the error is in dataflow,
>>> the job runs fine in my machine)
>>>
>>> ansiwrap==0.8.4
>>> apache-beam==2.19.0
>>> arrow==0.15.5
>>> asn1crypto==1.3.0
>>> astroid==2.3.3
>>> astropy==3.2.3
>>> attrs==19.3.0
>>> avro-python3==1.9.1
>>> azure-common==1.1.24
>>> azure-storage-blob==2.1.0
>>> azure-storage-common==2.1.0
>>> backcall==0.1.0
>>> bcolz==1.2.1
>>> binaryornot==0.4.4
>>> bleach==3.1.0
>>> boto3==1.11.9
>>> botocore==1.14.9
>>> cachetools==3.1.1
>>> certifi==2019.11.28
>>> cffi==1.13.2
>>> chardet==3.0.4
>>> Click==7.0
>>> cloudpickle==1.2.2
>>> colorama==0.4.3
>>> configparser==4.0.2
>>> confuse==1.0.0
>>> cookiecutter==1.7.0
>>> crcmod==1.7
>>> cryptography==2.8
>>> cycler==0.10.0
>>> daal==2019.0
>>> datalab==1.1.5
>>> decorator==4.4.1
>>> defusedxml==0.6.0
>>> dill==0.3.1.1
>>> distro==1.0.1
>>> docker==4.1.0
>>> docopt==0.6.2
>>> docutils==0.15.2
>>> entrypoints==0.3
>>> enum34==1.1.6
>>> fairing==0.5.3
>>> fastavro==0.21.24
>>> fasteners==0.15
>>> fsspec==0.6.2
>>> future==0.18.2
>>> gcsfs==0.6.0
>>> gitdb2==2.0.6
>>> GitPython==3.0.5
>>> google-api-core==1.16.0
>>> google-api-python-client==1.7.11
>>> google-apitools==0.5.28
>>> google-auth==1.11.0
>>> google-auth-httplib2==0.0.3
>>> google-auth-oauthlib==0.4.1
>>> google-cloud-bigquery==1.17.1
>>> google-cloud-bigtable==1.0.0
>>> google-cloud-core==1.2.0
>>> google-cloud-dataproc==0.6.1
>>> google-cloud-datastore==1.7.4
>>> google-cloud-language==1.3.0
>>> google-cloud-logging==1.14.0
>>> google-cloud-monitoring==0.31.1
>>> google-cloud-pubsub==1.0.2
>>> google-cloud-secret-manager==0.1.1
>>> google-cloud-spanner==1.13.0
>>> google-cloud-storage==1.25.0
>>> google-cloud-translate==2.0.0
>>> google-compute-engine==20191210.0
>>> google-resumable-media==0.4.1
>>> googleapis-common-protos==1.51.0
>>> grpc-google-iam-v1==0.12.3
>>> grpcio==1.26.0
>>> h5py==2.10.0
>>> hdfs==2.5.8
>>> html5lib==1.0.1
>>> htmlmin==0.1.12
>>> httplib2==0.12.0
>>> icc-rt==2020.0.133
>>> idna==2.8
>>> ijson==2.6.1
>>> imageio==2.6.1
>>> importlib-metadata==1.4.0
>>> intel-numpy==1.15.1
>>> intel-openmp==2020.0.133
>>> intel-scikit-learn==0.19.2
>>> intel-scipy==1.1.0
>>> ipykernel==5.1.4
>>> ipython==7.9.0
>>> ipython-genutils==0.2.0
>>> ipython-sql==0.3.9
>>> ipywidgets==7.5.1
>>> isort==4.3.21
>>> jedi==0.16.0
>>> Jinja2==2.11.0
>>> jinja2-time==0.2.0
>>> jmespath==0.9.4
>>> joblib==0.14.1
>>> json5==0.8.5
>>> jsonschema==3.2.0
>>> jupyter==1.0.0
>>> jupyter-aihub-deploy-extension==0.1
>>> jupyter-client==5.3.4
>>> jupyter-console==6.1.0
>>> jupyter-contrib-core==0.3.3
>>> jupyter-contrib-nbextensions==0.5.1
>>> jupyter-core==4.6.1
>>> jupyter-highlight-selected-word==0.2.0
>>> jupyter-http-over-ws==0.0.7
>>> jupyter-latex-envs==1.4.6
>>

Re: dataflow job was working fine last night and it isn't now

2020-02-04 Thread Alan Krumholz
I'm using a managed notebook instance from GCP
It seems those already come with cloudpickle==1.2.2 as soon as you
provision it. apache-beam[gcp] will then install dill==0.3.1.1. I'm going to
try to uninstall cloudpickle before installing apache-beam and see if this
fixes the problem

Thank you

On Tue, Feb 4, 2020 at 11:54 AM Valentyn Tymofieiev 
wrote:

> The fact that you have cloudpickle==1.2.2 further confirms that you may
> be hitting the same error as
> https://stackoverflow.com/questions/42960637/python-3-5-dill-pickling-unpickling-on-different-servers-keyerror-classtype
>  .
>
> Could you try to start over with a clean virtual environment?
>
> On Tue, Feb 4, 2020 at 11:46 AM Alan Krumholz 
> wrote:
>
>> Hi Valentyn,
>>
>> Here is my pip freeze on my machine (note that the error is in dataflow,
>> the job runs fine in my machine)
>>
>> ansiwrap==0.8.4
>> apache-beam==2.19.0
>> arrow==0.15.5
>> asn1crypto==1.3.0
>> astroid==2.3.3
>> astropy==3.2.3
>> attrs==19.3.0
>> avro-python3==1.9.1
>> azure-common==1.1.24
>> azure-storage-blob==2.1.0
>> azure-storage-common==2.1.0
>> backcall==0.1.0
>> bcolz==1.2.1
>> binaryornot==0.4.4
>> bleach==3.1.0
>> boto3==1.11.9
>> botocore==1.14.9
>> cachetools==3.1.1
>> certifi==2019.11.28
>> cffi==1.13.2
>> chardet==3.0.4
>> Click==7.0
>> cloudpickle==1.2.2
>> colorama==0.4.3
>> configparser==4.0.2
>> confuse==1.0.0
>> cookiecutter==1.7.0
>> crcmod==1.7
>> cryptography==2.8
>> cycler==0.10.0
>> daal==2019.0
>> datalab==1.1.5
>> decorator==4.4.1
>> defusedxml==0.6.0
>> dill==0.3.1.1
>> distro==1.0.1
>> docker==4.1.0
>> docopt==0.6.2
>> docutils==0.15.2
>> entrypoints==0.3
>> enum34==1.1.6
>> fairing==0.5.3
>> fastavro==0.21.24
>> fasteners==0.15
>> fsspec==0.6.2
>> future==0.18.2
>> gcsfs==0.6.0
>> gitdb2==2.0.6
>> GitPython==3.0.5
>> google-api-core==1.16.0
>> google-api-python-client==1.7.11
>> google-apitools==0.5.28
>> google-auth==1.11.0
>> google-auth-httplib2==0.0.3
>> google-auth-oauthlib==0.4.1
>> google-cloud-bigquery==1.17.1
>> google-cloud-bigtable==1.0.0
>> google-cloud-core==1.2.0
>> google-cloud-dataproc==0.6.1
>> google-cloud-datastore==1.7.4
>> google-cloud-language==1.3.0
>> google-cloud-logging==1.14.0
>> google-cloud-monitoring==0.31.1
>> google-cloud-pubsub==1.0.2
>> google-cloud-secret-manager==0.1.1
>> google-cloud-spanner==1.13.0
>> google-cloud-storage==1.25.0
>> google-cloud-translate==2.0.0
>> google-compute-engine==20191210.0
>> google-resumable-media==0.4.1
>> googleapis-common-protos==1.51.0
>> grpc-google-iam-v1==0.12.3
>> grpcio==1.26.0
>> h5py==2.10.0
>> hdfs==2.5.8
>> html5lib==1.0.1
>> htmlmin==0.1.12
>> httplib2==0.12.0
>> icc-rt==2020.0.133
>> idna==2.8
>> ijson==2.6.1
>> imageio==2.6.1
>> importlib-metadata==1.4.0
>> intel-numpy==1.15.1
>> intel-openmp==2020.0.133
>> intel-scikit-learn==0.19.2
>> intel-scipy==1.1.0
>> ipykernel==5.1.4
>> ipython==7.9.0
>> ipython-genutils==0.2.0
>> ipython-sql==0.3.9
>> ipywidgets==7.5.1
>> isort==4.3.21
>> jedi==0.16.0
>> Jinja2==2.11.0
>> jinja2-time==0.2.0
>> jmespath==0.9.4
>> joblib==0.14.1
>> json5==0.8.5
>> jsonschema==3.2.0
>> jupyter==1.0.0
>> jupyter-aihub-deploy-extension==0.1
>> jupyter-client==5.3.4
>> jupyter-console==6.1.0
>> jupyter-contrib-core==0.3.3
>> jupyter-contrib-nbextensions==0.5.1
>> jupyter-core==4.6.1
>> jupyter-highlight-selected-word==0.2.0
>> jupyter-http-over-ws==0.0.7
>> jupyter-latex-envs==1.4.6
>> jupyter-nbextensions-configurator==0.4.1
>> jupyterlab==1.2.6
>> jupyterlab-git==0.9.0
>> jupyterlab-server==1.0.6
>> keyring==10.1
>> keyrings.alt==1.3
>> kiwisolver==1.1.0
>> kubernetes==10.0.1
>> lazy-object-proxy==1.4.3
>> llvmlite==0.31.0
>> lxml==4.4.2
>> Markdown==3.1.1
>> MarkupSafe==1.1.1
>> matplotlib==3.0.3
>> mccabe==0.6.1
>> missingno==0.4.2
>> mistune==0.8.4
>> mkl==2019.0
>> mkl-fft==1.0.6
>> mkl-random==1.0.1.1
>> mock==2.0.0
>> monotonic==1.5
>> more-itertools==8.1.0
>> nbconvert==5.6.1
>> nbdime==1.1.0
>> nbformat==5.0.4
>> networkx==2.4
>> nltk==3.4.5
>> notebook==6.0.3
>> numba==0.47.0
>> numpy==1.15.1
>> oauth2client==3.0.0
>> oauthlib==3.1.0

Re: dataflow job was working fine last night and it isn't now

2020-02-04 Thread Alan Krumholz
Here is a test job that sometimes fails and sometimes doesn't (but most
times do).
There seems to be something stochastic that causes this as after several
tests a couple of them did succeed


def test_error(bq_table: str) -> bool:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    class GenData(beam.DoFn):
        def process(self, _):
            for _ in range(2):
                yield {'a': 1, 'b': 2}

    def get_bigquery_schema():
        from apache_beam.io.gcp.internal.clients import bigquery

        table_schema = bigquery.TableSchema()
        columns = [
            ["a", "integer", "nullable"],
            ["b", "integer", "nullable"]
        ]

        for column in columns:
            column_schema = bigquery.TableFieldSchema()
            column_schema.name = column[0]
            column_schema.type = column[1]
            column_schema.mode = column[2]
            table_schema.fields.append(column_schema)

        return table_schema

    pipeline = beam.Pipeline(options=PipelineOptions(
        project='my-project',
        temp_location='gs://my-bucket/temp',
        staging_location='gs://my-bucket/staging',
        runner='DataflowRunner'
    ))
    # pipeline = beam.Pipeline()

    (
        pipeline
        | 'Empty start' >> beam.Create([''])
        | 'Generate Data' >> beam.ParDo(GenData())
        # | 'print' >> beam.Map(print)
        | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            project=bq_table.split(':')[0],
            dataset=bq_table.split(':')[1].split('.')[0],
            table=bq_table.split(':')[1].split('.')[1],
            schema=get_bigquery_schema(),
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
    )

    result = pipeline.run()
    result.wait_until_finish()

    return True


test_error(
    bq_table='my-project:my_dataset.my_table'
)
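
(As an aside for readers of the archive: a minimal local sketch of the pickle
round trip that fails on the worker. It is illustrative only; the worker
actually goes through apache_beam.internal.pickler, which wraps dill, and the
GenData stand-in below is a simplification of the DoFn above.)

import dill

class GenData:  # stand-in for the DoFn pickled at submission time
    def process(self, _):
        yield {'a': 1, 'b': 2}

payload = dill.dumps(GenData())  # happens on the machine submitting the job
fn = dill.loads(payload)         # happens on the Dataflow worker; a dill or
                                 # Python version mismatch between the two
                                 # environments can surface here as
                                 # KeyError: 'ClassType'
print(list(fn.process(None)))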

On Tue, Feb 4, 2020 at 10:04 AM Alan Krumholz 
wrote:

> I tried breaking apart my pipeline. Seems the step that breaks it is:
> beam.io.WriteToBigQuery
>
> Let me see if I can create a self contained example that breaks to share
> with you
>
> Thanks!
>
> On Tue, Feb 4, 2020 at 9:53 AM Pablo Estrada  wrote:
>
>> Hm that's odd. No changes to the pipeline? Are you able to share some of
>> the code?
>>
>> +Udi Meiri  do you have any idea what could be going
>> on here?
>>
>> On Tue, Feb 4, 2020 at 9:25 AM Alan Krumholz 
>> wrote:
>>
>>> Hi Pablo,
>>> This is strange... it doesn't seem to be the last beam release as last
>>> night it was already using 2.19.0 I wonder if it was some release from the
>>> DataFlow team (not beam related):
>>> Job type: Batch
>>> Job status: Succeeded
>>> SDK version: Apache Beam Python 3.5 SDK 2.19.0
>>> Region: us-central1
>>> Start time: February 3, 2020 at 9:28:35 PM GMT-8
>>> Elapsed time: 5 min 11 sec
>>>
>>> On Tue, Feb 4, 2020 at 9:15 AM Pablo Estrada  wrote:
>>>
>>>> Hi Alan,
>>>> could it be that you're picking up the new Apache Beam 2.19.0 release?
>>>> Could you try depending on beam 2.18.0 to see if the issue surfaces when
>>>> using the new release?
>>>>
>>>> If something was working and no longer works, it sounds like a bug.
>>>> This may have to do with how we pickle (dill / cloudpickle) - see this
>>>> question
>>>> https://stackoverflow.com/questions/42960637/python-3-5-dill-pickling-unpickling-on-different-servers-keyerror-classtype
>>>> Best
>>>> -P.
>>>>
>>>> On Tue, Feb 4, 2020 at 6:22 AM Alan Krumholz 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I was running a dataflow job in GCP last night and it was running fine.
>>>>> This morning this same exact job is failing with the following error:
>>>>>
>>>>> Error message from worker: Traceback (most recent call last): File
>>>>> "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py",
>>>>> line 286, in loads return dill.loads(s) File
>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 275, in loads
>>>>> return load(file, ignore, **kwds) File
>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 270, in load
>>>>> return Unpickler(file, ignore=ignore, **kwds).load() File
>>&g

Re: dataflow job was working fine last night and it isn't now

2020-02-04 Thread Alan Krumholz
I tried breaking apart my pipeline. Seems the step that breaks it is:
beam.io.WriteToBigQuery

Let me see if I can create a self contained example that breaks to share
with you

Thanks!

On Tue, Feb 4, 2020 at 9:53 AM Pablo Estrada  wrote:

> Hm that's odd. No changes to the pipeline? Are you able to share some of
> the code?
>
> +Udi Meiri  do you have any idea what could be going on
> here?
>
> On Tue, Feb 4, 2020 at 9:25 AM Alan Krumholz 
> wrote:
>
>> Hi Pablo,
>> This is strange... it doesn't seem to be the last beam release as last
>> night it was already using 2.19.0 I wonder if it was some release from the
>> DataFlow team (not beam related):
>> Job type: Batch
>> Job status: Succeeded
>> SDK version: Apache Beam Python 3.5 SDK 2.19.0
>> Region: us-central1
>> Start time: February 3, 2020 at 9:28:35 PM GMT-8
>> Elapsed time: 5 min 11 sec
>>
>> On Tue, Feb 4, 2020 at 9:15 AM Pablo Estrada  wrote:
>>
>>> Hi Alan,
>>> could it be that you're picking up the new Apache Beam 2.19.0 release?
>>> Could you try depending on beam 2.18.0 to see if the issue surfaces when
>>> using the new release?
>>>
>>> If something was working and no longer works, it sounds like a bug. This
>>> may have to do with how we pickle (dill / cloudpickle) - see this question
>>> https://stackoverflow.com/questions/42960637/python-3-5-dill-pickling-unpickling-on-different-servers-keyerror-classtype
>>> Best
>>> -P.
>>>
>>> On Tue, Feb 4, 2020 at 6:22 AM Alan Krumholz 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I was running a dataflow job in GCP last night and it was running fine.
>>>> This morning this same exact job is failing with the following error:
>>>>
>>>> Error message from worker: Traceback (most recent call last): File
>>>> "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py",
>>>> line 286, in loads return dill.loads(s) File
>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 275, in loads
>>>> return load(file, ignore, **kwds) File
>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 270, in load
>>>> return Unpickler(file, ignore=ignore, **kwds).load() File
>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 472, in load
>>>> obj = StockUnpickler.load(self) File
>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in
>>>> _load_type return _reverse_typemap[name] KeyError: 'ClassType' During
>>>> handling of the above exception, another exception occurred: Traceback
>>>> (most recent call last): File
>>>> "/usr/local/lib/python3.5/site-packages/dataflow_worker/batchworker.py",
>>>> line 648, in do_work work_executor.execute() File
>>>> "/usr/local/lib/python3.5/site-packages/dataflow_worker/executor.py", line
>>>> 176, in execute op.start() File "apache_beam/runners/worker/operations.py",
>>>> line 649, in apache_beam.runners.worker.operations.DoOperation.start File
>>>> "apache_beam/runners/worker/operations.py", line 651, in
>>>> apache_beam.runners.worker.operations.DoOperation.start File
>>>> "apache_beam/runners/worker/operations.py", line 652, in
>>>> apache_beam.runners.worker.operations.DoOperation.start File
>>>> "apache_beam/runners/worker/operations.py", line 261, in
>>>> apache_beam.runners.worker.operations.Operation.start File
>>>> "apache_beam/runners/worker/operations.py", line 266, in
>>>> apache_beam.runners.worker.operations.Operation.start File
>>>> "apache_beam/runners/worker/operations.py", line 597, in
>>>> apache_beam.runners.worker.operations.DoOperation.setup File
>>>> "apache_beam/runners/worker/operations.py", line 602, in
>>>> apache_beam.runners.worker.operations.DoOperation.setup File
>>>> "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py",
>>>> line 290, in loads return dill.loads(s) File
>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 275, in loads
>>>> return load(file, ignore, **kwds) File
>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 270, in load
>>>> return Unpickler(file, ignore=ignore, **kwds).load() File
>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 472, in load
>>>> obj = StockUnpickler.load(self) File
>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in
>>>> _load_type return _reverse_typemap[name] KeyError: 'ClassType'
>>>>
>>>>
>>>> If I use a local runner it still runs fine.
>>>> Anyone else experiencing something similar today? (or know how to fix
>>>> this?)
>>>>
>>>> Thanks!
>>>>
>>>


Re: dataflow job was working fine last night and it isn't now

2020-02-04 Thread Alan Krumholz
Hi Pablo,
This is strange... it doesn't seem to be the latest Beam release, as last
night it was already using 2.19.0. I wonder if it was some release from the
Dataflow team (not Beam related):
Job type: Batch
Job status: Succeeded
SDK version: Apache Beam Python 3.5 SDK 2.19.0
Region: us-central1
Start time: February 3, 2020 at 9:28:35 PM GMT-8
Elapsed time: 5 min 11 sec

On Tue, Feb 4, 2020 at 9:15 AM Pablo Estrada  wrote:

> Hi Alan,
> could it be that you're picking up the new Apache Beam 2.19.0 release?
> Could you try depending on beam 2.18.0 to see if the issue surfaces when
> using the new release?
>
> If something was working and no longer works, it sounds like a bug. This
> may have to do with how we pickle (dill / cloudpickle) - see this question
> https://stackoverflow.com/questions/42960637/python-3-5-dill-pickling-unpickling-on-different-servers-keyerror-classtype
> Best
> -P.
>
> On Tue, Feb 4, 2020 at 6:22 AM Alan Krumholz 
> wrote:
>
>> Hi,
>>
>> I was running a dataflow job in GCP last night and it was running fine.
>> This morning this same exact job is failing with the following error:
>>
>> Error message from worker: Traceback (most recent call last): File
>> "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py",
>> line 286, in loads return dill.loads(s) File
>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 275, in loads
>> return load(file, ignore, **kwds) File
>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 270, in load
>> return Unpickler(file, ignore=ignore, **kwds).load() File
>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 472, in load
>> obj = StockUnpickler.load(self) File
>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in
>> _load_type return _reverse_typemap[name] KeyError: 'ClassType' During
>> handling of the above exception, another exception occurred: Traceback
>> (most recent call last): File
>> "/usr/local/lib/python3.5/site-packages/dataflow_worker/batchworker.py",
>> line 648, in do_work work_executor.execute() File
>> "/usr/local/lib/python3.5/site-packages/dataflow_worker/executor.py", line
>> 176, in execute op.start() File "apache_beam/runners/worker/operations.py",
>> line 649, in apache_beam.runners.worker.operations.DoOperation.start File
>> "apache_beam/runners/worker/operations.py", line 651, in
>> apache_beam.runners.worker.operations.DoOperation.start File
>> "apache_beam/runners/worker/operations.py", line 652, in
>> apache_beam.runners.worker.operations.DoOperation.start File
>> "apache_beam/runners/worker/operations.py", line 261, in
>> apache_beam.runners.worker.operations.Operation.start File
>> "apache_beam/runners/worker/operations.py", line 266, in
>> apache_beam.runners.worker.operations.Operation.start File
>> "apache_beam/runners/worker/operations.py", line 597, in
>> apache_beam.runners.worker.operations.DoOperation.setup File
>> "apache_beam/runners/worker/operations.py", line 602, in
>> apache_beam.runners.worker.operations.DoOperation.setup File
>> "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py",
>> line 290, in loads return dill.loads(s) File
>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 275, in loads
>> return load(file, ignore, **kwds) File
>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 270, in load
>> return Unpickler(file, ignore=ignore, **kwds).load() File
>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 472, in load
>> obj = StockUnpickler.load(self) File
>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in
>> _load_type return _reverse_typemap[name] KeyError: 'ClassType'
>>
>>
>> If I use a local runner it still runs fine.
>> Anyone else experiencing something similar today? (or know how to fix
>> this?)
>>
>> Thanks!
>>
>


dataflow job was working fine last night and it isn't now

2020-02-04 Thread Alan Krumholz
Hi,

I was running a dataflow job in GCP last night and it was running fine.
This morning this same exact job is failing with the following error:

Error message from worker: Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 286, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in _load_type
    return _reverse_typemap[name]
KeyError: 'ClassType'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/dataflow_worker/batchworker.py", line 648, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.5/site-packages/dataflow_worker/executor.py", line 176, in execute
    op.start()
  File "apache_beam/runners/worker/operations.py", line 649, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 651, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 652, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 261, in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py", line 597, in apache_beam.runners.worker.operations.DoOperation.setup
  File "apache_beam/runners/worker/operations.py", line 602, in apache_beam.runners.worker.operations.DoOperation.setup
  File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 290, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in _load_type
    return _reverse_typemap[name]
KeyError: 'ClassType'


If I use a local runner it still runs fine.
Anyone else experiencing something similar today? (or know how to fix this?)

Thanks!


Local error when using requirements_file for external runner.

2020-02-03 Thread Alan Krumholz
I have a simple python pipeline that uses a publicly available (PyPI)
library.

I can run my pipeline fine using my local runner.

I can also run it fine when using DataFlow runner if I provide a setup_file
to the pipeline.

However, when I try to do this using a requirements_file instead of a
setup_file (the recommended and cleaner way when the pipeline has only PyPI
dependencies), I get an error on my local machine and the job is never
submitted to Dataflow.
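
(For reference, a minimal sketch of how the two options are typically passed
to the pipeline; the project and bucket values below are placeholders, not
the ones from this job:)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project',                  # placeholder
    temp_location='gs://my-bucket/temp',   # placeholder
    requirements_file='requirements.txt',  # PyPI-only dependencies
    # setup_file='./setup.py',             # the alternative that does work
)

with beam.Pipeline(options=options) as p:
    p | 'Start' >> beam.Create(['x']) | 'Print' >> beam.Map(print)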

I did some digging, and the problem seems to be that when you use a
requirements_file, the Python SDK runs the following command on the local
machine before submitting the external job:

python -m pip download --dest /tmp/dataflow-requirements-cache -r
/tmp/requirements.txt --exists-action i --no-binary :all:


This command seems to be trying to download all of these other libraries (in
addition to the one in my requirements_file):

azure-common-1.1.24.zip
azure-storage-blob-2.1.0.tar.gz
boto3-1.11.9.tar.gz
botocore-1.14.9.tar.gz
certifi-2019.11.28.tar.gz
cffi-1.13.2.tar.gz
pycryptodomex-3.9.6.tar.gz
pyOpenSSL-19.1.0.tar.gz
pytz-2019.3.tar.gz
requests-2.22.0.tar.gz
urllib3-1.25.8.tar.gz

It fetches some of them fine, but the error comes when it tries to build
"cryptography" from source:

Collecting azure-common<2.0.0
  Using cached azure-common-1.1.24.zip (18 kB)
  Saved /tmp/dataflow-requirements-cache/azure-common-1.1.24.zip
Collecting azure-storage-blob<12.0.0
  Using cached azure-storage-blob-2.1.0.tar.gz (83 kB)
  Saved /tmp/dataflow-requirements-cache/azure-storage-blob-2.1.0.tar.gz
Collecting boto3<1.12,>=1.4.4
  Using cached boto3-1.11.9.tar.gz (98 kB)
  Saved /tmp/dataflow-requirements-cache/boto3-1.11.9.tar.gz
Collecting botocore<1.15,>=1.5.0
  Using cached botocore-1.14.9.tar.gz (6.1 MB)
  Saved /tmp/dataflow-requirements-cache/botocore-1.14.9.tar.gz
Collecting requests<2.23.0
  Using cached requests-2.22.0.tar.gz (113 kB)
  Saved /tmp/dataflow-requirements-cache/requests-2.22.0.tar.gz
Collecting urllib3<1.26.0,>=1.20
  Using cached urllib3-1.25.8.tar.gz (261 kB)
  Saved /tmp/dataflow-requirements-cache/urllib3-1.25.8.tar.gz
Collecting certifi<2021.0.0
  Using cached certifi-2019.11.28.tar.gz (156 kB)
  Saved /tmp/dataflow-requirements-cache/certifi-2019.11.28.tar.gz
Collecting pytz<2021.0
  Using cached pytz-2019.3.tar.gz (312 kB)
  Saved /tmp/dataflow-requirements-cache/pytz-2019.3.tar.gz
Collecting pycryptodomex!=3.5.0,<4.0.0,>=3.2
  Using cached pycryptodomex-3.9.6.tar.gz (15.5 MB)
  Saved /tmp/dataflow-requirements-cache/pycryptodomex-3.9.6.tar.gz
Collecting pyOpenSSL<21.0.0,>=16.2.0
  Using cached pyOpenSSL-19.1.0.tar.gz (160 kB)
  Saved /tmp/dataflow-requirements-cache/pyOpenSSL-19.1.0.tar.gz
Collecting cffi<1.14,>=1.9
  Using cached cffi-1.13.2.tar.gz (460 kB)
  Saved /tmp/dataflow-requirements-cache/cffi-1.13.2.tar.gz
Collecting cryptography<3.0.0,>=1.8.2
  Using cached cryptography-2.8.tar.gz (504 kB)
  Installing build dependencies ... error
  ERROR: Command errored out with exit status 1:
   command: /opt/conda/bin/python /opt/conda/lib/python3.6/site-packages/pip
     install --ignore-installed --no-user
     --prefix /tmp/pip-build-env-wq57ycwk/overlay --no-warn-script-location
     --no-binary :all: --only-binary :none: -i https://pypi.org/simple
     -- 'setuptools>=40.6.0' wheel
     'cffi>=1.8,!=1.11.3; platform_python_implementation != '"'"'PyPy'"'"''



Has anyone else seen this problem? And is there an easy way to fix it?


Thank you!


Re: snowflake source/sink

2020-01-30 Thread Alan Krumholz
Hi Robert, beam.util.GroupIntoBatches(x) works perfectly. Thanks so much!
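
(For readers of the archive, a minimal sketch of how that batching could be
wired into the sink discussed below. GroupIntoBatches needs keyed input, so a
random key is added first; the connection details, table name, and key
fan-out are placeholder assumptions, not the exact pipeline used here:)

import random
import apache_beam as beam

def write_batch(keyed_batch):
    # keyed_batch is (key, rows); open one connection per batch and issue a
    # single multi-row INSERT with executemany instead of one INSERT per row.
    import snowflake.connector
    _, rows = keyed_batch
    ctx = snowflake.connector.connect(...)  # credentials elided
    cs = ctx.cursor()
    cs.executemany('INSERT INTO t(a, b) VALUES (%(a)s, %(b)s)', list(rows))
    ctx.close()

pipeline = beam.Pipeline(...)
p = (
    pipeline
    | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(...))
    | 'Add random key' >> beam.Map(lambda row: (random.randint(0, 9), row))
    | 'Batch rows' >> beam.util.GroupIntoBatches(1000)
    | 'Write batch to SnowFlake' >> beam.Map(write_batch)
)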

On Wed, Jan 29, 2020 at 12:58 PM Robert Bradshaw 
wrote:

> You could use the beam.util.BatchElements transform to batch rows into
> larger chunks.
>
> On Wed, Jan 29, 2020 at 12:01 PM Alan Krumholz
>  wrote:
> >
> > Thanks Brice! I'll look into wrapping the connector.
> >
> > One more question.
> > I'm trying now to develop a sink too. This is what I have:
> >
> > def writeSnowflake(row):
> > import snowflake.connector
> > ctx = snowflake.connector.connect(...)
> > cs = ctx.cursor()
> > cs.execute(
> > 'INSERT INTO t(a, b) VALUES ({a}, {b})'.format(
> > a = str(row['a']),
> > b = str(row['b'])
> > )
> > )
> > return row
> >
> > pipeline = beam.Pipeline(...)
> > p = (
> > pipeline
> > | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(...))
> > | 'Write to SnowFlake' >> beam.Map(writeSnowflake)
> > )
> >
> > This seems to (slowly) work but it feels extremely inefficient to send
> an INSERT query to the DW for each row in the dataset.
> > Is there an easy way to have the pipeline maybe stack all my data rows
> into chunks of 1,000 so I can insert these by chunks instead. I'm mostly
> curious about how to have the pipeline pass 1K rows at a time to
> "writeSnowflake()" instead of passing one by one.
> > Maybe by using a GroupByKey transformation and using randomly sampled
> keys to create the chunks of the desired size? (or can you think of a
> better way to achieve this?)
> >
> > Thank you so much for all your help!
> >
> > On Wed, Jan 29, 2020 at 11:30 AM Brice Giesbrecht 
> wrote:
> >>
> >> Lovely! You absolutely have the right idea.
> >>
> >> Your comments are spot on but the suboptimal solution that works is
> usually preferable to the optimal one that doesn't (exist).
> >>
> >> I don't have any experience with the Snowflake connector but if it can
> be kept around and reused and is slow or expensive to create, you may want
> to consider a wrapper/class to manage the client/connector that you can use
> across your pipeline steps if needed.
> >>
> >> Happy processing.
> >> -Brice
> >>
> >> On Wed, Jan 29, 2020 at 2:01 PM Alan Krumholz <
> alan.krumh...@betterup.co> wrote:
> >>>
> >>> Hi Brice,
> >>>
> >>> Thanks so much for your suggestion! I did the following and it seems
> to work:
> >>>
> >>> class ReadSnowflake(beam.DoFn):
> >>> def process(self, _):
> >>> import snowflake.connector
> >>> ctx = snowflake.connector.connect(...)
> >>> cs = ctx.cursor()
> >>> cs.execute('SELECT a, b FROM t')
> >>> while True:
> >>> data =  cs.fetchmany(1000)
> >>> if len(data) == 0:
> >>> break;
> >>> for d in data:
> >>> yield {'a':d[0], 'b':d[1]}
> >>>
> >>> pipeline = beam.Pipeline()
> >>> p = (
> >>> pipeline
> >>> | 'Empty start' >> beam.Create([''])
> >>> | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
> >>> | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))
> >>>
> >>>
> >>> I know this is not optimal as it is reading sequentially from
> snowflake (instead of doing it in parallel as I'm sure the BQ source does)
> but apart from that, do you see any other problems (or possible
> improvements) with this code?
> >>>
> >>>
> >>> Thank you so much!
> >>>
> >>> On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht 
> wrote:
> >>>>
> >>>> I am using a pattern which I saw online (but can't seem to locate)
> which performs i/o in a DoFn using a standard python client. I use this to
> read and write to Google cloud storage in streaming mode. You could use
> this idea to perform almost any i/o. Depending on your use case and
> workflow, this may be an approach you could consider. Shout if you need
> some boilerplate.
> >>>>
> >>>> It does look like native support is coming and you know it is true as
> I read it on the internet.:)
> >>>>
> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
> >>>>
> >>

Re: snowflake source/sink

2020-01-29 Thread Alan Krumholz
Thanks Brice! I'll look into wrapping the connector.

One more question.
I'm trying now to develop a sink too. This is what I have:

def writeSnowflake(row):
    import snowflake.connector
    ctx = snowflake.connector.connect(...)
    cs = ctx.cursor()
    cs.execute(
        'INSERT INTO t(a, b) VALUES ({a}, {b})'.format(
            a=str(row['a']),
            b=str(row['b'])
        )
    )
    return row

pipeline = beam.Pipeline(...)
p = (
    pipeline
    | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(...))
    | 'Write to SnowFlake' >> beam.Map(writeSnowflake)
)

This seems to (slowly) work, but it feels extremely inefficient to send an
INSERT query to the warehouse for each row in the dataset.
Is there an easy way to have the pipeline batch my rows into chunks of 1,000
so I can insert them chunk by chunk instead? I'm mostly curious about how to
have the pipeline pass 1K rows at a time to "writeSnowflake()" instead of
passing them one by one.
Maybe by using a GroupByKey transformation with randomly sampled keys to
create chunks of the desired size? (Or can you think of a better way to
achieve this?)
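
(For readers of the archive, one minimal sketch along those lines, using the
beam.util.BatchElements transform from apache_beam.transforms.util; the
batch-writing function named here is a hypothetical variant of
writeSnowflake that takes a list of rows and issues a single executemany()
call per batch:)

p = (
    pipeline
    | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(...))
    | 'Batch rows' >> beam.util.BatchElements(min_batch_size=1000,
                                              max_batch_size=1000)
    | 'Write batch to SnowFlake' >> beam.Map(writeSnowflakeBatch)  # hypothetical
)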

Thank you so much for all your help!

On Wed, Jan 29, 2020 at 11:30 AM Brice Giesbrecht  wrote:

> Lovely! You absolutely have the right idea.
>
> Your comments are spot on but the suboptimal solution that works is
> usually preferable to the optimal one that doesn't (exist).
>
> I don't have any experience with the Snowflake connector but if it can be
> kept around and reused and is slow or expensive to create, you may want to
> consider a wrapper/class to manage the client/connector that you can use
> across your pipeline steps if needed.
>
> Happy processing.
> -Brice
>
> On Wed, Jan 29, 2020 at 2:01 PM Alan Krumholz 
> wrote:
>
>> Hi Brice,
>>
>> Thanks so much for your suggestion! I did the following and it seems to
>> work:
>>
>> class ReadSnowflake(beam.DoFn):
>> def process(self, _):
>> import snowflake.connector
>> ctx = snowflake.connector.connect(...)
>> cs = ctx.cursor()
>> cs.execute('SELECT a, b FROM t')
>> while True:
>> data =  cs.fetchmany(1000)
>> if len(data) == 0:
>> break;
>> for d in data:
>> yield {'a':d[0], 'b':d[1]}
>>
>> pipeline = beam.Pipeline()
>> p = (
>> pipeline
>> | 'Empty start' >> beam.Create([''])
>> | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
>> | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))
>>
>>
>> I know this is not optimal as it is reading sequentially from snowflake
>> (instead of doing it in parallel as I'm sure the BQ source does) but apart
>> from that, do you see any other problems (or possible improvements) with
>> this code?
>>
>>
>> Thank you so much!
>>
>> On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht 
>> wrote:
>>
>>> I am using a pattern which I saw online (but can't seem to locate) which
>>> performs i/o in a DoFn using a standard python client. I use this to read
>>> and write to Google cloud storage in streaming mode. You could use this
>>> idea to perform almost any i/o. Depending on your use case and workflow,
>>> this may be an approach you could consider. Shout if you need some
>>> boilerplate.
>>>
>>> It does look like native support is coming and you know it is true as I
>>> read it on the internet.:)
>>>
>>> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
>>>
>>>
>>> You could also setup an external service or endpoint to perform the
>>> query and read the results into your pipeline in a pipeline step similar to
>>> the enrichment idea here:
>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>
>>> And you could always write your own connector. Not a task to be taken
>>> too lightly but it can be done.
>>>
>>> HTH
>>>
>>>
>>> On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz <
>>> alan.krumh...@betterup.co> wrote:
>>>
>>>> Thanks for sharing this Erik!
>>>>
>>>> It would be really nice/convenient to have a python option to do
>>>> something like that. Our ML team is mostly a python shop and we are also
>>>> using kubeflow pipelines to orchestrate our ML pipelines (mostly using
>>>> their python sdk to author these).
>>>>
>>>> Please let

Re: snowflake source/sink

2020-01-29 Thread Alan Krumholz
Hi Brice,

Thanks so much for your suggestion! I did the following and it seems to
work:

class ReadSnowflake(beam.DoFn):
    def process(self, _):
        import snowflake.connector
        ctx = snowflake.connector.connect(...)
        cs = ctx.cursor()
        cs.execute('SELECT a, b FROM t')
        while True:
            data = cs.fetchmany(1000)
            if len(data) == 0:
                break
            for d in data:
                yield {'a': d[0], 'b': d[1]}

pipeline = beam.Pipeline()
p = (
    pipeline
    | 'Empty start' >> beam.Create([''])
    | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
    | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))


I know this is not optimal, as it reads sequentially from Snowflake (instead
of in parallel, as I'm sure the BQ source does), but apart from that, do you
see any other problems (or possible improvements) with this code?
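
(For readers of the archive, one common way to get some read parallelism,
sketched under assumptions: the table has a numeric id column that can be
split into ranges, and the range bounds below are placeholders. This is not
the approach settled on in this thread:)

class ReadSnowflakeRange(beam.DoFn):
    def process(self, id_range):
        import snowflake.connector
        lo, hi = id_range
        ctx = snowflake.connector.connect(...)  # credentials elided
        cs = ctx.cursor()
        cs.execute('SELECT a, b FROM t WHERE id >= %s AND id < %s', (lo, hi))
        for d in cs.fetchall():
            yield {'a': d[0], 'b': d[1]}

ranges = [(i, i + 100000) for i in range(0, 1000000, 100000)]  # assumed ids
p = (
    pipeline
    | 'Create ranges' >> beam.Create(ranges)
    | 'Reshuffle' >> beam.Reshuffle()
    | 'Read range from Snowflake' >> beam.ParDo(ReadSnowflakeRange())
    | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))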


Thank you so much!

On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht  wrote:

> I am using a pattern which I saw online (but can't seem to locate) which
> performs i/o in a DoFn using a standard python client. I use this to read
> and write to Google cloud storage in streaming mode. You could use this
> idea to perform almost any i/o. Depending on your use case and workflow,
> this may be an approach you could consider. Shout if you need some
> boilerplate.
>
> It does look like native support is coming and you know it is true as I
> read it on the internet.:)
>
> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
>
>
> You could also setup an external service or endpoint to perform the query
> and read the results into your pipeline in a pipeline step similar to the
> enrichment idea here:
> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>
> And you could always write your own connector. Not a task to be taken too
> lightly but it can be done.
>
> HTH
>
>
> On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz 
> wrote:
>
>> Thanks for sharing this Erik!
>>
>> It would be really nice/convenient to have a python option to do
>> something like that. Our ML team is mostly a python shop and we are also
>> using kubeflow pipelines to orchestrate our ML pipelines (mostly using
>> their python sdk to author these).
>>
>> Please let me know if you can think of any way we could do this with
>> python.
>>
>> Thanks so much!
>>
>>
>>
>>
>>
>> On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <
>> erik.will...@panderasystems.com> wrote:
>>
>>> You can use the JDBC driver.  Here's a blog that describes JDBC usage in
>>> general:
>>> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
>>>
>>>
>>> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz <
>>> alan.krumh...@betterup.co> wrote:
>>>
>>>> Hi,
>>>> We are using beam and (dataflow) at my company and would like to use it
>>>> to read and write data from snowflake.
>>>>
>>>> Does anybody know if there are any source/sink available for snowflake?
>>>>
>>>> if not, what would be the easiest way to create those? (maybe there is
>>>> something for sqlalchemy that we could leverage for that?)
>>>>
>>>>
>>>> Thanks so much!
>>>>
>>>> Alan
>>>>
>>>


Re: snowflake source/sink

2020-01-29 Thread Alan Krumholz
Thanks for sharing this Erik!

It would be really nice/convenient to have a python option to do something
like that. Our ML team is mostly a python shop and we are also using
kubeflow pipelines to orchestrate our ML pipelines (mostly using their
python sdk to author these).

Please let me know if you can think of any way we could do this with python.

Thanks so much!





On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <
erik.will...@panderasystems.com> wrote:

> You can use the JDBC driver.  Here's a blog that describes JDBC usage in
> general:
> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
>
>
> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz 
> wrote:
>
>> Hi,
>> We are using beam and (dataflow) at my company and would like to use it
>> to read and write data from snowflake.
>>
>> Does anybody know if there are any source/sink available for snowflake?
>>
>> if not, what would be the easiest way to create those? (maybe there is
>> something for sqlalchemy that we could leverage for that?)
>>
>>
>> Thanks so much!
>>
>> Alan
>>
>


snowflake source/sink

2020-01-27 Thread Alan Krumholz
Hi,
We are using Beam (on Dataflow) at my company and would like to use it to
read and write data from Snowflake.

Does anybody know if there is any source/sink available for Snowflake?

If not, what would be the easiest way to create them? (Maybe there is
something for SQLAlchemy that we could leverage for that?)


Thanks so much!

Alan