Thank you for the answer Robert!

I realize it's a single JVM running, yet I would expect programs to behave
in the same way, i.e. serialization to happen (even if not strictly
necessary), in order to catch this kind of bug before cluster deployment.
Is this simply not possible, or is it a design choice we made for some
reason?

-V.

On 29 June 2015 at 09:53, Robert Metzger <rmetz...@apache.org> wrote:

> It is working in the IDE because there we execute everything in the same
> JVM, so the mapper can access the correct value of the static variable.
> When submitting a job with the CLI frontend, there are at least two JVMs
> involved, and code running in the JM/TM cannot access the value from the
> static variable in the CLI frontend.
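
This behavior can be reproduced with plain JDK serialization, independent of Flink. The following sketch (class and method names are made up for illustration, using `java.io` serialization to stand in for Flink shipping a mapper to a TaskManager) shows that static fields are simply not part of an object's serialized state:

```java
import java.io.*;

// A function object whose logic depends on a static field, mirroring the
// anonymous-mapper-plus-static pattern discussed in this thread.
class StaticSizeMapper implements Serializable {
    static int arraySize = 0;      // static fields are NOT serialized

    int[] map() {
        return new int[arraySize]; // size is read in the executing JVM
    }
}

public class StaticFieldDemo {
    // Serialize the mapper in the "client" JVM, reset the static to simulate
    // a fresh TaskManager JVM, then deserialize and run the mapper there.
    static int roundTripLength() throws Exception {
        StaticSizeMapper.arraySize = 5;            // set in the client "JVM"
        StaticSizeMapper mapper = new StaticSizeMapper();

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(mapper);

        StaticSizeMapper.arraySize = 0;            // remote JVM: default value
        StaticSizeMapper remote = (StaticSizeMapper) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        return remote.map().length;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("array length after round trip: " + roundTripLength());
    }
}
</pre>
```

In the IDE everything stays in one JVM, so the static keeps its value and the problem never surfaces; across JVMs the deserialized mapper sees whatever default the remote JVM has.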
>
> On Sun, Jun 28, 2015 at 9:43 PM, Vasiliki Kalavri <
> vasilikikala...@gmail.com> wrote:
>
>> Hi everyone,
>>
>> Mihail and I have now solved the issue.
>>
>> The exception occurred because the array size in question was read from
>> a static field of the enclosing class inside an anonymous mapper. Making
>> the mapper a standalone class and passing the array size to its
>> constructor solved the issue.
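
For the archives, the fix pattern looks roughly like this (a self-contained sketch with made-up names, again using plain `java.io` serialization in place of Flink's function shipping): the size becomes an instance field set in the constructor, so it travels with the serialized object.

```java
import java.io.*;

// Standalone mapper: the array size is an instance field initialized via the
// constructor, so it is serialized together with the object.
class SizedMapper implements Serializable {
    private final int arraySize;

    SizedMapper(int arraySize) {
        this.arraySize = arraySize;
    }

    int[] map() {
        return new int[arraySize];
    }
}

public class ConstructorFixDemo {
    static int roundTripLength() throws Exception {
        SizedMapper mapper = new SizedMapper(5);   // size captured at construction

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(mapper);

        SizedMapper remote = (SizedMapper) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        return remote.map().length;                // the state travelled along
    }

    public static void main(String[] args) throws Exception {
        System.out.println("array length after round trip: " + roundTripLength());
    }
}
```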
>>
>> What I don't understand though, is why this worked fine when the job was
>> executed from inside the IDE. Is serialization handled differently
>> (skipped) in this case?
>>
>> Cheers,
>> Vasia.
>>
>> On 26 June 2015 at 11:30, Mihail Vieru <vi...@informatik.hu-berlin.de>
>> wrote:
>>
>>>  Hi Vasia,
>>>
>>> InitVerticesMapper is called in the run method of APSP:
>>>
>>>     @Override
>>>     public Graph<K, Tuple2<Integer[], String>, NullValue> run(
>>>             Graph<K, Tuple2<Integer[], String>, NullValue> input) {
>>>
>>>         VertexCentricConfiguration parameters = new VertexCentricConfiguration();
>>>         parameters.setSolutionSetUnmanagedMemory(false);
>>>
>>>         return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
>>>                 .runVertexCentricIteration(
>>>                         new VertexDistanceUpdater<K, Tuple2<Integer[], String>, Integer>(srcVertexId),
>>>                         new MinDistanceMessenger<K, Tuple2<Integer[], String>, Integer, NullValue>(srcVertexId),
>>>                         maxIterations, parameters);
>>>     }
>>>
>>> I'll send you the full code via a private e-mail.
>>>
>>> Cheers,
>>> Mihail
>>>
>>>
>>> On 26.06.2015 11:10, Vasiliki Kalavri wrote:
>>>
>>>  Hi Mihail,
>>>
>>>  could you share your code or at least the implementations of
>>> getVerticesDataSet() and InitVerticesMapper so I can take a look?
>>> Where is InitVerticesMapper called above?
>>>
>>>  Cheers,
>>> Vasia.
>>>
>>>
>>> On 26 June 2015 at 10:51, Mihail Vieru <vi...@informatik.hu-berlin.de>
>>> wrote:
>>>
>>>>  Hi Robert,
>>>>
>>>> I'm using the same input data, as well as the same parameters I use in
>>>> the IDE's run configuration.
>>>> I don't run the job on the cluster (yet), but locally, by starting
>>>> Flink with the start-local.sh script.
>>>>
>>>>
>>>> I will try to explain my code a bit. The Integer[] array is
>>>> initialized in the getVerticesDataSet() method.
>>>>
>>>>     DataSet<Vertex<Integer, Tuple2<Integer[], String>>> vertices =
>>>>             getVerticesDataSet(env);
>>>>     ...
>>>>     Graph<Integer, Tuple2<Integer[], String>, NullValue> graph =
>>>>             Graph.fromDataSet(vertices, edges, env);
>>>>     ...
>>>>     Graph<Integer, Tuple2<Integer[], String>, NullValue> intermediateGraph =
>>>>             graph.run(new APSP<Integer>(srcVertexId, maxIterations));
>>>>
>>>>
>>>> In APSP I access that array in the InitVerticesMapper, but there it is
>>>> suddenly empty.
>>>>
>>>> Best,
>>>> Mihail
>>>>
>>>>
>>>> On 26.06.2015 10:00, Robert Metzger wrote:
>>>>
>>>> Hi Mihail,
>>>>
>>>>  the exception has been thrown from
>>>> graphdistance.APSP$InitVerticesMapper.map(APSP.java:74). I guess
>>>> that is code written by you or a library you are using.
>>>> Maybe the data you are using on the cluster is different from your
>>>> local test data?
>>>>
>>>>  Best,
>>>> Robert
>>>>
>>>>
>>>> On Thu, Jun 25, 2015 at 7:41 PM, Mihail Vieru <
>>>> vi...@informatik.hu-berlin.de> wrote:
>>>>
>>>>>  Hi,
>>>>>
>>>>> I get an ArrayIndexOutOfBoundsException when I run my job from a JAR
>>>>> in the CLI.
>>>>> This doesn't occur in the IDE.
>>>>>
>>>>> I've built the JAR using the "maven-shade-plugin" and the pom.xml
>>>>> configuration Robert has provided here:
>>>>> https://stackoverflow.com/questions/30102523/linkage-failure-when-running-apache-flink-jobs
>>>>> I specify the entry point using the "-c" option.
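>>>>>
>>>>> For completeness, the invocation looks like this (the JAR path and the
>>>>> trailing arguments are placeholders, not my actual paths):
>>>>>
>>>>>     ./bin/flink run -c graphdistance.KAPSPNaiveJob \
>>>>>         target/graphdistance-jar-with-dependencies.jar <input> <output>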
>>>>>
>>>>> The array the Exception refers to is actually initialized when a
>>>>> vertices dataset is read from the file system.
>>>>>
>>>>> Any ideas on what could cause this issue?
>>>>>
>>>>> Best,
>>>>> Mihail
>>>>>
>>>>> P.S.: the stack trace:
>>>>>
>>>>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>>>>>     at org.apache.flink.client.program.Client.run(Client.java:413)
>>>>>     at org.apache.flink.client.program.Client.run(Client.java:356)
>>>>>     at org.apache.flink.client.program.Client.run(Client.java:349)
>>>>>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
>>>>>     at graphdistance.KAPSPNaiveJob.main(KAPSPNaiveJob.java:56)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>>>>>     at org.apache.flink.client.program.Client.run(Client.java:315)
>>>>>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
>>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
>>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
>>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
>>>>>     at graphdistance.APSP$InitVerticesMapper.map(APSP.java:74)
>>>>>     at graphdistance.APSP$InitVerticesMapper.map(APSP.java:48)
>>>>>     at org.apache.flink.graph.Graph$2.map(Graph.java:389)
>>>>>     at org.apache.flink.graph.Graph$2.map(Graph.java:387)
>>>>>     at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
>>>>>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>>>>>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>
