[ https://issues.apache.org/jira/browse/PIG-4323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14273540#comment-14273540 ]
Carlos Balduz commented on PIG-4323:
------------------------------------
Thanks for your reply. If you meant
{code:java}
if (PigMapReduce.sJobConfInternal.get() == null)
    initializeJobConf();
{code}
then yes, it works.
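
To spell out why the guard helps: it turns the per-tuple deserialization into a one-time, per-thread initialization. Below is a minimal, self-contained sketch of that pattern, not the actual Pig code. The class `LazyJobConfSketch`, the `jobConf` holder (a stand-in for `PigMapReduce.sJobConfInternal`, assumed here to behave like a `ThreadLocal`), and the `initCalls` counter are all hypothetical names introduced for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class LazyJobConfSketch {
    // Hypothetical stand-in for PigMapReduce.sJobConfInternal (assumed to be a ThreadLocal).
    static final ThreadLocal<String> jobConf = new ThreadLocal<>();

    // Counts how many times the expensive initialization actually runs.
    static final AtomicInteger initCalls = new AtomicInteger();

    // Stand-in for the costly deserialization of the job configuration.
    static void initializeJobConf() {
        initCalls.incrementAndGet();
        jobConf.set("deserialized-conf");
    }

    // Stand-in for PackageFunction.apply: initialize only if this thread has no cached value.
    static String apply(String tuple) {
        if (jobConf.get() == null) {
            initializeJobConf();
        }
        return tuple;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            apply("t" + i);
        }
        // 1000 calls to apply on one thread trigger a single initialization.
        System.out.println(initCalls.get()); // prints 1
    }
}
```

Without the null check, the counter would reach 1000, which corresponds to the repeated `deserializeJobConf` frames in the thread dump quoted below.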
> PackageConverter hanging in Spark
> ---------------------------------
>
> Key: PIG-4323
> URL: https://issues.apache.org/jira/browse/PIG-4323
> Project: Pig
> Issue Type: Bug
> Components: spark
> Reporter: Carlos Balduz
> Assignee: Carlos Balduz
> Attachments: PIG-4323.patch
>
>
> After the patch introduced in PIG-4237, PackageConverter hangs on large
> jobs because it deserializes the job configuration every time apply is
> called, which is far too slow:
> {code:java}
> public Tuple apply(final Tuple t) {
>     initializeJobConf();
>     ...
> {code}
> After a job had been stuck for 30 minutes, I took a thread dump and saw that
> the main issue was:
> {code:java}
> Thread 11128: (state = IN_JAVA)
> - java.util.zip.InflaterInputStream.read(byte[], int, int) @bci=53, line=152 (Compiled frame; information may be imprecise)
> - java.util.zip.GZIPInputStream.read(byte[], int, int) @bci=17, line=116 (Compiled frame)
> - org.apache.hadoop.io.WritableUtils.readCompressedByteArray(java.io.DataInput) @bci=65, line=44 (Compiled frame)
> - org.apache.hadoop.io.WritableUtils.readCompressedString(java.io.DataInput) @bci=1, line=87 (Compiled frame)
> - org.apache.hadoop.io.WritableUtils.readCompressedStringArray(java.io.DataInput) @bci=29, line=185 (Compiled frame)
> - org.apache.hadoop.conf.Configuration.readFields(java.io.DataInput) @bci=37, line=2564 (Compiled frame)
> - org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer.deserializeJobConf(byte[]) @bci=24, line=80 (Compiled frame)
> - org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter$PackageFunction.initializeJobConf() @bci=4, line=60 (Compiled frame)
> - org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter$PackageFunction.apply(org.apache.pig.data.Tuple) @bci=1, line=67 (Compiled frame)
> - org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter$PackageFunction.apply(java.lang.Object) @bci=5, line=48 (Compiled frame)
> - scala.collection.Iterator$$anon$11.next() @bci=13, line=328 (Compiled frame)
> - scala.collection.convert.Wrappers$IteratorWrapper.next() @bci=4, line=30 (Compiled frame)
> - org.apache.pig.backend.hadoop.executionengine.spark.converter.POOutputConsumerIterator.readNext() @bci=44, line=61 (Compiled frame)
> - org.apache.pig.backend.hadoop.executionengine.spark.converter.POOutputConsumerIterator.readNext() @bci=162, line=78 (Compiled frame)
> - org.apache.pig.backend.hadoop.executionengine.spark.converter.POOutputConsumerIterator.hasNext() @bci=1, line=91 (Compiled frame)
> - org.apache.pig.backend.hadoop.executionengine.spark.converter.POOutputConsumerIterator.readNext() @bci=133, line=75 (Compiled frame)
> - org.apache.pig.backend.hadoop.executionengine.spark.converter.POOutputConsumerIterator.hasNext() @bci=1, line=91 (Compiled frame)
> - scala.collection.convert.Wrappers$JIteratorWrapper.hasNext() @bci=4, line=41 (Compiled frame)
> - scala.collection.Iterator$$anon$11.hasNext() @bci=4, line=327 (Compiled frame)
> - scala.collection.Iterator$$anon$11.hasNext() @bci=4, line=327 (Compiled frame)
> - scala.collection.Iterator$class.foreach(scala.collection.Iterator, scala.Function1) @bci=1, line=727 (Compiled frame)
> - scala.collection.AbstractIterator.foreach(scala.Function1) @bci=2, line=1157 (Interpreted frame)
> - org.apache.spark.shuffle.hash.HashShuffleWriter.write(scala.collection.Iterator) @bci=95, line=65 (Interpreted frame)
> - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=170, line=68 (Interpreted frame)
> - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=41 (Interpreted frame)
> - org.apache.spark.scheduler.Task.run(long) @bci=67, line=54 (Interpreted frame)
> - org.apache.spark.executor.Executor$TaskRunner.run() @bci=336, line=177 (Interpreted frame)
> - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1145 (Interpreted frame)
> - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615 (Interpreted frame)
> - java.lang.Thread.run() @bci=11, line=745 (Interpreted frame)
> {code}
> I have added a check so that the job configuration is initialized only if
> it has not been initialized already, which makes it work again.