[GitHub] drill pull request #928: Drill 5716: Queue-driven memory allocation
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/928#discussion_r136448394 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import java.util.concurrent.TimeUnit; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.coord.DistributedSemaphore; +import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.SystemOptionManager; + +/** + * Distributed query queue which uses a Zookeeper distributed semaphore to + * control queuing across the cluster. The distributed queue is actually two + * queues: one for "small" queries, another for "large" queries. Query size is + * determined by the Planner's estimate of query cost. 
+ * + * This queue is configured using system options: + * + * exec.queue.enable + * + * Set to true to enable the distributed queue. + * exec.queue.large + * + * The maximum number of large queries to admit. Additional + * queries wait in the queue. + * exec.queue.small + * + * The maximum number of small queries to admit. Additional + * queries wait in the queue. + * exec.queue.threshold + * + * The cost threshold. Queries below this size are small, at + * or above this size are large.. + * exec.queue.timeout_millis + * + * The maximum time (in milliseconds) a query will wait in the + * queue before failing. + * + * + * The above values are refreshed every five seconds. This aids performance + * a bit in systems with very high query arrival rates. + */ + +public class DistributedQueryQueue implements QueryQueue { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueryQueue.class); + + private class DistributedQueueLease implements QueueLease { +private final QueryId queryId; +private DistributedLease lease; +private final String queueName; +private long queryMemory; + +public DistributedQueueLease(QueryId queryId, String queueName, DistributedLease lease, long queryMemory) { + this.queryId = queryId; + this.queueName = queueName; + this.lease = lease; + this.queryMemory = queryMemory; +} + +@Override +public String toString() { + return String.format("Lease for %s queue to query %s", + queueName, QueryIdHelper.getQueryId(queryId)); +} + +@Override +public long queryMemoryPerNode() { return queryMemory; } + +@Override +public void release() { + DistributedQueryQueue.this.release(this); +} + +@Override +public String queueName() { return queueName; } + } + + /** + * Exposes a snapshot of internal state information for use in status + * reporting, such as in the UI. 
+ */ + + @XmlRootElement + public static class ZKQueueInfo { +public final int smallQueueSize; +public final int largeQueueSize; +public final double queueThreshold; +public final long memoryPerNode; +public final long memoryPerSmallQuery; +public final long memoryPerLargeQuery; + +public ZKQueueInfo(DistributedQueryQueue queue) { + smallQueueSize = queue.smallQueueSize; + largeQueueSize = queue.largeQueueSize; + queueThreshold = queue.queueThreshold; + memoryPerNode = queue.memoryPerNode; + memoryPe
[GitHub] drill pull request #928: Drill 5716: Queue-driven memory allocation
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/928#discussion_r136499555 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java --- @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.SystemOptionManager; +import org.apache.drill.exec.work.foreman.Foreman; + +/** + * Wrapper around the default and/or distributed resource managers + * to allow dynamically enabling and disabling queueing. 
+ */ + +public class DynamicResourceManager implements ResourceManager { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DynamicResourceManager.class); + + private final DrillbitContext context; + private ResourceManager defaultRm; + private ResourceManager queueingRm; + private ResourceManager activeRm; + public long lastUpdateTime; + public int recheckDelayMs = 5000; + + public DynamicResourceManager(final DrillbitContext context) { +this.context = context; +refreshRM(); + } + + public synchronized ResourceManager activeRM() { +refreshRM(); +return activeRm; + } + + @Override + public long memoryPerNode() { +return activeRm.memoryPerNode(); + } + + @Override + public int cpusPerNode() { +return activeRm.cpusPerNode(); + } + + @Override + public synchronized QueryResourceAllocator newQueryPlanner(QueryContext queryContext) { --- End diff -- This synchronization will be a problem under high concurrency. Can we reduce the number of synchronized calls per query? It looks like the lock is taken both when newQueryPlanner is called and when newExecRM is called. Is one enough for both? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
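One possible way to address the synchronization concern above is a time-gated refresh in which the common path reads volatile fields and only the occasional refresh takes the lock. The sketch below is an illustration only and is not code from the pull request: the class name PeriodicRefresher, the injected clock, and the loader callback are all hypothetical; only the idea of a recheck delay (recheckDelayMs, 5000 ms in the diff) comes from the quoted code.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Hypothetical sketch (not Drill code): caches a value and reloads it at most
 * once per recheck interval. Callers that arrive inside the interval never
 * take the lock.
 */
public class PeriodicRefresher<T> {
  private final Supplier<T> loader;      // assumed stand-in for refreshRM()
  private final LongSupplier clockMs;    // injectable clock, for testing
  private final long recheckDelayMs;
  private volatile T active;
  private volatile long nextRefreshMs;

  public PeriodicRefresher(Supplier<T> loader, LongSupplier clockMs, long recheckDelayMs) {
    this.loader = loader;
    this.clockMs = clockMs;
    this.recheckDelayMs = recheckDelayMs;
    this.active = loader.get();
    this.nextRefreshMs = clockMs.getAsLong() + recheckDelayMs;
  }

  public T get() {
    // Fast path: a volatile read only, no lock.
    if (clockMs.getAsLong() < nextRefreshMs) {
      return active;
    }
    // Slow path: at most one thread reloads; the others re-check and return.
    synchronized (this) {
      long now = clockMs.getAsLong();
      if (now >= nextRefreshMs) {
        active = loader.get();
        nextRefreshMs = now + recheckDelayMs;
      }
      return active;
    }
  }
}
```

With this shape, a per-query call such as newQueryPlanner could call get() once and reuse the result, so the lock would be contended only when the interval expires. Whether that fits the actual DynamicResourceManager depends on details not shown in the diff.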
[GitHub] drill pull request #928: Drill 5716: Queue-driven memory allocation
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/928#discussion_r136490253 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import java.util.concurrent.TimeUnit; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.coord.DistributedSemaphore; +import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.SystemOptionManager; + +/** + * Distributed query queue which uses a Zookeeper distributed semaphore to + * control queuing across the cluster. The distributed queue is actually two + * queues: one for "small" queries, another for "large" queries. Query size is + * determined by the Planner's estimate of query cost. 
+ * + * This queue is configured using system options: + * + * exec.queue.enable + * + * Set to true to enable the distributed queue. + * exec.queue.large + * + * The maximum number of large queries to admit. Additional + * queries wait in the queue. + * exec.queue.small + * + * The maximum number of small queries to admit. Additional + * queries wait in the queue. + * exec.queue.threshold + * + * The cost threshold. Queries below this size are small, at + * or above this size are large.. + * exec.queue.timeout_millis + * + * The maximum time (in milliseconds) a query will wait in the + * queue before failing. + * + * + * The above values are refreshed every five seconds. This aids performance + * a bit in systems with very high query arrival rates. + */ + +public class DistributedQueryQueue implements QueryQueue { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueryQueue.class); + + private class DistributedQueueLease implements QueueLease { +private final QueryId queryId; +private DistributedLease lease; +private final String queueName; +private long queryMemory; + +public DistributedQueueLease(QueryId queryId, String queueName, DistributedLease lease, long queryMemory) { + this.queryId = queryId; + this.queueName = queueName; + this.lease = lease; + this.queryMemory = queryMemory; --- End diff -- Isn't the per-query memory the same for every query in a given queue? Do we have to pass it in? ---
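To make the question above concrete, here is a minimal sketch of the alternative it implies: the lease derives its memory from the queue it was issued by instead of receiving it as a constructor argument. Everything here is hypothetical (QueueLeaseSketch, QueueKind, QueueConfig, Lease); only the field names memoryPerSmallQuery and memoryPerLargeQuery echo the ZKQueueInfo fields in the diff. The value is still captured at admission time, on the assumption that a running query should keep the amount it was admitted with even if the options are refreshed later.

```java
import java.util.Locale;

/** Hypothetical sketch (not Drill code): a lease that looks up its memory by queue. */
public class QueueLeaseSketch {

  enum QueueKind { SMALL, LARGE }

  /** Assumed per-queue settings, mirroring the two memory fields in ZKQueueInfo. */
  static final class QueueConfig {
    final long memoryPerSmallQuery;
    final long memoryPerLargeQuery;

    QueueConfig(long memoryPerSmallQuery, long memoryPerLargeQuery) {
      this.memoryPerSmallQuery = memoryPerSmallQuery;
      this.memoryPerLargeQuery = memoryPerLargeQuery;
    }

    long memoryFor(QueueKind kind) {
      return kind == QueueKind.SMALL ? memoryPerSmallQuery : memoryPerLargeQuery;
    }
  }

  static final class Lease {
    private final QueueKind kind;
    private final long queryMemory; // snapshot taken when the lease is created

    Lease(QueueKind kind, QueueConfig config) {
      this.kind = kind;
      this.queryMemory = config.memoryFor(kind);
    }

    long queryMemoryPerNode() { return queryMemory; }

    String queueName() { return kind.name().toLowerCase(Locale.ROOT); }
  }
}
```

Note that the QueryQueue Javadoc quoted elsewhere in this thread says the lease may adjust the default amount on a per-query basis, which would be one reason to keep passing the value in explicitly.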
[GitHub] drill pull request #928: Drill 5716: Queue-driven memory allocation
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/928#discussion_r136491684 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryQueue.java --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import org.apache.drill.exec.proto.UserBitShared.QueryId; + +/** + * Interface which defines a queue implementation for query queues. + * Implementations can queue locally, queue distributed, or do + * nothing at all. + * + * A queue can report itself as enabled or disabled. When enabled, + * all queries must obtain a lease prior to starting execution. The + * lease must be released at the completion of execution. + */ + +public interface QueryQueue { + + /** + * The opaque lease returned once a query is admitted + * for execution. + */ + + public interface QueueLease { +long queryMemoryPerNode(); + +/** + * Release a query lease obtained from {@link #queue(QueryId, double))}. + * Should be called by the per-query resource manager. + * + * @param lease the lease to be released. 
+ */ + +void release(); + +String queueName(); + }; + + /** + * Exception thrown if a query exceeds the configured wait time + * in the query queue. + */ + + @SuppressWarnings("serial") + public class QueueTimeoutException extends Exception { + +private QueryId queryId; +private String queueName; +private int timeoutMs; + +public QueueTimeoutException(QueryId queryId, String queueName, int timeoutMs) { + super( String.format( + "Query timed out of the %s queue after %d ms.", + queueName, timeoutMs )); + this.queryId = queryId; + this.queueName = queueName; + this.timeoutMs = timeoutMs; +} + +public QueryId queryId() { return queryId; } +public String queueName() { return queueName; } +public int timeoutMs() { return timeoutMs; } + } + + /** + * Exception thrown for all non-timeout error conditions. + */ + + @SuppressWarnings("serial") + public class QueryQueueException extends Exception { +QueryQueueException(String msg, Exception e) { + super(msg, e); +} + } + + void setMemoryPerNode(long memoryPerNode); + + /** + * Return the amount of memory per node when creating a EXPLAIN + * query plan. Plans to be executed should get the query memory from + * the lease, as the lease may adjust the default amount on a per-query + * basis. This means that the memory used to execute the query may + * differ from the amount shown in an EXPLAIN plan. + * + * @return assumed memory per node, in bytes, to use when creating + * an EXPLAIN plan + */ + + long getDefaultMemoryPerNode(double cost); + + /** + * Determine if the queue is enabled. + * @return true if the query is enabled, false otherwise. + */ + + boolean enabled(); + + /** + * Queue a query. The method returns only when the query is admitted for + * execution. As a result, the calling thread may block up to the configured --- End diff -- We should have an upper bound on how many such queries can wait for execution.
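The request above for an upper bound on waiting queries could be sketched as follows. This is a single-process illustration under stated assumptions, not the pull request's implementation: the real queue uses a ZooKeeper distributed semaphore, whereas this sketch substitutes java.util.concurrent.Semaphore, and the names BoundedAdmissionQueue, maxRunning, and maxWaiting are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch (not Drill code): admits up to maxRunning queries and
 * lets at most maxWaiting callers block for a slot. Further callers fail fast.
 */
public class BoundedAdmissionQueue {
  private final Semaphore slots;                  // local stand-in for the distributed semaphore
  private final AtomicInteger waiting = new AtomicInteger();
  private final int maxWaiting;
  private final long timeoutMs;

  public BoundedAdmissionQueue(int maxRunning, int maxWaiting, long timeoutMs) {
    this.slots = new Semaphore(maxRunning, true); // fair: waiters are served in arrival order
    this.maxWaiting = maxWaiting;
    this.timeoutMs = timeoutMs;
  }

  /**
   * @return true if admitted, false if the wait timed out or was interrupted
   * @throws IllegalStateException if too many callers are already waiting
   */
  public boolean admit() {
    if (slots.tryAcquire()) {
      return true;                                // a slot was free, no waiting needed
    }
    if (waiting.incrementAndGet() > maxWaiting) {
      waiting.decrementAndGet();
      throw new IllegalStateException("Too many queries waiting; limit is " + maxWaiting);
    }
    try {
      return slots.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();         // preserve the interrupt for the caller
      return false;
    } finally {
      waiting.decrementAndGet();
    }
  }

  /** Must be called once for every successful admit(). */
  public void release() {
    slots.release();
  }
}
```

In a cluster-wide version the waiter count would itself need to be shared state, which is a larger design question than this local sketch covers.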
[GitHub] drill pull request #928: Drill 5716: Queue-driven memory allocation
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/928#discussion_r136489994 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.server.BootStrapContext; +import org.apache.drill.exec.util.MemoryAllocationUtilities; +import org.apache.drill.exec.work.QueryWorkUnit; +import org.apache.drill.exec.work.foreman.Foreman; + +/** + * Represents a default resource manager for clusters that do not provide query + * queues. Without queues to provide a hard limit on the query admission rate, + * the number of active queries must be estimated and the resulting resource + * allocations will be rough estimates. 
+ */ + +public class DefaultResourceManager implements ResourceManager { + + public static class DefaultResourceAllocator implements QueryResourceAllocator { + +private QueryContext queryContext; + +protected DefaultResourceAllocator(QueryContext queryContext) { + this.queryContext = queryContext; +} + +@Override +public void visitAbstractPlan(PhysicalPlan plan) { + boolean replanMemory = ! plan.getProperties().hasResourcePlan; + if (! replanMemory || plan == null) { +return; + } + MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext); +} + +@Override +public void visitPhysicalPlan(QueryWorkUnit work) { +} + } + + public static class DefaultQueryResourceManager extends DefaultResourceAllocator implements QueryResourceManager { + +@SuppressWarnings("unused") +private final DefaultResourceManager rm; + +public DefaultQueryResourceManager(final DefaultResourceManager rm, final Foreman foreman) { + super(foreman.getQueryContext()); + this.rm = rm; +} + +@Override +public void setCost(double cost) { + // Nothing to do. The EXECUTION option in Foreman calls this, + // but does not do the work to plan sort memory. Is EXECUTION + // even used? 
+} + +@Override +public void admit() { + // No queueing by default +} + +@Override +public void exit() { + // No queueing by default +} + +@Override +public boolean hasQueue() { return false; } + +@Override +public String queueName() { return null; } + } + + BootStrapContext bootStrapContext; + public long memoryPerNode; + public int cpusPerNode; + + public DefaultResourceManager() { +memoryPerNode = DrillConfig.getMaxDirectMemory(); +cpusPerNode = Runtime.getRuntime().availableProcessors(); + } + + @Override + public long memoryPerNode() { return memoryPerNode; } + + @Override + public int cpusPerNode() { return cpusPerNode; } + + @Override + public QueryResourceAllocator newQueryPlanner(QueryContext queryContext) { --- End diff -- Can we rename this function to newQueryResourceAllocator instead of newQueryPlanner so it is less confusing? ---
[GitHub] drill pull request #928: Drill 5716: Queue-driven memory allocation
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/928#discussion_r136471827 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import java.util.concurrent.TimeUnit; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.coord.DistributedSemaphore; +import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.SystemOptionManager; + +/** + * Distributed query queue which uses a Zookeeper distributed semaphore to + * control queuing across the cluster. The distributed queue is actually two + * queues: one for "small" queries, another for "large" queries. Query size is + * determined by the Planner's estimate of query cost. 
+ * + * This queue is configured using system options: + * + * exec.queue.enable + * + * Set to true to enable the distributed queue. + * exec.queue.large + * + * The maximum number of large queries to admit. Additional + * queries wait in the queue. + * exec.queue.small + * + * The maximum number of small queries to admit. Additional + * queries wait in the queue. + * exec.queue.threshold + * + * The cost threshold. Queries below this size are small, at + * or above this size are large.. + * exec.queue.timeout_millis + * + * The maximum time (in milliseconds) a query will wait in the + * queue before failing. + * + * + * The above values are refreshed every five seconds. This aids performance + * a bit in systems with very high query arrival rates. + */ + +public class DistributedQueryQueue implements QueryQueue { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueryQueue.class); + + private class DistributedQueueLease implements QueueLease { +private final QueryId queryId; +private DistributedLease lease; +private final String queueName; +private long queryMemory; + +public DistributedQueueLease(QueryId queryId, String queueName, DistributedLease lease, long queryMemory) { + this.queryId = queryId; + this.queueName = queueName; + this.lease = lease; + this.queryMemory = queryMemory; +} + +@Override +public String toString() { + return String.format("Lease for %s queue to query %s", + queueName, QueryIdHelper.getQueryId(queryId)); +} + +@Override +public long queryMemoryPerNode() { return queryMemory; } + +@Override +public void release() { + DistributedQueryQueue.this.release(this); +} + +@Override +public String queueName() { return queueName; } + } + + /** + * Exposes a snapshot of internal state information for use in status + * reporting, such as in the UI. 
+ */ + + @XmlRootElement + public static class ZKQueueInfo { +public final int smallQueueSize; +public final int largeQueueSize; +public final double queueThreshold; +public final long memoryPerNode; +public final long memoryPerSmallQuery; +public final long memoryPerLargeQuery; + +public ZKQueueInfo(DistributedQueryQueue queue) { + smallQueueSize = queue.smallQueueSize; + largeQueueSize = queue.largeQueueSize; + queueThreshold = queue.queueThreshold; + memoryPerNode = queue.memoryPerNode; + memoryPe
[GitHub] drill pull request #928: Drill 5716: Queue-driven memory allocation
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/928#discussion_r136491168 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import java.util.concurrent.TimeUnit; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.coord.DistributedSemaphore; +import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.SystemOptionManager; + +/** + * Distributed query queue which uses a Zookeeper distributed semaphore to + * control queuing across the cluster. The distributed queue is actually two + * queues: one for "small" queries, another for "large" queries. Query size is + * determined by the Planner's estimate of query cost. 
+ * + * This queue is configured using system options: + * + * exec.queue.enable + * + * Set to true to enable the distributed queue. + * exec.queue.large + * + * The maximum number of large queries to admit. Additional + * queries wait in the queue. + * exec.queue.small + * + * The maximum number of small queries to admit. Additional + * queries wait in the queue. + * exec.queue.threshold + * + * The cost threshold. Queries below this size are small, at + * or above this size are large.. + * exec.queue.timeout_millis + * + * The maximum time (in milliseconds) a query will wait in the + * queue before failing. + * + * + * The above values are refreshed every five seconds. This aids performance + * a bit in systems with very high query arrival rates. + */ + +public class DistributedQueryQueue implements QueryQueue { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueryQueue.class); + + private class DistributedQueueLease implements QueueLease { +private final QueryId queryId; +private DistributedLease lease; +private final String queueName; +private long queryMemory; + +public DistributedQueueLease(QueryId queryId, String queueName, DistributedLease lease, long queryMemory) { + this.queryId = queryId; + this.queueName = queueName; + this.lease = lease; + this.queryMemory = queryMemory; +} + +@Override +public String toString() { + return String.format("Lease for %s queue to query %s", + queueName, QueryIdHelper.getQueryId(queryId)); +} + +@Override +public long queryMemoryPerNode() { return queryMemory; } + +@Override +public void release() { + DistributedQueryQueue.this.release(this); +} + +@Override +public String queueName() { return queueName; } + } + + /** + * Exposes a snapshot of internal state information for use in status + * reporting, such as in the UI. 
+ */ + + @XmlRootElement + public static class ZKQueueInfo { +public final int smallQueueSize; +public final int largeQueueSize; +public final double queueThreshold; +public final long memoryPerNode; +public final long memoryPerSmallQuery; +public final long memoryPerLargeQuery; + +public ZKQueueInfo(DistributedQueryQueue queue) { + smallQueueSize = queue.smallQueueSize; + largeQueueSize = queue.largeQueueSize; + queueThreshold = queue.queueThreshold; + memoryPerNode = queue.memoryPerNode; + memoryPe
[GitHub] drill pull request #928: Drill 5716: Queue-driven memory allocation
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/928#discussion_r136333709 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java --- @@ -394,6 +394,8 @@ DoubleValidator QUEUE_MEMORY_RATIO = new RangeDoubleValidator("exec.queue.memory_ratio", 0.001, 1000); + DoubleValidator QUEUE_MEMORY_RESERVE = new RangeDoubleValidator("exec.queue.memory_reserve_ratio", 0, 1.0); --- End diff -- Please add a comment explaining what this ratio means, as you did in the commit notes. Please do the same for exec.queue.memory_ratio. ---
[GitHub] drill pull request #928: Drill 5716: Queue-driven memory allocation
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/928#discussion_r136491938 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManager.java --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.work.foreman.Foreman; + +/** + * Drillbit-wide resource manager shared by all queries. Manages + * memory (at present) and CPU (planned). Since queries are the + * primary consumer of resources, manages resources by throttling + * queries into the system, and allocating resources to queries + * in order to control total use. An "null" implementation handles the + * case of no queuing. Clearly, the null case cannot effectively control + * resource use. + */ + +public interface ResourceManager { + + /** + * Returns the memory, in bytes, assigned to each node in a Drill + * cluster. 
Since Drill nodes are symmetrical, knowing he memory on --- End diff -- It would be better to say "assumed" to be symmetrical or "expected" to be symmetrical :-) ---
[GitHub] drill pull request #928: Drill 5716: Queue-driven memory allocation
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/928#discussion_r136489458 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.server.BootStrapContext; +import org.apache.drill.exec.util.MemoryAllocationUtilities; +import org.apache.drill.exec.work.QueryWorkUnit; +import org.apache.drill.exec.work.foreman.Foreman; + +/** + * Represents a default resource manager for clusters that do not provide query + * queues. Without queues to provide a hard limit on the query admission rate, + * the number of active queries must be estimated and the resulting resource + * allocations will be rough estimates. 
+ */ + +public class DefaultResourceManager implements ResourceManager { + + public static class DefaultResourceAllocator implements QueryResourceAllocator { + +private QueryContext queryContext; + +protected DefaultResourceAllocator(QueryContext queryContext) { + this.queryContext = queryContext; +} + +@Override +public void visitAbstractPlan(PhysicalPlan plan) { + boolean replanMemory = ! plan.getProperties().hasResourcePlan; + if (! replanMemory || plan == null) { +return; + } + MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext); +} + +@Override +public void visitPhysicalPlan(QueryWorkUnit work) { +} + } + + public static class DefaultQueryResourceManager extends DefaultResourceAllocator implements QueryResourceManager { + +@SuppressWarnings("unused") +private final DefaultResourceManager rm; + +public DefaultQueryResourceManager(final DefaultResourceManager rm, final Foreman foreman) { + super(foreman.getQueryContext()); + this.rm = rm; +} + +@Override +public void setCost(double cost) { + // Nothing to do. The EXECUTION option in Foreman calls this, + // but does not do the work to plan sort memory. Is EXECUTION + // even used? --- End diff -- It is not clear what this comment means. Is it OK to do nothing here? ---
[GitHub] drill pull request #928: Drill 5716: Queue-driven memory allocation
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/928#discussion_r136451559 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java --- @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.work.foreman.rm; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; +import org.apache.drill.exec.physical.base.FragmentRoot; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.work.QueryWorkUnit; +import org.apache.drill.exec.work.QueryWorkUnit.MinorFragmentDefn; +import org.apache.drill.exec.work.foreman.Foreman; +import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException; +import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueLease; +import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; + +/** + * Global resource manager that provides basic admission control (AC) + * via a configured queue: either the Zookeeper-based distributed queue + * or the in-process embedded Drillbit queue. The queue places an upper + * limit on the number of running queries. This limit then "slices" + * memory and CPU between queries: each gets the same share of resources. + * + * This is a "basic" implementation. Clearly, a more advanced implementation + * could look at query cost to determine whether to give a given query more + * or less than the "standard" share. That is left as a future exercise; + * in this version we just want to get the basics working. 
+ */ + +public class ThrottledResourceManager extends AbstractResourceManager { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ThrottledResourceManager.class); + + public static class QueuedResourceAllocator implements QueryResourceAllocator { + +protected final ThrottledResourceManager rm; +protected QueryContext queryContext; +protected PhysicalPlan plan; +protected QueryWorkUnit work; +protected double queryCost; + +protected QueuedResourceAllocator(final ThrottledResourceManager rm, QueryContext queryContext) { + this.rm = rm; + this.queryContext = queryContext; +} + +@Override +public void visitAbstractPlan(PhysicalPlan plan) { + this.plan = plan; + queryCost = plan.totalCost(); +} + +@Override +public void visitPhysicalPlan(final QueryWorkUnit work) { + this.work = work; + planMemory(); +} + +private void planMemory() { + if (plan.getProperties().hasResourcePlan) { +logger.debug("Memory already planned."); +return; + } + + // Group fragments by node. + + Multimap> nodeMap = buildBufferedOpMap(); + + // Memory must be symmetric to avoid bottlenecks in which one node has + // sorts (say) with less memory than another, causing skew in data arrival + // rates for downstream operators. + + int width = countBufferingOperators(nodeMap); + + // Then, share memory evenly across the + // all sort operators on that node. This handles asymmetric distribution + // such as occurs if a sort appears in the root fragment (the one with screen), + // which is never parallelized. + + for ( String key : nodeMap.keys() ) { +planNodeMemory(key, nodeMap.get(key
[GitHub] drill pull request #928: Drill 5716: Queue-driven memory allocation
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/928#discussion_r136471965 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import java.util.concurrent.TimeUnit; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.coord.DistributedSemaphore; +import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.SystemOptionManager; + +/** + * Distributed query queue which uses a Zookeeper distributed semaphore to + * control queuing across the cluster. The distributed queue is actually two + * queues: one for "small" queries, another for "large" queries. Query size is + * determined by the Planner's estimate of query cost. 
+ * + * This queue is configured using system options: + * + * exec.queue.enable + * + * Set to true to enable the distributed queue. + * exec.queue.large + * + * The maximum number of large queries to admit. Additional + * queries wait in the queue. + * exec.queue.small + * + * The maximum number of small queries to admit. Additional + * queries wait in the queue. + * exec.queue.threshold + * + * The cost threshold. Queries below this size are small, at + * or above this size are large.. + * exec.queue.timeout_millis + * + * The maximum time (in milliseconds) a query will wait in the + * queue before failing. + * + * + * The above values are refreshed every five seconds. This aids performance + * a bit in systems with very high query arrival rates. + */ + +public class DistributedQueryQueue implements QueryQueue { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueryQueue.class); + + private class DistributedQueueLease implements QueueLease { +private final QueryId queryId; +private DistributedLease lease; +private final String queueName; +private long queryMemory; + +public DistributedQueueLease(QueryId queryId, String queueName, DistributedLease lease, long queryMemory) { + this.queryId = queryId; + this.queueName = queueName; + this.lease = lease; + this.queryMemory = queryMemory; +} + +@Override +public String toString() { + return String.format("Lease for %s queue to query %s", + queueName, QueryIdHelper.getQueryId(queryId)); +} + +@Override +public long queryMemoryPerNode() { return queryMemory; } + +@Override +public void release() { + DistributedQueryQueue.this.release(this); +} + +@Override +public String queueName() { return queueName; } + } + + /** + * Exposes a snapshot of internal state information for use in status + * reporting, such as in the UI. 
+ */ + + @XmlRootElement + public static class ZKQueueInfo { +public final int smallQueueSize; +public final int largeQueueSize; +public final double queueThreshold; +public final long memoryPerNode; +public final long memoryPerSmallQuery; +public final long memoryPerLargeQuery; + +public ZKQueueInfo(DistributedQueryQueue queue) { + smallQueueSize = queue.smallQueueSize; + largeQueueSize = queue.largeQueueSize; + queueThreshold = queue.queueThreshold; + memoryPerNode = queue.memoryPerNode; + memoryPe
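The Javadoc quoted in this diff describes two queues split by a cost threshold, with a limit on how long a query may wait. A minimal local sketch of that admission logic follows. It assumes `java.util.concurrent.Semaphore` as an in-process stand-in for the Zookeeper distributed semaphore; the class `TwoQueueSketch` and all of its method names are hypothetical and are not Drill's API.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical local model of the two-queue admission scheme; not Drill's API.
final class TwoQueueSketch {
  private final Semaphore small;
  private final Semaphore large;
  private final double threshold;
  private final long timeoutMillis;

  TwoQueueSketch(int smallSize, int largeSize, double threshold, long timeoutMillis) {
    this.small = new Semaphore(smallSize);
    this.large = new Semaphore(largeSize);
    this.threshold = threshold;
    this.timeoutMillis = timeoutMillis;
  }

  // Queries below the threshold are small; at or above it they are large.
  String queueFor(double cost) {
    return cost < threshold ? "small" : "large";
  }

  private Semaphore semaphoreFor(String queueName) {
    return "small".equals(queueName) ? small : large;
  }

  // Returns the name of the queue that admitted the query, or null if the
  // wait timed out or the calling thread was interrupted.
  String admit(double cost) {
    String name = queueFor(cost);
    try {
      boolean admitted = semaphoreFor(name).tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
      return admitted ? name : null;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }

  // Frees the slot held by a finished query.
  void release(String queueName) {
    semaphoreFor(queueName).release();
  }
}
```

With a large-queue size of 1, a second large query waits until the first one releases its slot or the timeout expires, which is the behaviour the `exec.queue.large` and `exec.queue.timeout_millis` descriptions suggest.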
[GitHub] drill pull request #928: Drill 5716: Queue-driven memory allocation
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/928#discussion_r136446729 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.server.BootStrapContext; +import org.apache.drill.exec.util.MemoryAllocationUtilities; +import org.apache.drill.exec.work.QueryWorkUnit; +import org.apache.drill.exec.work.foreman.Foreman; + +/** + * Represents a default resource manager for clusters that do not provide query + * queues. Without queues to provide a hard limit on the query admission rate, + * the number of active queries must be estimated and the resulting resource + * allocations will be rough estimates. 
+ */ + +public class DefaultResourceManager implements ResourceManager { + + public static class DefaultResourceAllocator implements QueryResourceAllocator { + +private QueryContext queryContext; + +protected DefaultResourceAllocator(QueryContext queryContext) { + this.queryContext = queryContext; +} + +@Override +public void visitAbstractPlan(PhysicalPlan plan) { + boolean replanMemory = ! plan.getProperties().hasResourcePlan; + if (! replanMemory || plan == null) { +return; --- End diff -- You dereference plan first and only check it for null afterwards. Change it like this: if (plan == null || plan.getProperties().hasResourcePlan) { return; } ---
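The ordering problem raised in this comment can be shown in isolation. The sketch below is a minimal illustration, assuming hypothetical stand-in classes `Plan` and `PlanProperties` in place of Drill's `PhysicalPlan` and its properties object; only the `hasResourcePlan` field is modelled. The null test is evaluated first, so `getProperties()` is never called on a null reference, and the guard still returns early when a resource plan already exists, as the original condition intended.

```java
// Hypothetical stand-ins for Drill's PhysicalPlan and its properties;
// only the field used by the guard is modelled.
final class PlanProperties {
  final boolean hasResourcePlan;

  PlanProperties(boolean hasResourcePlan) {
    this.hasResourcePlan = hasResourcePlan;
  }
}

final class Plan {
  private final PlanProperties properties;

  Plan(boolean hasResourcePlan) {
    this.properties = new PlanProperties(hasResourcePlan);
  }

  PlanProperties getProperties() {
    return properties;
  }
}

final class GuardSketch {
  // Returns true when memory still has to be planned for this plan.
  // The null test comes first, so getProperties() is never reached for null.
  static boolean needsMemoryPlan(Plan plan) {
    if (plan == null || plan.getProperties().hasResourcePlan) {
      return false;
    }
    return true;
  }
}
```

Because `||` short-circuits, the right-hand operand is skipped whenever `plan` is null.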
[GitHub] drill pull request #928: Drill 5716: Queue-driven memory allocation
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/928#discussion_r136335585 --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/work/foreman/rm/TestQueues.java --- @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ -package org.apache.drill.exec.work.foreman.rm; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.work.foreman.rm.EmbeddedQueryQueue; -import org.apache.drill.test.ClientFixture; -import org.apache.drill.test.ClusterFixture; -import org.apache.drill.test.FixtureBuilder; -import org.apache.drill.test.QueryBuilder.QuerySummary; -import org.apache.drill.test.QueryBuilder.QuerySummaryFuture; -import org.junit.Test; - -public class TestQueues { - - @Test - public void testEmbedded() throws Exception { -FixtureBuilder builder = ClusterFixture.builder() -.configProperty(EmbeddedQueryQueue.ENABLED, true) -.configProperty(EmbeddedQueryQueue.QUEUE_SIZE, 2) -.configProperty(EmbeddedQueryQueue.TIMEOUT_MS, 2) -; -try(ClusterFixture cluster = builder.build(); -ClientFixture client = cluster.clientFixture()) { - List futures = new ArrayList<>(); - int n = 100; - for (int i = 0; i < n; i++) { -futures.add(client.queryBuilder().sql("SELECT `id_i` FROM `mock`.`implicit_10K` ORDER BY `id_i`").futureSummary()); - } - for (QuerySummaryFuture future : futures) { -QuerySummary summary = future.get(); -System.out.print( summary.queryIdString() + ": " ); -if (summary.failed()) { - System.out.println("Error - " + summary.error().getMessage()); -} else { - System.out.println(summary.recordCount()); -} - } -} - } - - @Test - public void testZk() throws Exception { -FixtureBuilder builder = ClusterFixture.builder() -.systemOption(ExecConstants.ENABLE_QUEUE.getOptionName(), true) -.systemOption(ExecConstants.LARGE_QUEUE_SIZE.getOptionName(), 1) -.systemOption(ExecConstants.SMALL_QUEUE_SIZE.getOptionName(), 2) -.withLocalZk() -; -try(ClusterFixture cluster = builder.build(); -ClientFixture client = cluster.clientFixture()) { - List futures = new ArrayList<>(); - int n = 100; - for (int i = 0; i < n; i++) { -futures.add(client.queryBuilder().sql("SELECT `id_i` FROM `mock`.`implicit_10K` ORDER BY 
`id_i`").futureSummary()); - } - for (QuerySummaryFuture future : futures) { -QuerySummary summary = future.get(); -System.out.print( summary.queryIdString() + ": " ); -if (summary.failed()) { - System.out.println("Error - " + summary.error().getMessage()); -} else { - System.out.println(summary.recordCount()); -} - } -} - } -} --- End diff -- It is not clear why these tests are removed. Are they failing? If so, were they always failing, or do they fail only with the new changes? If they are not failing, why not keep them, since they provide some coverage? ---
[GitHub] drill pull request #928: Drill 5716: Queue-driven memory allocation
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/928#discussion_r136489778 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.server.BootStrapContext; +import org.apache.drill.exec.util.MemoryAllocationUtilities; +import org.apache.drill.exec.work.QueryWorkUnit; +import org.apache.drill.exec.work.foreman.Foreman; + +/** + * Represents a default resource manager for clusters that do not provide query + * queues. Without queues to provide a hard limit on the query admission rate, + * the number of active queries must be estimated and the resulting resource + * allocations will be rough estimates. 
+ */ + +public class DefaultResourceManager implements ResourceManager { + + public static class DefaultResourceAllocator implements QueryResourceAllocator { + +private QueryContext queryContext; + +protected DefaultResourceAllocator(QueryContext queryContext) { + this.queryContext = queryContext; +} + +@Override +public void visitAbstractPlan(PhysicalPlan plan) { + boolean replanMemory = ! plan.getProperties().hasResourcePlan; + if (! replanMemory || plan == null) { +return; + } + MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext); +} + +@Override +public void visitPhysicalPlan(QueryWorkUnit work) { +} + } + + public static class DefaultQueryResourceManager extends DefaultResourceAllocator implements QueryResourceManager { + +@SuppressWarnings("unused") +private final DefaultResourceManager rm; + +public DefaultQueryResourceManager(final DefaultResourceManager rm, final Foreman foreman) { + super(foreman.getQueryContext()); + this.rm = rm; +} + +@Override +public void setCost(double cost) { + // Nothing to do. The EXECUTION option in Foreman calls this, + // but does not do the work to plan sort memory. Is EXECUTION + // even used? +} + +@Override +public void admit() { + // No queueing by default +} + +@Override +public void exit() { + // No queueing by default +} + +@Override +public boolean hasQueue() { return false; } + +@Override +public String queueName() { return null; } + } + + BootStrapContext bootStrapContext; + public long memoryPerNode; + public int cpusPerNode; + + public DefaultResourceManager() { +memoryPerNode = DrillConfig.getMaxDirectMemory(); +cpusPerNode = Runtime.getRuntime().availableProcessors(); --- End diff -- Shouldn't you do the same thing here as earlier in the code, i.e. use the minimum of the value configured on the JVM command line and the value of the config option? ---
[GitHub] drill pull request #928: Drill 5716: Queue-driven memory allocation
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/928#discussion_r136490058 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.server.BootStrapContext; +import org.apache.drill.exec.util.MemoryAllocationUtilities; +import org.apache.drill.exec.work.QueryWorkUnit; +import org.apache.drill.exec.work.foreman.Foreman; + +/** + * Represents a default resource manager for clusters that do not provide query + * queues. Without queues to provide a hard limit on the query admission rate, + * the number of active queries must be estimated and the resulting resource + * allocations will be rough estimates. 
+ */ + +public class DefaultResourceManager implements ResourceManager { + + public static class DefaultResourceAllocator implements QueryResourceAllocator { + +private QueryContext queryContext; + +protected DefaultResourceAllocator(QueryContext queryContext) { + this.queryContext = queryContext; +} + +@Override +public void visitAbstractPlan(PhysicalPlan plan) { + boolean replanMemory = ! plan.getProperties().hasResourcePlan; + if (! replanMemory || plan == null) { +return; + } + MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext); +} + +@Override +public void visitPhysicalPlan(QueryWorkUnit work) { +} + } + + public static class DefaultQueryResourceManager extends DefaultResourceAllocator implements QueryResourceManager { + +@SuppressWarnings("unused") +private final DefaultResourceManager rm; + +public DefaultQueryResourceManager(final DefaultResourceManager rm, final Foreman foreman) { + super(foreman.getQueryContext()); + this.rm = rm; +} + +@Override +public void setCost(double cost) { + // Nothing to do. The EXECUTION option in Foreman calls this, + // but does not do the work to plan sort memory. Is EXECUTION + // even used? 
+} + +@Override +public void admit() { + // No queueing by default +} + +@Override +public void exit() { + // No queueing by default +} + +@Override +public boolean hasQueue() { return false; } + +@Override +public String queueName() { return null; } + } + + BootStrapContext bootStrapContext; + public long memoryPerNode; + public int cpusPerNode; + + public DefaultResourceManager() { +memoryPerNode = DrillConfig.getMaxDirectMemory(); +cpusPerNode = Runtime.getRuntime().availableProcessors(); + } + + @Override + public long memoryPerNode() { return memoryPerNode; } + + @Override + public int cpusPerNode() { return cpusPerNode; } + + @Override + public QueryResourceAllocator newQueryPlanner(QueryContext queryContext) { +return new DefaultResourceAllocator(queryContext); + } + + @Override + public QueryResourceManager newExecRM(final Foreman foreman) { --- End diff -- Can we rename this to newQueryResourceManager instead of newExecRM? ---
[GitHub] drill pull request #928: Drill 5716: Queue-driven memory allocation
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/928#discussion_r136468725 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import java.util.concurrent.TimeUnit; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.coord.DistributedSemaphore; +import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.SystemOptionManager; + +/** + * Distributed query queue which uses a Zookeeper distributed semaphore to + * control queuing across the cluster. The distributed queue is actually two + * queues: one for "small" queries, another for "large" queries. Query size is + * determined by the Planner's estimate of query cost. 
+ * + * This queue is configured using system options: + * + * exec.queue.enable + * + * Set to true to enable the distributed queue. + * exec.queue.large + * + * The maximum number of large queries to admit. Additional + * queries wait in the queue. + * exec.queue.small + * + * The maximum number of small queries to admit. Additional + * queries wait in the queue. + * exec.queue.threshold + * + * The cost threshold. Queries below this size are small, at + * or above this size are large.. + * exec.queue.timeout_millis + * + * The maximum time (in milliseconds) a query will wait in the + * queue before failing. + * + * + * The above values are refreshed every five seconds. This aids performance + * a bit in systems with very high query arrival rates. + */ + +public class DistributedQueryQueue implements QueryQueue { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueryQueue.class); + + private class DistributedQueueLease implements QueueLease { +private final QueryId queryId; +private DistributedLease lease; +private final String queueName; +private long queryMemory; + +public DistributedQueueLease(QueryId queryId, String queueName, DistributedLease lease, long queryMemory) { + this.queryId = queryId; + this.queueName = queueName; + this.lease = lease; + this.queryMemory = queryMemory; +} + +@Override +public String toString() { + return String.format("Lease for %s queue to query %s", + queueName, QueryIdHelper.getQueryId(queryId)); +} + +@Override +public long queryMemoryPerNode() { return queryMemory; } + +@Override +public void release() { + DistributedQueryQueue.this.release(this); +} + +@Override +public String queueName() { return queueName; } + } + + /** + * Exposes a snapshot of internal state information for use in status + * reporting, such as in the UI. 
+ */ + + @XmlRootElement + public static class ZKQueueInfo { +public final int smallQueueSize; +public final int largeQueueSize; +public final double queueThreshold; +public final long memoryPerNode; +public final long memoryPerSmallQuery; +public final long memoryPerLargeQuery; + +public ZKQueueInfo(DistributedQueryQueue queue) { + smallQueueSize = queue.smallQueueSize; + largeQueueSize = queue.largeQueueSize; + queueThreshold = queue.queueThreshold; + memoryPerNode = queue.memoryPerNode; + memoryPe
[GitHub] drill pull request #928: Drill 5716: Queue-driven memory allocation
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/928#discussion_r136450731 --- Diff: exec/java-exec/src/main/resources/drill-module.conf --- @@ -344,6 +357,20 @@ drill.exec: { # Full workspace name should be indicated (including schema and workspace separated by dot). # Workspace MUST be file-based and writable. Workspace name is case-sensitive. default_temporary_workspace: "dfs.tmp" + + // Resource management + rm : { +// Memory per node normally comes from the direct memory allocated on the JVM +// command line. This parameter, if other than 0, further limits the amount. +// Primarily for testing. +memory_per_node: 0, +// The number of available CPUs normally comes directly from the system itself. +// This parameter, if other than 0, further limits the number of CPUs we will +// consider when planning. Note that, sadly, this parameter does not +// limit *actual* CPU usage; only the amount of CPU assumed to exist when +// planning and managing queries. Primarily for testing. +cpus_per_node: 0, + } --- End diff -- I am wondering whether we should use a name like memory_per_node_for_testing_only or memory_per_node_debug_only, so that nobody misinterprets this config or changes it accidentally and expects different results. ---
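The config comments in this diff say that a non-zero `memory_per_node` or `cpus_per_node` "further limits" the detected value. One way to read that rule is sketched below; this is an assumption about the intended semantics, not code taken from the pull request. The class `NodeLimits` and the method `effectiveLimit` are hypothetical names, and treating negative values the same as 0 is also an assumption.

```java
// Hypothetical helper; shows one reading of "if other than 0, further limits".
final class NodeLimits {
  // detected:   value reported by the JVM or the operating system
  // configured: value of the config option, where 0 means "no extra limit"
  // A positive configured value can lower the detected value but never raise it.
  static long effectiveLimit(long detected, long configured) {
    return configured > 0 ? Math.min(detected, configured) : detected;
  }
}
```

For example, with 8 GiB of direct memory detected and `memory_per_node` set to 4 GiB, the planner would assume 4 GiB; with the option set to 16 GiB it would still assume 8 GiB.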
[GitHub] drill pull request #928: Drill 5716: Queue-driven memory allocation
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/928#discussion_r136495651 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java --- @@ -186,9 +187,10 @@ public void dispose(WebUserConnection instance) { @Inject WorkManager workManager; +@SuppressWarnings("resource") @Override public WebUserConnection provide() { - final HttpSession session = request.getSession(); +// final HttpSession session = request.getSession(); --- End diff -- Why not just remove it if it is not needed or not used? ---
[GitHub] drill pull request #928: Drill 5716: Queue-driven memory allocation
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/928#discussion_r136336256 --- Diff: protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java --- @@ -12140,6 +12140,31 @@ public Builder clearDef() { */ com.google.protobuf.ByteString getOptionsJsonBytes(); + +// optional double total_cost = 7; --- End diff -- You can remove this redundant comment, I guess.
[GitHub] drill pull request #928: Drill 5716: Queue-driven memory allocation
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/928#discussion_r136376799 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java --- @@ -373,13 +373,26 @@ String AVERAGE_FIELD_WIDTH_KEY = "planner.memory.average_field_width"; OptionValidator AVERAGE_FIELD_WIDTH = new PositiveLongValidator(AVERAGE_FIELD_WIDTH_KEY, Long.MAX_VALUE); + // Resource management boot-time options. + + String MAX_MEMORY_PER_NODE = "drill.exec.rm.memory_per_node"; + String MAX_CPUS_PER_NODE = "drill.exec.rm.cpus_per_node"; + + // Resource management system run-time options. + + // Enables queues. When running embedded, enables an in-process queue. When + // running distributed, enables the Zookeeper-based distributed queue. + BooleanValidator ENABLE_QUEUE = new BooleanValidator("exec.queue.enable"); LongValidator LARGE_QUEUE_SIZE = new PositiveLongValidator("exec.queue.large", 1000); LongValidator SMALL_QUEUE_SIZE = new PositiveLongValidator("exec.queue.small", 10); - LongValidator QUEUE_THRESHOLD_SIZE = new PositiveLongValidator("exec.queue.threshold", - Long.MAX_VALUE); - LongValidator QUEUE_TIMEOUT = new PositiveLongValidator("exec.queue.timeout_millis", - Long.MAX_VALUE); + LongValidator QUEUE_THRESHOLD_SIZE = new PositiveLongValidator("exec.queue.threshold", Long.MAX_VALUE); + LongValidator QUEUE_TIMEOUT = new PositiveLongValidator("exec.queue.timeout_millis", Long.MAX_VALUE); + + // Ratio of memory for small queries vs. large queries. + // Each small query gets 1 unit, each large query gets QUEUE_MEMORY_RATIO units. + + DoubleValidator QUEUE_MEMORY_RATIO = new RangeDoubleValidator("exec.queue.memory_ratio", 0.001, 1000); --- End diff -- Why a lower bound of 0.001? Should it be 1? I am thinking a larger query should get at least as much memory as a smaller query.
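To make the lower-bound question concrete, here is a rough sketch of the unit arithmetic implied by the comment on QUEUE_MEMORY_RATIO (each small query gets 1 unit, each large query gets QUEUE_MEMORY_RATIO units). The class, the method names and the way units are turned into bytes (total memory divided by the total number of units across all queue slots) are assumptions made for illustration, not code from the PR.

```java
// Illustrative only: names and the bytes-per-unit formula are assumptions.
final class QueueMemorySketch {
  private QueueMemorySketch() {}

  /** Bytes represented by one "unit" when every queue slot is occupied. */
  static double unitBytes(long totalBytes, long smallSlots, long largeSlots, double ratio) {
    if (ratio <= 0) {
      throw new IllegalArgumentException("ratio must be positive");
    }
    return totalBytes / (smallSlots + largeSlots * ratio);
  }

  /** A small query gets exactly one unit. */
  static long smallQueryBytes(long totalBytes, long smallSlots, long largeSlots, double ratio) {
    return (long) unitBytes(totalBytes, smallSlots, largeSlots, ratio);
  }

  /** A large query gets `ratio` units. */
  static long largeQueryBytes(long totalBytes, long smallSlots, long largeSlots, double ratio) {
    return (long) (unitBytes(totalBytes, smallSlots, largeSlots, ratio) * ratio);
  }

  public static void main(String[] args) {
    long total = 1200; // tiny numbers to keep the arithmetic readable
    // Ratio above 1: a large query gets more than a small one.
    System.out.println(smallQueryBytes(total, 10, 2, 10.0) + " vs " + largeQueryBytes(total, 10, 2, 10.0));
    // Ratio below 1: the "large" query ends up with less than a small one.
    System.out.println(smallQueryBytes(total, 10, 2, 0.5) + " vs " + largeQueryBytes(total, 10, 2, 0.5));
  }
}
```

Under these assumptions any ratio below 1 gives a large query less memory than a small one, which is the case the review comment is asking about.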
[jira] [Created] (DRILL-5760) Performance hit: SVR causes repeated vector reallocations
Paul Rogers created DRILL-5760: -- Summary: Performance hit: SVR causes repeated vector reallocations Key: DRILL-5760 URL: https://issues.apache.org/jira/browse/DRILL-5760 Project: Apache Drill Issue Type: Improvement Affects Versions: 1.10.0 Reporter: Paul Rogers Priority: Minor Run the query in DRILL-5753 with DEBUG logging enabled. You will see a set of vector reallocations out of the JSON reader as described by DRILL-5759. Later, the sort in the query will complete with an in-memory sort. Data will be sent downstream to a selection vector remover (SVR). The SVR will fire a very large number of additional vector reallocations: {code} VarCharVector - Reallocating VarChar, new size 65536 VarCharVector - Reallocating VarChar, new size 65536 UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [131072] -> [262144] UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [131072] -> [262144] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [32768] -> [65536] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [32768] -> [65536] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [32768] -> [65536] BigIntVector - Reallocating vector [c(BIGINT:OPTIONAL)]. # of bytes: [262144] -> [524288] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [32768] -> [65536] Float8Vector - Reallocating vector [d(FLOAT8:OPTIONAL)]. # of bytes: [262144] -> [524288] UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [65536] -> [131072] UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [65536] -> [131072] UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [65536] -> [131072] UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [65536] -> [131072] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. 
# of bytes: [16384] -> [32768] VarCharVector - Reallocating VarChar, new size 65536 UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [16384] -> [32768] BigIntVector - Reallocating vector [col1(BIGINT:OPTIONAL)]. # of bytes: [131072] -> [262144] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [16384] -> [32768] BigIntVector - Reallocating vector [$data$(BIGINT:REQUIRED)]. # of bytes: [524288] -> [1048576] VarCharVector - Reallocating VarChar, new size 65536 VarCharVector - Reallocating VarChar, new size 131072 VarCharVector - Reallocating VarChar, new size 131072 UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [262144] -> [524288] UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [262144] -> [524288] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [65536] -> [131072] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [65536] -> [131072] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [65536] -> [131072] BigIntVector - Reallocating vector [c(BIGINT:OPTIONAL)]. # of bytes: [524288] -> [1048576] Int1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [65536] -> [131072] Float8Vector - Reallocating vector [d(FLOAT8:OPTIONAL)]. # of bytes: [524288] -> [1048576] Int4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [131072] -> [262144] Int4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [131072] -> [262144] Int4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [131072] -> [262144] Int4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [131072] -> [262144] Int1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [32768] -> [65536] VarCharVector - Reallocating VarChar, new size 131072 Int1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. 
# of bytes: [32768] -> [65536] BigIntVector - Reallocating vector [col1(BIGINT:OPTIONAL)]. # of bytes: [262144] -> [524288] Int1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [32768] -> [65536] BigIntVector - Reallocating vector [$data$(BIGINT:REQUIRED)]. # of bytes: [1048576] -> [2097152] VarCharVector - Reallocating VarChar, new size 262144 VarCharVector - Reallocating VarChar, new size 131072 VarCharVector - Reallocating VarChar, new size 262144 Int4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [524288] -> [1048576] Int4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [524288] -> [1048576] Int1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [131072] -> [262144] Int1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [131072] -> [262144] Int1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [131072] -> [262144] BigIntVector -
[jira] [Created] (DRILL-5759) Performance hit: JSON reader causes repeated vector reallocations
Paul Rogers created DRILL-5759: -- Summary: Performance hit: JSON reader causes repeated vector reallocations Key: DRILL-5759 URL: https://issues.apache.org/jira/browse/DRILL-5759 Project: Apache Drill Issue Type: Improvement Affects Versions: 1.10.0 Reporter: Paul Rogers Priority: Minor Run the query in DRILL-5753 with DEBUG level logging. This query uses the JSON record reader. You will see many messages about vector reallocation. These messages show that the JSON reader incorrectly allocated vector sizes. The specific problem is with repeated types. In the messages, note that vectors start off with 4096 values, which is the target row count. But, the data contains repeated columns, and so the vectors must grow multiple times to hold the repeated items. On the first batch, heuristics might be used to guess the cardinality. On the second and subsequent batches, empirical data gathered on the first batch could be used to better allocate the subsequent batches. The result is memory thrashing and a performance hit as memory is allocated, copied and released multiple times. {code} BigIntVector - Reallocating vector [$data$(BIGINT:REQUIRED)]. # of bytes: [32768] -> [65536] UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [16384] -> [32768] UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [16384] -> [32768] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [4096] -> [8192] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [4096] -> [8192] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [4096] -> [8192] BigIntVector - Reallocating vector [c(BIGINT:OPTIONAL)]. # of bytes: [32768] -> [65536] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [4096] -> [8192] Float8Vector - Reallocating vector [d(FLOAT8:OPTIONAL)]. # of bytes: [32768] -> [65536] BigIntVector - Reallocating vector [$data$(BIGINT:REQUIRED)]. 
# of bytes: [65536] -> [131072] UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [32768] -> [65536] UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [32768] -> [65536] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [8192] -> [16384] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [8192] -> [16384] BigIntVector - Reallocating vector [c(BIGINT:OPTIONAL)]. # of bytes: [65536] -> [131072] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [8192] -> [16384] Float8Vector - Reallocating vector [d(FLOAT8:OPTIONAL)]. # of bytes: [65536] -> [131072] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [8192] -> [16384] UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [16384] -> [32768] UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [16384] -> [32768] UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [16384] -> [32768] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [4096] -> [8192] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [4096] -> [8192] BigIntVector - Reallocating vector [col1(BIGINT:OPTIONAL)]. # of bytes: [32768] -> [65536] BigIntVector - Reallocating vector [$data$(BIGINT:REQUIRED)]. # of bytes: [131072] -> [262144] UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [65536] -> [131072] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [16384] -> [32768] Float8Vector - Reallocating vector [d(FLOAT8:OPTIONAL)]. # of bytes: [131072] -> [262144] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [16384] -> [32768] UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [65536] -> [131072] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. 
# of bytes: [16384] -> [32768] UInt1Vector - Reallocating vector [$bits$(UINT1:REQUIRED)]. # of bytes: [16384] -> [32768] BigIntVector - Reallocating vector [c(BIGINT:OPTIONAL)]. # of bytes: [131072] -> [262144] UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [16384] -> [32768] UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [16384] -> [32768] UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [32768] -> [65536] UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [32768] -> [65536] UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [32768] -> [65536] UInt4Vector - Reallocating vector [$offsets$(UINT4:REQUIRED)]. # of bytes: [16384] -> [32768] {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
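The cost described above can be estimated with a small sketch. It assumes, as the log suggests, that each reallocation doubles the buffer and copies the old contents; the class and method names are hypothetical, and the pre-sizing rule (row count times observed average cardinality, rounded up to a power of two) is only one possible way to use first-batch statistics.

```java
// Hypothetical sketch: estimates the cost of doubling-based growth and shows
// one way to pre-size a vector from first-batch statistics. Not Drill code.
final class VectorSizingSketch {
  private VectorSizingSketch() {}

  /** Number of doublings needed to grow from initialBytes to at least requiredBytes. */
  static int doublingsNeeded(long initialBytes, long requiredBytes) {
    if (initialBytes <= 0) {
      throw new IllegalArgumentException("initialBytes must be positive");
    }
    int count = 0;
    for (long cap = initialBytes; cap < requiredBytes; cap *= 2) {
      count++;
    }
    return count;
  }

  /** Total bytes copied, assuming every doubling copies the full old buffer. */
  static long bytesCopied(long initialBytes, long requiredBytes) {
    if (initialBytes <= 0) {
      throw new IllegalArgumentException("initialBytes must be positive");
    }
    long copied = 0;
    for (long cap = initialBytes; cap < requiredBytes; cap *= 2) {
      copied += cap;
    }
    return copied;
  }

  /** Suggested allocation for the next batch, rounded up to a power of two. */
  static long presize(int rowCount, double avgCardinality, int widthBytes) {
    long needed = (long) Math.ceil(rowCount * avgCardinality) * widthBytes;
    long highest = Long.highestOneBit(Math.max(1L, needed));
    return highest == needed ? needed : highest << 1;
  }

  public static void main(String[] args) {
    // The $data$ BIGINT vector in the log grows 32768 -> 65536 -> 131072 -> 262144.
    System.out.println(doublingsNeeded(32768, 262144));
    System.out.println(bytesCopied(32768, 262144));
    // 4096 rows with an observed average of 8 values per row, 8 bytes per value.
    System.out.println(presize(4096, 8.0, 8));
  }
}
```

With the sizes from the log, the `$data$` vector is reallocated three times and copies 229376 bytes to reach a size it could have been given up front.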
[GitHub] drill issue #920: DRILL-5737: Hash Agg uses more than the allocated memory u...
Github user priteshm commented on the issue: https://github.com/apache/drill/pull/920 @Ben-Zvi can you please review for it to be included in the current sprint?
[jira] [Created] (DRILL-5758) Rollup of external sort fixes to issues found by QA
Paul Rogers created DRILL-5758: -- Summary: Rollup of external sort fixes to issues found by QA Key: DRILL-5758 URL: https://issues.apache.org/jira/browse/DRILL-5758 Project: Apache Drill Issue Type: Task Affects Versions: 1.12.0 Reporter: Paul Rogers Assignee: Paul Rogers Fix For: 1.12.0 Tracking JIRA to be used for the PR that combines fixes for various JIRA entries. Bugs fixed in this task are given by the linked issues.
[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/906#discussion_r136440319 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java --- @@ -130,562 +145,248 @@ public IterOutcome innerNext() { } @Override - public WritableBatch getWritableBatch() { -return WritableBatch.get(this); + public int getRecordCount() { +return recordCount; } - private void setValueCount(int count) { -for (ValueVector v : allocationVectors) { - ValueVector.Mutator m = v.getMutator(); - m.setValueCount(count); -} - } - private boolean doAlloc() { -for (ValueVector v : allocationVectors) { - try { -AllocationHelper.allocateNew(v, current.getRecordCount()); - } catch (OutOfMemoryException ex) { -return false; - } + @SuppressWarnings("resource") + private IterOutcome doWork(RecordBatch inputBatch, boolean newSchema) throws ClassTransformationException, IOException, SchemaChangeException { +if (inputBatch.getSchema().getFieldCount() != container.getSchema().getFieldCount()) { + // wrong. 
} -return true; - } - @SuppressWarnings("resource") - private IterOutcome doWork() throws ClassTransformationException, IOException, SchemaChangeException { -if (allocationVectors != null) { - for (ValueVector v : allocationVectors) { -v.clear(); - } +if (newSchema) { + createUnionAller(inputBatch); } -allocationVectors = Lists.newArrayList(); -transfers.clear(); +container.zeroVectors(); -// If both sides of Union-All are empty -if(unionAllInput.isBothSideEmpty()) { - for(int i = 0; i < outputFields.size(); ++i) { -final String colName = outputFields.get(i).getPath(); -final MajorType majorType = MajorType.newBuilder() -.setMinorType(MinorType.INT) -.setMode(DataMode.OPTIONAL) -.build(); - -MaterializedField outputField = MaterializedField.create(colName, majorType); -ValueVector vv = container.addOrGet(outputField, callBack); -allocationVectors.add(vv); - } +for (final ValueVector v : this.allocationVectors) { + AllocationHelper.allocateNew(v, inputBatch.getRecordCount()); +} - container.buildSchema(BatchSchema.SelectionVectorMode.NONE); +recordCount = unionall.unionRecords(0, inputBatch.getRecordCount(), 0); +for (final ValueVector v : allocationVectors) { + final ValueVector.Mutator m = v.getMutator(); + m.setValueCount(recordCount); +} + +if (callBack.getSchemaChangedAndReset()) { return IterOutcome.OK_NEW_SCHEMA; +} else { + return IterOutcome.OK; } + } + + private void createUnionAller(RecordBatch inputBatch) throws ClassTransformationException, IOException, SchemaChangeException { +transfers.clear(); +allocationVectors.clear();; final ClassGenerator cg = CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); cg.getCodeGenerator().plainJavaCapable(true); // Uncomment out this line to debug the generated code. 
-//cg.getCodeGenerator().saveCodeForDebugging(true); +//cg.getCodeGenerator().saveCodeForDebugging(true); + int index = 0; -for(VectorWrapper vw : current) { - ValueVector vvIn = vw.getValueVector(); - // get the original input column names - SchemaPath inputPath = SchemaPath.getSimplePath(vvIn.getField().getPath()); - // get the renamed column names - SchemaPath outputPath = SchemaPath.getSimplePath(outputFields.get(index).getPath()); +for(VectorWrapper vw : inputBatch) { + ValueVector vvIn = vw.getValueVector(); + ValueVector vvOut = container.getValueVector(index).getValueVector(); final ErrorCollector collector = new ErrorCollectorImpl(); // According to input data names, Minortypes, Datamodes, choose to // transfer directly, // rename columns or // cast data types (Minortype or DataMode) - if (hasSameTypeAndMode(outputFields.get(index), vw.getValueVector().getField())) { + if (hasSameTypeAndMode(container.getSchema().getColumn(index), vvIn.getField()) + && vvIn.getField().getType().getMinorType() != TypeProtos.MinorType.MAP // Per DRILL-5521, existing bug for map transfer + ) { // Transfer column - -MajorType outputFieldType = outputFields.get(ind
[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/906#discussion_r136439631 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java --- @@ -329,9 +326,11 @@ public TypedFieldId getValueVectorId(SchemaPath path) { @VisibleForTesting public static class Mutator implements OutputMutator { -/** Whether schema has changed since last inquiry (via #isNewSchema}). Is - * true before first inquiry. */ -private boolean schemaChanged = true; +/** Flag keeping track whether top-level schema has changed since last inquiry (via #isNewSchema}). + * It's initialized to false, or reset to false after #isNewSchema or after #clear, until a new value vector + * or a value vector with different type is added to fieldVectorMap. + **/ +private boolean schemaChanged; --- End diff -- Thanks for the explanation.
[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/906#discussion_r136445816 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java --- @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.record; + +import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.base.PhysicalOperator; + +public abstract class AbstractBinaryRecordBatch extends AbstractRecordBatch { + protected final RecordBatch left; + protected final RecordBatch right; + + // state (IterOutcome) of the left input + protected IterOutcome leftUpstream = IterOutcome.NONE; + + // state (IterOutcome) of the right input + protected IterOutcome rightUpstream = IterOutcome.NONE; --- End diff -- Thanks for the explanation.
[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/906#discussion_r136440163 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java --- @@ -130,562 +145,248 @@ public IterOutcome innerNext() { } @Override - public WritableBatch getWritableBatch() { -return WritableBatch.get(this); + public int getRecordCount() { +return recordCount; } - private void setValueCount(int count) { -for (ValueVector v : allocationVectors) { - ValueVector.Mutator m = v.getMutator(); - m.setValueCount(count); -} - } - private boolean doAlloc() { -for (ValueVector v : allocationVectors) { - try { -AllocationHelper.allocateNew(v, current.getRecordCount()); - } catch (OutOfMemoryException ex) { -return false; - } + @SuppressWarnings("resource") + private IterOutcome doWork(RecordBatch inputBatch, boolean newSchema) throws ClassTransformationException, IOException, SchemaChangeException { +if (inputBatch.getSchema().getFieldCount() != container.getSchema().getFieldCount()) { + // wrong. 
} -return true; - } - @SuppressWarnings("resource") - private IterOutcome doWork() throws ClassTransformationException, IOException, SchemaChangeException { -if (allocationVectors != null) { - for (ValueVector v : allocationVectors) { -v.clear(); - } +if (newSchema) { + createUnionAller(inputBatch); } -allocationVectors = Lists.newArrayList(); -transfers.clear(); +container.zeroVectors(); -// If both sides of Union-All are empty -if(unionAllInput.isBothSideEmpty()) { - for(int i = 0; i < outputFields.size(); ++i) { -final String colName = outputFields.get(i).getPath(); -final MajorType majorType = MajorType.newBuilder() -.setMinorType(MinorType.INT) -.setMode(DataMode.OPTIONAL) -.build(); - -MaterializedField outputField = MaterializedField.create(colName, majorType); -ValueVector vv = container.addOrGet(outputField, callBack); -allocationVectors.add(vv); - } +for (final ValueVector v : this.allocationVectors) { + AllocationHelper.allocateNew(v, inputBatch.getRecordCount()); +} - container.buildSchema(BatchSchema.SelectionVectorMode.NONE); +recordCount = unionall.unionRecords(0, inputBatch.getRecordCount(), 0); +for (final ValueVector v : allocationVectors) { + final ValueVector.Mutator m = v.getMutator(); + m.setValueCount(recordCount); +} + +if (callBack.getSchemaChangedAndReset()) { return IterOutcome.OK_NEW_SCHEMA; +} else { + return IterOutcome.OK; } + } + + private void createUnionAller(RecordBatch inputBatch) throws ClassTransformationException, IOException, SchemaChangeException { +transfers.clear(); +allocationVectors.clear();; final ClassGenerator cg = CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); cg.getCodeGenerator().plainJavaCapable(true); // Uncomment out this line to debug the generated code. 
-//cg.getCodeGenerator().saveCodeForDebugging(true); +//cg.getCodeGenerator().saveCodeForDebugging(true); + int index = 0; -for(VectorWrapper vw : current) { - ValueVector vvIn = vw.getValueVector(); - // get the original input column names - SchemaPath inputPath = SchemaPath.getSimplePath(vvIn.getField().getPath()); - // get the renamed column names - SchemaPath outputPath = SchemaPath.getSimplePath(outputFields.get(index).getPath()); +for(VectorWrapper vw : inputBatch) { + ValueVector vvIn = vw.getValueVector(); + ValueVector vvOut = container.getValueVector(index).getValueVector(); final ErrorCollector collector = new ErrorCollectorImpl(); // According to input data names, Minortypes, Datamodes, choose to // transfer directly, // rename columns or // cast data types (Minortype or DataMode) - if (hasSameTypeAndMode(outputFields.get(index), vw.getValueVector().getField())) { + if (hasSameTypeAndMode(container.getSchema().getColumn(index), vvIn.getField()) + && vvIn.getField().getType().getMinorType() != TypeProtos.MinorType.MAP // Per DRILL-5521, existing bug for map transfer + ) { // Transfer column - -MajorType outputFieldType = outputFields.get(ind
[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/906#discussion_r136446075 --- Diff: exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullHolder.java --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.vector; + +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.expr.holders.ValueHolder; + +public class UntypedNullHolder implements ValueHolder { + public static final TypeProtos.MajorType TYPE = Types.optional(TypeProtos.MinorType.NULL); + + public TypeProtos.MajorType getType() {return TYPE;} + + public static final int WIDTH = 0; + + public int isSet = 1; + + @Deprecated + public int hashCode(){ +throw new UnsupportedOperationException(); + } + + /* + * Reason for deprecation is that ValueHolders are potential scalar replacements + * and hence we don't want any methods to be invoked on them. + */ + @Deprecated + public String toString(){ +throw new UnsupportedOperationException(); --- End diff -- Hmmm... Not entirely convinced, but let's let it go for now. 
Re: Working on upgrade Drill Calcite version
+dev@calcite.

Excellent work, Roman. Your changes to Calcite don’t seem to be very major, and I ran the test suite on your https://github.com/KulykRoman/incubator-calcite/commits/DrillCalcite1.13.0_rc1 branch and all the tests pass. So when you are ready, I think we can expedite getting these changes back into Calcite. Calcite will release 1.14 probably in the next two weeks. So, when you’re on to 1.13 let’s continue the momentum and get up to the latest release. I would love to get to the situation that we can ask Drill to verify a Calcite release before we make it. There are possibly breaking changes in the area of schema and grouping-sets aggregate but it should be mostly straightforward.

Julian

> On Aug 31, 2017, at 8:12 AM, Jinfeng Ni wrote:
>
> It's great to hear that someone is working on moving Drill on new Calcite
> release! As someone who did similar job in last two rebase work, I could
> see that there would be many regressions to fix. Please let me know if I
> can provide any help.
>
> I thought we have pushed almost Drill specific commits to Calcite. I'll
> take a look at your Calcite branch. Ideally, we want to get rid of all
> Drill specific commits, after the rebase work this time.
>
> Thanks,
>
> Jinfeng
>
> On Thu, Aug 31, 2017 at 8:04 AM, Aman Sinha wrote:
>
>> Yes, this is long overdue ! Thanks for working on it Roman. If needed,
>> we can do a separate hangout with a few selected folks who have worked on
>> Drill+Calcite to provide feedback on how to resolve the test failures.
>> Although, at this stage you probably know a lot more about the integration
>> than many of us.
>>
>> -Aman
>>
>> On Thu, Aug 31, 2017 at 7:33 AM, Roman Kulyk wrote:
>>
>>> Hi, team!
>>>
>>> I want to inform that I am working on updating Drill Calcite version to
>>> 1.13. I forked from Calcite "branch-1.13" and added some commits:
>>> - DRILL-1455: Add return type-inference strategy for arithmetic operators
>>> when one of the arguments is ANY type. (was not in Calcite)
>>> - Add new method to ViewExpander interface to allow passing SchemaRoot.
>>> (was not in Calcite)
>>> - Allow a MAP literal type. (was not in Calcite)
>>> - DRILL-4047: Modify table functions to allow querying a table with options
>>> in Drill (was not in Calcite)
>>> - Drill-specific change: Add back AbstractConverter in RelSet.java ...
>>> (small changes after a3bc0d8 commit)
>>> - CALCITE-628 related but not fix the problem: Ensure target traits are
>>> simple when use Frameworks or RelOptRule.convert() method. (small changes
>>> after a3bc0d8 commit)
>>> - Support select * from schema-less table in execution engine like Drill
>>> (small changes after 9bd7d75 commit)
>>>
>>> Only after adding these commits to Calcite, a part of Drill functionality
>>> started to work as before. Currently, I am at Drill integration stage. I am
>>> fixing Drill unit tests. Progress can be tracked in my branches.
>>>
>>> Drill: https://github.com/KulykRoman/drill/commits/CalciteForkRebase_rc1
>>> Calcite:
>>> https://github.com/KulykRoman/incubator-calcite/commits/DrillCalcite1.13.0_rc1
>>>
>>> Currently, I face ~160 errors in java-exec module unit tests. I have
>>> divided them into seven groups:
>>> 1) SYSTEM ERROR: ClassCastException
>>> 2) Unexpected column errors
>>> 3) Runtime Exceptions
>>> 4) validation errors
>>> 5) RpcExceptions (generalized group: some of them should be fixed by the
>>> previous groups)
>>> 6) IllegalState errors (generalized group: some of them should be fixed by
>>> the previous group)
>>> 7) Other errors
>>>
>>> Best regards,
>>> Roman Kulyk
[GitHub] drill pull request #929: DRILL-5757: CONVERT_TO_JSON function is failed whil...
GitHub user vdiravka opened a pull request: https://github.com/apache/drill/pull/929 DRILL-5757: CONVERT_TO_JSON function is failed while using non-existence field as a parameter. You can merge this pull request into a Git repository by running: $ git pull https://github.com/vdiravka/drill DRILL-5757 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/drill/pull/929.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #929 commit ad860593040bc2ad26d90ddf1d8158ae5c45810b Author: Vitalii Diravka Date: 2017-08-31T13:24:37Z DRILL-5757: CONVERT_TO_JSON function is failed while using non-existence field as a parameter.
[GitHub] drill pull request #927: DRILL-5751: Fix unit tests to use local file system...
Github user vvysotskyi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/927#discussion_r136373928

    --- Diff: exec/java-exec/src/test/java/org/apache/drill/TestDynamicUDFSupport.java ---
    @@ -120,8 +132,11 @@ public void testDisableDynamicSupport() throws Exception {
       @Test
       public void testAbsentBinaryInStaging() throws Exception {
         Path staging = getDrillbitContext().getRemoteFunctionRegistry().getStagingArea();
    +    FileSystem fs = getDrillbitContext().getRemoteFunctionRegistry().getFs();
    +    copyJar(fs, jars, staging, default_binary_name);
    --- End diff --

    It seems that a source file should be copied here instead of the binary file, since test checks that binary file is missing.
[GitHub] drill pull request #927: DRILL-5751: Fix unit tests to use local file system...
Github user vvysotskyi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/927#discussion_r136384386

    --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTestBase.java ---
    @@ -64,7 +63,9 @@
       @BeforeClass
       public static void setup() throws IOException {
         // initialize file system
    -    fs = FileSystem.get(new Configuration());
    +    Configuration configuration = new Configuration();
    +    configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
    +    fs = FileSystem.get(configuration);
    --- End diff --

    These three lines appear several times in the code, so maybe it would be better to create a static method which returns a `FileSystem` instance.
[jira] [Created] (DRILL-5757) CONVERT_TO_JSON function is failed while using non-existence field as a parameter.
Vitalii Diravka created DRILL-5757:
--------------------------------------

             Summary: CONVERT_TO_JSON function is failed while using non-existence field as a parameter.
                 Key: DRILL-5757
                 URL: https://issues.apache.org/jira/browse/DRILL-5757
             Project: Apache Drill
          Issue Type: Bug
          Components: Execution - Data Types
    Affects Versions: 1.11.0
            Reporter: Vitalii Diravka
            Assignee: Vitalii Diravka
             Fix For: 1.12.0

Using a non-existent field (or a field with null values across a whole batch) as a parameter of the CONVERT_TO_JSON function leads to a failure.

simple.json
{code}
{"id": 0}
{code}

{code}
0: jdbc:drill:zk=local> select `id`, convert_to(`complex_field`, 'JSON') from dfs.`/tmp/simple.json`;
Error: SYSTEM ERROR: UnsupportedOperationException: Unable to get holder type for minor type [LATE] and mode [OPTIONAL]

Fragment 0:0

[Error Id: a13a1ed0-f8fe-4d4a-ba35-ab17c3a2f171 on vitalii-pc:31010] (state=,code=0)
{code}

The root cause of the issue and fix:
If there isn't a particular vector for the field, a "NullExpression" is created as an argument for the Drill function. That argument can be replaced later in "ExpressionTreeMaterializer.visitFunctionCall()" with a "TypedNullConstant" logical expression, but the LATE type shouldn't be used for this case; the data type of the field is still unknown.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
Re: Dedupping json records based on nested value
Hi Francois, I haven't read the details of your use case but just want to make sure you have looked at the nested data functions and ruled them out for your requirement: https://drill.apache.org/docs/nested-data-functions/ -Aman On Thu, Aug 31, 2017 at 8:23 AM, François Méthot wrote: > I manage to implement a single UDF that returns a copy of a MapHolder input > var, it allowed me to figure how to use SingleMapReaderImpl input and > ComplexWriter as out. > > I tried to move that approach into an aggregation function that looks like > the snippet below. > I want to return the first MapHolder value encountered in a group by > operation. > > select firstvalue(tb1.field1), firstvalue(tb1.field2), > firstvalue(tb1.field3), firstvalue(tb1.field4) from dfs.`doc.json` tb1 > group by tb1.field4.key_data > > I get: > Error: SYSTEM ERROR: UnsupportedOperationException: Unable to get new > vector for minor type [LATE] and mode [REQUIRED] > > I am not sure, if we can use the "out" variable within the add method. > Any hint from the experts to put me on track would be appreciated. 
> > Thanks > Francois > > > @FunctionTemplate(name = "firstvalue", scope = > FunctionTemplate.FunctionScope.POINT_AGGREGATE) > public static class BitCount implements DrillAggFunc{ >public static class FirstValueComplex implements DrillAggFunc >{ > @Param > MapHolder map; > > @Workspace > BitHolder firstSeen; > > @Output > ComplexWriter out; > > @Override > public void Setup() > { > firstSeen.value=0; > } > > @Override > public void add() > { >if(firstSeen.value == 0) >{ > org.apache.drill.exec.vector.complex.impl.SingleMapReaderImpl > map > = > (org.apache.drill.exec.vector.complex.impl.SingleMapReaderImpl)(Object) > map; > map.copyAsValue(out.rootAsMap()); > firstSeen.value=1; >} > } > > @Override > public void output() > { > > } > > @Override > public void reset() > { >out.clear(); >firstSeen.value=0; > } >} > } > > On 30 August 2017 at 16:57, François Méthot wrote: > > > > > Hi, > > > > Congrat for the 1.11 release, we are happy to have our suggestion > > implemented in the new release (automatic HDFS block size for parquet > > files). > > > > It seems like we are pushing the limit of Drill with new type query...(I > > am learning new SQL trick in the process) > > > > We are trying to aggregate a json document based on a nested value. > > > > Document looks like this: > > > > { > > "field1" : { > > "f1_a" : "infoa", > > "f1_b" : "infob" > > }, > > "field2" : "very long string", > > "field3" : { > > "f3_a" : "infoc", > > "f3_b" : "infod", > > "f4_c" : { > > > > } > > }, > > "field4" : { > > "key_data" : "String to aggregate on", > > "f4_b" : "a string2", > > "f4_c" : { > > complex structure... > > } > > } > > } > > > > > > We want a first, or last (or any) occurrence of field1, field2, field3 > and > > field4 group by field4.key_data; > > > > > > Unfortunately min, max function does not support json complex column > > (MapHolder). Therefor group by type of queries do not work. 
> >
> > We tried a window function like this
> > create table as (
> > select first_value(tb1.field1) over (partition by tb1.field4.key_data) as field1,
> >    first_value(tb1.field2) over (partition by tb1.field4.key_data) as field2,
> >    first_value(tb1.field3) over (partition by tb1.field4.key_data) as field3,
> >    first_value(tb1.field4) over (partition by tb1.field4.key_data) as field4
> > from dfs.`doc.json` tb1;
> > )
> >
> > We get IndexOutOfBoundException.
> >
> > We got better success with:
> > create table as (
> > select * from
> > (select tb1.*,
> > row_number() over (partition by tb1.field4.key_data) as row_num
> >    from dfs.`doc.json` tb1
> > ) t
> > where t.row_num = 1
> > )
> >
> > This works on single json file or with multiple file in a session
> > configured with planner.width_max_per_node=1.
> >
> > As soon as we put more than 1 thread per query, We get
> > IndexOutOfBoundException.
> > This was tried on 1.10 and 1.11.
> > It looks like a bug.
> >
> > Would you have other suggestion to bypass that issue?
> > Is there an existing aggregation function (to work with group by) that
> > would return the first,last, or random MapHolder column from json document?
> > If not, I am thinking of implementing one, would there be an example on
> > how to Clone a MapHolder within a function? (pretty sure I can't assign
> > "in" param to output within a function)
> >
> > Thank you for your time reading this.
> > any suggestions to try are welcome
> >
> > Francois
Re: Dedupping json records based on nested value
I manage to implement a single UDF that returns a copy of a MapHolder input var, it allowed me to figure how to use SingleMapReaderImpl input and ComplexWriter as out. I tried to move that approach into an aggregation function that looks like the snippet below. I want to return the first MapHolder value encountered in a group by operation. select firstvalue(tb1.field1), firstvalue(tb1.field2), firstvalue(tb1.field3), firstvalue(tb1.field4) from dfs.`doc.json` tb1 group by tb1.field4.key_data I get: Error: SYSTEM ERROR: UnsupportedOperationException: Unable to get new vector for minor type [LATE] and mode [REQUIRED] I am not sure, if we can use the "out" variable within the add method. Any hint from the experts to put me on track would be appreciated. Thanks Francois @FunctionTemplate(name = "firstvalue", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE) public static class BitCount implements DrillAggFunc{ public static class FirstValueComplex implements DrillAggFunc { @Param MapHolder map; @Workspace BitHolder firstSeen; @Output ComplexWriter out; @Override public void Setup() { firstSeen.value=0; } @Override public void add() { if(firstSeen.value == 0) { org.apache.drill.exec.vector.complex.impl.SingleMapReaderImpl map = (org.apache.drill.exec.vector.complex.impl.SingleMapReaderImpl)(Object)map; map.copyAsValue(out.rootAsMap()); firstSeen.value=1; } } @Override public void output() { } @Override public void reset() { out.clear(); firstSeen.value=0; } } } On 30 August 2017 at 16:57, François Méthot wrote: > > Hi, > > Congrat for the 1.11 release, we are happy to have our suggestion > implemented in the new release (automatic HDFS block size for parquet > files). > > It seems like we are pushing the limit of Drill with new type query...(I > am learning new SQL trick in the process) > > We are trying to aggregate a json document based on a nested value. 
> > Document looks like this: > > { > "field1" : { > "f1_a" : "infoa", > "f1_b" : "infob" > }, > "field2" : "very long string", > "field3" : { > "f3_a" : "infoc", > "f3_b" : "infod", > "f4_c" : { > > } > }, > "field4" : { > "key_data" : "String to aggregate on", > "f4_b" : "a string2", > "f4_c" : { > complex structure... > } > } > } > > > We want a first, or last (or any) occurrence of field1, field2, field3 and > field4 group by field4.key_data; > > > Unfortunately min, max function does not support json complex column > (MapHolder). Therefor group by type of queries do not work. > > We tried a window function like this > create table as ( > select first_value(tb1.field1) over (partition by tb1.field4.key_data) > as field1, >first_value(tb1.field2) over (partition by tb1.field4.key_data) as > field2, >first_value(tb1.field3) over (partition by tb1.field4.key_data) as > field3, >first_value(tb1.field4) over (partition by tb1.field4.key_data) as > field4 > from dfs.`doc.json` tb1; > ) > > We get IndexOutOfBoundException. > > We got better success with: > create table as ( > select * from > (select tb1.*, > row_number() over (partition by tb1.field4.key_data) as row_num >from dfs.`doc.json` tb1 > ) t > where t.row_num = 1 > ) > > This works on single json file or with multiple file in a session > configured with planner.width_max_per_node=1. > > As soon as we put more than 1 thread per query, We get > IndexOutOfBoundException. > This was tried on 1.10 and 1.11. > It looks like a bug. > > > Would you have other suggestion to bypass that issue? > Is there an existing aggregation function (to work with group by) that > would return the first,last, or random MapHolder column from json document? > If not, I am thinking of implementing one, would there be an example on > how to Clone a MapHolder within a function? (pretty sure I can't assign > "in" param to output within a function) > > > Thank you for your time reading this. 
> any suggestions to try are welcome
>
> Francois
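For readers following the thread, the result the `row_number() ... where t.row_num = 1` query is after (one record per `field4.key_data`, keeping the first one encountered) can be sketched outside Drill in plain Java. This is only an illustration of the intended semantics, not a Drill UDF; the class name `FirstPerKey`, the method `firstPerKey`, and the two-element string arrays standing in for JSON documents are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: keeps the first record seen for each grouping key.
final class FirstPerKey {

    // Returns one record per key, in the order the keys were first encountered.
    static <R, K> List<R> firstPerKey(List<R> records, Function<R, K> keyOf) {
        Map<K, R> firstSeen = new LinkedHashMap<>();
        for (R record : records) {
            // putIfAbsent ignores later records that share an already-seen key.
            firstSeen.putIfAbsent(keyOf.apply(record), record);
        }
        return new ArrayList<>(firstSeen.values());
    }

    public static void main(String[] args) {
        // Each array stands in for a document: [field4.key_data, field2].
        List<String[]> docs = Arrays.asList(
            new String[] {"k1", "first record for k1"},
            new String[] {"k2", "first record for k2"},
            new String[] {"k1", "second record for k1"});

        for (String[] doc : firstPerKey(docs, d -> d[0])) {
            System.out.println(doc[0] + " -> " + doc[1]);
        }
    }
}
```

Which record counts as "first" depends on input order here; in a multi-threaded Drill query that order is not fixed unless the window has an explicit ORDER BY.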
Re: Working on upgrade Drill Calcite version
It's great to hear that someone is working on moving Drill to a new Calcite release! As someone who did a similar job in the last two rebases, I can see that there will be many regressions to fix. Please let me know if I can provide any help.

I thought we had pushed almost all Drill-specific commits to Calcite. I'll take a look at your Calcite branch. Ideally, we want to get rid of all Drill-specific commits after the rebase work this time.

Thanks,
Jinfeng

On Thu, Aug 31, 2017 at 8:04 AM, Aman Sinha wrote:
> Yes, this is long overdue ! Thanks for working on it Roman. If needed,
> we can do a separate hangout with a few selected folks who have worked on
> Drill+Calcite to provide feedback on how to resolve the test failures.
> Although, at this stage you probably know a lot more about the integration
> than many of us.
>
> -Aman
>
> On Thu, Aug 31, 2017 at 7:33 AM, Roman Kulyk wrote:
>
> > Hi, team!
> >
> > I want to inform that I am working on updating Drill Calcite version to
> > 1.13. I forked from Calcite "branch-1.13" and added some commits:
> > - DRILL-1455: Add return type-inference strategy for arithmetic operators
> > when one of the arguments is ANY type. (was not in Calcite)
> > - Add new method to ViewExpander interface to allow passing SchemaRoot.
> > (was not in Calcite)
> > - Allow a MAP literal type. (was not in Calcite)
> > - DRILL-4047: Modify table functions to allow querying a table with options
> > in Drill (was not in Calcite)
> > - Drill-specific change: Add back AbstractConverter in RelSet.java ...
> > (small changes after a3bc0d8 commit)
> > - CALCITE-628 related but not fix the problem: Ensure target traits are
> > simple when use Frameworks or RelOptRule.convert() method. (small changes
> > after a3bc0d8 commit)
> > - Support select * from schema-less table in execution engine like Drill
> > (small changes after 9bd7d75 commit)
> >
> > Only after adding these commits to Calcite, a part of Drill functionality
> > started to work as before. Currently, I am at Drill integration stage. I am
> > fixing Drill unit tests. Progress can be tracked in my branches.
> >
> > Drill: https://github.com/KulykRoman/drill/commits/CalciteForkRebase_rc1
> > Calcite:
> > https://github.com/KulykRoman/incubator-calcite/commits/DrillCalcite1.13.0_rc1
> >
> > Currently, I face ~160 errors in java-exec module unit tests. I have
> > divided them into seven groups:
> > 1) SYSTEM ERROR: ClassCastException
> > 2) Unexpected column errors
> > 3) Runtime Exceptions
> > 4) validation errors
> > 5) RpcExceptions (generalized group: some of them should be fixed by the
> > previous groups)
> > 6) IllegalState errors (generalized group: some of them should be fixed by
> > the previous group)
> > 7) Other errors
> >
> > Best regards,
> > Roman Kulyk
Re: Working on upgrade Drill Calcite version
Yes, this is long overdue ! Thanks for working on it Roman. If needed, we can do a separate hangout with a few selected folks who have worked on Drill+Calcite to provide feedback on how to resolve the test failures. Although, at this stage you probably know a lot more about the integration than many of us. -Aman On Thu, Aug 31, 2017 at 7:33 AM, Roman Kulyk wrote: > Hi, team! > > I want to inform that I am working on updating Drill Calcite version to > 1.13. I forked from Calcite "branch-1.13" and added some commits: > - DRILL-1455: Add return type-inference strategy for arithmetic operators > when one of the arguments is ANY type. (was not in Calcite) > - Add new method to ViewExpander interface to allow passing SchemaRoot. > (was not in Calcite) > - Allow a MAP literal type. (was not in Calcite) > - DRILL-4047: Modify table functions to allow querying a table with options > in Drill (was not in Calcite) > - Drill-specific change: Add back AbstractConverter in RelSet.java ... > (small changes after a3bc0d8 commit) > - CALCITE-628 related but not fix the problem: Ensure target traits are > simple when use Frameworks or RelOptRule.convert() method. (small changes > after a3bc0d8 commit) > - Support select * from schema-less table in execution engine like Drill > (small changes after 9bd7d75 commit) > > Only after adding these commits to Calcite, a part of Drill functionality > started to work as before. Currently, I am at Drill integration stage. I am > fixing Drill unit tests. Progress can be tracked in my branches. > > Drill: https://github.com/KulykRoman/drill/commits/CalciteForkRebase_rc1 > Calcite: > https://github.com/KulykRoman/incubator-calcite/commits/ > DrillCalcite1.13.0_rc1 > > Currently, I face ~160 errors in java-exec module unit tests. 
I have > divided them into seven groups: > 1) SYSTEM ERROR: ClassCastException > 2) Unexpected column errors > 3) Runtime Exceptions > 4) validation errors > 5) RpcExceptions (generalized group: some of them should be fixed by the > previous groups) > 6) IllegalState errors (generalized group: some of them should be fixed by > the previous group) > 7) Other errors > > Best regards, > Roman Kulyk >
[jira] [Created] (DRILL-5756) CONVERT_FROM_JSON on an file with space throws NullPointerException
Volodymyr Vysotskyi created DRILL-5756:
------------------------------------------

             Summary: CONVERT_FROM_JSON on an file with space throws NullPointerException
                 Key: DRILL-5756
                 URL: https://issues.apache.org/jira/browse/DRILL-5756
             Project: Apache Drill
          Issue Type: Bug
          Components: Storage - JSON
            Reporter: Volodymyr Vysotskyi

Query
{code:sql}
SELECT CONVERT_FROM(columns[1], 'JSON') as col1 FROM dfs.`file1.tbl`;
{code}
where file1.tbl is a file with a single column, returns the error
{noformat}
Error: SYSTEM ERROR: NullPointerException

Fragment 0:0

[Error Id: 85b7fa7f-2049-4c8b-8c96-805af218f118 on node1:31010] (state=,code=0)
{noformat}
It should return a single row with a null value, since CONVERT_FROM is executed on a column that does not exist.

*Problem description*
The convert_fromJSON function considers only the non-nullable VarCharHolder, but in this case it receives a NullableVarCharHolder which has a null value. So a convert_fromJSON function that takes NullableVarCharHolder as an input parameter should be added.
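The shape of the proposed fix, a conversion variant that accepts a nullable input and yields null instead of throwing, can be sketched in plain Java. This is a minimal sketch under stated assumptions, not Drill's UDF API: `NullableText` is a hypothetical stand-in loosely modeled on a nullable holder with an `isSet` flag, and `NullSafeConvert.convert` and its `parser` argument are invented for illustration.

```java
import java.util.function.Function;

// Hypothetical stand-in for a nullable holder: isSet == 0 means "no value".
final class NullableText {
    int isSet;     // 0 = null, 1 = value present
    String value;  // only meaningful when isSet == 1

    static NullableText of(String value) {
        NullableText holder = new NullableText();
        holder.isSet = 1;
        holder.value = value;
        return holder;
    }

    static NullableText unset() {
        return new NullableText();
    }
}

// Hypothetical null-aware wrapper around a conversion function.
final class NullSafeConvert {

    // Returns null for an unset input instead of dereferencing a missing value.
    static <T> T convert(NullableText in, Function<String, T> parser) {
        if (in.isSet == 0) {
            return null;
        }
        return parser.apply(in.value);
    }

    public static void main(String[] args) {
        // String::length stands in for a real JSON parser in this sketch.
        Integer fromMissingColumn = convert(NullableText.unset(), String::length);
        Integer fromPresentColumn = convert(NullableText.of("{\"id\": 0}"), String::length);
        System.out.println(fromMissingColumn);
        System.out.println(fromPresentColumn);
    }
}
```

The point of the sketch is only the early return on the unset flag, which is the behavior the issue asks for when the referenced column does not exist.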
Working on upgrade Drill Calcite version
Hi, team!

I want to inform that I am working on updating Drill Calcite version to 1.13. I forked from Calcite "branch-1.13" and added some commits:
- DRILL-1455: Add return type-inference strategy for arithmetic operators when one of the arguments is ANY type. (was not in Calcite)
- Add new method to ViewExpander interface to allow passing SchemaRoot. (was not in Calcite)
- Allow a MAP literal type. (was not in Calcite)
- DRILL-4047: Modify table functions to allow querying a table with options in Drill (was not in Calcite)
- Drill-specific change: Add back AbstractConverter in RelSet.java ... (small changes after a3bc0d8 commit)
- CALCITE-628 related but not fix the problem: Ensure target traits are simple when use Frameworks or RelOptRule.convert() method. (small changes after a3bc0d8 commit)
- Support select * from schema-less table in execution engine like Drill (small changes after 9bd7d75 commit)

Only after adding these commits to Calcite, a part of Drill functionality started to work as before. Currently, I am at Drill integration stage. I am fixing Drill unit tests. Progress can be tracked in my branches.

Drill: https://github.com/KulykRoman/drill/commits/CalciteForkRebase_rc1
Calcite: https://github.com/KulykRoman/incubator-calcite/commits/DrillCalcite1.13.0_rc1

Currently, I face ~160 errors in java-exec module unit tests. I have divided them into seven groups:
1) SYSTEM ERROR: ClassCastException
2) Unexpected column errors
3) Runtime Exceptions
4) validation errors
5) RpcExceptions (generalized group: some of them should be fixed by the previous groups)
6) IllegalState errors (generalized group: some of them should be fixed by the previous group)
7) Other errors

Best regards,
Roman Kulyk
Disable Lilith SOCKET connection by default.
Hi all,

I am having trouble running Drill unit tests on a node where the Hiveserver2 service is running. In this case, the run hangs midway, because Lilith and Hiveserver2 use the same port 1. I tried to change the Lilith port in logback.xml, but the port cannot be changed in the Lilith UI (https://github.com/huxi/lilith/issues/10).

I have no other ideas for avoiding this issue without disabling Lilith or stopping HS2. Do you mind if I disable the Lilith SOCKET connection by default?

--
Kind regards,
Volodymyr Vysotskyi
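A quick way to confirm this kind of clash before starting a test run is to try binding the contested port. The following is a minimal sketch using only the JDK; the class name `PortProbe` and the method `isPortFree` are hypothetical, and the port number is left as a command-line argument rather than assumed.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical helper: reports whether a local TCP port can currently be bound.
final class PortProbe {

    // Tries to bind the port and releases it immediately; false means it is taken.
    static boolean isPortFree(int port) {
        try (ServerSocket probe = new ServerSocket(port)) {
            return true;
        } catch (IOException e) {
            // Bind failed, most likely because another process is listening.
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("usage: java PortProbe <port>");
            return;
        }
        int port = Integer.parseInt(args[0]);
        System.out.println("port " + port + (isPortFree(port) ? " is free" : " is in use"));
    }
}
```

The check is inherently racy (another process can take the port right after the probe closes), so it is useful for diagnosis rather than as a guard.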