Facing weird problem while reading Parquet
Hi everyone, I am using spark-core-2.4 and spark-sql-2.4 (java spark). While reading 40K parquet part files from a single HDFS directory, somehow spark is spanning only 20037 parallel tasks, which is weird. My initial experience with spark is that while reading number of total tasks are equal to number of part files (until and unless we haven't provided *spark.default.parallelism* and *hdfs.input.split.size*) Each part file size varies from 58 to 61MB in size, and HDFS block size is 512 MB. I checked stage info and found for many tasks input size was doubled/tripled, which resulted in failure of executors due to Out Of Memory. Also when I tried reading a single part file, spark spanned a total of 17 tasks (which is also weird). Here it increased the tasks, also I checked input metrics, only one task was having some input records others were showing zero. It is our first time using parquet and we are doing simple write and read. for writing - *ds.write().parquet(outputPath); // this is writing 40K part files* for reading - *sqlContext.read().parquet(inputPath).javaRDD() // here we are trying to read same 40K part files* *Regards,* *Prateek Rajput* -- *-* *This email and any files transmitted with it are confidential and intended solely for the use of the individual or entity to whom they are addressed. If you have received this email in error, please notify the system manager. This message contains confidential information and is intended only for the individual named. If you are not the named addressee, you should not disseminate, distribute or copy this email. Please notify the sender immediately by email if you have received this email by mistake and delete this email from your system. If you are not the intended recipient, you are notified that disclosing, copying, distributing or taking any action in reliance on the contents of this information is strictly prohibited.* *Any views or opinions presented in this email are solely those of the author and do not necessarily represent those of the organization. Any information on shares, debentures or similar instruments, recommended product pricing, valuations and the like are for information purposes only. It is not meant to be an instruction or recommendation, as the case may be, to buy or to sell securities, products, services nor an offer to buy or sell securities, products or services unless specifically stated to be so on behalf of the Flipkart group. Employees of the Flipkart group of companies are expressly required not to make defamatory statements and not to infringe or authorise any infringement of copyright or any other legal right by email communications. Any such communication is contrary to organizational policy and outside the scope of the employment of the individual concerned. The organization will not accept any liability in respect of such communication, and the employee responsible will be personally liable for any damages or other liability arising.* *Our organization accepts no liability for the content of this email, or for the consequences of any actions taken on the basis of the information *provided,* unless that information is subsequently confirmed in writing. If you are not the intended recipient, you are notified that disclosing, copying, distributing or taking any action in reliance on the contents of this information is strictly prohibited.* _-_
Re: Getting EOFFileException while reading from sequence file in spark
Hi all, Please share if anyone have faced the same problem. There are many similar issues on web but I did not find any solution and reason why this happens. It will be really helpful. Regards, Prateek On Mon, Apr 29, 2019 at 3:18 PM Prateek Rajput wrote: > I checked and removed 0 sized files then also it is coming. And sometimes > when there is no 0 size file then also it is happening. > I checked data also if it is corrupted by directly opening that file and > checking it. I traced whole data but did not find any issue. For hadoop > Map-Reduce no such issue is coming it is happening in case of spark only. > > On Mon, Apr 29, 2019 at 2:50 PM Deepak Sharma > wrote: > >> This can happen if the file size is 0 >> >> On Mon, Apr 29, 2019 at 2:28 PM Prateek Rajput >> wrote: >> >>> Hi guys, >>> I am getting this strange error again and again while reading from from >>> a sequence file in spark. >>> User class threw exception: org.apache.spark.SparkException: Job aborted. >>> at >>> org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:100) >>> at >>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1096) >>> at >>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094) >>> at >>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094) >>> at >>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) >>> at >>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) >>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) >>> at >>> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1094) >>> at >>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1067) >>> at >>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032) >>> at >>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032) >>> at >>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) >>> at >>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) >>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) >>> at >>> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1032) >>> at >>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:958) >>> at >>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958) >>> at >>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958) >>> at >>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) >>> at >>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) >>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) >>> at >>> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:957) >>> at >>> org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1499) >>> at >>> org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1478) >>> at >>> org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1478) >>> at >>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) >>> at >>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) >>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) >>> at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1478) >>> at >>> org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:550) >>> at >>> org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45) >>> at >>> com.flipkart.prognos.spark.UniqueDroppedFSN.main(UniqueDroppedFSN.java:42) >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>> at >>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>> at >>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>> at java.lang.reflect.Method.invoke(Method.java:498) >>> at >>> org.apache.spark.deploy.yarn.Appl
Re: How to specify number of Partition using newAPIHadoopFile()
On Tue, Apr 30, 2019 at 6:48 PM Vatsal Patel wrote: > *Issue: * > > When I am reading sequence file in spark, I can specify the number of > partitions as an argument to the API, below is the way > *public JavaPairRDD sequenceFile(String path, Class > keyClass, Class valueClass, int minPartitions)* > > *In newAPIHadoopFile(), this support has been removed. below are the APIs.* > >- public V>> JavaPairRDD *newAPIHadoopFile*(String path, Class fClass, >Class kClass, Class vClass, Configuration conf) >- > >public > >JavaPairRDD *newAPIHadoopRDD*(Configuration conf, Class >fClass, Class kClass, Class vClass) > > Is there a way to specify the number of partitions when I will read *Avro* > file using *newAPIHadoopFile*(). I explored and found that we can specify > the hadoop configuration and in that, we can set various Hadoop properties. > but there we can specify the size using this property > ("*mapred.max.split.size","50mb"). > *based on this it will calculate the number of partitions but then each > partition's size may or may not be equal or less than the specified size. > >- *note - *A way other than repartition() > > *Execution Environment* > >- SPARK-JAVA VERSION - 2.4.0 >- JDK VERSION - 1.8 >- SPARK ARTIFACTID - spark-core_2.11 >- AVRO VERSION - 1.8.2 > > Please help us understand, why this issue is coming? > > Thanks, > Vatsal >
Re: Getting EOFFileException while reading from sequence file in spark
I checked and removed 0 sized files then also it is coming. And sometimes when there is no 0 size file then also it is happening. I checked data also if it is corrupted by directly opening that file and checking it. I traced whole data but did not find any issue. For hadoop Map-Reduce no such issue is coming it is happening in case of spark only. On Mon, Apr 29, 2019 at 2:50 PM Deepak Sharma wrote: > This can happen if the file size is 0 > > On Mon, Apr 29, 2019 at 2:28 PM Prateek Rajput > wrote: > >> Hi guys, >> I am getting this strange error again and again while reading from from a >> sequence file in spark. >> User class threw exception: org.apache.spark.SparkException: Job aborted. >> at >> org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:100) >> at >> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1096) >> at >> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094) >> at >> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094) >> at >> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) >> at >> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) >> at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) >> at >> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1094) >> at >> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1067) >> at >> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032) >> at >> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032) >> at >> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) >> at >> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) >> at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) >> at >> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1032) >> at >> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:958) >> at >> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958) >> at >> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958) >> at >> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) >> at >> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) >> at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) >> at >> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:957) >> at >> org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1499) >> at >> org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1478) >> at >> org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1478) >> at >> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) >> at >> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) >> at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) >> at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1478) >> at >> org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:550) >> at >> org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45) >> at >> com.flipkart.prognos.spark.UniqueDroppedFSN.main(UniqueDroppedFSN.java:42) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678) >> Caused by: org.apache.spark.SparkException: Job aborted due to stage >> failure: Task 186 in stage 0.0 failed 4 times, most recent failure: Lost >> task 186.3 in stage 0.0 (TID 179, prod-fdphadoop-krios-dn-1039, executor >> 1): java.io.EOFException >> at java.io.DataInputStream.readFully(DataInputStream.java:197) >> at >> org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:70) >> at org.apache.hadoop.io.DataOutputBuffer.write(DataOu
Getting EOFFileException while reading from sequence file in spark
Hi guys, I am getting this strange error again and again while reading from from a sequence file in spark. User class threw exception: org.apache.spark.SparkException: Job aborted. at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:100) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1096) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1094) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1067) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1032) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:958) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:957) at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1499) at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1478) at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1478) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1478) at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:550) at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45) at com.flipkart.prognos.spark.UniqueDroppedFSN.main(UniqueDroppedFSN.java:42) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678) Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 186 in stage 0.0 failed 4 times, most recent failure: Lost task 186.3 in stage 0.0 (TID 179, prod-fdphadoop-krios-dn-1039, executor 1): java.io.EOFException at java.io.DataInputStream.readFully(DataInputStream.java:197) at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:70) at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:120) at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2436) at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2568) at org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:82) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:293) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:224) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at