[ 
https://issues.apache.org/jira/browse/SPARK-27239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajesh Kumar K updated SPARK-27239:
-----------------------------------
    Description: 
 

From Spark 2.2.x onwards, Spark jobs that process compressed HDFS files through a custom input file format fail with "java.lang.IllegalArgumentException: requirement failed: length (-1) cannot be negative". Because compressed HDFS files are not splittable, the custom input file format returns a single split with offset 0 and a byte length of -1. Spark should treat a length of -1 as a valid split length for compressed file formats.

 

We observed that earlier versions of Spark did not have this validation; it was introduced in Spark 2.2.x in the class InputFileBlockHolder. Spark 2.2.x and later should therefore also accept -1 as a valid input split length.

 

+Below is the stack trace.+

Caused by: java.lang.IllegalArgumentException: requirement failed: length (-1) cannot be negative
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.rdd.InputFileBlockHolder$.set(InputFileBlockHolder.scala:70)
  at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:226)
  at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:214)
  at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:94)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:109)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
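
For context, the HadoopRDD.scala:226 frame in the trace is where Spark forwards the split's offset and length, unchanged, into InputFileBlockHolder.set. The snippet below is only an approximate excerpt of that call site in the 2.2.x–2.4.x code base (paraphrased, not copied verbatim from the sources):
{code:java}
// Approximate call site in org.apache.spark.rdd.HadoopRDD.compute (Spark 2.2.x–2.4.x):
// the FileSplit's start and length are passed straight through, so a custom
// InputFormat that reports length = -1 reaches the require() below with -1.
split.inputSplit.value match {
  case fs: FileSplit =>
    InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
  case _ =>
    InputFileBlockHolder.unset()
}
{code}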

 

+Below is the code snippet which caused this issue.+

{color:#ff0000}require(length >= 0, s"length ($length) cannot be negative"){color} // This validation caused the issue.

 
{code:java}
// org.apache.spark.rdd.InputFileBlockHolder (spark-core)
def set(filePath: String, startOffset: Long, length: Long): Unit = {
    require(filePath != null, "filePath cannot be null")
    require(startOffset >= 0, s"startOffset ($startOffset) cannot be negative")
    require(length >= 0, s"length ($length) cannot be negative")
    inputBlock.set(new FileBlock(UTF8String.fromString(filePath), startOffset, length))
  }
{code}
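
A possible change, as requested above, would be to treat -1 as "length unknown" for non-splittable compressed inputs. The snippet below is only a sketch of that proposal, not an actual Spark patch; the relaxed bound and the message wording are assumptions:
{code:java}
// Sketch of the proposed relaxation (not merged Spark code): accept -1 as
// "length unknown" for non-splittable compressed inputs, still reject other negatives.
def set(filePath: String, startOffset: Long, length: Long): Unit = {
  require(filePath != null, "filePath cannot be null")
  require(startOffset >= 0, s"startOffset ($startOffset) cannot be negative")
  require(length >= -1, s"length ($length) cannot be smaller than -1")
  inputBlock.set(new FileBlock(UTF8String.fromString(filePath), startOffset, length))
}
{code}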
 

+Steps to reproduce the issue.+

 Please refer to the code below to reproduce the issue.
{code:java}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

val hadoopConf = new JobConf()
FileInputFormat.setInputPaths(hadoopConf, new Path("/output656/part-r-00000.gz"))

val records = sc.hadoopRDD(hadoopConf,
  classOf[com.platform.custom.storagehandler.INFAInputFormat],
  classOf[org.apache.hadoop.io.LongWritable],
  classOf[org.apache.hadoop.io.Writable])

records.count()
{code}
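
The com.platform.custom.storagehandler.INFAInputFormat class used above is a proprietary custom format and is not publicly available. As a stand-in, a minimal hypothetical input format such as the sketch below (the class name and its exact behavior are assumptions modeled on the description above) reports each file as a single split with offset 0 and length -1, which is enough to hit the same require failure:
{code:java}
// Hypothetical stand-in for the custom format (NegativeLengthInputFormat is an
// invented name, not part of Hadoop or Spark). Like the format described above,
// it treats every file as non-splittable and reports its split as (0, -1).
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.{FileInputFormat, FileSplit, InputSplit, JobConf, TextInputFormat}

class NegativeLengthInputFormat extends TextInputFormat {
  override protected def isSplitable(fs: FileSystem, file: Path): Boolean = false

  override def getSplits(job: JobConf, numSplits: Int): Array[InputSplit] = {
    // One split per input file, with the length reported as -1 ("unknown").
    FileInputFormat.getInputPaths(job).map { p =>
      new FileSplit(p, 0L, -1L, Array.empty[String]): InputSplit
    }
  }
}

// Running it the same way as above should raise the same exception:
// sc.hadoopRDD(hadoopConf, classOf[NegativeLengthInputFormat],
//   classOf[org.apache.hadoop.io.LongWritable],
//   classOf[org.apache.hadoop.io.Text]).count()
{code}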
 



> Processing Compressed HDFS files with spark failing with error: 
> "java.lang.IllegalArgumentException: requirement failed: length (-1) cannot 
> be negative" from spark 2.2.X
> -------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-27239
>                 URL: https://issues.apache.org/jira/browse/SPARK-27239
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.1, 2.2.2, 2.2.3, 2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.4.0
>            Reporter: Rajesh Kumar K
>            Priority: Blocker
>



