[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-05-31 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r119519134
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme
       }
     }
 
-    ChainedHashTable ht =
+    spillSet = new SpillSet(context, hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE);
+    baseHashTable =
         new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing);
-    this.htable = ht.createAndSetupHashTable(groupByOutFieldIds);
-
+    this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill)
     numGroupByOutFields = groupByOutFieldIds.length;
-    batchHolders = new ArrayList<BatchHolder>();
-    // First BatchHolder is created when the first put request is received.
 
     doSetup(incoming);
   }
 
+  /**
+   * Delayed setup covers the parts of setup() that can only run after actual data arrives in incoming.
+   * This data is used to compute the number of partitions.
+   */
+  private void delayedSetup() {
+
+    // Set the number of partitions from the configuration (raise to a power of two, if needed)
+    numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY);
+    if ( numPartitions == 1 ) {
+      canSpill = false;
+      logger.warn("Spilling was disabled");
+    }
+    while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2
+      numPartitions++;
+    }
+    if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch
+    else {
+      // Estimate the max batch size; should use actual data (e.g. lengths of varchars)
+      updateEstMaxBatchSize(incoming);
+    }
+    long memAvail = memoryLimit - allocator.getAllocatedMemory();
+    if ( !canSpill ) { // single phase, or spill disabled by configuration
+      numPartitions = 1; // single phase should use only a single partition (to save memory)
+    } else { // two phase
+      // Adjust down the number of partitions if needed - when the memory available can not hold as
+      // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)
+      while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) {
+        numPartitions /= 2;
+        if ( numPartitions < 2) {
+          if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress
+          break;
+        }
+      }
+    }
+    logger.trace("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase ? (is2ndPhase ? "2nd" : "1st") : "Single",
+        numPartitions, canSpill ? "Can" : "Cannot");
+
+    // The following initial safety check should be revisited once we can lower the number of rows in a batch
+    // In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table)
+    if ( numPartitions == 1 ) {
+      // if too little memory - behave like the old code -- no memory limit for hash aggregate
+      allocator.setLimit(10_000_000_000L);
+    }
+    // Based on the number of partitions: set the mask and bit count
+    partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
+    bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
+
+    // Create arrays (one entry per partition)
+    htables = new HashTable[numPartitions];
+    batchHolders = (ArrayList<BatchHolder>[]) new ArrayList[numPartitions];
+    outBatchIndex = new int[numPartitions];
+    outputStream = new OutputStream[numPartitions];
+    spilledBatchesCount = new int[numPartitions];
+    // spilledPaths = new Path[numPartitions];
+    spillFiles = new String[numPartitions];
+    spilledPartitionsList = new ArrayList();
+
+    plannedBatches = numPartitions; // each partition should allocate its first batch
+
+    // initialize every (per partition) entry in the arrays
+    for (int i = 0; i < numPartitions; i++ ) {
+      try {
+        this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions);
+        this.htables[i].setMaxVarcharSize(maxColumnWidth);
+      } catch (IllegalStateException ise) {} // ignore
+      catch (Exception e) { throw new DrillRuntimeException(e); }
+      this.batchHolders[i] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
+    }
+  }
+  /**
+   * get new incoming: (when reading spilled files like an "incoming")
+   * 

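The power-of-two rounding and partition-mask arithmetic quoted in the hunk above can be sketched standalone. The class name and sample values below are illustrative only, not part of the patch:

```java
// Standalone sketch of the partition-count rounding and mask logic in
// delayedSetup(). Names and inputs are illustrative, not from the patch.
public class PartitionMaskSketch {

    // Round up to the next power of two, the same way the loop in
    // delayedSetup() does: keep incrementing while more than one bit is set.
    static int roundToPowerOfTwo(int n) {
        while (Integer.bitCount(n) > 1) { // >1 bit set => not a power of 2
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        int numPartitions = roundToPowerOfTwo(30);        // 30 -> 32
        int partitionMask = numPartitions - 1;            // 32 -> 0x1F
        int bitsInMask = Integer.bitCount(partitionMask); // 0x1F -> 5

        // A row's hash value is mapped to a partition by masking its low bits.
        int hash = 0x9E3779B9;
        int partition = hash & partitionMask;             // always in [0, 31]

        System.out.println(numPartitions + " " + partitionMask + " "
            + bitsInMask + " " + partition);
    }
}
```

The increment loop is O(n) in the worst case; the patch can afford it because partition counts are small, but `Integer.highestOneBit(n - 1) << 1` would do the same rounding in constant time.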
[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-05-31 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r119518393
  

[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-05-31 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r119518069
  

[jira] [Created] (DRILL-5560) Create configuration file for distribution specific configuration

2017-05-31 Thread Padma Penumarthy (JIRA)
Padma Penumarthy created DRILL-5560:
---

 Summary: Create configuration file for distribution specific 
configuration
 Key: DRILL-5560
 URL: https://issues.apache.org/jira/browse/DRILL-5560
 Project: Apache Drill
  Issue Type: Bug
Affects Versions: 1.10.0
Reporter: Padma Penumarthy
Assignee: Padma Penumarthy
 Fix For: 1.11.0


Create a configuration file for distribution-specific settings, "drill-distrib.conf".
It will be used to add distribution-specific configuration.
The order in which configuration gets loaded and overridden is:
"drill-default.conf", the per-module configuration files ("drill-module.conf"),
"drill-distrib.conf", and "drill-override.conf".
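The layering described above can be modeled with plain maps. Drill actually layers HOCON files via the Typesafe Config library; the keys and values below are purely illustrative:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the described load/override order:
// drill-default.conf < drill-module.conf < drill-distrib.conf < drill-override.conf.
// Not the actual Drill loader; keys and values are illustrative.
public class ConfigLayeringSketch {

    // Merge layers in order; a key set by a later layer overrides earlier ones.
    static Map<String, String> merge(List<Map<String, String>> layers) {
        Map<String, String> effective = new LinkedHashMap<>();
        for (Map<String, String> layer : layers) {
            effective.putAll(layer);
        }
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> defaults = Map.of("drill.exec.spill.fs", "file:///",
                                              "drill.exec.http.port", "8047");
        Map<String, String> module   = Map.of(); // per-module additions, if any
        Map<String, String> distrib  = Map.of("drill.exec.spill.fs", "hdfs:///");
        Map<String, String> override = Map.of("drill.exec.http.port", "8080");

        Map<String, String> eff = merge(List.of(defaults, module, distrib, override));
        System.out.println(eff.get("drill.exec.spill.fs"));  // distrib wins over default
        System.out.println(eff.get("drill.exec.http.port")); // override wins over default
    }
}
```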




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-05-31 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r119500755
  

[GitHub] drill pull request #847: Drill 5545: Update POM to add support for running f...

2017-05-31 Thread parthchandra
GitHub user parthchandra opened a pull request:

https://github.com/apache/drill/pull/847

Drill 5545: Update POM to add support for running findbugs

Also fix some issues reported by findbugs in the Async Parquet Reader

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/parthchandra/drill DRILL-5545

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/847.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #847


commit e6eacfa3968ae45a7ab40afd213244964b626d50
Author: Parth Chandra 
Date:   2017-05-23T18:04:15Z

DRILL-5544: Update POM to add support for running findbugs

commit 5ab6189a79099361f9b4629eb422b72d7a0cbeae
Author: Parth Chandra 
Date:   2017-05-17T23:56:13Z

DRILL-5544: Fix issues reported by findbugs in Async Parquet Reader.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-05-31 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r119481421
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme
+      } catch (IllegalStateException ise) {} // ignore
+      catch (Exception e) { throw new DrillRuntimeException(e); }
--- End diff --

Done:

      } catch (ClassTransformationException e) {
        throw UserException.unsupportedError(e)
            .message("Code generation error - likely an error in the code.")

[jira] [Created] (DRILL-5559) Incorrect query result when querying json files with schema change

2017-05-31 Thread Jinfeng Ni (JIRA)
Jinfeng Ni created DRILL-5559:
-

 Summary: Incorrect query result when querying json files with 
schema change
 Key: DRILL-5559
 URL: https://issues.apache.org/jira/browse/DRILL-5559
 Project: Apache Drill
  Issue Type: Bug
Reporter: Jinfeng Ni


We have two json files with a nested structure. In the first, `a.b` is a bigint,
while in the second, `a.b` is a float.

{code}
cat 1.json
{a:{b:100}}

cat 2.json
{a:{b:200.0}}
{code}

The following query returns a wrong result for the second row. Notice that the
value changed from 200.0 to 4641240890982006784, which is 0x4069000000000000 --
the IEEE-754 bit pattern of the double 200.0, reinterpreted as a bigint.

{code}
select a from dfs.tmp.t2;
++
| a  |
++
| {"b":100}  |
| {"b":4641240890982006784}  |
++
{code}

Explain plan output:
{code}
explain plan for select a from dfs.tmp.t2;
+--+--+
| text | json |
+--+--+
| 00-00Screen
00-01  Project(a=[$0])
00-02Scan(groupscan=[EasyGroupScan [selectionRoot=file:/tmp/t2, 
numFiles=2, columns=[`a`], files=[file:/tmp/t2/1.json, file:/tmp/t2/2.json]]])
{code}

If the involved operators cannot handle a schema change, at a minimum we should
fail the query with a SchemaChangeException, instead of returning wrong query
results.

Another interesting observation: if we query field `a.b` instead of `a`, Drill
returns the correct result.

{code}
select t.a.b from dfs.tmp.t2 t;
+-+
| EXPR$0  |
+-+
| 100 |
| 200.0   |
+-+
{code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] drill issue #831: DRILL-5432: Added pcap-format support

2017-05-31 Thread parthchandra
Github user parthchandra commented on the issue:

https://github.com/apache/drill/pull/831
  
The contrib directory is where we have, in the past, added storage and format 
plugins that are new and may not have been sufficiently tested.
For this plugin, I think testing with pcap files from different sources 
would be useful. [1, 2] are useful sources of data that will test boundary 
conditions. I tried a file from [2] and got an NPE (I didn't investigate the 
cause). A random sample of files from [1] worked very nicely indeed, though I 
didn't validate the output.
You might have already done this level of testing; if so, I will withdraw 
the suggestion.

[1] 
https://wiki.wireshark.org/SampleCaptures#Captures_used_in_Wireshark_testing
[2] http://www.netresec.com/?page=PcapFiles




[GitHub] drill issue #846: DRILL-5544: Out of heap running CTAS against text delimite...

2017-05-31 Thread paul-rogers
Github user paul-rogers commented on the issue:

https://github.com/apache/drill/pull/846
  
Another critical issue here. Direct memory is not a limitless resource, 
unfortunately. Allocating more than 16 MB at once causes memory fragmentation, 
because such an allocation must come from the system, while all free memory is 
cached in Netty in 16 MB chunks. So, if the buffer size here is larger than 
16 MB, the operation may fail with an OOM.

If this operation must buffer in memory, and must use more than 16 MB, 
then you need something like what the original code provided. That buffered 
writer can be backed by a chain of 16 MB direct memory blocks.
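A minimal sketch of such a chunked buffering scheme. Heap buffers are used here so the sketch is self-contained; a real Drill writer would draw 16 MB direct buffers from the operator's allocator, and the class name is hypothetical:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Sketch: writes accumulate into a chain of fixed-size blocks, so no single
// allocation ever exceeds Netty's 16 MB chunk size. Illustrative only -- the
// real writer would use direct memory from Drill's allocator, not the heap.
public class ChunkedBufferSketch {
    static final int CHUNK_SIZE = 16 * 1024 * 1024; // 16 MB per block

    private final List<ByteBuffer> chunks = new ArrayList<>();
    private ByteBuffer current; // block currently being filled

    void write(byte[] src, int off, int len) {
        while (len > 0) {
            if (current == null || !current.hasRemaining()) {
                current = ByteBuffer.allocate(CHUNK_SIZE); // grow one block at a time
                chunks.add(current);
            }
            int n = Math.min(len, current.remaining());
            current.put(src, off, n);
            off += n;
            len -= n;
        }
    }

    long size() { // total bytes buffered across all blocks
        long total = 0;
        for (ByteBuffer b : chunks) {
            total += b.position();
        }
        return total;
    }

    int blockCount() { return chunks.size(); }

    public static void main(String[] args) {
        ChunkedBufferSketch buf = new ChunkedBufferSketch();
        buf.write(new byte[1024], 0, 1024);
        System.out.println(buf.size() + " bytes in " + buf.blockCount() + " block(s)");
    }
}
```

Flushing then walks the chain block by block, so the writer never needs one contiguous allocation larger than a single chunk.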

