Hi Charles,
Thanks for forwarding this. Looks like Idan found the right answer. Still, I 
repeated the analysis and have some suggestions.

Looked at the code mentioned in the message chain. This is a place where our 
error handling could use work:

  public void allocateNew() throws OutOfMemoryException {
    if (!allocateNewSafe()) {
      throw new OutOfMemoryException();
    }
  }

Some default allocation failed, but we preserve none of the relevant 
information: nothing about the kind of vector, nothing about the cause of the 
failure. There are dozens of implementations of allocateNewSafe(); it is 
impossible to determine which was called.

A typical implementation:
  public boolean allocateNewSafe() {
    long curAllocationSize = ...
    try {
      allocateBytes(curAllocationSize);
    } catch (DrillRuntimeException ex) {
      return false;
    }
    return true;
  }


We catch the exception, then ignore it. Sigh... We can fix all this, but it 
does not help with this specific issue. See DRILL-7658.
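
For what it's worth, here is a minimal sketch of the kind of fix I have in 
mind: wrap the original exception rather than returning false, and say which 
vector and how many bytes were involved. Every name here 
(AllocationFailedException, SketchVector, the size limit) is a hypothetical 
stand-in, not Drill's actual classes or the fix proposed in DRILL-7658.

```java
// Hypothetical exception type; Drill's real OutOfMemoryException may differ.
class AllocationFailedException extends RuntimeException {
  AllocationFailedException(String message, Throwable cause) {
    super(message, cause);
  }
}

// Hypothetical stand-in for a vector class, just enough to show the pattern.
class SketchVector {
  private final String name;
  private final long limitBytes;

  SketchVector(String name, long limitBytes) {
    this.name = name;
    this.limitBytes = limitBytes;
  }

  // Stand-in for the real buffer allocation: fails when over the limit.
  private void allocateBytes(long size) {
    if (size > limitBytes) {
      throw new IllegalStateException(
          "requested " + size + " bytes, limit is " + limitBytes);
    }
  }

  // Keep the cause and add context instead of collapsing to a boolean.
  void allocateNew(long size) {
    try {
      allocateBytes(size);
    } catch (RuntimeException ex) {
      throw new AllocationFailedException(
          "Failed to allocate " + size + " bytes for "
              + getClass().getSimpleName() + " '" + name + "'", ex);
    }
  }
}
```

With something like this, the stack trace would carry the vector kind, the 
requested size and the underlying cause instead of a bare "null".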

As it turns out, most of the implementations are in the generated vector 
classes. These classes, oddly, have their own redundant copy of 
allocateNewSafe(). Since we don't see those methods on the stack, we can 
quickly narrow down the candidates to:

* AbstractMapVector (any map vector)
* A few others that won't occur for Parquet


Given this, it means the allocation is failing when allocating a map. Idan 
mentions "one column is an array of a single element comprised of 15 columns". 
We can presume that the "element" is actually a map, and that the map has 15 
columns.

So, it looks like the map allocation failed during a partition sender (the next 
element on the stack). The partition sender takes incoming batches (presumably 
from the scan, though the stack trace does not say because we're at the root of 
the DAG), and splits them by key to destination nodes.

Idan mentions the query runs on a single machine. So, the partitions are only 
to threads on that same machine. Idan mentions a 16-core machine. Since Drill 
parallelizes queries to 70% of the cores, we may be running 11 threads, so each 
partition sender tries to buffer data for 11 receivers. Each will buffer three 
batches of data for a total of 33 batches.
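
The arithmetic above, as a small sketch. The class and method names are 
hypothetical, and rounding 70% of the cores down is my assumption about how we 
land on 11; the three batches per receiver is the figure quoted above.

```java
class BufferEstimate {
  // Assumed default parallelism: 70% of the cores, rounded down.
  static int defaultWidth(int cores) {
    return (int) Math.floor(cores * 0.7);
  }

  // Each sender buffers a fixed number of batches per receiver.
  static int bufferedBatches(int receivers, int batchesPerReceiver) {
    return receivers * batchesPerReceiver;
  }

  public static void main(String[] args) {
    int width = defaultWidth(16);           // 16-core machine
    int batches = bufferedBatches(width, 3);
    System.out.println(width + " threads, " + batches + " buffered batches");
  }
}
```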

Next we need to know how many records are in each batch. Seems we have two 
default values, defined in drill-override.conf:


    store.parquet.flat.batch.num_records: 32767,
    store.parquet.complex.batch.num_records: 4000,


If we think the record has a map, then perhaps Parquet chose the "complex" 
count of 4000 records? I think this can be checked by looking at the query 
profile which, if I recall, should be produced even for a failed query.

So, let's guess 4000 records * 33 buffered batches = about 132K records. We 
don't know the size of each, however. (And, note that Idan said that he 
artificially increased parallelism, so the buffering need is greater than the 
above back-of-the-envelope calcs.)


We do know the size of the data: 15 files of 150K each. Let's assume that is 
compressed. So, if all files are in memory, that would be 15 * 150K * 10:1 
compression ratio = about 22 MB, which is tiny. So, it is unlikely that Drill 
is actually buffering all 33 batches. This tells us that something else is 
going wrong: we are not actually running out of memory for data. Just as Idan 
suggested, we are exhausting memory for some other reason.
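
Here are the two estimates side by side, as a rough sketch. The names are 
hypothetical, the 10:1 compression ratio is the guess made above rather than a 
measured value, and K is taken as 1000 bytes.

```java
class DataSizeEstimate {
  // Upper bound on records held in sender buffers.
  static long bufferedRecords(int recordsPerBatch, int batches) {
    return (long) recordsPerBatch * batches;
  }

  // Total uncompressed size if every file were fully in memory.
  static long uncompressedBytes(int files, long bytesPerFile, int ratio) {
    return files * bytesPerFile * ratio;
  }

  public static void main(String[] args) {
    long records = bufferedRecords(4000, 33);
    long bytes = uncompressedBytes(15, 150_000L, 10);
    System.out.println(records + " buffered records at most");
    System.out.println((bytes / 1_000_000.0) + " MB uncompressed");
  }
}
```

Either way, the data is orders of magnitude smaller than the 8 GB of direct 
memory Idan configured.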


Reading further, it looks like Idan found his own solution. He increased 
parallelism to the point where the internal buffering of each Parquet reader 
used up all available memory. This is probably a bug, but Parquet is a 
fiendishly complex beast. Over time, people threw all kinds of parallel 
readers, buffering and other things at it to beat Impala in TPC benchmarks.

Since a query that finishes is faster than a highly-tuned query that crashes, 
I'd recommend throttling the slice count back (that is, undoing the very low 
planner.slice_target and the raised planner.width.max_per_node). You really 
only need as many slices as there are cores. In fact, you need fewer. Unlike 
other readers, Parquet launches a bunch of its own parallel readers, so each 
single Parquet reader will have many (I don't recall the number) parallel 
column readers, each aggressively buffering everything it can.

Since the data is small, there is no need for such heroics: Drill can read 20+ 
meg of data quite quickly, even with a few threads. So, try that first and see 
if that works.

Once the query works, study the query profile to determine the memory budget 
and CPU usage. Tune from there, keeping memory well within the available bounds.


Thanks,
- Paul

 

    On Tuesday, March 24, 2020, 11:46:47 AM PDT, Charles Givre 
<cgi...@gmail.com> wrote:  
 
 
Idan Sheinberg  8:21 AM
Hi  there
I'm trying to run a simple offset query (ORDER BY timestamp LIMIT 500 OFFSET 
1000) against rather complex parquet files (say 4 columns, one being an array 
currently consisting of a single element comprised of 15 columns)
All files share the same Schema, of course.
 User Error Occurred: One or more nodes ran out of memory while executing the 
query. (null)
org.apache.drill.common.exceptions.UserException: RESOURCE ERROR: One or more 
nodes ran out of memory while executing the query.
null
[Error Id: 67b61fc9-320f-47a1-8718-813843a10ecc ]
    at org.apache.drill.common.exceptions.UserException$Builder.build(UserException.java:657)
    at org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:338)
    at org.apache.drill.common.SelfCleaningRunnable.run(SelfCleaningRunnable.java:38)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.drill.exec.exception.OutOfMemoryException: null
    at org.apache.drill.exec.vector.complex.AbstractContainerVector.allocateNew(AbstractContainerVector.java:59)
    at org.apache.drill.exec.test.generated.PartitionerGen5$OutgoingRecordBatch.allocateOutgoingRecordBatch(PartitionerTemplate.java:380)
    at org.apache.drill.exec.test.generated.PartitionerGen5$OutgoingRecordBatch.initializeBatch(PartitionerTemplate.java:400)
    at org.apache.drill.exec.test.generated.PartitionerGen5.setup(PartitionerTemplate.java:126)
    at org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.createClassInstances(PartitionSenderRootExec.java:263)
    at org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.createPartitioner(PartitionSenderRootExec.java:218)
    at org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.innerNext(PartitionSenderRootExec.java:188)
    at org.apache.drill.exec.physical.impl.BaseRootExec.next(BaseRootExec.java:93)
    at org.apache.drill.exec.work.fragment.FragmentExecutor$1.run(FragmentExecutor.java:323)
    at org.apache.drill.exec.work.fragment.FragmentExecutor$1.run(FragmentExecutor.java:310)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    at org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:310)
    ... 4 common frames omitted
Now, I'm running this query from a 16 core, 32GB Ram machine, with Heap sized 
at 20GB, Eden sized at 16GB (added manually to JAVA_OPTS) and Direct Sized at 8 
GB.
By querying sys.memory I can confirm all limits apply. At no point throughout 
the query am I nearing the memory limit of the HEAP/DIRECT or the OS itself





8:25
However, due to the way 
org.apache.drill.exec.vector.complex.AbstractContainerVector.allocateNew is 
implemented
8:27
@Override
  public void allocateNew() throws OutOfMemoryException {
    if (!allocateNewSafe()) {
      throw new OutOfMemoryException();
    }
  }
8:27
The actual exception/error is swallowed, and I have no idea what's the cause of 
the failure
8:28
The data-set itself consists of say 15 parquet files, each one weighing at 
about 100kb
8:30
but as mentioned earlier, the parquet files are a bit more complex than the 
usual.
8:32
@cgivre @Vova Vysotskyi is there anything I can do or tweak to make this error 
go away?

cgivre  8:40 AM
Hmm...
8:40
This may be a bug.  Can you create an issue on our JIRA board?

Idan Sheinberg  8:43 AM
Sure
8:43
I'll get to it

cgivre  8:44 AM
I'd like for Paul Rogers to see this as I think he was the author of some of 
this.

Idan Sheinberg  8:44 AM
Hmm. I'll keep that in mind

cgivre  8:47 AM
We've been refactoring some of the complex readers as well, so it's possible 
that caused this, but I'm not really sure.
8:47
What version of Drill?

cgivre  9:11 AM
This kind of info is super helpful as we're trying to work out all these 
details.
9:11
Reading schemas on the fly is not trivial, so when we find issues, we do like 
to resolve them

Idan Sheinberg  9:16 AM
This is drill 0.18 -SNAPSHOT as of last month
9:16
Ummmm
9:16
I do think I managed to resolve the issue however
9:16
I'm going to run some additional tests and let you know

cgivre  9:16 AM
What did you do?
9:17
You might want to rebase with today's build as well

Idan Sheinberg  9:21 AM
I'll come back with the details in a few moments

cgivre  9:38 AM
Thx

Idan Sheinberg  9:50 AM
Ok. See it seems as though it's a combination of a few things.
The data-set in question is still small (as mentioned before), but we are 
setting planner.slice_target  to an extremely low value in order to trigger 
parallelism and speed up parquet parsing by using multiple fragments.
We have 16 cores, 32 GB (C5.4xlarge on AWS) but we set 
planner.width.max_per_node  to further increase parallelism.  It seems as 
though each fragment is handling parquet parsing on its own, and somehow 
incurs a great burden on
the direct memory buffer pool, as I do see 16GB peaks of direct memory usage 
after lowering the planner.width.max_per_node to 16 (our available cores).
The query planner itself reports the HASH_PARTITION_SENDER as the largest phase 
with 1-2 GB of memory utilization
Seeing such an impact (16 GB of direct memory) for 1K items spread across 15 
files, even with a very complex parquet schema, seems unreasonable to me  
