Thanks for the invitation and for the answer.

I tried a simple Count combiner and I still have the same issue, so I
guess the problem doesn't come from my resample function, but here is the
code just in case:

def resample_function(candles):
    # Drop candles without a date and sort the rest chronologically.
    sorted_candles = sorted(
        (c for c in candles if c.date is not None),
        key=lambda c: c.date,
    )
    if len(sorted_candles) > 0:
        return Candle(
            sorted_candles[-1].date,              # date of the last candle
            sorted_candles[0].open,               # open of the first candle
            max(c.high for c in sorted_candles),  # highest high
            min(c.low for c in sorted_candles),   # lowest low
            sorted_candles[-1].close,             # close of the last candle
            sum((c.volume for c in sorted_candles), 0.0),  # total volume
        )
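For what it's worth, here is a quick standalone check of the function's behaviour. The `Candle` namedtuple below is a stand-in for the real Candle class, and the function body is a cleaned-up copy, just so the snippet runs on its own:

```python
from collections import namedtuple

# Stand-in for the real Candle class, only for this standalone check.
Candle = namedtuple("Candle", ["date", "open", "high", "low", "close", "volume"])

def resample_function(candles):
    # Drop candles without a date and sort the rest chronologically.
    sorted_candles = sorted(
        (c for c in candles if c.date is not None),
        key=lambda c: c.date,
    )
    if len(sorted_candles) > 0:
        return Candle(
            sorted_candles[-1].date,              # date of the last candle
            sorted_candles[0].open,               # open of the first candle
            max(c.high for c in sorted_candles),  # highest high
            min(c.low for c in sorted_candles),   # lowest low
            sorted_candles[-1].close,             # close of the last candle
            sum((c.volume for c in sorted_candles), 0.0),  # total volume
        )

out = resample_function([
    Candle(2, 10.2, 10.9, 10.1, 10.5, 120.0),
    Candle(1, 10.0, 10.6, 9.8, 10.2, 150.0),
    Candle(3, 10.5, 11.0, 10.0, 10.8, 100.0),
])
print(out)
# Candle(date=3, open=10.0, high=11.0, low=9.8, close=10.8, volume=370.0)
```

This behaves as expected on small in-memory data, which is why I suspect the problem is elsewhere in the pipeline.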


The fact is that the pipeline seems to be stuck on the GroupByKey inside the
CombineGlobally PTransform, before my resample_function is even called (if
the GCP web interface is accurate).

I also tried CountCombineFn, so that my pipeline only carries native Python
types, and it is still stuck.

Here is what I can see on my GCP console (this screenshot shows 36 minutes,
but I waited for 5 hours to be sure):
[image: Selection_070.png]


On Wed, Aug 16, 2017 at 1:08 AM Lukasz Cwik <[email protected]> wrote:

> I have invited you to the slack channel.
>
> 2 million data points doesn't seem like it should be an issue.
> Have you considered trying a simpler combiner like Count to see if the
> bottleneck is with the combiner that you are supplying?
> Also, could you share the code for what resample_function does?
>
> On Mon, Aug 14, 2017 at 2:43 AM, Tristan Marechaux <
> [email protected]> wrote:
>
>> Hi all,
>>
>> I wrote a Beam pipeline with the Python SDK that resamples a
>> timeseries containing one data point per minute into a 5-minute timeseries.
>>
>> My pipeline looks like:
>> input_data |
>> WindowInto(FixedWindows(size=timedelta(minutes=5).total_seconds())) |
>> CombineGlobally(resample_function)
>>
>> When I run it with the local or DataFlow runner with a small dataset, it
>> works and does what I want.
>>
>> But when I try to run it on the DataFlow runner with a bigger dataset
>> (1,700,000 data points timestamped over 15 years), it stays stuck for hours
>> on the GroupByKey step of CombineGlobally.
>>
>> My question is: did I do something wrong with the design of my pipeline?
>>
>> PS: Can someone invite me to the slack channel?
>> --
>>
>> Tristan Marechaux
>>
>> Data Scientist | *Walnut Algorithms*
>>
>> Mobile : +33 627804399 <+33627804399>
>>
>> Email: [email protected]
>>
>> Web: www.walnutalgorithms.com
>>
>
> --

Tristan Marechaux

Data Scientist | *Walnut Algorithms*

Mobile : +33 627804399 <+33627804399>

Email: [email protected]

Web: www.walnutalgorithms.com
