[ 
https://issues.apache.org/jira/browse/HIVE-22769?focusedWorklogId=428552&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-428552
 ]

ASF GitHub Bot logged work on HIVE-22769:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Apr/20 13:07
            Start Date: 29/Apr/20 13:07
    Worklog Time Spent: 10m 
      Work Description: pgaref commented on a change in pull request #998:
URL: https://github.com/apache/hive/pull/998#discussion_r417299400



##########
File path: llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/PassThruOffsetReader.java
##########
@@ -20,23 +20,81 @@
 import java.io.IOException;
 
 import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.ReaderWithOffsets;
+import org.apache.hadoop.hive.ql.exec.FooterBuffer;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 
@SuppressWarnings("rawtypes") class PassThruOffsetReader implements ReaderWithOffsets {
   protected final RecordReader sourceReader;
   protected final Object key;
   protected final Writable value;
+  protected final JobConf jobConf;
+  protected final int skipHeaderCnt;
+  protected final int skipFooterCnt;
+  private transient FooterBuffer footerBuffer;
+  private transient boolean initialized = false;
 
-  PassThruOffsetReader(RecordReader sourceReader) {
+  PassThruOffsetReader(RecordReader sourceReader, JobConf jobConf, int headerCnt, int footerCnt) {
     this.sourceReader = sourceReader;
-    key = sourceReader.createKey();
-    value = (Writable)sourceReader.createValue();
+    this.key = sourceReader.createKey();
+    this.value = (Writable)sourceReader.createValue();
+    this.jobConf = jobConf;
+    this.skipHeaderCnt = headerCnt;
+    this.skipFooterCnt = footerCnt;
   }
 
   @Override
   public boolean next() throws IOException {
-    return sourceReader.next(key, value);
+    try {
+      boolean opNotEOF = true;
+      /*
+       * Start reading a new file.
+       * If the file contains a header, skip the header lines before reading the records.
+       * If the file contains a footer, use FooterBuffer to cache and trim the
+       * footer records at the end of the file.
+       */
+      if (!initialized) {
+        // Skip header lines.
+        opNotEOF = Utilities.skipHeader(sourceReader, skipHeaderCnt, key, value);
+
+        // Initialize footer buffer.
+        if (opNotEOF && skipFooterCnt > 0) {
+          footerBuffer = new FooterBuffer();
+          opNotEOF = footerBuffer.initializeBuffer(jobConf, sourceReader, skipFooterCnt, (WritableComparable) key, value);
+        }
+        this.initialized = true;
+      }
+
+      if (opNotEOF && footerBuffer == null) {
+        /*
+         * The file did not end while skipping the header lines
+         * and there are NO footer lines, so read normally.
+         */
+        opNotEOF = sourceReader.next(key, value);
+      }
+
+      if (opNotEOF && footerBuffer != null) {
+        /*
+         * The file did not end while skipping the header lines
+         * and there ARE footer lines, so read through footerBuffer,
+         * which holds back the trailing footer records.
+         */
+        opNotEOF = footerBuffer.updateBuffer(jobConf, sourceReader, (WritableComparable) key, value);
+      }
+
+      if (opNotEOF) {
+        // A record was read; the end of the file has not been reached yet.
+        return true;
+      } else {
+        // Done reading
+        close();

Review comment:
       You are right, EncodedDataReader does take care of closing the reader -- 
so removing it from here.
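
For context, a minimal sketch of how a caller could wire up the new
constructor, assuming the existing Utilities.getHeaderCount/getFooterCount
helpers that read skip.header.line.count and skip.footer.line.count from the
table properties; wrapWithSkipping is a hypothetical name, and the actual
wiring in SerDeEncodedDataReader may differ:

{code}
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;

// Illustrative sketch only -- not the actual SerDeEncodedDataReader change.
static ReaderWithOffsets wrapWithSkipping(RecordReader sourceReader,
    JobConf jobConf, TableDesc tableDesc) {
  // Header/footer counts come from the table properties in the repro below.
  int headerCnt = Utilities.getHeaderCount(tableDesc);
  int footerCnt = Utilities.getFooterCount(tableDesc, jobConf);
  return new PassThruOffsetReader(sourceReader, jobConf, headerCnt, footerCnt);
}
{code}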





Issue Time Tracking
-------------------

    Worklog Id:     (was: 428552)
    Time Spent: 40m  (was: 0.5h)

> Incorrect query results and query failure during split generation for 
> compressed text files
> -------------------------------------------------------------------------------------------
>
>                 Key: HIVE-22769
>                 URL: https://issues.apache.org/jira/browse/HIVE-22769
>             Project: Hive
>          Issue Type: Bug
>          Components: File Formats
>    Affects Versions: 3.0.0, 3.1.0
>            Reporter: Chiran Ravani
>            Assignee: Panagiotis Garefalakis
>            Priority: Critical
>         Attachments: HIVE-22769.01.patch, testcase1.csv.bz2, testcase2.csv.bz2
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Hive queries produce incorrect results when the data is in compressed text 
> format, and for certain data the query fails during split generation.
> This behavior is seen when skip.header.line.count and skip.footer.line.count 
> are set for the table.
> Case 1: A select count/aggregate query produces incorrect row counts or 
> displays all rows (when hive.fetch.task.conversion=none).
> Steps to reproduce:
> 1. Create table as below
> {code}
> CREATE EXTERNAL TABLE `testcase1`(id int, name string) ROW FORMAT SERDE 
> 'org.apache.hadoop.hive.serde2.OpenCSVSerde' LOCATION '/user/hive/testcase1' 
> TBLPROPERTIES ("skip.header.line.count"="1", "skip.footer.line.count"="1");
> {code}
> 2. Upload attached testcase1.csv.bz2 file to /user/hive/testcase1
> 3. Run select * and count(*) on the table.
> {code}
> > select * from testcase1;
> INFO  : Compiling 
> command(queryId=hive_20200124053854_454b03c1-d4c5-4dba-a2c2-91c09f4b670f): 
> select * from testcase1
> INFO  : Semantic Analysis Completed (retrial = false)
> INFO  : Returning Hive schema: 
> Schema(fieldSchemas:[FieldSchema(name:testcase1.id, type:string, 
> comment:null), FieldSchema(name:testcase1.name, type:string, comment:null)], 
> properties:null)
> INFO  : Completed compiling 
> command(queryId=hive_20200124053854_454b03c1-d4c5-4dba-a2c2-91c09f4b670f); 
> Time taken: 0.07 seconds
> INFO  : Executing 
> command(queryId=hive_20200124053854_454b03c1-d4c5-4dba-a2c2-91c09f4b670f): 
> select * from testcase1
> INFO  : Completed executing 
> command(queryId=hive_20200124053854_454b03c1-d4c5-4dba-a2c2-91c09f4b670f); 
> Time taken: 0.007 seconds
> INFO  : OK
> +---------------+-----------------+
> | testcase1.id  | testcase1.name  |
> +---------------+-----------------+
> | 2             | 2019-12-31      |
> +---------------+-----------------+
> 1 row selected (0.111 seconds)
> > select count(*) from testcase1
> INFO  : Compiling 
> command(queryId=hive_20200124053645_a7d699b7-c7e1-4d92-8d99-666b0a010ba7): 
> select count(*) from testcase1
> INFO  : Semantic Analysis Completed (retrial = false)
> INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:_c0, 
> type:bigint, comment:null)], properties:null)
> INFO  : Completed compiling 
> command(queryId=hive_20200124053645_a7d699b7-c7e1-4d92-8d99-666b0a010ba7); 
> Time taken: 0.073 seconds
> INFO  : Executing 
> command(queryId=hive_20200124053645_a7d699b7-c7e1-4d92-8d99-666b0a010ba7): 
> select count(*) from testcase1
> INFO  : Query ID = hive_20200124053645_a7d699b7-c7e1-4d92-8d99-666b0a010ba7
> INFO  : Total jobs = 1
> INFO  : Launching Job 1 out of 1
> INFO  : Starting task [Stage-1:MAPRED] in serial mode
> INFO  : Subscribed to counters: [] for queryId: 
> hive_20200124053645_a7d699b7-c7e1-4d92-8d99-666b0a010ba7
> INFO  : Session is already open
> INFO  : Dag name: select count(*) from testcase1 (Stage-1)
> INFO  : Status: Running (Executing on YARN cluster with App id 
> application_1579811438512_0046)
> .
> .
> .
> INFO  : Completed executing 
> command(queryId=hive_20200124053645_a7d699b7-c7e1-4d92-8d99-666b0a010ba7); 
> Time taken: 4.228 seconds
> INFO  : OK
> +------+
> | _c0  |
> +------+
> | 3    |
> +------+
> 1 row selected (4.335 seconds)
> {code}
> Case 2: A select count/aggregate query fails with java.lang.ClassCastException: 
> java.io.PushbackInputStream cannot be cast to org.apache.hadoop.fs.Seekable.
> The issue is only seen when a field contains a space (e.g. in "3,2019-12-31 
> 01" the second column has a space).
> Steps to reproduce:
> 1. Create table as below
> {code}
> CREATE EXTERNAL TABLE `testcase2`(id int, name string) ROW FORMAT SERDE 
> 'org.apache.hadoop.hive.serde2.OpenCSVSerde' LOCATION '/user/hive/testcase2' 
> TBLPROPERTIES ("skip.header.line.count"="1", "skip.footer.line.count"="1");
> {code}
> 2. Upload attached testcase2.csv.bz2 file to /user/hive/testcase2
> 3. Run select * and count(*) on the table.
> {code}
> 0: > select * from testcase2;
> INFO  : Compiling 
> command(queryId=hive_20200124053159_5d8ce56a-183d-4359-a147-bd470d82e134): 
> select * from testcase2
> INFO  : Semantic Analysis Completed (retrial = false)
> INFO  : Returning Hive schema: 
> Schema(fieldSchemas:[FieldSchema(name:testcase2.id, type:string, 
> comment:null), FieldSchema(name:testcase2.name, type:string, comment:null)], 
> properties:null)
> INFO  : Completed compiling 
> command(queryId=hive_20200124053159_5d8ce56a-183d-4359-a147-bd470d82e134); 
> Time taken: 0.075 seconds
> INFO  : Executing 
> command(queryId=hive_20200124053159_5d8ce56a-183d-4359-a147-bd470d82e134): 
> select * from testcase2
> INFO  : Completed executing 
> command(queryId=hive_20200124053159_5d8ce56a-183d-4359-a147-bd470d82e134); 
> Time taken: 0.01 seconds
> INFO  : OK
> +---------------+-----------------+
> | testcase2.id  | testcase2.name  |
> +---------------+-----------------+
> | 2             | 2019-12-31 01   |
> +---------------+-----------------+
> 1 row selected (0.119 seconds)
> > select count(*) from testcase2;
> INFO  : Compiling 
> command(queryId=hive_20200124053542_a7d6820e-c3df-4d70-bc00-f3916441da88): 
> select count(*) from testcase2
> INFO  : Semantic Analysis Completed (retrial = false)
> INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:_c0, 
> type:bigint, comment:null)], properties:null)
> INFO  : Completed compiling 
> command(queryId=hive_20200124053542_a7d6820e-c3df-4d70-bc00-f3916441da88); 
> Time taken: 0.079 seconds
> INFO  : Executing 
> command(queryId=hive_20200124053542_a7d6820e-c3df-4d70-bc00-f3916441da88): 
> select count(*) from testcase2
> INFO  : Query ID = hive_20200124053542_a7d6820e-c3df-4d70-bc00-f3916441da88
> INFO  : Total jobs = 1
> INFO  : Launching Job 1 out of 1
> INFO  : Starting task [Stage-1:MAPRED] in serial mode
> INFO  : Subscribed to counters: [] for queryId: 
> hive_20200124053542_a7d6820e-c3df-4d70-bc00-f3916441da88
> INFO  : Session is already open
> INFO  : Dag name: select count(*) from testcase2 (Stage-1)
> ERROR : Status: Failed
> ERROR : Vertex failed, vertexName=Map 1, 
> vertexId=vertex_1579811438512_0046_2_00, diagnostics=[Vertex 
> vertex_1579811438512_0046_2_00 [Map 1] killed/failed due 
> to:ROOT_INPUT_INIT_FAILURE, Vertex Input: testcase2 initializer failed, 
> vertex=vertex_1579811438512_0046_2_00 [Map 1], java.lang.ClassCastException: 
> java.io.PushbackInputStream cannot be cast to org.apache.hadoop.fs.Seekable
>       at 
> org.apache.hadoop.fs.FSDataInputStream.getPos(FSDataInputStream.java:75)
>       at 
> org.apache.hadoop.hive.ql.io.SkippingTextInputFormat.getCachedStartIndex(SkippingTextInputFormat.java:128)
>       at 
> org.apache.hadoop.hive.ql.io.SkippingTextInputFormat.makeSplitInternal(SkippingTextInputFormat.java:74)
>       at 
> org.apache.hadoop.hive.ql.io.SkippingTextInputFormat.makeSplit(SkippingTextInputFormat.java:66)
>       at 
> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:379)
>       at 
> org.apache.hadoop.hive.ql.io.HiveInputFormat.addSplitsForGroup(HiveInputFormat.java:532)
>       at 
> org.apache.hadoop.hive.ql.io.HiveInputFormat.getSplits(HiveInputFormat.java:789)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator.initialize(HiveSplitGenerator.java:243)
>       at 
> org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:278)
>       at 
> org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:269)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
>       at 
> org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:269)
>       at 
> org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:253)
>       at 
> com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>       at 
> com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
>       at 
> com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> ]
> {code}
> The above behavior appears after applying HIVE-21924.
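>
> For context on the Case 2 stack trace: the decompressed input here is a 
> wrapper stream (java.io.PushbackInputStream) that does not implement 
> org.apache.hadoop.fs.Seekable, so FSDataInputStream.getPos() fails on the 
> cast during split generation. A minimal illustrative guard is sketched 
> below; SeekableGuard/safePos are hypothetical names, and this is not the 
> actual SkippingTextInputFormat fix:
> {code}
> import java.io.IOException;
> import java.io.InputStream;
> import org.apache.hadoop.fs.Seekable;
>
> final class SeekableGuard {
>   // getPos() is only safe when the underlying stream is Seekable; codec-
>   // decompressed text streams (like the bz2 input here) generally are not.
>   static long safePos(InputStream in) throws IOException {
>     if (in instanceof Seekable) {
>       return ((Seekable) in).getPos();
>     }
>     return -1; // Position unknown; do not derive split offsets from it.
>   }
> }
> {code}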


