Re: Write-through-cache in State logic

2019-08-28 Thread Maximilian Michels

I've tried to put the current design into code. Any feedback on these
changes to enable caching of user state is appreciated:

Proto: https://github.com/apache/beam/pull/9440
Runner: https://github.com/apache/beam/pull/9374
Python SDK: https://github.com/apache/beam/pull/9418

Thanks,
Max

On 28.08.19 11:48, Maximilian Michels wrote:

> Just to clarify, the repeated list of cache tokens in the process
> bundle request is used to validate reading *and* stored when writing?
> In that sense, should they just be called version identifiers or
> something like that?

We could call them version identifiers, though cache tokens were always
a means to identify versions of a state.

On 28.08.19 11:10, Maximilian Michels wrote:
>> cachetools sounds like a fine choice to me.
>
> For the first version I've implemented a simple LRU cache. If you want
> to have a look:
> 
https://github.com/apache/beam/pull/9418/files#diff-ed2d70e99442b6e1668e30409d3383a6R60
>
>
>> Open up a PR for the proto changes and we can work through any minor
>> comments there.
>
> Proto changes: https://github.com/apache/beam/pull/9440
>
>
> Thanks,
> Max
>
> On 27.08.19 23:00, Robert Bradshaw wrote:
>> Just to clarify, the repeated list of cache tokens in the process
>> bundle request is used to validate reading *and* stored when writing?
>> In that sense, should they just be called version identifiers or
>> something like that?
>>
>> On Tue, Aug 27, 2019 at 11:33 AM Maximilian Michels 
>> wrote:
>>>
>>> Thanks. Updated:
>>>
>>> message ProcessBundleRequest {
>>>   // (Required) A reference to the process bundle descriptor that must be
>>>   // instantiated and executed by the SDK harness.
>>>   string process_bundle_descriptor_reference = 1;
>>>
>>>   // A cache token which can be used by an SDK to check for the validity
>>>   // of cached elements which have a cache token associated.
>>>   message CacheToken {
>>>
>>>     // A flag to indicate a cache token is valid for user state.
>>>     message UserState {}
>>>
>>>     // A flag to indicate a cache token is valid for a side input.
>>>     message SideInput {
>>>       // The id of a side input.
>>>       string side_input = 1;
>>>     }
>>>
>>>     // The scope of a cache token.
>>>     oneof type {
>>>       UserState user_state = 1;
>>>       SideInput side_input = 2;
>>>     }
>>>
>>>     // The cache token identifier which should be globally unique.
>>>     bytes token = 10;
>>>   }
>>>
>>>   // (Optional) A list of cache tokens that can be used by an SDK to reuse
>>>   // cached data returned by the State API across multiple bundles.
>>>   repeated CacheToken cache_tokens = 2;
>>> }
>>>
>>> On 27.08.19 19:22, Lukasz Cwik wrote:
>>>
>>> SideInputState -> SideInput (side_input_state -> side_input)
>>> + more comments around the messages and the fields.
>>>
>>>
>>> On Tue, Aug 27, 2019 at 10:18 AM Maximilian Michels 
>>> wrote:

 We would have to differentiate cache tokens for user state and side
 inputs. How about something like this?

 message ProcessBundleRequest {
   // (Required) A reference to the process bundle descriptor that must be
   // instantiated and executed by the SDK harness.
   string process_bundle_descriptor_reference = 1;

   message CacheToken {

     message UserState {
     }

     message SideInputState {
       string side_input_id = 1;
     }

     oneof type {
       UserState user_state = 1;
       SideInputState side_input_state = 2;
     }

     bytes token = 10;
   }

   // (Optional) A list of cache tokens that can be used by an SDK to reuse
   // cached data returned by the State API across multiple bundles.
   repeated CacheToken cache_tokens = 2;
 }

 -Max

 On 27.08.19 18:43, Lukasz Cwik wrote:

 The bundle's view of side inputs should never change during
 processing and should have a point-in-time snapshot.

 I was just trying to say that the cache token for side inputs being
 deferred until side input request time simplified the runner's
 implementation since that is precisely when the runner would
 need to take a look at the side input. Putting them as part of the
 ProcessBundleRequest complicates that but does make the SDK
 implementation significantly simpler, which is a win.

 On Tue, Aug 27, 2019 at 9:14 AM Maximilian Michels 
 wrote:
>
> Thanks for the quick response.
>
> Just to clarify, the issue with versioning side input is also present
> when supplying the cache tokens on a request basis instead of per
> bundle. The SDK never knows when the Runner receives a new version of
> the side input. Like you pointed out, it needs to mark side inputs as
> stale and generate new cache tokens for the stale side inputs.
>
> The difference between per-request tokens and per-bundle tokens would be
> that the side input can only change after a bundle completes vs. during
> the bundle.

Re: Write-through-cache in State logic

2019-08-28 Thread Maximilian Michels

Just to clarify, the repeated list of cache tokens in the process
bundle request is used to validate reading *and* stored when writing?
In that sense, should they just be called version identifiers or
something like that?


We could call them version identifiers, though cache tokens were always 
a means to identify versions of a state.


On 28.08.19 11:10, Maximilian Michels wrote:

cachetools sounds like a fine choice to me.


For the first version I've implemented a simple LRU cache. If you want 
to have a look: 
https://github.com/apache/beam/pull/9418/files#diff-ed2d70e99442b6e1668e30409d3383a6R60 



Open up a PR for the proto changes and we can work through any minor 
comments there.


Proto changes: https://github.com/apache/beam/pull/9440


Thanks,
Max

On 27.08.19 23:00, Robert Bradshaw wrote:

Just to clarify, the repeated list of cache tokens in the process
bundle request is used to validate reading *and* stored when writing?
In that sense, should they just be called version identifiers or
something like that?

On Tue, Aug 27, 2019 at 11:33 AM Maximilian Michels  
wrote:


Thanks. Updated:

message ProcessBundleRequest {
  // (Required) A reference to the process bundle descriptor that must be
  // instantiated and executed by the SDK harness.
  string process_bundle_descriptor_reference = 1;

  // A cache token which can be used by an SDK to check for the validity
  // of cached elements which have a cache token associated.
  message CacheToken {

    // A flag to indicate a cache token is valid for user state.
    message UserState {}

    // A flag to indicate a cache token is valid for a side input.
    message SideInput {
      // The id of a side input.
      string side_input = 1;
    }

    // The scope of a cache token.
    oneof type {
      UserState user_state = 1;
      SideInput side_input = 2;
    }

    // The cache token identifier which should be globally unique.
    bytes token = 10;
  }

  // (Optional) A list of cache tokens that can be used by an SDK to reuse
  // cached data returned by the State API across multiple bundles.
  repeated CacheToken cache_tokens = 2;
}

On 27.08.19 19:22, Lukasz Cwik wrote:

SideInputState -> SideInput (side_input_state -> side_input)
+ more comments around the messages and the fields.


On Tue, Aug 27, 2019 at 10:18 AM Maximilian Michels  
wrote:


We would have to differentiate cache tokens for user state and side 
inputs. How about something like this?


message ProcessBundleRequest {
  // (Required) A reference to the process bundle descriptor that must be
  // instantiated and executed by the SDK harness.
  string process_bundle_descriptor_reference = 1;

  message CacheToken {

    message UserState {
    }

    message SideInputState {
      string side_input_id = 1;
    }

    oneof type {
      UserState user_state = 1;
      SideInputState side_input_state = 2;
    }

    bytes token = 10;
  }

  // (Optional) A list of cache tokens that can be used by an SDK to reuse
  // cached data returned by the State API across multiple bundles.
  repeated CacheToken cache_tokens = 2;
}

-Max

On 27.08.19 18:43, Lukasz Cwik wrote:

The bundle's view of side inputs should never change during
processing and should have a point-in-time snapshot.

I was just trying to say that the cache token for side inputs being
deferred until side input request time simplified the runner's
implementation since that is precisely when the runner would need
to take a look at the side input. Putting them as part of the
ProcessBundleRequest complicates that but does make the SDK
implementation significantly simpler, which is a win.


On Tue, Aug 27, 2019 at 9:14 AM Maximilian Michels  
wrote:


Thanks for the quick response.

Just to clarify, the issue with versioning side input is also present
when supplying the cache tokens on a request basis instead of per
bundle. The SDK never knows when the Runner receives a new version of
the side input. Like you pointed out, it needs to mark side inputs as
stale and generate new cache tokens for the stale side inputs.

The difference between per-request tokens and per-bundle tokens would be
that the side input can only change after a bundle completes vs. during
the bundle. Side inputs are always fuzzy in that regard because there is
no precise instant where side inputs are atomically updated, other than
the assumption that they eventually will be updated. In that regard
per-bundle tokens for side input seem to be fine.

All of the above is not an issue for user state, as its cache can remain
valid for the lifetime of a Runner<=>SDK Harness connection. A simple
solution would be to not cache side input because there are many cases
where the caching just adds additional overhead. However, I can also
imagine cases where side input is valid forever and caching would be
very beneficial.

For the first version I want to focus on user state because that's where
I see the most benefit for caching.

Re: Write-through-cache in State logic

2019-08-28 Thread Maximilian Michels

cachetools sounds like a fine choice to me.


For the first version I've implemented a simple LRU cache. If you want 
to have a look: 
https://github.com/apache/beam/pull/9418/files#diff-ed2d70e99442b6e1668e30409d3383a6R60
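
For illustration only, here is a minimal sketch of such a cross-bundle LRU
state cache (this is not the code in the PR; the class and parameter names
are hypothetical, and cachetools is assumed to be available):

from cachetools import LRUCache

_MISSING = object()  # distinguishes "not cached" from "cached empty state"

class StateCache:
    """Sketch: a process-wide cache keyed by (cache_token, state_key).

    Entries written under an old token simply stop being hit once the
    runner issues a new token, so invalidation is implicit.
    """

    def __init__(self, max_entries=1024):
        self._cache = LRUCache(maxsize=max_entries)

    def get(self, cache_token, state_key):
        # Returns _MISSING on a cache miss; the caller then fetches the
        # state from the runner over the State API and calls put().
        return self._cache.get((cache_token, state_key), _MISSING)

    def put(self, cache_token, state_key, value):
        self._cache[(cache_token, state_key)] = value

    def clear(self, cache_token, state_key):
        # Cache the cleared (empty) state so later reads are served locally.
        self._cache[(cache_token, state_key)] = []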



Open up a PR for the proto changes and we can work through any minor comments 
there.


Proto changes: https://github.com/apache/beam/pull/9440


Thanks,
Max

On 27.08.19 23:00, Robert Bradshaw wrote:

Just to clarify, the repeated list of cache tokens in the process
bundle request is used to validate reading *and* stored when writing?
In that sense, should they just be called version identifiers or
something like that?

On Tue, Aug 27, 2019 at 11:33 AM Maximilian Michels  wrote:


Thanks. Updated:

message ProcessBundleRequest {
  // (Required) A reference to the process bundle descriptor that must be
  // instantiated and executed by the SDK harness.
  string process_bundle_descriptor_reference = 1;

  // A cache token which can be used by an SDK to check for the validity
  // of cached elements which have a cache token associated.
  message CacheToken {

    // A flag to indicate a cache token is valid for user state.
    message UserState {}

    // A flag to indicate a cache token is valid for a side input.
    message SideInput {
      // The id of a side input.
      string side_input = 1;
    }

    // The scope of a cache token.
    oneof type {
      UserState user_state = 1;
      SideInput side_input = 2;
    }

    // The cache token identifier which should be globally unique.
    bytes token = 10;
  }

  // (Optional) A list of cache tokens that can be used by an SDK to reuse
  // cached data returned by the State API across multiple bundles.
  repeated CacheToken cache_tokens = 2;
}

On 27.08.19 19:22, Lukasz Cwik wrote:

SideInputState -> SideInput (side_input_state -> side_input)
+ more comments around the messages and the fields.


On Tue, Aug 27, 2019 at 10:18 AM Maximilian Michels  wrote:


We would have to differentiate cache tokens for user state and side inputs. How 
about something like this?

message ProcessBundleRequest {
  // (Required) A reference to the process bundle descriptor that must be
  // instantiated and executed by the SDK harness.
  string process_bundle_descriptor_reference = 1;

  message CacheToken {

    message UserState {
    }

    message SideInputState {
      string side_input_id = 1;
    }

    oneof type {
      UserState user_state = 1;
      SideInputState side_input_state = 2;
    }

    bytes token = 10;
  }

  // (Optional) A list of cache tokens that can be used by an SDK to reuse
  // cached data returned by the State API across multiple bundles.
  repeated CacheToken cache_tokens = 2;
}

-Max

On 27.08.19 18:43, Lukasz Cwik wrote:

The bundle's view of side inputs should never change during processing and
should have a point-in-time snapshot.

I was just trying to say that the cache token for side inputs being deferred
until side input request time simplified the runner's implementation since that
is precisely when the runner would need to take a look at the side input.
Putting them as part of the ProcessBundleRequest complicates that but does make
the SDK implementation significantly simpler, which is a win.

On Tue, Aug 27, 2019 at 9:14 AM Maximilian Michels  wrote:


Thanks for the quick response.

Just to clarify, the issue with versioning side input is also present
when supplying the cache tokens on a request basis instead of per
bundle. The SDK never knows when the Runner receives a new version of
the side input. Like you pointed out, it needs to mark side inputs as
stale and generate new cache tokens for the stale side inputs.

The difference between per-request tokens and per-bundle tokens would be
that the side input can only change after a bundle completes vs. during
the bundle. Side inputs are always fuzzy in that regard because there is
no precise instant where side inputs are atomically updated, other than
the assumption that they eventually will be updated. In that regard
per-bundle tokens for side input seem to be fine.

All of the above is not an issue for user state, as its cache can remain
valid for the lifetime of a Runner<=>SDK Harness connection. A simple
solution would be to not cache side input because there are many cases
where the caching just adds additional overhead. However, I can also
imagine cases where side input is valid forever and caching would be
very beneficial.

For the first version I want to focus on user state because that's where
I see the most benefit for caching. I don't see a problem though for the
Runner to detect new side input and reflect that in the cache tokens
supplied for a new bundle.

-Max

On 26.08.19 22:27, Lukasz Cwik wrote:

Your summary below makes sense to me. I can see that recovery from
rolling back doesn't need to be a priority and simplifies the solution
for user state caching down to one token.

Providing cache tokens upfront does require the Runner to know what
"version" of everything it may supply to the SDK upfront (instead of on
request) which would mean that the Runner may need to have a mapping
from cache token to internal version identifier for things like side
inputs which are typically broadcast.

Re: Write-through-cache in State logic

2019-08-27 Thread Robert Bradshaw
Just to clarify, the repeated list of cache tokens in the process
bundle request is used to validate reading *and* stored when writing?
In that sense, should they just be called version identifiers or
something like that?

On Tue, Aug 27, 2019 at 11:33 AM Maximilian Michels  wrote:
>
> Thanks. Updated:
>
> message ProcessBundleRequest {
>   // (Required) A reference to the process bundle descriptor that must be
>   // instantiated and executed by the SDK harness.
>   string process_bundle_descriptor_reference = 1;
>
>   // A cache token which can be used by an SDK to check for the validity
>   // of cached elements which have a cache token associated.
>   message CacheToken {
>
>     // A flag to indicate a cache token is valid for user state.
>     message UserState {}
>
>     // A flag to indicate a cache token is valid for a side input.
>     message SideInput {
>       // The id of a side input.
>       string side_input = 1;
>     }
>
>     // The scope of a cache token.
>     oneof type {
>       UserState user_state = 1;
>       SideInput side_input = 2;
>     }
>
>     // The cache token identifier which should be globally unique.
>     bytes token = 10;
>   }
>
>   // (Optional) A list of cache tokens that can be used by an SDK to reuse
>   // cached data returned by the State API across multiple bundles.
>   repeated CacheToken cache_tokens = 2;
> }
>
> On 27.08.19 19:22, Lukasz Cwik wrote:
>
> SideInputState -> SideInput (side_input_state -> side_input)
> + more comments around the messages and the fields.
>
>
> On Tue, Aug 27, 2019 at 10:18 AM Maximilian Michels  wrote:
>>
>> We would have to differentiate cache tokens for user state and side inputs. 
>> How about something like this?
>>
>> message ProcessBundleRequest {
>>   // (Required) A reference to the process bundle descriptor that must be
>>   // instantiated and executed by the SDK harness.
>>   string process_bundle_descriptor_reference = 1;
>>
>>   message CacheToken {
>>
>>     message UserState {
>>     }
>>
>>     message SideInputState {
>>       string side_input_id = 1;
>>     }
>>
>>     oneof type {
>>       UserState user_state = 1;
>>       SideInputState side_input_state = 2;
>>     }
>>
>>     bytes token = 10;
>>   }
>>
>>   // (Optional) A list of cache tokens that can be used by an SDK to reuse
>>   // cached data returned by the State API across multiple bundles.
>>   repeated CacheToken cache_tokens = 2;
>> }
>>
>> -Max
>>
>> On 27.08.19 18:43, Lukasz Cwik wrote:
>>
>> The bundle's view of side inputs should never change during processing and
>> should have a point-in-time snapshot.
>>
>> I was just trying to say that the cache token for side inputs being deferred
>> until side input request time simplified the runner's implementation since
>> that is precisely when the runner would need to take a look at the side
>> input. Putting them as part of the ProcessBundleRequest complicates that but
>> does make the SDK implementation significantly simpler, which is a win.
>>
>> On Tue, Aug 27, 2019 at 9:14 AM Maximilian Michels  wrote:
>>>
>>> Thanks for the quick response.
>>>
>>> Just to clarify, the issue with versioning side input is also present
>>> when supplying the cache tokens on a request basis instead of per
>>> bundle. The SDK never knows when the Runner receives a new version of
>>> the side input. Like you pointed out, it needs to mark side inputs as
>>> stale and generate new cache tokens for the stale side inputs.
>>>
>>> The difference between per-request tokens and per-bundle tokens would be
>>> that the side input can only change after a bundle completes vs. during
>>> the bundle. Side inputs are always fuzzy in that regard because there is
>>> no precise instant where side inputs are atomically updated, other than
>>> the assumption that they eventually will be updated. In that regard
>>> per-bundle tokens for side input seem to be fine.
>>>
>>> All of the above is not an issue for user state, as its cache can remain
>>> valid for the lifetime of a Runner<=>SDK Harness connection. A simple
>>> solution would be to not cache side input because there are many cases
>>> where the caching just adds additional overhead. However, I can also
>>> imagine cases where side input is valid forever and caching would be
>>> very beneficial.
>>>
>>> For the first version I want to focus on user state because that's where
>>> I see the most benefit for caching. I don't see a problem though for the
>>> Runner to detect new side input and reflect that in the cache tokens
>>> supplied for a new bundle.
>>>
>>> -Max
>>>
>>> On 26.08.19 22:27, Lukasz Cwik wrote:
>>> > Your summary below makes sense to me. I can see that recovery from
>>> > rolling back doesn't need to be a priority and simplifies the solution
>>> > for user state caching down to one token.
>>> >
>>> > Providing cache tokens upfront does require the Runner to know what
>>> > "version" of everything it may supply to the SDK upfront (instead of on
>>> > request) which would mean that the Runner may need to have a mapping
>>> > from cache token to internal version identifier for things like side
>>> > inputs which are typically broadcast.

Re: Write-through-cache in State logic

2019-08-27 Thread Robert Bradshaw
On Sun, Aug 18, 2019 at 7:30 PM Rakesh Kumar  wrote:
>
> not to completely hijack Max's question but a tangential question regarding 
> LRU cache.
>
> What is the preferred python library for LRU cache?
> I noticed that cachetools [1] is used as one of the dependencies for GCP [2]. 
> Cachetools[1] has LRU cache and it supports Python 2 & 3. It can potentially 
> support our use case.  Can we move cachetools to the required package list 
> [3] and use it for cross bundle caching?
>
> 1. https://pypi.org/project/cachetools/
> 2. 
> https://github.com/apache/beam/blob/96abacba9b8c7475c753eb3c0b58cca27c46feb1/sdks/python/setup.py#L143
> 3. 
> https://github.com/apache/beam/blob/96abacba9b8c7475c753eb3c0b58cca27c46feb1/sdks/python/setup.py#L104

cachetools sounds like a fine choice to me.
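
(For reference, cachetools exposes dict-like cache classes; a minimal usage
sketch of its LRU cache, assuming cachetools is installed:)

from cachetools import LRUCache

cache = LRUCache(maxsize=100)  # evicts the least recently used entry
cache['state_key'] = b'serialized-state'
assert cache['state_key'] == b'serialized-state'
assert len(cache) <= 100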


Re: Write-through-cache in State logic

2019-08-27 Thread Lukasz Cwik
Open up a PR for the proto changes and we can work through any minor
comments there.

On Tue, Aug 27, 2019 at 11:33 AM Maximilian Michels  wrote:

> Thanks. Updated:
>
> message ProcessBundleRequest {
>   // (Required) A reference to the process bundle descriptor that must be
>   // instantiated and executed by the SDK harness.
>   string process_bundle_descriptor_reference = 1;
>
>   // A cache token which can be used by an SDK to check for the validity
>   // of cached elements which have a cache token associated.
>   message CacheToken {
>
>     // A flag to indicate a cache token is valid for user state.
>     message UserState {}
>
>     // A flag to indicate a cache token is valid for a side input.
>     message SideInput {
>       // The id of a side input.
>       string side_input = 1;
>     }
>
>     // The scope of a cache token.
>     oneof type {
>       UserState user_state = 1;
>       SideInput side_input = 2;
>     }
>
>     // The cache token identifier which should be globally unique.
>     bytes token = 10;
>   }
>
>   // (Optional) A list of cache tokens that can be used by an SDK to reuse
>   // cached data returned by the State API across multiple bundles.
>   repeated CacheToken cache_tokens = 2;
> }
>
> On 27.08.19 19:22, Lukasz Cwik wrote:
>
> SideInputState -> SideInput (side_input_state -> side_input)
> + more comments around the messages and the fields.
>
>
> On Tue, Aug 27, 2019 at 10:18 AM Maximilian Michels 
> wrote:
>
>> We would have to differentiate cache tokens for user state and side
>> inputs. How about something like this?
>>
>> message ProcessBundleRequest {
>>   // (Required) A reference to the process bundle descriptor that must be
>>   // instantiated and executed by the SDK harness.
>>   string process_bundle_descriptor_reference = 1;
>>
>>   message CacheToken {
>>
>>     message UserState {
>>     }
>>
>>     message SideInputState {
>>       string side_input_id = 1;
>>     }
>>
>>     oneof type {
>>       UserState user_state = 1;
>>       SideInputState side_input_state = 2;
>>     }
>>
>>     bytes token = 10;
>>   }
>>
>>   // (Optional) A list of cache tokens that can be used by an SDK to reuse
>>   // cached data returned by the State API across multiple bundles.
>>   repeated CacheToken cache_tokens = 2;
>> }
>>
>> -Max
>>
>> On 27.08.19 18:43, Lukasz Cwik wrote:
>>
>> The bundle's view of side inputs should never change during processing and
>> should have a point-in-time snapshot.
>>
>> I was just trying to say that the cache token for side inputs being
>> deferred until side input request time simplified the runner's implementation
>> since that is precisely when the runner would need to take a look at the
>> side input. Putting them as part of the ProcessBundleRequest complicates
>> that but does make the SDK implementation significantly simpler, which is a
>> win.
>>
>> On Tue, Aug 27, 2019 at 9:14 AM Maximilian Michels 
>> wrote:
>>
>>> Thanks for the quick response.
>>>
>>> Just to clarify, the issue with versioning side input is also present
>>> when supplying the cache tokens on a request basis instead of per
>>> bundle. The SDK never knows when the Runner receives a new version of
>>> the side input. Like you pointed out, it needs to mark side inputs as
>>> stale and generate new cache tokens for the stale side inputs.
>>>
>>> The difference between per-request tokens and per-bundle tokens would be
>>> that the side input can only change after a bundle completes vs. during
>>> the bundle. Side inputs are always fuzzy in that regard because there is
>>> no precise instant where side inputs are atomically updated, other than
>>> the assumption that they eventually will be updated. In that regard
>>> per-bundle tokens for side input seem to be fine.
>>>
>>> All of the above is not an issue for user state, as its cache can remain
>>> valid for the lifetime of a Runner<=>SDK Harness connection. A simple
>>> solution would be to not cache side input because there are many cases
>>> where the caching just adds additional overhead. However, I can also
>>> imagine cases where side input is valid forever and caching would be
>>> very beneficial.
>>>
>>> For the first version I want to focus on user state because that's where
>>> I see the most benefit for caching. I don't see a problem though for the
>>> Runner to detect new side input and reflect that in the cache tokens
>>> supplied for a new bundle.
>>>
>>> -Max
>>>
>>> On 26.08.19 22:27, Lukasz Cwik wrote:
>>> > Your summary below makes sense to me. I can see that recovery from
>>> > rolling back doesn't need to be a priority and simplifies the solution
>>> > for user state caching down to one token.
>>> >
>>> > Providing cache tokens upfront does require the Runner to know what
>>> > "version" of everything it may supply to the SDK upfront (instead of on
>>> > request) which would mean that the Runner may need to have a mapping
>>> > from cache token to internal version identifier for things like side
>>> > inputs which are typically broadcast.

Re: Write-through-cache in State logic

2019-08-27 Thread Lukasz Cwik
SideInputState -> SideInput (side_input_state -> side_input)
+ more comments around the messages and the fields.


On Tue, Aug 27, 2019 at 10:18 AM Maximilian Michels  wrote:

> We would have to differentiate cache tokens for user state and side
> inputs. How about something like this?
>
> message ProcessBundleRequest {
>   // (Required) A reference to the process bundle descriptor that must be
>   // instantiated and executed by the SDK harness.
>   string process_bundle_descriptor_reference = 1;
>
>   message CacheToken {
>
>     message UserState {
>     }
>
>     message SideInputState {
>       string side_input_id = 1;
>     }
>
>     oneof type {
>       UserState user_state = 1;
>       SideInputState side_input_state = 2;
>     }
>
>     bytes token = 10;
>   }
>
>   // (Optional) A list of cache tokens that can be used by an SDK to reuse
>   // cached data returned by the State API across multiple bundles.
>   repeated CacheToken cache_tokens = 2;
> }
>
> -Max
>
> On 27.08.19 18:43, Lukasz Cwik wrote:
>
> The bundle's view of side inputs should never change during processing and
> should have a point-in-time snapshot.
>
> I was just trying to say that the cache token for side inputs being
> deferred until side input request time simplified the runner's implementation
> since that is precisely when the runner would need to take a look at the
> side input. Putting them as part of the ProcessBundleRequest complicates
> that but does make the SDK implementation significantly simpler, which is a
> win.
>
> On Tue, Aug 27, 2019 at 9:14 AM Maximilian Michels  wrote:
>
>> Thanks for the quick response.
>>
>> Just to clarify, the issue with versioning side input is also present
>> when supplying the cache tokens on a request basis instead of per
>> bundle. The SDK never knows when the Runner receives a new version of
>> the side input. Like you pointed out, it needs to mark side inputs as
>> stale and generate new cache tokens for the stale side inputs.
>>
>> The difference between per-request tokens and per-bundle tokens would be
>> that the side input can only change after a bundle completes vs. during
>> the bundle. Side inputs are always fuzzy in that regard because there is
>> no precise instant where side inputs are atomically updated, other than
>> the assumption that they eventually will be updated. In that regard
>> per-bundle tokens for side input seem to be fine.
>>
>> All of the above is not an issue for user state, as its cache can remain
>> valid for the lifetime of a Runner<=>SDK Harness connection. A simple
>> solution would be to not cache side input because there are many cases
>> where the caching just adds additional overhead. However, I can also
>> imagine cases where side input is valid forever and caching would be
>> very beneficial.
>>
>> For the first version I want to focus on user state because that's where
>> I see the most benefit for caching. I don't see a problem though for the
>> Runner to detect new side input and reflect that in the cache tokens
>> supplied for a new bundle.
>>
>> -Max
>>
>> On 26.08.19 22:27, Lukasz Cwik wrote:
>> > Your summary below makes sense to me. I can see that recovery from
>> > rolling back doesn't need to be a priority and simplifies the solution
>> > for user state caching down to one token.
>> >
>> > Providing cache tokens upfront does require the Runner to know what
>> > "version" of everything it may supply to the SDK upfront (instead of on
>> > request) which would mean that the Runner may need to have a mapping
>> > from cache token to internal version identifier for things like side
>> > inputs which are typically broadcast. The Runner would also need to poll
>> > to see if the side input has changed in the background to not block
>> > processing bundles with "stale" side input data.
>> >
>> > Ping me once you have the Runner PR updated and I'll take a look again.
>> >
>> > On Mon, Aug 26, 2019 at 12:20 PM Maximilian Michels wrote:
>> >
>> > Thank you for the summary Luke. I really appreciate the effort you put
>> > into this!
>> >
>> >  > Based upon your discussion you seem to want option #1
>> >
>> > I'm actually for option #2. The option to cache/invalidate side inputs
>> > is important, and we should incorporate this in the design. That's why
>> > option #1 is not flexible enough. However, a first implementation could
>> > defer caching of side inputs.
>> >
>> > Option #3 was my initial thinking and the first version of the PR, but I
>> > think we agreed that there wouldn't be much gain from keeping a cache
>> > token per state id.
>> >
>> > Option #4 is what is specifically documented in the reference doc and
>> > already part of the Proto, where valid tokens are provided for each new
>> > bundle and also as part of the response of a get/put/clear. We mentioned
>> > that the reply does not have to be waited on synchronously (I mentioned
>> > it even), but it complicates the implementation.

Re: Write-through-cache in State logic

2019-08-27 Thread Maximilian Michels
We would have to differentiate cache tokens for user state and side 
inputs. How about something like this?


message ProcessBundleRequest {
  // (Required) A reference to the process bundle descriptor that must be
  // instantiated and executed by the SDK harness.
  string process_bundle_descriptor_reference = 1;

  message CacheToken {

    message UserState {
    }

    message SideInputState {
      string side_input_id = 1;
    }

    oneof type {
      UserState user_state = 1;
      SideInputState side_input_state = 2;
    }

    bytes token = 10;
  }

  // (Optional) A list of cache tokens that can be used by an SDK to reuse
  // cached data returned by the State API across multiple bundles.
  repeated CacheToken cache_tokens = 2;
}
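
To make the scoping concrete, here is a hedged sketch of how an SDK harness
might split the repeated cache_tokens field by scope. The helper is
hypothetical and assumes the generated Python proto classes; WhichOneof is
standard protobuf API:

def index_cache_tokens(process_bundle_request):
    # Hypothetical helper, not part of the Beam SDK.
    user_state_token = None
    side_input_tokens = {}  # side_input_id -> token
    for cache_token in process_bundle_request.cache_tokens:
        scope = cache_token.WhichOneof('type')
        if scope == 'user_state':
            user_state_token = cache_token.token
        elif scope == 'side_input_state':
            side_input_tokens[
                cache_token.side_input_state.side_input_id] = cache_token.token
    return user_state_token, side_input_tokens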

-Max

On 27.08.19 18:43, Lukasz Cwik wrote:
The bundle's view of side inputs should never change during processing
and should have a point-in-time snapshot.

I was just trying to say that the cache token for side inputs being
deferred until side input request time simplified the runner's
implementation since that is precisely when the runner would need
to take a look at the side input. Putting them as part of the
ProcessBundleRequest complicates that but does make the SDK
implementation significantly simpler, which is a win.


On Tue, Aug 27, 2019 at 9:14 AM Maximilian Michels wrote:


Thanks for the quick response.

Just to clarify, the issue with versioning side input is also present
when supplying the cache tokens on a request basis instead of per
bundle. The SDK never knows when the Runner receives a new version of
the side input. Like you pointed out, it needs to mark side inputs as
stale and generate new cache tokens for the stale side inputs.

The difference between per-request tokens and per-bundle tokens would be
that the side input can only change after a bundle completes vs. during
the bundle. Side inputs are always fuzzy in that regard because there is
no precise instant where side inputs are atomically updated, other than
the assumption that they eventually will be updated. In that regard
per-bundle tokens for side input seem to be fine.

All of the above is not an issue for user state, as its cache can remain
valid for the lifetime of a Runner<=>SDK Harness connection. A simple
solution would be to not cache side input because there are many cases
where the caching just adds additional overhead. However, I can also
imagine cases where side input is valid forever and caching would be
very beneficial.

For the first version I want to focus on user state because that's where
I see the most benefit for caching. I don't see a problem though for the
Runner to detect new side input and reflect that in the cache tokens
supplied for a new bundle.

-Max

On 26.08.19 22:27, Lukasz Cwik wrote:
> Your summary below makes sense to me. I can see that recovery from
> rolling back doesn't need to be a priority and simplifies the solution
> for user state caching down to one token.
>
> Providing cache tokens upfront does require the Runner to know what
> "version" of everything it may supply to the SDK upfront (instead of on
> request) which would mean that the Runner may need to have a mapping
> from cache token to internal version identifier for things like side
> inputs which are typically broadcast. The Runner would also need to poll
> to see if the side input has changed in the background to not block
> processing bundles with "stale" side input data.
>
> Ping me once you have the Runner PR updated and I'll take a look again.
>
> On Mon, Aug 26, 2019 at 12:20 PM Maximilian Michels <m...@apache.org> wrote:
>
>     Thank you for the summary Luke. I really appreciate the effort you put
>     into this!
>
>      > Based upon your discussion you seem to want option #1
>
>     I'm actually for option #2. The option to cache/invalidate side inputs
>     is important, and we should incorporate this in the design. That's why
>     option #1 is not flexible enough. However, a first implementation could
>     defer caching of side inputs.
>
>     Option #3 was my initial thinking and the first version of the PR, but I
>     think we agreed that there wouldn't be much gain from keeping a cache
>     token per state id.
>
>     Option #4 is what is specifically documented in the reference doc and
>     already part of the Proto, where valid tokens are provided for each new
>     bundle and also as part of the response of a get/put/clear. We mentioned
>     that the reply does not have to be waited on synchronously (I mentioned
>     it even), but it complicates the implementation.

Re: Write-through-cache in State logic

2019-08-27 Thread Lukasz Cwik
The bundle's view of side inputs should never change during processing and
should have a point-in-time snapshot.

I was just trying to say that the cache token for side inputs being
deferred until side input request time simplified the runner's
implementation since that is precisely when the runner would need to take
a look at the side input. Putting them as part of the ProcessBundleRequest
complicates that but does make the SDK implementation significantly
simpler, which is a win.

On Tue, Aug 27, 2019 at 9:14 AM Maximilian Michels  wrote:

> Thanks for the quick response.
>
> Just to clarify, the issue with versioning side input is also present
> when supplying the cache tokens on a request basis instead of per
> bundle. The SDK never knows when the Runner receives a new version of
> the side input. Like you pointed out, it needs to mark side inputs as
> stale and generate new cache tokens for the stale side inputs.
>
> The difference between per-request tokens and per-bundle tokens would be
> that the side input can only change after a bundle completes vs. during
> the bundle. Side inputs are always fuzzy in that regard because there is
> no precise instant where side inputs are atomically updated, other than
> the assumption that they eventually will be updated. In that regard
> per-bundle tokens for side input seem to be fine.
>
> All of the above is not an issue for user state, as its cache can remain
> valid for the lifetime of a Runner<=>SDK Harness connection. A simple
> solution would be to not cache side input because there are many cases
> where the caching just adds additional overhead. However, I can also
> imagine cases where side input is valid forever and caching would be
> very beneficial.
>
> For the first version I want to focus on user state because that's where
> I see the most benefit for caching. I don't see a problem though for the
> Runner to detect new side input and reflect that in the cache tokens
> supplied for a new bundle.
>
> -Max
>
> On 26.08.19 22:27, Lukasz Cwik wrote:
> > Your summary below makes sense to me. I can see that recovery from
> > rolling back doesn't need to be a priority and simplifies the solution
> > for user state caching down to one token.
> >
> > Providing cache tokens upfront does require the Runner to know what
> > "version" of everything it may supply to the SDK upfront (instead of on
> > request) which would mean that the Runner may need to have a mapping
> > from cache token to internal version identifier for things like side
> > inputs which are typically broadcast. The Runner would also need to poll
> > to see if the side input has changed in the background to not block
> > processing bundles with "stale" side input data.
> >
> > Ping me once you have the Runner PR updated and I'll take a look again.
> >
> > On Mon, Aug 26, 2019 at 12:20 PM Maximilian Michels wrote:
> >
> > Thank you for the summary Luke. I really appreciate the effort you put
> > into this!
> >
> >  > Based upon your discussion you seem to want option #1
> >
> > I'm actually for option #2. The option to cache/invalidate side inputs
> > is important, and we should incorporate this in the design. That's why
> > option #1 is not flexible enough. However, a first implementation could
> > defer caching of side inputs.
> >
> > Option #3 was my initial thinking and the first version of the PR, but I
> > think we agreed that there wouldn't be much gain from keeping a cache
> > token per state id.
> >
> > Option #4 is what is specifically documented in the reference doc and
> > already part of the Proto, where valid tokens are provided for each new
> > bundle and also as part of the response of a get/put/clear. We mentioned
> > that the reply does not have to be waited on synchronously (I mentioned
> > it even), but it complicates the implementation. The idea Thomas and I
> > expressed was that a response is not even necessary if we assume
> > validity of the upfront provided cache tokens for the lifetime of a
> > bundle and that cache tokens will be invalidated as soon as the Runner
> > fails in any way. This is naturally the case for Flink because it will
> > simply "forget" its current cache tokens.
> >
> > I currently envision the following schema:
> >
> > Runner
> > ==
> >
> > - Runner generates a globally unique cache token, one for user state and
> > one for each side input
> >
> > - The token is supplied to the SDK Harness for each bundle request
> >
> > - For the lifetime of a Runner<=>SDK Harness connection this cache token
> > will not change
> > - Runner will generate a new token if the connection/key space changes
> > between Runner and SDK Harness
> >
> > SDK
> > ===
> >
> > - For each bundle the SDK worker stores the list of valid cache tokens
> > - The SDK Harness keeps a global cache across all its (local) workers
> > which is an LRU cache: state_key => (cache_token, value)

Re: Write-through-cache in State logic

2019-08-27 Thread Maximilian Michels

Thanks for the quick response.

Just to clarify, the issue with versioning side input is also present 
when supplying the cache tokens on a request basis instead of per 
bundle. The SDK never knows when the Runner receives a new version of 
the side input. Like you pointed out, it needs to mark side inputs as 
stale and generate new cache tokens for the stale side inputs.


The difference between per-request tokens and per-bundle tokens would be 
that the side input can only change after a bundle completes vs. during 
the bundle. Side inputs are always fuzzy in that regard because there is 
no precise instance where side inputs are atomically updated, other than 
the assumption that they eventually will be updated. In that regard 
per-bundle tokens for side input seem to be fine.


All of the above is not an issue for user state, as its cache can remain 
valid for the lifetime of a Runner<=>SDK Harness connection. A simple 
solution would be to not cache side input because there are many cases 
where the caching just adds additional overhead. However, I can also 
imagine cases where side input is valid forever and caching would be 
very beneficial.


For the first version I want to focus on user state because that's where 
I see the most benefit for caching. I don't see a problem though for the 
Runner to detect new side input and reflect that in the cache tokens 
supplied for a new bundle.


-Max

On 26.08.19 22:27, Lukasz Cwik wrote:
Your summary below makes sense to me. I can see that recovery from 
rolling back doesn't need to be a priority and simplifies the solution 
for user state caching down to one token.


Providing cache tokens upfront does require the Runner to know what 
"version" of everything it may supply to the SDK upfront (instead of on 
request) which would mean that the Runner may need to have a mapping 
from cache token to internal version identifier for things like side 
inputs which are typically broadcast. The Runner would also need to poll 
to see if the side input has changed in the background to not block 
processing bundles with "stale" side input data.


Ping me once you have the Runner PR updated and I'll take a look again.

On Mon, Aug 26, 2019 at 12:20 PM Maximilian Michels wrote:


Thank you for the summary Luke. I really appreciate the effort you put
into this!

 > Based upon your discussion you seem to want option #1

I'm actually for option #2. The option to cache/invalidate side inputs
is important, and we should incorporate this in the design. That's why
option #1 is not flexible enough. However, a first implementation could
defer caching of side inputs.

Option #3 was my initial thinking and the first version of the PR, but I
think we agreed that there wouldn't be much gain from keeping a cache
token per state id.

Option #4 is what is specifically documented in the reference doc and
already part of the Proto, where valid tokens are provided for each new
bundle and also as part of the response of a get/put/clear. We mentioned
that the reply does not have to be waited on synchronously (I mentioned
it even), but it complicates the implementation. The idea Thomas and I
expressed was that a response is not even necessary if we assume
validity of the upfront provided cache tokens for the lifetime of a
bundle and that cache tokens will be invalidated as soon as the Runner
fails in any way. This is naturally the case for Flink because it will
simply "forget" its current cache tokens.

I currently envision the following schema:

Runner
==

- Runner generates a globally unique cache token, one for user state and
one for each side input

- The token is supplied to the SDK Harness for each bundle request 


- For the lifetime of a Runner<=>SDK Harness connection this cache token
will not change
- Runner will generate a new token if the connection/key space changes
between Runner and SDK Harness


SDK
===

- For each bundle the SDK worker stores the list of valid cache tokens
- The SDK Harness keeps a global cache across all its (local) workers
which is an LRU cache: state_key => (cache_token, value)
- get: Lookup cache using the valid cache token for the state. If no
match, then fetch from Runner and use the already available token for
caching
- put: Put value in cache with a valid cache token, put value to pending
writes which will be flushed out latest when the bundle ends
- clear: same as put but clear cache

It does look like this is not too far off from what you were describing.
The main difference is that we just work with a single cache token. In
my opinion we do not need the second cache token for writes, as long as
we ensure that we generate a new cache token if the
bundle/checkpoint fails.

I have a draft PR
   for the Runner: https://github.com/apache/beam/pull/9374
   for the SDK: https://github.com/apache/beam/pull/9418

Re: Write-through-cache in State logic

2019-08-26 Thread Lukasz Cwik
Your summary below makes sense to me. I can see that recovery from rolling
back doesn't need to be a priority and simplifies the solution for user
state caching down to one token.

Providing cache tokens upfront does require the Runner to know what
"version" of everything it may supply to the SDK upfront (instead of on
request) which would mean that the Runner may need to have a mapping from
cache token to internal version identifier for things like side inputs
which are typically broadcast. The Runner would also need to poll to see if
the side input has changed in the background to not block processing
bundles with "stale" side input data.

Ping me once you have the Runner PR updated and I'll take a look again.

On Mon, Aug 26, 2019 at 12:20 PM Maximilian Michels  wrote:

> Thank you for the summary Luke. I really appreciate the effort you put
> into this!
>
> > Based upon your discussion you seem to want option #1
>
> I'm actually for option #2. The option to cache/invalidate side inputs
> is important, and we should incorporate this in the design. That's why
> option #1 is not flexible enough. However, a first implementation could
> defer caching of side inputs.
>
> Option #3 was my initial thinking and the first version of the PR, but I
> think we agreed that there wouldn't be much gain from keeping a cache
> token per state id.
>
> Option #4 is what is specifically documented in the reference doc and
> already part of the Proto, where valid tokens are provided for each new
> bundle and also as part of the response of a get/put/clear. We mentioned
> that the reply does not have to be waited on synchronously (I mentioned
> it even), but it complicates the implementation. The idea Thomas and I
> expressed was that a response is not even necessary if we assume
> validity of the upfront provided cache tokens for the lifetime of a
> bundle and that cache tokens will be invalidated as soon as the Runner
> fails in any way. This is naturally the case for Flink because it will
> simply "forget" its current cache tokens.
>
> I currently envision the following schema:
>
> Runner
> ==
>
> - Runner generates a globally unique cache token, one for user state and
> one for each side input

> - The token is supplied to the SDK Harness for each bundle request
>
> - For the lifetime of a Runner<=>SDK Harness connection this cache token
> will not change
> - Runner will generate a new token if the connection/key space changes
> between Runner and SDK Harness


> SDK
> ===
>
> - For each bundle the SDK worker stores the list of valid cache tokens
> - The SDK Harness keeps a global cache across all its (local) workers
> which is an LRU cache: state_key => (cache_token, value)
> - get: Lookup cache using the valid cache token for the state. If no
> match, then fetch from Runner and use the already available token for
> caching
> - put: Put value in cache with a valid cache token, put value to pending
> writes which will be flushed out latest when the bundle ends
> - clear: same as put but clear cache
>
> It does look like this is not too far off from what you were describing.
> The main difference is that we just work with a single cache token. In
> my opinion we do not need the second cache token for writes, as long as
> we ensure that we generate a new cache token if the bundle/checkpoint
> fails.
>
> I have a draft PR
>   for the Runner: https://github.com/apache/beam/pull/9374
>   for the SDK: https://github.com/apache/beam/pull/9418
>
> Note that the Runner PR needs to be updated to fully reflect the above
> scheme. The SDK implementation is WIP. I want to make sure that we
> clarify the design before this gets finalized.
>
> Thanks again for all your comments. Much appreciated!
>
> Cheers,
> Max
>
> On 26.08.19 19:58, Lukasz Cwik wrote:
> > There were originally a couple of ideas around how caching could work:
> > 1) One cache token for the entire bundle that is supplied up front. The
> > SDK caches everything using the given token. All reads/clear/append for
> > all types of state happen under this token. Anytime a side input
> > changes, key processing partition range changes or a bundle fails to
> > process, the runner chooses a new cache token effectively invalidating
> > everything in the past.
> > 2) One cache token per type of state that is supplied up front.
> > The SDK caches all requests for a given type using the given cache
> > token. The runner can selectively choose which type to keep and which to
> > invalidate. Bundle failure and key processing partition changes
> > invalidate all user state, side input change invalidates all side inputs.
> >
> > 3) One cache token per state id that is supplied up front.
> > The SDK caches all requests for the given state id using the given cache
> > token. The runner can selectively choose which to invalidate and which
> > to keep. Bundle failure and key processing partition changes invalidate
> > all user state, side input changes only invalidate the side input that

Re: Write-through-cache in State logic

2019-08-26 Thread Maximilian Michels
Thank you for the summary Luke. I really appreciate the effort you put
into this!

> Based upon your discussion you seem to want option #1

I'm actually for option #2. The option to cache/invalidate side inputs
is important, and we should incorporate this in the design. That's why
option #1 is not flexible enough. However, a first implementation could
defer caching of side inputs.

Option #3 was my initial thinking and the first version of the PR, but I
think we agreed that there wouldn't be much gain from keeping a cache
token per state id.

Option #4 is what is specifically documented in the reference doc and
already part of the Proto, where valid tokens are provided for each new
bundle and also as part of the response of a get/put/clear. We mentioned
that the reply does not have to be waited on synchronously (I mentioned
it even), but it complicates the implementation. The idea Thomas and I
expressed was that a response is not even necessary if we assume
validity of the upfront provided cache tokens for the lifetime of a
bundle and that cache tokens will be invalidated as soon as the Runner
fails in any way. This is naturally the case for Flink because it will
simply "forget" its current cache tokens.

I currently envision the following schema:

Runner
==

- Runner generates a globally unique cache token, one for user state and
one for each side input
- The token is supplied to the SDK Harness for each bundle request
- For the lifetime of a Runner<=>SDK Harness connection this cache token
will not change
- Runner will generate a new token if the connection/key space changes
between Runner and SDK Harness

SDK
===

- For each bundle the SDK worker stores the list of valid cache tokens
- The SDK Harness keeps a global cache across all its (local) workers
which is an LRU cache: state_key => (cache_token, value)
- get: Lookup cache using the valid cache token for the state. If no
match, then fetch from Runner and use the already available token for
caching
- put: Put value in cache with a valid cache token, put value to pending
writes which will be flushed out latest when the bundle ends
- clear: same as put but clear cache

It does look like this is not too far off from what you were describing.
The main difference is that we just work with a single cache token. In
my opinion we do not need the second cache token for writes, as long as
we ensure that we generate a new cache token if the bundle/checkpoint fails.
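
To sketch the single-token scheme above in Python (illustrative only; the
cache and client objects and their methods are hypothetical stand-ins, not
Beam's actual classes):

class CachingStateHandler:

    def __init__(self, cache, state_client, cache_token):
        self._cache = cache          # shared LRU: (cache_token, state_key) -> value
        self._client = state_client  # issues State API requests to the runner
        self._token = cache_token    # stable for the Runner<=>SDK Harness connection
        self._pending = []           # deferred writes, flushed by bundle end

    def get(self, state_key):
        key = (self._token, state_key)
        if key not in self._cache:
            # Cache miss: fetch from the runner, cache under the same token.
            self._cache[key] = self._client.blocking_get(state_key)
        return self._cache[key]

    def put(self, state_key, value):
        self._cache[(self._token, state_key)] = value
        self._pending.append(self._client.put_async(state_key, value))

    def clear(self, state_key):
        self._cache[(self._token, state_key)] = []  # cache the emptied state
        self._pending.append(self._client.clear_async(state_key))

    def finish_bundle(self):
        # Flush: wait for all outstanding writes before the bundle completes.
        for response in self._pending:
            response.wait()
        del self._pending[:]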

I have a draft PR
  for the Runner: https://github.com/apache/beam/pull/9374
  for the SDK: https://github.com/apache/beam/pull/9418

Note that the Runner PR needs to be updated to fully reflect the above
scheme. The SDK implementation is WIP. I want to make sure that we
clarify the design before this gets finalized.

Thanks again for all your comments. Much appreciated!

Cheers,
Max

On 26.08.19 19:58, Lukasz Cwik wrote:
> There were originally a couple of ideas around how caching could work:
> 1) One cache token for the entire bundle that is supplied up front. The
> SDK caches everything using the given token. All reads/clear/append for
> all types of state happen under this token. Anytime a side input
> changes, key processing partition range changes or a bundle fails to
> process, the runner chooses a new cache token effectively invalidating
> everything in the past.
> 2) One cache token per type of state that is supplied up front.
> The SDK caches all requests for a given type using the given cache
> token. The runner can selectively choose which type to keep and which to
> invalidate. Bundle failure and key processing partition changes
> invalidate all user state, side input change invalidates all side inputs.
> 
> 3) One cache token per state id that is supplied up front.
> The SDK caches all requests for the given state id using the given cache
> token. The runner can selectively choose which to invalidate and which
> to keep. Bundle failure and key processing partition changes invalidate
> all user state, side input changes only invalidate the side input that
> changed.
> 
> 4) A cache token on each read/clear/append that is supplied on the
> response of the call with an initial valid set that is supplied at
> start. The runner can selectively choose which to keep on start. Bundle
> failure allows runners to "roll back" to a known good state by selecting
> the previous valid cache token as part of the initial set. Key
> processing partition changes allow runners to keep cached state that
> hasn't changed since it can be tied to a version number of the state
> itself as part of the initial set. Side input changes only invalidate
> the side input that changed.
> 
> Based upon your discussion you seem to want option #1 which doesn't work
> well with side inputs clearing cached state. If we want to have user
> state survive a changing side input, we would want one of the other
> options. I do agree that supplying the cache token upfront is
> significantly simpler. Currently the protos are 

Re: Write-through-cache in State logic

2019-08-26 Thread Lukasz Cwik
There were originally a couple of ideas around how caching could work:
1) One cache token for the entire bundle that is supplied up front. The SDK
caches everything using the given token. All reads/clear/append for all
types of state happen under this token. Anytime a side input changes, key
processing partition range changes or a bundle fails to process, the runner
chooses a new cache token effectively invalidating everything in the past.

2) One cache token per type of state that is supplied up front.
The SDK caches all requests for a given type using the given cache token.
The runner can selectively choose which type to keep and which to
invalidate. Bundle failure and key processing partition changes invalidate
all user state, side input change invalidates all side inputs.

3) One cache token per state id that is supplied up front.
The SDK caches all requests for the given state id using the given cache
token. The runner can selectively choose which to invalidate and which to
keep. Bundle failure and key processing partition changes invalidate all
user state, side input changes only invalidate the side input that changed.

4) A cache token on each read/clear/append that is supplied on the response
of the call with an initial valid set that is supplied at start. The runner
can selectively choose which to keep on start. Bundle failure allows
runners to "roll back" to a known good state by selecting the previous
valid cache token as part of the initial set. Key processing partition
changes allow runners to keep cached state that hasn't changed since it can
be tied to a version number of the state itself as part of the initial set.
Side input changes only invalidate the side input that changed.

Based upon your discussion you seem to want option #1 which doesn't work
well with side inputs clearing cached state. If we want to have user state
survive a changing side input, we would want one of the other options. I do
agree that supplying the cache token upfront is significantly simpler.
Currently the protos are set up for #4 since it was the most flexible and at
the time the pros outweighed the cons.

I don't understand why you think you need to wait for a response for the
append/clear to get its cache token since the only reason you need the
cache token is that you want to use that cached data when processing a
different bundle. I was thinking that the flow on the SDK side would be
something like (assuming there is a global cache of cache token -> (map of
state key -> data))
1) Create a local cache of (map of state key -> data) using the initial set
of valid cache tokens
2) Make all mutations in place on local cache without waiting for response.
3) When response comes back, update global cache with new cache token ->
(map of state key -> data)) (this is when the data becomes visible to other
bundles that start processing)
4) Before the bundle finishes processing, wait for all outstanding state
calls to finish.
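
To make that flow concrete, here is a minimal Python sketch of the SDK-side
bookkeeping. All names (GlobalCache, state_api_call, the cache_token field
on the response) are hypothetical stand-ins for illustration, not actual
Fn API classes:

class GlobalCache(object):
    """Hypothetical process-wide cache: cache_token -> {state_key: data}."""

    def __init__(self):
        self._entries = {}

    def get(self, token):
        return self._entries.get(token, {})

    def put(self, token, state_key, data):
        self._entries.setdefault(token, {})[state_key] = data


class BundleCache(object):
    def __init__(self, global_cache, initial_tokens):
        self.global_cache = global_cache
        # 1) Seed a local cache from the initial set of valid tokens.
        self.local = {}
        for token in initial_tokens:
            self.local.update(global_cache.get(token))
        self.pending = []  # outstanding state calls

    def mutate(self, state_key, new_data, state_api_call):
        # 2) Mutate the local cache in place, without blocking on the call.
        self.local[state_key] = new_data
        future = state_api_call(state_key, new_data)
        # 3) When the response (carrying the new cache token) arrives,
        #    publish to the global cache; only then do other bundles see it.
        future.add_done_callback(
            lambda f: self.global_cache.put(
                f.result().cache_token, state_key, new_data))
        self.pending.append(future)

    def finish_bundle(self):
        # 4) Block on all outstanding state calls before completing.
        for future in self.pending:
            future.result()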

To implement caching on the runner side, you would keep track of at most 2
cache tokens per state key, one cache token represents the initial value
when the bundle started while the second represents the modified state. If
the bundle succeeds the runner passes in the set of tokens which represent
the new state, if the bundle fails you process using the original ones.
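
A rough sketch of that runner-side bookkeeping (all names invented for
illustration):

class RunnerTokenTracker(object):
    """Tracks at most two cache tokens per state key: the token for the
    value at bundle start and the token for state modified in-flight."""

    def __init__(self, initial_tokens):
        self.initial = dict(initial_tokens)  # state_key -> token
        self.modified = {}                   # state_key -> token

    def on_write(self, state_key, new_token):
        self.modified[state_key] = new_token

    def tokens_after_bundle(self, succeeded):
        if succeeded:
            # Pass in the tokens that represent the new state.
            result = dict(self.initial)
            result.update(self.modified)
            return result
        # On failure, process with the original tokens (roll back).
        return self.initial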

After thinking through the implementation again, we could supply two cache
tokens for each state id: the first is the token to use if no writes happen,
while the second is the token to use once the SDK changes the state. This
gives us the simplification that we don't need to wait for the response
before we update the global cache, which makes a typical blocking cache much
easier to do. We also get the benefit that runners can supply either the
same cache token for a state id or different ones. If the runner supplies
the same one, it's telling the SDK to make modifications in place without
any rollback (which is good on memory since we reduce the number of copies).
If the runner supplies two different ones, it's telling the SDK to keep the
old data around. If we went through with
this new option the SDK side logic would be (assuming there is a global
cache of cache token -> (map of state key -> data)):

1) Create an empty local set of state ids that are dirty when starting a
new bundle (dirty set)

For reads/gets:
2A) If the request is a read (get), use the dirty set to choose which cache
token to look up in the global cache. If the global cache is missing the
data, issue the appropriate request and cache the returned result.

For writes/appends/clear:
2B) If the cache tokens are different for the state id, add the state id to
the dirty set (if it isn't there already) and perform the appropriate
modification to convert the old cached state data to the new state data.
3B) Modify the global cache's data.
4B) Issue the request to the runner.
5B*) Add this request to the set of requests to block on before completing
the bundle.

(* Note, there 
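
To make steps 2A through 5B concrete, a minimal Python sketch, where plain
dicts stand in for the global cache and issue_read/issue_append stand in
for the State API calls (all names are assumptions):

def read(state_id, key, tokens, dirty, caches, issue_read):
    initial_token, write_token = tokens[state_id]
    # 2A) The dirty set decides which token's cache to consult.
    token = write_token if state_id in dirty else initial_token
    cache = caches.setdefault(token, {})
    if key not in cache:
        cache[key] = issue_read(state_id, key)  # fall back to the State API
    return cache[key]


def append(state_id, key, value, tokens, dirty, caches, issue_append, pending):
    initial_token, write_token = tokens[state_id]
    if initial_token != write_token and state_id not in dirty:
        # 2B) First write under a differing token: carry the old cached
        #     data over to the new token, keeping the old copy intact.
        dirty.add(state_id)
        caches[write_token] = {
            k: list(v) for k, v in caches.get(initial_token, {}).items()}
    # 3B) Modify the global cache's data.
    caches.setdefault(write_token, {}).setdefault(key, []).append(value)
    # 4B) Issue the request and 5B) remember it so the bundle can block
    #     on it before finishing.
    pending.append(issue_append(state_id, key, value))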

Re: Write-through-cache in State logic

2019-08-22 Thread Maximilian Michels
Just to give a quick update here. Rakesh, Thomas, and I had a discussion
about async writes from the Python SDK to the Runner. Robert was also
present for some parts of the discussion.

We concluded that blocking writes, with the need to refresh the cache
token each time, are not going to provide acceptable throughput and latency.

We figured that it will be enough to use a single cache token per
Runner<=>SDK Harness connection. This cache token will be provided by
the Runner in the ProcessBundleRequest. Writes will not yield a new
cache token. The advantage is that we can use one cache token for the
lifetime of the bundle and also across bundles, unless the Runner
switches to a new Runner<=>SDK Harness connection; then the Runner would
have to generate a new cache token.
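
As a rough illustration of that contract, where a plain dict stands in for
the actual ProcessBundleRequest proto and the token scheme is an assumption:

import uuid


class HarnessConnection(object):
    """Hypothetical runner-side handle for one Runner<=>SDK Harness
    connection, holding the single cache token valid across bundles."""

    def __init__(self):
        self.cache_token = uuid.uuid4().bytes

    def on_reconnect(self):
        # A new connection gets a new token, invalidating everything
        # the SDK cached under the old one.
        self.cache_token = uuid.uuid4().bytes

    def new_bundle_request(self, descriptor_id):
        return {'process_bundle_descriptor_reference': descriptor_id,
                'cache_tokens': [self.cache_token]}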

We might require additional cache tokens for the side inputs. For now,
I'm planning to only tackle user state which seems to be the area where
users have expressed the most need for caching.

-Max

On 21.08.19 20:05, Maximilian Michels wrote:
>> There is probably a misunderstanding here: I'm suggesting to use a worker ID 
>> instead of cache tokens, not additionally.
> 
> Ah! Misread that. We need a changing token to indicate that the cache is
> stale, e.g. checkpoint has failed / restoring from an old checkpoint. If
> the _Runner_ generates a new unique token/id for workers which outlast
> the Runner, then this should work fine. I don't think it is safe for the
> worker to supply the id. The Runner should be in control of cache tokens
> to avoid invalid tokens.
> 
>> In the PR the token is modified as part of updating the state. Doesn't the 
>> SDK need the new token to update its cache entry also? That's where it 
>> would help the SDK to know the new token upfront.
> 
> If the state is updated in the Runner, a new token has to be generated.
> The old one is not valid anymore. The SDK will use the updated token to
> store the new value in the cache. I understand that it would be nice to
> know the token upfront. That could be possible with some token
> generation scheme. On the other hand, writes can be asynchronous and
> thus not block the UDF.
> 
>> But I believe there is no need to change the token in the first place, unless 
>> bundles for the same key (ranges) can be processed by different workers.
> 
> That's certainly possible, e.g. two workers A and B take turns processing
> a certain key range, one bundle after another:
> 
> You process a bundle with a token T with A, then worker B takes over.
> Both have an entry with cache token T. So B goes on to modify the state
> and uses the same cache token T. Then A takes over again. A would have a
> stale cache entry but T would still be a valid cache token.
> 
>> Indeed the fact that Dataflow can dynamically split and merge these ranges 
>> is what makes it trickier. If Flink does not repartition the ranges, then 
>> things are much easier.
> 
> Flink does not dynamically repartition key ranges (yet). If it started
> to support that, we would invalidate the cache tokens for the changed
> partitions.
> 
> 
> I'd suggest the following cache token generation scheme:
> 
> One cache token per key range for user state and one cache token for
> each side input. On writes to user state or changing side input, the
> associated cache token will be renewed.
> 
> On the SDK side, it should be sufficient to let the SDK re-associate all
> its cached data belonging to a valid cache token with a new cache token
> returned by a successful write. This has to happen in the active scope
> (i.e. user state, or a particular side input).
> 
> If the key range changes, new cache tokens have to be generated. This
> should happen automatically because the Runner does not checkpoint cache
> tokens and will generate new ones when it restarts from an earlier
> checkpoint.
> 
> The current PR needs to be changed to (1) only keep a single cache token
> per user state and key range (2) add support for cache tokens for each
> side input.
> 
> Hope that makes sense.
> 
> -Max
> 
> On 21.08.19 17:27, Reuven Lax wrote:
>>
>>
>> On Wed, Aug 21, 2019 at 2:16 AM Maximilian Michels  wrote:
>>
>> Appreciate all your comments! Replying below.
>>
>>
>> @Luke:
>>
>> > Having cache tokens per key would be very expensive indeed and I
>> believe we should go with a single cache token "per" bundle.
>>
>> Thanks for your comments on the PR. I was thinking to propose something
>> along the lines of having cache tokens valid for a particular
>> checkpointing "epoch". That would require even less token renewal than
>> the per-bundle approach.
>>
>>
>> @Thomas, thanks for the input. Some remarks:
>>
>> > Wouldn't it be simpler to have the runner just track a unique ID
>> for each worker and use that to communicate if the cache is valid or
>> not?
>>
>> We do not need a unique id per worker. If a cache token is valid for a
>> particular worker, it is also valid for another worker.

Re: Write-through-cache in State logic

2019-08-21 Thread Maximilian Michels
> There is probably a misunderstanding here: I'm suggesting to use a worker ID 
> instead of cache tokens, not additionally.

Ah! Misread that. We need a changing token to indicate that the cache is
stale, e.g. checkpoint has failed / restoring from an old checkpoint. If
the _Runner_ generates a new unique token/id for workers which outlast
the Runner, then this should work fine. I don't think it is safe for the
worker to supply the id. The Runner should be in control of cache tokens
to avoid invalid tokens.

> In the PR the token is modified as part of updating the state. Doesn't the 
> SDK need the new token to update its cache entry also? That's where it would 
> help the SDK to know the new token upfront.

If the state is updated in the Runner, a new token has to be generated.
The old one is not valid anymore. The SDK will use the updated token to
store the new value in the cache. I understand that it would be nice to
know the token upfront. That could be possible with some token
generation scheme. On the other hand, writes can be asynchronous and
thus not block the UDF.

> But I believe there is no need to change the token in the first place, unless 
> bundles for the same key (ranges) can be processed by different workers.

That's certainly possible, e.g. two workers A and B take turns processing
a certain key range, one bundle after another:

You process a bundle with a token T with A, then worker B takes over.
Both have an entry with cache token T. So B goes on to modify the state
and uses the same cache token T. Then A takes over again. A would have a
stale cache entry but T would still be a valid cache token.

> Indeed the fact that Dataflow can dynamically split and merge these ranges is 
> what makes it trickier. If Flink does not repartition the ranges, then things 
> are much easier.

Flink does not dynamically repartition key ranges (yet). If it started
to support that, we would invalidate the cache tokens for the changed
partitions.


I'd suggest the following cache token generation scheme:

One cache token per key range for user state and one cache token for
each side input. On writes to user state or changing side input, the
associated cache token will be renewed.

On the SDK side, it should be sufficient to let the SDK re-associate all
its cached data belonging to a valid cache token with a new cache token
returned by a successful write. This has to happen in the active scope
(i.e. user state, or a particular side input).

If the key range changes, new cache tokens have to be generated. This
should happen automatically because the Runner does not checkpoint cache
tokens and will generate new ones when it restarts from an earlier
checkpoint.
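
A minimal sketch of that generation scheme (names invented; a real runner
would scope this per Runner<=>SDK Harness connection):

import itertools


class CacheTokens(object):
    def __init__(self, key_ranges, side_input_ids):
        self._seq = itertools.count(1)
        # One token per key range for user state, one per side input.
        self.user_state = {r: next(self._seq) for r in key_ranges}
        self.side_inputs = {s: next(self._seq) for s in side_input_ids}

    def on_user_state_write(self, key_range):
        # Renew the token for the range that was written.
        self.user_state[key_range] = next(self._seq)

    def on_side_input_change(self, side_input_id):
        # Renew only the side input that changed.
        self.side_inputs[side_input_id] = next(self._seq)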

The current PR needs to be changed to (1) only keep a single cache token
per user state and key range (2) add support for cache tokens for each
side input.

Hope that makes sense.

-Max

On 21.08.19 17:27, Reuven Lax wrote:
> 
> 
> On Wed, Aug 21, 2019 at 2:16 AM Maximilian Michels  wrote:
> 
> Appreciate all your comments! Replying below.
> 
> 
> @Luke:
> 
> > Having cache tokens per key would be very expensive indeed and I
> believe we should go with a single cache token "per" bundle.
> 
> Thanks for your comments on the PR. I was thinking to propose something
> along the lines of having cache tokens valid for a particular
> checkpointing "epoch". That would require even less token renewal than
> the per-bundle approach.
> 
> 
> @Thomas, thanks for the input. Some remarks:
> 
> > Wouldn't it be simpler to have the runner just track a unique ID
> for each worker and use that to communicate if the cache is valid or
> not?
> 
> We do not need a unique id per worker. If a cache token is valid for a
> particular worker, it is also valid for another worker. That is with the
> assumption that key ranges are always disjoint between the workers.
> 
> > * When the bundle is started, the runner tells the worker if the
> cache has become invalid (since it knows if another worker has
> mutated state)
> 
> This is simply done by not transferring the particular cache token. No
> need to declare it invalid explicitly.
> 
> > * When the worker sends mutation requests to the runner, it
> includes its own ID (or the runner already has it as contextual
> information). No need to wait for a response.
> 
> Mutations of cached values can be freely done as long as the cache token
> associated with the state is valid for a particular bundle. Only the
> first time, the Runner needs to wait on the response to store the cache
> token. This can also be done asynchronously.
> 
> > * When the bundle is finished, the runner records the last writer
> (only if a change occurred)
> 
> I believe this is not necessary because there will only be one writer at
> a time for a particular bundle and key range, hence only one writer
> holds a valid cache token for a particular state and key range.


Re: Write-through-cache in State logic

2019-08-21 Thread Reuven Lax
On Wed, Aug 21, 2019 at 2:16 AM Maximilian Michels  wrote:

> Appreciate all your comments! Replying below.
>
>
> @Luke:
>
> > Having cache tokens per key would be very expensive indeed and I believe
> we should go with a single cache token "per" bundle.
>
> Thanks for your comments on the PR. I was thinking to propose something
> along the lines of having cache tokens valid for a particular
> checkpointing "epoch". That would require even less token renewal than
> the per-bundle approach.
>
>
> @Thomas, thanks for the input. Some remarks:
>
> > Wouldn't it be simpler to have the runner just track a unique ID for
> each worker and use that to communicate if the cache is valid or not?
>
> We do not need a unique id per worker. If a cache token is valid for a
> particular worker, it is also valid for another worker. That is with the
> assumption that key ranges are always disjoint between the workers.
>
> > * When the bundle is started, the runner tells the worker if the cache
> has become invalid (since it knows if another worker has mutated state)
>
> This is simply done by not transferring the particular cache token. No
> need to declare it invalid explicitly.
>
> > * When the worker sends mutation requests to the runner, it includes its
> own ID (or the runner already has it as contextual information). No need to
> wait for a response.
>
> Mutations of cached values can be freely done as long as the cache token
> associated with the state is valid for a particular bundle. Only the
> first time, the Runner needs to wait on the response to store the cache
> token. This can also be done asynchronously.
>
> > * When the bundle is finished, the runner records the last writer (only
> if a change occurred)
>
> I believe this is not necessary because there will only be one writer at
> a time for a particular bundle and key range, hence only one writer
> holds a valid cache token for a particular state and key range.
>
>
> @Reuven:
>
> >  Dataflow divides the keyspace up into lexicographic ranges, and creates
> a cache token per range.
>
> State is always partitioned across the Flink workers (hash-based,
> not lexicographical). I don't think that matters though because the key
> ranges do not overlap between the workers. Flink does not support
> dynamically repartitioning the key ranges. Even in case of fine-grained
> recovery of workers and their key ranges, we would simply generate new
> cache tokens for a particular worker.
>

Dataflow's ranges are also hash based. When I said lexicographical, I meant
lexicographical based on the hexadecimal hash value.

Indeed the fact that Dataflow can dynamically split and merge these ranges
is what makes it trickier. If Flink does not repartition the ranges, then
things are much easier.


>
> Thanks,
> Max
>
> On 21.08.19 09:33, Reuven Lax wrote:
> > Dataflow does something like this, however since work is
> > load balanced across workers a per-worker id doesn't work very well.
> > Dataflow divides the keyspace up into lexicographic ranges, and creates
> > a cache token per range.
> >
> > On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise  wrote:
> >
> > Commenting here vs. on the PR since related to the overall approach.
> >
> > Wouldn't it be simpler to have the runner just track a unique ID for
> > each worker and use that to communicate if the cache is valid or not?
> >
> > * When the bundle is started, the runner tells the worker if the
> > cache has become invalid (since it knows if another worker has
> > mutated state)
> > * When the worker sends mutation requests to the runner, it includes
> > its own ID (or the runner already has it as contextual information).
> > No need to wait for a response.
> > * When the bundle is finished, the runner records the last writer
> > (only if a change occurred)
> >
> > Whenever current worker ID and last writer ID doesn't match, cache
> > is invalid.
> >
> > Thomas
> >
> >
> > On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik  wrote:
> >
> > Having cache tokens per key would be very expensive indeed and I
> > believe we should go with a single cache token "per" bundle.
> >
> > On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels  wrote:
> >
> > Maybe a Beam Python expert can chime in for Rakesh's
> question?
> >
> > Luke, I was assuming cache tokens to be per key and state
> > id. While implementing initial support on the Runner side, I
> > realized that we
> > probably want cache tokens to only be per state id. Note
> > that if we had
> > per-key cache tokens, the number of cache tokens would
> > approach the
> > total number of keys in an application.
> >
> > If anyone wants to have a look, here is a first version of

Re: Write-through-cache in State logic

2019-08-21 Thread Thomas Weise

On Wed, Aug 21, 2019, 2:16 AM Maximilian Michels  wrote:

> Appreciate all your comments! Replying below.
>
>
> @Luke:
>
> > Having cache tokens per key would be very expensive indeed and I believe
> we should go with a single cache token "per" bundle.
>
> Thanks for your comments on the PR. I was thinking to propose something
> along the lines of having cache tokens valid for a particular
> checkpointing "epoch". That would require even less token renewal than
> the per-bundle approach.
>
>
> @Thomas, thanks for the input. Some remarks:
>
> > Wouldn't it be simpler to have the runner just track a unique ID for
> each worker and use that to communicate if the cache is valid or not?
>
> We do not need a unique id per worker. If a cache token is valid for a
> particular worker, it is also valid for another worker. That is with the
> assumption that key ranges are always disjoint between the workers
>

There is probably a misunderstanding here: I'm suggesting to use a worker
ID instead of cache tokens, not additionally.


> > * When the bundle is started, the runner tells the worker if the cache
> has become invalid (since it knows if another worker has mutated state)
>
> This is simply done by not transferring the particular cache token. No
> need to declare it invalid explicitly.
>
> > * When the worker sends mutation requests to the runner, it includes its
> own ID (or the runner already has it as contextual information). No need to
> wait for a response.
>
> Mutations of cached values can be freely done as long as the cache token
> associated with the state is valid for a particular bundle. Only the
> first time, the Runner needs to wait on the response to store the cache
> token. This can also be done asynchronously.


In the PR the token is modified as part of updating the state. Doesn't the
SDK need the new token to update its cache entry also?

That's where it would help the SDK to know the new token upfront. But I
believe there is no need to change the token in the first place, unless bundles
for the same key (ranges) can be processed by different workers.



>
>
> > * When the bundle is finished, the runner records the last writer (only
> if a change occurred)
>
> I believe this is not necessary because there will only be one writer at
> a time for a particular bundle and key range, hence only one writer
> holds a valid cache token for a particular state and key range.
>
>
> @Reuven:
>
> >  Dataflow divides the keyspace up into lexicographic ranges, and creates
> a cache token per range.
>
> State is always partitioned across the Flink workers (hash-based,
> not lexicographical). I don't think that matters though because the key
> ranges do not overlap between the workers. Flink does not support
> dynamically repartitioning the key ranges. Even in case of fine-grained
> recovery of workers and their key ranges, we would simply generate new
> cache tokens for a particular worker.
>
>
> Thanks,
> Max
>
> On 21.08.19 09:33, Reuven Lax wrote:
> > Dataflow does something like this, however since work is
> > load balanced across workers a per-worker id doesn't work very well.
> > Dataflow divides the keyspace up into lexicographic ranges, and creates
> > a cache token per range.
> >
> > On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise  wrote:
> >
> > Commenting here vs. on the PR since related to the overall approach.
> >
> > Wouldn't it be simpler to have the runner just track a unique ID for
> > each worker and use that to communicate if the cache is valid or not?
> >
> > * When the bundle is started, the runner tells the worker if the
> > cache has become invalid (since it knows if another worker has
> > mutated state)
> > * When the worker sends mutation requests to the runner, it includes
> > its own ID (or the runner already has it as contextual information).
> > No need to wait for a response.
> > * When the bundle is finished, the runner records the last writer
> > (only if a change occurred)
> >
> > Whenever current worker ID and last writer ID doesn't match, cache
> > is invalid.
> >
> > Thomas
> >
> >
> > On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik  wrote:
> >
> > Having cache tokens per key would be very expensive indeed and I
> > believe we should go with a single cache token "per" bundle.
> >
> > On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels  wrote:
> >
> > Maybe a Beam Python expert can chime in for Rakesh's
> question?
> >
> > Luke, I was assuming cache tokens to be per key and state
> > id. While implementing initial support on the Runner side, I
> > realized that we
> > probably want cache tokens to only be per state id. Note
> > that if we had
> > per-key cache tokens, the number of cache 

Re: Write-through-cache in State logic

2019-08-21 Thread Maximilian Michels
Appreciate all your comments! Replying below.


@Luke:

> Having cache tokens per key would be very expensive indeed and I believe we 
> should go with a single cache token "per" bundle.

Thanks for your comments on the PR. I was thinking to propose something
along the lines of having cache tokens valid for a particular
checkpointing "epoch". That would require even less token renewal than
the per-bundle approach.


@Thomas, thanks for the input. Some remarks:

> Wouldn't it be simpler to have the runner just track a unique ID for each 
> worker and use that to communicate if the cache is valid or not?

We do not need a unique id per worker. If a cache token is valid for a
particular worker, it is also valid for another worker. That is with the
assumption that key ranges are always disjoint between the workers.

> * When the bundle is started, the runner tells the worker if the cache has 
> become invalid (since it knows if another worker has mutated state)

This is simply done by not transferring the particular cache token. No
need to declare it invalid explicitly.

> * When the worker sends mutation requests to the runner, it includes its own 
> ID (or the runner already has it as contextual information). No need to wait 
> for a response.

Mutations of cached values can be freely done as long as the cache token
associated with the state is valid for a particular bundle. Only the
first time, the Runner needs to wait on the response to store the cache
token. This can also be done asynchronously.

> * When the bundle is finished, the runner records the last writer (only if a 
> change occurred)

I believe this is not necessary because there will only be one writer at
a time for a particular bundle and key range, hence only one writer
holds a valid cache token for a particular state and key range.


@Reuven:

>  Dataflow divides the keyspace up into lexicographic ranges, and creates a 
> cache token per range. 

State is always partitioned across the Flink workers (hash-based,
not lexicographical). I don't think that matters though because the key
ranges do not overlap between the workers. Flink does not support
dynamically repartitioning the key ranges. Even in case of fine-grained
recovery of workers and their key ranges, we would simply generate new
cache tokens for a particular worker.


Thanks,
Max

On 21.08.19 09:33, Reuven Lax wrote:
> Dataflow does something like this, however since work is
> load balanced across workers a per-worker id doesn't work very well.
> Dataflow divides the keyspace up into lexicographic ranges, and creates
> a cache token per range. 
> 
> On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise  wrote:
> 
> Commenting here vs. on the PR since related to the overall approach.
> 
> Wouldn't it be simpler to have the runner just track a unique ID for
> each worker and use that to communicate if the cache is valid or not?
> 
> * When the bundle is started, the runner tells the worker if the
> cache has become invalid (since it knows if another worker has
> mutated state)
> * When the worker sends mutation requests to the runner, it includes
> its own ID (or the runner already has it as contextual information).
> No need to wait for a response.
> * When the bundle is finished, the runner records the last writer
> (only if a change occurred)
> 
> Whenever current worker ID and last writer ID doesn't match, cache
> is invalid.
> 
> Thomas
> 
> 
> On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik  wrote:
> 
> Having cache tokens per key would be very expensive indeed and I
> believe we should go with a single cache token "per" bundle.
> 
> On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels  wrote:
> 
> Maybe a Beam Python expert can chime in for Rakesh's question?
> 
> Luke, I was assuming cache tokens to be per key and state
> id. While implementing initial support on the Runner side, I
> realized that we
> probably want cache tokens to only be per state id. Note
> that if we had
> per-key cache tokens, the number of cache tokens would
> approach the
> total number of keys in an application.
> 
> If anyone wants to have a look, here is a first version of
> the Runner
> side for cache tokens. Note that I only implemented cache
> tokens for
> BagUserState for now, but it can be easily added for side
> inputs as well.
> 
> https://github.com/apache/beam/pull/9374
> 
> -Max
> 
> 


Re: Write-through-cache in State logic

2019-08-21 Thread Reuven Lax
Dataflow does something like this, however since work is
load balanced across workers a per-worker id doesn't work very well.
Dataflow divides the keyspace up into lexicographic ranges, and creates a
cache token per range.
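
For illustration only (this is not Dataflow's actual implementation),
per-range tokens over a hashed keyspace might look like:

import hashlib
import uuid

# Two lexicographic ranges over hex-encoded key hashes; each range
# carries its own cache token. 'g' sorts above 'f', closing the space.
ranges = {('0', '8'): uuid.uuid4(), ('8', 'g'): uuid.uuid4()}


def token_for(key_bytes):
    h = hashlib.md5(key_bytes).hexdigest()
    for (lo, hi), token in ranges.items():
        if lo <= h < hi:
            return token


def split_range(lo, hi, mid):
    # Splitting (or merging) a range assigns fresh tokens, invalidating
    # any cache entries tied to the old range.
    del ranges[(lo, hi)]
    ranges[(lo, mid)] = uuid.uuid4()
    ranges[(mid, hi)] = uuid.uuid4()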

On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise  wrote:

> Commenting here vs. on the PR since related to the overall approach.
>
> Wouldn't it be simpler to have the runner just track a unique ID for each
> worker and use that to communicate if the cache is valid or not?
>
> * When the bundle is started, the runner tells the worker if the cache has
> become invalid (since it knows if another worker has mutated state)
> * When the worker sends mutation requests to the runner, it includes its
> own ID (or the runner already has it as contextual information). No need to
> wait for a response.
> * When the bundle is finished, the runner records the last writer (only if
> a change occurred)
>
> Whenever current worker ID and last writer ID doesn't match, cache is
> invalid.
>
> Thomas
>
>
> On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik  wrote:
>
>> Having cache tokens per key would be very expensive indeed and I believe
>> we should go with a single cache token "per" bundle.
>>
>> On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels 
>> wrote:
>>
>>> Maybe a Beam Python expert can chime in for Rakesh's question?
>>>
>>> Luke, I was assuming cache tokens to be per key and state id. While
>>> implementing initial support on the Runner side, I realized that we
>>> probably want cache tokens to only be per state id. Note that if we had
>>> per-key cache tokens, the number of cache tokens would approach the
>>> total number of keys in an application.
>>>
>>> If anyone wants to have a look, here is a first version of the Runner
>>> side for cache tokens. Note that I only implemented cache tokens for
>>> BagUserState for now, but it can be easily added for side inputs as well.
>>>
>>> https://github.com/apache/beam/pull/9374
>>>
>>> -Max
>>>
>>>
>>>


Re: Write-through-cache in State logic

2019-08-20 Thread Thomas Weise
Commenting here vs. on the PR since related to the overall approach.

Wouldn't it be simpler to have the runner just track a unique ID for each
worker and use that to communicate if the cache is valid or not?

* When the bundle is started, the runner tells the worker if the cache has
become invalid (since it knows if another worker has mutated state)
* When the worker sends mutation requests to the runner, it includes its
own ID (or the runner already has it as contextual information). No need to
wait for a response.
* When the bundle is finished, the runner records the last writer (only if
a change occurred)

Whenever current worker ID and last writer ID doesn't match, cache is
invalid.
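
A small sketch of the bookkeeping this scheme implies on the runner side
(hypothetical, for illustration):

last_writer = {}  # key_range -> worker_id of the last writer


def cache_valid_at_bundle_start(key_range, worker_id):
    # Cache is valid only if this worker was also the last writer.
    return last_writer.get(key_range) == worker_id


def record_at_bundle_finish(key_range, worker_id, state_changed):
    if state_changed:
        last_writer[key_range] = worker_id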

Thomas


On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik  wrote:

> Having cache tokens per key would be very expensive indeed and I believe
> we should go with a single cache token "per" bundle.
>
> On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels 
> wrote:
>
>> Maybe a Beam Python expert can chime in for Rakesh's question?
>>
>> Luke, I was assuming cache tokens to be per key and state id. While
>> implementing initial support on the Runner side, I realized that we
>> probably want cache tokens to only be per state id. Note that if we had
>> per-key cache tokens, the number of cache tokens would approach the
>> total number of keys in an application.
>>
>> If anyone wants to have a look, here is a first version of the Runner
>> side for cache tokens. Note that I only implemented cache tokens for
>> BagUserState for now, but it can be easily added for side inputs as well.
>>
>> https://github.com/apache/beam/pull/9374
>>
>> -Max
>>
>>
>>


Re: Write-through-cache in State logic

2019-08-16 Thread Maximilian Michels
Thanks Luke!

On the note of cache tokens, do we have an idea how cache tokens are
generated and managed by the Runner?

In my mind we will maintain a list of cache tokens scoped by state id
and SDK worker. Cache tokens will not be checkpointed which means
long-running SDK workers will have to request a new cache token after a
Runner restart.

In terms of how cache tokens are generated, I think it suffices to have
an increasing integer, as long as it is scoped by the state id and
maintained for each SDK worker connection.

If that makes sense, it will be rather expensive to transfer a list of
valid cache tokens for each new bundle. Instead, sending over the range
of valid tokens per state id would be much more efficient.
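
A minimal sketch of that idea (hypothetical names, not the actual PR code):

from collections import defaultdict

counters = defaultdict(int)   # state_id -> newest issued token
low_water = defaultdict(int)  # state_id -> oldest still-valid token


def new_token(state_id):
    counters[state_id] += 1
    return counters[state_id]


def invalidate_below(state_id, token):
    # Everything older than `token` becomes invalid, e.g. after the
    # Runner restarts from an earlier checkpoint.
    low_water[state_id] = token


def valid_range(state_id):
    # Sent per bundle instead of a full token list: any cached entry
    # whose token falls inside this range may be reused.
    return (low_water[state_id], counters[state_id])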

What do you think?

-Max

On 14.08.19 19:47, Lukasz Cwik wrote:
> I took a look and added some clarity/suggestions.
> 
> On Wed, Aug 14, 2019 at 9:53 AM Maximilian Michels  wrote:
> 
> For the purpose of my own understanding of the matter, I've created a
> document:
> 
> https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/
> 
> 
> It could make sense to clarify and specify things in there for now. I'm
> more than willing to consolidate this document with the caching section
> in the Fn API document.
> 
> -Max
> 
> On 14.08.19 17:13, Lukasz Cwik wrote:
> > Instead of starting a new doc, could we add/update the caching segment
> > of https://s.apache.org/beam-fn-state-api-and-bundle-processing?
> >
> > Everyone has comment access and all Apache Beam PMC can add themselves
> > to be editors since the doc is owned by the Apache Beam PMC gmail
> account.
> >
> > On Wed, Aug 14, 2019 at 7:01 AM Maximilian Michels  wrote:
> >
> >     Yes, that makes sense. What do you think about creating a
> document to
> >     summarize the ideas presented here? Also, it would be good to
> capture
> >     the status quo regarding caching in the Python SDK.
> >
> >     -Max
> >
> >     On 13.08.19 22:44, Thomas Weise wrote:
> >     > The token would be needed in general to invalidate the cache
> when
> >     > bundles are processed by different workers.
> >     >
> >     > In the case of the Flink runner we don't have a scenario of
> SDK worker
> >     > surviving the runner in the case of a failure, so there is no
> >     > possibility of inconsistent state as a result of a checkpoint
> failure.
> >     >
> >     > --
> >     > sent from mobile
> >     >
> >     > On Tue, Aug 13, 2019, 1:18 PM Maximilian Michels  wrote:
> >     >     Thanks for clarifying. Cache-invalidation for side inputs
> >     makes sense.
> >     >
> >     >     In case the Runner fails to checkpoint, could it not
> >     re-attempt the
> >     >     checkpoint? At least in the case of Flink, the cache would
> >     still be
> >     >     valid until another checkpoint is attempted. For other
> Runners
> >     that may
> >     >     not be the case. Also, rolling back state while keeping the
> >     SDK Harness
> >     >     running requires invalidating the cache.
> >     >
> >     >     -Max
> >     >
> >     >     On 13.08.19 18:09, Lukasz Cwik wrote:
> >     >     >
> >     >     >
> >     >     > On Tue, Aug 13, 2019 at 4:36 AM Maximilian Michels  wrote:
> >     >     >
> >     >     >     Agree that we have to be able to flush before a
> >     checkpoint to
> >     >     avoid
> >     >     >     caching too many elements. Also good point about
> >     checkpoint costs
> >     >     >     increasing with flushing the cache on checkpoints.
> A LRU
> >     cache
> >     >     policy in
> >     >     >     the SDK seems desirable.
> >     >     >
> >     >     >     What is the role of the cache token in the design
> >     document[1]?
> >     >     It looks
> >     >     >     to me that the token is used to give the Runner
> control over
> >     >     which and
> >     >     how many elements can be cached by the SDK.

Re: Write-through-cache in State logic

2019-08-14 Thread Maximilian Michels
For the purpose of my own understanding of the matter, I've created a
document:
https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/


It could make sense to clarify and specify things in there for now. I'm
more than willing to consolidate this document with the caching section
in the Fn API document.

-Max

On 14.08.19 17:13, Lukasz Cwik wrote:
> Instead of starting a new doc, could we add/update the caching segment
> of https://s.apache.org/beam-fn-state-api-and-bundle-processing?
> 
> Everyone has comment access and all Apache Beam PMC can add themselves
> to be editors since the doc is owned by the Apache Beam PMC Gmail account.
> 
> On Wed, Aug 14, 2019 at 7:01 AM Maximilian Michels  > wrote:
> 
> Yes, that makes sense. What do you think about creating a document to
> summarize the ideas presented here? Also, it would be good to capture
> the status quo regarding caching in the Python SDK.
> 
> -Max
> 
> On 13.08.19 22:44, Thomas Weise wrote:
> > The token would be needed in general to invalidate the cache when
> > bundles are processed by different workers.
> >
> > In the case of the Flink runner we don't have a scenario of SDK worker
> > surviving the runner in the case of a failure, so there is no
> > possibility of inconsistent state as a result of a checkpoint failure.
> >
> > --
> > sent from mobile
> >
> > On Tue, Aug 13, 2019, 1:18 PM Maximilian Michels  wrote:
> >
> >     Thanks for clarifying. Cache-invalidation for side inputs
> makes sense.
> >
> >     In case the Runner fails to checkpoint, could it not
> re-attempt the
> >     checkpoint? At least in the case of Flink, the cache would
> still be
> >     valid until another checkpoint is attempted. For other Runners
> that may
> >     not be the case. Also, rolling back state while keeping the
> SDK Harness
> >     running requires invalidating the cache.
> >
> >     -Max
> >
> >     On 13.08.19 18:09, Lukasz Cwik wrote:
> >     >
> >     >
> >     > On Tue, Aug 13, 2019 at 4:36 AM Maximilian Michels  wrote:
> >     >     Agree that we have to be able to flush before a
> checkpoint to
> >     avoid
> >     >     caching too many elements. Also good point about
> checkpoint costs
> >     >     increasing with flushing the cache on checkpoints. A LRU
> cache
> >     policy in
> >     >     the SDK seems desirable.
> >     >
> >     >     What is the role of the cache token in the design
> document[1]?
> >     It looks
> >     >     to me that the token is used to give the Runner control over
> >     which and
> >     >     how many elements can be cached by the SDK. Why is that
> necessary?
> >     >     Shouldn't this be up to the SDK?
> >     >
> >     >  
> >     > We want to be able to handle the case where the SDK
> completes the
> >     bundle
> >     > successfully but the runner fails to checkpoint the information.
> >     > We also want the runner to be able to pass in cache tokens
> for things
> >     > like side inputs which may change over time (and the SDK
> would not
> >     know
> >     > that this happened).
> >     >  
> >     >
> >     >     -Max
> >     >
> >     >     [1]
> >     >     https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
> >     >
> >     >     Is it simply to
> >     >     On 12.08.19 19:55, Lukasz Cwik wrote:
> >     >     >
> >     >     >
> >     >     > On Mon, Aug 12, 2019 at 10:09 AM Thomas Weise  wrote:
> >     >     >
> >     >     >
> >     >     >     On Mon, Aug 12, 2019 at 8:53 AM Maximilian Michels  wrote:

Re: Write-through-cache in State logic

2019-08-14 Thread Maximilian Michels
> > This is documented at
> > https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
> > . Note that it requires participation of both the runner and the SDK
> > (though there are no correctness issues if one or the other side does
> > not understand the protocol, caching just won't be used).
> >
> > I don't think it's been implemented anywhere, but could be very
> > beneficial for performance.
> >
> > On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar 

Re: Write-through-cache in State logic

2019-08-13 Thread Thomas Weise
>> some time ago
>> https://jira.apache.org/jira/browse/BEAM-5428
>>
>> You also marked code where code changes are required:
>> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
>> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
>> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
>>
>> I am willing to provide help to implement this. Let me know how I can help.
>
> As far as I'm aware, no one is actively working on it right now. Please
> feel free to assign yourself the JIRA entry and I'll be happy to answer
> any questions you might have if (well probably when) these pointers are
> insufficient.
>
>> On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw  wrote:
>>>
>>> This is documented at
>>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
>>> . Note that it requires participation of both the runner and the SDK
>>> (though there are no correctness issues if one or the other side does
>>> not understand the protocol, caching just won't be used).
>>>
>>> I don't think it's been implemented anywhere, but could be very
>>> beneficial for performance.
>>>
>>> On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar  wrote:
>>>>
>>>> I checked the python sdk[1] and it has a similar implementation as the
>>>> Java SDK.
>>>>
>>>> I would agree with Thomas. In case of a high-volume event stream and a
>>>> bigger cluster size, network calls can potentially cause a bottleneck.
>>>>
>>>> @Robert
>>>> I am interested to see the proposal. Can you provide me the link of
>>>> the proposal?
>>>>
>>>> [1]:
>>>> https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
>>>>
>>>> On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise  wrote:
>>>>>
>>>>> Thanks for the pointer. For streaming, it will be important to
>>>>> support caching across bundles. It appears that even the Java SDK
>>>>> doesn't support that yet?
>>>>>
>>>>> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
>>>>>
>>>>> Regarding clear/append: It would be nice if both could occur within a
>>>>> single Fn API roundtrip when the state is persisted.
>>>>>
>>>>> Thanks,
>>>>> Thomas
>>>>>
>>>>> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik  wrote:
>>>>>>
>>>>>> User state is built on top of read, append and clear and not off a
>>>>>> read and write paradigm to allow for blind appends.
>>>>>>
>>>>>> The optimization you speak of can be done completely inside the SDK
>>>>>> without any additional protocol being required as long as you clear
>>>>>> the state first and then append all your new data. The Beam Java SDK
>>>>>> does this for all runners when executed portably[1]. You could port
>>>>>> the same logic to the Beam Python SDK as well.
>>>>>>
>>>>>> 1:
>>>>>> https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
>>>>>>
>>>>>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw  wrote:
>>>>>>>
>>>>>>> Python workers also have a per-bundle SDK-side cache. A protocol
>>>>>>> has been proposed, but hasn't yet been implemented in any SDKs or
>>>>>>> runners.
>>>>>>>
>>>>>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax  wrote:
>>>>>>>>
>>>>>>>> It's runner dependent. Some runners (e.g. the Dataflow runner) do
>>>>>>>> have such a cache, though I think it currently has a cap for large
>>>>>>>> bags.
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar  wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I have been using the python sdk for the application and also
>>>>>>>>> using BagState in production. I was wondering whether the state
>>>>>>>>> logic has any write-through-cache implemented or not. If we are
>>>>>>>>> sending every read and write request through the network then it
>>>>>>>>> comes with a performance cost. We can avoid the network call for
>>>>>>>>> a read operation if we have a write-through-cache.
>>>>>>>>> I have superficially looked into the implementation and I didn't
>>>>>>>>> see any cache implementation.
>>>>>>>>>
>>>>>>>>> Is it possible to have this cache? Would it cause any issue if we
>>>>>>>>> have the caching layer?


Re: Write-through-cache in State logic

2019-08-13 Thread Maximilian Michels
;>> >>
> >         >     >> >>>>>>>> >> This is documented at
> >         >     >> >>>>>>>> >>
> >         >   
> >       
>   
> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
> >         >     >> >>>>>>>> >> . Note that it requires participation of
> >         both the
> >         >     runner and the SDK
> >         >     >> >>>>>>>> >> (though there are no correctness
> issues if
> >         one or the
> >         >     other side does
> >         >     >> >>>>>>>> >> not understand the protocol, caching just
> >         won't be used).
> >         >     >> >>>>>>>> >>
> >         >     >> >>>>>>>> >> I don't think it's been implemented
> >         anywhere, but
>     >         >     could be very
> >         >     >> >>>>>>>> >> beneficial for performance.
> >         >     >> >>>>>>>> >>
> >         >     >> >>>>>>>> >> On Wed, Jul 17, 2019 at 6:00 PM
> Rakesh Kumar
> >         >     mailto:rakeshku...@lyft.com>
> <mailto:rakeshku...@lyft.com <mailto:rakeshku...@lyft.com>>
> >         <mailto:rakeshku...@lyft.com <mailto:rakeshku...@lyft.com>
> <mailto:rakeshku...@lyft.com <mailto:rakeshku...@lyft.com>>>> wrote:
> >         >     >> >>>>>>>> >> >
> >         >     >> >>>>>>>> >> > I checked the python sdk[1] and it has
> >         similar
> >         >     implementation as Java SDK.
> >         >     >> >>>>>>>> >> >
> >         >     >> >>>>>>>> >> > I would agree with Thomas. In case of
> >         high volume
> >         >     event stream and bigger cluster size, network call can
> >         potentially
> >         >     cause a bottleneck.
> >         >     >> >>>>>>>> >> >
> >         >     >> >>>>>>>> >> > @Robert
> >         >     >> >>>>>>>> >> > I am interested to see the
> proposal. Can you
> >         >     provide me the link of the proposal?
> >         >     >> >>>>>>>> >> >
> >         >     >> >>>>>>>> >> > [1]:
> >         >   
> >       
>   
> https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
> >         >     >> >>>>>>>> >> >
> >         >     >> >>>>>>>> >> >
> > On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise <t...@apache.org> wrote:
> > >
> > > Thanks for the pointer. For streaming, it will be important to
> > > support caching across bundles. It appears that even the Java SDK
> > > doesn't support that yet?
> > >
> > > https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
> > >
> > > Regarding clear/append: it would be nice if both could occur within
> > > a single Fn API roundtrip when the state is persisted.
> > >
> > > Thanks,
> > > Thomas
> > > On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik <lc...@google.com> wrote:
> > > >
> > > > User state is built on top of read, append and clear, and not on a
> > > > read-and-write paradigm, to allow for blind appends.
> > > >
> > > > The optimization you speak of can be done completely inside the
> > > > SDK, without any additional protocol being required, as long as
> > > > you clear the state first and then append all your new data. The
> > > > Beam Java SDK does this for all runners when executed portably[1].
> > > > You could port the same logic to the Beam Python SDK as well.
> > > >
> > > > 1:
> > > > https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
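The BagUserState pattern Lukasz points to can be transliterated roughly as
follows. This is a sketch of the idea, not the actual Beam code, and the
state_api calls stand in for the Fn API state client.

    # Sketch: serve reads locally and turn writes into clear-then-append.
    class CachingBagState(object):
        def __init__(self, state_api, state_key):
            self._state_api = state_api
            self._state_key = state_key
            self._new_values = []
            self._cleared = False

        def read(self):
            if self._cleared:
                return list(self._new_values)    # no network call needed
            return list(self._state_api.read(self._state_key)) + self._new_values

        def clear(self):
            self._cleared = True
            self._new_values = []

        def append(self, value):
            self._new_values.append(value)       # blind append, buffered locally

        def commit(self):
            # Persist once per bundle: a clear followed by blind appends.
            if self._cleared:
                self._state_api.clear(self._state_key)
            for value in self._new_values:
                self._state_api.append(self._state_key, value)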
> > > > On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw <rober...@google.com> wrote:
> > > > >
> > > > > Python workers also have a per-bundle SDK-side cache. A protocol
> > > > > has been proposed, but hasn't yet been implemented in any SDKs
> > > > > or runners.
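For context, a per-bundle cache is simply dropped when the bundle finishes,
so nothing can become stale across bundles. A minimal sketch, with
hypothetical hook names rather than the actual SDK harness interface:

    # Sketch: cache whose lifetime is a single bundle.
    class PerBundleCache(object):
        def __init__(self):
            self._values = {}

        def start_bundle(self):
            self._values.clear()               # nothing survives the bundle

        def read(self, state_key, fetch_fn):
            if state_key not in self._values:
                self._values[state_key] = fetch_fn()
            return self._values[state_key]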
> > > > > On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax <re...@google.com> wrote:
> > > > > >
> > > > > > It's runner dependent. Some runners (e.g. the Dataflow runner)
> > > > > > do have such a cache, though I think it currently has a cap
> > > > > > for large bags.
> > > > > >
> > > > > > Reuven
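The size cap Reuven mentions can be approximated by refusing to cache bags
beyond a threshold, so one huge bag cannot crowd out everything else. A
sketch under that assumption; the actual Dataflow behavior is not specified
here, and the names and budget are illustrative.

    # Sketch: skip caching encoded values that exceed a size budget.
    MAX_CACHED_BAG_BYTES = 1 << 20  # illustrative 1 MiB cap

    def maybe_cache(cache, state_key, encoded_values):
        if sum(len(v) for v in encoded_values) <= MAX_CACHED_BAG_BYTES:
            cache[state_key] = encoded_values  # small enough: keep it
        # else: leave it uncached; reads go back to the State API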
> > > > > > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar <rakeshku...@lyft.com> wrote:
> > > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > I have been using the python sdk for the application and
> > > > > > > also using BagState in production. I was wondering whether
> > > > > > > the state logic has any write-through cache implemented or
> > > > > > > not. If we are sending every read and write request through
> > > > > > > the network, then it comes with a performance cost. We can
> > > > > > > avoid the network call for a read operation if we have a
> > > > > > > write-through cache.
> > > > > > > I have superficially looked into the implementation and I
> > > > > > > didn't see any cache implementation.
> > > > > > >
> > > > > > > Is it possible to have this cache? Would it cause any issue
> > > > > > > if we have the caching layer?
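To make the write-through idea in the question concrete: every write still
goes to the runner over the network, but it also updates a local copy, so
subsequent reads are served without a round trip. A sketch with hypothetical
names, not an actual Beam interface:

    # Sketch: write-through caching for a single-value state cell.
    class WriteThroughValueState(object):
        def __init__(self, state_api, state_key):
            self._state_api = state_api
            self._state_key = state_key
            self._cached = None
            self._known = False

        def write(self, value):
            self._state_api.write(self._state_key, value)  # network write
            self._cached, self._known = value, True        # keep local copy in sync

        def read(self):
            if not self._known:                            # first read only
                self._cached = self._state_api.read(self._state_key)
                self._known = True
            return self._cached                            # no network round trip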


Re: Write-through-cache in State logic

2019-08-13 Thread Lukasz Cwik
[This archived snippet contains only quoted text already shown above in this thread.]


Re: Write-through-cache in State logic

2019-08-13 Thread Maximilian Michels
On Thu, Jul 25, 2019 at 5:09 AM Robert Bradshaw <rober...@google.com> wrote:
> On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar <rakeshku...@lyft.com> wrote:
> >
> > Thanks Robert,
> >
> > I stumbled on the jira that you created some time ago:
> > https://jira.apache.org/jira/browse/BEAM-5428
> >
> > You also marked the code where changes are required:
> > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
> > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
> > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
> >
> > I am willing to help implement this. Let me know how I can help.
>
> As far as I'm aware, no one is actively working on it right now.
> Please feel free to assign yourself the JIRA entry and I'll be happy
> to answer any questions you might have if (well, probably when) these
> pointers are insufficient.


Re: Write-through-cache in State logic

2019-08-12 Thread Lukasz Cwik
> count.add(1)
> timer_seconds = (window.end.micros // 100) - 1
> timer.set(timer_seconds)
>
> @userstate.on_timer(timer_spec)
> def process_timer(self,
>                   count=beam.DoFn.StateParam(count_state_spec),
>                   window=beam.DoFn.WindowParam):
>   if count.read() == 0:
>     logging.warning("###timer fired with count %d, window %s"
>                     % (count.read(), window))
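To make the quoted fragment self-contained, here is a reconstructed sketch of
the DoFn it appears to come from. The spec names (COUNT_SPEC, TIMER_SPEC),
the combiner, and the micros-to-seconds divisor are assumptions, not taken
from the original message.

    import logging

    import apache_beam as beam
    from apache_beam.coders import VarIntCoder
    from apache_beam.transforms import userstate
    from apache_beam.transforms.timeutil import TimeDomain

    class CountingDoFn(beam.DoFn):
      COUNT_SPEC = userstate.CombiningValueStateSpec('count', VarIntCoder(), sum)
      TIMER_SPEC = userstate.TimerSpec('timer', TimeDomain.WATERMARK)

      def process(self,
                  element,
                  count=beam.DoFn.StateParam(COUNT_SPEC),
                  timer=beam.DoFn.TimerParam(TIMER_SPEC),
                  window=beam.DoFn.WindowParam):
        count.add(1)
        # micros -> seconds; the divisor in the quoted snippet looks
        # garbled, so 1000000 here is an assumption.
        timer.set((window.end.micros // 1000000) - 1)

      @userstate.on_timer(TIMER_SPEC)
      def process_timer(self,
                        count=beam.DoFn.StateParam(COUNT_SPEC),
                        window=beam.DoFn.WindowParam):
        if count.read() == 0:
          logging.warning('###timer fired with count %d, window %s',
                          count.read(), window)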


Re: Write-through-cache in State logic

2019-08-12 Thread Thomas Weise
[This archived snippet contains only quoted text already shown above in this thread.]


Re: Write-through-cache in State logic

2019-08-12 Thread Maximilian Michels
[This archived snippet contains only quoted text already shown above in this thread.]


Re: Write-through-cache in State logic

2019-08-09 Thread Lukasz Cwik
[This archived snippet contains only quoted text already shown above in this thread.]


Re: Write-through-cache in State logic

2019-08-09 Thread Robert Bradshaw
[This archived snippet contains only quoted text already shown above in this thread.]


Re: Write-through-cache in State logic

2019-08-05 Thread Thomas Weise
[This archived snippet contains only quoted text already shown above in this thread.]

Re: Write-through-cache in State logic

2019-08-05 Thread Lukasz Cwik
Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar 
>>>>>> wrote:
>>>>>> >
>>>>>> > Thanks Robert,
>>>>>> >
>>>>>> >  I stumble on the jira that you have created some time ago
>>>>>> > https://jira.apache.org/jira/browse/BEAM-5428
>>>>>> >
>>>>>> > You also marked code where code changes are required:
>>>>>> >
>>>>>> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
>>>>>> >
>>>>>> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
>>>>>> >
>>>>>> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
>>>>>> >
>>>>>> > I am willing to provide help to implement this. Let me know how I
>>>>>> can help.
>>>>>>
>>>>>> As far as I'm aware, no one is actively working on it right now.
>>>>>> Please feel free to assign yourself the JIRA entry and I'll be happy
>>>>>> to answer any questions you might have if (well probably when) these
>>>>>> pointers are insufficient.
>>>>>>
>>>>>> > On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw <
>>>>>> rober...@google.com> wrote:
>>>>>> >>
>>>>>> >> This is documented at
>>>>>> >>
>>>>>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
>>>>>> >> . Note that it requires participation of both the runner and the
>>>>>> SDK
>>>>>> >> (though there are no correctness issues if one or the other side
>>>>>> does
>>>>>> >> not understand the protocol, caching just won't be used).
>>>>>> >>
>>>>>> >> I don't think it's been implemented anywhere, but could be very
>>>>>> >> beneficial for performance.
>>>>>> >>
>>>>>> >> On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar 
>>>>>> wrote:
>>>>>> >> >
>>>>>> >> > I checked the python sdk[1] and it has similar implementation as
>>>>>> Java SDK.
>>>>>> >> >
>>>>>> >> > I would agree with Thomas. In case of high volume event stream
>>>>>> and bigger cluster size, network call can potentially cause a bottleneck.
>>>>>> >> >
>>>>>> >> > @Robert
>>>>>> >> > I am interested to see the proposal. Can you provide me the link
>>>>>> of the proposal?
>>>>>> >> >
>>>>>> >> > [1]:
>>>>>> https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
>>>>>> >> >
>>>>>> >> >
>>>>>> >> > On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise 
>>>>>> wrote:
>>>>>> >> >>
>>>>>> >> >> Thanks for the pointer. For streaming, it will be important to
>>>>>> support caching across bundles. It appears that even the Java SDK doesn't
>>>>>> support that yet?
>>>>>> >> >>
>>>>>> >> >>
>>>>>> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
>>>>>> >> >>
>>>>>> >> >> Regarding clear/append: It would be nice if both could occur
>>>>>> within a single Fn Api roundtrip when the state is persisted.
>>>>>> >> >>
>>>>>> >> >> Thanks,
>>>>>> >> >> Thomas
>>>>>> >> >>
>>>>>> >> >>
>>>>>> >> >>
>>>>>> >> >> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik 
>>>>>> wrote:
>>>>>> >> >>>
>>>>>> >> >>> User state is built on top of read, append and clear and not
>>>>>> off a read and write paradigm to allow for blind appends.
>>>>>> >> >>>
>>>>>> >> >>> The optimization you speak of can be done completely inside
>>>>>> the SDK without any additional protocol being required as long as you 
>>>>>> clear
>>>>>> the state first and then append all your new data. The Beam Java SDK does
>>>>>> this for all runners when executed portably[1]. You could port the same
>>>>>> logic to the Beam Python SDK as well.
>>>>>> >> >>>
>>>>>> >> >>> 1:
>>>>>> https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
>>>>>> >> >>>
>>>>>> >> >>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw <
>>>>>> rober...@google.com> wrote:
>>>>>> >> >>>>
>>>>>> >> >>>> Python workers also have a per-bundle SDK-side cache. A
>>>>>> protocol has
>>>>>> >> >>>> been proposed, but hasn't yet been implemented in any SDKs or
>>>>>> runners.
>>>>>> >> >>>>
>>>>>> >> >>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax 
>>>>>> wrote:
>>>>>> >> >>>> >
>>>>>> >> >>>> > It's runner dependent. Some runners (e.g. the Dataflow
>>>>>> runner) do have such a cache, though I think it's currently has a cap for
>>>>>> large bags.
>>>>>> >> >>>> >
>>>>>> >> >>>> > Reuven
>>>>>> >> >>>> >
>>>>>> >> >>>> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar <
>>>>>> rakeshku...@lyft.com> wrote:
>>>>>> >> >>>> >>
>>>>>> >> >>>> >> Hi,
>>>>>> >> >>>> >>
>>>>>> >> >>>> >> I have been using python sdk for the application and also
>>>>>> using BagState in production. I was wondering whether state logic has any
>>>>>> write-through-cache implemented or not. If we are sending every read and
>>>>>> write request through network then it comes with a performance cost. We 
>>>>>> can
>>>>>> avoid network call for a read operation if we have write-through-cache.
>>>>>> >> >>>> >> I have superficially looked into the implementation and I
>>>>>> didn't see any cache implementation.
>>>>>> >> >>>> >>
>>>>>> >> >>>> >> is it possible to have this cache? would it cause any
>>>>>> issue if we have the caching layer?
>>>>>> >> >>>> >>
>>>>>>
>>>>>


Re: Write-through-cache in State logic

2019-08-05 Thread Lukasz Cwik
>>>>>> > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
>>>>> >
>>>>> > I am willing to provide help to implement this. Let me know how I
>>>>> can help.
>>>>>
>>>>> As far as I'm aware, no one is actively working on it right now.
>>>>> Please feel free to assign yourself the JIRA entry and I'll be happy
>>>>> to answer any questions you might have if (well probably when) these
>>>>> pointers are insufficient.
>>>>>
>>>>> > On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw 
>>>>> wrote:
>>>>> >>
>>>>> >> This is documented at
>>>>> >>
>>>>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
>>>>> >> . Note that it requires participation of both the runner and the SDK
>>>>> >> (though there are no correctness issues if one or the other side
>>>>> does
>>>>> >> not understand the protocol, caching just won't be used).
>>>>> >>
>>>>> >> I don't think it's been implemented anywhere, but could be very
>>>>> >> beneficial for performance.
>>>>> >>
>>>>> >> On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar 
>>>>> wrote:
>>>>> >> >
>>>>> >> > I checked the python sdk[1] and it has similar implementation as
>>>>> Java SDK.
>>>>> >> >
>>>>> >> > I would agree with Thomas. In case of high volume event stream
>>>>> and bigger cluster size, network call can potentially cause a bottleneck.
>>>>> >> >
>>>>> >> > @Robert
>>>>> >> > I am interested to see the proposal. Can you provide me the link
>>>>> of the proposal?
>>>>> >> >
>>>>> >> > [1]:
>>>>> https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
>>>>> >> >
>>>>> >> >
>>>>> >> > On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise 
>>>>> wrote:
>>>>> >> >>
>>>>> >> >> Thanks for the pointer. For streaming, it will be important to
>>>>> support caching across bundles. It appears that even the Java SDK doesn't
>>>>> support that yet?
>>>>> >> >>
>>>>> >> >>
>>>>> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
>>>>> >> >>
>>>>> >> >> Regarding clear/append: It would be nice if both could occur
>>>>> within a single Fn Api roundtrip when the state is persisted.
>>>>> >> >>
>>>>> >> >> Thanks,
>>>>> >> >> Thomas
>>>>> >> >>
>>>>> >> >>
>>>>> >> >>
>>>>> >> >> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik 
>>>>> wrote:
>>>>> >> >>>
>>>>> >> >>> User state is built on top of read, append and clear and not
>>>>> off a read and write paradigm to allow for blind appends.
>>>>> >> >>>
>>>>> >> >>> The optimization you speak of can be done completely inside the
>>>>> SDK without any additional protocol being required as long as you clear 
>>>>> the
>>>>> state first and then append all your new data. The Beam Java SDK does this
>>>>> for all runners when executed portably[1]. You could port the same logic 
>>>>> to
>>>>> the Beam Python SDK as well.
>>>>> >> >>>
>>>>> >> >>> 1:
>>>>> https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
>>>>> >> >>>
>>>>> >> >>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw <
>>>>> rober...@google.com> wrote:
>>>>> >> >>>>
>>>>> >> >>>> Python workers also have a per-bundle SDK-side cache. A
>>>>> protocol has
>>>>> >> >>>> been proposed, but hasn't yet been implemented in any SDKs or
>>>>> runners.
>>>>> >> >>>>
>>>>> >> >>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax 
>>>>> wrote:
>>>>> >> >>>> >
>>>>> >> >>>> > It's runner dependent. Some runners (e.g. the Dataflow
>>>>> runner) do have such a cache, though I think it currently has a cap for
>>>>> large bags.
>>>>> >> >>>> >
>>>>> >> >>>> > Reuven
>>>>> >> >>>> >
>>>>> >> >>>> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar <
>>>>> rakeshku...@lyft.com> wrote:
>>>>> >> >>>> >>
>>>>> >> >>>> >> Hi,
>>>>> >> >>>> >>
>>>>> >> >>>> >> I have been using python sdk for the application and also
>>>>> using BagState in production. I was wondering whether state logic has any
>>>>> write-through-cache implemented or not. If we are sending every read and
>>>>> write request through network then it comes with a performance cost. We 
>>>>> can
>>>>> avoid network call for a read operation if we have write-through-cache.
>>>>> >> >>>> >> I have superficially looked into the implementation and I
>>>>> didn't see any cache implementation.
>>>>> >> >>>> >>
>>>>> >> >>>> >> is it possible to have this cache? would it cause any issue
>>>>> if we have the caching layer?
>>>>> >> >>>> >>
>>>>>
>>>>


Re: Write-through-cache in State logic

2019-08-05 Thread Thomas Weise
>>>> pointers are insufficient.
>>>>
>>>> > On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw 
>>>> wrote:
>>>> >>
>>>> >> This is documented at
>>>> >>
>>>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
>>>> >> . Note that it requires participation of both the runner and the SDK
>>>> >> (though there are no correctness issues if one or the other side does
>>>> >> not understand the protocol, caching just won't be used).
>>>> >>
>>>> >> I don't think it's been implemented anywhere, but could be very
>>>> >> beneficial for performance.
>>>> >>
>>>> >> On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar 
>>>> wrote:
>>>> >> >
>>>> >> > I checked the python sdk[1] and it has similar implementation as
>>>> Java SDK.
>>>> >> >
>>>> >> > I would agree with Thomas. In case of high volume event stream and
>>>> bigger cluster size, network call can potentially cause a bottleneck.
>>>> >> >
>>>> >> > @Robert
>>>> >> > I am interested to see the proposal. Can you provide me the link
>>>> of the proposal?
>>>> >> >
>>>> >> > [1]:
>>>> https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
>>>> >> >
>>>> >> >
>>>> >> > On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise 
>>>> wrote:
>>>> >> >>
>>>> >> >> Thanks for the pointer. For streaming, it will be important to
>>>> support caching across bundles. It appears that even the Java SDK doesn't
>>>> support that yet?
>>>> >> >>
>>>> >> >>
>>>> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
>>>> >> >>
>>>> >> >> Regarding clear/append: It would be nice if both could occur
>>>> within a single Fn Api roundtrip when the state is persisted.
>>>> >> >>
>>>> >> >> Thanks,
>>>> >> >> Thomas
>>>> >> >>
>>>> >> >>
>>>> >> >>
>>>> >> >> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik 
>>>> wrote:
>>>> >> >>>
>>>> >> >>> User state is built on top of read, append and clear and not off
>>>> a read and write paradigm to allow for blind appends.
>>>> >> >>>
>>>> >> >>> The optimization you speak of can be done completely inside the
>>>> SDK without any additional protocol being required as long as you clear the
>>>> state first and then append all your new data. The Beam Java SDK does this
>>>> for all runners when executed portably[1]. You could port the same logic to
>>>> the Beam Python SDK as well.
>>>> >> >>>
>>>> >> >>> 1:
>>>> https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
>>>> >> >>>
>>>> >> >>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw <
>>>> rober...@google.com> wrote:
>>>> >> >>>>
>>>> >> >>>> Python workers also have a per-bundle SDK-side cache. A
>>>> protocol has
>>>> >> >>>> been proposed, but hasn't yet been implemented in any SDKs or
>>>> runners.
>>>> >> >>>>
>>>> >> >>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax 
>>>> wrote:
>>>> >> >>>> >
>>>> >> >>>> > It's runner dependent. Some runners (e.g. the Dataflow
>>>> runner) do have such a cache, though I think it currently has a cap for
>>>> large bags.
>>>> >> >>>> >
>>>> >> >>>> > Reuven
>>>> >> >>>> >
>>>> >> >>>> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar <
>>>> rakeshku...@lyft.com> wrote:
>>>> >> >>>> >>
>>>> >> >>>> >> Hi,
>>>> >> >>>> >>
>>>> >> >>>> >> I have been using python sdk for the application and also
>>>> using BagState in production. I was wondering whether state logic has any
>>>> write-through-cache implemented or not. If we are sending every read and
>>>> write request through network then it comes with a performance cost. We can
>>>> avoid network call for a read operation if we have write-through-cache.
>>>> >> >>>> >> I have superficially looked into the implementation and I
>>>> didn't see any cache implementation.
>>>> >> >>>> >>
>>>> >> >>>> >> is it possible to have this cache? would it cause any issue
>>>> if we have the caching layer?
>>>> >> >>>> >>
>>>>
>>>


Re: Write-through-cache in State logic

2019-08-05 Thread Lukasz Cwik
>>> >> On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar 
>>> wrote:
>>> >> >
>>> >> > I checked the python sdk[1] and it has similar implementation as
>>> Java SDK.
>>> >> >
>>> >> > I would agree with Thomas. In case of high volume event stream and
>>> bigger cluster size, network call can potentially cause a bottleneck.
>>> >> >
>>> >> > @Robert
>>> >> > I am interested to see the proposal. Can you provide me the link of
>>> the proposal?
>>> >> >
>>> >> > [1]:
>>> https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
>>> >> >
>>> >> >
>>> >> > On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise 
>>> wrote:
>>> >> >>
>>> >> >> Thanks for the pointer. For streaming, it will be important to
>>> support caching across bundles. It appears that even the Java SDK doesn't
>>> support that yet?
>>> >> >>
>>> >> >>
>>> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
>>> >> >>
>>> >> >> Regarding clear/append: It would be nice if both could occur
>>> within a single Fn Api roundtrip when the state is persisted.
>>> >> >>
>>> >> >> Thanks,
>>> >> >> Thomas
>>> >> >>
>>> >> >>
>>> >> >>
>>> >> >> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik 
>>> wrote:
>>> >> >>>
>>> >> >>> User state is built on top of read, append and clear and not off
>>> a read and write paradigm to allow for blind appends.
>>> >> >>>
>>> >> >>> The optimization you speak of can be done completely inside the
>>> SDK without any additional protocol being required as long as you clear the
>>> state first and then append all your new data. The Beam Java SDK does this
>>> for all runners when executed portably[1]. You could port the same logic to
>>> the Beam Python SDK as well.
>>> >> >>>
>>> >> >>> 1:
>>> https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
>>> >> >>>
>>> >> >>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw <
>>> rober...@google.com> wrote:
>>> >> >>>>
>>> >> >>>> Python workers also have a per-bundle SDK-side cache. A protocol
>>> has
>>> >> >>>> been proposed, but hasn't yet been implemented in any SDKs or
>>> runners.
>>> >> >>>>
>>> >> >>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax 
>>> wrote:
>>> >> >>>> >
>>> >> >>>> > It's runner dependent. Some runners (e.g. the Dataflow runner)
>>> do have such a cache, though I think it currently has a cap for large
>>> bags.
>>> >> >>>> >
>>> >> >>>> > Reuven
>>> >> >>>> >
>>> >> >>>> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar <
>>> rakeshku...@lyft.com> wrote:
>>> >> >>>> >>
>>> >> >>>> >> Hi,
>>> >> >>>> >>
>>> >> >>>> >> I have been using python sdk for the application and also
>>> using BagState in production. I was wondering whether state logic has any
>>> write-through-cache implemented or not. If we are sending every read and
>>> write request through network then it comes with a performance cost. We can
>>> avoid network call for a read operation if we have write-through-cache.
>>> >> >>>> >> I have superficially looked into the implementation and I
>>> didn't see any cache implementation.
>>> >> >>>> >>
>>> >> >>>> >> is it possible to have this cache? would it cause any issue
>>> if we have the caching layer?
>>> >> >>>> >>
>>>
>>


Re: Write-through-cache in State logic

2019-07-29 Thread jincheng sun
>> >> >> Thanks for the pointer. For streaming, it will be important to
>> support caching across bundles. It appears that even the Java SDK doesn't
>> support that yet?
>> >> >>
>> >> >>
>> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
>> >> >>
>> >> >> Regarding clear/append: It would be nice if both could occur within
>> a single Fn Api roundtrip when the state is persisted.
>> >> >>
>> >> >> Thanks,
>> >> >> Thomas
>> >> >>
>> >> >>
>> >> >>
>> >> >> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik 
>> wrote:
>> >> >>>
>> >> >>> User state is built on top of read, append and clear and not off a
>> read and write paradigm to allow for blind appends.
>> >> >>>
>> >> >>> The optimization you speak of can be done completely inside the
>> SDK without any additional protocol being required as long as you clear the
>> state first and then append all your new data. The Beam Java SDK does this
>> for all runners when executed portably[1]. You could port the same logic to
>> the Beam Python SDK as well.
>> >> >>>
>> >> >>> 1:
>> https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
>> >> >>>
>> >> >>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw <
>> rober...@google.com> wrote:
>> >> >>>>
>> >> >>>> Python workers also have a per-bundle SDK-side cache. A protocol
>> has
>> >> >>>> been proposed, but hasn't yet been implemented in any SDKs or
>> runners.
>> >> >>>>
>> >> >>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax 
>> wrote:
>> >> >>>> >
>> >> >>>> > It's runner dependent. Some runners (e.g. the Dataflow runner)
>> do have such a cache, though I think it currently has a cap for large
>> bags.
>> >> >>>> >
>> >> >>>> > Reuven
>> >> >>>> >
>> >> >>>> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar <
>> rakeshku...@lyft.com> wrote:
>> >> >>>> >>
>> >> >>>> >> Hi,
>> >> >>>> >>
>> >> >>>> >> I have been using python sdk for the application and also
>> using BagState in production. I was wondering whether state logic has any
>> write-through-cache implemented or not. If we are sending every read and
>> write request through network then it comes with a performance cost. We can
>> avoid network call for a read operation if we have write-through-cache.
>> >> >>>> >> I have superficially looked into the implementation and I
>> didn't see any cache implementation.
>> >> >>>> >>
>> >> >>>> >> is it possible to have this cache? would it cause any issue if
>> we have the caching layer?
>> >> >>>> >>
>>
>


Re: Write-through-cache in State logic

2019-07-29 Thread Thomas Weise
> >> >> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik 
> wrote:
> >> >>>
> >> >>> User state is built on top of read, append and clear and not off a
> read and write paradigm to allow for blind appends.
> >> >>>
> >> >>> The optimization you speak of can be done completely inside the SDK
> without any additional protocol being required as long as you clear the
> state first and then append all your new data. The Beam Java SDK does this
> for all runners when executed portably[1]. You could port the same logic to
> the Beam Python SDK as well.
> >> >>>
> >> >>> 1:
> https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
> >> >>>
> >> >>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw <
> rober...@google.com> wrote:
> >> >>>>
> >> >>>> Python workers also have a per-bundle SDK-side cache. A protocol
> has
> >> >>>> been proposed, but hasn't yet been implemented in any SDKs or
> runners.
> >> >>>>
> >> >>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax 
> wrote:
> >> >>>> >
> >> >>>> > It's runner dependent. Some runners (e.g. the Dataflow runner)
> do have such a cache, though I think it currently has a cap for large
> bags.
> >> >>>> >
> >> >>>> > Reuven
> >> >>>> >
> >> >>>> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar <
> rakeshku...@lyft.com> wrote:
> >> >>>> >>
> >> >>>> >> Hi,
> >> >>>> >>
> >> >>>> >> I have been using python sdk for the application and also using
> BagState in production. I was wondering whether state logic has any
> write-through-cache implemented or not. If we are sending every read and
> write request through network then it comes with a performance cost. We can
> avoid network call for a read operation if we have write-through-cache.
> >> >>>> >> I have superficially looked into the implementation and I
> didn't see any cache implementation.
> >> >>>> >>
> >> >>>> >> is it possible to have this cache? would it cause any issue if
> we have the caching layer?
> >> >>>> >>
>


Re: Write-through-cache in State logic

2019-07-25 Thread Robert Bradshaw
On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar  wrote:
>
> Thanks Robert,
>
>  I stumbled on the JIRA that you created some time ago:
> https://jira.apache.org/jira/browse/BEAM-5428
>
> You also marked the code where changes are required:
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
>
> I am willing to provide help to implement this. Let me know how I can help.

As far as I'm aware, no one is actively working on it right now.
Please feel free to assign yourself the JIRA entry and I'll be happy
to answer any questions you might have if (well probably when) these
pointers are insufficient.

> On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw  wrote:
>>
>> This is documented at
>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
>> . Note that it requires participation of both the runner and the SDK
>> (though there are no correctness issues if one or the other side does
>> not understand the protocol, caching just won't be used).
>>
>> I don't think it's been implemented anywhere, but could be very
>> beneficial for performance.
>>
>> On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar  wrote:
>> >
>> > I checked the python sdk[1] and it has similar implementation as Java SDK.
>> >
>> > I would agree with Thomas. In case of high volume event stream and bigger 
>> > cluster size, network call can potentially cause a bottleneck.
>> >
>> > @Robert
>> > I am interested to see the proposal. Can you provide me the link of the 
>> > proposal?
>> >
>> > [1]: 
>> > https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
>> >
>> >
>> > On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise  wrote:
>> >>
>> >> Thanks for the pointer. For streaming, it will be important to support 
>> >> caching across bundles. It appears that even the Java SDK doesn't support 
>> >> that yet?
>> >>
>> >> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
>> >>
>> >> Regarding clear/append: It would be nice if both could occur within a 
>> >> single Fn Api roundtrip when the state is persisted.
>> >>
>> >> Thanks,
>> >> Thomas
>> >>
>> >>
>> >>
>> >> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik  wrote:
>> >>>
>> >>> User state is built on top of read, append and clear and not off a read 
>> >>> and write paradigm to allow for blind appends.
>> >>>
>> >>> The optimization you speak of can be done completely inside the SDK 
>> >>> without any additional protocol being required as long as you clear the 
>> >>> state first and then append all your new data. The Beam Java SDK does 
>> >>> this for all runners when executed portably[1]. You could port the same 
>> >>> logic to the Beam Python SDK as well.
>> >>>
>> >>> 1: 
>> >>> https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
>> >>>
>> >>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw  
>> >>> wrote:
>> >>>>
>> >>>> Python workers also have a per-bundle SDK-side cache. A protocol has
>> >>>> been proposed, but hasn't yet been implemented in any SDKs or runners.
>> >>>>
>> >>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax  wrote:
>> >>>> >
>> >>>> > It's runner dependent. Some runners (e.g. the Dataflow runner) do 
>> >>>> > have such a cache, though I think it currently has a cap for large 
>> >>>> > bags.
>> >>>> >
>> >>>> > Reuven
>> >>>> >
>> >>>> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar  
>> >>>> > wrote:
>> >>>> >>
>> >>>> >> Hi,
>> >>>> >>
>> >>>> >> I have been using python sdk for the application and also using 
>> >>>> >> BagState in production. I was wondering whether state logic has any 
>> >>>> >> write-through-cache implemented or not. If we are sending every read 
>> >>>> >> and write request through network then it comes with a performance 
>> >>>> >> cost. We can avoid network call for a read operation if we have 
>> >>>> >> write-through-cache.
>> >>>> >> I have superficially looked into the implementation and I didn't see 
>> >>>> >> any cache implementation.
>> >>>> >>
>> >>>> >> is it possible to have this cache? would it cause any issue if we 
>> >>>> >> have the caching layer?
>> >>>> >>


Re: Write-through-cache in State logic

2019-07-23 Thread Rakesh Kumar
Thanks Robert,

I stumbled on the JIRA that you created some time ago:
https://jira.apache.org/jira/browse/BEAM-5428

You also marked the code where changes are required:
https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465

I am willing to provide help to implement this. Let me know how I can help.
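
For illustration, here is a rough sketch (all names are hypothetical, not
the actual signatures in bundle_processor.py) of the kind of caching layer
that could sit between the runtime state objects marked above and the
underlying gRPC state client:

# Hypothetical sketch only; the methods on `underlying` stand in for the
# SDK harness's read/append/clear state calls.
class CachingStateHandler(object):

    def __init__(self, underlying):
        self._underlying = underlying
        self._cache = {}  # state key -> cached list of values

    def blocking_get(self, state_key):
        # Serve repeated reads locally; only the first read hits the network.
        if state_key not in self._cache:
            self._cache[state_key] = list(
                self._underlying.blocking_get(state_key))
        return self._cache[state_key]

    def append(self, state_key, values):
        # Write through: persist remotely and mirror the write locally.
        self._underlying.append(state_key, values)
        self._cache.setdefault(state_key, []).extend(values)

    def clear(self, state_key):
        self._underlying.clear(state_key)
        self._cache[state_key] = []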



On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw  wrote:

> This is documented at
>
> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
> . Note that it requires participation of both the runner and the SDK
> (though there are no correctness issues if one or the other side does
> not understand the protocol, caching just won't be used).
>
> I don't think it's been implemented anywhere, but could be very
> beneficial for performance.
>
> On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar  wrote:
> >
> > I checked the python sdk[1] and it has similar implementation as Java
> SDK.
> >
> > I would agree with Thomas. In case of high volume event stream and
> bigger cluster size, network call can potentially cause a bottleneck.
> >
> > @Robert
> > I am interested to see the proposal. Can you provide me the link of the
> proposal?
> >
> > [1]:
> https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
> >
> >
> > On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise  wrote:
> >>
> >> Thanks for the pointer. For streaming, it will be important to support
> caching across bundles. It appears that even the Java SDK doesn't support
> that yet?
> >>
> >>
> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
> >>
> >> Regarding clear/append: It would be nice if both could occur within a
> single Fn Api roundtrip when the state is persisted.
> >>
> >> Thanks,
> >> Thomas
> >>
> >>
> >>
> >> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik  wrote:
> >>>
> >>> User state is built on top of read, append and clear and not off a
> read and write paradigm to allow for blind appends.
> >>>
> >>> The optimization you speak of can be done completely inside the SDK
> without any additional protocol being required as long as you clear the
> state first and then append all your new data. The Beam Java SDK does this
> for all runners when executed portably[1]. You could port the same logic to
> the Beam Python SDK as well.
> >>>
> >>> 1:
> https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
> >>>
> >>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw 
> wrote:
> >>>>
> >>>> Python workers also have a per-bundle SDK-side cache. A protocol has
> >>>> been proposed, but hasn't yet been implemented in any SDKs or runners.
> >>>>
> >>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax  wrote:
> >>>> >
> >>>> > It's runner dependent. Some runners (e.g. the Dataflow runner) do
> have such a cache, though I think it currently has a cap for large bags.
> >>>> >
> >>>> > Reuven
> >>>> >
> >>>> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar 
> wrote:
> >>>> >>
> >>>> >> Hi,
> >>>> >>
> >>>> >> I have been using python sdk for the application and also using
> BagState in production. I was wondering whether state logic has any
> write-through-cache implemented or not. If we are sending every read and
> write request through network then it comes with a performance cost. We can
> avoid network call for a read operation if we have write-through-cache.
> >>>> >> I have superficially looked into the implementation and I didn't
> see any cache implementation.
> >>>> >>
> >>>> >> is it possible to have this cache? would it cause any issue if we
> have the caching layer?
> >>>> >>
>


Re: Write-through-cache in State logic

2019-07-23 Thread Robert Bradshaw
This is documented at
https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
. Note that it requires participation of both the runner and the SDK
(though there are no correctness issues if one or the other side does
not understand the protocol, caching just won't be used).

I don't think it's been implemented anywhere, but could be very
beneficial for performance.
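
To give a feel for the proposed protocol (the names here are illustrative,
not the actual proto fields), the runner-supplied cache token is what makes
cross-bundle reuse safe: cached entries are only trusted while the runner
keeps handing out the same token:

# Minimal sketch of token-gated state caching across bundles.
# All names are hypothetical, not the actual Beam Fn API.
class CrossBundleStateCache(object):

    def __init__(self):
        self._token = None
        self._data = {}

    def start_bundle(self, cache_token):
        # The runner sends the currently valid token with each
        # ProcessBundleRequest; a new token invalidates old entries.
        if cache_token != self._token:
            self._data = {}
            self._token = cache_token

    def get(self, state_key, fetch_fn):
        # Within an unchanged token, repeated reads are served locally;
        # a miss falls back to the State API via fetch_fn.
        if state_key not in self._data:
            self._data[state_key] = fetch_fn()
        return self._data[state_key]

    def put(self, state_key, value):
        # Write through: the caller persists via the State API, then
        # records the known-good value locally.
        self._data[state_key] = value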

On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar  wrote:
>
> I checked the python sdk[1] and it has similar implementation as Java SDK.
>
> I would agree with Thomas. In case of high volume event stream and bigger 
> cluster size, network call can potentially cause a bottleneck.
>
> @Robert
> I am interested to see the proposal. Can you provide me the link of the 
> proposal?
>
> [1]: 
> https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
>
>
> On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise  wrote:
>>
>> Thanks for the pointer. For streaming, it will be important to support 
>> caching across bundles. It appears that even the Java SDK doesn't support 
>> that yet?
>>
>> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
>>
>> Regarding clear/append: It would be nice if both could occur within a single 
>> Fn Api roundtrip when the state is persisted.
>>
>> Thanks,
>> Thomas
>>
>>
>>
>> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik  wrote:
>>>
>>> User state is built on top of read, append and clear and not off a read and 
>>> write paradigm to allow for blind appends.
>>>
>>> The optimization you speak of can be done completely inside the SDK without 
>>> any additional protocol being required as long as you clear the state first 
>>> and then append all your new data. The Beam Java SDK does this for all 
>>> runners when executed portably[1]. You could port the same logic to the 
>>> Beam Python SDK as well.
>>>
>>> 1: 
>>> https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
>>>
>>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw  wrote:
>>>>
>>>> Python workers also have a per-bundle SDK-side cache. A protocol has
>>>> been proposed, but hasn't yet been implemented in any SDKs or runners.
>>>>
>>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax  wrote:
>>>> >
>>>> > It's runner dependent. Some runners (e.g. the Dataflow runner) do have 
>>>> > such a cache, though I think it currently has a cap for large bags.
>>>> >
>>>> > Reuven
>>>> >
>>>> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar  
>>>> > wrote:
>>>> >>
>>>> >> Hi,
>>>> >>
>>>> >> I have been using python sdk for the application and also using 
>>>> >> BagState in production. I was wondering whether state logic has any 
>>>> >> write-through-cache implemented or not. If we are sending every read 
>>>> >> and write request through network then it comes with a performance 
>>>> >> cost. We can avoid network call for a read operation if we have 
>>>> >> write-through-cache.
>>>> >> I have superficially looked into the implementation and I didn't see 
>>>> >> any cache implementation.
>>>> >>
>>>> >> is it possible to have this cache? would it cause any issue if we have 
>>>> >> the caching layer?
>>>> >>


Re: Write-through-cache in State logic

2019-07-17 Thread Rakesh Kumar
I checked the Python SDK[1] and it has a similar implementation to the Java
SDK.

I would agree with Thomas. In the case of a high-volume event stream and a
bigger cluster size, network calls can potentially become a bottleneck.

@Robert
I am interested in seeing the proposal. Can you provide me the link to the
proposal?

[1]:
https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
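
As a purely illustrative back-of-envelope calculation (both numbers are
assumptions, not measurements), one synchronous state read per element adds
up quickly:

# Back-of-envelope illustration only; both numbers are assumptions.
events_per_sec = 10000   # hypothetical per-worker event rate
rtt_ms = 2.0             # hypothetical round trip for one state read
io_seconds_per_second = events_per_sec * rtt_ms / 1000.0
print(io_seconds_per_second)  # 20.0, i.e. 20 seconds of serial I/O per
                              # second of wall time without a cache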


On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise  wrote:

> Thanks for the pointer. For streaming, it will be important to support
> caching across bundles. It appears that even the Java SDK doesn't support
> that yet?
>
>
> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
>
> Regarding clear/append: It would be nice if both could occur within a
> single Fn Api roundtrip when the state is persisted.
>
> Thanks,
> Thomas
>
>
>
> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik  wrote:
>
>> User state is built on top of read, append and clear and not off a read
>> and write paradigm to allow for blind appends.
>>
>> The optimization you speak of can be done completely inside the SDK
>> without any additional protocol being required as long as you clear the
>> state first and then append all your new data. The Beam Java SDK does this
>> for all runners when executed portably[1]. You could port the same logic to
>> the Beam Python SDK as well.
>>
>> 1:
>> https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
>>
>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw 
>> wrote:
>>
>>> Python workers also have a per-bundle SDK-side cache. A protocol has
>>> been proposed, but hasn't yet been implemented in any SDKs or runners.
>>>
>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax  wrote:
>>> >
>>> > It's runner dependent. Some runners (e.g. the Dataflow runner) do have
>>> such a cache, though I think it currently has a cap for large bags.
>>> >
>>> > Reuven
>>> >
>>> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar 
>>> wrote:
>>> >>
>>> >> Hi,
>>> >>
>>> >> I have been using python sdk for the application and also using
>>> BagState in production. I was wondering whether state logic has any
>>> write-through-cache implemented or not. If we are sending every read and
>>> write request through network then it comes with a performance cost. We can
>>> avoid network call for a read operation if we have write-through-cache.
>>> >> I have superficially looked into the implementation and I didn't see
>>> any cache implementation.
>>> >>
>>> >> is it possible to have this cache? would it cause any issue if we
>>> have the caching layer?
>>> >>
>>>
>>


Re: Write-through-cache in State logic

2019-07-16 Thread Thomas Weise
Thanks for the pointer. For streaming, it will be important to support
caching across bundles. It appears that even the Java SDK doesn't support
that yet?

https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221

Regarding clear/append: It would be nice if both could occur within a
single Fn API roundtrip when the state is persisted.
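
A minimal sketch of that suggestion (a hypothetical API, not the actual Fn
API): if the SDK tracks a cleared flag plus the pending appends, persisting
the state could collapse into a single round trip:

def persist(state_api, state_key, cleared, new_values):
    # Hypothetical combined call: one round trip instead of two when a
    # clear is followed by appends within the same bundle.
    if cleared and new_values:
        state_api.clear_and_append(state_key, new_values)
    elif cleared:
        state_api.clear(state_key)
    elif new_values:
        state_api.append(state_key, new_values)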

Thanks,
Thomas



On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik  wrote:

> User state is built on top of read, append and clear and not off a read
> and write paradigm to allow for blind appends.
>
> The optimization you speak of can be done completely inside the SDK
> without any additional protocol being required as long as you clear the
> state first and then append all your new data. The Beam Java SDK does this
> for all runners when executed portably[1]. You could port the same logic to
> the Beam Python SDK as well.
>
> 1:
> https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
>
> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw 
> wrote:
>
>> Python workers also have a per-bundle SDK-side cache. A protocol has
>> been proposed, but hasn't yet been implemented in any SDKs or runners.
>>
>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax  wrote:
>> >
>> > It's runner dependent. Some runners (e.g. the Dataflow runner) do have
>> such a cache, though I think it currently has a cap for large bags.
>> >
>> > Reuven
>> >
>> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar 
>> wrote:
>> >>
>> >> Hi,
>> >>
>> >> I have been using python sdk for the application and also using
>> BagState in production. I was wondering whether state logic has any
>> write-through-cache implemented or not. If we are sending every read and
>> write request through network then it comes with a performance cost. We can
>> avoid network call for a read operation if we have write-through-cache.
>> >> I have superficially looked into the implementation and I didn't see
>> any cache implementation.
>> >>
>> >> is it possible to have this cache? would it cause any issue if we have
>> the caching layer?
>> >>
>>
>


Re: Write-through-cache in State logic

2019-07-16 Thread Lukasz Cwik
User state is built on top of read, append, and clear, not on a
read-and-write paradigm, in order to allow for blind appends.

The optimization you speak of can be done completely inside the SDK without
any additional protocol being required as long as you clear the state first
and then append all your new data. The Beam Java SDK does this for all
runners when executed portably[1]. You could port the same logic to the
Beam Python SDK as well.

1:
https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
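
A rough Python sketch of the same clear-then-append bookkeeping as the Java
BagUserState in [1] (the names are illustrative; `state_api` stands in for
the Fn State API client, not the real interface):

class BagUserState(object):

    def __init__(self, state_api, state_key):
        self._state_api = state_api
        self._state_key = state_key
        self._cleared = False
        self._new_values = []

    def get(self):
        # Once the bag has been cleared in this bundle, its full contents
        # are known locally and no network read is needed.
        if self._cleared:
            return list(self._new_values)
        persisted = self._state_api.read(self._state_key)
        return list(persisted) + self._new_values

    def append(self, value):
        self._new_values.append(value)  # blind append, no read required

    def clear(self):
        self._cleared = True
        self._new_values = []

    def commit(self):
        # Persist once at the end of the bundle: clear first, then append
        # all new data, exactly the ordering described above.
        if self._cleared:
            self._state_api.clear(self._state_key)
        if self._new_values:
            self._state_api.append(self._state_key, self._new_values)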

On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw  wrote:

> Python workers also have a per-bundle SDK-side cache. A protocol has
> been proposed, but hasn't yet been implemented in any SDKs or runners.
>
> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax  wrote:
> >
> > It's runner dependent. Some runners (e.g. the Dataflow runner) do have
> such a cache, though I think it currently has a cap for large bags.
> >
> > Reuven
> >
> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar 
> wrote:
> >>
> >> Hi,
> >>
> >> I have been using python sdk for the application and also using
> BagState in production. I was wondering whether state logic has any
> write-through-cache implemented or not. If we are sending every read and
> write request through network then it comes with a performance cost. We can
> avoid network call for a read operation if we have write-through-cache.
> >> I have superficially looked into the implementation and I didn't see
> any cache implementation.
> >>
> >> is it possible to have this cache? would it cause any issue if we have
> the caching layer?
> >>
>


Re: Write-through-cache in State logic

2019-07-16 Thread Robert Bradshaw
Python workers also have a per-bundle SDK-side cache. A protocol has
been proposed, but hasn't yet been implemented in any SDKs or runners.
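
A minimal sketch of what "per-bundle" means here (illustrative names only):
the cache is consulted freely within a bundle but discarded at the bundle
boundary, since nothing guarantees validity across bundles without runner
cooperation:

class PerBundleCache(object):
    """Cache that is only trusted within a single bundle."""

    def __init__(self):
        self._data = {}

    def get(self, key, fetch_fn):
        if key not in self._data:
            self._data[key] = fetch_fn()  # first access pays the network cost
        return self._data[key]

    def finish_bundle(self):
        # Dropped wholesale: without runner cooperation there is no way to
        # know whether entries are still valid in the next bundle.
        self._data.clear()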

On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax  wrote:
>
> It's runner dependent. Some runners (e.g. the Dataflow runner) do have such a 
> cache, though I think it currently has a cap for large bags.
>
> Reuven
>
> On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar  wrote:
>>
>> Hi,
>>
>> I have been using python sdk for the application and also using BagState in 
>> production. I was wondering whether state logic has any write-through-cache 
>> implemented or not. If we are sending every read and write request through 
>> network then it comes with a performance cost. We can avoid network call for 
>> a read operation if we have write-through-cache.
>> I have superficially looked into the implementation and I didn't see any 
>> cache implementation.
>>
>> is it possible to have this cache? would it cause any issue if we have the 
>> caching layer?
>>


Re: Write-through-cache in State logic

2019-07-15 Thread Reuven Lax
It's runner dependent. Some runners (e.g. the Dataflow runner) do have such
a cache, though I think it currently has a cap for large bags.

Reuven
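
For illustration, a size cap of the kind described can be had with a plain
LRU cache; this sketch uses cachetools (the cap value and the wiring are
assumptions, not how the Dataflow runner actually implements it):

from cachetools import LRUCache

# A cap of 1024 entries is an arbitrary illustration of "a cap for
# large bags": old entries are evicted instead of growing unboundedly.
cache = LRUCache(maxsize=1024)

def cached_read(state_key, fetch_fn):
    try:
        return cache[state_key]   # hit: no network call
    except KeyError:
        value = fetch_fn()        # miss: one network read
        cache[state_key] = value
        return value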

On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar  wrote:

> Hi,
>
> I have been using python sdk for the application and also using BagState
> in production. I was wondering whether state logic has any
> write-through-cache implemented or not. If we are sending every read and
> write request through network then it comes with a performance cost. We can
> avoid network call for a read operation if we have write-through-cache.
> I have superficially looked into the implementation and I didn't see any
> cache implementation.
>
> is it possible to have this cache? would it cause any issue if we have the
> caching layer?
>
>


Write-through-cache in State logic

2019-07-15 Thread Rakesh Kumar
Hi,

I have been using the Python SDK for the application and am also using
BagState in production. I was wondering whether the state logic has any
write-through cache implemented or not. If we send every read and write
request over the network, it comes with a performance cost. We could avoid
the network call for a read operation if we had a write-through cache.
I have superficially looked into the implementation and I didn't see any
cache implementation.

Is it possible to have this cache? Would it cause any issue if we had the
caching layer?
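
As a minimal sketch of the behaviour being asked about (the generic
`backend` stands in for the state service; this is not Beam's actual API):
every write goes both to the backend and to the cache, so subsequent reads
need no network call:

class WriteThroughCache(object):

    def __init__(self, backend):
        self._backend = backend
        self._cache = {}

    def write(self, key, value):
        self._backend.write(key, value)  # the write always reaches the backend
        self._cache[key] = value         # ... and is mirrored locally

    def read(self, key):
        if key in self._cache:           # served locally, no network call
            return self._cache[key]
        value = self._backend.read(key)  # cold read falls back to the backend
        self._cache[key] = value
        return value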