[jira] [Resolved] (DRILL-5485) Remove WebServer dependency on DrillClient

2017-06-02 Thread Jinfeng Ni (JIRA)

 [ 
https://issues.apache.org/jira/browse/DRILL-5485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jinfeng Ni resolved DRILL-5485.
---
Resolution: Fixed

Fixed in 874bf6296dcd1a42c7cf7f097c1a6b5458010cbb

> Remove WebServer dependency on DrillClient
> --
>
> Key: DRILL-5485
> URL: https://issues.apache.org/jira/browse/DRILL-5485
> Project: Apache Drill
>  Issue Type: Improvement
>  Components: Web Server
>Reporter: Sorabh Hamirwasia
>  Labels: ready-to-commit
> Fix For: 1.11.0
>
>
> With encryption support using SASL, clients won't be able to authenticate 
> using the PLAIN mechanism when encryption is enabled on the cluster. Today the 
> WebServer, which is embedded inside the Drillbit, creates a DrillClient instance 
> for each WebClient session, and the WebUser is authenticated as part of the 
> PLAIN-mechanism authentication between that DrillClient instance and the 
> Drillbit. With encryption enabled this will fail, since encryption does not 
> support authentication using the PLAIN mechanism, hence no WebClient can 
> connect to a Drillbit. This approach also has the following issues:
> 1) Since a DrillClient is used per WebUser session, this is expensive: it brings 
> the heavyweight RPC layer for DrillClient and all its dependencies. 
> 2) If a different node is selected as the Foreman for a WebUser, there will be 
> an extra hop to transfer data back to the WebClient.
> To resolve all the above issues it would be better to authenticate the WebUser 
> locally using the Drillbit on which the WebServer is running, without creating a 
> DrillClient instance. We can use the local PAMAuthenticator to authenticate 
> the user. After authentication succeeds, the local Drillbit can also 
> serve as the Foreman for all queries submitted by the WebUser. This can be 
> achieved by submitting the query to the local Drillbit Foreman work queue. 
> This will also remove the requirement to encrypt the channel opened between 
> the WebServer (DrillClient) and the selected Drillbit, since with this approach 
> there won't be any physical channel opened between them.
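As a rough illustration of the proposed direction (a sketch only, not the committed change), the WebServer would call the Drillbit's locally configured authenticator (e.g. the PAM one) instead of opening a DrillClient connection; the class and interface names below are assumptions for illustration:

{noformat}
// Sketch: authenticate a web user against the local Drillbit instead of
// opening a DrillClient connection. All names here are illustrative.
public class LocalWebUserAuthSketch {

  /** Stand-in for the Drillbit's configured authenticator (e.g. PAM-based). */
  public interface PasswordAuthenticator {
    void authenticate(String user, String password) throws SecurityException;
  }

  private final PasswordAuthenticator authenticator;

  public LocalWebUserAuthSketch(PasswordAuthenticator authenticator) {
    this.authenticator = authenticator;
  }

  /** Returns true if the web user's credentials are accepted locally (no RPC involved). */
  public boolean login(String user, String password) {
    try {
      authenticator.authenticate(user, password);
      return true;   // session established; queries go to the local Foreman work queue
    } catch (SecurityException e) {
      return false;  // reject the WebClient session
    }
  }
}
{noformat}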



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (DRILL-5229) Upgrade kudu client to org.apache.kudu:kudu-client:1.2.0

2017-06-02 Thread Jinfeng Ni (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035827#comment-16035827
 ] 

Jinfeng Ni commented on DRILL-5229:
---

Committer is [~sudheeshkatkam] (lost the trace after rebasing).  

> Upgrade kudu client to org.apache.kudu:kudu-client:1.2.0 
> -
>
> Key: DRILL-5229
> URL: https://issues.apache.org/jira/browse/DRILL-5229
> Project: Apache Drill
>  Issue Type: Bug
>  Components: Storage - Other
>Affects Versions: 1.8.0
>Reporter: Rahul Raj
>Assignee: Sudheesh Katkam
>  Labels: ready-to-commit
> Fix For: 1.11.0
>
>
> Getting an error - "out-of-order key" - for the query select v, count(k) from
> kudu.test group by v, where k is the primary key. This happens only when the
> aggregation is done on the primary key. Should Drill move to the latest Kudu
> client to investigate this further?
> The current Drill Kudu connector uses org.kududb:kudu-client:0.6.0 from the
> Cloudera repository, while the latest released library,
> org.apache.kudu:kudu-client:1.2.0, is hosted on Maven Central. There are a
> few breaking changes with the new library:
>1. TIMESTAMP renamed to UNIXTIME_MICROS
>2. In KuduRecordReader#setup -
>KuduScannerBuilder#lowerBoundPartitionKeyRaw renamed to lowerBoundRaw
>and KuduScannerBuilder#exclusiveUpperBoundPartitionKeyRaw renamed to
>exclusiveUpperBoundRaw. Both methods are deprecated.
>3. In KuduRecordWriterImpl#updateSchema - client.createTable(name,
>kuduSchema) requires CreateTableOptions as the third argument
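For item 3, a minimal sketch of the call against the 1.2.0 client (assuming the standard org.apache.kudu.client API; the replica count and hash-partition column are placeholders, not values taken from the Drill plugin):

{noformat}
import java.util.Arrays;
import org.apache.kudu.Schema;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;

public class KuduCreateTableSketch {
  // Sketch: in kudu-client 1.2.0, createTable takes CreateTableOptions as a third argument.
  public static void createTable(KuduClient client, String name, Schema kuduSchema)
      throws KuduException {
    CreateTableOptions options = new CreateTableOptions()
        .setNumReplicas(1)                            // placeholder setting
        .addHashPartitions(Arrays.asList("key"), 4);  // "key" is a placeholder column
    client.createTable(name, kuduSchema, options);
  }
}
{noformat}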



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (DRILL-5356) Refactor Parquet Record Reader

2017-06-02 Thread Jinfeng Ni (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035823#comment-16035823
 ] 

Jinfeng Ni commented on DRILL-5356:
---

Committer is [~sudheeshkatkam] (lost the trace after rebasing).  

> Refactor Parquet Record Reader
> --
>
> Key: DRILL-5356
> URL: https://issues.apache.org/jira/browse/DRILL-5356
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>  Labels: ready-to-commit
> Fix For: 1.11.0
>
>
> The Parquet record reader class is a key part of Drill that has evolved over 
> time to become somewhat hard to follow.
> A number of us are working on Parquet-related tasks and find we have to spend 
> an uncomfortable amount of time trying to understand the code. In particular, 
> this writer needs to figure out how to convince the reader to provide 
> higher-density record batches.
> Rather than continuing to decipher the complex code multiple times, this ticket 
> requests refactoring the code to make it functionally identical but 
> structurally cleaner. The result will be faster time to value when working 
> with this code.
> This is a lower-priority change and will be coordinated with others working 
> on this code base. This ticket is only for the record reader class itself; it 
> does not include the various readers and writers that Parquet uses since 
> another project is actively modifying those classes.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (DRILL-5379) Set Hdfs Block Size based on Parquet Block Size

2017-06-02 Thread Jinfeng Ni (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035821#comment-16035821
 ] 

Jinfeng Ni commented on DRILL-5379:
---

Committer is [~sudheeshkatkam] (lost the trace after rebasing).  

> Set Hdfs Block Size based on Parquet Block Size
> ---
>
> Key: DRILL-5379
> URL: https://issues.apache.org/jira/browse/DRILL-5379
> Project: Apache Drill
>  Issue Type: Improvement
>  Components: Storage - Parquet
>Affects Versions: 1.9.0
>Reporter: F Méthot
>Assignee: Padma Penumarthy
>  Labels: ready-to-commit
> Fix For: Future, 1.11.0
>
>
> It seems there is a way to force Drill to store a CTAS-generated Parquet file as a 
> single block when using HDFS. The Java HDFS API allows this: files can be 
> created with the Parquet block size set in a session or system config.
> It is ideal to have a single Parquet file per HDFS block.
> Here is the HDFS API that allows this:
> http://archive.cloudera.com/cdh4/cdh/4/hadoop/api/org/apache/hadoop/fs/FileSystem.html#create(org.apache.hadoop.fs.Path,%20boolean,%20int,%20short,%20long)
> Drill uses the Hadoop ParquetFileWriter 
> (https://github.com/Parquet/parquet-mr/blob/master/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java).
> This is where the file creation occurs, so it might be tricky.
> However, ParquetRecordWriter.java 
> (https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java)
> in Drill creates the ParquetFileWriter with a Hadoop Configuration object.
> Something to explore: could the block size be set as a property on the 
> Configuration object before passing it to the ParquetFileWriter constructor?
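A minimal sketch of the idea in the last paragraph, assuming the block size can simply be carried on the Hadoop Configuration (dfs.blocksize) or passed explicitly to FileSystem.create(); this is an illustration, not the actual Drill change:

{noformat}
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockSizeSketch {
  // Sketch: make the HDFS block size equal to the Parquet block size so each
  // CTAS-generated Parquet file fits in a single HDFS block.
  public static FSDataOutputStream createSingleBlockFile(Configuration conf, Path path,
      long parquetBlockSize) throws IOException {
    conf.setLong("dfs.blocksize", parquetBlockSize);  // picked up by writers created from this conf
    FileSystem fs = FileSystem.get(conf);
    // Or pass the block size explicitly: create(path, overwrite, bufferSize, replication, blockSize)
    return fs.create(path, true, 4096, fs.getDefaultReplication(path), parquetBlockSize);
  }
}
{noformat}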



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (DRILL-5481) Allow Drill to persist profiles in-memory only with a max capacity

2017-06-02 Thread Jinfeng Ni (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035819#comment-16035819
 ] 

Jinfeng Ni commented on DRILL-5481:
---

Committer is [~sudheeshkatkam] (lost the trace after rebasing).  

> Allow Drill to persist profiles in-memory only with a max capacity
> --
>
> Key: DRILL-5481
> URL: https://issues.apache.org/jira/browse/DRILL-5481
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.10.0
>Reporter: Kunal Khatua
>Assignee: Kunal Khatua
>  Labels: ready-to-commit
> Fix For: 1.11.0
>
>
> To allow for fast persistence of profiles on a temporary basis (i.e. for the 
> life of the Drillbit), an existing test class 
> {{org.apache.drill.exec.testing.store.NoWriteLocalStore.java}} was refactored 
> to {{org.apache.drill.exec.store.sys.store.EphemeralPersistentStore}} and 
> given the ability to maintain a max capacity.
> This should keep query profiles available for the lifespan of the Drillbit 
> process.
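As a rough illustration of a capacity-capped in-memory store (not the actual EphemeralPersistentStore code), a size-bounded map that evicts the oldest profile once the limit is reached could look like this:

{noformat}
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: keep at most 'capacity' profiles in memory, dropping the oldest
// entry when the limit is exceeded. Illustrative only.
public class BoundedInMemoryStore<V> {
  private final Map<String, V> store;

  public BoundedInMemoryStore(final int capacity) {
    this.store = new LinkedHashMap<String, V>(16, 0.75f, false) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
        return size() > capacity;  // evict the insertion-order eldest beyond capacity
      }
    };
  }

  public synchronized void put(String key, V value) { store.put(key, value); }
  public synchronized V get(String key) { return store.get(key); }
}
{noformat}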



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (DRILL-5533) Fix flag assignment in FunctionInitializer.checkInit() method

2017-06-02 Thread Jinfeng Ni (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035816#comment-16035816
 ] 

Jinfeng Ni commented on DRILL-5533:
---

Committer is [~sudheeshkatkam] (lost the trace after rebasing).  

> Fix flag assignment in FunctionInitializer.checkInit() method
> -
>
> Key: DRILL-5533
> URL: https://issues.apache.org/jira/browse/DRILL-5533
> Project: Apache Drill
>  Issue Type: Bug
>Affects Versions: 1.10.0
>Reporter: Arina Ielchiieva
>Assignee: Arina Ielchiieva
>Priority: Minor
>  Labels: ready-to-commit
> Fix For: 1.11.0
>
>
> The FunctionInitializer.checkInit() method uses DCL (double-checked locking) to 
> ensure that the function body is loaded only once, but the flag parameter is 
> never updated, so all threads enter the synchronized block.
> Also, FunctionInitializer.getImports() always returns an empty list.
> https://github.com/apache/drill/blob/3e8b01d5b0d3013e3811913f0fd6028b22c1ac3f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionInitializer.java
> Changes:
> 1. Fix DCL in the FunctionInitializer.checkInit() method (update the flag 
> parameter when the function body is loaded).
> 2. Fix the ImportGrabber.getImports() method to return the list of imports.
> 3. Add unit tests for FunctionInitializer.
> 4. Minor refactoring (rename methods, add javadoc).
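A generic sketch of the DCL pattern that item 1 refers to (the flag must be volatile and must be set inside the synchronized block once loading succeeds); this is illustrative, not the committed FunctionInitializer code:

{noformat}
// Sketch: double-checked locking with the flag actually updated after loading.
public class LazyBodyLoader {
  private volatile boolean initialized;  // volatile is required for safe DCL
  private String functionBody;           // stand-in for the loaded function body

  public String checkInit() {
    if (!initialized) {                  // first, unsynchronized check
      synchronized (this) {
        if (!initialized) {              // second check under the lock
          functionBody = loadBody();
          initialized = true;            // the assignment that was missing in the bug
        }
      }
    }
    return functionBody;
  }

  private String loadBody() { return "..."; }  // placeholder for the source-loading logic
}
{noformat}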



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (DRILL-5512) Standardize error handling in ScanBatch

2017-06-02 Thread Jinfeng Ni (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035817#comment-16035817
 ] 

Jinfeng Ni commented on DRILL-5512:
---

Committer is [~sudheeshkatkam] (lost the trace after rebasing).  

> Standardize error handling in ScanBatch
> ---
>
> Key: DRILL-5512
> URL: https://issues.apache.org/jira/browse/DRILL-5512
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.10.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>  Labels: ready-to-commit
> Fix For: 1.11.0
>
>
> ScanBatch is the Drill operator executor that handles most readers. Like most 
> Drill operators, it uses an ad-hoc set of error detection and reporting 
> methods that evolved over Drill development.
> This ticket asks to standardize on error handling as outlined in DRILL-5083. 
> This basically means reporting all errors as a {{UserException}} rather than 
> using the {{IterOutcome.STOP}} return status or using the 
> {{FragmentContext.fail()}} method.
> This work requires the new error codes introduced in DRILL-5511, and is a 
> step toward making readers aware of vector size limits.
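As an illustration of the intended style (a sketch assuming UserException's builder methods dataReadError/message/addContext/build, with illustrative wrapper names), a reader failure would be reported like this instead of via STOP or FragmentContext.fail():

{noformat}
import java.util.concurrent.Callable;
import org.apache.drill.common.exceptions.UserException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ScanErrorSketch {
  private static final Logger logger = LoggerFactory.getLogger(ScanErrorSketch.class);

  // Sketch: wrap a reader failure in a UserException and throw it, rather than
  // returning IterOutcome.STOP or calling FragmentContext.fail().
  public static int nextOrThrow(Callable<Integer> readerNext) {
    try {
      return readerNext.call();
    } catch (Exception e) {
      throw UserException.dataReadError(e)
          .message("Failure while reading the next batch")
          .addContext("Operator", "ScanBatch")
          .build(logger);
    }
  }
}
{noformat}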



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (DRILL-5140) Fix CompileException in run-time generated code when record batch has large number of fields.

2017-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035807#comment-16035807
 ] 

ASF GitHub Bot commented on DRILL-5140:
---

Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/818


> Fix CompileException in run-time generated code when record batch has large 
> number of fields.
> -
>
> Key: DRILL-5140
> URL: https://issues.apache.org/jira/browse/DRILL-5140
> Project: Apache Drill
>  Issue Type: Bug
>  Components: Execution - Flow
>Affects Versions: 1.9.0
>Reporter: Khurram Faraaz
>Assignee: Volodymyr Vysotskyi
>Priority: Critical
>  Labels: ready-to-commit
> Attachments: drill_5117.q, manyColumns.csv
>
>
> CTAS that does SELECT over 5003 columns fails with CompileException: File 
> 'org.apache.drill.exec.compile.DrillJavaFileObject...
> Drill 1.9.0 git commit ID : 4c1b420b
> CTAS statement and CSV data file are attached.
> I ran the test with and without setting the below system option; the test failed in 
> both cases.
> alter system set `exec.java_compiler`='JDK';
> The sqlline session just closes with the below message after the failing CTAS is 
> executed.
> Closing: org.apache.drill.jdbc.impl.DrillConnectionImpl
> Stack trace from drillbit.log
> {noformat}
> 2016-12-20 12:02:16,016 [27a6e241-99b1-1f2a-8a91-394f8166e969:frag:0:0] ERROR 
> o.a.d.e.w.fragment.FragmentExecutor - SYSTEM ERROR: CompileException: File 
> 'org.apache.drill.exec.compile.DrillJavaFileObject[ProjectorGen45.java]', 
> Line 11, Column 8: ProjectorGen45.java:11: error: too many constants
> public class ProjectorGen45 {
>^ (compiler.err.limit.pool)
> Fragment 0:0
> [Error Id: ced84dce-669d-47c2-b5d2-5e0559dbd9fd on centos-01.qa.lab:31010]
> org.apache.drill.common.exceptions.UserException: SYSTEM ERROR: 
> CompileException: File 
> 'org.apache.drill.exec.compile.DrillJavaFileObject[ProjectorGen45.java]', 
> Line 11, Column 8: ProjectorGen45.java:11: error: too many constants
> public class ProjectorGen45 {
>^ (compiler.err.limit.pool)
> Fragment 0:0
> [Error Id: ced84dce-669d-47c2-b5d2-5e0559dbd9fd on centos-01.qa.lab:31010]
> at 
> org.apache.drill.common.exceptions.UserException$Builder.build(UserException.java:543)
>  ~[drill-common-1.9.0.jar:1.9.0]
> at 
> org.apache.drill.exec.work.fragment.FragmentExecutor.sendFinalState(FragmentExecutor.java:293)
>  [drill-java-exec-1.9.0.jar:1.9.0]
> at 
> org.apache.drill.exec.work.fragment.FragmentExecutor.cleanup(FragmentExecutor.java:160)
>  [drill-java-exec-1.9.0.jar:1.9.0]
> at 
> org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:262)
>  [drill-java-exec-1.9.0.jar:1.9.0]
> at 
> org.apache.drill.common.SelfCleaningRunnable.run(SelfCleaningRunnable.java:38)
>  [drill-common-1.9.0.jar:1.9.0]
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  [na:1.8.0_91]
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  [na:1.8.0_91]
> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
> Caused by: org.apache.drill.exec.exception.SchemaChangeException: Failure 
> while attempting to load generated class
> at 
> org.apache.drill.exec.physical.impl.project.ProjectRecordBatch.setupNewSchema(ProjectRecordBatch.java:487)
>  ~[drill-java-exec-1.9.0.jar:1.9.0]
> at 
> org.apache.drill.exec.record.AbstractSingleRecordBatch.innerNext(AbstractSingleRecordBatch.java:78)
>  ~[drill-java-exec-1.9.0.jar:1.9.0]
> at 
> org.apache.drill.exec.physical.impl.project.ProjectRecordBatch.innerNext(ProjectRecordBatch.java:135)
>  ~[drill-java-exec-1.9.0.jar:1.9.0]
> at 
> org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:162)
>  ~[drill-java-exec-1.9.0.jar:1.9.0]
> at 
> org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:119)
>  ~[drill-java-exec-1.9.0.jar:1.9.0]
> at 
> org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:109)
>  ~[drill-java-exec-1.9.0.jar:1.9.0]
> at 
> org.apache.drill.exec.record.AbstractSingleRecordBatch.innerNext(AbstractSingleRecordBatch.java:51)
>  ~[drill-java-exec-1.9.0.jar:1.9.0]
> at 
> org.apache.drill.exec.physical.impl.project.ProjectRecordBatch.innerNext(ProjectRecordBatch.java:135)
>  ~[drill-java-exec-1.9.0.jar:1.9.0]
> at 
> org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:162)
>  ~[drill-java-exec-1.9.0.jar:1.9.0]
> at 
> org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:119)
>  ~[drill-java-exec-1.9.0.jar:1.9.0]
> 

[jira] [Commented] (DRILL-5481) Allow Drill to persist profiles in-memory only with a max capacity

2017-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035806#comment-16035806
 ] 

ASF GitHub Bot commented on DRILL-5481:
---

Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/834


> Allow Drill to persist profiles in-memory only with a max capacity
> --
>
> Key: DRILL-5481
> URL: https://issues.apache.org/jira/browse/DRILL-5481
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.10.0
>Reporter: Kunal Khatua
>Assignee: Kunal Khatua
>  Labels: ready-to-commit
>
> To allow for fast persistence of profiles on a temporary basis (i.e. for the 
> life of the Drillbit), an existing test class 
> {{org.apache.drill.exec.testing.store.NoWriteLocalStore.java}} was refactored 
> to {{org.apache.drill.exec.store.sys.store.EphemeralPersistentStore}} and 
> given the ability to maintain a max capacity.
> This should keep query profiles available for the lifespan of the Drillbit 
> process.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (DRILL-5485) Remove WebServer dependency on DrillClient

2017-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035810#comment-16035810
 ] 

ASF GitHub Bot commented on DRILL-5485:
---

Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/829


> Remove WebServer dependency on DrillClient
> --
>
> Key: DRILL-5485
> URL: https://issues.apache.org/jira/browse/DRILL-5485
> Project: Apache Drill
>  Issue Type: Improvement
>  Components: Web Server
>Reporter: Sorabh Hamirwasia
>  Labels: ready-to-commit
> Fix For: 1.11.0
>
>
> With encryption support using SASL, clients won't be able to authenticate 
> using the PLAIN mechanism when encryption is enabled on the cluster. Today the 
> WebServer, which is embedded inside the Drillbit, creates a DrillClient instance 
> for each WebClient session, and the WebUser is authenticated as part of the 
> PLAIN-mechanism authentication between that DrillClient instance and the 
> Drillbit. With encryption enabled this will fail, since encryption does not 
> support authentication using the PLAIN mechanism, hence no WebClient can 
> connect to a Drillbit. This approach also has the following issues:
> 1) Since a DrillClient is used per WebUser session, this is expensive: it brings 
> the heavyweight RPC layer for DrillClient and all its dependencies. 
> 2) If a different node is selected as the Foreman for a WebUser, there will be 
> an extra hop to transfer data back to the WebClient.
> To resolve all the above issues it would be better to authenticate the WebUser 
> locally using the Drillbit on which the WebServer is running, without creating a 
> DrillClient instance. We can use the local PAMAuthenticator to authenticate 
> the user. After authentication succeeds, the local Drillbit can also 
> serve as the Foreman for all queries submitted by the WebUser. This can be 
> achieved by submitting the query to the local Drillbit Foreman work queue. 
> This will also remove the requirement to encrypt the channel opened between 
> the WebServer (DrillClient) and the selected Drillbit, since with this approach 
> there won't be any physical channel opened between them.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (DRILL-5379) Set Hdfs Block Size based on Parquet Block Size

2017-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035805#comment-16035805
 ] 

ASF GitHub Bot commented on DRILL-5379:
---

Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/826


> Set Hdfs Block Size based on Parquet Block Size
> ---
>
> Key: DRILL-5379
> URL: https://issues.apache.org/jira/browse/DRILL-5379
> Project: Apache Drill
>  Issue Type: Improvement
>  Components: Storage - Parquet
>Affects Versions: 1.9.0
>Reporter: F Méthot
>Assignee: Padma Penumarthy
>  Labels: ready-to-commit
> Fix For: Future
>
>
> It seems there is a way to force Drill to store a CTAS-generated Parquet file as a 
> single block when using HDFS. The Java HDFS API allows this: files can be 
> created with the Parquet block size set in a session or system config.
> It is ideal to have a single Parquet file per HDFS block.
> Here is the HDFS API that allows this:
> http://archive.cloudera.com/cdh4/cdh/4/hadoop/api/org/apache/hadoop/fs/FileSystem.html#create(org.apache.hadoop.fs.Path,%20boolean,%20int,%20short,%20long)
> Drill uses the Hadoop ParquetFileWriter 
> (https://github.com/Parquet/parquet-mr/blob/master/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java).
> This is where the file creation occurs, so it might be tricky.
> However, ParquetRecordWriter.java 
> (https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java)
> in Drill creates the ParquetFileWriter with a Hadoop Configuration object.
> Something to explore: could the block size be set as a property on the 
> Configuration object before passing it to the ParquetFileWriter constructor?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (DRILL-5537) Display columns alias for queries with sum() when RDBMS storage plugin is enabled

2017-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035811#comment-16035811
 ] 

ASF GitHub Bot commented on DRILL-5537:
---

Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/845


> Display columns alias for queries with sum() when RDBMS storage plugin is 
> enabled
> -
>
> Key: DRILL-5537
> URL: https://issues.apache.org/jira/browse/DRILL-5537
> Project: Apache Drill
>  Issue Type: Bug
>Affects Versions: 1.10.0
>Reporter: Arina Ielchiieva
>Assignee: Arina Ielchiieva
>  Labels: ready-to-commit
>
> When the [RDBMS storage 
> plugin|https://drill.apache.org/docs/rdbms-storage-plugin/] is enabled, the 
> alias is not displayed for a column with the sum function:
> {noformat}
> 0: jdbc:drill:zk=local> select version, sum(1) as s from sys.version group by 
> version;
> +--+--+
> | version  | $f1  |
> +--+--+
> | 1.11.0-SNAPSHOT  | 1|
> +--+--+
> 1 row selected (0.444 seconds)
> {noformat}
> Other functions, such as avg and count, are not affected.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (DRILL-5229) Upgrade kudu client to org.apache.kudu:kudu-client:1.2.0

2017-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035809#comment-16035809
 ] 

ASF GitHub Bot commented on DRILL-5229:
---

Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/828


> Upgrade kudu client to org.apache.kudu:kudu-client:1.2.0 
> -
>
> Key: DRILL-5229
> URL: https://issues.apache.org/jira/browse/DRILL-5229
> Project: Apache Drill
>  Issue Type: Bug
>  Components: Storage - Other
>Affects Versions: 1.8.0
>Reporter: Rahul Raj
>Assignee: Sudheesh Katkam
>  Labels: ready-to-commit
>
> Getting an error - "out-of-order key" - for the query select v, count(k) from
> kudu.test group by v, where k is the primary key. This happens only when the
> aggregation is done on the primary key. Should Drill move to the latest Kudu
> client to investigate this further?
> The current Drill Kudu connector uses org.kududb:kudu-client:0.6.0 from the
> Cloudera repository, while the latest released library,
> org.apache.kudu:kudu-client:1.2.0, is hosted on Maven Central. There are a
> few breaking changes with the new library:
>1. TIMESTAMP renamed to UNIXTIME_MICROS
>2. In KuduRecordReader#setup -
>KuduScannerBuilder#lowerBoundPartitionKeyRaw renamed to lowerBoundRaw
>and KuduScannerBuilder#exclusiveUpperBoundPartitionKeyRaw renamed to
>exclusiveUpperBoundRaw. Both methods are deprecated.
>3. In KuduRecordWriterImpl#updateSchema - client.createTable(name,
>kuduSchema) requires CreateTableOptions as the third argument



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (DRILL-5356) Refactor Parquet Record Reader

2017-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035808#comment-16035808
 ] 

ASF GitHub Bot commented on DRILL-5356:
---

Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/789


> Refactor Parquet Record Reader
> --
>
> Key: DRILL-5356
> URL: https://issues.apache.org/jira/browse/DRILL-5356
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>  Labels: ready-to-commit
> Fix For: 1.11.0
>
>
> The Parquet record reader class is a key part of Drill that has evolved over 
> time to become somewhat hard to follow.
> A number of us are working on Parquet-related tasks and find we have to spend 
> an uncomfortable amount of time trying to understand the code. In particular, 
> this writer needs to figure out how to convince the reader to provide 
> higher-density record batches.
> Rather than continuing to decipher the complex code multiple times, this ticket 
> requests refactoring the code to make it functionally identical but 
> structurally cleaner. The result will be faster time to value when working 
> with this code.
> This is a lower-priority change and will be coordinated with others working 
> on this code base. This ticket is only for the record reader class itself; it 
> does not include the various readers and writers that Parquet uses since 
> another project is actively modifying those classes.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (DRILL-5512) Standardize error handling in ScanBatch

2017-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035803#comment-16035803
 ] 

ASF GitHub Bot commented on DRILL-5512:
---

Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/838


> Standardize error handling in ScanBatch
> ---
>
> Key: DRILL-5512
> URL: https://issues.apache.org/jira/browse/DRILL-5512
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.10.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>  Labels: ready-to-commit
>
> ScanBatch is the Drill operator executor that handles most readers. Like most 
> Drill operators, it uses an ad-hoc set of error detection and reporting 
> methods that evolved over Drill development.
> This ticket asks to standardize on error handling as outlined in DRILL-5083. 
> This basically means reporting all errors as a {{UserException}} rather than 
> using the {{IterOutcome.STOP}} return status or using the 
> {{FragmentContext.fail()}} method.
> This work requires the new error codes introduced in DRILL-5511, and is a 
> step toward making readers aware of vector size limits.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (DRILL-5533) Fix flag assignment in FunctionInitializer.checkInit() method

2017-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035802#comment-16035802
 ] 

ASF GitHub Bot commented on DRILL-5533:
---

Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/843


> Fix flag assignment in FunctionInitializer.checkInit() method
> -
>
> Key: DRILL-5533
> URL: https://issues.apache.org/jira/browse/DRILL-5533
> Project: Apache Drill
>  Issue Type: Bug
>Affects Versions: 1.10.0
>Reporter: Arina Ielchiieva
>Assignee: Arina Ielchiieva
>Priority: Minor
>  Labels: ready-to-commit
>
> The FunctionInitializer.checkInit() method uses DCL (double-checked locking) to 
> ensure that the function body is loaded only once, but the flag parameter is 
> never updated, so all threads enter the synchronized block.
> Also, FunctionInitializer.getImports() always returns an empty list.
> https://github.com/apache/drill/blob/3e8b01d5b0d3013e3811913f0fd6028b22c1ac3f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionInitializer.java
> Changes:
> 1. Fix DCL in the FunctionInitializer.checkInit() method (update the flag 
> parameter when the function body is loaded).
> 2. Fix the ImportGrabber.getImports() method to return the list of imports.
> 3. Add unit tests for FunctionInitializer.
> 4. Minor refactoring (rename methods, add javadoc).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (DRILL-5504) Vector validator to diagnose offset vector issues

2017-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035804#comment-16035804
 ] 

ASF GitHub Bot commented on DRILL-5504:
---

Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/832


> Vector validator to diagnose offset vector issues
> -
>
> Key: DRILL-5504
> URL: https://issues.apache.org/jira/browse/DRILL-5504
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.10.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>  Labels: ready-to-commit
> Fix For: 1.11.0
>
>
> DRILL-5470 describes a case in which an offset vector appears to have become 
> corrupted, yielding a bogus field-length value that is orders of magnitude 
> larger than the vector that contains the data.
> Debugging such cases is slow and tedious. To help, we propose to create a 
> "vector validator" that spins through vectors looking for problems.
> Then, to allow the validator to be used in the field, extend the "iterator 
> validator batch iterator" to optionally allow vector validation on each batch.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator

2017-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035771#comment-16035771
 ] 

ASF GitHub Bot commented on DRILL-5457:
---

Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r119977034
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -400,114 +782,411 @@ public IterOutcome getOutcome() {
 
   @Override
   public int getOutputCount() {
-// return outputCount;
 return lastBatchOutputCount;
   }
 
   @Override
   public void cleanup() {
-if (htable != null) {
-  htable.clear();
-  htable = null;
-}
+  if ( schema == null ) { return; } // not set up; nothing to clean
+  for ( int i = 0; i < numPartitions; i++) {
+  if (htables[i] != null) {
+  htables[i].clear();
+  htables[i] = null;
+  }
+  if ( batchHolders[i] != null) {
+  for (BatchHolder bh : batchHolders[i]) {
+bh.clear();
+  }
+  batchHolders[i].clear();
+  batchHolders[i] = null;
+  }
+
+  // delete any (still active) output spill file
+  if ( outputStream[i] != null && spillFiles[i] != null) {
+try {
+  spillSet.delete(spillFiles[i]);
+} catch(IOException e) {
+  logger.warn("Cleanup: Failed to delete spill file 
{}",spillFiles[i]);
+}
+  }
+  }
+  // delete any spill file left in unread spilled partitions
+  while ( ! spilledPartitionsList.isEmpty() ) {
+SpilledPartition sp = spilledPartitionsList.remove(0);
+try {
+  spillSet.delete(sp.spillFile);
+} catch(IOException e) {
+  logger.warn("Cleanup: Failed to delete spill file 
{}",sp.spillFile);
+}
+  }
+  spillSet.close(); // delete the spill directory(ies)
 htIdxHolder = null;
 materializedValueFields = null;
 outStartIdxHolder = null;
 outNumRecordsHolder = null;
+  }
 
-if (batchHolders != null) {
-  for (BatchHolder bh : batchHolders) {
+  // First free the memory used by the given (spilled) partition (i.e., 
hash table plus batches)
+  // then reallocate them in pristine state to allow the partition to 
continue receiving rows
+  private void reinitPartition(int part) throws SchemaChangeException, 
ClassTransformationException, IOException {
+assert htables[part] != null;
+htables[part].reset();
+if ( batchHolders[part] != null) {
+  for (BatchHolder bh : batchHolders[part]) {
 bh.clear();
   }
-  batchHolders.clear();
-  batchHolders = null;
+  batchHolders[part].clear();
 }
+batchHolders[part] = new ArrayList(); // First 
BatchHolder is created when the first put request is received.
   }
 
-//  private final AggOutcome setOkAndReturn() {
-//this.outcome = IterOutcome.OK;
-//for (VectorWrapper v : outgoing) {
-//  v.getValueVector().getMutator().setValueCount(outputCount);
-//}
-//return AggOutcome.RETURN_OUTCOME;
-//  }
 
   private final void incIndex() {
 underlyingIndex++;
 if (underlyingIndex >= incoming.getRecordCount()) {
   currentIndex = Integer.MAX_VALUE;
   return;
 }
-currentIndex = getVectorIndex(underlyingIndex);
+try { currentIndex = getVectorIndex(underlyingIndex); }
+catch (SchemaChangeException sc) { throw new 
DrillRuntimeException(sc);}
   }
 
   private final void resetIndex() {
 underlyingIndex = -1;
 incIndex();
   }
 
-  private void addBatchHolder() {
+  private boolean isSpilled(int part) {
+return outputStream[part] != null;
+  }
+  /**
+   * Which partition to choose for flushing out (i.e. spill or return) ?
+   * - The current partition (to which a new bach holder is added) has a 
priority,
+   *   because its last batch holder is full.
+   * - Also the largest prior spilled partition has some priority, as it 
is already spilled;
+   *   but spilling too few rows (e.g. a single batch) gets us nothing.
+   * - So the largest non-spilled partition has some priority, to get more 
memory freed.
+   * Need to weigh the above three options.
+   *
+   *  @param currPart - The partition that hit the memory limit (gets a 
priority)
+   *  @return The partition (number) chosen to be spilled
+   */
+  private int chooseAPartitionTo

[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator

2017-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035741#comment-16035741
 ] 

ASF GitHub Bot commented on DRILL-5457:
---

Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r119975302
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -400,114 +782,411 @@ public IterOutcome getOutcome() {
 
   @Override
   public int getOutputCount() {
-// return outputCount;
 return lastBatchOutputCount;
   }
 
   @Override
   public void cleanup() {
-if (htable != null) {
-  htable.clear();
-  htable = null;
-}
+  if ( schema == null ) { return; } // not set up; nothing to clean
+  for ( int i = 0; i < numPartitions; i++) {
+  if (htables[i] != null) {
+  htables[i].clear();
+  htables[i] = null;
+  }
+  if ( batchHolders[i] != null) {
+  for (BatchHolder bh : batchHolders[i]) {
+bh.clear();
+  }
+  batchHolders[i].clear();
+  batchHolders[i] = null;
+  }
+
+  // delete any (still active) output spill file
+  if ( outputStream[i] != null && spillFiles[i] != null) {
+try {
+  spillSet.delete(spillFiles[i]);
+} catch(IOException e) {
+  logger.warn("Cleanup: Failed to delete spill file 
{}",spillFiles[i]);
+}
+  }
+  }
+  // delete any spill file left in unread spilled partitions
+  while ( ! spilledPartitionsList.isEmpty() ) {
+SpilledPartition sp = spilledPartitionsList.remove(0);
+try {
+  spillSet.delete(sp.spillFile);
+} catch(IOException e) {
+  logger.warn("Cleanup: Failed to delete spill file 
{}",sp.spillFile);
+}
+  }
+  spillSet.close(); // delete the spill directory(ies)
 htIdxHolder = null;
 materializedValueFields = null;
 outStartIdxHolder = null;
 outNumRecordsHolder = null;
+  }
 
-if (batchHolders != null) {
-  for (BatchHolder bh : batchHolders) {
+  // First free the memory used by the given (spilled) partition (i.e., 
hash table plus batches)
+  // then reallocate them in pristine state to allow the partition to 
continue receiving rows
+  private void reinitPartition(int part) throws SchemaChangeException, 
ClassTransformationException, IOException {
+assert htables[part] != null;
+htables[part].reset();
+if ( batchHolders[part] != null) {
+  for (BatchHolder bh : batchHolders[part]) {
 bh.clear();
   }
-  batchHolders.clear();
-  batchHolders = null;
+  batchHolders[part].clear();
 }
+batchHolders[part] = new ArrayList(); // First 
BatchHolder is created when the first put request is received.
   }
 
-//  private final AggOutcome setOkAndReturn() {
-//this.outcome = IterOutcome.OK;
-//for (VectorWrapper v : outgoing) {
-//  v.getValueVector().getMutator().setValueCount(outputCount);
-//}
-//return AggOutcome.RETURN_OUTCOME;
-//  }
 
   private final void incIndex() {
 underlyingIndex++;
 if (underlyingIndex >= incoming.getRecordCount()) {
   currentIndex = Integer.MAX_VALUE;
   return;
 }
-currentIndex = getVectorIndex(underlyingIndex);
+try { currentIndex = getVectorIndex(underlyingIndex); }
+catch (SchemaChangeException sc) { throw new 
DrillRuntimeException(sc);}
   }
 
   private final void resetIndex() {
 underlyingIndex = -1;
 incIndex();
   }
 
-  private void addBatchHolder() {
+  private boolean isSpilled(int part) {
+return outputStream[part] != null;
+  }
+  /**
+   * Which partition to choose for flushing out (i.e. spill or return) ?
+   * - The current partition (to which a new bach holder is added) has a 
priority,
+   *   because its last batch holder is full.
+   * - Also the largest prior spilled partition has some priority, as it 
is already spilled;
+   *   but spilling too few rows (e.g. a single batch) gets us nothing.
+   * - So the largest non-spilled partition has some priority, to get more 
memory freed.
+   * Need to weigh the above three options.
+   *
+   *  @param currPart - The partition that hit the memory limit (gets a 
priority)
+   *  @return The partition (number) chosen to be spilled
+   */
+  private int chooseAPartitionTo

[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator

2017-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035735#comment-16035735
 ] 

ASF GitHub Bot commented on DRILL-5457:
---

Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r119975022
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -400,114 +782,411 @@ public IterOutcome getOutcome() {
 
   @Override
   public int getOutputCount() {
-// return outputCount;
 return lastBatchOutputCount;
   }
 
   @Override
   public void cleanup() {
-if (htable != null) {
-  htable.clear();
-  htable = null;
-}
+  if ( schema == null ) { return; } // not set up; nothing to clean
+  for ( int i = 0; i < numPartitions; i++) {
+  if (htables[i] != null) {
+  htables[i].clear();
+  htables[i] = null;
+  }
+  if ( batchHolders[i] != null) {
+  for (BatchHolder bh : batchHolders[i]) {
+bh.clear();
+  }
+  batchHolders[i].clear();
+  batchHolders[i] = null;
+  }
+
+  // delete any (still active) output spill file
+  if ( outputStream[i] != null && spillFiles[i] != null) {
+try {
+  spillSet.delete(spillFiles[i]);
+} catch(IOException e) {
+  logger.warn("Cleanup: Failed to delete spill file 
{}",spillFiles[i]);
+}
+  }
+  }
+  // delete any spill file left in unread spilled partitions
+  while ( ! spilledPartitionsList.isEmpty() ) {
+SpilledPartition sp = spilledPartitionsList.remove(0);
+try {
+  spillSet.delete(sp.spillFile);
+} catch(IOException e) {
+  logger.warn("Cleanup: Failed to delete spill file 
{}",sp.spillFile);
+}
+  }
+  spillSet.close(); // delete the spill directory(ies)
 htIdxHolder = null;
 materializedValueFields = null;
 outStartIdxHolder = null;
 outNumRecordsHolder = null;
+  }
 
-if (batchHolders != null) {
-  for (BatchHolder bh : batchHolders) {
+  // First free the memory used by the given (spilled) partition (i.e., 
hash table plus batches)
+  // then reallocate them in pristine state to allow the partition to 
continue receiving rows
+  private void reinitPartition(int part) throws SchemaChangeException, 
ClassTransformationException, IOException {
+assert htables[part] != null;
+htables[part].reset();
+if ( batchHolders[part] != null) {
+  for (BatchHolder bh : batchHolders[part]) {
 bh.clear();
   }
-  batchHolders.clear();
-  batchHolders = null;
+  batchHolders[part].clear();
 }
+batchHolders[part] = new ArrayList(); // First 
BatchHolder is created when the first put request is received.
   }
 
-//  private final AggOutcome setOkAndReturn() {
-//this.outcome = IterOutcome.OK;
-//for (VectorWrapper v : outgoing) {
-//  v.getValueVector().getMutator().setValueCount(outputCount);
-//}
-//return AggOutcome.RETURN_OUTCOME;
-//  }
 
   private final void incIndex() {
 underlyingIndex++;
 if (underlyingIndex >= incoming.getRecordCount()) {
   currentIndex = Integer.MAX_VALUE;
   return;
 }
-currentIndex = getVectorIndex(underlyingIndex);
+try { currentIndex = getVectorIndex(underlyingIndex); }
+catch (SchemaChangeException sc) { throw new 
DrillRuntimeException(sc);}
   }
 
   private final void resetIndex() {
 underlyingIndex = -1;
 incIndex();
   }
 
-  private void addBatchHolder() {
+  private boolean isSpilled(int part) {
+return outputStream[part] != null;
+  }
+  /**
+   * Which partition to choose for flushing out (i.e. spill or return) ?
+   * - The current partition (to which a new bach holder is added) has a 
priority,
+   *   because its last batch holder is full.
+   * - Also the largest prior spilled partition has some priority, as it 
is already spilled;
+   *   but spilling too few rows (e.g. a single batch) gets us nothing.
+   * - So the largest non-spilled partition has some priority, to get more 
memory freed.
+   * Need to weigh the above three options.
+   *
+   *  @param currPart - The partition that hit the memory limit (gets a 
priority)
+   *  @return The partition (number) chosen to be spilled
+   */
+  private int chooseAPartitionTo

[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator

2017-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035725#comment-16035725
 ] 

ASF GitHub Bot commented on DRILL-5457:
---

Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r119974724
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -400,114 +782,411 @@ public IterOutcome getOutcome() {
 
   @Override
   public int getOutputCount() {
-// return outputCount;
 return lastBatchOutputCount;
   }
 
   @Override
   public void cleanup() {
-if (htable != null) {
-  htable.clear();
-  htable = null;
-}
+  if ( schema == null ) { return; } // not set up; nothing to clean
+  for ( int i = 0; i < numPartitions; i++) {
+  if (htables[i] != null) {
+  htables[i].clear();
+  htables[i] = null;
+  }
+  if ( batchHolders[i] != null) {
+  for (BatchHolder bh : batchHolders[i]) {
+bh.clear();
+  }
+  batchHolders[i].clear();
+  batchHolders[i] = null;
+  }
+
+  // delete any (still active) output spill file
+  if ( outputStream[i] != null && spillFiles[i] != null) {
+try {
+  spillSet.delete(spillFiles[i]);
+} catch(IOException e) {
+  logger.warn("Cleanup: Failed to delete spill file 
{}",spillFiles[i]);
+}
+  }
+  }
+  // delete any spill file left in unread spilled partitions
+  while ( ! spilledPartitionsList.isEmpty() ) {
+SpilledPartition sp = spilledPartitionsList.remove(0);
+try {
+  spillSet.delete(sp.spillFile);
+} catch(IOException e) {
+  logger.warn("Cleanup: Failed to delete spill file 
{}",sp.spillFile);
+}
+  }
+  spillSet.close(); // delete the spill directory(ies)
 htIdxHolder = null;
 materializedValueFields = null;
 outStartIdxHolder = null;
 outNumRecordsHolder = null;
+  }
 
-if (batchHolders != null) {
-  for (BatchHolder bh : batchHolders) {
+  // First free the memory used by the given (spilled) partition (i.e., 
hash table plus batches)
+  // then reallocate them in pristine state to allow the partition to 
continue receiving rows
+  private void reinitPartition(int part) throws SchemaChangeException, 
ClassTransformationException, IOException {
+assert htables[part] != null;
+htables[part].reset();
+if ( batchHolders[part] != null) {
+  for (BatchHolder bh : batchHolders[part]) {
 bh.clear();
   }
-  batchHolders.clear();
-  batchHolders = null;
+  batchHolders[part].clear();
 }
+batchHolders[part] = new ArrayList(); // First 
BatchHolder is created when the first put request is received.
   }
 
-//  private final AggOutcome setOkAndReturn() {
-//this.outcome = IterOutcome.OK;
-//for (VectorWrapper v : outgoing) {
-//  v.getValueVector().getMutator().setValueCount(outputCount);
-//}
-//return AggOutcome.RETURN_OUTCOME;
-//  }
 
   private final void incIndex() {
 underlyingIndex++;
 if (underlyingIndex >= incoming.getRecordCount()) {
   currentIndex = Integer.MAX_VALUE;
   return;
 }
-currentIndex = getVectorIndex(underlyingIndex);
+try { currentIndex = getVectorIndex(underlyingIndex); }
+catch (SchemaChangeException sc) { throw new 
DrillRuntimeException(sc);}
   }
 
   private final void resetIndex() {
 underlyingIndex = -1;
 incIndex();
   }
 
-  private void addBatchHolder() {
+  private boolean isSpilled(int part) {
+return outputStream[part] != null;
+  }
+  /**
+   * Which partition to choose for flushing out (i.e. spill or return) ?
+   * - The current partition (to which a new bach holder is added) has a 
priority,
+   *   because its last batch holder is full.
+   * - Also the largest prior spilled partition has some priority, as it 
is already spilled;
+   *   but spilling too few rows (e.g. a single batch) gets us nothing.
+   * - So the largest non-spilled partition has some priority, to get more 
memory freed.
+   * Need to weigh the above three options.
+   *
+   *  @param currPart - The partition that hit the memory limit (gets a 
priority)
+   *  @return The partition (number) chosen to be spilled
+   */
+  private int chooseAPartitionTo

[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator

2017-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035697#comment-16035697
 ] 

ASF GitHub Bot commented on DRILL-5457:
---

Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r119973491
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -400,114 +782,411 @@ public IterOutcome getOutcome() {
 
   @Override
   public int getOutputCount() {
-// return outputCount;
 return lastBatchOutputCount;
   }
 
   @Override
   public void cleanup() {
-if (htable != null) {
-  htable.clear();
-  htable = null;
-}
+  if ( schema == null ) { return; } // not set up; nothing to clean
+  for ( int i = 0; i < numPartitions; i++) {
+  if (htables[i] != null) {
+  htables[i].clear();
+  htables[i] = null;
+  }
+  if ( batchHolders[i] != null) {
+  for (BatchHolder bh : batchHolders[i]) {
+bh.clear();
+  }
+  batchHolders[i].clear();
+  batchHolders[i] = null;
+  }
+
+  // delete any (still active) output spill file
+  if ( outputStream[i] != null && spillFiles[i] != null) {
+try {
+  spillSet.delete(spillFiles[i]);
+} catch(IOException e) {
+  logger.warn("Cleanup: Failed to delete spill file 
{}",spillFiles[i]);
+}
+  }
+  }
+  // delete any spill file left in unread spilled partitions
+  while ( ! spilledPartitionsList.isEmpty() ) {
+SpilledPartition sp = spilledPartitionsList.remove(0);
+try {
+  spillSet.delete(sp.spillFile);
+} catch(IOException e) {
+  logger.warn("Cleanup: Failed to delete spill file 
{}",sp.spillFile);
+}
+  }
+  spillSet.close(); // delete the spill directory(ies)
 htIdxHolder = null;
 materializedValueFields = null;
 outStartIdxHolder = null;
 outNumRecordsHolder = null;
+  }
 
-if (batchHolders != null) {
-  for (BatchHolder bh : batchHolders) {
+  // First free the memory used by the given (spilled) partition (i.e., 
hash table plus batches)
+  // then reallocate them in pristine state to allow the partition to 
continue receiving rows
+  private void reinitPartition(int part) throws SchemaChangeException, 
ClassTransformationException, IOException {
+assert htables[part] != null;
+htables[part].reset();
+if ( batchHolders[part] != null) {
+  for (BatchHolder bh : batchHolders[part]) {
 bh.clear();
   }
-  batchHolders.clear();
-  batchHolders = null;
+  batchHolders[part].clear();
 }
+batchHolders[part] = new ArrayList(); // First 
BatchHolder is created when the first put request is received.
   }
 
-//  private final AggOutcome setOkAndReturn() {
-//this.outcome = IterOutcome.OK;
-//for (VectorWrapper v : outgoing) {
-//  v.getValueVector().getMutator().setValueCount(outputCount);
-//}
-//return AggOutcome.RETURN_OUTCOME;
-//  }
 
   private final void incIndex() {
 underlyingIndex++;
 if (underlyingIndex >= incoming.getRecordCount()) {
   currentIndex = Integer.MAX_VALUE;
   return;
 }
-currentIndex = getVectorIndex(underlyingIndex);
+try { currentIndex = getVectorIndex(underlyingIndex); }
+catch (SchemaChangeException sc) { throw new 
DrillRuntimeException(sc);}
   }
 
   private final void resetIndex() {
 underlyingIndex = -1;
 incIndex();
   }
 
-  private void addBatchHolder() {
+  private boolean isSpilled(int part) {
+return outputStream[part] != null;
+  }
+  /**
+   * Which partition to choose for flushing out (i.e. spill or return) ?
+   * - The current partition (to which a new bach holder is added) has a 
priority,
+   *   because its last batch holder is full.
+   * - Also the largest prior spilled partition has some priority, as it 
is already spilled;
+   *   but spilling too few rows (e.g. a single batch) gets us nothing.
+   * - So the largest non-spilled partition has some priority, to get more 
memory freed.
+   * Need to weigh the above three options.
+   *
+   *  @param currPart - The partition that hit the memory limit (gets a 
priority)
+   *  @return The partition (number) chosen to be spilled
+   */
+  private int chooseAPartitionTo

[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator

2017-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035440#comment-16035440
 ] 

ASF GitHub Bot commented on DRILL-5457:
---

Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r119954158
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -400,114 +782,411 @@ public IterOutcome getOutcome() {
 
   @Override
   public int getOutputCount() {
-// return outputCount;
 return lastBatchOutputCount;
   }
 
   @Override
   public void cleanup() {
-if (htable != null) {
-  htable.clear();
-  htable = null;
-}
+  if ( schema == null ) { return; } // not set up; nothing to clean
+  for ( int i = 0; i < numPartitions; i++) {
+  if (htables[i] != null) {
+  htables[i].clear();
+  htables[i] = null;
+  }
+  if ( batchHolders[i] != null) {
+  for (BatchHolder bh : batchHolders[i]) {
+bh.clear();
+  }
+  batchHolders[i].clear();
+  batchHolders[i] = null;
+  }
+
+  // delete any (still active) output spill file
+  if ( outputStream[i] != null && spillFiles[i] != null) {
+try {
+  spillSet.delete(spillFiles[i]);
--- End diff --

Concurrent open files: While spilling, there is one per (non-pristine) 
spilling partition (yes, this can be as high as 16, or even 32). Afterwards, they 
are all closed; then for reading, each one gets opened, and though we process 
one partition at a time, closing all of them is postponed to the end, as the 
processing code is unaware that the "incoming" actually comes from a spill 
file. 
About the limits: It seems that current defaults (e.g. 64K open files per 
process) can serve us well for the foreseeable future. Intel just announced the 
i9, where the top-of-the-line CPU has 18 cores. Hence thousands of concurrent 
active same-process threads are not feasible anytime soon (think about context 
switching). 


> Support Spill to Disk for the Hash Aggregate Operator
> -
>
> Key: DRILL-5457
> URL: https://issues.apache.org/jira/browse/DRILL-5457
> Project: Apache Drill
>  Issue Type: Improvement
>  Components: Execution - Relational Operators
>Affects Versions: 1.10.0
>Reporter: Boaz Ben-Zvi
>Assignee: Boaz Ben-Zvi
> Fix For: 1.11.0
>
>
> Support gradually spilling memory to disk as the available memory becomes too 
> small to allow in-memory work for the Hash Aggregate Operator.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator

2017-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035385#comment-16035385
 ] 

ASF GitHub Bot commented on DRILL-5457:
---

Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r119947814
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -400,114 +782,411 @@ public IterOutcome getOutcome() {
 
   @Override
   public int getOutputCount() {
-// return outputCount;
 return lastBatchOutputCount;
   }
 
   @Override
   public void cleanup() {
-if (htable != null) {
-  htable.clear();
-  htable = null;
-}
+  if ( schema == null ) { return; } // not set up; nothing to clean
+  for ( int i = 0; i < numPartitions; i++) {
+  if (htables[i] != null) {
+  htables[i].clear();
+  htables[i] = null;
+  }
+  if ( batchHolders[i] != null) {
+  for (BatchHolder bh : batchHolders[i]) {
+bh.clear();
+  }
+  batchHolders[i].clear();
+  batchHolders[i] = null;
+  }
+
+  // delete any (still active) output spill file
+  if ( outputStream[i] != null && spillFiles[i] != null) {
+try {
+  spillSet.delete(spillFiles[i]);
--- End diff --

This is HashAgg closing time, so GC probably does it; anyway won't hurt -- 
added a close call. 


> Support Spill to Disk for the Hash Aggregate Operator
> -
>
> Key: DRILL-5457
> URL: https://issues.apache.org/jira/browse/DRILL-5457
> Project: Apache Drill
>  Issue Type: Improvement
>  Components: Execution - Relational Operators
>Affects Versions: 1.10.0
>Reporter: Boaz Ben-Zvi
>Assignee: Boaz Ben-Zvi
> Fix For: 1.11.0
>
>
> Support gradually spilling memory to disk as the available memory becomes too 
> small to allow in-memory work for the Hash Aggregate Operator.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (DRILL-5546) Schema change problems caused by empty batch

2017-06-02 Thread Jinfeng Ni (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035221#comment-16035221
 ] 

Jinfeng Ni commented on DRILL-5546:
---

Thanks for reviewing the proposal. 

Some clarifications.
1. The concept of *Empty Batch* (EB) defined in the proposal is exactly the same 
as what you described as *Null result set*. The batch contains neither data nor 
schema. 
2. In general, Drill operators are expected to handle an *Empty result set*. As 
you described, an *Empty result set* could be produced by a Filter operator, or 
any operator which does pruning, such as Join or Limit. I do not think there is a 
fundamental problem for Drill operators handling an Empty result set.
3. AFAIK, some operators (UnionAll) may have bugs handling an *Empty result set*. 
The bugs were introduced when trying to fix issues related to *Null result set*. 
See the proposal for the related JIRA issues. The thought is that, with this 
proposal's handling of *Null result set*, we will be able to correct the bugs in 
operators such as UnionAll.
4. The semantics for different operators (Scan/Join/Union/Filter) handling 
*Fast NONE* in the proposal seem to be the same as what you describe in the 
comment. 
5. "So, the scanner should return an empty batch (with schema) if a reader 
produces one (that is, skip null batches, return an empty batch.)"  If *no* 
reader produces one, the proposal is to return a *Fast NONE*, instead of a 
batch with an injected nullable-int column. 
6. Under *fast schema*, the first batch is the same as a regular batch if the 
query contains only non-blocking operators. Only when the query has blocking 
operators such as HashJoin/HashAgg does the first batch going through after the 
blocking operator contain schema only.
7. With the concept of *Fast NONE*, an operator may not return any batch. If any 
execution branch (minor fragment) returns a batch with schema, such a batch will 
still pop up along the execution pipeline, and there is no difference from the 
perspective of *fast schema*.
8. The proposal discusses the special cases where all the minor fragments 
return *Fast NONE* (essentially, the entire dataset is empty). 


> Schema change problems caused by empty batch
> 
>
> Key: DRILL-5546
> URL: https://issues.apache.org/jira/browse/DRILL-5546
> Project: Apache Drill
>  Issue Type: Bug
>Reporter: Jinfeng Ni
>Assignee: Jinfeng Ni
>
> There have been a few JIRAs opened related to schema change failures caused by 
> empty batches. This JIRA is opened as an umbrella for all those related JIRAs 
> (such as DRILL-4686, DRILL-4734, DRILL-4476, DRILL-4255, etc).
>  



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (DRILL-5546) Schema change problems caused by empty batch

2017-06-02 Thread Paul Rogers (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16034946#comment-16034946
 ] 

Paul Rogers commented on DRILL-5546:


In general, I agree with the proposal. The only suggestion might be to change 
the emphasis.

In looking carefully at the readers, we see that an empty result set (empty 
batch) is a natural outcome of reading. Some files just happen to be empty. If 
filters are pushed down, then some files just happen to have no matching rows.

Readers produce two distinct kinds of empty result sets:

* *Empty result set*: The reader found no data, but was able to find a schema. 
(Example: Parquet with a filter push-down or a JDBC query that returns no 
results.)
* *Null result set*: The reader found no data *and* no schema. (Example: empty 
CSV or JSON file.)


Note that filters also can produce an empty result set (if no rows match).

The Drill iterator protocol should be able to handle both kinds. It is perhaps 
a bit naive to expect that every operator has both a schema and a data set.

All operators should be able to identify, and handle, both null and empty 
result sets.

For the scanner, if one reader returns a null result set, just skip it and move 
to the next reader until a schema is found. If no reader has a non-null result 
set, then that branch of the query has no data (and no schema). That result 
should bubble up, with each operator handling the case depending on semantics. 
For example, a filter ignores the null result set. A UNION ALL skips that 
result set when assembling the result. A join handles the case depending on the 
side of the join and INNER/OUTER semantics, and so on.

To support the schema "fast track", operators should return an empty batch, 
with just schema, on the first call to {{next()}}. So, the scanner should 
return an empty batch (with schema) if a reader produces one (that is, skip 
null batches, return an empty batch.)

Again, each operator should, on the first (preferably empty) batch, assemble 
output schema according to the rules for that operator.

Do we have a spec and/or JIRA that describes the design behind the "fast 
schema" feature added shortly after 1.0? We should consult that to ensure the 
empty batch handling here is consistent with that design.
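A sketch of the scanner behavior described above (skip readers with a null result set, surface the first schema-bearing, possibly empty, batch); the Reader and Batch types here are illustrative stand-ins, not Drill's actual interfaces:

{noformat}
import java.util.Iterator;

public class ScannerSketch {
  /** Illustrative stand-in: returns a batch with schema, or null if the reader has neither data nor schema. */
  interface Reader { Batch readSchemaBatch(); }
  interface Batch { boolean isEmpty(); }

  /** Skip null result sets; return the first schema-bearing (possibly empty) batch, or null if none. */
  public static Batch firstSchemaBatch(Iterator<Reader> readers) {
    while (readers.hasNext()) {
      Batch batch = readers.next().readSchemaBatch();
      if (batch != null) {
        return batch;   // empty-with-schema is fine: it supports the fast-schema path
      }
      // null result set (no data, no schema): skip this reader and try the next one
    }
    return null;        // the whole branch produced no schema ("fast NONE")
  }
}
{noformat}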

> Schema change problems caused by empty batch
> 
>
> Key: DRILL-5546
> URL: https://issues.apache.org/jira/browse/DRILL-5546
> Project: Apache Drill
>  Issue Type: Bug
>Reporter: Jinfeng Ni
>Assignee: Jinfeng Ni
>
> There have been a few JIRAs opened related to schema change failures caused by 
> empty batches. This JIRA is opened as an umbrella for all those related JIRAs 
> (such as DRILL-4686, DRILL-4734, DRILL-4476, DRILL-4255, etc).
>  



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)