This is quite interesting! The Flink Table API (relational and SQL) has an 
implementation for the type of join you mention in the example. We call it 
Temporal Table Join, and it works on something we call Temporal Tables: 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html
 
<https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html>

The implementation would have benefited from something like a Sorted MapState 
and we actually discussed adding such a state type during implementation. You 
can still see that in the TODOs here, actually: 
https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95
 
<https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95>

So I’m +1 for this, but someone would have to implement that for the different 
Runners as well. 😅

Aljoscha

> On 24. May 2019, at 05:32, Reuven Lax <re...@google.com> wrote:
> 
> 
> 
> On Thu, May 23, 2019 at 1:53 PM Ahmet Altay <al...@google.com 
> <mailto:al...@google.com>> wrote:
> 
> 
> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik <lc...@google.com 
> <mailto:lc...@google.com>> wrote:
> 
> 
> On Thu, May 23, 2019 at 11:37 AM Rui Wang <ruw...@google.com 
> <mailto:ruw...@google.com>> wrote:
> A few obvious problems with this code:
>   1. Removing the elements already processed from the bag requires clearing 
> and rewriting the entire bag. This is O(n^2) in the number of input trades.
> why it's not O(2 * n) to clearing and rewriting trade state?
> 
> 
> public interface SortedMultimapState<K, V> extends State {
>   // Add a value to the map.
>   void put(K key, V value);
>   // Get all values for a given key. 
>   ReadableState<Iterable<V>> get(K key);
>  // Return all entries in the map.
>   ReadableState<Iterable<KV<K, V>>> allEntries();
>   // Return all entries in the map with keys <= limit. returned elements are 
> sorted by the key.
>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit); 
>  // Remove all values with the given key;
>   void remove(K key);
>  // Remove all entries in the map with keys <= limit.
>   void removeUntil(K limit);
> Will removeUntilExcl(K limit) also useful? It will remove all entries in the 
> map with keys < limit.
>  
> Runners will sort based on the encoded value of the key. In order to make 
> this easier for users, I propose that we introduce a new tag on Coders 
> PreservesOrder. A Coder that contains this tag guarantees that the encoded 
> value preserves the same ordering as the base Java type.
> 
> Could you clarify what is  "encoded value preserves the same ordering as the 
> base Java type"?
> 
> Lets say A and B represent two different instances of the same Java type like 
> a double, then A < B (using the languages comparison operator) iff encode(A) 
> < encode(B) (note the encoded versions are compared lexicographically)
> 
> Since coders are shared across SDKs, do we expect A < B iff e(A) < e(P) 
> property to hold for all languages we support? What happens A, B sort 
> differently in different languages? 
> 
> That would have to be the property of the coder (which means that this 
> property probably needs to be represented in the portability representation 
> of the coder). I imagine the common use cases will be for simple coders like 
> int, long, string, etc., which are likely to sort the same in most languages.
>  
>  
>  
> 
> -Rui

Reply via email to