[ https://issues.apache.org/jira/browse/BEAM-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16013318#comment-16013318 ]
Jean-Baptiste Onofré commented on BEAM-2308:
--------------------------------------------

Can you try using {{TextIO.read()}} instead of {{HDFSFileSource}}? It seems the {{HDFSFileSource}} reader is unable to read any further records.
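For illustration, a minimal sketch of that swap against the pipeline quoted below, assuming Beam 2.x-style {{TextIO}} can resolve the HDFS path and that the downstream {{DoFn}}s are adapted to consume plain {{String}} lines instead of {{KV<LongWritable, Text>}} ({{ProcessLineFn}} is a placeholder for the reporter's transforms):

{code:java}
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical sketch only, not a drop-in replacement: inputPath and
// outputPath come from the reporter's snippet; ProcessLineFn is a
// placeholder DoFn<String, String> standing in for the reporter's logic.
PCollection<String> lines = pipeline.apply(TextIO.read().from(inputPath));
PCollection<String> results = lines.apply(ParDo.of(new ProcessLineFn()));
results.apply(TextIO.write().to(outputPath));
pipeline.run().waitUntilFinish();
{code}

Reading through {{TextIO}} bypasses the {{HDFSFileSource}} reader that appears in the stack trace below, which makes it a useful way to isolate whether the failure is specific to that source.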
> Running Beam on the Spark runner succeeds with small data but fails with larger data
> -------------------------------------------------------------------------------------
>
>                 Key: BEAM-2308
>                 URL: https://issues.apache.org/jira/browse/BEAM-2308
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark, sdk-java-core
>         Environment: spark jar 1.6.2
>            Reporter: liyuntian
>            Assignee: Amit Sela
>
> Running Beam on the Spark runner succeeds with small data, but fails with a larger dataset of about 1 GB. My Spark configuration: --num-executors 3 --executor-memory 4G --executor-cores 5.
>
> This is my code:
>
>     Read.Bounded<KV<LongWritable, Text>> source =
>         Read.from(HDFSFileSource.from(inputPath, TextInputFormat.class,
>             LongWritable.class, Text.class).withConfiguration(config));
>     PCollection<KV<LongWritable, Text>> recordsFromHdfs = pipeline.apply(source);
>     PCollection<List<String>> recordsList =
>         recordsFromHdfs.apply(ParDo.of(new InputHdfsFileFn(delimit, firstTableColumnsSize)));
>
>     // convert to flow
>     String nextOutputTable;
>     Map<String, ComponentPara> map = beamTable.row(firstTableName);
>     ComponentPara component = (ComponentPara) map.values().toArray()[0];
>     PCollection<List<String>> nextPCollection = ComponentConvert.convert(component, recordsList);
>
>     // write result to HDFS
>     PCollection<String> recordsToHdfs = nextPCollection.apply(ParDo.of(new OutputHdfsFileFn(delimit)));
>     HiveTable.deleteBeamFileOnHdfs(outputPath);
>     logger.info("Output file location: " + outputPath);
>     recordsToHdfs.apply(Write.to(HDFSFileSink.<String>toText(outputPath).withConfiguration(config)));
>     pipeline.run().waitUntilFinish();
>
> This is the error:
>
>     17/05/16 21:31:57 WARN scheduler.TaskSetManager: Lost task 3.0 in stage 1.0 (TID 13, etl-dev-02): java.util.NoSuchElementException
>         at org.apache.beam.sdk.io.hdfs.HDFSFileSource$HDFSFileReader.getCurrent(HDFSFileSource.java:510)
>         at org.apache.beam.runners.spark.io.SourceRDD$Bounded$1.next(SourceRDD.java:142)
>         at org.apache.beam.runners.spark.io.SourceRDD$Bounded$1.next(SourceRDD.java:111)
>         at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
>         at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>         at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>         at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>         at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30)
>         at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:165)
>         at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
>         at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
>         at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:162)
>         [the preceding three tryToComputeNext/hasNext/computeNext frames repeat several more times]
>         at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>         at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>         at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>         at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
>         at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>     17/05/16 21:31:57 INFO scheduler.TaskSetManager: Starting task 3.1 in stage 1.0 (TID 20, etl-dev-02, partition 3, PROCESS_LOCAL, 42372 bytes)
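For context, the {{java.util.NoSuchElementException}} is thrown by {{HDFSFileReader.getCurrent()}}. Under Beam's reader contract, {{getCurrent()}} is only legal after {{start()}} or {{advance()}} has returned {{true}}, so the trace suggests the reader (or the loop driving it) stepped past the last record. A minimal sketch of that protocol, with hypothetical {{drain}}/{{process}} names, assuming the Beam Java SDK:

{code:java}
import java.io.IOException;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;

// Sketch of the BoundedReader protocol; this is not the Spark runner's
// actual read loop, just an illustration of the contract it relies on.
public class ReaderContractSketch {
  static <T> void drain(BoundedSource<T> source, PipelineOptions options) throws IOException {
    try (BoundedSource.BoundedReader<T> reader = source.createReader(options)) {
      for (boolean more = reader.start(); more; more = reader.advance()) {
        T record = reader.getCurrent(); // legal: the last start()/advance() returned true
        process(record);
      }
      // Calling reader.getCurrent() here, after advance() has returned false,
      // violates the contract and may throw NoSuchElementException, which is
      // the failure the stack trace above reports at HDFSFileSource.java:510.
    }
  }

  static <T> void process(T record) {
    System.out.println(record); // placeholder for real per-record work
  }
}
{code}

If {{advance()}} ever returns {{true}} without a current record available, the same exception surfaces from inside a correct loop, which would be consistent with the suspicion above that the {{HDFSFileSource}} reader misbehaves on larger, multi-split inputs.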