Response inline

2016-03-03 23:39 GMT+01:00 Prasanth Jayachandran <pjayachand...@hortonworks.com>:
> Small correction inline.
>
> On Mar 3, 2016, at 4:28 PM, Prasanth Jayachandran <pjayachand...@hortonworks.com> wrote:
>
> Hi Patrick
>
> Please find answers inline.
>
> On Mar 1, 2016, at 8:41 AM, Patrick Duin <patd...@gmail.com> wrote:
>
> Hi Prasanth,
>
> Thanks for this. I tried out the configuration and I wanted to share some numbers with you.
>
> My test setup is a Cascading job that reads in 240 files (ranging from 1.5GB to 2.5GB).
> In the job log I get the duration from these lines:
> INFO log.PerfLogger: </PERFLOG method=OrcGetSplits start=1456747523670 end=1456747640171 duration=116501 from=org.apache.hadoop.hive.ql.io.orc.ReaderImpl>
>
> Running this without any of the configuration takes: 116501 ms
> Setting both flags as per your email: 27233 ms
> A nice improvement.
>
> This gain is mainly from avoiding reading the footers during split computation. 27s for 240 files still looks like a lot to me.
>
> Is this config set to true? hive.orc.splits.include.file.footer

I don't set this (the default seems to be false). If I set it to true, my job runs slower (creeping towards 40s) and fails with my original error when I try to run it on more files.

> My guess is it's still reading the footers, which could be the reason for the 27s. I will run some tests to see if there is a bug (reading footers despite disabling the cache and predicate pushdown).
>
> It should read the footer but shouldn't read the metadata if predicate pushdown is disabled. The metadata section of large files can be big relative to the size of the footer. Metadata is required only for split elimination, not for split computation.
>
> But doing the same test on data where the files are smaller than 256MB (the ORC block size), OrcGetSplits takes 2741 ms.
> With or without setting the configuration, the results are the same.
>
> This seems to be correct.
>
> This is still a fairly big gap.
> Knowing we can tune the performance with your suggested configuration is great, as we might not always have the option to repartition our data. Still, avoiding spanning files over multiple blocks seems to have much more of an impact, even though it is counter-intuitive.
> It would be good to know if other users have similar experiences.
>
> Again, thanks for your help.
>
> Kind regards,
> Patrick.
>
> 2016-02-29 6:38 GMT+00:00 Prasanth Jayachandran <pjayachand...@hortonworks.com>:
>
>> Hi Patrick
>>
>> Please find answers inline
>>
>> On Feb 26, 2016, at 9:36 AM, Patrick Duin <patd...@gmail.com> wrote:
>>
>> Hi Prasanth.
>>
>> Thanks for the quick reply!
>>
>> The logs don't show much more of the stack trace, I'm afraid:
>> java.lang.NullPointerException
>>     at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.run(OrcInputFormat.java:809)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> The stack trace isn't really the issue, though. The NullPointerException is a symptom of not being able to return any stripes: if you look at the line in the code, it is because the 'stripes' field is null, which should never happen. We think this is caused by failing namenode network traffic. We would have lots of I/O warnings in the logs saying blocks cannot be found, or e.g.:
>> 16/02/01 13:20:34 WARN hdfs.BlockReaderFactory: I/O error constructing remote block reader.
>> java.io.IOException: java.lang.InterruptedException
>>     at org.apache.hadoop.ipc.Client.call(Client.java:1448)
>>     at org.apache.hadoop.ipc.Client.call(Client.java:1400)
>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
>>     at com.sun.proxy.$Proxy32.getServerDefaults(Unknown Source)
>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getServerDefaults(ClientNamenodeProtocolTranslatorPB.java:268)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>     at com.sun.proxy.$Proxy33.getServerDefaults(Unknown Source)
>>     at org.apache.hadoop.hdfs.DFSClient.getServerDefaults(DFSClient.java:1007)
>>     at org.apache.hadoop.hdfs.DFSClient.shouldEncryptData(DFSClient.java:2062)
>>     at org.apache.hadoop.hdfs.DFSClient.newDataEncryptionKey(DFSClient.java:2068)
>>     at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.checkTrustAndSend(SaslDataTransferClient.java:208)
>>     at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.peerSend(SaslDataTransferClient.java:159)
>>     at org.apache.hadoop.hdfs.net.TcpPeerServer.peerFromSocketAndKey(TcpPeerServer.java:90)
>>     at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3123)
>>     at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:755)
>>     at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:670)
>>     at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:337)
>>     at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
>>     at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800)
>>     at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:848)
>>     at java.io.DataInputStream.readFully(DataInputStream.java:195)
>>     at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.extractMetaInfoFromFooter(ReaderImpl.java:407)
>>     at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.<init>(ReaderImpl.java:311)
>>     at org.apache.hadoop.hive.ql.io.orc.OrcFile.createReader(OrcFile.java:228)
>>     at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.populateAndCacheStripeDetails(OrcInputFormat.java:885)
>>     at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.run(OrcInputFormat.java:771)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.InterruptedException
>>     at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:400)
>>     at java.util.concurrent.FutureTask.get(FutureTask.java:187)
>>     at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1047)
>>     at org.apache.hadoop.ipc.Client.call(Client.java:1442)
>>     ... 33 more
>>
>> Our job doesn't always fail; sometimes the splits do get calculated. We suspect that when the namenode is too busy, our job may hit some time-outs and the whole thing fails.
>>
>> Our intuition has been the same as you suggest: bigger files are better. But we see a degradation in performance as soon as our files get bigger than the ORC block size. Keeping the file size within the ORC block size sounds silly, but when looking at the code (OrcInputFormat) we think it cuts out a bunch of code that is causing us problems.
>> The code we are trying to hit is:
>> https://github.com/apache/hive/blob/release-0.14.0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java#L656
>>
>> This line is hit only when the file does not span multiple blocks and is smaller than the max split size (by default the same as the block size). If you want to avoid reading the footers for split elimination, or if you are not using SARGs, then I would recommend the following configuration:
>>
>> -- disables the file footer cache; when this cache is disabled, file footers are not read
>> set hive.orc.cache.stripe.details.size=-1;
>>
>> -- disables predicate pushdown (not needed when you are not using SARGs)
>> set hive.optimize.index.filter=false;
>>
>> Avoiding the scheduling.
>>
>> In our case we are not using any SARGs, but we do use column projection.
>>
>> Any idea why we don't have this issue if we query the data via Hive?
>>
>> Let me know if you need more information. Thanks for the insights, much appreciated.
>>
>> Kind regards,
>> Patrick
>>
>> 2016-02-25 22:20 GMT+01:00 Prasanth Jayachandran <pjayachand...@hortonworks.com>:
>>
>>> > On Feb 25, 2016, at 3:15 PM, Prasanth Jayachandran <pjayachand...@hortonworks.com> wrote:
>>> >
>>> > Hi Patrick
>>> >
>>> > Can you paste the entire stack trace? It looks like the NPE happened during split generation, but the stack trace is too incomplete to tell what caused it.
>>> >
>>> > In Hive 0.14.0, the stripe size was changed to 64MB. The default block size for ORC files is 256MB, so 4 stripes fit in a block. ORC pads stripes to avoid them straddling HDFS blocks. During split calculation, the ORC footer, which contains stripe-level column statistics, is read to perform split pruning based on the predicate condition specified via a SARG (Search Argument).
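Prasanth's description of when the cheap path at OrcInputFormat.java#L656 is taken (the file does not span multiple blocks and is smaller than the max split size), together with the stripe arithmetic (64MB stripes in a 256MB block), can be sketched as a small predicate. This is a minimal illustration, not Hive's code: the class and method names are hypothetical, and the strict `<` comparison follows the wording in the thread rather than the Hive source.

```java
/**
 * Hypothetical sketch (not Hive's code) of the condition described in the thread:
 * a file that fits in a single HDFS block and is smaller than the max split size
 * becomes one split directly, without its ORC footer being read.
 */
final class SingleSplitCheck {
    static final long MB = 1024L * 1024L;

    /** Whole stripes that fit in one block, e.g. 256MB / 64MB = 4. */
    static long stripesPerBlock(long blockSize, long stripeSize) {
        return blockSize / stripeSize;
    }

    /** Number of HDFS blocks occupied by a file of the given length (ceiling division). */
    static long blockCount(long fileLength, long blockSize) {
        return (fileLength + blockSize - 1) / blockSize;
    }

    /** True if the file would take the cheap single-split path (assumed rule from the thread). */
    static boolean takesSingleSplitPath(long fileLength, long blockSize, long maxSplitSize) {
        return blockCount(fileLength, blockSize) <= 1 && fileLength < maxSplitSize;
    }

    public static void main(String[] args) {
        long block = 256 * MB;   // default ORC block size mentioned in the thread
        long maxSplit = block;   // max split size defaults to the block size
        System.out.println(stripesPerBlock(block, 64 * MB));                  // 4
        System.out.println(takesSingleSplitPath(200 * MB, block, maxSplit));  // true
        System.out.println(takesSingleSplitPath(1536 * MB, block, maxSplit)); // false
    }
}
```

With files of 1.5GB to 2.5GB, as in Patrick's test, every file spans several blocks, so none of them would qualify for the single-split path under this rule.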
>>> >
>>> > For example, assume column 'state' is sorted and the predicate condition is state = "CA":
>>> > Stripe 1: min = AZ, max = FL
>>> > Stripe 2: min = GA, max = MN
>>> > Stripe 3: min = MS, max = SC
>>> > Stripe 4: min = SD, max = WY
>>> >
>>> > In this case, only stripe 1 satisfies the predicate condition, so only 1 split, containing stripe 1, will be created.
>>> > So if there is a huge number of small files, the footers of all files have to be read to do split pruning. If there are a few large files, only a few footers have to be read. Also, the minimum splittable position is a stripe boundary. So having fewer, larger files has the advantage of reading less data during split pruning.
>>> >
>>> > If you can send me the full stack trace, I can tell what is causing the exception here. I will also let you know of any workaround or the next Hive version with the fix.
>>> >
>>> > In more recent Hive versions (Hive 1.2.0 onwards), OrcInputFormat has strategies to decide automatically when to read footers and when not to. You can configure the strategy you want based on the workload. In the case of many small files, footers will not be read, and with large files footers will be read for split pruning.
>>>
>>> The default strategy does this automatically (choosing when to read footers and when not to). It is configurable as well.
>>>
>>> >
>>> > Thanks
>>> > Prasanth
>>> >
>>> >> On Feb 25, 2016, at 7:08 AM, Patrick Duin <patd...@gmail.com> wrote:
>>> >>
>>> >> Hi,
>>> >>
>>> >> We've recently moved one of our datasets to ORC, and we use Cascading and Hive to read this data. We've had problems reading the data via Cascading because of the generation of splits.
>>> >> We read in a large number of files (thousands), each about 1GB. We found that the split calculation took minutes on our cluster and often didn't succeed at all (when our namenode was busy).
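The stripe-elimination example from Prasanth's Feb 25 reply (column 'state', predicate state = "CA") amounts to a min/max range check per stripe. A minimal sketch, assuming the per-stripe min/max statistics have already been read from the footer into memory; the `StripePruning` class and its methods are hypothetical and do not mirror ORC's SearchArgument API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of min/max stripe elimination for an equality predicate. */
final class StripePruning {
    /** Min/max statistics of one string column within one stripe. */
    static final class StripeStats {
        final int stripeId;
        final String min;
        final String max;

        StripeStats(int stripeId, String min, String max) {
            this.stripeId = stripeId;
            this.min = min;
            this.max = max;
        }
    }

    /** Returns the ids of stripes whose [min, max] range may contain {@code value}. */
    static List<Integer> stripesMatchingEquals(List<StripeStats> stripes, String value) {
        List<Integer> kept = new ArrayList<>();
        for (StripeStats s : stripes) {
            // Keep the stripe unless the value lies outside [min, max].
            if (s.min.compareTo(value) <= 0 && s.max.compareTo(value) >= 0) {
                kept.add(s.stripeId);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<StripeStats> stats = Arrays.asList(
                new StripeStats(1, "AZ", "FL"),
                new StripeStats(2, "GA", "MN"),
                new StripeStats(3, "MS", "SC"),
                new StripeStats(4, "SD", "WY"));
        // Each surviving stripe would become (part of) a split.
        System.out.println(stripesMatchingEquals(stats, "CA")); // [1]
    }
}
```

The check only works because the statistics live in the footer, which is why pruning forces a footer read per file: cheap for a few large files, expensive for thousands of them.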
>>> >> When digging through the code of 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.class', we figured out that if we make the files smaller than the ORC block size (256MB), the code avoids lots of namenode calls. We applied this solution, made our files smaller, and that solved the problem. Split calculation in our job went from 10+ minutes to a couple of seconds and always succeeds.
>>> >> We feel this is counterintuitive, as bigger files are usually better in HDFS. We've also seen that running a Hive query on the data does not present this problem. Internally, Hive seems to take a completely different execution path: it does not use OrcInputFormat but 'org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.class'.
>>> >>
>>> >> Can someone explain the reason for this difference or shed some light on the behaviour we are seeing? Any help will be greatly appreciated. We are using hive-0.14.0.
>>> >>
>>> >> Kind regards,
>>> >> Patrick
>>> >>
>>> >> Here is the stack trace that we would see when our Cascading job failed to calculate the splits:
>>> >> Caused by: java.lang.RuntimeException: serious problem
>>> >>     at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$Context.waitForTasks(OrcInputFormat.java:478)
>>> >>     at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:949)
>>> >>     at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:974)
>>> >>     at com.hotels.corc.mapred.CorcInputFormat.getSplits(CorcInputFormat.java:201)
>>> >>     at cascading.tap.hadoop.io.MultiInputFormat.getSplits(MultiInputFormat.java:200)
>>> >>     at cascading.tap.hadoop.io.MultiInputFormat.getSplits(MultiInputFormat.java:142)
>>> >>     at org.apache.hadoop.mapreduce.JobSubmitter.writeOldSplits(JobSubmitter.java:624)
>>> >>     at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:616)
>>> >>     at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:492)
>>> >>     at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1296)
>>> >>     at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1293)
>>> >>     at java.security.AccessController.doPrivileged(Native Method)
>>> >>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>> >>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
>>> >>     at org.apache.hadoop.mapreduce.Job.submit(Job.java:1293)
>>> >>     at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:585)
>>> >>     at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:580)
>>> >>     at java.security.AccessController.doPrivileged(Native Method)
>>> >>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>> >>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
>>> >>     at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:580)
>>> >>     at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:571)
>>> >>     at cascading.flow.hadoop.planner.HadoopFlowStepJob.internalNonBlockingStart(HadoopFlowStepJob.java:106)
>>> >>     at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:265)
>>> >>     at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:184)
>>> >>     at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:146)
>>> >>     at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:48)
>>> >>     ... 4 more
>>> >> Caused by: java.lang.NullPointerException
>>> >>     at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.run(OrcInputFormat.java:809)
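Prasanth mentions that from Hive 1.2.0 onwards OrcInputFormat can decide automatically whether to read footers: skip them for many small files, read them for large files. A hypothetical sketch of such a heuristic follows; comparing the average file size with the block size is an assumption made for illustration and is not taken from Hive's implementation.

```java
import java.util.Arrays;

/** Hypothetical heuristic: decide whether split generation should read ORC footers. */
final class FooterStrategyChooser {
    enum Strategy { SKIP_FOOTERS, READ_FOOTERS }

    /**
     * Assumed rule for illustration: if the average file fits in one block, there are
     * likely many small files and reading every footer costs more than pruning saves,
     * so footers are skipped; otherwise footers are read so stripes can be pruned.
     */
    static Strategy choose(long[] fileLengths, long blockSize) {
        if (fileLengths.length == 0) {
            return Strategy.SKIP_FOOTERS; // nothing to prune
        }
        long total = Arrays.stream(fileLengths).sum();
        long average = total / fileLengths.length;
        return average <= blockSize ? Strategy.SKIP_FOOTERS : Strategy.READ_FOOTERS;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long block = 256 * mb;

        long[] manySmall = new long[2000];   // thousands of files below the block size
        Arrays.fill(manySmall, 200 * mb);
        long[] fewLarge = new long[240];     // 240 files of about 2GB, as in Patrick's test
        Arrays.fill(fewLarge, 2048 * mb);

        System.out.println(choose(manySmall, block)); // SKIP_FOOTERS
        System.out.println(choose(fewLarge, block));  // READ_FOOTERS
    }
}
```

A real implementation would likely also weigh the number of files against available parallelism; this sketch keeps only the small-versus-large distinction described in the thread.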