[jira] [Resolved] (DRILL-5485) Remove WebServer dependency on DrillClient

2017-06-02 Thread Jinfeng Ni (JIRA)

 [ 
https://issues.apache.org/jira/browse/DRILL-5485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jinfeng Ni resolved DRILL-5485.
---
Resolution: Fixed

Fixed in 874bf6296dcd1a42c7cf7f097c1a6b5458010cbb

> Remove WebServer dependency on DrillClient
> --
>
> Key: DRILL-5485
> URL: https://issues.apache.org/jira/browse/DRILL-5485
> Project: Apache Drill
>  Issue Type: Improvement
>  Components: Web Server
>Reporter: Sorabh Hamirwasia
>  Labels: ready-to-commit
> Fix For: 1.11.0
>
>
> With encryption support using SASL, clients won't be able to authenticate 
> using the PLAIN mechanism when encryption is enabled on the cluster. Today the 
> WebServer, which is embedded inside the Drillbit, creates a DrillClient instance 
> for each WebClient session, and the WebUser is authenticated as part of the 
> authentication between that DrillClient instance and the Drillbit using the 
> PLAIN mechanism. With encryption enabled this will fail, since encryption 
> doesn't support authentication using the PLAIN mechanism, and hence no WebClient 
> can connect to a Drillbit. This approach also has the following issues:
> 1) Since a DrillClient is created per WebUser session, this is expensive: it 
> brings in the heavyweight RPC layer of DrillClient and all its dependencies. 
> 2) If the Foreman selected for a WebUser's query is a different node, there 
> will be an extra hop to transfer data back to the WebClient.
> To resolve all of the above issues, it would be better to authenticate the 
> WebUser locally on the Drillbit on which the WebServer is running, without 
> creating a DrillClient instance. We can use the local PAMAuthenticator to 
> authenticate the user. After authentication succeeds, the local Drillbit can 
> also serve as the Foreman for all the queries submitted by the WebUser. This 
> can be achieved by submitting the query to the local Drillbit's Foreman work 
> queue. This also removes the requirement to encrypt the channel opened between 
> the WebServer (DrillClient) and the selected Drillbit, since with this approach 
> no physical channel is opened between them.
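For illustration only, a minimal sketch of the proposed flow: authenticate the
web user against the local Drillbit and submit queries to the local Foreman
work queue, with no DrillClient in between. The LocalAuthenticator and
ForemanWorkQueue interfaces and their method names are hypothetical
placeholders, not Drill's actual API.

    import javax.security.sasl.AuthenticationException;

    // Hypothetical local authenticator (e.g. backed by PAM on the Drillbit node).
    interface LocalAuthenticator {
      void authenticate(String user, String password) throws AuthenticationException;
    }

    // Hypothetical handle to the local Drillbit's Foreman work queue.
    interface ForemanWorkQueue {
      void submit(String user, String sqlQuery);
    }

    class WebUserSessionSketch {
      private final LocalAuthenticator pamAuthenticator;
      private final ForemanWorkQueue localForeman;
      private String authenticatedUser;

      WebUserSessionSketch(LocalAuthenticator pamAuthenticator, ForemanWorkQueue localForeman) {
        this.pamAuthenticator = pamAuthenticator;
        this.localForeman = localForeman;
      }

      // Authenticate locally; no DrillClient connection and no PLAIN exchange over the wire.
      void login(String user, String password) throws AuthenticationException {
        pamAuthenticator.authenticate(user, password);
        this.authenticatedUser = user;
      }

      // Run the query on the local Drillbit, which acts as Foreman for this web session.
      void runQuery(String sql) {
        localForeman.submit(authenticatedUser, sql);
      }
    }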



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] drill pull request #826: DRILL-5379: Set Hdfs Block Size based on Parquet Bl...

2017-06-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/826


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #829: DRILL-5485: Remove WebServer dependency on DrillCli...

2017-06-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/829




[GitHub] drill pull request #832: DRILL-5504: Vector validator to diagnose offset vec...

2017-06-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/832




[GitHub] drill pull request #845: DRILL-5537: Display columns alias for queries with ...

2017-06-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/845




[GitHub] drill pull request #838: DRILL-5512: Standardize error handling in ScanBatch

2017-06-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/838




[GitHub] drill pull request #843: DRILL-5533: Fix flag assignment in FunctionInitiali...

2017-06-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/843




[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-06-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/789




[GitHub] drill pull request #828: DRILL-5229: update kudu-client to 1.3.0

2017-06-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/828




[GitHub] drill pull request #834: DRILL-5481: Allow to persist profiles in-memory onl...

2017-06-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/834




[GitHub] drill pull request #818: DRILL-5140: Fix CompileException in run-time genera...

2017-06-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/818




[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-06-02 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r119977034
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -400,114 +782,411 @@ public IterOutcome getOutcome() {
 
   @Override
   public int getOutputCount() {
-    // return outputCount;
     return lastBatchOutputCount;
   }
 
   @Override
   public void cleanup() {
-    if (htable != null) {
-      htable.clear();
-      htable = null;
-    }
+    if ( schema == null ) { return; } // not set up; nothing to clean
+    for ( int i = 0; i < numPartitions; i++) {
+      if (htables[i] != null) {
+        htables[i].clear();
+        htables[i] = null;
+      }
+      if ( batchHolders[i] != null) {
+        for (BatchHolder bh : batchHolders[i]) {
+          bh.clear();
+        }
+        batchHolders[i].clear();
+        batchHolders[i] = null;
+      }
+
+      // delete any (still active) output spill file
+      if ( outputStream[i] != null && spillFiles[i] != null) {
+        try {
+          spillSet.delete(spillFiles[i]);
+        } catch(IOException e) {
+          logger.warn("Cleanup: Failed to delete spill file {}", spillFiles[i]);
+        }
+      }
+    }
+    // delete any spill file left in unread spilled partitions
+    while ( ! spilledPartitionsList.isEmpty() ) {
+      SpilledPartition sp = spilledPartitionsList.remove(0);
+      try {
+        spillSet.delete(sp.spillFile);
+      } catch(IOException e) {
+        logger.warn("Cleanup: Failed to delete spill file {}", sp.spillFile);
+      }
+    }
+    spillSet.close(); // delete the spill directory(ies)
     htIdxHolder = null;
     materializedValueFields = null;
     outStartIdxHolder = null;
     outNumRecordsHolder = null;
+  }
 
-    if (batchHolders != null) {
-      for (BatchHolder bh : batchHolders) {
+  // First free the memory used by the given (spilled) partition (i.e., hash table plus batches),
+  // then reallocate them in pristine state to allow the partition to continue receiving rows
+  private void reinitPartition(int part) throws SchemaChangeException, ClassTransformationException, IOException {
+    assert htables[part] != null;
+    htables[part].reset();
+    if ( batchHolders[part] != null) {
+      for (BatchHolder bh : batchHolders[part]) {
         bh.clear();
       }
-      batchHolders.clear();
-      batchHolders = null;
+      batchHolders[part].clear();
     }
+    batchHolders[part] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
   }
 
-//  private final AggOutcome setOkAndReturn() {
-//    this.outcome = IterOutcome.OK;
-//    for (VectorWrapper<?> v : outgoing) {
-//      v.getValueVector().getMutator().setValueCount(outputCount);
-//    }
-//    return AggOutcome.RETURN_OUTCOME;
-//  }
 
   private final void incIndex() {
     underlyingIndex++;
     if (underlyingIndex >= incoming.getRecordCount()) {
       currentIndex = Integer.MAX_VALUE;
       return;
     }
-    currentIndex = getVectorIndex(underlyingIndex);
+    try { currentIndex = getVectorIndex(underlyingIndex); }
+    catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc); }
   }
 
   private final void resetIndex() {
     underlyingIndex = -1;
     incIndex();
   }
 
-  private void addBatchHolder() {
+  private boolean isSpilled(int part) {
+    return outputStream[part] != null;
+  }
+  /**
+   * Which partition to choose for flushing out (i.e. spill or return)?
+   * - The current partition (to which a new batch holder is added) has a priority,
+   *   because its last batch holder is full.
+   * - Also the largest prior spilled partition has some priority, as it is already spilled;
+   *   but spilling too few rows (e.g. a single batch) gets us nothing.
+   * - So the largest non-spilled partition has some priority, to get more memory freed.
+   * Need to weigh the above three options.
+   *
+   *  @param currPart - The partition that hit the memory limit (gets a priority)
+   *  @return The partition (number) chosen to be spilled
+   */
+  private int chooseAPartitionToFlush(int currPart) {
+    if ( ! is2ndPhase ) { return currPart; } // 1st phase: just use the current partition
+    int currPartSize = batchHolders[currPart].size();
+    if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current
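
As a side note, here is a simplified, self-contained sketch of the partition-selection
heuristic described in the javadoc above. The names, weights and example numbers are
hypothetical; this is not the actual Drill implementation (which is truncated in the
quoted diff).

    /**
     * Illustrative sketch only: weigh the current partition, already-spilled
     * partitions, and the largest in-memory partition when picking what to flush.
     */
    class SpillChoiceSketch {
      static int choosePartitionToFlush(int[] partitionSizes, boolean[] spilled,
                                        int currPart, boolean is2ndPhase) {
        if (!is2ndPhase) {
          return currPart;                             // 1st phase: flush the partition that overflowed
        }
        int best = currPart;
        int bestScore = partitionSizes[currPart] + 1;  // current partition gets a head start
        for (int p = 0; p < partitionSizes.length; p++) {
          int score = partitionSizes[p];
          // Already-spilled partitions get a bonus: appending to an existing spill file
          // frees memory cheaply, but spilling a single-batch partition gains almost nothing.
          if (spilled[p] && partitionSizes[p] > 1) {
            score += 2;
          }
          if (score > bestScore) {
            bestScore = score;
            best = p;
          }
        }
        return best;
      }

      public static void main(String[] args) {
        int[] sizes = {3, 8, 1, 5};                      // in-memory batches per partition
        boolean[] spilled = {false, false, true, true};
        // Partition 1 holds the most batches, so it is chosen over the spilled ones here.
        System.out.println(choosePartitionToFlush(sizes, spilled, 0, true)); // prints 1
      }
    }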

[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-06-02 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r119975302
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---

[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-06-02 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r119975022
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---

[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-06-02 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r119974724
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---

[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-06-02 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r119973491
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---

[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-06-02 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r119954158
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
+      // delete any (still active) output spill file
+      if ( outputStream[i] != null && spillFiles[i] != null) {
+        try {
+          spillSet.delete(spillFiles[i]);
--- End diff --

Concurrent open files: While spilling, there is one open file for each (non-pristine) 
spilling partition (yes, that can be as high as 16, or even 32). Afterwards, they 
are all closed; then for reading, each one gets opened again. Though we process 
one partition at a time, closing all of them is postponed to the end, as the 
processing code is unaware that the "incoming" batches actually come from a spill 
file. 
About the limits: the current defaults (e.g. 64K open files per process) should 
serve us well for the foreseeable future. Intel just announced the i9, where the 
top-of-the-line CPU has 18 cores, so thousands of concurrent active same-process 
threads are not feasible anytime soon (think about context switching).
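
A back-of-the-envelope version of that reasoning, with purely illustrative numbers
(none of them measured):

    class SpillFileHandleEstimate {
      public static void main(String[] args) {
        int partitionsPerHashAgg = 32;   // upper bound mentioned above
        int minorFragmentsPerNode = 18;  // e.g. one fragment thread per core on an 18-core CPU
        int worstCaseOpenSpillFiles = partitionsPerHashAgg * minorFragmentsPerNode; // 576
        int defaultFdLimit = 64 * 1024;  // the "64K open files per process" default cited above
        System.out.println(worstCaseOpenSpillFiles + " concurrent spill files vs. a limit of " + defaultFdLimit);
      }
    }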




[GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate

2017-06-02 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r119947814
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
+      // delete any (still active) output spill file
+      if ( outputStream[i] != null && spillFiles[i] != null) {
+        try {
+          spillSet.delete(spillFiles[i]);
--- End diff --

This is HashAgg closing time, so GC probably does it; anyway won't hurt -- 
added a close call. 
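
A sketch of the cleanup order being discussed: close the partition's spill output
stream explicitly before deleting its file, rather than relying on GC to release
the handle. The SpillSet interface and method names here are illustrative stand-ins,
not the actual patch.

    import java.io.IOException;
    import java.io.OutputStream;

    class SpillCleanupSketch {
      // Stand-in for the spill-file manager used in the patch.
      interface SpillSet {
        void delete(String path) throws IOException;
      }

      static void cleanupPartition(OutputStream out, String spillFile, SpillSet spillSet) {
        if (out != null) {
          try {
            out.close();                // release the file handle deterministically
          } catch (IOException e) {
            // best-effort cleanup: ignore and continue
          }
        }
        if (spillFile != null) {
          try {
            spillSet.delete(spillFile); // now remove the spill file itself
          } catch (IOException e) {
            // best-effort cleanup: ignore and continue
          }
        }
      }
    }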




Re: Upgrading Calcite's version

2017-06-02 Thread Jinfeng Ni
DRILL-3993 is the JIRA for the 2nd option.



1. https://issues.apache.org/jira/browse/DRILL-3993



On Fri, Jun 2, 2017 at 11:26 AM, rahul challapalli <
challapallira...@gmail.com> wrote:

> Yes, drill has its own fork of calcite. You have 2 options here
>
> 1. Hand pick the specific changes from calcite into drill
> 2. Upgrade drill to use the latest calcite version
>
> I believe there is already an on-going effort to upgrade drill to use the
> latest version of calcite. I couldn't find the relevant jira though.
>
> - Rahul
>
> On Fri, Jun 2, 2017 at 8:51 AM, Muhammad Gelbana 
> wrote:
>
> > Was the currently used version of Calcite (based on v1.4?) modified in
> > any way before it was used in building Drill?
> >
> > I'm considering creating a new build of Drill with the latest version of
> > Calcite and I need to understand the amount of effort needed.
> >
> > The reason I want to do that is that I need a feature that exists in a
> more
> > recent version of Calcite, which is pushing down aggregates without using
> > subqueries.
> >
> > *-*
> > *Muhammad Gelbana*
> > http://www.linkedin.com/in/mgelbana
> >
>


Re: [External] Re: UNORDERED_RECEIVER taking 70% of query time

2017-06-02 Thread Kunal Khatua
Hi Jasbir


I don't think the Apache mailing lists allow you to send attachments, except 
maybe text files. (The txt file made it through.)


In your Operator Profile, you'll see two columns: %Fragment Time and 
%QueryTime.

Hovering your mouse over those table headers should show you a description of 
the two.


%Fragment Time is the fraction of time spent by the threads of that Major Fragment 
on a specific operator. It tells you which operator the threads of a major 
fragment spent most of their time on.


%QueryTime is the fraction of time spent by the threads of ALL the Major 
Fragments on a specific operator. It tells you which operator, implicitly, 
consumed the most CPU resources.
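
As a small illustration of how the two columns relate (the timings and class name
below are made up):

    class ProfileFractionsSketch {
      // %Fragment Time: operator time as a share of its own major fragment's total time.
      static double fragmentTimePct(long operatorNanos, long fragmentTotalNanos) {
        return 100.0 * operatorNanos / fragmentTotalNanos;
      }

      // %QueryTime: operator time as a share of the time spent by ALL major fragments.
      static double queryTimePct(long operatorNanos, long allFragmentsTotalNanos) {
        return 100.0 * operatorNanos / allFragmentsTotalNanos;
      }

      public static void main(String[] args) {
        long hashJoin = 700, scan = 500, receiver = 100;  // made-up per-operator times
        long fragmentTotal = hashJoin + scan + receiver;  // one major fragment
        long queryTotal = 2000;                           // all major fragments combined
        System.out.printf("HashJoin: %.0f%% of fragment, %.0f%% of query%n",
            fragmentTimePct(hashJoin, fragmentTotal), queryTimePct(hashJoin, queryTotal));
      }
    }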


From the latter, it appears that the HashJoin (03-xx-04) and Parquet Scan 
(03-xx-06) are the biggest bottlenecks. The unordered receiver is not the 
bottleneck in the query.


~ Kunal




From: jasbir.s...@accenture.com 
Sent: Friday, June 2, 2017 12:13:21 AM
To: u...@drill.apache.org; dev@drill.apache.org
Cc: maneesh.koth...@accenture.com; nitin.a.sar...@accenture.com; 
h.p.ku...@accenture.com
Subject: RE: [External] Re: UNORDERED_RECEIVER taking 70% of query time

Hi,

Please find the attached query profile.

I am running Drill in local mode on my laptop with default memory allocation to 
Apache Drill.

Let me know if you are not able to find the attachment.

Also, sending the file in RAR format.

Regards,
Jasbir Singh


-Original Message-
From: Abhishek Girish [mailto:agir...@apache.org]
Sent: Friday, June 02, 2017 11:00 AM
To: u...@drill.apache.org
Subject: [External] Re: UNORDERED_RECEIVER taking 70% of query time

Attachment hasn't come through. Can you upload the query profile to some cloud 
storage and share a link to it?

Also, please share details on how large your dataset is, number of Drillbits, 
memory and other configurations.


On Thu, Jun 1, 2017 at 10:18 PM,  wrote:

> Hi,
>
>
>
> I am running a simple query which performs JOIN operation between two
> parquet files and it takes around 3-4 secs and I noticed that 70% of
> the time is used by UNORDERED_RECEIVER.
>
>
>
> Sample query is –
>
>
>
> select sum(sales), week
> from dfs.`C:\parquet-location\F8894180-AFFB-4803-B8CF-CCF883AA5AAF-Search_Snapshot_Data.parquet`
> where model_component_id in (
>   select model_component_id from dfs.`C:\parquet-location\poc48k.parquet`)
> group by week
>
>
>
>
>
> Can we somehow reduce unordered receiver time?
>
>
>
> Please find the below screenshot of Visualized plan
>
> --
>
> This message is for the designated recipient only and may contain
> privileged, proprietary, or otherwise confidential information. If you
> have received it in error, please notify the sender immediately and
> delete the original. Any other use of the e-mail by you is prohibited.
> Where allowed by local law, electronic communications with Accenture
> and its affiliates, including e-mail and instant messaging (including
> content), may be scanned by our systems for the purposes of
> information security and assessment of internal compliance with Accenture 
> policy.
> 
> __
>
> www.accenture.com
>





Re: Upgrading Calcite's version

2017-06-02 Thread rahul challapalli
Yes, drill has its own fork of calcite. You have 2 options here

1. Hand pick the specific changes from calcite into drill
2. Upgrade drill to use the latest calcite version

I believe there is already an on-going effort to upgrade drill to use the
latest version of calcite. I couldn't find the relevant jira though.

- Rahul

On Fri, Jun 2, 2017 at 8:51 AM, Muhammad Gelbana 
wrote:

> Was the currently used version of Calcite (based on v1.4?) modified in
> any way before it was used in building Drill?
>
> I'm considering creating a new build of Drill with the latest version of
> Calcite and I need to understand the amount of effort needed.
>
> The reason I want to do that is that I need a feature that exists in a more
> recent version of Calcite, which is pushing down aggregates without using
> subqueries.
>
> *-*
> *Muhammad Gelbana*
> http://www.linkedin.com/in/mgelbana
>


Upgrading Calcite's version

2017-06-02 Thread Muhammad Gelbana
Was the currently used version of Calcite (based on v1.4?) modified in
any way before it was used in building Drill?

I'm considering creating a new build of Drill with the latest version of
Calcite and I need to understand the amount of effort needed.

The reason I want to do that is that I need a feature that exists in a more
recent version of Calcite, which is pushing down aggregates without using
subqueries.

*-*
*Muhammad Gelbana*
http://www.linkedin.com/in/mgelbana