[ https://issues.apache.org/jira/browse/PIG-4313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Carlos Balduz updated PIG-4313:
-------------------------------
    Description: 
The current implementation of the LIMIT operation does not take into account the actual limit specified in the script when iterating over the tuples: POOutputConsumerIterator iterates over all of them, even after the limit has been reached. However, it uses POLimit's getNextTuple method to get each result, and once the limit has been reached this method returns a Result with status STATUS_EOP. Since POOutputConsumerIterator calls its readNext method recursively whenever STATUS_EOP is returned, every tuple past the limit adds another stack frame, and a StackOverflowError is thrown for large datasets.

I have solved this by creating a subclass of POOutputConsumerIterator that takes the limit into account when reading results, so that it stops once it has found the desired number of STATUS_OK results.

{code:java}
2014-11-07 13:35:33,715 [Result resolver thread-0] WARN org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0, master): java.lang.StackOverflowError:
        org.joda.time.format.DateTimeFormatterBuilder$MatchingParser.parseInto(DateTimeFormatterBuilder.java:2793)
        org.joda.time.format.DateTimeFormatterBuilder$Composite.parseInto(DateTimeFormatterBuilder.java:2695)
        org.joda.time.format.DateTimeFormatterBuilder$MatchingParser.parseInto(DateTimeFormatterBuilder.java:2793)
        org.joda.time.format.DateTimeFormatterBuilder$Composite.parseInto(DateTimeFormatterBuilder.java:2695)
        org.joda.time.format.DateTimeFormatterBuilder$MatchingParser.parseInto(DateTimeFormatterBuilder.java:2793)
        org.joda.time.format.DateTimeFormatterBuilder$Composite.parseInto(DateTimeFormatterBuilder.java:2695)
        org.joda.time.format.DateTimeFormatterBuilder$MatchingParser.parseInto(DateTimeFormatterBuilder.java:2793)
        org.joda.time.format.DateTimeFormatterBuilder$Composite.parseInto(DateTimeFormatterBuilder.java:2695)
        org.joda.time.format.DateTimeFormatterBuilder$MatchingParser.parseInto(DateTimeFormatterBuilder.java:2793)
        org.joda.time.format.DateTimeFormatterBuilder$Composite.parseInto(DateTimeFormatterBuilder.java:2695)
        org.joda.time.format.DateTimeFormatterBuilder$MatchingParser.parseInto(DateTimeFormatterBuilder.java:2793)
        org.joda.time.format.DateTimeFormatterBuilder$Composite.parseInto(DateTimeFormatterBuilder.java:2695)
        org.joda.time.format.DateTimeFormatter.parseDateTime(DateTimeFormatter.java:846)
        org.apache.pig.builtin.ToDate.extractDateTime(ToDate.java:124)
        org.apache.pig.builtin.Utf8StorageConverter.bytesToDateTime(Utf8StorageConverter.java:541)
        org.apache.pig.impl.util.CastUtils.convertToType(CastUtils.java:61)
        org.apache.pig.builtin.PigStorage.applySchema(PigStorage.java:339)
        org.apache.pig.builtin.PigStorage.getNext(PigStorage.java:282)
        arq.hadoop.pig.loaders.PigStorage.getNext(PigStorage.java:49)
        org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader.nextKeyValue(PigRecordReader.java:204)
        org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:138)
        org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:95)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        ... (~1000 lines)
{code}


> StackOverflowError in LIMIT operation on Spark
> ----------------------------------------------
>
>                 Key: PIG-4313
>                 URL: https://issues.apache.org/jira/browse/PIG-4313
>             Project: Pig
>          Issue Type: Bug
>          Components: spark
>            Reporter: Carlos Balduz
>            Assignee: Carlos Balduz
>         Attachments: PIG-4313.patch
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
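The recursion described in this issue can be modeled in a few lines of Java. This is a simplified sketch under stated assumptions, not the code from PIG-4313.patch. `Limit`, `Status`, `readNextRecursive`, `drainRecursive` and `readLimited` are hypothetical stand-ins for POLimit, its STATUS_OK/STATUS_EOP results and the two reading strategies. The sketch also assumes that the operator consumes one input tuple per call. The recursive reader adds one stack frame for every tuple past the limit. The limit-aware reader instead loops, and it stops after `limit` OK results.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical, simplified model of the LIMIT problem; these classes are
// NOT Pig's actual POLimit or POOutputConsumerIterator.
public class LimitSketch {

    // Stand-in for the two result statuses that matter here.
    enum Status { OK, EOP }

    // Stand-in for POLimit: the first `limit` tuples come back OK,
    // every tuple after that comes back EOP.
    static final class Limit {
        private final long limit;
        private long seen = 0;

        Limit(long limit) { this.limit = limit; }

        Status process(int tuple) {
            return seen++ < limit ? Status.OK : Status.EOP;
        }
    }

    // Problematic pattern: on EOP, fetch the next input and recurse.
    // Once the limit is reached, every remaining input tuple adds a stack
    // frame, so a large enough input ends in StackOverflowError.
    static Integer readNextRecursive(Iterator<Integer> input, Limit op) {
        if (!input.hasNext()) {
            return null; // input exhausted
        }
        Integer tuple = input.next();
        if (op.process(tuple) == Status.EOP) {
            return readNextRecursive(input, op);
        }
        return tuple;
    }

    // Drains the recursive reader until it reports exhaustion (null).
    static List<Integer> drainRecursive(Iterator<Integer> input, Limit op) {
        List<Integer> out = new ArrayList<>();
        Integer tuple;
        while ((tuple = readNextRecursive(input, op)) != null) {
            out.add(tuple);
        }
        return out;
    }

    // Limit-aware alternative: loop instead of recursing, and stop as soon
    // as `limit` OK results have been collected, leaving the rest unread.
    static List<Integer> readLimited(Iterator<Integer> input, Limit op, long limit) {
        List<Integer> out = new ArrayList<>();
        while (out.size() < limit && input.hasNext()) {
            Integer tuple = input.next();
            if (op.process(tuple) == Status.OK) {
                out.add(tuple);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int n = 10_000_000;
        // prints [0, 1, 2, 3, 4]
        System.out.println(readLimited(IntStream.range(0, n).iterator(), new Limit(5), 5));
        try {
            drainRecursive(IntStream.range(0, n).iterator(), new Limit(5));
            System.out.println("no overflow");
        } catch (StackOverflowError e) {
            System.out.println("StackOverflowError");
        }
    }
}
```

Take an input of 10,000,000 tuples and a limit of 5. In that case `readLimited` reads only five tuples, while `drainRecursive` is expected to exhaust a default-sized thread stack. On small inputs both readers return the same result. Counting OK results in the consumer has a second benefit: the input past the limit is never read at all.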