Thanks for the invitation and for the answer.
I tried the Count combiner and I still have the same issue, so I guess the
problem doesn't come from my resample function. Here is the code, just in
case:
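def resample_function(candles):
    # Drop candles without a date and order the rest chronologically.
    sorted_candles = sorted(
        (c for c in candles if c is not None and c.date is not None),
        key=lambda candle: candle.date)
    if not sorted_candles:
        return None
    # Aggregate over sorted_candles rather than the raw input, so candles
    # without a date are ignored consistently and `candles` is iterated once.
    return Candle(
        sorted_candles[-1].date,   # date of the last candle
        sorted_candles[0].open,    # open of the first candle
        max(c.high for c in sorted_candles),
        min(c.low for c in sorted_candles),
        sorted_candles[-1].close,  # close of the last candle
        sum((c.volume for c in sorted_candles), 0.0),
    )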
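Because CombineGlobally may apply a plain function to partial results as well as to raw elements, the function has to return the same answer when it is fed its own outputs. A minimal sketch checking that property in plain Python is below. The `Candle` namedtuple and its field order (date, open, high, low, close, volume) are assumptions inferred from the constructor call above, and `resample` is a hypothetical stand-in for resample_function.

```python
from collections import namedtuple
from datetime import datetime, timedelta

# Hypothetical Candle type; field order inferred from the Candle(...) call above.
Candle = namedtuple("Candle", "date open high low close volume")


def resample(candles):
    """Merge candles (raw or already merged) into one OHLCV candle."""
    ordered = sorted((c for c in candles if c is not None and c.date is not None),
                     key=lambda c: c.date)
    if not ordered:
        return None
    return Candle(
        ordered[-1].date,
        ordered[0].open,
        max(c.high for c in ordered),
        min(c.low for c in ordered),
        ordered[-1].close,
        sum((c.volume for c in ordered), 0.0),
    )


# Five one-minute candles with made-up prices, for illustration only.
start = datetime(2017, 8, 14, 9, 0)
minute_candles = [
    Candle(start + timedelta(minutes=i), 10.0 + i, 11.0 + i, 9.0 + i, 10.5 + i, 100.0)
    for i in range(5)
]

# Combining everything at once...
whole = resample(minute_candles)
# ...must match combining partial results, as a combiner may do on workers.
partial = resample([resample(minute_candles[:2]), resample(minute_candles[2:])])
assert whole == partial
```

If the function were not associative in this sense, the pipeline would silently produce different candles depending on how the runner split the data.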
The pipeline seems to be stuck on the GroupByKey inside the CombineGlobally
PTransform, before my resample_function is ever called (if the GCP web
interface is accurate).
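Since the step that hangs is the GroupByKey, which groups elements per window, it may help to estimate how many 5-minute windows the dataset from the original message below produces. This is rough arithmetic only, assuming 365.25-day years and one-minute data (so at most 5 points per window); the real count depends on gaps such as nights and weekends.

```python
import math
from datetime import timedelta

n_points = 1_700_000           # "1 700 000 datapoints" from the original message
years = 15                     # "timestamped over 15 years"
window = timedelta(minutes=5)
points_per_window = 5          # one-minute data: at most 5 points per window

# Upper bound: every 5-minute slot in the span is non-empty
# (assumption: 365.25-day years, no gaps).
upper = int(timedelta(days=365.25 * years) / window)

# Lower bound: every non-empty window holds the maximum of 5 points.
lower = math.ceil(n_points / points_per_window)

print(lower, upper)  # 340000 1577880
```

Either way, the GroupByKey has to produce somewhere between roughly 340,000 and 1.6 million small groups, each covering at most five one-minute points.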
I also tried a version of the pipeline that uses only native Python types,
with the CountCombineFn, and it is still stuck.
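To separate the windowing from the combiner entirely, the grouping done by WindowInto(FixedWindows(300)) followed by a per-window count can be mimicked in plain Python. This is an in-memory emulation, not Beam's implementation, and it assumes each element's timestamp is a Unix time in seconds and that windows have zero offset; a fixed window starts at `ts - (ts - offset) % size`.

```python
from collections import Counter

WINDOW_SIZE = 5 * 60  # seconds, same as timedelta(minutes=5).total_seconds()


def window_start(ts, size=WINDOW_SIZE, offset=0):
    """Start of the fixed window containing Unix timestamp `ts` (seconds)."""
    return ts - (ts - offset) % size


def count_per_window(timestamps, size=WINDOW_SIZE):
    """Emulates fixed windowing followed by a count of elements per window."""
    return Counter(window_start(ts, size) for ts in timestamps)


# One point per minute for 12 minutes, starting on a window boundary.
minute_timestamps = [1_500_000_000 + 60 * i for i in range(12)]
counts = count_per_window(minute_timestamps)
print(sorted(counts.items()))
# [(1500000000, 5), (1500000300, 5), (1500000600, 2)]
```

The emulation makes explicit that windows are derived from each element's timestamp: if every element carried the same timestamp, all of them would fall into a single window.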
Here is what I can see in my GCP console (this screenshot shows 36 minutes,
but I waited for 5 hours to be sure):
[image: Selection_070.png]
On Wed, Aug 16, 2017 at 1:08 AM Lukasz Cwik <[email protected]> wrote:
> I have invited you to the slack channel.
>
> 2 million data points doesn't seem like it should be an issue.
> Have you considered trying a simpler combiner like Count to see if the
> bottleneck is with the combiner that you are supplying?
> Also, could you share the code for what resample_function does?
>
> On Mon, Aug 14, 2017 at 2:43 AM, Tristan Marechaux <
> [email protected]> wrote:
>
>> Hi all,
>>
>> I wrote a Beam pipeline with the Python SDK that resamples a timeseries
>> containing one data point every minute into a 5-minute timeseries.
>>
>> My pipeline looks like:
>>     (input_data
>>      | WindowInto(FixedWindows(size=timedelta(minutes=5).total_seconds()))
>>      | CombineGlobally(resample_function))
>>
>> When I run it with the local or Dataflow runner on a small dataset, it
>> works and does what I want.
>>
>> But when I run it on the Dataflow runner with a bigger dataset (1,700,000
>> data points timestamped over 15 years), it stays stuck for hours on the
>> GroupByKey step of CombineGlobally.
>>
>> My question is: did I do something wrong in the design of my pipeline?
>>
>> PS: Can someone invite me to the slack channel?
>> --
>>
>> Tristan Marechaux
>>
>> Data Scientist | *Walnut Algorithms*
>>
>> Mobile : +33 627804399 <+33627804399>
>>
>> Email: [email protected]
>>
>> Web: www.walnutalgorithms.com
>>
>