Yingyi Bu created ASTERIXDB-1343:
------------------------------------
Summary: Queries over nodegroup-based datasets do not work
Key: ASTERIXDB-1343
URL: https://issues.apache.org/jira/browse/ASTERIXDB-1343
Project: Apache AsterixDB
Issue Type: Bug
Reporter: Yingyi Bu
Assignee: Yingyi Bu
The following query fails if the datasets are created on a node group.
DDL:
{noformat}
/* Recreate the tpch dataverse from scratch and select it for the statements that follow. */
drop dataverse tpch if exists;
create dataverse tpch;
use dataverse tpch;
/*
 * Closed record type used by the LineItem dataset.
 * The order/part/supplier keys, line number and quantity are int64;
 * extended price, discount and tax are double; every other field,
 * including the three date fields, is declared as string.
 */
create type LineItemType as closed {
l_orderkey: int64,
l_partkey: int64,
l_suppkey: int64,
l_linenumber: int64,
l_quantity: int64,
l_extendedprice: double,
l_discount: double,
l_tax: double,
l_returnflag: string,
l_linestatus: string,
l_shipdate: string,
l_commitdate: string,
l_receiptdate: string,
l_shipinstruct: string,
l_shipmode: string,
l_comment: string
}
/*
 * Closed record type used by the Orders dataset.
 * o_orderkey, o_custkey and o_shippriority are int64; o_totalprice is double;
 * every other field, including o_orderdate, is declared as string.
 */
create type OrderType as closed {
o_orderkey: int64,
o_custkey: int64,
o_orderstatus: string,
o_totalprice: double,
o_orderdate: string,
o_orderpriority: string,
o_clerk: string,
o_shippriority: int64,
o_comment: string
}
/*
 * Node group group1 consists of the single node asterix_nc1, and both
 * datasets below are created on it. Per this report, creating the datasets
 * on a node group is the condition under which the query fails.
 */
create nodegroup group1 if not exists on
asterix_nc1;
/* LineItem has a composite primary key: (l_orderkey, l_linenumber). */
create dataset LineItem(LineItemType)
primary key l_orderkey, l_linenumber on group1;
/* Orders is keyed by o_orderkey alone. */
create dataset Orders(OrderType)
primary key o_orderkey on group1;
{noformat}
Query:
{noformat}
use dataverse tpch;
/*
 * tmp(): yields one record {"o_orderkey": ...} per distinct l_orderkey among
 * the LineItem records whose l_commitdate is less than l_receiptdate.
 * Both fields are declared as string in LineItemType, so this is a string
 * comparison rather than a date comparison.
 */
declare function tmp()
{
for $l in dataset('LineItem')
where $l.l_commitdate < $l.l_receiptdate
distinct by $l.l_orderkey
return { "o_orderkey": $l.l_orderkey }
}
/*
 * Main query: joins Orders with the output of tmp() on the order key, keeps
 * orders with '1993-07-01' <= o_orderdate < '1993-10-01' (string comparison),
 * groups them by o_orderpriority, and returns the number of orders per
 * priority, ordered by priority.
 */
for $o in dataset('Orders')
for $t in tmp()
where $o.o_orderkey = $t.o_orderkey and
$o.o_orderdate >= '1993-07-01' and $o.o_orderdate < '1993-10-01'
group by $o_orderpriority := $o.o_orderpriority with $o
order by $o_orderpriority
return {
"order_priority": $o_orderpriority,
"count": count($o)
}
{noformat}
Exception:
{noformat}
Exception in thread "Thread-1" java.lang.AssertionError: Dependency activity partitioned differently from dependent: 4 != 2
    at org.apache.hyracks.control.cc.scheduler.ActivityClusterPlanner.buildActivityPlanMap(ActivityClusterPlanner.java:109)
    at org.apache.hyracks.control.cc.scheduler.ActivityClusterPlanner.planActivityCluster(ActivityClusterPlanner.java:71)
    at org.apache.hyracks.control.cc.scheduler.JobScheduler.findRunnableTaskClusterRoots(JobScheduler.java:139)
    at org.apache.hyracks.control.cc.scheduler.JobScheduler.findRunnableTaskClusterRoots(JobScheduler.java:118)
    at org.apache.hyracks.control.cc.scheduler.JobScheduler.findRunnableTaskClusterRoots(JobScheduler.java:108)
    at org.apache.hyracks.control.cc.scheduler.JobScheduler.startRunnableActivityClusters(JobScheduler.java:164)
    at org.apache.hyracks.control.cc.scheduler.JobScheduler.notifyTaskComplete(JobScheduler.java:617)
    at org.apache.hyracks.control.cc.work.TaskCompleteWork.performEvent(TaskCompleteWork.java:56)
    at org.apache.hyracks.control.cc.work.AbstractTaskLifecycleWork.runWork(AbstractTaskLifecycleWork.java:70)
    at org.apache.hyracks.control.cc.work.AbstractHeartbeatWork.doRun(AbstractHeartbeatWork.java:48)
    at org.apache.hyracks.control.common.work.SynchronizableWork.run(SynchronizableWork.java:36)
    at org.apache.hyracks.control.common.work.WorkQueue$WorkerThread.run(WorkQueue.java:132)
{noformat}
Optimized query plan:
{noformat}
distribute result [%0->$$25]
-- DISTRIBUTE_RESULT |PARTITIONED|
exchange
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
project ([$$25])
-- STREAM_PROJECT |PARTITIONED|
assign [$$25] <- [function-call: asterix:closed-record-constructor,
Args:[AString: {order_priority}, %0->$$3, AString: {count}, %0->$$33]]
-- ASSIGN |PARTITIONED|
exchange
-- SORT_MERGE_EXCHANGE [$$3(ASC) ] |PARTITIONED|
group by ([$$3 := %0->$$39]) decor ([]) {
aggregate [$$33] <- [function-call: asterix:agg-sum,
Args:[%0->$$38]]
-- AGGREGATE |LOCAL|
nested tuple source
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- PRE_CLUSTERED_GROUP_BY[$$39] |PARTITIONED|
exchange
-- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$39(ASC)] HASH:[$$39]
|PARTITIONED|
group by ([$$39 := %0->$$27]) decor ([]) {
aggregate [$$38] <- [function-call: asterix:agg-count,
Args:[AInt64: {1}]]
-- AGGREGATE |LOCAL|
nested tuple source
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- SORT_GROUP_BY[$$27] |PARTITIONED|
exchange
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
project ([$$27])
-- STREAM_PROJECT |PARTITIONED|
exchange
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
join (function-call: algebricks:eq, Args:[%0->$$30,
%0->$$31])
-- HYBRID_HASH_JOIN [$$30][$$31] |PARTITIONED|
exchange
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
project ([$$27, $$30])
-- STREAM_PROJECT |PARTITIONED|
select (function-call: algebricks:and,
Args:[function-call: algebricks:lt, Args:[%0->$$29, AString: {1993-10-01}],
function-call: algebricks:ge, Args:[%0->$$29, AString: {1993-07-01}]])
-- STREAM_SELECT |PARTITIONED|
project ([$$27, $$29, $$30])
-- STREAM_PROJECT |PARTITIONED|
assign [$$27, $$29] <- [function-call:
asterix:field-access-by-index, Args:[%0->$$4, AInt32: {5}], function-call:
asterix:field-access-by-index, Args:[%0->$$4, AInt32: {4}]]
-- ASSIGN |PARTITIONED|
exchange
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
data-scan []<-[$$30, $$4] <- tpch:Orders
-- DATASOURCE_SCAN |PARTITIONED|
exchange
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
empty-tuple-source
-- EMPTY_TUPLE_SOURCE |PARTITIONED|
exchange
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
distinct ([%0->$$31])
-- PRE_SORTED_DISTINCT_BY |PARTITIONED|
exchange
-- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$31(ASC)]
HASH:[$$31] |PARTITIONED|
project ([$$31])
-- STREAM_PROJECT |PARTITIONED|
select (function-call: algebricks:lt,
Args:[function-call: asterix:field-access-by-index, Args:[%0->$$5, AInt32:
{11}], function-call: asterix:field-access-by-index, Args:[%0->$$5, AInt32:
{12}]])
-- STREAM_SELECT |PARTITIONED|
project ([$$5, $$31])
-- STREAM_PROJECT |PARTITIONED|
exchange
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
data-scan []<-[$$31, $$32, $$5] <-
tpch:LineItem
-- DATASOURCE_SCAN |PARTITIONED|
exchange
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
empty-tuple-source
-- EMPTY_TUPLE_SOURCE |PARTITIONED|
{noformat}
The reason is that the optimized query plan assumes the computation nodes are
the same as the storage nodes, and therefore uses the wrong exchange strategies.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)