Hey Sid,

I'm working with other devs who handle the input split side, so I
don't know exactly what's in the splits, but they are not huge.  Because of
the way our application (Spring) is structured, it's easier to do all the
input split management and construction up front for jobs.  However, to make
your lives easier we're moving toward a native DAG implementation for
these jobs, so I don't anticipate needing the MRHelpers methods much
longer on our end.

Yes, we do use the updateLocalResources...() method.

- Thad


On Mon, Aug 11, 2014 at 2:27 AM, Siddharth Seth <[email protected]> wrote:

> Thaddeus,
> Could you provide some more details on the steps to set up 'mapPayload'
> itself? It looks like you need to generate splits on the client itself (it
> would be interesting to know why this isn't possible on the AM). Were you
> making use of "MRHelpers.updateLocalResourcesForInputSplits" earlier? If
> so, the current set of MRInput/MRInputLegacy configurer methods don't
> provide the same functionality.
> However, the recommendation in this case is to use
> generateSplitsInAM(false) as Bikas pointed out - a lot of the code to set up
> MRInputPayload should go away after this. Do you happen to know the size of
> the splits being generated? Do they have more data beyond the typical
> "path, offset, size"?
> Many of the APIs in MRHelpers are in the process of being removed or
> simplified - hopefully this will stabilize more within a week or so.
>
> Thanks
> - Sid
>
>
> On Sun, Aug 10, 2014 at 8:17 PM, Thaddeus Diamond <
> [email protected]> wrote:
>
>> Yeah, I tried the normal MRInput/Output and it made no difference; I'll
>> switch back to using them then.
>>
>> Like I said, all I had before was those three lines, with nothing about
>> split distribution or AM configuration specified.  Just running
>> with createMRInputPayload instead of the grouping API appears to be
>> working, so perhaps I did not need grouping at all.
>>
>> With that API the getSplits() method is not called in the AM.
>>
>>
>> On Sun, Aug 10, 2014 at 7:34 PM, Bikas Saha <[email protected]>
>> wrote:
>>
>>> First of all, MRInput/OutputLegacy is not recommended. They are present
>>> only for applications that have used MR in hacky ways that we don’t want to
>>> support.
>>>
>>>
>>>
>>> Coming to the main question: were you using grouping of splits earlier?
>>> If yes, then MyInputFormat would have been called inside the AM even then,
>>> so things should not be different now. If you were not using grouping
>>> earlier, then were you generating the splits on the client and then
>>> distributing them to tasks via the AM (did your AM run
>>> MRInputSplitDistributor)? If so, then in the configurer you can specify
>>> generateSplitsInAM(false). This will generate splits in the client and
>>> distribute them to tasks in the AM.
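
The difference between the two placements can be sketched with a toy model. This is only an illustration under stated assumptions: SplitPlacementSketch, SourceDescriptor, configureOnClient and initializeInAM are hypothetical names invented for the sketch and are not Tez classes or methods; only the idea of a generateSplitsInAM flag comes from the thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Toy model only: every class and method here is hypothetical, not part of Tez.
class SplitPlacementSketch {

    /** Stand-in for what the client hands to the AM. */
    static final class SourceDescriptor {
        final boolean generateSplitsInAM;
        final List<String> shippedSplits; // empty when the AM is expected to compute splits

        SourceDescriptor(boolean generateSplitsInAM, List<String> shippedSplits) {
            this.generateSplitsInAM = generateSplitsInAM;
            this.shippedSplits = shippedSplits;
        }
    }

    /** Client side: invokes getSplits only when AM-side generation is switched off. */
    static SourceDescriptor configureOnClient(boolean generateSplitsInAM,
                                              Supplier<List<String>> getSplits) {
        List<String> splits = generateSplitsInAM ? new ArrayList<String>() : getSplits.get();
        return new SourceDescriptor(generateSplitsInAM, splits);
    }

    /** AM side: either computes splits itself or only distributes the shipped ones. */
    static List<String> initializeInAM(SourceDescriptor descriptor,
                                       Supplier<List<String>> getSplits) {
        return descriptor.generateSplitsInAM ? getSplits.get() : descriptor.shippedSplits;
    }

    public static void main(String[] args) {
        Supplier<List<String>> getSplits = () -> {
            System.out.println("getSplits() called");
            return Arrays.asList("split-0", "split-1");
        };
        // With the flag set to false, getSplits() runs once, during client-side setup.
        SourceDescriptor descriptor = configureOnClient(false, getSplits);
        System.out.println("AM distributes: " + initializeInAM(descriptor, getSplits));
    }
}
```

In this model, an input format whose getSplits() only works on the client stays usable as long as the flag is false, since the AM side never calls it.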
>>>
>>>
>>>
>>> Bikas
>>>
>>>
>>>
>>> *From:* Thaddeus Diamond [mailto:[email protected]]
>>> *Sent:* Sunday, August 10, 2014 3:40 PM
>>> *To:* [email protected]
>>> *Subject:* Re: TezGroupedSplit ClassCastException
>>>
>>>
>>>
>>> Okay,
>>>
>>>
>>>
>>> So I tried to change:
>>>
>>>
>>>
>>> byte[] mapInputPayload =
>>>     MRHelpers.createMRInputPayloadWithGrouping(mapPayload);
>>> MRHelpers.addMRInput(mapVertex, mapInputPayload, null);
>>> MRHelpers.addMROutputLegacy(mapVertex, mapPayload);
>>>
>>>
>>>
>>> To the suggested
>>>
>>>
>>>
>>> DataSourceDescriptor dataSource =
>>>     MRInputLegacy.createConfigurer(configuration, MyInputFormat.class).create();
>>> mapVertex.addDataSource("initialmapinput", dataSource);
>>>
>>> DataSinkDescriptor dataSink =
>>>     MROutputLegacy.createConfigurer(configuration, NullOutputFormat.class).create();
>>> mapVertex.addDataSink("initialmapoutput", dataSink);
>>>
>>>
>>>
>>> But it appears that the AM is trying to call MyInputFormat.getSplits()
>>> which it never used to.  This is causing an application issue since my
>>> application expects getSplits() to be called from the client when
>>> MRHelpers.generateInputSplits(...) is called.  Using MRInput instead of
>>> MRInputLegacy did not help.
>>>
>>>
>>>
>>> The stacktrace of the getSplits() call is:
>>>
>>>
>>>
>>>      at org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat.getSplits(TezGroupedSplitsInputFormat.java:102)
>>>      at org.apache.tez.mapreduce.hadoop.MRHelpers.generateNewSplits(MRHelpers.java:267)
>>>      at org.apache.tez.mapreduce.hadoop.MRHelpers.generateInputSplitsToMem(MRHelpers.java:460)
>>>      at org.apache.tez.mapreduce.common.MRInputAMSplitGenerator.initialize(MRInputAMSplitGenerator.java:108)
>>>      at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:175)
>>>      at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:169)
>>>      at java.security.AccessController.doPrivileged(Native Method)
>>>      at javax.security.auth.Subject.doAs(Subject.java:415)
>>>      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1554)
>>>      at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:169)
>>>      at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:156)
>>>      at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>>      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>      at java.lang.Thread.run(Thread.java:744)
>>>
>>>
>>>
>>> - Thad
>>>
>>>
>>>
>>>
>>>
>>> On Sat, Aug 9, 2014 at 7:55 PM, Bikas Saha <[email protected]>
>>> wrote:
>>>
>>> The following change is correct. However, these MRHelpers methods are soon
>>> going to disappear. We recommend you switch to the MRInput.createConfigurer()
>>> and MROutput.createConfigurer() methods. Also, switch to the
>>> *EdgeConfigurer methods, e.g. OrderedPartitionedEdgeConfigurer, for edge
>>> configuration. Please look at WordCount or OrderedWordCount for example
>>> code.
>>>
>>>
>>>
>>> byte[] mapInputPayload =
>>> MRHelpers.createMRInputPayloadWithGrouping(mapPayload,
>>> JobContextInputFormat.class.getName());
>>>
>>>
>>>
>>> To
>>>
>>>
>>>
>>> byte[] mapInputPayload =
>>> MRHelpers.createMRInputPayloadWithGrouping(mapPayload);
>>>
>>>
>>>
>>>
>>>
>>> *From:* Thaddeus Diamond [mailto:[email protected]]
>>> *Sent:* Saturday, August 09, 2014 4:10 PM
>>> *To:* [email protected]
>>> *Subject:* TezGroupedSplit ClassCastException
>>>
>>>
>>>
>>> After changeset b0c87d9 my Tez DAG is now broken.  In the logs I'm
>>> seeing:
>>>
>>>
>>>
>>> TaskAttempt 3 failed, info=[Error: Failure while running
>>> task:java.lang.ClassCastException:
>>> com.hadapt.de.storage.hadoop.StorageJobInputSplit cannot be cast to
>>> org.apache.hadoop.mapreduce.split.TezGroupedSplit
>>>
>>>      at org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat.createRecordReader(TezGroupedSplitsInputFormat.java:111)
>>>      at org.apache.tez.mapreduce.lib.MRReaderMapReduce.setupNewRecordReader(MRReaderMapReduce.java:148)
>>>      at org.apache.tez.mapreduce.lib.MRReaderMapReduce.<init>(MRReaderMapReduce.java:78)
>>>      at org.apache.tez.mapreduce.input.MRInput.initializeInternal(MRInput.java:327)
>>>      at org.apache.tez.mapreduce.input.MRInputLegacy.init(MRInputLegacy.java:109)
>>>      at org.apache.tez.mapreduce.processor.map.MapProcessor.run(MapProcessor.java:103)
>>>      at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
>>>      at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:180)
>>>      at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:172)
>>>      at java.security.AccessController.doPrivileged(Native Method)
>>>      at javax.security.auth.Subject.doAs(Subject.java:415)
>>>      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1554)
>>>      at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call(TezTaskRunner.java:172)
>>>      at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call(TezTaskRunner.java:167)
>>>      at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>>      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>      at java.lang.Thread.run(Thread.java:744)
>>>
>>> ]], Vertex failed as one or more tasks failed. failedTasks:1]
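
One plausible reading of the trace above is that the reader runs with grouping enabled and therefore casts every split to the grouped type, while the task was actually handed a plain, ungrouped split. The sketch below reproduces that kind of failure in isolation. It is a minimal illustration, not the Tez implementation: GroupedSplitCastSketch, Split, PlainSplit, GroupedSplit and countUnderlyingSplits are hypothetical stand-ins, with PlainSplit playing the role of StorageJobInputSplit and GroupedSplit the role of TezGroupedSplit.

```java
import java.util.Arrays;
import java.util.List;

// Toy model only: these types are hypothetical stand-ins, not Tez or Hadoop classes.
class GroupedSplitCastSketch {

    interface Split {}

    /** Plays the role of an application-defined split such as StorageJobInputSplit. */
    static final class PlainSplit implements Split {}

    /** Plays the role of a grouped split that wraps several underlying splits. */
    static final class GroupedSplit implements Split {
        final List<Split> wrapped;

        GroupedSplit(List<Split> wrapped) {
            this.wrapped = wrapped;
        }
    }

    /** Reader-side logic: with grouping on, the split is assumed to be a GroupedSplit. */
    static int countUnderlyingSplits(Split split, boolean groupingEnabled) {
        if (groupingEnabled) {
            // Throws ClassCastException when the split was never grouped.
            GroupedSplit grouped = (GroupedSplit) split;
            return grouped.wrapped.size();
        }
        return 1;
    }

    public static void main(String[] args) {
        Split plain = new PlainSplit();
        Split grouped = new GroupedSplit(Arrays.<Split>asList(plain, new PlainSplit()));

        System.out.println("grouped split, grouping on: " + countUnderlyingSplits(grouped, true));
        System.out.println("plain split, grouping off: " + countUnderlyingSplits(plain, false));
        try {
            countUnderlyingSplits(plain, true);
        } catch (ClassCastException e) {
            System.out.println("plain split, grouping on: " + e.getClass().getSimpleName());
        }
    }
}
```

Under this reading, either the splits need to be grouped wherever they are generated, or the grouping flag needs to be off when ungrouped splits are shipped from the client.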
>>>
>>>
>>>
>>> In order to get my DAG to compile I had to change the following line:
>>>
>>>
>>>
>>> byte[] mapInputPayload =
>>> MRHelpers.createMRInputPayloadWithGrouping(mapPayload,
>>> JobContextInputFormat.class.getName());
>>>
>>>
>>>
>>> To
>>>
>>>
>>>
>>> byte[] mapInputPayload =
>>> MRHelpers.createMRInputPayloadWithGrouping(mapPayload);
>>>
>>>
>>>
>>> I think the relevant change in the Tez project's commit is:
>>>
>>>
>>>
>>>    private static byte[] createMRInputPayload(ByteString bytes,
>>> -      MRSplitsProto mrSplitsProto, String inputFormatName) throws IOException {
>>> +      MRSplitsProto mrSplitsProto, boolean isGrouped) throws IOException {
>>>      MRInputUserPayloadProto.Builder userPayloadBuilder = MRInputUserPayloadProto
>>>          .newBuilder();
>>>      userPayloadBuilder.setConfigurationBytes(bytes);
>>>      if (mrSplitsProto != null) {
>>>        userPayloadBuilder.setSplits(mrSplitsProto);
>>>      }
>>> -    if (inputFormatName!=null) {
>>> -      userPayloadBuilder.setInputFormatName(inputFormatName);
>>> -    }
>>>
>>> +    userPayloadBuilder.setGroupingEnabled(isGrouped);
>>>
>>>      // TODO Should this be a ByteBuffer or a byte array ? A ByteBuffer would be
>>>      // more efficient.
>>>      return userPayloadBuilder.build().toByteArray();
>>>    }
>>> -
>>> +
>>>
>>>
>>>
>>> Please advise.
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Thad
>>>
>>>
>>> CONFIDENTIALITY NOTICE
>>> NOTICE: This message is intended for the use of the individual or entity
>>> to which it is addressed and may contain information that is confidential,
>>> privileged and exempt from disclosure under applicable law. If the reader
>>> of this message is not the intended recipient, you are hereby notified that
>>> any printing, copying, dissemination, distribution, disclosure or
>>> forwarding of this communication is strictly prohibited. If you have
>>> received this communication in error, please contact the sender immediately
>>> and delete it from your system. Thank You.
>>>
>>>
>>>
>>>
>>
>>
>
