OK. The updateLocalResources method is being made private as part of TEZ-1407, and many of the MRHelpers methods are going away. The way forward is to move to the MRInputConfigurer, or to the new method being added via TEZ-1407 (client side, though it may not be supported for long).
On Mon, Aug 11, 2014 at 8:58 AM, Thaddeus Diamond <[email protected]> wrote:
> Hey Sid,
>
> I'm actually working with other devs who handle the input split side, so I
> don't know quite what's in there, but they are not huge. Because of the
> way we handle our application (Spring), it's easier to do all the input
> split management and construction up front for jobs. However, to make your
> lives easier, we're actually moving toward a native DAG implementation for
> these jobs, so I don't anticipate the MRHelpers methods being needed much
> longer on our end.
>
> Yes, we do use the updateLocalResources...() method.
>
> - Thad
>
>
> On Mon, Aug 11, 2014 at 2:27 AM, Siddharth Seth <[email protected]> wrote:
>
>> Thaddeus,
>> Could you provide some more details on the steps to set up 'mapPayload'
>> itself? It looks like you need to generate splits on the client itself (it
>> would be interesting to know why this isn't possible in the AM). Were you
>> making use of "MRHelpers.updateLocalResourcesForInputSplits" earlier? If
>> so, the current set of MRInput/MRInputLegacy configurer methods don't
>> provide the same functionality.
>> However, the recommendation in this case is to use
>> generateSplitsInAM(false), as Bikas pointed out; a lot of the code to set up
>> MRInputPayload should go away after this. Do you happen to know the size of
>> the splits being generated? Do they carry more data than the typical
>> "path, offset, size"?
>> Many of the APIs in MRHelpers are in the process of being removed or
>> simplified; hopefully this will stabilize within a week or so.
>>
>> Thanks
>> - Sid
>>
>>
>> On Sun, Aug 10, 2014 at 8:17 PM, Thaddeus Diamond <[email protected]> wrote:
>>
>>> Yeah, I tried the normal MRInput/Output and it made no difference; I'll
>>> switch back to using them then.
>>>
>>> Like I said, all I had before was those three lines, nothing about split
>>> distribution or AM configuration specified.
>>> Just running with createMRInputPayload instead of the grouping API appears
>>> to be working, so perhaps I did not need grouping at all.
>>>
>>> With that API, the getSplits() method is not called in the AM.
>>>
>>>
>>> On Sun, Aug 10, 2014 at 7:34 PM, Bikas Saha <[email protected]> wrote:
>>>
>>>> First of all, MRInput/OutputLegacy is not recommended. They are present
>>>> only for applications that have used MR in hacky ways that we don’t want to
>>>> support.
>>>>
>>>> Coming to the main question: were you using grouping of splits earlier?
>>>> If yes, then MyInputFormat would have been called inside the AM even then,
>>>> so things should not be different now. If you were not using grouping
>>>> earlier, were you generating the splits on the client and then
>>>> distributing them to tasks via the AM (did your AM run
>>>> MRInputSplitDistributor)? If so, then in the configurer you can specify
>>>> generateSplitsInAM(false). This will generate splits in the client and
>>>> distribute them to tasks in the AM.
>>>>
>>>> Bikas
>>>>
>>>> *From:* Thaddeus Diamond [mailto:[email protected]]
>>>> *Sent:* Sunday, August 10, 2014 3:40 PM
>>>> *To:* [email protected]
>>>> *Subject:* Re: TezGroupedSplit ClassCastException
>>>>
>>>> Okay,
>>>>
>>>> So I tried to change:
>>>>
>>>>     byte[] mapInputPayload =
>>>>         MRHelpers.createMRInputPayloadWithGrouping(mapPayload);
>>>>     MRHelpers.addMRInput(mapVertex, mapInputPayload, null);
>>>>     MRHelpers.addMROutputLegacy(mapVertex, mapPayload);
>>>>
>>>> to the suggested:
>>>>
>>>>     DataSourceDescriptor dataSource =
>>>>         MRInputLegacy.createConfigurer(configuration,
>>>>             MyInputFormat.class).create();
>>>>     mapVertex.addDataSource("initialmapinput", dataSource);
>>>>
>>>>     DataSinkDescriptor dataSink =
>>>>         MROutputLegacy.createConfigurer(configuration,
>>>>             NullOutputFormat.class).create();
>>>>     mapVertex.addDataSink("initialmapoutput", dataSink);
>>>>
>>>> But it appears that the AM is trying to call MyInputFormat.getSplits(),
>>>> which it never used to. This is causing an application issue, since my
>>>> application expects getSplits() to be called from the client when
>>>> MRHelpers.generateInputSplits(...) is called. Using MRInput instead
>>>> of MRInputLegacy did not help.
>>>>
>>>> The stack trace of the getSplits() call is:
>>>>
>>>>     at org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat.getSplits(TezGroupedSplitsInputFormat.java:102)
>>>>     at org.apache.tez.mapreduce.hadoop.MRHelpers.generateNewSplits(MRHelpers.java:267)
>>>>     at org.apache.tez.mapreduce.hadoop.MRHelpers.generateInputSplitsToMem(MRHelpers.java:460)
>>>>     at org.apache.tez.mapreduce.common.MRInputAMSplitGenerator.initialize(MRInputAMSplitGenerator.java:108)
>>>>     at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:175)
>>>>     at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:169)
>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1554)
>>>>     at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:169)
>>>>     at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:156)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>     at java.lang.Thread.run(Thread.java:744)
>>>>
>>>> - Thad
>>>>
>>>>
>>>> On Sat, Aug 9, 2014 at 7:55 PM, Bikas Saha <[email protected]> wrote:
>>>>
>>>> The following change is correct. However, this MRHelpers method is
>>>> soon going to disappear. We recommend you switch to the
>>>> MRInput.createConfigurer() and MROutput.createConfigurer() methods.
>>>> Also, switch to the *EdgeConfigurer methods (e.g. OrderedPartitionedEdgeConfigurer)
>>>> for edge configuration. Please look at WordCount or OrderedWordCount for
>>>> example code.
>>>>
>>>>     byte[] mapInputPayload =
>>>>         MRHelpers.createMRInputPayloadWithGrouping(mapPayload,
>>>>             JobContextInputFormat.class.getName());
>>>>
>>>> to
>>>>
>>>>     byte[] mapInputPayload =
>>>>         MRHelpers.createMRInputPayloadWithGrouping(mapPayload);
>>>>
>>>> *From:* Thaddeus Diamond [mailto:[email protected]]
>>>> *Sent:* Saturday, August 09, 2014 4:10 PM
>>>> *To:* [email protected]
>>>> *Subject:* TezGroupedSplit ClassCastException
>>>>
>>>> After changeset b0c87d9, my Tez DAG is now broken. In the logs I'm
>>>> seeing:
>>>>
>>>>     TaskAttempt 3 failed, info=[Error: Failure while running task:java.lang.ClassCastException:
>>>>     com.hadapt.de.storage.hadoop.StorageJobInputSplit cannot be cast to
>>>>     org.apache.hadoop.mapreduce.split.TezGroupedSplit
>>>>     at org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat.createRecordReader(TezGroupedSplitsInputFormat.java:111)
>>>>     at org.apache.tez.mapreduce.lib.MRReaderMapReduce.setupNewRecordReader(MRReaderMapReduce.java:148)
>>>>     at org.apache.tez.mapreduce.lib.MRReaderMapReduce.<init>(MRReaderMapReduce.java:78)
>>>>     at org.apache.tez.mapreduce.input.MRInput.initializeInternal(MRInput.java:327)
>>>>     at org.apache.tez.mapreduce.input.MRInputLegacy.init(MRInputLegacy.java:109)
>>>>     at org.apache.tez.mapreduce.processor.map.MapProcessor.run(MapProcessor.java:103)
>>>>     at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
>>>>     at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:180)
>>>>     at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:172)
>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1554)
>>>>     at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call(TezTaskRunner.java:172)
>>>>     at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call(TezTaskRunner.java:167)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>     at java.lang.Thread.run(Thread.java:744)
>>>>     ]], Vertex failed as one or more tasks failed. failedTasks:1]
>>>>
>>>> In order to get my DAG to compile, I had to change the following line:
>>>>
>>>>     byte[] mapInputPayload =
>>>>         MRHelpers.createMRInputPayloadWithGrouping(mapPayload,
>>>>             JobContextInputFormat.class.getName());
>>>>
>>>> to
>>>>
>>>>     byte[] mapInputPayload =
>>>>         MRHelpers.createMRInputPayloadWithGrouping(mapPayload);
>>>>
>>>> I think the relevant change in the Tez project's commit is:
>>>>
>>>>    private static byte[] createMRInputPayload(ByteString bytes,
>>>> -      MRSplitsProto mrSplitsProto, String inputFormatName) throws IOException {
>>>> +      MRSplitsProto mrSplitsProto, boolean isGrouped) throws IOException {
>>>>      MRInputUserPayloadProto.Builder userPayloadBuilder = MRInputUserPayloadProto
>>>>          .newBuilder();
>>>>      userPayloadBuilder.setConfigurationBytes(bytes);
>>>>      if (mrSplitsProto != null) {
>>>>        userPayloadBuilder.setSplits(mrSplitsProto);
>>>>      }
>>>> -    if (inputFormatName!=null) {
>>>> -      userPayloadBuilder.setInputFormatName(inputFormatName);
>>>> -    }
>>>> +    userPayloadBuilder.setGroupingEnabled(isGrouped);
>>>>      // TODO Should this be a ByteBuffer or a byte array ? A ByteBuffer would be
>>>>      // more efficient.
>>>>      return userPayloadBuilder.build().toByteArray();
>>>>    }
>>>> -
>>>> +
>>>>
>>>> Please advise.
>>>>
>>>> Thanks,
>>>> Thad
>>>>
>>>>
>>>> CONFIDENTIALITY NOTICE
>>>> NOTICE: This message is intended for the use of the individual or
>>>> entity to which it is addressed and may contain information that is
>>>> confidential, privileged and exempt from disclosure under applicable law.
>>>> If the reader of this message is not the intended recipient, you are hereby
>>>> notified that any printing, copying, dissemination, distribution,
>>>> disclosure or forwarding of this communication is strictly prohibited. If
>>>> you have received this communication in error, please contact the sender
>>>> immediately and delete it from your system. Thank You.
