[jira] [Created] (DRILL-5755) TOP_N_SORT operator does not free memory while running

2017-08-30 Thread Boaz Ben-Zvi (JIRA)
Boaz Ben-Zvi created DRILL-5755:
---

 Summary: TOP_N_SORT operator does not free memory while running
 Key: DRILL-5755
 URL: https://issues.apache.org/jira/browse/DRILL-5755
 Project: Apache Drill
  Issue Type: Bug
  Components: Execution - Relational Operators
Affects Versions: 1.11.0
Reporter: Boaz Ben-Zvi


 The TOP_N_SORT operator should keep the top N rows while processing its input, 
and free the memory used to hold all rows below the top N.
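
For illustration only (this is not Drill's actual TopN implementation), a bounded 
top-N collector keeps at most N values in memory and discards anything that falls 
below the current top N, so memory use stays proportional to N rather than to the 
input size:

{code}
// Illustrative sketch, not Drill code: a bounded top-N using a min-heap.
import java.util.PriorityQueue;

public class BoundedTopN {
  public static PriorityQueue<Long> topN(Iterable<Long> input, int n) {
    PriorityQueue<Long> heap = new PriorityQueue<>(); // min-heap of the current top N
    for (long v : input) {
      if (heap.size() < n) {
        heap.add(v);
      } else if (v > heap.peek()) {
        heap.poll();   // evict the smallest of the current top N
        heap.add(v);
      }                // otherwise the value is discarded; its memory can be freed
    }
    return heap;       // never holds more than n values, regardless of input size
  }
}
{code}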

For example, the following query uses a table with 125M rows:
{code}
select row_count, sum(row_count), avg(double_field), max(double_rand), 
count(float_rand) from dfs.`/data/tmp` group by row_count order by row_count 
limit 30;
{code}

The query failed with an OOM while each of the 3 TOP_N_SORT operators was holding 
about 2.44 GB! (see the attached profile). It should take far less memory to 
hold 30 rows!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (DRILL-5754) Test framework does not enforce column orders

2017-08-30 Thread Jinfeng Ni (JIRA)
Jinfeng Ni created DRILL-5754:
-

 Summary: Test framework does not enforce column orders
 Key: DRILL-5754
 URL: https://issues.apache.org/jira/browse/DRILL-5754
 Project: Apache Drill
  Issue Type: Bug
Reporter: Jinfeng Ni


Drill provides a test framework to submit SQL statements and verify the 
query results against expected results.  For instance:

{code}
final String query = "select n_nationkey, n_regionkey from cp.`tpch/nation.parquet` where n_nationkey = 5 and n_regionkey = 0";
testBuilder()
.sqlQuery(query)
.unOrdered()
.baselineColumns("n_nationkey", "n_regionkey")
.baselineValues(5, 0)
.build()
.run();
{code} 

However, it seems that the test framework only matches results based on column 
name, without enforcing the column order in the output result set. The missing 
column-order verification may differ from what people typically 
expect, and can hide code bugs. 

The following test specifies the expected output columns in reverse order. 
However, the current test framework still passes the test.

{code}
final String query = "select n_nationkey, n_regionkey from cp.`tpch/nation.parquet` where n_nationkey = 5 and n_regionkey = 0";

testBuilder()
.sqlQuery(query)
.unOrdered()
.baselineColumns("n_regionkey", "n_nationkey")
.baselineValues(0, 5)
.build()
.run();
{code}

For now, to check the column order in query output, people should use 
SchemaTestBuilder.  The problem is that SchemaTestBuilder only allows verifying the 
schema, without allowing baseline values to be specified. This means people have to 
write two tests if they want to verify both schema and values. 

{code}
 final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema =
     Lists.newArrayList(
         Pair.of(SchemaPath.getSimplePath("n_nationkey"),
             Types.required(TypeProtos.MinorType.INT)),
         Pair.of(SchemaPath.getSimplePath("n_regionkey"),
             Types.required(TypeProtos.MinorType.INT)));

testBuilder()
.sqlQuery(query)
.schemaBaseLine(expectedSchema)
.go();
{code}

This JIRA is opened to ask for enhancing the test framework so that it enforces 
column order as well. 
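
As a purely hypothetical sketch of what such a check could look like (the 
{{getOutputColumnNames()}} helper below does not exist in the current framework; 
it stands for "return the output columns in the order the query produced them"):

{code}
// Hypothetical sketch only -- not existing test-framework API.
@Test
public void testColumnOrderIsEnforced() throws Exception {
  final String query = "select n_nationkey, n_regionkey from cp.`tpch/nation.parquet`";
  final List<String> expected = Arrays.asList("n_nationkey", "n_regionkey");
  final List<String> actual = getOutputColumnNames(query); // hypothetical helper
  assertEquals(expected, actual); // would fail if the columns came back reversed
}
{code}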



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] drill pull request #928: Drill 5716: Queue-driven memory allocation

2017-08-30 Thread paul-rogers
GitHub user paul-rogers opened a pull request:

https://github.com/apache/drill/pull/928

Drill 5716: Queue-driven memory allocation

Please see [DRILL-5716](https://issues.apache.org/jira/browse/DRILL-5716) 
for an overview.

This PR provides five commits that implement a method of allocating memory 
to queries based on ZK-based queues. The key concept is that the system has a 
fixed amount of memory. Suppose all queries need the same amount of memory. In 
that case, if we want to run n queries, each gets an amount that is the total 
memory divided by n.
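
For example (with purely illustrative numbers): if 40 GB of memory is available 
for queries on a node and the queue limit is n = 10, each query is granted 
40 GB / 10 = 4 GB.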

This raises two questions:

1. Queries don't need the same amount of memory, so how do we handle this?
2. How do we know how many queries, n, are to run within Drill?

The solution is to leverage an existing feature: the Zookeeper (ZK) based 
queueing mechanism. This mechanism limits (throttles) the number of active 
queries. If we set the limit to n, then we know that Drill won't run more than 
n queries, and we can apply the above math.

But, not all queries are the same size. The ZK-based throttling mechanism 
addressed this by allowing *two* queues: a "large query" queue and a "small 
query" queue. The idea is that the cluster might run a single large ETL query, 
but at the same time, run five or 10 small interactive queries. The numbers are 
configurable via system options. The existing mechanism uses query cost to 
decide when a query is "small" vs. "large." The user sets a threshold in terms 
of cost. Queries split into the two queues accordingly.

### Resource Manager Layer

The first commit introduces a new resource management layer with a number 
of concepts:

* A pluggable, global resource manager configured at boot time. The code 
offers three versions: the null manager (no throttling), the ZK-based one 
discussed above, and a test-only one used in unit tests that uses a simple 
in-process queue.
* A per-query resource allocator, created by the resource manager to work 
with a single query through the query lifecycle.

Since the default manager does no queueing, queues themselves are internal 
to the resource manager. The model assumes:

* A query queue that manages the Drillbit's view into the distributed queue 
state.
* A queue "lease" that represents a grant of a single slot within a queue 
to run a query.

The design handles the current resource managers, and allows creating 
custom schedulers as needed for specific distributions or applications.
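
As a rough sketch of the shape of this layer (names and signatures below are 
approximations of the concepts described above, not the exact APIs in these 
commits):

```java
// Illustrative sketch only -- approximate names, not the PR's exact interfaces.
interface ResourceManager {                        // global, selected at boot time
  QueryResourceAllocator newQueryAllocator();      // one allocator per query
}

interface QueryResourceAllocator {                 // follows a single query's lifecycle
  QueueLease admit() throws InterruptedException;  // wait for a slot in the small/large queue
  long memoryPerNode();                            // memory granted to this query on each node
}

interface QueueLease {                             // a granted slot within a queue
  void release();                                  // return the slot when the query completes
}
```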

Prior to this PR, the Foreman worked with the ZK-based queues directly 
inline in the Foreman code. With the above framework in place, this PR 
refactors the Foreman to extract the ZK-based queue code into a resource 
manager implementation. Functionality is identical; only the location of the 
code changes, allowing a cleaner design.

One interesting design issue is worth pointing out. The user can enable and 
disable ZK-queues at run time. However, the resource manager is global. To make 
this work, a “dynamic” resource manager wraps the ZK-based implementation. 
The dynamic version periodically checks system options to see if the ZK-based 
version is enabled. The dynamic version delegates to either the ZK-based version or 
the default (non-throttled) version, depending on what it finds in the system 
options.

When a switch occurs, queries already queued will stay queued. Only new 
queries use the new setting. This provides a graceful transition to/from 
throttled mode.

### Specifics of Memory Planning

The core of this change is a new memory planner. Here's how it works.

* The user enables queues using the existing `exec.queue.enable` session 
option.
* The user decides the number of "small" vs. "large" queries to run 
concurrently by setting `exec.queue.small` and `exec.queue.large` respectively. 
(The original defaults for these settings were far too high; this PR changes 
them to much lower values.)
* Decide how to split memory across the two queues by specifying the 
`exec.queue.memory_ratio` value. The default is 10, which means that large 
queries get 10 units of memory while small queries get 1 unit.
* Decide on the planner cost threshold that separates "small" vs. "large" 
queries using `exec.queue.threshold`.
* Decide on the maximum queue wait time by setting 
`exec.queue.timeout_millis`. (Default is five minutes.)
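
For example, a session might configure these options as follows (the values 
here are illustrative only, not recommended settings):

```sql
ALTER SESSION SET `exec.queue.enable` = true;           -- turn on queue-based admission
ALTER SESSION SET `exec.queue.small` = 10;              -- up to 10 concurrent small queries
ALTER SESSION SET `exec.queue.large` = 2;               -- up to 2 concurrent large queries
ALTER SESSION SET `exec.queue.memory_ratio` = 10;       -- a large query gets 10x a small query's share
ALTER SESSION SET `exec.queue.threshold` = 30000000;    -- planner cost separating small from large
ALTER SESSION SET `exec.queue.timeout_millis` = 300000; -- wait up to 5 minutes for a queue slot
```

With these illustrative values there are 10 + 2 * 10 = 30 memory units, so a 
small query is granted 1/30 of the query memory and a large query 10/30.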

The memory planner then does the following:

* Determine the number of “memory units” as the number of concurrent 
small queries + the number of concurrent large queries * the memory ratio.
* Suppose this is a small query. Compute actual memory as system memory / 
number of memory units.
* Traverse the plan. Find the buffering operators grouped by node. 
(Buffering operators, in this release, are sort and hash agg.)
* Suppose we find that, on node X, we have two 

[jira] [Created] (DRILL-5753) Managed External Sort: One or more nodes ran out of memory while executing the query.

2017-08-30 Thread Robert Hou (JIRA)
Robert Hou created DRILL-5753:
-

 Summary: Managed External Sort: One or more nodes ran out of 
memory while executing the query.
 Key: DRILL-5753
 URL: https://issues.apache.org/jira/browse/DRILL-5753
 Project: Apache Drill
  Issue Type: Bug
  Components: Execution - Relational Operators
Affects Versions: 1.11.0
Reporter: Robert Hou
Assignee: Paul Rogers
 Fix For: 1.12.0


The query is:
{noformat}
ALTER SESSION SET `exec.sort.disable_managed` = false;
alter session set `planner.memory.max_query_memory_per_node` = 1252428800;
select count(*) from (
  select * from (
select s1.type type, flatten(s1.rms.rptd) rptds, s1.rms, s1.uid 
from (
  select d.type type, d.uid uid, flatten(d.map.rm) rms from 
dfs.`/drill/testdata/resource-manager/nested-large.json` d order by d.uid
) s1
  ) s2
  order by s2.rms.mapid, s2.rptds.a, s2.rptds.do_not_exist
);
ALTER SESSION SET `exec.sort.disable_managed` = true;
alter session set `planner.memory.max_query_memory_per_node` = 2147483648;
{noformat}

The stack trace is:
{noformat}
2017-08-30 03:35:10,479 [BitServer-5] DEBUG o.a.drill.exec.work.foreman.Foreman 
- 26596b4e-9883-7dc2-6275-37134f7d63be: State change requested RUNNING --> 
FAILED
org.apache.drill.common.exceptions.UserRemoteException: RESOURCE ERROR: One or 
more nodes ran out of memory while executing the query.

Unable to allocate buffer of size 4194304 due to memory limit. Current 
allocation: 43960640
Fragment 2:9

[Error Id: f58210a2-7569-42d0-8961-8c7e42c7fea3 on atsqa6c80.qa.lab:31010]

  (org.apache.drill.exec.exception.OutOfMemoryException) Unable to allocate 
buffer of size 4194304 due to memory limit. Current allocation: 43960640
org.apache.drill.exec.memory.BaseAllocator.buffer():238
org.apache.drill.exec.memory.BaseAllocator.buffer():213
org.apache.drill.exec.vector.BigIntVector.reAlloc():252
org.apache.drill.exec.vector.BigIntVector$Mutator.setSafe():452
org.apache.drill.exec.vector.RepeatedBigIntVector$Mutator.addSafe():355
org.apache.drill.exec.vector.RepeatedBigIntVector.copyFromSafe():220

org.apache.drill.exec.vector.RepeatedBigIntVector$TransferImpl.copyValueSafe():202

org.apache.drill.exec.vector.complex.MapVector$MapTransferPair.copyValueSafe():225

org.apache.drill.exec.vector.complex.MapVector$MapTransferPair.copyValueSafe():225
org.apache.drill.exec.vector.complex.MapVector.copyFromSafe():82
org.apache.drill.exec.test.generated.PriorityQueueCopierGen1466.doCopy():47
org.apache.drill.exec.test.generated.PriorityQueueCopierGen1466.next():77

org.apache.drill.exec.physical.impl.xsort.managed.PriorityQueueCopierWrapper$BatchMerger.next():267

org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.load():374

org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.innerNext():303
org.apache.drill.exec.record.AbstractRecordBatch.next():164
org.apache.drill.exec.record.AbstractRecordBatch.next():119
org.apache.drill.exec.record.AbstractRecordBatch.next():109
org.apache.drill.exec.record.AbstractSingleRecordBatch.innerNext():51

org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch.innerNext():93
org.apache.drill.exec.record.AbstractRecordBatch.next():164
org.apache.drill.exec.physical.impl.BaseRootExec.next():105

org.apache.drill.exec.physical.impl.SingleSenderCreator$SingleSenderRootExec.innerNext():92
org.apache.drill.exec.physical.impl.BaseRootExec.next():95
org.apache.drill.exec.work.fragment.FragmentExecutor$1.run():234
org.apache.drill.exec.work.fragment.FragmentExecutor$1.run():227
java.security.AccessController.doPrivileged():-2
javax.security.auth.Subject.doAs():415
org.apache.hadoop.security.UserGroupInformation.doAs():1595
org.apache.drill.exec.work.fragment.FragmentExecutor.run():227
org.apache.drill.common.SelfCleaningRunnable.run():38
java.util.concurrent.ThreadPoolExecutor.runWorker():1145
java.util.concurrent.ThreadPoolExecutor$Worker.run():615
java.lang.Thread.run():744

at 
org.apache.drill.exec.work.foreman.QueryManager$1.statusUpdate(QueryManager.java:521)
 [drill-java-exec-1.12.0-SNAPSHOT.jar:1.12.0-SNAPSHOT]
at 
org.apache.drill.exec.rpc.control.WorkEventBus.statusUpdate(WorkEventBus.java:71)
 [drill-java-exec-1.12.0-SNAPSHOT.jar:1.12.0-SNAPSHOT]
at 
org.apache.drill.exec.work.batch.ControlMessageHandler.handle(ControlMessageHandler.java:94)
 [drill-java-exec-1.12.0-SNAPSHOT.jar:1.12.0-SNAPSHOT]
at 
org.apache.drill.exec.work.batch.ControlMessageHandler.handle(ControlMessageHandler.java:55)
 [drill-java-exec-1.12.0-SNAPSHOT.jar:1.12.0-SNAPSHOT]
at org.apache.drill.exec.rpc.BasicServer.handle(BasicServer.java:157) 
[drill-rpc-1.12.0-SNAPSHOT.jar:1.12.0-SNAPSHOT]
at 

[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136188895
  
--- Diff: 
exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullVector.java 
---
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
**/
+
+package org.apache.drill.exec.vector;
+
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+
+/** UntypedNullVector is to represent a value vector with {@link 
org.apache.drill.common.types.MinorType.NULL}
+ *  All values in the vector represent two semantic implications: 1) the 
value is unknown, 2) the type is unknown.
+ *  Because of this, we only have to keep track of the number of values in 
value vector,
+ *  and there is no allocated buffer to back up this value vector. 
Therefore, the majority of
+ *  methods in this class is either no-op, or throws {@link 
UnsupportedOperationException}.
+ *
+ */
+public final class UntypedNullVector extends BaseDataValueVector 
implements FixedWidthVector {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UntypedNullVector.class);
+
+  /**
+   * Width of each fixed-width value.
+   */
+  public static final int VALUE_WIDTH = 0;
+
+  private final Accessor accessor = new Accessor();
+  private final Mutator mutator = new Mutator();
+  private int valueCount ;
+
+  public UntypedNullVector(MaterializedField field, BufferAllocator 
allocator) {
+super(field, allocator);
+valueCount = 0;
+  }
+
+  @Override
+  public FieldReader getReader() { throw new 
UnsupportedOperationException(); }
+
+  @Override
+  public int getBufferSizeFor(final int valueCount) {
+return 0;
+  }
+
+  @Override
+  public int getValueCapacity(){
+return Character.MAX_VALUE;
+  }
+
+  @Override
+  public Accessor getAccessor() { return accessor; }
+
+  @Override
+  public Mutator getMutator() { return mutator; }
+
+  @Override
+  public void setInitialCapacity(final int valueCount) {
+  }
+
+  @Override
+  public void allocateNew() {
+  }
+
+  @Override
+  public boolean allocateNewSafe() {
+return true;
+  }
+
+  @Override
+  public void allocateNew(final int valueCount) {
+  }
+
+  @Override
+  public void reset() {
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void zeroVector() {
+  }
+
+  @Override
+  public void load(SerializedField metadata, DrillBuf buffer) {
+
Preconditions.checkArgument(this.field.getPath().equals(metadata.getNamePart().getName()),
+"The field %s doesn't match the provided metadata %s.", 
this.field, metadata);
+final int actualLength = metadata.getBufferLength();
+final int valueCount = metadata.getValueCount();
+final int expectedLength = valueCount * VALUE_WIDTH;
+assert actualLength == expectedLength : String.format("Expected to 
load %d bytes but actually loaded %d bytes", expectedLength, actualLength);
+
+this.valueCount = valueCount;
+  }
+
+  @Override
+  public TransferPair getTransferPair(BufferAllocator allocator){
+return new TransferImpl(getField(), allocator);
+  }
+
+  @Override
+  public TransferPair getTransferPair(String ref, BufferAllocator 
allocator){
+return new TransferImpl(getField().withPath(ref), allocator);
+  }
+
+  @Override
+  public TransferPair makeTransferPair(ValueVector to) {
+

[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136188791
  
--- Diff: 
exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullVector.java 
---
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
**/
+
+package org.apache.drill.exec.vector;
+
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+
+/** UntypedNullVector is to represent a value vector with {@link 
org.apache.drill.common.types.MinorType.NULL}
+ *  All values in the vector represent two semantic implications: 1) the 
value is unknown, 2) the type is unknown.
+ *  Because of this, we only have to keep track of the number of values in 
value vector,
+ *  and there is no allocated buffer to back up this value vector. 
Therefore, the majority of
+ *  methods in this class is either no-op, or throws {@link 
UnsupportedOperationException}.
+ *
+ */
+public final class UntypedNullVector extends BaseDataValueVector 
implements FixedWidthVector {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UntypedNullVector.class);
+
+  /**
+   * Width of each fixed-width value.
+   */
+  public static final int VALUE_WIDTH = 0;
+
+  private final Accessor accessor = new Accessor();
+  private final Mutator mutator = new Mutator();
+  private int valueCount ;
+
+  public UntypedNullVector(MaterializedField field, BufferAllocator 
allocator) {
+super(field, allocator);
+valueCount = 0;
+  }
+
+  @Override
+  public FieldReader getReader() { throw new 
UnsupportedOperationException(); }
+
+  @Override
+  public int getBufferSizeFor(final int valueCount) {
+return 0;
+  }
+
+  @Override
+  public int getValueCapacity(){
+return Character.MAX_VALUE;
+  }
+
+  @Override
+  public Accessor getAccessor() { return accessor; }
+
+  @Override
+  public Mutator getMutator() { return mutator; }
+
+  @Override
+  public void setInitialCapacity(final int valueCount) {
+  }
+
+  @Override
+  public void allocateNew() {
+  }
+
+  @Override
+  public boolean allocateNewSafe() {
+return true;
+  }
+
+  @Override
+  public void allocateNew(final int valueCount) {
+  }
+
+  @Override
+  public void reset() {
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void zeroVector() {
+  }
+
+  @Override
+  public void load(SerializedField metadata, DrillBuf buffer) {
+
Preconditions.checkArgument(this.field.getPath().equals(metadata.getNamePart().getName()),
+"The field %s doesn't match the provided metadata %s.", 
this.field, metadata);
+final int actualLength = metadata.getBufferLength();
+final int valueCount = metadata.getValueCount();
+final int expectedLength = valueCount * VALUE_WIDTH;
+assert actualLength == expectedLength : String.format("Expected to 
load %d bytes but actually loaded %d bytes", expectedLength, actualLength);
+
+this.valueCount = valueCount;
+  }
+
+  @Override
+  public TransferPair getTransferPair(BufferAllocator allocator){
+return new TransferImpl(getField(), allocator);
+  }
+
+  @Override
+  public TransferPair getTransferPair(String ref, BufferAllocator 
allocator){
+return new TransferImpl(getField().withPath(ref), allocator);
+  }
+
+  @Override
+  public TransferPair makeTransferPair(ValueVector to) {
+

[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136188197
  
--- Diff: 
exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullVector.java 
---
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
**/
+
+package org.apache.drill.exec.vector;
+
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+
+/** UntypedNullVector is to represent a value vector with {@link 
org.apache.drill.common.types.MinorType.NULL}
+ *  All values in the vector represent two semantic implications: 1) the 
value is unknown, 2) the type is unknown.
+ *  Because of this, we only have to keep track of the number of values in 
value vector,
+ *  and there is no allocated buffer to back up this value vector. 
Therefore, the majority of
+ *  methods in this class is either no-op, or throws {@link 
UnsupportedOperationException}.
+ *
+ */
+public final class UntypedNullVector extends BaseDataValueVector 
implements FixedWidthVector {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UntypedNullVector.class);
+
+  /**
+   * Width of each fixed-width value.
+   */
+  public static final int VALUE_WIDTH = 0;
+
+  private final Accessor accessor = new Accessor();
+  private final Mutator mutator = new Mutator();
+  private int valueCount ;
+
+  public UntypedNullVector(MaterializedField field, BufferAllocator 
allocator) {
+super(field, allocator);
+valueCount = 0;
+  }
+
+  @Override
+  public FieldReader getReader() { throw new 
UnsupportedOperationException(); }
+
+  @Override
+  public int getBufferSizeFor(final int valueCount) {
+return 0;
+  }
+
+  @Override
+  public int getValueCapacity(){
+return Character.MAX_VALUE;
--- End diff --

Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136187915
  
--- Diff: 
exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullVector.java 
---
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
**/
+
+package org.apache.drill.exec.vector;
+
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+
+/** UntypedNullVector is to represent a value vector with {@link 
org.apache.drill.common.types.MinorType.NULL}
+ *  All values in the vector represent two semantic implications: 1) the 
value is unknown, 2) the type is unknown.
+ *  Because of this, we only have to keep track of the number of values in 
value vector,
+ *  and there is no allocated buffer to back up this value vector. 
Therefore, the majority of
+ *  methods in this class is either no-op, or throws {@link 
UnsupportedOperationException}.
+ *
+ */
+public final class UntypedNullVector extends BaseDataValueVector 
implements FixedWidthVector {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UntypedNullVector.class);
+
+  /**
+   * Width of each fixed-width value.
+   */
+  public static final int VALUE_WIDTH = 0;
+
+  private final Accessor accessor = new Accessor();
+  private final Mutator mutator = new Mutator();
+  private int valueCount ;
--- End diff --

done.




[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136187789
  
--- Diff: 
exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullHolder.java 
---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.vector;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.holders.ValueHolder;
+
+public class UntypedNullHolder implements ValueHolder {
+  public static final TypeProtos.MajorType TYPE = 
Types.optional(TypeProtos.MinorType.NULL);
+
+  public TypeProtos.MajorType getType() {return TYPE;}
+
+  public static final int WIDTH = 0;
+
+  public int isSet = 1;
+
+  @Deprecated
+  public int hashCode(){
+throw new UnsupportedOperationException();
+  }
--- End diff --

See response for `toString()`.




[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136187804
  
--- Diff: 
exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullHolder.java 
---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.vector;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.holders.ValueHolder;
+
+public class UntypedNullHolder implements ValueHolder {
+  public static final TypeProtos.MajorType TYPE = 
Types.optional(TypeProtos.MinorType.NULL);
+
+  public TypeProtos.MajorType getType() {return TYPE;}
+
+  public static final int WIDTH = 0;
+
+  public int isSet = 1;
+
+  @Deprecated
+  public int hashCode(){
+throw new UnsupportedOperationException();
+  }
+
+  /*
+   * Reason for deprecation is that ValueHolders are potential scalar 
replacements
+   * and hence we don't want any methods to be invoked on them.
+   */
+  @Deprecated
+  public String toString(){
+throw new UnsupportedOperationException();
+  }
+
+
+
--- End diff --

done.





[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136187598
  
--- Diff: 
exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullHolder.java 
---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.vector;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.holders.ValueHolder;
+
+public class UntypedNullHolder implements ValueHolder {
+  public static final TypeProtos.MajorType TYPE = 
Types.optional(TypeProtos.MinorType.NULL);
+
+  public TypeProtos.MajorType getType() {return TYPE;}
+
+  public static final int WIDTH = 0;
+
+  public int isSet = 1;
+
+  @Deprecated
+  public int hashCode(){
+throw new UnsupportedOperationException();
+  }
+
+  /*
+   * Reason for deprecation is that ValueHolders are potential scalar 
replacements
+   * and hence we don't want any methods to be invoked on them.
+   */
+  @Deprecated
+  public String toString(){
+throw new UnsupportedOperationException();
--- End diff --

This applies to all the value holder classes. See [1].

My understanding is that it's for the benefit of potential scalar replacement 
in byte-code fixup during execution. We may argue that we do not have to leverage 
scalar replacement with JDK 8, but before we know for sure, it's better to keep 
it in line with the other value holder classes. 


1. 
https://github.com/apache/drill/blob/master/exec/vector/src/main/codegen/templates/ValueHolders.java#L97-L109




Dedupping json records based on nested value

2017-08-30 Thread François Méthot
Hi,

Congrats on the 1.11 release; we are happy to have our suggestion
implemented in the new release (automatic HDFS block size for Parquet
files).

It seems like we are pushing the limits of Drill with a new type of query... (I am
learning new SQL tricks in the process.)

We are trying to aggregate a json document based on a nested value.

Document looks like this:

{
 "field1" : {
 "f1_a" : "infoa",
 "f1_b" : "infob"
  },
 "field2" : "very long string",
 "field3" : {
 "f3_a" : "infoc",
 "f3_b" : "infod",
 "f4_c" : {
  
  }
  },
  "field4" : {
 "key_data" : "String to aggregate on",
 "f4_b" : "a string2",
 "f4_c" : {
   complex structure...
  }
  }
}


We want the first, last (or any) occurrence of field1, field2, field3, and
field4, grouped by field4.key_data.


Unfortunately, the min and max functions do not support JSON complex columns
(MapHolder). Therefore, GROUP BY type queries do not work.

We tried a window function like this:

create table  as (
  select first_value(tb1.field1) over (partition by tb1.field4.key_data) as field1,
         first_value(tb1.field2) over (partition by tb1.field4.key_data) as field2,
         first_value(tb1.field3) over (partition by tb1.field4.key_data) as field3,
         first_value(tb1.field4) over (partition by tb1.field4.key_data) as field4
  from dfs.`doc.json` tb1
);

We get an IndexOutOfBoundsException.

We had better success with:
create table  as (
 select * from
  (select tb1.*,
  row_number() over (partition by tb1.field4.key_data) as row_num
   from  dfs.`doc.json` tb1
  ) t
 where t.row_num = 1
)

This works on a single JSON file, or with multiple files in a session
configured with planner.width_max_per_node=1.

As soon as we use more than 1 thread per query, we get an
IndexOutOfBoundsException.
This was tried on 1.10 and 1.11.
It looks like a bug.


Would you have other suggestions to bypass that issue?
Is there an existing aggregation function (to work with GROUP BY) that would
return the first, last, or a random MapHolder column from a JSON document?
If not, I am thinking of implementing one; would there be an example of how
to clone a MapHolder within a function? (I'm pretty sure I can't assign the "in"
param to the output within a function.)


Thank you for your time reading this.
Any suggestions to try are welcome.

Francois


[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136186438
  
--- Diff: 
exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullHolder.java 
---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.vector;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.holders.ValueHolder;
+
+public class UntypedNullHolder implements ValueHolder {
+  public static final TypeProtos.MajorType TYPE = 
Types.optional(TypeProtos.MinorType.NULL);
+
+  public TypeProtos.MajorType getType() {return TYPE;}
--- End diff --

Done.




[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136185442
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
 ---
@@ -125,4 +131,19 @@ public BatchSchema getSchema() {
 
   protected abstract boolean setupNewSchema() throws SchemaChangeException;
   protected abstract IterOutcome doWork();
+
+  /**
+   * Default behavior to handle fast NONE (incoming's first next() return 
NONE, in stead of OK_NEW_SCHEMA):
+   * FAST NONE could happen when the underneath Scan operators do not 
produce any batch with schema.
+   *
--- End diff --

done




[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136185139
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
 ---
@@ -61,6 +63,10 @@ public IterOutcome innerNext() {
 }
 switch (upstream) {
 case NONE:
+  if (state == BatchState.FIRST) {
+return handleFastNone();
--- End diff --

Changed to `handleNullInput`




[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136184973
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
 ---
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+
+public abstract class AbstractBinaryRecordBatch extends  AbstractRecordBatch {
+  protected final RecordBatch left;
+  protected final RecordBatch right;
+
+  // state (IterOutcome) of the left input
+  protected IterOutcome leftUpstream = IterOutcome.NONE;
+
+  // state (IterOutcome) of the right input
+  protected IterOutcome rightUpstream = IterOutcome.NONE;
--- End diff --

Better not to wrap the two sides in a wrapper class, since the subclass has 
to differentiate the two sides (for instance, in HashJoin the left side is the 
probe while the right side is the build).





[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136182353
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
 ---
@@ -35,18 +35,43 @@
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlKind;
 
+/**
+ * A physical Prel node for Project operator.
+ */
 public class ProjectPrel extends DrillProjectRelBase implements Prel{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ProjectPrel.class);
 
+  private final boolean outputProj;
 
   public ProjectPrel(RelOptCluster cluster, RelTraitSet traits, RelNode 
child, List exps,
   RelDataType rowType) {
+this(cluster, traits, child, exps, rowType, false);
+  }
+
+  /**
+   * Constructor for ProjectPrel.
+   * @param cluster
+   * @param traits traits of ProjectPrel node
+   * @param child  input
+   * @param exps   list of RexNode, representing expressions of projection.
+   * @param rowType output rowType of projection expression.
+   * @param outputProj true if ProjectPrel is inserted by {@link 
org.apache.drill.exec.planner.physical.visitor.TopProjectVisitor}
+   *   Such top Project operator does the following 
processing, before the result was presented to Screen/Writer
+   *   1) ensure final output field names are preserved,
+   *   2) handle cases where input does not return any 
batch (a fast NONE) (see ProjectRecordBatch.handleFastNone() method)
+   *   3) handle cases where expressions in upstream 
operator were evaluated to NULL type
--- End diff --

Done. 




[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136181920
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
 ---
@@ -160,12 +161,7 @@ public RelOptCost computeSelfCost(final RelOptPlanner 
planner, RelMetadataQuery
 final ScanStats stats = groupScan.getScanStats(settings);
 int columnCount = getRowType().getFieldCount();
 double ioCost = 0;
-boolean isStarQuery = Iterables.tryFind(getRowType().getFieldNames(), 
new Predicate() {
-  @Override
-  public boolean apply(String input) {
-return Preconditions.checkNotNull(input).equals("*");
-  }
-}).isPresent();
+boolean isStarQuery = AbstractRecordReader.isStarQuery(columns);
--- End diff --

Move to `org.apache.drill.exec.util.Utilities`. 




gitbox?

2017-08-30 Thread Vlad Rozov

Hi,

As I am new to Drill, I don't know if migration from "Git WiP" 
(https://git-wip-us.apache.org) to "Github Dual Master" 
(https://gitbox.apache.org) has already been discussed by the community, but 
from my Apache Apex experience I would recommend considering migrating the 
Drill ASF repos to gitbox. Such a move will give committers write 
access to the Drill repository on GitHub with all the perks that GitHub 
provides.


Thank you,

Vlad


[GitHub] drill issue #919: DRILL-5721: Query with only root fragment and no non-root ...

2017-08-30 Thread sohami
Github user sohami commented on the issue:

https://github.com/apache/drill/pull/919
  
@parthchandra - thanks for the review!




[GitHub] drill issue #919: DRILL-5721: Query with only root fragment and no non-root ...

2017-08-30 Thread parthchandra
Github user parthchandra commented on the issue:

https://github.com/apache/drill/pull/919
  
+1 LGTM




[GitHub] drill pull request #919: DRILL-5721: Query with only root fragment and no no...

2017-08-30 Thread parthchandra
Github user parthchandra commented on a diff in the pull request:

https://github.com/apache/drill/pull/919#discussion_r136161134
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---
@@ -1073,26 +1070,22 @@ public QueryId getQueryId() {
*/
   private void setupRootFragment(final PlanFragment rootFragment, final 
FragmentRoot rootOperator)
   throws ExecutionSetupException {
-@SuppressWarnings("resource")
 final FragmentContext rootContext = new 
FragmentContext(drillbitContext, rootFragment, queryContext,
 initiatingClient, 
drillbitContext.getFunctionImplementationRegistry());
-@SuppressWarnings("resource")
-final IncomingBuffers buffers = new IncomingBuffers(rootFragment, 
rootContext);
-rootContext.setBuffers(buffers);
-
-queryManager.addFragmentStatusTracker(rootFragment, true);
-
 final ControlTunnel tunnel = 
drillbitContext.getController().getTunnel(queryContext.getCurrentEndpoint());
+final FragmentStatusReporter statusReporter = new 
FragmentStatusReporter(rootContext, tunnel);
 final FragmentExecutor rootRunner = new FragmentExecutor(rootContext, 
rootFragment,
-new FragmentStatusReporter(rootContext, tunnel),
-rootOperator);
-final RootFragmentManager fragmentManager = new 
RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
+statusReporter, rootOperator);
 
-if (buffers.isDone()) {
+queryManager.addFragmentStatusTracker(rootFragment, true);
+
+// FragmentManager is setting buffer for FragmentContext
+if (rootContext.isBuffersDone()) {
--- End diff --

Yes I see it now. Sorry I missed it.




[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136159563
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
 ---
@@ -130,562 +145,248 @@ public IterOutcome innerNext() {
   }
 
   @Override
-  public WritableBatch getWritableBatch() {
-return WritableBatch.get(this);
+  public int getRecordCount() {
+return recordCount;
   }
 
-  private void setValueCount(int count) {
-for (ValueVector v : allocationVectors) {
-  ValueVector.Mutator m = v.getMutator();
-  m.setValueCount(count);
-}
-  }
 
-  private boolean doAlloc() {
-for (ValueVector v : allocationVectors) {
-  try {
-AllocationHelper.allocateNew(v, current.getRecordCount());
-  } catch (OutOfMemoryException ex) {
-return false;
-  }
+  @SuppressWarnings("resource")
+  private IterOutcome doWork(RecordBatch inputBatch, boolean newSchema) 
throws ClassTransformationException, IOException, SchemaChangeException {
+if (inputBatch.getSchema().getFieldCount() != 
container.getSchema().getFieldCount()) {
+  // wrong.
 }
-return true;
-  }
 
-  @SuppressWarnings("resource")
-  private IterOutcome doWork() throws ClassTransformationException, 
IOException, SchemaChangeException {
-if (allocationVectors != null) {
-  for (ValueVector v : allocationVectors) {
-v.clear();
-  }
+if (newSchema) {
+  createUnionAller(inputBatch);
 }
 
-allocationVectors = Lists.newArrayList();
-transfers.clear();
+container.zeroVectors();
 
-// If both sides of Union-All are empty
-if(unionAllInput.isBothSideEmpty()) {
-  for(int i = 0; i < outputFields.size(); ++i) {
-final String colName = outputFields.get(i).getPath();
-final MajorType majorType = MajorType.newBuilder()
-.setMinorType(MinorType.INT)
-.setMode(DataMode.OPTIONAL)
-.build();
-
-MaterializedField outputField = MaterializedField.create(colName, 
majorType);
-ValueVector vv = container.addOrGet(outputField, callBack);
-allocationVectors.add(vv);
-  }
+for (final ValueVector v : this.allocationVectors) {
+  AllocationHelper.allocateNew(v, inputBatch.getRecordCount());
+}
 
-  container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+recordCount = unionall.unionRecords(0, inputBatch.getRecordCount(), 0);
+for (final ValueVector v : allocationVectors) {
+  final ValueVector.Mutator m = v.getMutator();
+  m.setValueCount(recordCount);
+}
+
+if (callBack.getSchemaChangedAndReset()) {
   return IterOutcome.OK_NEW_SCHEMA;
+} else {
+  return IterOutcome.OK;
 }
+  }
+
+  private void createUnionAller(RecordBatch inputBatch) throws 
ClassTransformationException, IOException, SchemaChangeException {
+transfers.clear();
+allocationVectors.clear();;
 
 final ClassGenerator cg = 
CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, 
context.getFunctionRegistry(), context.getOptions());
 cg.getCodeGenerator().plainJavaCapable(true);
 // Uncomment out this line to debug the generated code.
-//cg.getCodeGenerator().saveCodeForDebugging(true);
+//cg.getCodeGenerator().saveCodeForDebugging(true);
+
 int index = 0;
-for(VectorWrapper vw : current) {
-   ValueVector vvIn = vw.getValueVector();
-  // get the original input column names
-  SchemaPath inputPath = 
SchemaPath.getSimplePath(vvIn.getField().getPath());
-  // get the renamed column names
-  SchemaPath outputPath = 
SchemaPath.getSimplePath(outputFields.get(index).getPath());
+for(VectorWrapper vw : inputBatch) {
+  ValueVector vvIn = vw.getValueVector();
+  ValueVector vvOut = container.getValueVector(index).getValueVector();
 
   final ErrorCollector collector = new ErrorCollectorImpl();
   // According to input data names, Minortypes, Datamodes, choose to
   // transfer directly,
   // rename columns or
   // cast data types (Minortype or DataMode)
-  if (hasSameTypeAndMode(outputFields.get(index), 
vw.getValueVector().getField())) {
+  if (hasSameTypeAndMode(container.getSchema().getColumn(index), 
vvIn.getField())
+  && vvIn.getField().getType().getMinorType() != 
TypeProtos.MinorType.MAP // Per DRILL-5521, existing bug for map transfer
+  ) {
 // Transfer column
-
-MajorType outputFieldType = 

[GitHub] drill pull request #919: DRILL-5721: Query with only root fragment and no no...

2017-08-30 Thread sohami
Github user sohami commented on a diff in the pull request:

https://github.com/apache/drill/pull/919#discussion_r136157876
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---
@@ -1073,26 +1070,22 @@ public QueryId getQueryId() {
*/
   private void setupRootFragment(final PlanFragment rootFragment, final 
FragmentRoot rootOperator)
   throws ExecutionSetupException {
-@SuppressWarnings("resource")
 final FragmentContext rootContext = new 
FragmentContext(drillbitContext, rootFragment, queryContext,
 initiatingClient, 
drillbitContext.getFunctionImplementationRegistry());
-@SuppressWarnings("resource")
-final IncomingBuffers buffers = new IncomingBuffers(rootFragment, 
rootContext);
-rootContext.setBuffers(buffers);
-
-queryManager.addFragmentStatusTracker(rootFragment, true);
-
 final ControlTunnel tunnel = 
drillbitContext.getController().getTunnel(queryContext.getCurrentEndpoint());
+final FragmentStatusReporter statusReporter = new 
FragmentStatusReporter(rootContext, tunnel);
 final FragmentExecutor rootRunner = new FragmentExecutor(rootContext, 
rootFragment,
-new FragmentStatusReporter(rootContext, tunnel),
-rootOperator);
-final RootFragmentManager fragmentManager = new 
RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
+statusReporter, rootOperator);
 
-if (buffers.isDone()) {
+queryManager.addFragmentStatusTracker(rootFragment, true);
+
+// FragmentManager is setting buffer for FragmentContext
+if (rootContext.isBuffersDone()) {
--- End diff --

Sorry about the confusion. It was moved out of the else block in the second commit.




[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136157330
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
 ---
@@ -130,562 +145,248 @@ public IterOutcome innerNext() {
   }
 
   @Override
-  public WritableBatch getWritableBatch() {
-return WritableBatch.get(this);
+  public int getRecordCount() {
+return recordCount;
   }
 
-  private void setValueCount(int count) {
-for (ValueVector v : allocationVectors) {
-  ValueVector.Mutator m = v.getMutator();
-  m.setValueCount(count);
-}
-  }
 
-  private boolean doAlloc() {
-for (ValueVector v : allocationVectors) {
-  try {
-AllocationHelper.allocateNew(v, current.getRecordCount());
-  } catch (OutOfMemoryException ex) {
-return false;
-  }
+  @SuppressWarnings("resource")
+  private IterOutcome doWork(RecordBatch inputBatch, boolean newSchema) 
throws ClassTransformationException, IOException, SchemaChangeException {
+if (inputBatch.getSchema().getFieldCount() != 
container.getSchema().getFieldCount()) {
+  // wrong.
 }
-return true;
-  }
 
-  @SuppressWarnings("resource")
-  private IterOutcome doWork() throws ClassTransformationException, 
IOException, SchemaChangeException {
-if (allocationVectors != null) {
-  for (ValueVector v : allocationVectors) {
-v.clear();
-  }
+if (newSchema) {
+  createUnionAller(inputBatch);
 }
 
-allocationVectors = Lists.newArrayList();
-transfers.clear();
+container.zeroVectors();
 
-// If both sides of Union-All are empty
-if(unionAllInput.isBothSideEmpty()) {
-  for(int i = 0; i < outputFields.size(); ++i) {
-final String colName = outputFields.get(i).getPath();
-final MajorType majorType = MajorType.newBuilder()
-.setMinorType(MinorType.INT)
-.setMode(DataMode.OPTIONAL)
-.build();
-
-MaterializedField outputField = MaterializedField.create(colName, 
majorType);
-ValueVector vv = container.addOrGet(outputField, callBack);
-allocationVectors.add(vv);
-  }
+for (final ValueVector v : this.allocationVectors) {
+  AllocationHelper.allocateNew(v, inputBatch.getRecordCount());
+}
 
-  container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+recordCount = unionall.unionRecords(0, inputBatch.getRecordCount(), 0);
+for (final ValueVector v : allocationVectors) {
+  final ValueVector.Mutator m = v.getMutator();
+  m.setValueCount(recordCount);
+}
+
+if (callBack.getSchemaChangedAndReset()) {
   return IterOutcome.OK_NEW_SCHEMA;
+} else {
+  return IterOutcome.OK;
 }
+  }
+
+  private void createUnionAller(RecordBatch inputBatch) throws 
ClassTransformationException, IOException, SchemaChangeException {
+transfers.clear();
+allocationVectors.clear();;
--- End diff --

I thought the more `;`, the better. :-)

Removed. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136157172
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
 ---
@@ -130,562 +145,248 @@ public IterOutcome innerNext() {
   }
 
   @Override
-  public WritableBatch getWritableBatch() {
-return WritableBatch.get(this);
+  public int getRecordCount() {
+return recordCount;
   }
 
-  private void setValueCount(int count) {
-for (ValueVector v : allocationVectors) {
-  ValueVector.Mutator m = v.getMutator();
-  m.setValueCount(count);
-}
-  }
 
-  private boolean doAlloc() {
-for (ValueVector v : allocationVectors) {
-  try {
-AllocationHelper.allocateNew(v, current.getRecordCount());
-  } catch (OutOfMemoryException ex) {
-return false;
-  }
+  @SuppressWarnings("resource")
+  private IterOutcome doWork(RecordBatch inputBatch, boolean newSchema) 
throws ClassTransformationException, IOException, SchemaChangeException {
+if (inputBatch.getSchema().getFieldCount() != 
container.getSchema().getFieldCount()) {
+  // wrong.
 }
-return true;
-  }
 
-  @SuppressWarnings("resource")
-  private IterOutcome doWork() throws ClassTransformationException, 
IOException, SchemaChangeException {
-if (allocationVectors != null) {
-  for (ValueVector v : allocationVectors) {
-v.clear();
-  }
+if (newSchema) {
+  createUnionAller(inputBatch);
 }
 
-allocationVectors = Lists.newArrayList();
-transfers.clear();
+container.zeroVectors();
 
-// If both sides of Union-All are empty
-if(unionAllInput.isBothSideEmpty()) {
-  for(int i = 0; i < outputFields.size(); ++i) {
-final String colName = outputFields.get(i).getPath();
-final MajorType majorType = MajorType.newBuilder()
-.setMinorType(MinorType.INT)
-.setMode(DataMode.OPTIONAL)
-.build();
-
-MaterializedField outputField = MaterializedField.create(colName, 
majorType);
-ValueVector vv = container.addOrGet(outputField, callBack);
-allocationVectors.add(vv);
-  }
+for (final ValueVector v : this.allocationVectors) {
+  AllocationHelper.allocateNew(v, inputBatch.getRecordCount());
+}
 
-  container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+recordCount = unionall.unionRecords(0, inputBatch.getRecordCount(), 0);
+for (final ValueVector v : allocationVectors) {
+  final ValueVector.Mutator m = v.getMutator();
+  m.setValueCount(recordCount);
+}
+
+if (callBack.getSchemaChangedAndReset()) {
   return IterOutcome.OK_NEW_SCHEMA;
+} else {
+  return IterOutcome.OK;
 }
+  }
+
+  private void createUnionAller(RecordBatch inputBatch) throws 
ClassTransformationException, IOException, SchemaChangeException {
+transfers.clear();
+allocationVectors.clear();;
 
 final ClassGenerator cg = 
CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, 
context.getFunctionRegistry(), context.getOptions());
 cg.getCodeGenerator().plainJavaCapable(true);
 // Uncomment out this line to debug the generated code.
-//cg.getCodeGenerator().saveCodeForDebugging(true);
+//cg.getCodeGenerator().saveCodeForDebugging(true);
+
 int index = 0;
-for(VectorWrapper vw : current) {
-   ValueVector vvIn = vw.getValueVector();
-  // get the original input column names
-  SchemaPath inputPath = 
SchemaPath.getSimplePath(vvIn.getField().getPath());
-  // get the renamed column names
-  SchemaPath outputPath = 
SchemaPath.getSimplePath(outputFields.get(index).getPath());
+for(VectorWrapper vw : inputBatch) {
+  ValueVector vvIn = vw.getValueVector();
+  ValueVector vvOut = container.getValueVector(index).getValueVector();
 
   final ErrorCollector collector = new ErrorCollectorImpl();
   // According to input data names, Minortypes, Datamodes, choose to
   // transfer directly,
   // rename columns or
   // cast data types (Minortype or DataMode)
-  if (hasSameTypeAndMode(outputFields.get(index), 
vw.getValueVector().getField())) {
+  if (hasSameTypeAndMode(container.getSchema().getColumn(index), 
vvIn.getField())
+  && vvIn.getField().getType().getMinorType() != 
TypeProtos.MinorType.MAP // Per DRILL-5521, existing bug for map transfer
+  ) {
 // Transfer column
-
-MajorType outputFieldType = 

[GitHub] drill pull request #919: DRILL-5721: Query with only root fragment and no no...

2017-08-30 Thread parthchandra
Github user parthchandra commented on a diff in the pull request:

https://github.com/apache/drill/pull/919#discussion_r13616
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---
@@ -1073,26 +1070,22 @@ public QueryId getQueryId() {
*/
   private void setupRootFragment(final PlanFragment rootFragment, final 
FragmentRoot rootOperator)
   throws ExecutionSetupException {
-@SuppressWarnings("resource")
 final FragmentContext rootContext = new 
FragmentContext(drillbitContext, rootFragment, queryContext,
 initiatingClient, 
drillbitContext.getFunctionImplementationRegistry());
-@SuppressWarnings("resource")
-final IncomingBuffers buffers = new IncomingBuffers(rootFragment, 
rootContext);
-rootContext.setBuffers(buffers);
-
-queryManager.addFragmentStatusTracker(rootFragment, true);
-
 final ControlTunnel tunnel = 
drillbitContext.getController().getTunnel(queryContext.getCurrentEndpoint());
+final FragmentStatusReporter statusReporter = new 
FragmentStatusReporter(rootContext, tunnel);
 final FragmentExecutor rootRunner = new FragmentExecutor(rootContext, 
rootFragment,
-new FragmentStatusReporter(rootContext, tunnel),
-rootOperator);
-final RootFragmentManager fragmentManager = new 
RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
+statusReporter, rootOperator);
 
-if (buffers.isDone()) {
+queryManager.addFragmentStatusTracker(rootFragment, true);
+
+// FragmentManager is setting buffer for FragmentContext
+if (rootContext.isBuffersDone()) {
--- End diff --

Yes, but from the looks of it the fragment manager hasn't been created yet. 
In fact, it seems it is created in the else part of the condition you're 
checking.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136154993
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
 ---
@@ -130,562 +145,248 @@ public IterOutcome innerNext() {
   }
 
   @Override
-  public WritableBatch getWritableBatch() {
-return WritableBatch.get(this);
+  public int getRecordCount() {
+return recordCount;
   }
 
-  private void setValueCount(int count) {
-for (ValueVector v : allocationVectors) {
-  ValueVector.Mutator m = v.getMutator();
-  m.setValueCount(count);
-}
-  }
 
-  private boolean doAlloc() {
-for (ValueVector v : allocationVectors) {
-  try {
-AllocationHelper.allocateNew(v, current.getRecordCount());
-  } catch (OutOfMemoryException ex) {
-return false;
-  }
+  @SuppressWarnings("resource")
+  private IterOutcome doWork(RecordBatch inputBatch, boolean newSchema) 
throws ClassTransformationException, IOException, SchemaChangeException {
+if (inputBatch.getSchema().getFieldCount() != 
container.getSchema().getFieldCount()) {
+  // wrong.
 }
-return true;
-  }
 
-  @SuppressWarnings("resource")
-  private IterOutcome doWork() throws ClassTransformationException, 
IOException, SchemaChangeException {
-if (allocationVectors != null) {
-  for (ValueVector v : allocationVectors) {
-v.clear();
-  }
+if (newSchema) {
+  createUnionAller(inputBatch);
 }
 
-allocationVectors = Lists.newArrayList();
-transfers.clear();
+container.zeroVectors();
 
-// If both sides of Union-All are empty
-if(unionAllInput.isBothSideEmpty()) {
-  for(int i = 0; i < outputFields.size(); ++i) {
-final String colName = outputFields.get(i).getPath();
-final MajorType majorType = MajorType.newBuilder()
-.setMinorType(MinorType.INT)
-.setMode(DataMode.OPTIONAL)
-.build();
-
-MaterializedField outputField = MaterializedField.create(colName, 
majorType);
-ValueVector vv = container.addOrGet(outputField, callBack);
-allocationVectors.add(vv);
-  }
+for (final ValueVector v : this.allocationVectors) {
+  AllocationHelper.allocateNew(v, inputBatch.getRecordCount());
+}
 
-  container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+recordCount = unionall.unionRecords(0, inputBatch.getRecordCount(), 0);
+for (final ValueVector v : allocationVectors) {
+  final ValueVector.Mutator m = v.getMutator();
+  m.setValueCount(recordCount);
+}
+
+if (callBack.getSchemaChangedAndReset()) {
   return IterOutcome.OK_NEW_SCHEMA;
+} else {
+  return IterOutcome.OK;
 }
+  }
+
+  private void createUnionAller(RecordBatch inputBatch) throws 
ClassTransformationException, IOException, SchemaChangeException {
+transfers.clear();
+allocationVectors.clear();;
 
 final ClassGenerator cg = 
CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, 
context.getFunctionRegistry(), context.getOptions());
 cg.getCodeGenerator().plainJavaCapable(true);
 // Uncomment out this line to debug the generated code.
-//cg.getCodeGenerator().saveCodeForDebugging(true);
+//cg.getCodeGenerator().saveCodeForDebugging(true);
+
 int index = 0;
-for(VectorWrapper vw : current) {
-   ValueVector vvIn = vw.getValueVector();
-  // get the original input column names
-  SchemaPath inputPath = 
SchemaPath.getSimplePath(vvIn.getField().getPath());
-  // get the renamed column names
-  SchemaPath outputPath = 
SchemaPath.getSimplePath(outputFields.get(index).getPath());
+for(VectorWrapper vw : inputBatch) {
+  ValueVector vvIn = vw.getValueVector();
+  ValueVector vvOut = container.getValueVector(index).getValueVector();
 
   final ErrorCollector collector = new ErrorCollectorImpl();
   // According to input data names, Minortypes, Datamodes, choose to
   // transfer directly,
   // rename columns or
   // cast data types (Minortype or DataMode)
-  if (hasSameTypeAndMode(outputFields.get(index), 
vw.getValueVector().getField())) {
+  if (hasSameTypeAndMode(container.getSchema().getColumn(index), 
vvIn.getField())
+  && vvIn.getField().getType().getMinorType() != 
TypeProtos.MinorType.MAP // Per DRILL-5521, existing bug for map transfer
+  ) {
 // Transfer column
-
-MajorType outputFieldType = 

[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136154585
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
 ---
@@ -130,562 +145,248 @@ public IterOutcome innerNext() {
   }
 
   @Override
-  public WritableBatch getWritableBatch() {
-return WritableBatch.get(this);
+  public int getRecordCount() {
+return recordCount;
   }
 
-  private void setValueCount(int count) {
-for (ValueVector v : allocationVectors) {
-  ValueVector.Mutator m = v.getMutator();
-  m.setValueCount(count);
-}
-  }
 
-  private boolean doAlloc() {
-for (ValueVector v : allocationVectors) {
-  try {
-AllocationHelper.allocateNew(v, current.getRecordCount());
-  } catch (OutOfMemoryException ex) {
-return false;
-  }
+  @SuppressWarnings("resource")
+  private IterOutcome doWork(RecordBatch inputBatch, boolean newSchema) 
throws ClassTransformationException, IOException, SchemaChangeException {
+if (inputBatch.getSchema().getFieldCount() != 
container.getSchema().getFieldCount()) {
+  // wrong.
 }
-return true;
-  }
 
-  @SuppressWarnings("resource")
-  private IterOutcome doWork() throws ClassTransformationException, 
IOException, SchemaChangeException {
-if (allocationVectors != null) {
-  for (ValueVector v : allocationVectors) {
-v.clear();
-  }
+if (newSchema) {
+  createUnionAller(inputBatch);
 }
 
-allocationVectors = Lists.newArrayList();
-transfers.clear();
+container.zeroVectors();
 
-// If both sides of Union-All are empty
-if(unionAllInput.isBothSideEmpty()) {
-  for(int i = 0; i < outputFields.size(); ++i) {
-final String colName = outputFields.get(i).getPath();
-final MajorType majorType = MajorType.newBuilder()
-.setMinorType(MinorType.INT)
-.setMode(DataMode.OPTIONAL)
-.build();
-
-MaterializedField outputField = MaterializedField.create(colName, 
majorType);
-ValueVector vv = container.addOrGet(outputField, callBack);
-allocationVectors.add(vv);
-  }
+for (final ValueVector v : this.allocationVectors) {
+  AllocationHelper.allocateNew(v, inputBatch.getRecordCount());
+}
 
-  container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+recordCount = unionall.unionRecords(0, inputBatch.getRecordCount(), 0);
+for (final ValueVector v : allocationVectors) {
+  final ValueVector.Mutator m = v.getMutator();
+  m.setValueCount(recordCount);
+}
+
+if (callBack.getSchemaChangedAndReset()) {
   return IterOutcome.OK_NEW_SCHEMA;
+} else {
+  return IterOutcome.OK;
 }
+  }
+
+  private void createUnionAller(RecordBatch inputBatch) throws 
ClassTransformationException, IOException, SchemaChangeException {
+transfers.clear();
+allocationVectors.clear();;
 
 final ClassGenerator cg = 
CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, 
context.getFunctionRegistry(), context.getOptions());
 cg.getCodeGenerator().plainJavaCapable(true);
 // Uncomment out this line to debug the generated code.
-//cg.getCodeGenerator().saveCodeForDebugging(true);
+//cg.getCodeGenerator().saveCodeForDebugging(true);
+
 int index = 0;
-for(VectorWrapper vw : current) {
-   ValueVector vvIn = vw.getValueVector();
-  // get the original input column names
-  SchemaPath inputPath = 
SchemaPath.getSimplePath(vvIn.getField().getPath());
-  // get the renamed column names
-  SchemaPath outputPath = 
SchemaPath.getSimplePath(outputFields.get(index).getPath());
+for(VectorWrapper vw : inputBatch) {
+  ValueVector vvIn = vw.getValueVector();
+  ValueVector vvOut = container.getValueVector(index).getValueVector();
 
   final ErrorCollector collector = new ErrorCollectorImpl();
   // According to input data names, Minortypes, Datamodes, choose to
   // transfer directly,
   // rename columns or
   // cast data types (Minortype or DataMode)
-  if (hasSameTypeAndMode(outputFields.get(index), 
vw.getValueVector().getField())) {
+  if (hasSameTypeAndMode(container.getSchema().getColumn(index), 
vvIn.getField())
+  && vvIn.getField().getType().getMinorType() != 
TypeProtos.MinorType.MAP // Per DRILL-5521, existing bug for map transfer
+  ) {
 // Transfer column
-
-MajorType outputFieldType = 

[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136152074
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
 ---
@@ -130,562 +145,248 @@ public IterOutcome innerNext() {
   }
 
   @Override
-  public WritableBatch getWritableBatch() {
-return WritableBatch.get(this);
+  public int getRecordCount() {
+return recordCount;
   }
 
-  private void setValueCount(int count) {
-for (ValueVector v : allocationVectors) {
-  ValueVector.Mutator m = v.getMutator();
-  m.setValueCount(count);
-}
-  }
 
-  private boolean doAlloc() {
-for (ValueVector v : allocationVectors) {
-  try {
-AllocationHelper.allocateNew(v, current.getRecordCount());
-  } catch (OutOfMemoryException ex) {
-return false;
-  }
+  @SuppressWarnings("resource")
+  private IterOutcome doWork(RecordBatch inputBatch, boolean newSchema) 
throws ClassTransformationException, IOException, SchemaChangeException {
+if (inputBatch.getSchema().getFieldCount() != 
container.getSchema().getFieldCount()) {
+  // wrong.
 }
-return true;
-  }
 
-  @SuppressWarnings("resource")
-  private IterOutcome doWork() throws ClassTransformationException, 
IOException, SchemaChangeException {
-if (allocationVectors != null) {
-  for (ValueVector v : allocationVectors) {
-v.clear();
-  }
+if (newSchema) {
+  createUnionAller(inputBatch);
 }
 
-allocationVectors = Lists.newArrayList();
-transfers.clear();
+container.zeroVectors();
 
-// If both sides of Union-All are empty
-if(unionAllInput.isBothSideEmpty()) {
-  for(int i = 0; i < outputFields.size(); ++i) {
-final String colName = outputFields.get(i).getPath();
-final MajorType majorType = MajorType.newBuilder()
-.setMinorType(MinorType.INT)
-.setMode(DataMode.OPTIONAL)
-.build();
-
-MaterializedField outputField = MaterializedField.create(colName, 
majorType);
-ValueVector vv = container.addOrGet(outputField, callBack);
-allocationVectors.add(vv);
-  }
+for (final ValueVector v : this.allocationVectors) {
+  AllocationHelper.allocateNew(v, inputBatch.getRecordCount());
+}
 
-  container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+recordCount = unionall.unionRecords(0, inputBatch.getRecordCount(), 0);
+for (final ValueVector v : allocationVectors) {
+  final ValueVector.Mutator m = v.getMutator();
+  m.setValueCount(recordCount);
+}
+
+if (callBack.getSchemaChangedAndReset()) {
   return IterOutcome.OK_NEW_SCHEMA;
+} else {
+  return IterOutcome.OK;
 }
+  }
+
+  private void createUnionAller(RecordBatch inputBatch) throws 
ClassTransformationException, IOException, SchemaChangeException {
+transfers.clear();
+allocationVectors.clear();;
 
 final ClassGenerator cg = 
CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, 
context.getFunctionRegistry(), context.getOptions());
 cg.getCodeGenerator().plainJavaCapable(true);
 // Uncomment out this line to debug the generated code.
-//cg.getCodeGenerator().saveCodeForDebugging(true);
+//cg.getCodeGenerator().saveCodeForDebugging(true);
+
 int index = 0;
-for(VectorWrapper vw : current) {
-   ValueVector vvIn = vw.getValueVector();
-  // get the original input column names
-  SchemaPath inputPath = 
SchemaPath.getSimplePath(vvIn.getField().getPath());
-  // get the renamed column names
-  SchemaPath outputPath = 
SchemaPath.getSimplePath(outputFields.get(index).getPath());
+for(VectorWrapper vw : inputBatch) {
+  ValueVector vvIn = vw.getValueVector();
+  ValueVector vvOut = container.getValueVector(index).getValueVector();
 
   final ErrorCollector collector = new ErrorCollectorImpl();
   // According to input data names, Minortypes, Datamodes, choose to
   // transfer directly,
   // rename columns or
   // cast data types (Minortype or DataMode)
-  if (hasSameTypeAndMode(outputFields.get(index), 
vw.getValueVector().getField())) {
+  if (hasSameTypeAndMode(container.getSchema().getColumn(index), 
vvIn.getField())
+  && vvIn.getField().getType().getMinorType() != 
TypeProtos.MinorType.MAP // Per DRILL-5521, existing bug for map transfer
+  ) {
 // Transfer column
-
-MajorType outputFieldType = 

[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136151330
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
 ---
@@ -130,562 +145,248 @@ public IterOutcome innerNext() {
   }
 
   @Override
-  public WritableBatch getWritableBatch() {
-return WritableBatch.get(this);
+  public int getRecordCount() {
+return recordCount;
   }
 
-  private void setValueCount(int count) {
-for (ValueVector v : allocationVectors) {
-  ValueVector.Mutator m = v.getMutator();
-  m.setValueCount(count);
-}
-  }
 
-  private boolean doAlloc() {
-for (ValueVector v : allocationVectors) {
-  try {
-AllocationHelper.allocateNew(v, current.getRecordCount());
-  } catch (OutOfMemoryException ex) {
-return false;
-  }
+  @SuppressWarnings("resource")
+  private IterOutcome doWork(RecordBatch inputBatch, boolean newSchema) 
throws ClassTransformationException, IOException, SchemaChangeException {
+if (inputBatch.getSchema().getFieldCount() != 
container.getSchema().getFieldCount()) {
+  // wrong.
 }
-return true;
-  }
 
-  @SuppressWarnings("resource")
-  private IterOutcome doWork() throws ClassTransformationException, 
IOException, SchemaChangeException {
-if (allocationVectors != null) {
-  for (ValueVector v : allocationVectors) {
-v.clear();
-  }
+if (newSchema) {
+  createUnionAller(inputBatch);
 }
 
-allocationVectors = Lists.newArrayList();
-transfers.clear();
+container.zeroVectors();
 
-// If both sides of Union-All are empty
-if(unionAllInput.isBothSideEmpty()) {
-  for(int i = 0; i < outputFields.size(); ++i) {
-final String colName = outputFields.get(i).getPath();
-final MajorType majorType = MajorType.newBuilder()
-.setMinorType(MinorType.INT)
-.setMode(DataMode.OPTIONAL)
-.build();
-
-MaterializedField outputField = MaterializedField.create(colName, 
majorType);
-ValueVector vv = container.addOrGet(outputField, callBack);
-allocationVectors.add(vv);
-  }
+for (final ValueVector v : this.allocationVectors) {
+  AllocationHelper.allocateNew(v, inputBatch.getRecordCount());
+}
 
-  container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+recordCount = unionall.unionRecords(0, inputBatch.getRecordCount(), 0);
+for (final ValueVector v : allocationVectors) {
+  final ValueVector.Mutator m = v.getMutator();
+  m.setValueCount(recordCount);
+}
+
+if (callBack.getSchemaChangedAndReset()) {
   return IterOutcome.OK_NEW_SCHEMA;
+} else {
+  return IterOutcome.OK;
 }
+  }
+
+  private void createUnionAller(RecordBatch inputBatch) throws 
ClassTransformationException, IOException, SchemaChangeException {
+transfers.clear();
+allocationVectors.clear();;
 
 final ClassGenerator cg = 
CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, 
context.getFunctionRegistry(), context.getOptions());
 cg.getCodeGenerator().plainJavaCapable(true);
 // Uncomment out this line to debug the generated code.
-//cg.getCodeGenerator().saveCodeForDebugging(true);
+//cg.getCodeGenerator().saveCodeForDebugging(true);
+
 int index = 0;
-for(VectorWrapper vw : current) {
-   ValueVector vvIn = vw.getValueVector();
-  // get the original input column names
-  SchemaPath inputPath = 
SchemaPath.getSimplePath(vvIn.getField().getPath());
-  // get the renamed column names
-  SchemaPath outputPath = 
SchemaPath.getSimplePath(outputFields.get(index).getPath());
+for(VectorWrapper vw : inputBatch) {
+  ValueVector vvIn = vw.getValueVector();
+  ValueVector vvOut = container.getValueVector(index).getValueVector();
 
   final ErrorCollector collector = new ErrorCollectorImpl();
   // According to input data names, Minortypes, Datamodes, choose to
   // transfer directly,
   // rename columns or
   // cast data types (Minortype or DataMode)
-  if (hasSameTypeAndMode(outputFields.get(index), 
vw.getValueVector().getField())) {
+  if (hasSameTypeAndMode(container.getSchema().getColumn(index), 
vvIn.getField())
+  && vvIn.getField().getType().getMinorType() != 
TypeProtos.MinorType.MAP // Per DRILL-5521, existing bug for map transfer
+  ) {
 // Transfer column
-
-MajorType outputFieldType = 

[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136149953
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
 ---
@@ -130,562 +145,248 @@ public IterOutcome innerNext() {
   }
 
   @Override
-  public WritableBatch getWritableBatch() {
-return WritableBatch.get(this);
+  public int getRecordCount() {
+return recordCount;
   }
 
-  private void setValueCount(int count) {
-for (ValueVector v : allocationVectors) {
-  ValueVector.Mutator m = v.getMutator();
-  m.setValueCount(count);
-}
-  }
 
-  private boolean doAlloc() {
-for (ValueVector v : allocationVectors) {
-  try {
-AllocationHelper.allocateNew(v, current.getRecordCount());
-  } catch (OutOfMemoryException ex) {
-return false;
-  }
+  @SuppressWarnings("resource")
+  private IterOutcome doWork(RecordBatch inputBatch, boolean newSchema) 
throws ClassTransformationException, IOException, SchemaChangeException {
+if (inputBatch.getSchema().getFieldCount() != 
container.getSchema().getFieldCount()) {
+  // wrong.
 }
-return true;
-  }
 
-  @SuppressWarnings("resource")
-  private IterOutcome doWork() throws ClassTransformationException, 
IOException, SchemaChangeException {
-if (allocationVectors != null) {
-  for (ValueVector v : allocationVectors) {
-v.clear();
-  }
+if (newSchema) {
+  createUnionAller(inputBatch);
 }
 
-allocationVectors = Lists.newArrayList();
-transfers.clear();
+container.zeroVectors();
 
-// If both sides of Union-All are empty
-if(unionAllInput.isBothSideEmpty()) {
-  for(int i = 0; i < outputFields.size(); ++i) {
-final String colName = outputFields.get(i).getPath();
-final MajorType majorType = MajorType.newBuilder()
-.setMinorType(MinorType.INT)
-.setMode(DataMode.OPTIONAL)
-.build();
-
-MaterializedField outputField = MaterializedField.create(colName, 
majorType);
-ValueVector vv = container.addOrGet(outputField, callBack);
-allocationVectors.add(vv);
-  }
+for (final ValueVector v : this.allocationVectors) {
+  AllocationHelper.allocateNew(v, inputBatch.getRecordCount());
+}
 
-  container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+recordCount = unionall.unionRecords(0, inputBatch.getRecordCount(), 0);
+for (final ValueVector v : allocationVectors) {
+  final ValueVector.Mutator m = v.getMutator();
+  m.setValueCount(recordCount);
+}
--- End diff --

New util method in `VectorUtil`. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136149852
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
 ---
@@ -130,562 +145,248 @@ public IterOutcome innerNext() {
   }
 
   @Override
-  public WritableBatch getWritableBatch() {
-return WritableBatch.get(this);
+  public int getRecordCount() {
+return recordCount;
   }
 
-  private void setValueCount(int count) {
-for (ValueVector v : allocationVectors) {
-  ValueVector.Mutator m = v.getMutator();
-  m.setValueCount(count);
-}
-  }
 
-  private boolean doAlloc() {
-for (ValueVector v : allocationVectors) {
-  try {
-AllocationHelper.allocateNew(v, current.getRecordCount());
-  } catch (OutOfMemoryException ex) {
-return false;
-  }
+  @SuppressWarnings("resource")
+  private IterOutcome doWork(RecordBatch inputBatch, boolean newSchema) 
throws ClassTransformationException, IOException, SchemaChangeException {
+if (inputBatch.getSchema().getFieldCount() != 
container.getSchema().getFieldCount()) {
+  // wrong.
 }
-return true;
-  }
 
-  @SuppressWarnings("resource")
-  private IterOutcome doWork() throws ClassTransformationException, 
IOException, SchemaChangeException {
-if (allocationVectors != null) {
-  for (ValueVector v : allocationVectors) {
-v.clear();
-  }
+if (newSchema) {
+  createUnionAller(inputBatch);
 }
 
-allocationVectors = Lists.newArrayList();
-transfers.clear();
+container.zeroVectors();
 
-// If both sides of Union-All are empty
-if(unionAllInput.isBothSideEmpty()) {
-  for(int i = 0; i < outputFields.size(); ++i) {
-final String colName = outputFields.get(i).getPath();
-final MajorType majorType = MajorType.newBuilder()
-.setMinorType(MinorType.INT)
-.setMode(DataMode.OPTIONAL)
-.build();
-
-MaterializedField outputField = MaterializedField.create(colName, 
majorType);
-ValueVector vv = container.addOrGet(outputField, callBack);
-allocationVectors.add(vv);
-  }
+for (final ValueVector v : this.allocationVectors) {
+  AllocationHelper.allocateNew(v, inputBatch.getRecordCount());
+}
--- End diff --

Sounds reasonable. I put two util methods in `VectorUtil`, since 
`VectorAccessibleUtilities` probably should only take `VectorAccessible`-related 
util methods, as its comment states.  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
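
For reference, a minimal sketch of what such `VectorUtil` helpers could look like, based only on the two loops quoted in the diff above (the actual method names and signatures in the patch may differ):

{code}
// Hypothetical helpers mirroring the allocate / setValueCount loops in the quoted diff.
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;

public class VectorUtilSketch {

  /** Allocate each vector with room for the given number of records. */
  public static void allocateVectors(Iterable<ValueVector> vectors, int recordCount) {
    for (final ValueVector v : vectors) {
      AllocationHelper.allocateNew(v, recordCount);
    }
  }

  /** Set the value count on each vector once the records have been copied. */
  public static void setValueCount(Iterable<ValueVector> vectors, int valueCount) {
    for (final ValueVector v : vectors) {
      v.getMutator().setValueCount(valueCount);
    }
  }
}
{code}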


[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r136143881
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
 ---
@@ -39,88 +35,107 @@
 import org.apache.drill.exec.expr.ValueVectorWriteExpression;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.UnionAll;
-import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.resolver.TypeCastRules;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
 
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Stack;
 
-public class UnionAllRecordBatch extends AbstractRecordBatch {
+public class UnionAllRecordBatch extends 
AbstractBinaryRecordBatch {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UnionAllRecordBatch.class);
 
-  private List outputFields;
+  private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
   private UnionAller unionall;
-  private UnionAllInput unionAllInput;
-  private RecordBatch current;
-
   private final List transfers = Lists.newArrayList();
-  private List allocationVectors;
-  protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
+  private List allocationVectors = Lists.newArrayList();
   private int recordCount = 0;
-  private boolean schemaAvailable = false;
+  private UnionInputIterator unionInputIterator;
 
   public UnionAllRecordBatch(UnionAll config, List children, 
FragmentContext context) throws OutOfMemoryException {
-super(config, context, false);
-assert (children.size() == 2) : "The number of the operands of Union 
must be 2";
-unionAllInput = new UnionAllInput(this, children.get(0), 
children.get(1));
-  }
-
-  @Override
-  public int getRecordCount() {
-return recordCount;
+super(config, context, true, children.get(0), children.get(1));
   }
 
   @Override
   protected void killIncoming(boolean sendUpstream) {
-unionAllInput.getLeftRecordBatch().kill(sendUpstream);
-unionAllInput.getRightRecordBatch().kill(sendUpstream);
+left.kill(sendUpstream);
+right.kill(sendUpstream);
   }
 
-  @Override
-  public SelectionVector2 getSelectionVector2() {
-throw new UnsupportedOperationException("UnionAllRecordBatch does not 
support selection vector");
-  }
+  protected void buildSchema() throws SchemaChangeException {
+if (! prefetchFirstBatchFromBothSides()) {
+  return;
+}
 
-  @Override
-  public SelectionVector4 getSelectionVector4() {
-throw new UnsupportedOperationException("UnionAllRecordBatch does not 
support selection vector");
+unionInputIterator = new UnionInputIterator(leftUpstream, left, 
rightUpstream, right);
+
+if (leftUpstream == IterOutcome.NONE && rightUpstream == 
IterOutcome.OK_NEW_SCHEMA) {
+  inferOutputFieldsOneSide(right.getSchema());
+} else if (rightUpstream == IterOutcome.NONE && leftUpstream == 
IterOutcome.OK_NEW_SCHEMA) {
+  inferOutputFieldsOneSide((left.getSchema()));
+} else if (leftUpstream == IterOutcome.OK_NEW_SCHEMA && rightUpstream 
== IterOutcome.OK_NEW_SCHEMA) {
+  inferOutputFieldsBothSide(left.getSchema(), right.getSchema());
+}
+
+container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
+for (VectorWrapper vv: container) {
+  vv.getValueVector().allocateNew();
+  vv.getValueVector().getMutator().setValueCount(0);
+}
   }
 
   @Override
   public IterOutcome innerNext() {
 try {
-  IterOutcome upstream = unionAllInput.nextBatch();
-  logger.debug("Upstream of Union-All: {}", upstream);
+  if (!unionInputIterator.hasNext()) {
+return 

[GitHub] drill issue #922: DRILL-5741: During startup Drill should not exceed the ava...

2017-08-30 Thread paul-rogers
Github user paul-rogers commented on the issue:

https://github.com/apache/drill/pull/922
  
This may be one of those times when we need to resort to a bit of design 
thinking.

The core idea is that the user sets one environment variable to check the 
others. The first issue is that, if the user can't do the sums to set the Drill 
memory allocation right (with respect to actual memory), it is not clear how they 
will get the total memory variable right.

OK, so we get the memory from the system, then do a percentage. That is 
better. But, what is the system memory? Is it total memory? Suppose the user 
says Drill gets 60%. We can now check. But, Drill is distributed. Newer nodes 
in a cluster may have 256GB, older nodes 128GB. Drill demands symmetrical 
resources so the memory given to Drill must be identical on all nodes, 
regardless of system memory. So, the percent of total system memory idea 
doesn't work in practice.

So, maybe we express memory as the total *free* memory. Cool. We give Drill 
60%. Drill starts and everything is fine. Now, we also give Spark 60%. Spark 
starts. It complains in its logs (assuming we make this same change to the 
Spark startup scripts.) But, Spark uses its memory and causes Drill to fail. We 
check Drill logs. Nada. We have to check Spark's logs. Now, imagine doing this 
with five apps; the app that complains may not be the one to fail. And, imagine 
doing this across 100 nodes. Won't scale.

Note that the problem is that we checked memory statically at startup. But, 
our problem was that things changed later: we launched an over-subscribed 
Spark. So, our script must run continuously, constantly checking if any new 
apps are launched. Since some apps grow memory over time, we have to check all 
other apps for total memory usage against that allocated to Drill.

Now, presumably, all other apps are doing the same: Spark is continually 
checking, Storm is doing so, and so on. Now, the admin needs to gather all 
these logs (across dozens of nodes) and extract meaning. What we need, then, is 
a network endpoint to publish the information and a tool to gather and report 
that data. We've just invented monitoring tools.

Taking a step back, what we really want to know is available system memory 
vs. that consumed by apps. So, what we want is a Linux-level monitoring of free 
memory. And, since we have other things to do, we want alerts when free memory 
drops below some point. We've now invented alerting tools.

Now, we got into this mess because we launched apps without concern about 
the total memory usage on each node. That is, we didn't plan our app load to 
fit into our available memory. So, we turn this around. We've got 128GB (say) 
of memory. How do we run only those apps that fit, deferring those that don't? 
We've just invented YARN, Mesos, Kubernetes and the like.

Now we get to the reason for the -1. The proposed change adds significant 
complexity to the scripts, *but can never solve the actual oversubscription 
problem*. For that, we need a global resource manager.

Now, suppose that someone wants to run Drill without such a manager. 
Perhaps some distribution does not provide this tool and instead provides a 
tool that simply launches processes, leaving it to each process to struggle 
with its own resources. In such an environment, the vendor can add a check, 
such as this one, that will fire on all nodes and warn the user about potential 
oversubscription *on that node*, *at that moment*, *for that app* in *one app's 
log file*.

To facilitate this, we can do three things.

1. In the vendor-specific `distrib-env.sh` file, do any memory setting 
adjustments that are wanted.
2. Modify `drillbit.sh` to call a `drill-check.sh` script, if it exists, 
just prior to launching Drill.
3. In the vendor-specific `distrib-env.sh` file, do the check proposed here.

The only change needed in Apache Drill is step 2. Then each vendor can add 
the checks if they don't provide a resource manager. Those vendors (or users) 
that use YARN or Mesos or whatever don't need the checks because they have 
overall tools that solve the problem for them.

Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
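
For illustration only, a minimal Java sketch of the kind of per-node, point-in-time check being debated above (the actual DRILL-5741 proposal lives in the startup shell scripts; the class name, method name, and warning message below are assumptions):

{code}
// Hypothetical sketch -- not the DRILL-5741 change. It only captures one node,
// one app, at one moment, which is exactly the limitation discussed above.
import java.lang.management.ManagementFactory;

public class MemoryCheckSketch {

  /**
   * @param drillMemoryBytes heap + direct memory configured for this Drillbit
   * @return true if the configured memory fits into currently free physical memory
   */
  public static boolean fitsInFreeMemory(long drillMemoryBytes) {
    com.sun.management.OperatingSystemMXBean os =
        (com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
    long freeBytes = os.getFreePhysicalMemorySize();
    if (drillMemoryBytes > freeBytes) {
      System.err.printf("Warning: Drill is configured for %d MB but only %d MB are free on this node%n",
          drillMemoryBytes >> 20, freeBytes >> 20);
      return false;
    }
    return true;
  }
}
{code}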


[jira] [Created] (DRILL-5752) Speed Up Unit Tests

2017-08-30 Thread Timothy Farkas (JIRA)
Timothy Farkas created DRILL-5752:
-

 Summary: Speed Up Unit Tests
 Key: DRILL-5752
 URL: https://issues.apache.org/jira/browse/DRILL-5752
 Project: Apache Drill
  Issue Type: Improvement
Reporter: Timothy Farkas
Assignee: Timothy Farkas


Tests can be split into categories.

High-level categories:
* Fast
* Slow

Low-level categories:
* Vector
* WebUI
* Planner
* Operator
* Storage
* Hive
* JDBC
* Kudu
* Mongo
* Hbase

After the tests are categorized the Travis build can just run the fast tests.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: Speeding Up Unit Tests

2017-08-30 Thread Timothy Farkas
Hi Paul,

I agree. Since that seems like the lowest-hanging fruit at this point, I'll 
focus on that. I'll start off with the categories you mentioned:

High-level categories:
 - Fast

 - Slow


Low-level categories:

 - Vector

 - WebUI

 - Planner
 - Operator

 - Storage

 - Hive

 - JDBC

 - Kudu

 - Mongo

 - Hbase

After the tests are categorized the Travis build can just run the fast tests.

If there are more suggestions for categories and what tests should belong in what 
category, please let me know.

Thanks,
Tim


From: Paul Rogers 
Sent: Wednesday, August 30, 2017 9:51:30 AM
To: dev@drill.apache.org
Subject: Re: Speeding Up Unit Tests

Hi Tim,

Here’s another thought. JUnit & SureFire have a way of grouping tests into 
categories. The classic groupings are the “fast” tests we want to run all the 
time, and the “slow” tests we run occasionally. I recently tried to get this 
working to move a couple of tests into the slow category.

Perhaps we can also group tests by domain. For example, if someone is mucking 
with the web UI, they don’t really need to run all the storage plugin tests. If 
someone works with the planner, they don’t really need to stress test value 
vectors.

Github builds might run a “smoke test” subset. The build process prior to a 
release might run all the tests.

IMHO, we want to encourage more tests, without people thinking, “gee, I don’t 
want to make the build slower, so I’ll go light on my new tests…”

Thanks,

- Paul

> On Aug 29, 2017, at 2:08 PM, Timothy Farkas  wrote:
>
> Thanks Paul,
>
> I see now the parent pom already configures surefire to fork 2 processes to 
> run tests. I tried running multiple tests in parallel in each forked process, 
> but there were errors with BaseTestQuery, probably because it uses static 
> variables. One additional speedup I found was that the maven -T flag 
> parallelizes the build and also seems to parallelize running the tests across 
> sub modules. Using that flag the test duration for contrib went from 18 
> minutes to 10 minutes.
>
> I'll investigate making BaseTestQuery thread safe. Maybe we can get more 
> speedups from being able to run multiple tests in parallel in the same 
> process as well.
>
> Tim
>
> 
> From: Paul Rogers 
> Sent: Monday, August 28, 2017 6:39:10 PM
> To: dev@drill.apache.org
> Subject: Re: Speeding Up Unit Tests
>
> Tests run in parallel when run from the command line in Maven. We have had 
> issues, especially when some test changes global state. Can’t remember the 
> details, however.
>
> - Paul
>
>> On Aug 28, 2017, at 6:04 PM, Timothy Farkas  wrote:
>>
>> Hi All,
>>
>> I will be working on 
>> DRILL-5730  and see some 
>> other areas for potential improvement with regard to testing:
>>
>> 1.  Add caching of maven artifacts to the Travis build. This should 
>> significantly speed up compiling the code in travis.
>> 2.  Running unit tests in parallel.
>> 3.  Make the Travis build actually run unit tests (currently it does not).
>>
>> Points (1) and (3) are easy to do, but I was wondering if anyone has 
>> experience with some of the potential pitfalls of running unit tests in 
>> parallel? Are there certain tests which will have race conditions? and are 
>> there any foreseeable issues with running multiple embedded drill bits in 
>> parallel?
>>
>> Thanks,
>> Tim
>>
>>
>
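
A minimal JUnit 4 sketch of the category markers discussed above (the marker interfaces and test names are hypothetical; Surefire would then include or exclude categories via its groups/excludedGroups configuration):

{code}
// Hypothetical example of JUnit categories; all names here are illustrative only.
import org.junit.Test;
import org.junit.experimental.categories.Category;

public class ExampleCategorizedTest {

  // Marker interfaces used purely as category labels.
  public interface SlowTest {}
  public interface OperatorTest {}

  @Test
  public void fastCheck() {
    // Uncategorized tests stay in the default "fast" run, e.g. the Travis build.
  }

  @Test
  @Category({SlowTest.class, OperatorTest.class})
  public void longRunningOperatorCheck() {
    // Excluded from the fast run by listing SlowTest in Surefire's excludedGroups.
  }
}
{code}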



[GitHub] drill issue #909: DRILL-4264: Allow field names to include dots

2017-08-30 Thread paul-rogers
Github user paul-rogers commented on the issue:

https://github.com/apache/drill/pull/909
  
Thanks much for the great work!
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: Speeding Up Unit Tests

2017-08-30 Thread Paul Rogers
Hi Tim,

Here’s another thought. JUnit & SureFire have a way of grouping tests into 
categories. The classic groupings are the “fast” tests we want to run all the 
time, and the “slow” tests we run occasionally. I recently tried to get this 
working to move a couple of tests into the slow category.

Perhaps we can also group tests by domain. For example, if someone is mucking 
with the web UI, they don’t really need to run all the storage plugin tests. If 
someone works with the planner, they don’t really need to stress test value 
vectors.

Github builds might run a “smoke test” subset. The build process prior to a 
release might run all the tests.

IMHO, we want to encourage more tests, without people thinking, “gee, I don’t 
want to make the build slower, so I’ll go light on my new tests…”

Thanks,

- Paul

> On Aug 29, 2017, at 2:08 PM, Timothy Farkas  wrote:
> 
> Thanks Paul,
> 
> I see now the parent pom already configures surefire to fork 2 processes to 
> run tests. I tried running multiple tests in parallel in each forked process, 
> but there were errors with BaseTestQuery, probably because it uses static 
> variables. One additional speedup I found was that the maven -T flag 
> parallelizes the build and also seems to parallelize running the tests across 
> sub modules. Using that flag the test duration for contrib went from 18 
> minutes to 10 minutes.
> 
> I'll investigate making BaseTestQuery thread safe. Maybe we can get more 
> speedups from being able to run multiple tests in parallel in the same 
> process as well.
> 
> Tim
> 
> 
> From: Paul Rogers 
> Sent: Monday, August 28, 2017 6:39:10 PM
> To: dev@drill.apache.org
> Subject: Re: Speeding Up Unit Tests
> 
> Tests run in parallel when run from the command line in Maven. We have had 
> issues, especially when some test changes global state. Can’t remember the 
> details, however.
> 
> - Paul
> 
>> On Aug 28, 2017, at 6:04 PM, Timothy Farkas  wrote:
>> 
>> Hi All,
>> 
>> I will be working on 
>> DRILL-5730  and see some 
>> other areas for potential improvement with regard to testing:
>> 
>> 1.  Add caching of maven artifacts to the Travis build. This should 
>> significantly speed up compiling the code in travis.
>> 2.  Running unit tests in parallel.
>> 3.  Make the Travis build actually run unit tests (currently it does not).
>> 
>> Points (1) and (3) are easy to do, but I was wondering if anyone has 
>> experience with some of the potential pitfalls of running unit tests in 
>> parallel? Are there certain tests which will have race conditions? and are 
>> there any foreseeable issues with running multiple embedded drill bits in 
>> parallel?
>> 
>> Thanks,
>> Tim
>> 
>> 
> 



Re: Drill 2.0 (design) hackathon

2017-08-30 Thread Paul Rogers
A partial list of Drill’s public APIs:

IMHO, highest priority for Drill 2.0.


  *   JDBC/ODBC drivers
  *   Client (for JDBC/ODBC) + ODBC & JDBC
  *   Client (for full Drill async, columnar)
  *   Storage plugin
  *   Format plugin
  *   System/session options
  *   Queueing (e.g. ZK-based queues)
  *   Rest API
  *   Resource Planning (e.g. max query memory per node)
  *   Metadata access, storage (e.g. file system locations vs. a metastore)
  *   Metadata files formats (Parquet, views, etc.)

Lower priority for future releases:


  *   Query Planning (e.g. Calcite rules)
  *   Config options
  *   SQL syntax, especially Drill extensions
  *   UDF
  *   Management (e.g. JMX, Rest API calls, etc.)
  *   Drill File System (HDFS)
  *   Web UI
  *   Shell scripts

There are certainly more. Please suggest those that are missing. I’ve taken a 
rough cut at which APIs need forward/backward compatibility first, in part 
based on those that are the “most public” and most likely to change. Others are 
important, but we can’t do them all at once.

Thanks,

- Paul

On Aug 29, 2017, at 6:00 PM, Aman Sinha 
> wrote:

Hi Paul,
certainly makes sense to have the API compatibility discussions during this
hackathon.  The 2.0 release may be a good checkpoint to introduce breaking
changes necessitating changes to the ODBC/JDBC drivers and other external
applications. As part of this exercise (not during the hackathon but as a
follow-up action), we also should clearly identify the "public" interfaces.


I will add this to the agenda.

thanks,
-Aman

On Tue, Aug 29, 2017 at 2:08 PM, Paul Rogers 
> wrote:

Thanks Aman for organizing the Hackathon!

The list included many good ideas for Drill 2.0. Some of those require
changes to Drill’s “public” interfaces (file format, client protocol, SQL
behavior, etc.)

At present, Drill has no good mechanism to handle backward/forward
compatibility at the API level. Protobuf versioning certainly helps, but
can’t completely solve semantic changes (where a field changes meaning, or
a non-Protobuf data chunk changes format.) As just one concrete example,
changing to Arrow will break pre-Arrow ODBC/JDBC drivers because class
names and data formats will change.

Perhaps we can prioritize, for the proposed 2.0 release, a one-time set of
breaking changes that introduce a versioning mechanism into our public
APIs. Once these are in place, we can evolve the APIs in the future by
following the newly-created versioning protocol.

Without such a mechanism, we cannot support old & new clients in the same
cluster. Nor can we support rolling upgrades. Of course, another solution
is to get it right the second time, then freeze all APIs and agree to never
again change them. Not sure we have sufficient access to a crystal ball to
predict everything we’d ever need in our APIs, however...

Thanks,

- Paul

On Aug 24, 2017, at 8:39 AM, Aman Sinha 
> wrote:

Drill Developers,

In order to kick-start the Drill 2.0 release discussions, I would like to
propose a Drill 2.0 (design) hackathon (a.k.a. Drill Developer Day™).

As I mentioned in the hangout on Tuesday,  MapR has offered to host it on
Sept 18th at their offices at 350 Holger Way, San Jose.   Hope that works
for most of you!

The goal is to get the community together for a day-long technical
discussion on key topics in preparation for a Drill 2.0 release as well
as
potential improvements in upcoming 1.xx releases.  Depending on the
interest areas, we could form groups and have a volunteer lead each
group.

Based on prior discussions on the dev list, hangouts and existing JIRAs,
there is already a substantial set of topics and I have summarized a few
of
them below.   What other topics do folks want to talk about?   Feel free
to
respond to this thread and I will create a google doc to consolidate.
Understandably, the list would be long but we will use the hackathon to
get
a sense of a reasonable feature set for 1.xx and 2.0 releases.


1. Metadata management.

1a: Defining an abstraction layer for various types of metadata: views,
schema, statistics, security

1b: Underlying storage for metadata: what are the options and their
trade-offs?

- Hive metastore

- Parquet metadata cache (parquet specific)

- An embedded DBMS

- A distributed key-value store

- Others..



2. Drill integration with Apache Arrow

2a: Evaluate the choices and tradeoffs



3. Resource management

3a: Memory limits per query

3b: Spilling

3c: Resource management with Drill on Yarn/Mesos/Kubernetes

3d: Local vs. global resource management

3e: Aligning with admission control/queueing



4. TPC-DS coverage and related planner/operator enhancements

4a: Additional set operations: INTERSECT, EXCEPT

4b: GROUPING SETS, ROLLUP, CUBE support

4c: Handling inequality joins and cartesian joins of non-scalar inputs
(via 

[GitHub] drill pull request #927: DRILL-5751: Fix unit tests to use local file system...

2017-08-30 Thread arina-ielchiieva
GitHub user arina-ielchiieva opened a pull request:

https://github.com/apache/drill/pull/927

DRILL-5751: Fix unit tests to use local file system even if it is not…

… set by default

Please refer to 
[DRILL-5751](https://issues.apache.org/jira/browse/DRILL-5751) for details.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/arina-ielchiieva/drill DRILL-5751

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/927.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #927


commit 079aa8ae2e4d30341cc227c311b08418ab4cf747
Author: Arina Ielchiieva 
Date:   2017-08-24T16:49:52Z

DRILL-5751: Fix unit tests to use local file system even if it is not set 
by default




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (DRILL-5751) Fix unit tests to use local file system even if it is not set by default

2017-08-30 Thread Arina Ielchiieva (JIRA)
Arina Ielchiieva created DRILL-5751:
---

 Summary: Fix unit tests to use local file system even if it is not 
set by default
 Key: DRILL-5751
 URL: https://issues.apache.org/jira/browse/DRILL-5751
 Project: Apache Drill
  Issue Type: Bug
Affects Versions: 1.11.0
Reporter: Arina Ielchiieva
Assignee: Arina Ielchiieva
 Fix For: 1.12.0


When running tests using, for example, the mapr profile, some tests fail with 
FileNotFoundException.
To fix the failures, we need to modify the related tests to always use the local 
file system, even if the default file system is, for example, maprfs.
List of affected unit tests:
TestHashAggrSpill
TestParquetWriter
TestCTAS
StorageStrategyTest
TestDynamicUDFSupport
etc.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] drill issue #915: DRILL-1051: Casting timestamp as date gives wrong result f...

2017-08-30 Thread vdiravka
Github user vdiravka commented on the issue:

https://github.com/apache/drill/pull/915
  
@arina-ielchiieva The test failures are expected and should be corrected. 
Thank you.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #909: DRILL-4264: Allow field names to include dots

2017-08-30 Thread vvysotskyi
Github user vvysotskyi commented on a diff in the pull request:

https://github.com/apache/drill/pull/909#discussion_r136016798
  
--- Diff: 
logical/src/main/java/org/apache/drill/common/expression/SchemaPath.java ---
@@ -115,6 +112,33 @@ public static SchemaPath create(NamePart namePart) {
   }
 
   /**
+   * Parses input string and returns {@code SchemaPath} instance.
+   *
+   * @param expr input string to be parsed
+   * @return {@code SchemaPath} instance
+   */
+  public static SchemaPath parseFromString(String expr) {
--- End diff --

Done
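
For reference, a usage sketch of the method being added (the column names are 
purely illustrative):

    SchemaPath path = SchemaPath.parseFromString("a.b");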


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r135979350
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
 ---
@@ -130,562 +145,248 @@ public IterOutcome innerNext() {
   }
 
   @Override
-  public WritableBatch getWritableBatch() {
-return WritableBatch.get(this);
+  public int getRecordCount() {
+return recordCount;
   }
 
-  private void setValueCount(int count) {
-for (ValueVector v : allocationVectors) {
-  ValueVector.Mutator m = v.getMutator();
-  m.setValueCount(count);
-}
-  }
 
-  private boolean doAlloc() {
-for (ValueVector v : allocationVectors) {
-  try {
-AllocationHelper.allocateNew(v, current.getRecordCount());
-  } catch (OutOfMemoryException ex) {
-return false;
-  }
+  @SuppressWarnings("resource")
+  private IterOutcome doWork(RecordBatch inputBatch, boolean newSchema) 
throws ClassTransformationException, IOException, SchemaChangeException {
+if (inputBatch.getSchema().getFieldCount() != 
container.getSchema().getFieldCount()) {
+  // wrong.
--- End diff --

The revised patch will throw an IllegalArgumentException. 
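
For reference, a sketch of the kind of guard being described, using the field 
names visible in the diff above (the exception message is illustrative):

    if (inputBatch.getSchema().getFieldCount() != container.getSchema().getFieldCount()) {
      throw new IllegalArgumentException("Input batch and output container have different field counts");
    }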


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r135977556
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
 ---
@@ -39,88 +35,107 @@
 import org.apache.drill.exec.expr.ValueVectorWriteExpression;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.UnionAll;
-import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.resolver.TypeCastRules;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
 
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Stack;
 
-public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
+public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UnionAllRecordBatch.class);
 
-  private List<MaterializedField> outputFields;
+  private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
   private UnionAller unionall;
-  private UnionAllInput unionAllInput;
-  private RecordBatch current;
-
   private final List<TransferPair> transfers = Lists.newArrayList();
-  private List<ValueVector> allocationVectors;
-  protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
+  private List<ValueVector> allocationVectors = Lists.newArrayList();
   private int recordCount = 0;
-  private boolean schemaAvailable = false;
+  private UnionInputIterator unionInputIterator;
 
   public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children, FragmentContext context) throws OutOfMemoryException {
-super(config, context, false);
-assert (children.size() == 2) : "The number of the operands of Union 
must be 2";
-unionAllInput = new UnionAllInput(this, children.get(0), 
children.get(1));
-  }
-
-  @Override
-  public int getRecordCount() {
-return recordCount;
+super(config, context, true, children.get(0), children.get(1));
   }
 
   @Override
   protected void killIncoming(boolean sendUpstream) {
-unionAllInput.getLeftRecordBatch().kill(sendUpstream);
-unionAllInput.getRightRecordBatch().kill(sendUpstream);
+left.kill(sendUpstream);
+right.kill(sendUpstream);
   }
 
-  @Override
-  public SelectionVector2 getSelectionVector2() {
-throw new UnsupportedOperationException("UnionAllRecordBatch does not 
support selection vector");
-  }
+  protected void buildSchema() throws SchemaChangeException {
+if (! prefetchFirstBatchFromBothSides()) {
+  return;
+}
 
-  @Override
-  public SelectionVector4 getSelectionVector4() {
-throw new UnsupportedOperationException("UnionAllRecordBatch does not 
support selection vector");
+unionInputIterator = new UnionInputIterator(leftUpstream, left, 
rightUpstream, right);
+
+if (leftUpstream == IterOutcome.NONE && rightUpstream == 
IterOutcome.OK_NEW_SCHEMA) {
+  inferOutputFieldsOneSide(right.getSchema());
+} else if (rightUpstream == IterOutcome.NONE && leftUpstream == 
IterOutcome.OK_NEW_SCHEMA) {
+  inferOutputFieldsOneSide((left.getSchema()));
+} else if (leftUpstream == IterOutcome.OK_NEW_SCHEMA && rightUpstream 
== IterOutcome.OK_NEW_SCHEMA) {
+  inferOutputFieldsBothSide(left.getSchema(), right.getSchema());
+}
+
+container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
+for (VectorWrapper<?> vv: container) {
+  vv.getValueVector().allocateNew();
+  vv.getValueVector().getMutator().setValueCount(0);
+}
--- End diff --

Why would you think the above four lines of code are **_"copied"_** 
from somewhere else, while the two lines of code are NOT "copied"? You could 
say the two lines are shorter, but I could not see why the four lines are 
copied, while the two lines of code 

[GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...

2017-08-30 Thread jinfengni
Github user jinfengni commented on a diff in the pull request:

https://github.com/apache/drill/pull/906#discussion_r135976739
  
--- Diff: 
exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestEmptyBatchMiniPlan.java
 ---
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.unit;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.ExternalSort;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.FlattenPOP;
+import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.physical.config.MergeJoinPOP;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.config.UnionAll;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+public class TestEmptyBatchMiniPlan extends MiniPlanUnitTestBase{
+  protected static DrillFileSystem fs;
+
+  @BeforeClass
+  public static void initFS() throws Exception {
+Configuration conf = new Configuration();
+conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
+fs = new DrillFileSystem(conf);
+  }
+
+  @Test
+  public void testEmptyJsonInput() throws Exception {
+RecordBatch scanBatch = createEmptyBatchFromJson();
+
+new MiniPlanTestBuilder()
+.root(scanBatch)
+.expectNullBatch(true)
+.go();
+  }
+
+  @Test
+  public void testProjectEmpty() throws Exception {
+final PhysicalOperator project = new Project(parseExprs("x+5", "x"), 
null);
+testSingleInputEmptyBatchHandling(project);
+  }
+
+  @Test
+  public void testFilterEmpty() throws Exception {
+final PhysicalOperator filter = new Filter(null, parseExpr("a=5"), 
1.0f);
+testSingleInputEmptyBatchHandling(filter);
+  }
+
+  @Test
+  public void testHashAggEmpty() throws Exception {
+final PhysicalOperator hashAgg = new HashAggregate(null, 
AggPrelBase.OperatorPhase.PHASE_1of1, parseExprs("a", "a"), 
parseExprs("sum(b)", "b_sum"), 1.0f);
+testSingleInputEmptyBatchHandling(hashAgg);
+  }
+
+  @Test
+  public void testStreamingAggEmpty() throws Exception {
+final PhysicalOperator hashAgg = new StreamingAggregate(null, 
parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f);
+testSingleInputEmptyBatchHandling(hashAgg);
+  }
+
+  @Test
+  public void testSortEmpty() throws Exception {
+final PhysicalOperator sort = new ExternalSort(null,
+Lists.newArrayList(ordering("b", 
RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)), 
false);
+testSingleInputEmptyBatchHandling(sort);
+  }
+
+  @Test
+  public void testLimitEmpty() throws Exception {
+final PhysicalOperator limit = new Limit(null, 10, 5);
+testSingleInputEmptyBatchHandling(limit);
+  }
+
+  @Test
+