Hi Aman,

TL;DR: Maps in Drill are nested tuples. The easiest solution may be for the 
operator to generate code to process each map member one-by-one.

The original question and code appear to be based on an assumption: that a map
is a value (like an INT or VARCHAR), just a complex one (a value with inner
structure). We discussed that, for scalar types, we'd use a Freemarker template
to generate, at compile time, an implementation for each combination of (scalar 
type, data mode). That is, we'd have an INT implementation and a NULLABLE INT 
implementation, and so on. 

What do we do for Map? Perhaps the assumption above is due, in part, to the
type name. The term "Map" conjures up an image of a Java Map or a Python dict:
a collection of name/value pairs in which the members of one instance are
independent of those in another instance (or row, in Drill's case).

Ideally, the operator can obtain the Map value and pass it to the UDAF by value.
The UDAF can then do its calculation, returning the final value for the group,
which the operator copies into the output vector. For a Drill Map, perhaps the
value can be in the form of a Java Map. Indeed, if we look carefully, we see
that the getObject() method of the Map vector's accessor can return the value
as a Java map.

But, soon, we start seeing issues. There is no function in the Mutator to set 
the value of a map column from a Java map. UDAFs use "Holders" to pass values 
in and out, and there are no Map Holders that hold Java Maps. There is clearly 
a cost to the conversions. UDAFs maintain internal state (which Drill maps to 
vectors), but there is no good internal state for a Java map.

So, we look for an alternative: maybe we pass in a Source for Map values and a 
Sink to write the output values. Now, we have to decide what to do inside the 
implementation. Here we must recognize that a Drill Map is actually what Impala 
and Hive call a "struct": it is a fixed set of columns that are the same for 
all records. So, we have to iterate over the members of the map and do 
something with each column. What do we do? Well, we could use the UDAFs we 
generated for each type.

Recall that Maps can nest to any level, so our interpreted code that loops over 
map members must also handle nested maps. How? By recursively iterating over 
the map members.

Next, we need a place to store intermediate values. For simple types, Drill 
allocates a (hidden) vector to hold these values based on the Holder type of a 
@Workspace field. (The temporary vector is needed by the Hash Agg though 
perhaps not for streaming.) How do we create these intermediate values for our 
Map UDAF? Right about now we get the sense that perhaps we are heading down the 
wrong path. But, where did we go wrong?

Perhaps we should shift our thinking. Maps are not just another value that 
happens to be complex. Instead, Maps are special: they are nested tuples. 
Perhaps it should be the responsibility of the operator to "expand" the Map 
tuple the same way that it does for the top-level tuple (row). In this model, 
the Streaming Agg would say, "hey, I have a Map. So, what I'll do is
recursively expand the columns and process each nested column the same as a
top-level column."

What is the result? We can use the generated scalar UDAFs described above. We 
use the same technique to generate the internal hidden vectors for intermediate 
values. We do not need a Map implementation that iterates over nested columns; 
instead that iteration is done at code gen time.
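
As a rough illustration of that code-gen-time expansion, here is a minimal
sketch. It is not Drill code: the Column class and the expand() method are
hypothetical stand-ins for Drill's schema metadata and code generator. It only
shows that one recursive routine can treat the row and a Map as the same kind
of tuple and produce the list of scalar leaves to which the generated scalar
UDAFs would be applied.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MapExpansionSketch {

    /** Hypothetical column description: a scalar type, or a MAP with children. */
    static final class Column {
        final String name;
        final String type;           // e.g. "INT", "VARCHAR", or "MAP"
        final List<Column> children; // empty unless type is "MAP"

        Column(String name, String type, Column... children) {
            this.name = name;
            this.type = type;
            this.children = Arrays.asList(children);
        }
    }

    /** Walks a tuple (row or map) and collects "path:TYPE" for every scalar leaf. */
    static List<String> expand(String prefix, List<Column> tuple) {
        List<String> leaves = new ArrayList<>();
        for (Column col : tuple) {
            String path = prefix.isEmpty() ? col.name : prefix + "." + col.name;
            if ("MAP".equals(col.type)) {
                // A map is just a nested tuple: recurse with the same code.
                leaves.addAll(expand(path, col.children));
            } else {
                leaves.add(path + ":" + col.type);
            }
        }
        return leaves;
    }

    public static void main(String[] args) {
        List<Column> row = Arrays.asList(
            new Column("a", "INT"),
            new Column("b", "MAP",
                new Column("x", "INT"),
                new Column("c", "MAP", new Column("y", "VARCHAR"))));
        System.out.println(expand("", row)); // [a:INT, b.x:INT, b.c.y:VARCHAR]
    }
}
```

Each leaf in the result corresponds to one place where the operator would
generate the ordinary scalar UDAF call and its hidden workspace vector.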

This does require new code in code gen. But, we were going to have to write new 
code somewhere anyway.

But, what if the UDAF really wants to do something with a map? (Perhaps pull 
out a single column, perhaps compute a distance given a map with x and y 
values.) Code gen can start by looking for a UDAF that takes a map reader as 
input. If found, use it. Otherwise, recursively expand the map columns, using
those columns' types to find a UDAF.
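
The lookup order described here might be sketched as follows. Everything in
this sketch is an assumption made for illustration: the Column class, the
bind() method and the registry of "fn(TYPE)" strings are hypothetical and are
not Drill's actual function-resolution API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class UdafLookupSketch {

    /** Hypothetical column description; children is empty unless type is "MAP". */
    static final class Column {
        final String name;
        final String type;
        final List<Column> children;

        Column(String name, String type, Column... children) {
            this.name = name;
            this.type = type;
            this.children = Arrays.asList(children);
        }
    }

    /**
     * Binds a function to a column. A whole-map implementation wins if one is
     * registered; otherwise the map is expanded and each member is bound by type.
     * The registry of "fn(TYPE)" strings stands in for a real function lookup.
     */
    static List<String> bind(Set<String> registry, String fn, String path, Column col) {
        String signature = fn + "(" + col.type + ")";
        if (registry.contains(signature)) {
            return Collections.singletonList(path + " -> " + signature);
        }
        if (!"MAP".equals(col.type)) {
            throw new IllegalArgumentException("No " + signature + " for column " + path);
        }
        List<String> bindings = new ArrayList<>();
        for (Column child : col.children) {
            bindings.addAll(bind(registry, fn, path + "." + child.name, child));
        }
        return bindings;
    }

    public static void main(String[] args) {
        Column point = new Column("p", "MAP", new Column("x", "INT"), new Column("y", "INT"));
        Set<String> registry = new HashSet<>(Arrays.asList("any_value(INT)", "distance(MAP)"));
        System.out.println(bind(registry, "distance", "p", point));  // whole-map UDAF is used
        System.out.println(bind(registry, "any_value", "p", point)); // map is expanded by member
    }
}
```

With this ordering, a function that really wants the whole map (the distance
example) gets it, while ANY_VALUE falls through to the per-column scalar
implementations.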

Note that this same thinking helps with REPEATED types. Rather than generating 
a REPEATED INT, say, UDAF, the generated code can instead apply the REQUIRED 
INT UDAF to each int array element. (Assuming that the aggregate is one where 
this makes sense.)
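
A small sketch of the REPEATED case, using SUM as an aggregate where
element-wise application makes sense. The IntSum class is a hypothetical
stand-in for a generated REQUIRED INT aggregate, and plain Java arrays stand
in for a repeated vector; none of this is Drill's actual generated code.

```java
class RepeatedSumSketch {

    /** Hypothetical stand-in for a generated REQUIRED INT SUM aggregate. */
    static final class IntSum {
        private long total;

        void add(int value) {
            total += value;
        }

        long output() {
            return total;
        }
    }

    /** Aggregates one group; each row holds one REPEATED INT value (an int array). */
    static long sumGroup(int[][] rows) {
        IntSum agg = new IntSum();
        for (int[] row : rows) {
            // The generated loop feeds each element to the scalar aggregate,
            // so no separate REPEATED INT implementation is needed.
            for (int element : row) {
                agg.add(element);
            }
        }
        return agg.output();
    }

    public static void main(String[] args) {
        int[][] group = { {1, 2}, {}, {3} };
        System.out.println(sumGroup(group)); // 6
    }
}
```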

In short, Maps (and arrays, lists, repeated lists and unions) are not just more 
complex forms of simple values. They are a different beast and require a 
different approach to computation, aggregation and UDAFs. The above is one
approach; perhaps there are better solutions once we recognize that complex
types require a different approach than do simple types.

As it turned out, treating Map columns as nested tuples worked surprisingly 
well in the Row Set mechanisms. What would otherwise have been overly complex 
turned instead into a simple recursive algorithm in which the row and a
Map are both tuples and the same code applies to both. Perhaps that same trick
can help here.

Thanks,
- Paul

 

    On Sunday, April 15, 2018, 10:32:31 AM PDT, Aman Sinha 
<amansi...@apache.org> wrote:  
 
 For Hash Agg versus Streaming Agg, here are some considerations:

First, note that the proposed syntax is essentially trying to mimic the
DISTINCT ON behavior using a GROUP-BY and ANY_VALUE aggregate function.
 There are 2 possibilities:
1.  All aggregate functions in the select list are ANY_VALUE:    SELECT a,
b,  ANY_VALUE(c), ANY_VALUE(d) FROM T  GROUP BY a, b
2.  There's a mix of ANY_VALUE and other aggregate functions such as
SUM/MIN/MAX:  SELECT a, b, ANY_VALUE(c), SUM(d) FROM T GROUP BY a, b

For case 1, we could have a slight modification of the Hash Agg operator to
work in a 'non-blocking' manner, i.e., for each unique combination of
grouping keys (a, b), we insert only the first entry in the hash table and
at the same time append this row to the output batch.  Subsequent rows with
the duplicate keys will be ignored.  This is OK because we are not really
accumulating the values of c and d columns, we already got the first row's
value and have already output it.  If the hash partition for this group is
not in memory (was spilled to disk), we would end up having to write these
duplicate rows to disk also and then re-process them later when the
partition is read back in memory by treating it as an 'incoming' batch.
 Spilling does add complexity, so need to discuss this in more detail on
how it will be processed for the complex types.
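
The in-memory part of case 1 can be sketched as below. This is only an
illustration under simplifying assumptions: rows are plain Object arrays, a
HashSet stands in for the hash table, and spilling is ignored entirely. The
class and method names are hypothetical and are not part of the Hash Agg
operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class FirstValueAggSketch {

    /**
     * Emits the first row seen for each grouping key, in arrival order.
     * The first keyColumns entries of each row are the grouping keys; the
     * remaining entries play the role of the ANY_VALUE columns.
     */
    static List<Object[]> anyValue(List<Object[]> incoming, int keyColumns) {
        Set<List<Object>> seen = new HashSet<>();
        List<Object[]> output = new ArrayList<>();
        for (Object[] row : incoming) {
            List<Object> key = Arrays.asList(Arrays.copyOf(row, keyColumns));
            // add() returns true only for a key not seen before: insert it and
            // append the row to the output at once; duplicates are ignored.
            if (seen.add(key)) {
                output.add(row);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Object[]> rows = Arrays.asList(
            new Object[] {1, "a", "c1"},
            new Object[] {1, "a", "c2"},
            new Object[] {2, "b", "c3"});
        for (Object[] row : anyValue(rows, 2)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

Because output rows are only ever appended, this path avoids the random-order
updates that a blocking aggregate such as SUM would require.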

For case 2, we should not generate a Hash Agg plan because the SUM(d) must
be done in a blocking manner,  so it prevents doing a sequential write to
the output batch as needed for the ANY_VALUE.

The expectation is that case 2 is very rare because the functionality of
doing the DISTINCTing is essentially satisfied by  case 1.

-Aman

On Fri, Apr 13, 2018 at 6:34 PM, Aman Sinha <amansi...@apache.org> wrote:

> Hi Paul,
> yes, the any_value function will need to have separate generated code for
> different data types and mode combinations.  I believe Gautam has
> implemented all the scalar types (through the standard template mechanism
> that Drill follows).  The complex types are the ones that are harder.
>
> > Moreover, the code generator must understand that code generated for a
> Map UDAF must be different than that for a scalar UDAF. Presumably we must
> have that code, since the UDF mechanism supports maps.
>
> Yes, I assume you are referring to the decision point here [1].
>
> There is some overlap with what we do with MAPPIFY/KVGEN function which
> occurs as part of the Project operator.  ProjectRecordBatch generates code
> for the functions that require ComplexWriter.    The MAPPIFY function reads
> data using a FieldReader [2]  and outputs data using a ComplexWriter.
>  However, there are differences with how ANY_VALUE operates particularly
> because we want to treat it as an Aggregate function.  For instance, a
> ValueReference in a ComplexWriter is always marked as LATE binding type [3]
> whereas for ANY_VALUE we want it to reflect the input type.  Code
> generation for either StreamingAgg or HashAgg does not like LATE type.  So,
> this is a new requirement which potentially needs some changes to
> ValueReference.
>
> Regarding repeated maps/arrays, let me discuss with Gautam about the
> details and will provide an update.
>
> For Hash Agg versus Streaming Agg, I have some thoughts that I will send
> out in a follow-up email.
>
> [1] https://github.com/apache/drill/blob/master/exec/java-
> exec/src/main/java/org/apache/drill/exec/expr/fn/
> FunctionConverter.java#L108
> [2] https://github.com/apache/drill/blob/master/exec/java-
> exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Mappify.java#L55
> [3] https://github.com/apache/drill/blob/master/exec/java-
> exec/src/main/java/org/apache/drill/exec/expr/fn/ValueReference.java#L76
>
> -Aman
>
> On Fri, Apr 13, 2018 at 10:31 AM, Paul Rogers <par0...@yahoo.com.invalid>
> wrote:
>
>> Hi Aman & Gautam,
>>
>> FWIW, here is my understanding of UDAF functions based on a write-up I
>> did a few months back. [1]
>>
>> All Drill functions are implemented using the UDF and UDAF protocol. (The
>> "U" (User) is a bit of a misnomer, internal and user-defined functions
>> follow the same protocol.) Every UDF (including UDAF) is strongly typed in
>> its arguments and return value. To create the ANY_VALUE implementation, you
>> must create a separate implementation for each and every combination of
>> (type, mode). That is, we need a REQUIRED INT and OPTIONAL INT
>> implementation for INT types.
>>
>> In this case, the incoming argument is passed using the argument
>> annotation, and the return value via the return annotation. The generated
>> code sets the incoming argument and copies the return value from the return
>> variable. (There is an example of the generated code in the write-up.)
>>
>> For a Map, there is no annotation to say "set this value to a map" for
>> either input or output. Instead, we pass in a complex reader for input (I
>> believe) and a complex writer for output. (Here I am a bit hazy as I never
>> had time to experiment with maps and arrays in UDFs.)
>>
>> So, you'll need a Map implementation. (Maps can only be REQUIRED, never
>> OPTIONAL, unless they are in a UNION or LIST...)
>>
>> Moreover, the code generator must understand that code generated for a
>> Map UDAF must be different than that for a scalar UDAF. Presumably we must
>> have that code, since the UDF mechanism supports maps.
>>
>> Have you worked out how to handle arrays (REPEATED cardinality?) It was
>> not clear from my exploration of UDFs how we handle REPEATED types. The
>> UDAF must pass in one array, which the UDAF copies to its output, which is
>> then written to the output repeated vector. Since values must arrive in
>> Holders, it is not clear how this would be done for arrays. Perhaps there
>> is an annotation that lets us use some form of complex writer for arrays as
>> is done for MAPs? Again, sorry, I didn't have time to learn that bit. Would
>> be great to understand that so we can add it to the write-up.
>>
>> This chain mentions a MAP type. Drill also includes other complex types:
>> REPEATED MAP, (non-repeated) LIST, (repeated) LIST, and UNION. It is not at
>> all clear how UDAFs work for these types.
>>
>> One other thing to consider: ANY_VALUE can never work for the hash agg
>> because output values are updated in random order. It can only ever work
>> for streaming agg because the streaming agg only appends output values.
>> Fortunately, this chain is about the streaming agg. For Hash Agg,
>> intermediate variable-width values are stored in an Object Vector, but
>> those values won't survive serialization. As a result, only fixed-width
>> types can be updated in random order. DRILL-6087 describes this issue.
>>
>> Thanks,
>> - Paul
>>
>> [1] https://github.com/paul-rogers/drill/wiki/UDFs-Background-Information
>>
>>
>>
>>
>>
>>
>>    On Wednesday, April 11, 2018, 4:09:47 PM PDT, Aman Sinha <
>> amansi...@apache.org> wrote:
>>
>>  Here's some background on what Gautam is trying to do:  Currently, SQL
>> does not have a standard way to do a DISTINCT on a subset of the columns
>> in the SELECT list.  Suppose there are 2 columns:
>>  a:  INTEGER
>>  b:  MAP
>> Suppose I want to only do DISTINCT on 'a' and I don't really care about
>> the column 'b' .. I just want the first or any value of 'b' within a
>> single group of 'a'.    Postgres actually has a 'DISTINCT ON(a), b' syntax
>> but based on our discussion on the Calcite mailing list, we want to avoid
>> that syntax.  So, there's an alternative proposal to do the following:
>>
>>    SELECT a, ANY_VALUE(b) FROM table GROUP BY a
>>
>> This means, ANY_VALUE will essentially be treated as an Aggregate function
>> and from a code-gen perspective, we want to read 1 item (a MapHolder) from
>> the incoming MapVector and write it to a particular index in the output
>> MapVector.    This is where  it would be useful to  have
>> MapVector.setSafe()  since the StreamingAgg and HashAgg both generate
>> setSafe()  for normal aggregate functions.
>>
>> However, it seems the better (or perhaps only) way to do this is through
>> the MapOrListWriter (ComplexWriter) as long as there's a way to instruct
>> the writer to write to a specific output index (the output index is needed
>> because there are several groups in the output container and we want to
>> write to a specific one).
>>
>> -Aman
>>
>>
>> On Wed, Apr 11, 2018 at 2:13 PM, Paul Rogers <par0...@yahoo.com.invalid>
>> wrote:
>>
>> > What semantics are wanted? SetSafe sets a single value in a vector. What
>> > does it mean to set a single map or array value? What would we pass as
>> > an argument?
>> > For non-simple types, something needs to iterate over the values: be
>> > they elements of a map, elements in an array, elements of an array of
>> > maps, then over the map members, etc.
>> > I believe that you are hitting a fundamental difference between simple
>> > scalar values and complex (composite) values.
>> > This is for an aggregate. There is no meaningful aggregate of a map or
>> > an array. One could aggregate over a scalar that is a member of a map to
>> > produce a scalar result. Or, one could iterate over the members of an
>> > array to produce, say, an average or sum.
>> > You are dealing with aggregate UDFs (even built-in functions implement
>> > the UDAF protocol.) A quick check of the source code does not find an
>> > "AnyValueComplexFunctions" class, so this may perhaps be something new
>> > you are developing. What are the desired semantics?
>> > The UDAF protocol can include a complex writer for maps. I've not played
>> > with that yet. But, it does not seem meaningful to aggregate a map to
>> > produce a map or to aggregate an array to produce an array. The idea is
>> > that the UDAF figures out what to do with maps, then uses the complex
>> > writer to produce the desired result. This makes sense since there is no
>> > way to store a map as a simple value passed to setSafe().
>> > Can you provide additional details of what you are trying to do?
>> > Thanks,
>> > - Paul
>> >
>> >
>> >
>> >    On Wednesday, April 11, 2018, 1:53:12 PM PDT, Padma Penumarthy <
>> > ppenumar...@mapr.com> wrote:
>> >
>> >  I guess you can add a setSafe method which recursively does setSafe for
>> > all children.
>> >
>> > Thanks
>> > Padma
>> >
>> >
>> > > On Apr 11, 2018, at 1:19 PM, Gautam Parai <gpa...@mapr.com> wrote:
>> > >
>> > > Hi Paul/Padma,
>> > >
>> > >
>> > > Thank you so much for the responses. This function is supposed to
>> return
>> > `any value` from the batch of incoming rows. Hence, the need to handle
>> > maps/lists.
>> > >
>> > >
>> > > This codegen is for the StreamingAggregator for Complex type(e.g.
>> maps)
>> > in the incoming batch. It is trying to assign the values in the
>> > ComplexHolder to the outgoing MapVector.
>> > >
>> > >
>> > > MapVector vv9; // Output VV of StreamingAgg
>> > > ....
>> > >
>> > >    public void outputRecordValues(int outIndex)
>> > >        throws SchemaChangeException
>> > >    {
>> > >        {
>> > >            ComplexHolder out8;
>> > >            {
>> > >                final ComplexHolder out = new ComplexHolder();
>> > >                FieldReader fr = work0;
>> > >                MapHolder value = work1;
>> > >                BigIntHolder nonNullCount = work2;
>> > >
>> > >                AnyValueComplexFunctions$MapAnyValue_output: {
>> > >                    out.reader = fr;
>> > >                }
>> > >
>> > >                work0 = fr;
>> > >                work1 = value;
>> > >                work2 = nonNullCount;
>> > >                out8 = out;
>> > >            }
>> > >            // Don't have setSafe for MapVector
>> > >            vv9.getMutator().setSafe((outIndex), out8);
>> > >        }
>> > >    }
>> > >
>> > >
>> > > Please let me know your thoughts.
>> > >
>> > >
>> > > Gautam
>> > >
>> > >
>> > >
>> > > ________________________________
>> > > From: Paul Rogers <par0...@yahoo.com.INVALID>
>> > > Sent: Wednesday, April 11, 2018 12:40:15 PM
>> > > To: dev@drill.apache.org
>> > > Subject: Re: [DISCUSS] Regarding mutator interface
>> > >
>> > > Note that, for maps and lists, there is nothing to set. Maps are
>> purely
>> > containers for other vectors. Lists (you didn't mention whether
>> "repeated"
>> > or "non-repeated") are also containers. Non-repeated lists are
>> containers
>> > for unions, repeated-lists are containers for arrays.
>> > > Any setting should be done on the contained vectors. For lists, only
>> the
>> > offset vector is updated.
>> > > So, another question is: what is the generated code trying to set?
>> > >
>> > > Thanks,
>> > > - Paul
>> > >
>> > >
>> > >
>> > >    On Wednesday, April 11, 2018, 12:33:52 PM PDT, Padma Penumarthy <
>> > ppenumar...@mapr.com> wrote:
>> > >
>> > > Can you explain how aggregation on complex types works (or is supposed
>> > > to work)?
>> > >
>> > > Thanks
>> > > Padma
>> > >
>> > >
>> > >> On Apr 11, 2018, at 12:15 PM, Gautam Parai <gpa...@mapr.com> wrote:
>> > >>
>> > >> Hi all,
>> > >>
>> > >>
>> > >> I am implementing a new aggregate function which also handles Complex
>> > types (map and list). However, the codegen barfs with
>> > >>
>> > >>
>> > >> CompileException: Line 104, Column 39: A method named "setSafe" is
>> not
>> > declared in any enclosing class nor any supertype, nor through a static
>> > import
>> > >>
>> > >>
>> > >> It looks like we do not have set()/ setSafe() methods for
>> > MapVector/ListVector mutators.
>> > >>
>> > >>
>> > >> Should we add these methods to the Mutator interface to ensure all
>> > mutators implement them? Is there a reason we chose not to do so?
>> > >>
>> > >>
>> > >> Please let me know your thoughts. Thanks!
>> > >>
>> > >>
>> > >> Gautam
>> > >
>> >
>> >
>>
>>
>
>
  
