I'm trying to run the following query:

    SELECT op.platform, op.name, op.paymentType, ck.posDiscountName, sum(op.amount) amt
    FROM `dfs`.`/path_to_parquet` op, `dfs`.`path_to_parquet2` ck
    WHERE ck.id = op.check_id
    GROUP BY op.platform, op.name, op.paymentType, ck.posDiscountName
    LIMIT 2147483647

I also tried the same query without the LIMIT clause <https://issues.apache.org/jira/browse/DRILL-5435>, but it still fails for the same reason. I'm seeing the following exception in the logs and I'm not sure how to resolve it:

    Suppressed: java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (4194304)
    Allocator(op:0:0:0:Screen) 1000000/4194304/12582912/10000000000 (res/actual/peak/limit)
        at org.apache.drill.exec.memory.BaseAllocator.close(BaseAllocator.java:492)
        at org.apache.drill.exec.ops.OperatorContextImpl.close(OperatorContextImpl.java:141)
        at org.apache.drill.exec.ops.FragmentContext.suppressingClose(FragmentContext.java:422)
        at org.apache.drill.exec.ops.FragmentContext.close(FragmentContext.java:411)
        at org.apache.drill.exec.work.fragment.FragmentExecutor.closeOutResources(FragmentExecutor.java:318)
        at org.apache.drill.exec.work.fragment.FragmentExecutor.cleanup(FragmentExecutor.java:155)
        at org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:262)
        at org.apache.drill.common.SelfCleaningRunnable.run(SelfCleaningRunnable.java:38)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        ... 1 more
    Suppressed: java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (4194304)
    Allocator(frag:0:0) 3000000/4194304/1511949440/30000000000 (res/actual/peak/limit)
        at org.apache.drill.exec.memory.BaseAllocator.close(BaseAllocator.java:492)
        at org.apache.drill.exec.ops.FragmentContext.suppressingClose(FragmentContext.java:422)
        at org.apache.drill.exec.ops.FragmentContext.close(FragmentContext.java:416)
        at org.apache.drill.exec.work.fragment.FragmentExecutor.closeOutResources(FragmentExecutor.java:318)
        at org.apache.drill.exec.work.fragment.FragmentExecutor.cleanup(FragmentExecutor.java:155)
        at org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:262)
        at org.apache.drill.common.SelfCleaningRunnable.run(SelfCleaningRunnable.java:38)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        ... 1 more

The UI shows the following error:

    org.apache.drill.common.exceptions.UserException: CONNECTION ERROR: Connection /1.1.1.1:40834 <--> Gelbana/1.1.1.1:31010 (user client) closed unexpectedly. Drillbit down?

    [Error Id: 268bc3a7-114f-4681-984c-05d143f7ebd9 ]

I understand that this bug has been fixed in 1.9 <https://issues.apache.org/jira/browse/DRILL-4616>, which is the version I'm using.

I did what the comments suggested, which is to tell Drill to use a tmp directory that has enough space. I set the JVM option *java.io.tmpdir* to /home/mgelbana/server/temp/, which has over 100 GB of free space, and modified the drill-override.conf file to contain the following:

    tmp: {
      directories: ["/home/mgelbana/server/temp/"],
      filesystem: "file:///"
    },
    sort: {
      external: {
        spill: {
          batch.size : 4000,
          group.size : 100,
          threshold : 200,
          directories : [ "/home/mgelbana/server/temp/spill" ],
          fs : "file:///"
        }
      }
    }

I'm running a single Drillbit on a single machine with 25 GB of heap memory and 100 GB of direct memory. The machine has 48 cores (i.e. the output of *nproc* on Linux). The relevant options are set as follows:

    planner.width.max_per_node = 40
    planner.memory.max_query_memory_per_node = 8589934592 (8 GB)

This is the query plan:

    00-00    Screen : rowType = RecordType(ANY platform, ANY name, ANY paymentType, ANY posDiscountName, ANY amt): rowcount = 2.147483647E9, cumulative cost = {7.001208181169999E9 rows, 3.700395926736E10 cpu, 0.0 io, 8.6479703758848E12 network, 1.4449960681199999E10 memory}, id = 24229
    00-01      Project(platform=[$0], name=[$1], paymentType=[$2], posDiscountName=[$3], amt=[$4]) : rowType = RecordType(ANY platform, ANY name, ANY paymentType, ANY posDiscountName, ANY amt): rowcount = 2.147483647E9, cumulative cost = {6.786459816469999E9 rows, 3.678921090266E10 cpu, 0.0 io, 8.6479703758848E12 network, 1.4449960681199999E10 memory}, id = 24228
    00-02        SelectionVectorRemover : rowType = RecordType(ANY platform, ANY name, ANY paymentType, ANY posDiscountName, ANY amt): rowcount = 2.147483647E9, cumulative cost = {6.786459816469999E9 rows, 3.678921090266E10 cpu, 0.0 io, 8.6479703758848E12 network, 1.4449960681199999E10 memory}, id = 24227
    00-03          Limit(fetch=[2147483647]) : rowType = RecordType(ANY platform, ANY name, ANY paymentType, ANY posDiscountName, ANY amt): rowcount = 2.147483647E9, cumulative cost = {4.638976169469999E9 rows, 3.464172725566E10 cpu, 0.0 io, 8.6479703758848E12 network, 1.4449960681199999E10 memory}, id = 24226
    00-04            UnionExchange : rowType = RecordType(ANY platform, ANY name, ANY paymentType, ANY posDiscountName, ANY amt): rowcount = 2198378.67, cumulative cost = {2.4914925224699993E9 rows, 2.605179266766E10 cpu, 0.0 io, 8.6479703758848E12 network, 1.4449960681199999E10 memory}, id = 24225
    01-01              HashAgg(group=[{0, 1, 2, 3}], amt=[SUM($4)]) : rowType = RecordType(ANY platform, ANY name, ANY paymentType, ANY posDiscountName, ANY amt): rowcount = 2198378.67, cumulative cost = {2.489294143799999E9 rows, 2.60342056383E10 cpu, 0.0 io, 8.6029475807232E12 network, 1.4449960681199999E10 memory}, id = 24224
    01-02                Project(platform=[$0], name=[$1], paymentType=[$2], posDiscountName=[$3], amt=[$4]) : rowType = RecordType(ANY platform, ANY name, ANY paymentType, ANY posDiscountName, ANY amt): rowcount = 2.19837867E7, cumulative cost = {2.4673103570999994E9 rows, 2.50669190235E10 cpu, 0.0 io, 8.6029475807232E12 network, 1.34826740664E10 memory}, id = 24223
    01-03                  HashToRandomExchange(dist0=[[$0]], dist1=[[$1]], dist2=[[$2]], dist3=[[$3]]) : rowType = RecordType(ANY platform, ANY name, ANY paymentType, ANY posDiscountName, ANY amt, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 2.19837867E7, cumulative cost = {2.4673103570999994E9 rows, 2.50669190235E10 cpu, 0.0 io, 8.6029475807232E12 network, 1.34826740664E10 memory}, id = 24222
    02-01                    UnorderedMuxExchange : rowType = RecordType(ANY platform, ANY name, ANY paymentType, ANY posDiscountName, ANY amt, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 2.19837867E7, cumulative cost = {2.4453265703999996E9 rows, 2.48470811565E10 cpu, 0.0 io, 8.062674038784E12 network, 1.34826740664E10 memory}, id = 24221
    03-01                      Project(platform=[$0], name=[$1], paymentType=[$2], posDiscountName=[$3], amt=[$4], E_X_P_R_H_A_S_H_F_I_E_L_D=[hash32AsDouble($3, hash32AsDouble($2, hash32AsDouble($1, hash32AsDouble($0))))]) : rowType = RecordType(ANY platform, ANY name, ANY paymentType, ANY posDiscountName, ANY amt, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 2.19837867E7, cumulative cost = {2.4233427837E9 rows, 2.48250973698E10 cpu, 0.0 io, 8.062674038784E12 network, 1.34826740664E10 memory}, id = 24220
    03-02                        HashAgg(group=[{0, 1, 2, 3}], amt=[SUM($4)]) : rowType = RecordType(ANY platform, ANY name, ANY paymentType, ANY posDiscountName, ANY amt): rowcount = 2.19837867E7, cumulative cost = {2.401358997E9 rows, 2.4737162223E10 cpu, 0.0 io, 8.062674038784E12 network, 1.34826740664E10 memory}, id = 24219
    03-03                          Project(platform=[$0], name=[$1], paymentType=[$2], posDiscountName=[$5], amount=[$4]) : rowType = RecordType(ANY platform, ANY name, ANY paymentType, ANY posDiscountName, ANY amount): rowcount = 2.19837867E8, cumulative cost = {2.18152113E9 rows, 1.5064296075E10 cpu, 0.0 io, 8.062674038784E12 network, 3.8098079184E9 memory}, id = 24218
    03-04                            HashJoin(condition=[=($3, $6)], joinType=[inner]) : rowType = RecordType(ANY platform, ANY name, ANY paymentType, ANY check_id, ANY amount, ANY posDiscountName, ANY id): rowcount = 2.19837867E8, cumulative cost = {2.18152113E9 rows, 1.5064296075E10 cpu, 0.0 io, 8.062674038784E12 network, 3.8098079184E9 memory}, id = 24217
    03-05                              Project(posDiscountName=[$0], id=[$1]) : rowType = RecordType(ANY posDiscountName, ANY id): rowcount = 2.16466359E8, cumulative cost = {8.65865436E8 rows, 4.978726257E9 cpu, 0.0 io, 2.659938619392E12 network, 0.0 memory}, id = 24216
    03-07                                HashToRandomExchange(dist0=[[$1]]) : rowType = RecordType(ANY posDiscountName, ANY id, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 2.16466359E8, cumulative cost = {8.65865436E8 rows, 4.978726257E9 cpu, 0.0 io, 2.659938619392E12 network, 0.0 memory}, id = 24215
    05-01                                  UnorderedMuxExchange : rowType = RecordType(ANY posDiscountName, ANY id, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 2.16466359E8, cumulative cost = {6.49399077E8 rows, 1.515264513E9 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 24214
    07-01                                    Project(posDiscountName=[$0], id=[$1], E_X_P_R_H_A_S_H_F_I_E_L_D=[hash32AsDouble($1)]) : rowType = RecordType(ANY posDiscountName, ANY id, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 2.16466359E8, cumulative cost = {4.32932718E8 rows, 1.298798154E9 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 24213
    07-02                                      Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/path_to_parquet]], selectionRoot=file:/path_to_parquet, numFiles=1, usedMetadataFile=false, columns=[`posDiscountName`, `id`]]]) : rowType = RecordType(ANY posDiscountName, ANY id): rowcount = 2.16466359E8, cumulative cost = {2.16466359E8 rows, 4.32932718E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 24212
    03-06                              Project(platform=[$0], name=[$1], paymentType=[$2], check_id=[$3], amount=[$4]) : rowType = RecordType(ANY platform, ANY name, ANY paymentType, ANY check_id, ANY amount): rowcount = 2.19837867E8, cumulative cost = {8.79351468E8 rows, 5.715784542E9 cpu, 0.0 io, 5.402735419392E12 network, 0.0 memory}, id = 24211
    03-08                                HashToRandomExchange(dist0=[[$3]]) : rowType = RecordType(ANY platform, ANY name, ANY paymentType, ANY check_id, ANY amount, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 2.19837867E8, cumulative cost = {8.79351468E8 rows, 5.715784542E9 cpu, 0.0 io, 5.402735419392E12 network, 0.0 memory}, id = 24210
    04-01                                  UnorderedMuxExchange : rowType = RecordType(ANY platform, ANY name, ANY paymentType, ANY check_id, ANY amount, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 2.19837867E8, cumulative cost = {6.59513601E8 rows, 2.19837867E9 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 24209
    06-01                                    Project(platform=[$0], name=[$1], paymentType=[$2], check_id=[$3], amount=[$4], E_X_P_R_H_A_S_H_F_I_E_L_D=[hash32AsDouble($3)]) : rowType = RecordType(ANY platform, ANY name, ANY paymentType, ANY check_id, ANY amount, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 2.19837867E8, cumulative cost = {4.39675734E8 rows, 1.978540803E9 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 24208
    06-02                                      Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/path_to_parquet2]], selectionRoot=file:/path_to_parquet2, numFiles=1, usedMetadataFile=false, columns=[`platform`, `name`, `paymentType`, `check_id`, `amount`]]]) : rowType = RecordType(ANY platform, ANY name, ANY paymentType, ANY check_id, ANY amount): rowcount = 2.19837867E8, cumulative cost = {2.19837867E8 rows, 1.099189335E9 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 24207

The machine I'm using has plenty of resources, and I can't believe I can't use Drill to run this query! I would really appreciate any insight.

Thanks,
Gelbana
