Reynold, you're totally right -- as discussed offline, I didn't think about the limit use case when I wrote this. Sandy, is it easy to fix this as part of your patch to use StatisticsData? If not, I can fix it in a separate patch.
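To make the limit case concrete, here is a rough sketch in plain Scala (no Spark or Hadoop on the classpath -- the file's total length stands in for what `split.inputSplit.value.getLength()` would report, and the object/record names are purely illustrative):

```scala
import java.io.{File, FileInputStream, PrintWriter}

object LimitBytesDemo {
  // Counts the bytes consumed while reading only the first n
  // newline-terminated records of a file. Reads byte-by-byte so
  // no readahead buffering inflates the count.
  def bytesToReadFirstN(file: File, n: Int): Long = {
    val in = new FileInputStream(file)
    try {
      var newlines = 0
      var count = 0L
      var b = in.read()
      while (b != -1 && newlines < n) {
        count += 1
        if (b == '\n') newlines += 1
        if (newlines < n) b = in.read()
      }
      count
    } finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // Write a file with 1000 records to play the role of one split.
    val f = File.createTempFile("split", ".txt")
    val w = new PrintWriter(f)
    (1 to 1000).foreach(i => w.println(s"record-$i"))
    w.close()

    // The split-length metric vs. the bytes a 5-record limit actually reads.
    println(s"split length (what getLength() reports): ${f.length()} bytes")
    println(s"bytes read under a 5-record limit:       ${bytesToReadFirstN(f, 5)} bytes")
    f.delete()
  }
}
```

Running this shows the 5-record read touching a small fraction of the file, which is the gap between `bytesRead` as currently set and the bytes the task actually consumed.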
On Sat, Jul 26, 2014 at 12:12 PM, Reynold Xin <r...@databricks.com> wrote:

> That makes sense, Sandy.
>
> When you add the patch, can you make sure you comment inline on why the
> fallback is needed?
>
> On Sat, Jul 26, 2014 at 11:46 AM, Sandy Ryza <sandy.r...@cloudera.com> wrote:
>
>> I'm working on a patch that switches this stuff out with the Hadoop
>> FileSystem StatisticsData, which will both give an accurate count and
>> allow us to get metrics while the task is in progress. A hitch is that
>> it relies on https://issues.apache.org/jira/browse/HADOOP-10688, so we
>> still might want a fallback for versions of Hadoop that don't have this
>> API.
>>
>> On Sat, Jul 26, 2014 at 10:47 AM, Reynold Xin <r...@databricks.com> wrote:
>>
>>> There is one piece of information that'd be useful to know, which is
>>> the source of the input. Even in the presence of an IOException, the
>>> input metrics still indicate that the task is reading from Hadoop.
>>>
>>> However, I'm slightly confused by this -- I think usually we'd want to
>>> report the number of bytes read, rather than the total input size. For
>>> example, if there is a limit (only read the first 5 records), the
>>> actual number of bytes read is much smaller than the total split size.
>>>
>>> Kay, am I misinterpreting this?
>>>
>>> On Sat, Jul 26, 2014 at 7:42 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> Starting at line 203:
>>>>
>>>>     try {
>>>>       /* bytesRead may not exactly equal the bytes read by a task:
>>>>        * split boundaries aren't always at record boundaries, so tasks
>>>>        * may need to read into other splits to complete a record. */
>>>>       inputMetrics.bytesRead = split.inputSplit.value.getLength()
>>>>     } catch {
>>>>       case e: java.io.IOException =>
>>>>         logWarning("Unable to get input size to set InputMetrics for task", e)
>>>>     }
>>>>     context.taskMetrics.inputMetrics = Some(inputMetrics)
>>>>
>>>> If there is an IOException, context.taskMetrics.inputMetrics is still
>>>> set by wrapping inputMetrics -- as if there wasn't any error.
>>>>
>>>> I wonder if the above code should distinguish the error condition.
>>>>
>>>> Cheers
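For the fallback Sandy mentions, the usual pattern is to probe for the newer accessor reflectively and fall back to the split length when it is absent. A hedged sketch of that pattern follows -- the two stand-in classes model pre- and post-HADOOP-10688 `Statistics` (the real Hadoop types aren't on the classpath here, and `bytesReadOrFallback` is an illustrative name, not Spark's actual code):

```scala
object StatsFallbackDemo {
  // Stand-in for FileSystem.Statistics on a Hadoop without HADOOP-10688:
  // no per-thread bytes-read accessor exists.
  class OldStatistics

  // Stand-in for a Hadoop that has the new accessor; 4096 is an
  // arbitrary illustrative value.
  class NewStatistics { def getBytesRead: Long = 4096L }

  // Look the accessor up reflectively so the same code compiles and runs
  // against either version; on the old API, approximate with split length.
  def bytesReadOrFallback(stats: AnyRef, splitLength: Long): Long =
    try {
      stats.getClass.getMethod("getBytesRead").invoke(stats).asInstanceOf[Long]
    } catch {
      case _: NoSuchMethodException => splitLength // old Hadoop: fall back
    }

  def main(args: Array[String]): Unit = {
    println(bytesReadOrFallback(new NewStatistics, 1234L)) // accurate count: 4096
    println(bytesReadOrFallback(new OldStatistics, 1234L)) // fallback: 1234
  }
}
```

A comment at the fallback site, as Reynold asks for, is what tells the next reader why the reflective lookup exists at all.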