(Forking this since it's straying from the original topic)

Got it, thank you Rui. The DataFrame API could do something similar. In that context we could do this by requiring Singleton partitioning between operations that enforce an ordering (e.g. sort_values) and operations that depend on the ordering (e.g. head). We typically warn users about pipelines that require Singleton partitioning, but this could be acceptable when done within a window.
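To make the trade-off concrete, here is a rough sketch in plain Python. It is not the Beam DataFrame API and not how Beam implements either operation; the function names and the list-of-lists "partitions" are made up for illustration. It contrasts a sort followed by head, which needs every element gathered in one place, with an nsmallest-style operation, which can work on each partition separately and then merge the partial results.

```python
import heapq
from itertools import chain


def head_after_sort_singleton(partitions, n):
    """Order-sensitive path (hypothetical helper): gather all partitions
    into a single list, sort it, then take the first n elements."""
    everything = list(chain.from_iterable(partitions))  # the "Singleton" step
    return sorted(everything)[:n]


def nsmallest_distributed(partitions, n):
    """Order-insensitive path (hypothetical helper): each partition keeps
    only its own n smallest elements, then the partial results are merged."""
    partials = [heapq.nsmallest(n, part) for part in partitions]
    return heapq.nsmallest(n, chain.from_iterable(partials))


# Three unordered "partitions" standing in for a distributed collection.
partitions = [[7, 1, 9], [4, 8], [3, 6, 2, 5]]

# Both paths agree on the answer; they differ in how much data has to be
# brought together in one place before the answer can be computed.
assert head_after_sort_singleton(partitions, 3) == nsmallest_distributed(partitions, 3)
```

In the first function the gathered list grows with the whole input, which is why this pattern normally earns a warning. If the same gather-and-sort were applied per window, it would only need to hold one window's elements at a time.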
Implementing this could be a reasonable first step before adding a distributed approach.

Brian

On Tue, Apr 27, 2021 at 11:32 AM Rui Wang <ruw...@google.com> wrote:

> I gave a try but didn't find the discussion in my email list. That might
> also be an offline discussion.
>
> I think for SQL implementation, what is the most close so far is we
> support an ordering for window analytics functions by sorting those values
> in memory (no partitioning, just sort all values for a key in memory).
>
> -Rui
>
> On Tue, Apr 27, 2021 at 11:25 AM Brian Hulette <bhule...@google.com>
> wrote:
>
>> On Tue, Apr 27, 2021 at 10:25 AM Rui Wang <ruw...@google.com> wrote:
>>
>>> On Tue, Apr 27, 2021 at 9:10 AM Alexey Romanenko
>>> <aromanenko....@gmail.com> wrote:
>>>
>>>> Hello all,
>>>>
>>>> I try to run a Beam implementation [1] of TPC-DS benchmark [2] and I
>>>> observe that most of the queries don’t pass because of different reasons
>>>> (see below). I run it with Spark Runner but the issues, I believe, are
>>>> mostly related to either query parsing or query planning, so we can expect
>>>> the same with other runners too. For now, only ~22% (23/103) of TPC-DS
>>>> queries passed successfully via Beam SQL / CalciteSQL.
>>>>
>>>> The most common issues are the following ones:
>>>>
>>>> 1. *“Caused by: java.lang.UnsupportedOperationException: Non
>>>>    equi-join is not supported”*
>>>> 2. *“Caused by: java.lang.UnsupportedOperationException: ORDER BY
>>>>    without a LIMIT is not supported!”*
>>>> 3. *“Caused by:
>>>>    org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner$CannotPlanException:
>>>>    There are not enough rules to produce a node with desired
>>>>    properties: convention=BEAM_LOGICAL. All the inputs have relevant nodes,
>>>>    however the cost is still infinite.”*
>>>> 4. *“Caused by:
>>>>    org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorException:
>>>>    No match found for function signature substr(<CHARACTER>, <NUMERIC>,
>>>>    <NUMERIC>)”*
>>>>
>>>> The full list of query statuses is available here [3]. The generated
>>>> TPC-DS SQL queries can be found there as well [4].
>>>>
>>>
>>> Not every query can be supported by BeamSQL easily. For example, support
>>> non equi-join(BEAM-2194). We had discussions for cause 2 to add the
>>> limitation that BeamSQL only supports ORDER BY LIMIT (LIMIT is required).
>>> Cause 3 needs a case by case investigation, some might be able to be fixed.
>>> Cause 4 looks like no such function found in the catalog.
>>>
>>
>> If (2) was discussed on the mailing list could you link that discussion?
>> We have the same issue in the DataFrame API - order-sensitive operations
>> are not supported since they don't map well to unordered PCollections, but
>> we do have support for nlargest/nsmallest, which are analogous to ORDER BY
>> with a LIMIT. We've discussed (offline) implementing these in the DataFrame
>> API by partitioning the PCollection based on the ordering (as opposed to
>> the typical hash of the index). I'm curious how SQL might implement ORDER
>> BY without a LIMIT and whether we could use a similar approach for
>> dataframes.
>>
>>>> I’m not very familiar with a current status of ongoing work for Beam
>>>> SQL, so I’m sorry in advance if my questions will sound naive.
>>>>
>>>> Please, guide me on this:
>>>>
>>>> 1. Are there any chances that we can resolve, at least, partly the
>>>>    current limitations of the query parsing/planning, mentioned above? Are
>>>>    there any principal blockers among them?
>>>> 2. Are there any plans or ongoing work related to this?
>>>> 3. Are there any plans to upgrade vendored Calcite version to more
>>>>    recent one? Should it reduce the number of current limitations or not?
>>>> 4. Do you think it could be valuable for Beam SQL to run TPC-DS
>>>>    benchmark on a regular basis (as we do for Nexmark, for example) even if
>>>>    not all queries can pass with Beam SQL?
>>>>
>>>
>>> This is definitely valuable for BeamSQL if we have enough resources to
>>> run such queries regularly.
>>>
>>>> I’d appreciate any additional information/docs/details/opinions on this
>>>> topic.
>>>>
>>>> —
>>>> Alexey
>>>>
>>>> [1] https://github.com/apache/beam/tree/master/sdks/java/testing/tpcds
>>>> [2] http://www.tpc.org/tpcds/
>>>> [3] https://docs.google.com/spreadsheets/d/1Gya9Xoa6uWwORHSrRqpkfSII4ajYvDpUTt0cNJCRHjE/edit?usp=sharing
>>>> [4] https://github.com/apache/beam/tree/master/sdks/java/testing/tpcds/src/main/resources/queries