@Stephan: I don't think there is a way to deal with this. In my
understanding, the (main) purpose of the user@ list is not to report Flink
bugs. It is a forum for users to help each other.
Flink committers happen to know a lot about the system, so it's easy for
them to help users. It's also a good way to understand how users are using
the system, which makes designing APIs, writing error messages, and
writing documentation easier.
I'm very happy to see that more and more non-committers are joining
discussions here and helping other users.



On Mon, Jun 29, 2015 at 11:05 AM, Stephan Ewen <se...@apache.org> wrote:

> Static fields are not part of the serialized program (by Java's
> definition). Whether a static field has the same value in the cluster
> JVMs depends on how it is initialized, i.e., whether it is initialized
> the same way in the shipped code, which runs without the program's main
> method.
>
>
> BTW: We are seeing more and more cases now where committers are spending a
> lot of time debugging reported Flink bugs that are actually user bugs.
> At some point we need to think of a way to deal with this, as more and more
> such things are reported...
>
> On Mon, Jun 29, 2015 at 11:02 AM, Vasiliki Kalavri <
> vasilikikala...@gmail.com> wrote:
>
>> Thank you for the answer Robert!
>>
>> I realize it's a single JVM running, yet I would expect programs to
>> behave in the same way, i.e. for serialization to happen (even if not
>> necessary), in order to catch this kind of bug before cluster deployment.
>> Is this simply not possible, or is it a design choice we made for some
>> reason?
>>
>> -V.
>>
>> On 29 June 2015 at 09:53, Robert Metzger <rmetz...@apache.org> wrote:
>>
>>> It works in the IDE because there we execute everything in the same
>>> JVM, so the mapper can access the correct value of the static variable.
>>> When submitting a job with the CLI frontend, there are at least two JVMs
>>> involved, and code running in the JM/TM cannot access the value of the
>>> static variable set in the CLI frontend.
>>>
>>> On Sun, Jun 28, 2015 at 9:43 PM, Vasiliki Kalavri <
>>> vasilikikala...@gmail.com> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> Mihail and I have now solved the issue.
>>>>
>>>> The exception occurred because the array size in question was read
>>>> from a static field of the enclosing class, inside an anonymous mapper.
>>>> Making the mapper a standalone class and passing the array size to its
>>>> constructor solved the issue.
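
The fix described above can be sketched in plain Java (no Flink on the classpath; the class name mirrors the thread and the map() signature is a simplified, hypothetical stand-in for Flink's MapFunction). State passed through the constructor becomes instance state, which Java serialization does ship with the object:

```java
import java.io.*;

// Sketch of the fix: a standalone, serializable mapper class that carries
// the array size as an instance field instead of reading a static field.
public class ConstructorParamFix {

    static class InitVerticesMapper implements Serializable {
        private final int arraySize;  // instance field: part of the serialized object

        InitVerticesMapper(int arraySize) {
            this.arraySize = arraySize;
        }

        Integer[] map() {
            return new Integer[arraySize];  // safe: arraySize travels with the object
        }
    }

    public static void main(String[] args) throws Exception {
        InitVerticesMapper mapper = new InitVerticesMapper(5);

        // Round-trip through serialization, as the CLI frontend would do
        // when shipping the job to the JobManager/TaskManagers.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(mapper);
        InitVerticesMapper shipped = (InitVerticesMapper) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        System.out.println(shipped.map().length);  // same size on the "remote" side
    }
}
```
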
>>>>
>>>> What I don't understand, though, is why this worked fine when the job
>>>> was executed from inside the IDE. Is serialization handled differently
>>>> (skipped) in this case?
>>>>
>>>> Cheers,
>>>> Vasia.
>>>>
>>>> On 26 June 2015 at 11:30, Mihail Vieru <vi...@informatik.hu-berlin.de>
>>>> wrote:
>>>>
>>>>>  Hi Vasia,
>>>>>
>>>>> *InitVerticesMapper* is called in the run method of APSP:
>>>>>
>>>>>     @Override
>>>>>     public Graph<K, Tuple2<Integer[],String>, NullValue> run(
>>>>>             Graph<K, Tuple2<Integer[],String>, NullValue> input) {
>>>>>
>>>>>         VertexCentricConfiguration parameters =
>>>>>                 new VertexCentricConfiguration();
>>>>>         parameters.setSolutionSetUnmanagedMemory(false);
>>>>>
>>>>>         return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
>>>>>                 .runVertexCentricIteration(
>>>>>                         new VertexDistanceUpdater<K, Tuple2<Integer[],String>, Integer>(srcVertexId),
>>>>>                         new MinDistanceMessenger<K, Tuple2<Integer[],String>, Integer, NullValue>(srcVertexId),
>>>>>                         maxIterations, parameters);
>>>>>     }
>>>>>
>>>>> I'll send you the full code via a private e-mail.
>>>>>
>>>>> Cheers,
>>>>> Mihail
>>>>>
>>>>>
>>>>> On 26.06.2015 11:10, Vasiliki Kalavri wrote:
>>>>>
>>>>>  Hi Mihail,
>>>>>
>>>>>  could you share your code or at least the implementations of
>>>>> getVerticesDataSet() and InitVerticesMapper so I can take a look?
>>>>> Where is InitVerticesMapper called above?
>>>>>
>>>>>  Cheers,
>>>>> Vasia.
>>>>>
>>>>>
>>>>> On 26 June 2015 at 10:51, Mihail Vieru <vi...@informatik.hu-berlin.de>
>>>>> wrote:
>>>>>
>>>>>>  Hi Robert,
>>>>>>
>>>>>> I'm using the same input data, as well as the same parameters I use
>>>>>> in the IDE's run configuration.
>>>>>> I don't run the job on the cluster (yet), but locally, by starting
>>>>>> Flink with the start-local.sh script.
>>>>>>
>>>>>>
>>>>>> I will try to explain my code a bit. The *Integer[]* array is
>>>>>> initialized in the *getVerticesDataSet()* method.
>>>>>>
>>>>>>         DataSet<Vertex<Integer, Tuple2<Integer[],String>>> vertices =
>>>>>>                 getVerticesDataSet(env);
>>>>>>         ...
>>>>>>         Graph<Integer, Tuple2<Integer[],String>, NullValue> graph =
>>>>>>                 Graph.fromDataSet(vertices, edges, env);
>>>>>>         ...
>>>>>>         Graph<Integer, Tuple2<Integer[],String>, NullValue> intermediateGraph =
>>>>>>                 graph.run(new APSP<Integer>(srcVertexId, maxIterations));
>>>>>>
>>>>>>
>>>>>> In APSP I access it in *InitVerticesMapper*, but there it is
>>>>>> suddenly empty.
>>>>>>
>>>>>> Best,
>>>>>> Mihail
>>>>>>
>>>>>>
>>>>>> On 26.06.2015 10:00, Robert Metzger wrote:
>>>>>>
>>>>>> Hi Mihail,
>>>>>>
>>>>>>  the exception was thrown from
>>>>>> *graphdistance.APSP$InitVerticesMapper.map(APSP.java:74)*. I guess
>>>>>> that is code written by you or a library you are using.
>>>>>> Maybe the data you are using on the cluster differs from your
>>>>>> local test data?
>>>>>>
>>>>>>  Best,
>>>>>> Robert
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 25, 2015 at 7:41 PM, Mihail Vieru <
>>>>>> vi...@informatik.hu-berlin.de> wrote:
>>>>>>
>>>>>>>  Hi,
>>>>>>>
>>>>>>> I get an ArrayIndexOutOfBoundsException when I run my job from a JAR
>>>>>>> in the CLI.
>>>>>>> This doesn't occur in the IDE.
>>>>>>>
>>>>>>> I've built the JAR using the "maven-shade-plugin" and the pom.xml
>>>>>>> configuration Robert has provided here:
>>>>>>>
>>>>>>> https://stackoverflow.com/questions/30102523/linkage-failure-when-running-apache-flink-jobs
>>>>>>> I specify the entry point using the "-c" option.
>>>>>>>
>>>>>>> The array the exception refers to is actually initialized when the
>>>>>>> vertices dataset is read from the file system.
>>>>>>>
>>>>>>> Any ideas on what could cause this issue?
>>>>>>>
>>>>>>> Best,
>>>>>>> Mihail
>>>>>>>
>>>>>>> P.S.: the stack trace:
>>>>>>>
>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>>>>>>>     at org.apache.flink.client.program.Client.run(Client.java:413)
>>>>>>>     at org.apache.flink.client.program.Client.run(Client.java:356)
>>>>>>>     at org.apache.flink.client.program.Client.run(Client.java:349)
>>>>>>>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
>>>>>>>     at graphdistance.KAPSPNaiveJob.main(KAPSPNaiveJob.java:56)
>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>>>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>>>>>>>     at org.apache.flink.client.program.Client.run(Client.java:315)
>>>>>>>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
>>>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
>>>>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
>>>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
>>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
>>>>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
>>>>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>>>>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
>>>>>>>     at graphdistance.APSP$InitVerticesMapper.map(APSP.java:74)
>>>>>>>     at graphdistance.APSP$InitVerticesMapper.map(APSP.java:48)
>>>>>>>     at org.apache.flink.graph.Graph$2.map(Graph.java:389)
>>>>>>>     at org.apache.flink.graph.Graph$2.map(Graph.java:387)
>>>>>>>     at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
>>>>>>>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>>>>>>>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
