[ https://issues.apache.org/jira/browse/HIVE-28790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Marta Kuczora updated HIVE-28790:
---------------------------------
    Description: 
*Steps to reproduce:*
{code:java}
    set mapreduce.job.reduces=7;
    create external table ext(a int) stored as textfile;
    insert into table ext values(1),(2),(3),(4),(5),(6),(7), (8), (9), (12);
    create table full_acid(a int) stored as orc tblproperties("transactional"="true");

    insert into table full_acid select * from ext where a != 3 and a <=7 group by a;
    insert into table full_acid select * from ext where a>7 group by a;

    set mapreduce.job.reduces=1;
    delete from full_acid where a in (2, 12);
{code}

The delete will fail with the following exception:
{code}
Caused by: java.lang.ArrayIndexOutOfBoundsException: 6
        at org.apache.hadoop.hive.ql.exec.FileSinkOperator$FSPaths.closeWriters(FileSinkOperator.java:258)
{code}

The problem is in the FileSinkOperator.FSPaths.createDynamicBucket method:
{code}
    public int createDynamicBucket(int bucketNum) {
      // this assumes all paths are bucket names (which means no lookup is needed)
      int writerOffset = bucketNum;
      if (updaters.length <= writerOffset) {
        this.updaters = Arrays.copyOf(updaters, writerOffset + 1);
        this.outPaths = Arrays.copyOf(outPaths, writerOffset + 1);
        this.finalPaths = Arrays.copyOf(finalPaths, writerOffset + 1);
      }

      if (this.finalPaths[writerOffset] == null) {
        if (conf.isDirectInsert()) {
          this.outPathsCommitted = Arrays.copyOf(outPathsCommitted, writerOffset + 1);
          this.finalPaths[writerOffset] = buildTmpPath();
          this.outPaths[writerOffset] = buildTmpPath();
        } else {
          // uninitialized bucket
          String bucketName =
              Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum);
          this.finalPaths[writerOffset] = new Path(bDynParts ? buildTmpPath() : parent, bucketName);
          this.outPaths[writerOffset] = new Path(buildTaskOutputTempPath(), bucketName);
        }
      }
      return writerOffset;
    }
  } // class FSPaths
{code}

In the first part, the updaters, outPaths and finalPaths arrays are only copied when the writerOffset is not smaller than their length, so these arrays can only grow. In the direct-insert branch, however, the outPathsCommitted array is copied unconditionally, without comparing its length to the writerOffset, so the copy can actually shrink the array. When that happens, closing the writers throws an ArrayIndexOutOfBoundsException, because outPathsCommitted has become shorter than the updaters array.
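
The effect is easy to see in isolation. The following is a minimal, standalone sketch (plain Java, not Hive code) of the failure mode: an unconditional Arrays.copyOf with writerOffset + 1 shrinks the committed-paths array below the length of updaters, and a loop that visits the populated updater slots up to updaters.length, in the spirit of closeWriters, then steps past the end of the shorter array.
{code:java}
import java.util.Arrays;

public class CopyOfShrinkSketch {
    public static void main(String[] args) {
        // After the row from bucket 6 was processed, all arrays were grown to length 7.
        Object[] updaters = new Object[7];
        Object[] outPathsCommitted = new Object[7];

        // Only buckets 6 and 4 actually get writers in this scenario.
        updaters[6] = "writer for bucket 6";
        updaters[4] = "writer for bucket 4";

        // The next row comes from bucket 4, so writerOffset is 4.
        int writerOffset = 4;

        // Unconditional copy, as in the direct-insert branch above: length 7 -> length 5.
        outPathsCommitted = Arrays.copyOf(outPathsCommitted, writerOffset + 1);

        // Walking the populated updater slots up to updaters.length: index 4 is still
        // valid, but index 6 is past the end of the shrunken array.
        for (int i = 0; i < updaters.length; i++) {
            if (updaters[i] != null) {
                System.out.println(outPathsCommitted[i]); // i == 6 -> ArrayIndexOutOfBoundsException: 6
            }
        }
    }
}
{code}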

*About the reproduction:*
The first insert into the full_acid table creates files for buckets 1, 2, 3, 5 and 6; the second insert creates files for buckets 1, 4 and 6.

The encoded bucket number is 537264128 for bucket 6 and 537133056 for bucket 4, so the value for bucket 4 is smaller than the value for bucket 6 (see the sketch below).
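
For reference, these two values line up with the V1 ACID bucket codec layout, assuming codec version 1 in the top three bits and the physical bucket id shifted left by 16 bits. The helper below is a hypothetical standalone illustration of that layout, not the Hive BucketCodec API:
{code:java}
public class BucketPropertySketch {
    // Hypothetical helper mirroring the assumed V1 layout; the statement id is taken as 0.
    static int encodeBucketProperty(int bucketId) {
        return (1 << 29) | (bucketId << 16);
    }

    public static void main(String[] args) {
        System.out.println(encodeBucketProperty(6)); // 537264128
        System.out.println(encodeBucketProperty(4)); // 537133056, i.e. smaller than bucket 6's value
    }
}
{code}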
To reproduce the issue, we need to delete a row from bucket 6 and a row from bucket 4 in the same statement, and both rows have to be processed by the same FileSinkOperator. The operator handles the row from bucket 6 first, so createDynamicBucket extends the arrays with writerOffset 6, to size 7. Then the row from bucket 4 arrives, and createDynamicBucket performs the second array copy incorrectly: finalPaths stays at size 7, but outPathsCommitted is copied down to size 4 + 1 = 5. This causes the exception when the writers are closed.
By setting the number of reducers to 1 before the delete, both rows are processed by the same FileSinkOperator.
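
One possible direction for a fix is to guard the copy with the same kind of length check that already protects updaters, outPaths and finalPaths, so outPathsCommitted can only grow. The snippet below is only a sketch of that idea, not necessarily the actual HIVE-28790 patch, and growIfNeeded is a hypothetical helper used for illustration:
{code:java}
import java.util.Arrays;

public class GrowOnlyCopySketch {
    // Mirror the existing length check: copy only when the offset does not fit,
    // so the array can grow but never shrink.
    static Object[] growIfNeeded(Object[] array, int writerOffset) {
        return array.length <= writerOffset ? Arrays.copyOf(array, writerOffset + 1) : array;
    }

    public static void main(String[] args) {
        Object[] outPathsCommitted = new Object[7];
        outPathsCommitted = growIfNeeded(outPathsCommitted, 4); // stays at length 7
        outPathsCommitted = growIfNeeded(outPathsCommitted, 9); // grows to length 10
        System.out.println(outPathsCommitted.length);           // 10
    }
}
{code}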


> ACID deletes are failing with ArrayIndexOutOfBoundsException when direct insert is enabled
> -------------------------------------------------------------------------------------------
>
>                 Key: HIVE-28790
>                 URL: https://issues.apache.org/jira/browse/HIVE-28790
>             Project: Hive
>          Issue Type: Bug
>    Affects Versions: 4.0.0
>            Reporter: Marta Kuczora
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
