Yeah, I tried the normal MRInput/MROutput and it made no difference, so I'll switch back to using them.
Like I said, all I had before were those three lines; nothing about split distribution or AM configuration was specified. Just running with createMRInputPayload instead of the grouping API appears to work, so perhaps I did not need grouping at all. With that API, the getSplits() method is not called in the AM.

On Sun, Aug 10, 2014 at 7:34 PM, Bikas Saha <[email protected]> wrote:
> First of all, MRInput/OutputLegacy is not recommended. They are present
> only for applications that have used MR in hacky ways that we don’t want
> to support.
>
> Coming to the main question: were you using grouping of splits earlier? If
> yes, then MyInputFormat would have been called inside the AM even then, so
> things should not be different now. If you were not using grouping
> earlier, were you generating the splits on the client and then
> distributing them to tasks via the AM (i.e. did your AM run
> MRInputSplitDistributor)? If so, then in the configurer you can specify
> generateSplitsInAM(false). This will generate splits in the client and
> distribute them to tasks in the AM.
>
> Bikas
>
> *From:* Thaddeus Diamond [mailto:[email protected]]
> *Sent:* Sunday, August 10, 2014 3:40 PM
> *To:* [email protected]
> *Subject:* Re: TezGroupedSplit ClassCastException
>
> Okay,
>
> So I tried to change:
>
>     byte[] mapInputPayload =
>         MRHelpers.createMRInputPayloadWithGrouping(mapPayload);
>     MRHelpers.addMRInput(mapVertex, mapInputPayload, null);
>     MRHelpers.addMROutputLegacy(mapVertex, mapPayload);
>
> to the suggested:
>
>     DataSourceDescriptor dataSource =
>         MRInputLegacy.createConfigurer(configuration, MyInputFormat.class).create();
>     mapVertex.addDataSource("initialmapinput", dataSource);
>
>     DataSinkDescriptor dataSink =
>         MROutputLegacy.createConfigurer(configuration, NullOutputFormat.class).create();
>     mapVertex.addDataSink("initialmapoutput", dataSink);
>
> But it appears that the AM is now calling MyInputFormat.getSplits(),
> which it never used to. This causes an application issue, since my
> application expects getSplits() to be called from the client when
> MRHelpers.generateInputSplits(...) is called. Using MRInput instead of
> MRInputLegacy did not help.
> > > > The stacktrace of the getSplits() call is: > > > > at > org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat.getSplits(TezGroupedSplitsInputFormat.java:102) > > at > org.apache.tez.mapreduce.hadoop.MRHelpers.generateNewSplits(MRHelpers.java:267) > > at > org.apache.tez.mapreduce.hadoop.MRHelpers.generateInputSplitsToMem(MRHelpers.java:460) > > at > org.apache.tez.mapreduce.common.MRInputAMSplitGenerator.initialize(MRInputAMSplitGenerator.java:108) > > at > org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:175) > > at > org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:169) > > at java.security.AccessController.doPrivileged(Native Method) > > at javax.security.auth.Subject.doAs(Subject.java:415) > > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1554) > > at > org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:169) > > at > org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:156) > > at java.util.concurrent.FutureTask.run(FutureTask.java:262) > > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > > at java.lang.Thread.run(Thread.java:744) > > > > - Thad > > > > > > On Sat, Aug 9, 2014 at 7:55 PM, Bikas Saha <[email protected]> wrote: > > The following change is correct. However, this MRHelpers methods is soon > going to disappear. We recommend you switch to MRInput.createConfigurer() > and MROutput.createConfigurer() methods. Also, switch to the > *EdgeConfigurer methods e.g. OrderedPartitionedEdgeConfigurer for edge > configuration. Please look at WordCount or OrderedWordCount for example > code. 
>
>     byte[] mapInputPayload =
>         MRHelpers.createMRInputPayloadWithGrouping(mapPayload,
>             JobContextInputFormat.class.getName());
>
> To
>
>     byte[] mapInputPayload =
>         MRHelpers.createMRInputPayloadWithGrouping(mapPayload);
>
> *From:* Thaddeus Diamond [mailto:[email protected]]
> *Sent:* Saturday, August 09, 2014 4:10 PM
> *To:* [email protected]
> *Subject:* TezGroupedSplit ClassCastException
>
> After changeset b0c87d9 my Tez DAG is now broken. In the logs I'm seeing:
>
>     TaskAttempt 3 failed, info=[Error: Failure while running task:java.lang.ClassCastException: com.hadapt.de.storage.hadoop.StorageJobInputSplit cannot be cast to org.apache.hadoop.mapreduce.split.TezGroupedSplit
>         at org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat.createRecordReader(TezGroupedSplitsInputFormat.java:111)
>         at org.apache.tez.mapreduce.lib.MRReaderMapReduce.setupNewRecordReader(MRReaderMapReduce.java:148)
>         at org.apache.tez.mapreduce.lib.MRReaderMapReduce.<init>(MRReaderMapReduce.java:78)
>         at org.apache.tez.mapreduce.input.MRInput.initializeInternal(MRInput.java:327)
>         at org.apache.tez.mapreduce.input.MRInputLegacy.init(MRInputLegacy.java:109)
>         at org.apache.tez.mapreduce.processor.map.MapProcessor.run(MapProcessor.java:103)
>         at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
>         at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:180)
>         at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:172)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:415)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1554)
>         at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call(TezTaskRunner.java:172)
>         at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call(TezTaskRunner.java:167)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:744)
>     ]], Vertex failed as one or more tasks failed. failedTasks:1]
>
> In order to get my DAG to compile I had to change the following line:
>
>     byte[] mapInputPayload =
>         MRHelpers.createMRInputPayloadWithGrouping(mapPayload,
>             JobContextInputFormat.class.getName());
>
> To
>
>     byte[] mapInputPayload =
>         MRHelpers.createMRInputPayloadWithGrouping(mapPayload);
>
> I think the relevant change in the Tez project's commit is:
>
>        private static byte[] createMRInputPayload(ByteString bytes,
>     -      MRSplitsProto mrSplitsProto, String inputFormatName) throws IOException {
>     +      MRSplitsProto mrSplitsProto, boolean isGrouped) throws IOException {
>          MRInputUserPayloadProto.Builder userPayloadBuilder = MRInputUserPayloadProto
>              .newBuilder();
>          userPayloadBuilder.setConfigurationBytes(bytes);
>          if (mrSplitsProto != null) {
>            userPayloadBuilder.setSplits(mrSplitsProto);
>          }
>     -    if (inputFormatName!=null) {
>     -      userPayloadBuilder.setInputFormatName(inputFormatName);
>     -    }
>     +    userPayloadBuilder.setGroupingEnabled(isGrouped);
>          // TODO Should this be a ByteBuffer or a byte array ? A ByteBuffer would be
>          // more efficient.
>          return userPayloadBuilder.build().toByteArray();
>        }
>     -
>     +
>
> Please advise.
>
> Thanks,
> Thad
>
> CONFIDENTIALITY NOTICE
> NOTICE: This message is intended for the use of the individual or entity
> to which it is addressed and may contain information that is confidential,
> privileged and exempt from disclosure under applicable law.
> If the reader of this message is not the intended recipient, you are
> hereby notified that any printing, copying, dissemination, distribution,
> disclosure or forwarding of this communication is strictly prohibited. If
> you have received this communication in error, please contact the sender
> immediately and delete it from your system. Thank You.
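The ClassCastException in the original report is consistent with a mismatch between the payload's grouping flag (the commit replaces inputFormatName with setGroupingEnabled(isGrouped), which the grouping variant of the helper presumably sets to true) and splits that were generated on the client without being grouped. Assuming that is the cause, the failure mode can be modeled in a few lines of self-contained Java. InputSplit, StorageSplit, GroupedSplit and ReaderSetup are hypothetical stand-ins, not the real Hadoop or Tez classes: StorageSplit plays the role of StorageJobInputSplit and GroupedSplit the role of TezGroupedSplit.

```java
import java.util.List;

// Hypothetical stand-ins for the split types in the stack trace; these are
// NOT the real Hadoop/Tez classes.
interface InputSplit {}

// Plays the role of StorageJobInputSplit: a raw, ungrouped split.
final class StorageSplit implements InputSplit {
    final String path;

    StorageSplit(String path) {
        this.path = path;
    }
}

// Plays the role of TezGroupedSplit: a wrapper around several raw splits.
final class GroupedSplit implements InputSplit {
    final List<InputSplit> members;

    GroupedSplit(List<InputSplit> members) {
        this.members = members;
    }
}

// Simplified model of task-side reader setup. The payload carries only a
// boolean groupingEnabled flag, so the reader trusts it and casts accordingly.
final class ReaderSetup {
    // Returns how many underlying splits the reader would iterate over.
    static int underlyingSplitCount(InputSplit split, boolean groupingEnabled) {
        if (groupingEnabled) {
            // Mirrors the cast in TezGroupedSplitsInputFormat.createRecordReader:
            // throws ClassCastException if the split was never grouped.
            GroupedSplit grouped = (GroupedSplit) split;
            return grouped.members.size();
        }
        // Grouping disabled: the raw split is read directly.
        return 1;
    }
}
```

With groupingEnabled set to true and a raw StorageSplit passed in, underlyingSplitCount throws ClassCastException, which matches the shape of the task failure; with the flag false, as with the non-grouping createMRInputPayload that ended up working, the same split is read directly.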

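Bikas's explanation, together with the AM-side stack trace in which MRInputAMSplitGenerator ends up calling getSplits(), boils down to one decision: whether MyInputFormat.getSplits() runs in the client or in the AM depends on whether splits are generated in the AM. The sketch below encodes only that decision, assuming the behaviour described in this thread. SplitPhase and SplitPlan are hypothetical names, and the component names are plain strings taken from the thread, not calls into Tez.

```java
// Hypothetical model of the split-generation choice discussed in this thread.
// It does not call Tez; component names are plain strings from the thread.
enum SplitPhase { CLIENT, AM }

final class SplitPlan {
    final SplitPhase splitsComputedIn; // where InputFormat.getSplits() executes
    final String amComponent;          // AM-side component that handles the splits

    private SplitPlan(SplitPhase splitsComputedIn, String amComponent) {
        this.splitsComputedIn = splitsComputedIn;
        this.amComponent = amComponent;
    }

    // generateSplitsInAM mirrors the configurer option Bikas mentions.
    static SplitPlan forSetting(boolean generateSplitsInAM) {
        if (generateSplitsInAM) {
            // The AM initializer computes splits itself (see the AM stack trace).
            return new SplitPlan(SplitPhase.AM, "MRInputAMSplitGenerator");
        }
        // Splits are computed on the client; the AM only hands them out to tasks.
        return new SplitPlan(SplitPhase.CLIENT, "MRInputSplitDistributor");
    }
}
```

An application like the one in this thread, which expects getSplits() to run when MRHelpers.generateInputSplits(...) is called on the client, needs the CLIENT branch; per Bikas, the configurer equivalent is generateSplitsInAM(false).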