[ https://issues.apache.org/jira/browse/SPARK-22947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16492624#comment-16492624 ]
Tomasz Gawęda commented on SPARK-22947:
---------------------------------------

[~icexelloss] Hi, just wondering - will you continue to work on this SPIP? It's quite interesting and helpful.

SPIP: as-of join in Spark SQL
-----------------------------

Key: SPARK-22947
URL: https://issues.apache.org/jira/browse/SPARK-22947
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 2.2.1
Reporter: Li Jin
Priority: Major
Attachments: SPIP_ as-of join in Spark SQL (1).pdf

h2. Background and Motivation

Time series analysis is one of the most common kinds of analysis performed on financial data, and as-of join is a very common operation in time series analysis. Supporting as-of join in Spark SQL would enable many use cases of using Spark SQL for time series analysis.

An as-of join is a "join on time" with an inexact time matching criterion. Various libraries have implemented as-of join or similar functionality:
* Kdb: https://code.kx.com/wiki/Reference/aj
* Pandas: http://pandas.pydata.org/pandas-docs/version/0.19.0/merging.html#merging-merge-asof
* R: this functionality is called "Last Observation Carried Forward": https://www.rdocumentation.org/packages/zoo/versions/1.8-0/topics/na.locf
* JuliaDB: http://juliadb.org/latest/api/joins.html#IndexedTables.asofjoin
* Flint: https://github.com/twosigma/flint#temporal-join-functions

This proposal advocates introducing a new API in Spark SQL to support as-of join.

h2. Target Personas

Data scientists, data engineers

h2. Goals

* A new API in Spark SQL that allows as-of join
* As-of join of multiple tables (>2) should be performant, because it is very common that users need to join multiple data sources together for further analysis.
* Define Distribution, Partitioning and shuffle strategy for ordered time series data

h2. Non-Goals

These are out of scope for this SPIP and should be considered in future SPIPs as improvements to Spark's time series analysis ability:
* Utilize partition information from the data source, i.e., begin/end of each partition, to reduce sorting/shuffling
* Define an API for users to implement as-of join time specs against a business calendar (e.g., look back one business day; this is very common in financial data analysis because of market calendars)
* Support broadcast join

h2. Proposed API Changes

h3. TimeContext

TimeContext is an object that defines the time scope of the analysis; it has a begin time (inclusive) and an end time (exclusive). Users should be able to change the time scope of the analysis (e.g., from one month to five years) by just changing the TimeContext.

To the Spark engine, a TimeContext is a hint that:
* can be used to repartition data for the join
* serves as a predicate that can be pushed down to the storage layer

A time context is similar to filtering time by begin/end; the main difference is that a time context can be expanded based on the operation taken (see the example in as-of join below).

Time context example:
{code:java}
TimeContext timeContext = TimeContext("20160101", "20170101")
{code}

h3. asofJoin

h4. Use Case A (join without key)

Join two DataFrames on time, with a one-day lookback:
{code:java}
TimeContext timeContext = TimeContext("20160101", "20170101")

dfA = ...
dfB = ...

JoinSpec joinSpec = JoinSpec(timeContext).on("time").tolerance("-1day")
result = dfA.asofJoin(dfB, joinSpec)
{code}

Example input/output:
{code:java}
dfA:
time, quantity
20160101, 100
20160102, 50
20160104, -50
20160105, 100

dfB:
time, price
20151231, 100.0
20160104, 105.0
20160105, 102.0

output:
time, quantity, price
20160101, 100, 100.0
20160102, 50, null
20160104, -50, 105.0
20160105, 100, 102.0
{code}

Note that row (20160101, 100) of dfA is joined with (20151231, 100.0) of dfB. This is an important illustration of the time context - it is able to expand the context to 20151231 on dfB because of the one-day lookback.

h4. Use Case B (join with key)

To join on time and another key (for instance, id), we use "by" to specify the key:
{code:java}
TimeContext timeContext = TimeContext("20160101", "20170101")

dfA = ...
dfB = ...

JoinSpec joinSpec = JoinSpec(timeContext).on("time").by("id").tolerance("-1day")
result = dfA.asofJoin(dfB, joinSpec)
{code}

Example input/output:
{code:java}
dfA:
time, id, quantity
20160101, 1, 100
20160101, 2, 50
20160102, 1, -50
20160102, 2, 50

dfB:
time, id, price
20151231, 1, 100.0
20160102, 1, 105.0
20160102, 2, 195.0

output:
time, id, quantity, price
20160101, 1, 100, 100.0
20160101, 2, 50, null
20160102, 1, -50, 105.0
20160102, 2, 50, 195.0
{code}
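For readers who want an executable reference for these semantics, the pandas {{merge_asof}} function cited in the Background section expresses essentially the same backward as-of join. The sketch below reproduces Use Case B; it is an illustration of the intended behavior only, not part of the proposed Spark API:

{code:python}
import pandas as pd

dfA = pd.DataFrame({
    "time": pd.to_datetime(["20160101", "20160101", "20160102", "20160102"]),
    "id": [1, 2, 1, 2],
    "quantity": [100, 50, -50, 50],
})
dfB = pd.DataFrame({
    "time": pd.to_datetime(["20151231", "20160102", "20160102"]),
    "id": [1, 1, 2],
    "price": [100.0, 105.0, 195.0],
})

# Both inputs must be sorted by the "on" key; "by" groups the match on id,
# and the tolerance bounds the backward lookback to one day.
result = pd.merge_asof(
    dfA.sort_values("time"),
    dfB.sort_values("time"),
    on="time",
    by="id",
    tolerance=pd.Timedelta("1 day"),
    direction="backward",
)
print(result)
{code}

Printing {{result}} reproduces the Use Case B output table above, with the unmatched (20160101, 2, 50) row carrying NaN for price.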
h2. Optional Design Sketch

h3. Implementation A

(This is just an initial thought on how to implement this; a standalone sketch of the partitioning scheme appears at the end of this description.)

(1) Using the begin/end of the TimeContext, we first partition the left DataFrame into non-overlapping partitions. For the purpose of demonstration, assume we partition it into one-day partitions:
{code:java}
[20160101, 20160102) [20160102, 20160103) ... [20161231, 20170101)
{code}
(2) Then we partition the right DataFrame into overlapping partitions, taking the tolerance into account, e.g. a one-day lookback:
{code:java}
[20151231, 20160102) [20160101, 20160103) ... [20161230, 20170101)
{code}
(3) Pair left and right partitions.

(4) For each pair of partitions, because all the data needed for the join is in the partition pair, we can now join the partition pair locally.

(5) Use the partitioning in (1) as the output distribution so we can reuse it for sequential as-of joins.

h2. Optional Rejected Sketch

h3. Rejected Implementation A

Another implementation is to sample the data to figure out its time range instead of using a time context. This approach is implemented in https://github.com/twosigma/flint

This approach suffers in performance if sampling the data is expensive. For instance, when the data to be sampled is the output of an expensive computation, sampling it would cause the expensive computation to be done twice.
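As referenced in Implementation A above, here is a minimal, non-Spark sketch of steps (1)-(3) of that design. The {{day_partitions}} helper and the plain-Python date types are illustrative assumptions, not part of the proposal:

{code:python}
# Hypothetical helper for Implementation A, steps (1)-(3): left partitions
# are non-overlapping one-day ranges; each paired right range is widened
# backward by the tolerance, so every potential match for a left row is
# guaranteed to live in its paired right partition.
from datetime import date, timedelta

def day_partitions(begin: date, end: date, lookback: timedelta):
    """Yield (left_range, right_range) pairs of [start, stop) intervals
    covering the TimeContext [begin, end) in one-day steps."""
    one_day = timedelta(days=1)
    d = begin
    while d < end:
        left = (d, d + one_day)              # step (1): non-overlapping
        right = (d - lookback, d + one_day)  # step (2): widened by tolerance
        yield left, right                    # step (3): pair them up
        d += one_day

# First few pairs of the scheme with a one-day lookback
# (range shortened for brevity):
for left, right in day_partitions(date(2016, 1, 1), date(2016, 1, 4),
                                  timedelta(days=1)):
    print(left, right)
{code}

Step (4) would then join each pair locally, and step (5) would keep the left-side partitioning as the output distribution so a chain of as-of joins avoids re-shuffling.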