Re: flink k-means on hadoop cluster

2015-06-04 Thread Robert Metzger
No, the permissions are still not correct, otherwise Flink would not
complain.

The error message from Flink is actually pretty precise: Caused by:
java.io.FileNotFoundException: File /user/cloudera/inputs does not exist or
the user running Flink ('yarn') has insufficient permissions to access it.

Does the file exist, and does the user 'yarn' have permission to access it?

On Thu, Jun 4, 2015 at 5:57 PM, Pa Rö paul.roewer1...@googlemail.com
wrote:

 here is my main class:

   public static void main(String[] args) {
       // load properties
       Properties pro = new Properties();
       try {
           pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
       } catch (Exception e) {
           e.printStackTrace();
       }
       int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
       String outputPath = pro.getProperty("flink.output");
       // set up execution environment
       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
       // get input points
       DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
       DataSet<GeoTimeDataCenter> centroids = null;
       try {
           centroids = getCentroidDataSet(env);
       } catch (Exception e1) {
           e1.printStackTrace();
       }
       // set number of bulk iterations for KMeans algorithm
       IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
       DataSet<GeoTimeDataCenter> newCentroids = points
           // compute closest centroid for each point
           .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
           // count and sum point coordinates for each centroid
           .groupBy(0).reduceGroup(new CentroidAccumulator())
           // compute new centroids from point counts and coordinate sums
           .map(new CentroidAverager());
       // feed new centroids back into next iteration
       DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
       DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
           // assign points to final clusters
           .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
       // emit result
       clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
       finalCentroids.writeAsText(outputPath + "/centers"); // print();
       // execute program
       try {
           env.execute("KMeans Flink");
       } catch (Exception e) {
           e.printStackTrace();
       }
   }

 maybe I can't use the following with HDFS?

 clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
 finalCentroids.writeAsText(outputPath + "/centers"); // print();


 2015-06-04 17:53 GMT+02:00 Pa Rö paul.roewer1...@googlemail.com:

 I have changed the permissions as the cloudera user and tried the
 following commands.
 And the files exist on HDFS ;) I set the paths in my properties file like
 flink.output=/user/cloudera/outputs/output_flink
 I get the same exception again; maybe the problem has another cause?

 [cloudera@quickstart bin]$ sudo su yarn
 bash-4.1$ hadoop fs -chmod 777 /user/cloudera/inputs
 bash-4.1$ hadoop fs -chmod 777 /user/cloudera/outputs
 bash-4.1$ exit
 exit
 [cloudera@quickstart bin]$ sudo ./flink run
 /home/cloudera/Desktop/ma-flink.jar
 log4j:WARN No appenders could be found for logger
 (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
 log4j:WARN Please initialize the log4j system properly.
 log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
 more info.
 Found YARN properties file
 /home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties
 Using JobManager address from YARN properties quickstart.cloudera/
 127.0.0.1:52601
 org.apache.flink.client.program.ProgramInvocationException: The program
 execution failed: Failed to submit job fe78e9ee50cf76ac8b487919e1c951fa
 (KMeans Flink)
 at org.apache.flink.client.program.Client.run(Client.java:412)
 at org.apache.flink.client.program.Client.run(Client.java:355)
 at org.apache.flink.client.program.Client.run(Client.java:348)
 at
 org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
 at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 

Re: flink k-means on hadoop cluster

2015-06-04 Thread Pa Rö
I start yarn-session.sh with sudo
and then the flink run command with sudo,
and I get the following exception:

[cloudera@quickstart bin]$ sudo ./flink run
/home/cloudera/Desktop/ma-flink.jar
log4j:WARN No appenders could be found for logger
(org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
org.apache.flink.client.program.ProgramInvocationException: Failed to
resolve JobManager
at org.apache.flink.client.program.Client.run(Client.java:378)
at org.apache.flink.client.program.Client.run(Client.java:355)
at org.apache.flink.client.program.Client.run(Client.java:348)
at
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
at org.apache.flink.client.program.Client.run(Client.java:315)
at
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: java.io.IOException: JobManager at akka.tcp://
flink@127.0.0.1:6123/user/jobmanager not reachable. Please make sure that
the JobManager is running and its port is reachable.
at
org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1198)
at
org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1222)
at
org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1240)
at
org.apache.flink.runtime.jobmanager.JobManager.getJobManagerRemoteReference(JobManager.scala)
at org.apache.flink.client.program.Client.run(Client.java:375)
... 15 more
Caused by: akka.actor.ActorNotFound: Actor not found for:
ActorSelection[Anchor(akka.tcp://flink@127.0.0.1:6123/),
Path(/user/jobmanager)]
at
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
at
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
at
akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
at
akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
at
akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
at
akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
at akka.actor.ActorCell.terminate(ActorCell.scala:369)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at 

Re: flink k-means on hadoop cluster

2015-06-04 Thread Pa Rö
here is my main class:

public static void main(String[] args) {
    // load properties
    Properties pro = new Properties();
    try {
        pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
    } catch (Exception e) {
        e.printStackTrace();
    }
    int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
    String outputPath = pro.getProperty("flink.output");
    // set up execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // get input points
    DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
    DataSet<GeoTimeDataCenter> centroids = null;
    try {
        centroids = getCentroidDataSet(env);
    } catch (Exception e1) {
        e1.printStackTrace();
    }
    // set number of bulk iterations for KMeans algorithm
    IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
    DataSet<GeoTimeDataCenter> newCentroids = points
        // compute closest centroid for each point
        .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
        // count and sum point coordinates for each centroid
        .groupBy(0).reduceGroup(new CentroidAccumulator())
        // compute new centroids from point counts and coordinate sums
        .map(new CentroidAverager());
    // feed new centroids back into next iteration
    DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
    DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
        // assign points to final clusters
        .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
    // emit result
    clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
    finalCentroids.writeAsText(outputPath + "/centers"); // print();
    // execute program
    try {
        env.execute("KMeans Flink");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

maybe I can't use the following with HDFS?

clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
finalCentroids.writeAsText(outputPath + "/centers"); // print();


2015-06-04 17:53 GMT+02:00 Pa Rö paul.roewer1...@googlemail.com:

 I have changed the permissions as the cloudera user and tried the following
 commands.
 And the files exist on HDFS ;) I set the paths in my properties file like
 flink.output=/user/cloudera/outputs/output_flink
 I get the same exception again; maybe the problem has another cause?

 [cloudera@quickstart bin]$ sudo su yarn
 bash-4.1$ hadoop fs -chmod 777 /user/cloudera/inputs
 bash-4.1$ hadoop fs -chmod 777 /user/cloudera/outputs
 bash-4.1$ exit
 exit
 [cloudera@quickstart bin]$ sudo ./flink run
 /home/cloudera/Desktop/ma-flink.jar
 log4j:WARN No appenders could be found for logger
 (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
 log4j:WARN Please initialize the log4j system properly.
 log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
 more info.
 Found YARN properties file
 /home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties
 Using JobManager address from YARN properties quickstart.cloudera/
 127.0.0.1:52601
 org.apache.flink.client.program.ProgramInvocationException: The program
 execution failed: Failed to submit job fe78e9ee50cf76ac8b487919e1c951fa
 (KMeans Flink)
 at org.apache.flink.client.program.Client.run(Client.java:412)
 at org.apache.flink.client.program.Client.run(Client.java:355)
 at org.apache.flink.client.program.Client.run(Client.java:348)
 at
 org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
 at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
 at
 org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
 at org.apache.flink.client.program.Client.run(Client.java:315)
 at
 org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
 at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
 at
 

Re: flink k-means on hadoop cluster

2015-06-04 Thread Pa Rö
I have changed the permissions as the cloudera user and tried the following
commands.
And the files exist on HDFS ;) I set the paths in my properties file like
flink.output=/user/cloudera/outputs/output_flink
I get the same exception again; maybe the problem has another cause?

[cloudera@quickstart bin]$ sudo su yarn
bash-4.1$ hadoop fs -chmod 777 /user/cloudera/inputs
bash-4.1$ hadoop fs -chmod 777 /user/cloudera/outputs
bash-4.1$ exit
exit
[cloudera@quickstart bin]$ sudo ./flink run
/home/cloudera/Desktop/ma-flink.jar
log4j:WARN No appenders could be found for logger
(org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
Found YARN properties file
/home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties
Using JobManager address from YARN properties quickstart.cloudera/
127.0.0.1:52601
org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Failed to submit job fe78e9ee50cf76ac8b487919e1c951fa
(KMeans Flink)
at org.apache.flink.client.program.Client.run(Client.java:412)
at org.apache.flink.client.program.Client.run(Client.java:355)
at org.apache.flink.client.program.Client.run(Client.java:348)
at
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
at org.apache.flink.client.program.Client.run(Client.java:315)
at
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to
submit job fe78e9ee50cf76ac8b487919e1c951fa (KMeans Flink)
at org.apache.flink.runtime.jobmanager.JobManager.org
$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:595)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:192)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at
org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
at
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
at
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits
caused an error: File /user/cloudera/inputs does not exist or the user
running Flink ('yarn') has insufficient permissions to access it.
at
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.init(ExecutionJobVertex.java:162)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471)
at org.apache.flink.runtime.jobmanager.JobManager.org
$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:535)
... 21 more
Caused by: java.io.FileNotFoundException: File /user/cloudera/inputs does
not exist 

Re: flink k-means on hadoop cluster

2015-06-04 Thread Pa Rö
okay, it works, but I get an exception:

[cloudera@quickstart Desktop]$ cd flink-0.9-SNAPSHOT/bin/
[cloudera@quickstart bin]$ flink run /home/cloudera/Desktop/ma-flink.jar
bash: flink: command not found
[cloudera@quickstart bin]$ ./flink run /home/cloudera/Desktop/ma-flink.jar
log4j:WARN No appenders could be found for logger
(org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
Found YARN properties file
/home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties
Using JobManager address from YARN properties quickstart.cloudera/
127.0.0.1:53874
java.io.IOException: Mkdirs failed to create /user/cloudera/outputs
at
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:438)
at
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:783)
at
mgm.tp.bigdata.ma_commons.commons.Seeding.randomSeeding(Seeding.java:21)
at
mgm.tp.bigdata.ma_flink.FlinkMain.getCentroidDataSet(FlinkMain.java:178)
at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:47)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
at org.apache.flink.client.program.Client.run(Client.java:315)
at
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Failed to submit job 934743a5c49c6d5e31c9e8201452e36d
(KMeans Flink)
at org.apache.flink.client.program.Client.run(Client.java:412)
at org.apache.flink.client.program.Client.run(Client.java:355)
at org.apache.flink.client.program.Client.run(Client.java:348)
at
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
at org.apache.flink.client.program.Client.run(Client.java:315)
at
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to
submit job 934743a5c49c6d5e31c9e8201452e36d (KMeans Flink)
at org.apache.flink.runtime.jobmanager.JobManager.org
$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:595)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:192)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at
org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
at
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
at
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at

Re: flink k-means on hadoop cluster

2015-06-04 Thread Robert Metzger
It looks like the user 'yarn' which is running Flink doesn't have
permission to access the files.

Can you do 'sudo su yarn' to become the yarn user? Then, you can do
'hadoop fs -chmod 777' to make the files accessible for everyone.


On Thu, Jun 4, 2015 at 4:59 PM, Pa Rö paul.roewer1...@googlemail.com
wrote:

 okay, it works, but I get an exception:

 [cloudera@quickstart Desktop]$ cd flink-0.9-SNAPSHOT/bin/
 [cloudera@quickstart bin]$ flink run /home/cloudera/Desktop/ma-flink.jar
 bash: flink: command not found
 [cloudera@quickstart bin]$ ./flink run /home/cloudera/Desktop/ma-flink.jar
 log4j:WARN No appenders could be found for logger
 (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
 log4j:WARN Please initialize the log4j system properly.
 log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
 more info.
 Found YARN properties file
 /home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties
 Using JobManager address from YARN properties quickstart.cloudera/
 127.0.0.1:53874
 java.io.IOException: Mkdirs failed to create /user/cloudera/outputs
 at
 org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:438)
 at
 org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:783)
 at
 mgm.tp.bigdata.ma_commons.commons.Seeding.randomSeeding(Seeding.java:21)
 at
 mgm.tp.bigdata.ma_flink.FlinkMain.getCentroidDataSet(FlinkMain.java:178)
 at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:47)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
 at
 org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
 at org.apache.flink.client.program.Client.run(Client.java:315)
 at
 org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
 at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
 at
 org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
 at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
 org.apache.flink.client.program.ProgramInvocationException: The program
 execution failed: Failed to submit job 934743a5c49c6d5e31c9e8201452e36d
 (KMeans Flink)
 at org.apache.flink.client.program.Client.run(Client.java:412)
 at org.apache.flink.client.program.Client.run(Client.java:355)
 at org.apache.flink.client.program.Client.run(Client.java:348)
 at
 org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
 at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
 at
 org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
 at org.apache.flink.client.program.Client.run(Client.java:315)
 at
 org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
 at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
 at
 org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
 at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
 Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed
 to submit job 934743a5c49c6d5e31c9e8201452e36d (KMeans Flink)
 at org.apache.flink.runtime.jobmanager.JobManager.org
 $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:595)
 at
 org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:192)
 at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
 at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
 at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
 at
 org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99)
 at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
 at
 org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
  

Re: flink k-means on hadoop cluster

2015-06-04 Thread Robert Metzger
Once you've started the YARN session, you can submit a Flink job with
./bin/flink run <pathToYourJar>.

The jar file of your job doesn't need to be in HDFS. It has to be in the
local file system, and Flink will send it to all machines.

On Thu, Jun 4, 2015 at 4:48 PM, Pa Rö paul.roewer1...@googlemail.com
wrote:

 okay, now it runs on my hadoop.
 how can I start my flink job? and where must the jar file be saved, in HDFS
 or as a local file?

 2015-06-04 16:31 GMT+02:00 Robert Metzger rmetz...@apache.org:

 Yes, you have to run these commands in the command line of the Cloudera
 VM.

 On Thu, Jun 4, 2015 at 4:28 PM, Pa Rö paul.roewer1...@googlemail.com
 wrote:

 you mean run these commands in the terminal/shell and not define a Hue job?

 2015-06-04 16:25 GMT+02:00 Robert Metzger rmetz...@apache.org:

 It should certainly be possible to run Flink on a Cloudera Live VM.

 I think these are the commands you need to execute:

 wget
 http://stratosphere-bin.s3-website-us-east-1.amazonaws.com/flink-0.9-SNAPSHOT-bin-hadoop2.tgz
 tar xvzf flink-0.9-SNAPSHOT-bin-hadoop2.tgz
 cd flink-0.9-SNAPSHOT/
 export HADOOP_CONF_DIR=/usr/lib/hadoop/etc/hadoop/
 ./bin/yarn-session.sh -n 1 -jm 1024 -tm 1024

 If that is not working for you, please post the exact error message you
 are getting and I can help you to get it to run.
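 [Editor's note] A quick sanity check before running yarn-session.sh: the
 directory named by HADOOP_CONF_DIR must contain the cluster's *-site.xml
 files, since that is where Flink picks up the ResourceManager and HDFS
 addresses. A minimal local simulation under /tmp (an illustration only; on
 the Cloudera VM the real directory is /usr/lib/hadoop/etc/hadoop/):

 ```shell
 # Simulate the check with a throwaway directory; on the real VM,
 # point HADOOP_CONF_DIR at /usr/lib/hadoop/etc/hadoop/ instead.
 HADOOP_CONF_DIR=/tmp/demo-hadoop-conf
 mkdir -p "$HADOOP_CONF_DIR"
 touch "$HADOOP_CONF_DIR"/core-site.xml "$HADOOP_CONF_DIR"/yarn-site.xml
 # the config files yarn-session.sh needs should show up here:
 ls "$HADOOP_CONF_DIR" | sort
 ```

 If the listing is empty on the real VM, the export points at the wrong
 directory and the session cannot find the cluster.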


 On Thu, Jun 4, 2015 at 4:18 PM, Pa Rö paul.roewer1...@googlemail.com
 wrote:

 hi robert,

 I think the problem is the Hue API;
 I had the same problem with the spark submit script,
 but the new Hue release has a spark submit API.

 I asked the group about the same problem with spark, no reply.

 I want to test my app on a local cluster before I run it on the big
 cluster; for that I use Cloudera Live. Maybe there is another way to test
 Flink on a local cluster VM?

 2015-06-04 16:12 GMT+02:00 Robert Metzger rmetz...@apache.org:

 Hi Paul,

 why did running Flink from the regular scripts not work for you?

 I'm not an expert on Hue, I would recommend asking in the Hue user
 forum / mailing list:
 https://groups.google.com/a/cloudera.org/forum/#!forum/hue-user.

 On Thu, Jun 4, 2015 at 4:09 PM, Pa Rö paul.roewer1...@googlemail.com
  wrote:

 thanks,
 now I want to run my app on the Cloudera Live VM single node,
 how can I define my flink job with Hue?
 I tried to run the flink script in HDFS; it does not work.

 best regards,
 paul

 2015-06-02 14:50 GMT+02:00 Robert Metzger rmetz...@apache.org:

 I would recommend using HDFS.
 For that, you need to specify the paths like this:
 hdfs:///path/to/data.
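 [Editor's note] To spell out that path syntax: hdfs:///path/to/data has an
 empty authority between the second and third slash, which makes Hadoop fall
 back to the default namenode from the cluster configuration, while
 hdfs://host:port/path names one explicitly. A small sketch of how such URIs
 break down (the quickstart.cloudera:8020 address is an illustrative
 assumption, not taken from the thread):

 ```python
 from urllib.parse import urlparse

 # three slashes: scheme + empty authority -> default filesystem from config
 u = urlparse("hdfs:///path/to/data")
 print(u.scheme, repr(u.netloc), u.path)   # hdfs '' /path/to/data

 # an explicit namenode instead:
 v = urlparse("hdfs://quickstart.cloudera:8020/path/to/data")
 print(v.netloc)                           # quickstart.cloudera:8020
 ```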

 On Tue, Jun 2, 2015 at 2:48 PM, Pa Rö 
 paul.roewer1...@googlemail.com wrote:

 nice,

 which file system must I use for the cluster? java.io or
 hadoop.fs or flink?

 2015-06-02 14:29 GMT+02:00 Robert Metzger rmetz...@apache.org:

 Hi,
 you can start Flink on YARN on the Cloudera distribution.

 See here for more:
 http://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html

 These are the commands you need to execute

 wget 
 http://stratosphere-bin.s3-website-us-east-1.amazonaws.com/flink-0.9-SNAPSHOT-bin-hadoop2.tgz
 tar xvzf flink-0.9-SNAPSHOT-bin-hadoop2.tgz
 cd flink-0.9-SNAPSHOT/
 ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096





 On Tue, Jun 2, 2015 at 2:03 PM, Pa Rö 
 paul.roewer1...@googlemail.com wrote:

 hi community,

 I want to test my flink k-means on a hadoop cluster. I use the
 cloudera live distribution. how can I run flink on this cluster? maybe only
 the java dependencies are enough?

 best regards,
 paul














Re: flink k-means on hadoop cluster

2015-06-04 Thread Pa Rö
[cloudera@quickstart bin]$ sudo su yarn
bash-4.1$ hadoop fs -chmod 777
-chmod: Not enough arguments: expected 2 but got 1
Usage: hadoop fs [generic options] -chmod [-R] MODE[,MODE]... | OCTALMODE
PATH...
bash-4.1$

do you understand?
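[Editor's note] The usage error above is because -chmod was given a MODE but
no PATH. hadoop fs -chmod follows the same MODE-then-paths shape as POSIX
chmod, including -R for recursion. Illustrated below with local chmod on a
throwaway directory, since the hadoop fs equivalents (in the comments)
require a running cluster:

```shell
# local stand-in for: hadoop fs -chmod 777 /user/cloudera/inputs
mkdir -p /tmp/chmod-demo/inputs /tmp/chmod-demo/outputs
chmod 777 /tmp/chmod-demo/inputs
# recursive variant, stand-in for: hadoop fs -chmod -R 777 /user/cloudera/outputs
chmod -R 777 /tmp/chmod-demo/outputs
stat -c '%a' /tmp/chmod-demo/inputs    # 777
```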

2015-06-04 17:04 GMT+02:00 Robert Metzger rmetz...@apache.org:

 It looks like the user 'yarn' which is running Flink doesn't have
 permission to access the files.

 Can you do 'sudo su yarn' to become the yarn user? Then, you can do
 'hadoop fs -chmod 777' to make the files accessible for everyone.


 On Thu, Jun 4, 2015 at 4:59 PM, Pa Rö paul.roewer1...@googlemail.com
 wrote:

 okay, it works, but i get an exception:

 [cloudera@quickstart Desktop]$ cd flink-0.9-SNAPSHOT/bin/
 [cloudera@quickstart bin]$ flink run /home/cloudera/Desktop/ma-flink.jar
 bash: flink: command not found
 [cloudera@quickstart bin]$ ./flink run
 /home/cloudera/Desktop/ma-flink.jar
 log4j:WARN No appenders could be found for logger
 (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
 log4j:WARN Please initialize the log4j system properly.
 log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
 more info.
 Found YARN properties file
 /home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties
 Using JobManager address from YARN properties quickstart.cloudera/
 127.0.0.1:53874
 java.io.IOException: Mkdirs failed to create /user/cloudera/outputs
 at
 org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:438)
 at
 org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:783)
 at
 mgm.tp.bigdata.ma_commons.commons.Seeding.randomSeeding(Seeding.java:21)
 at
 mgm.tp.bigdata.ma_flink.FlinkMain.getCentroidDataSet(FlinkMain.java:178)
 at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:47)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
 at
 org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
 at org.apache.flink.client.program.Client.run(Client.java:315)
 at
 org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
 at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
 at
 org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
 at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
 org.apache.flink.client.program.ProgramInvocationException: The program
 execution failed: Failed to submit job 934743a5c49c6d5e31c9e8201452e36d
 (KMeans Flink)
 at org.apache.flink.client.program.Client.run(Client.java:412)
 at org.apache.flink.client.program.Client.run(Client.java:355)
 at org.apache.flink.client.program.Client.run(Client.java:348)
 at
 org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
 at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
 at
 org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
 at org.apache.flink.client.program.Client.run(Client.java:315)
 at
 org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
 at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
 at
 org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
 at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
 Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed
 to submit job 934743a5c49c6d5e31c9e8201452e36d (KMeans Flink)
 at org.apache.flink.runtime.jobmanager.JobManager.org
 $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:595)
 at
 org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:192)
 at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
 at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
 at
 

Re: flink k-means on hadoop cluster

2015-06-04 Thread Pa Rö
hi robert,

i think the problem is the hue api,
i had the same problem with spark submit script,
but on the new hue release, they have a spark submit api.

i asked the group for the same problem with spark, no reply.

i want to test my app on a local cluster before i run it on the big cluster;
for that i use cloudera live. maybe there is another way to test flink on a
local cluster vm?

2015-06-04 16:12 GMT+02:00 Robert Metzger rmetz...@apache.org:

 Hi Paul,

 why did running Flink from the regular scripts not work for you?

 I'm not an expert on Hue, I would recommend asking in the Hue user forum /
 mailing list:
 https://groups.google.com/a/cloudera.org/forum/#!forum/hue-user.

 On Thu, Jun 4, 2015 at 4:09 PM, Pa Rö paul.roewer1...@googlemail.com
 wrote:

 thanks,
 now i want to run my app on the cloudera live vm (single node).
 how can i define my flink job with hue?
 i tried to run the flink script in hdfs, but it does not work.

 best regards,
 paul

 2015-06-02 14:50 GMT+02:00 Robert Metzger rmetz...@apache.org:

 I would recommend using HDFS.
 For that, you need to specify the paths like this: hdfs:///path/to/data.

 On Tue, Jun 2, 2015 at 2:48 PM, Pa Rö paul.roewer1...@googlemail.com
 wrote:

 nice,

 which file system i must use for the cluster? java.io or hadoop.fs or
 flink?

 2015-06-02 14:29 GMT+02:00 Robert Metzger rmetz...@apache.org:

 Hi,
 you can start Flink on YARN on the Cloudera distribution.

 See here for more:
 http://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html

 These are the commands you need to execute

 wget 
 http://stratosphere-bin.s3-website-us-east-1.amazonaws.com/flink-0.9-SNAPSHOT-bin-hadoop2.tgz
 tar xvzf flink-0.9-SNAPSHOT-bin-hadoop2.tgz
 cd flink-0.9-SNAPSHOT/
 ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096





 On Tue, Jun 2, 2015 at 2:03 PM, Pa Rö paul.roewer1...@googlemail.com
 wrote:

 hi community,

 i want to test my flink k-means on a hadoop cluster. i use the cloudera
 live distribution. how can i run flink on this cluster? maybe only the
 java
 dependencies are enough?

 best regards,
 paul

Re: flink k-means on hadoop cluster

2015-06-04 Thread Pa Rö
bash-4.1$ hadoop fs -chmod 777 *
chmod: `config.sh': No such file or directory
chmod: `flink': No such file or directory
chmod: `flink.bat': No such file or directory
chmod: `jobmanager.sh': No such file or directory
chmod: `pyflink2.sh': No such file or directory
chmod: `pyflink3.sh': No such file or directory
chmod: `start-cluster.sh': No such file or directory
chmod: `start-cluster-streaming.sh': No such file or directory
chmod: `start-local.bat': No such file or directory
chmod: `start-local.sh': No such file or directory
chmod: `start-local-streaming.sh': No such file or directory
chmod: `start-scala-shell.sh': No such file or directory
chmod: `start-webclient.sh': No such file or directory
chmod: `stop-cluster.sh': No such file or directory
chmod: `stop-local.sh': No such file or directory
chmod: `stop-webclient.sh': No such file or directory
chmod: `taskmanager.sh': No such file or directory
chmod: `webclient.sh': No such file or directory
chmod: `yarn-session.sh': No such file or directory


2015-06-04 17:08 GMT+02:00 Pa Rö paul.roewer1...@googlemail.com:

 [cloudera@quickstart bin]$ sudo su yarn
 bash-4.1$ hadoop fs -chmod 777
 -chmod: Not enough arguments: expected 2 but got 1
 Usage: hadoop fs [generic options] -chmod [-R] MODE[,MODE]... |
 OCTALMODE PATH...
 bash-4.1$

 you understand?

 2015-06-04 17:04 GMT+02:00 Robert Metzger rmetz...@apache.org:

 It looks like the user yarn which is running Flink doesn't have
 permission to access the files.

 Can you do sudo su yarn to become the yarn user. Then, you can do
 hadoop fs -chmod 777 to make the files accessible for everyone.


 On Thu, Jun 4, 2015 at 4:59 PM, Pa Rö paul.roewer1...@googlemail.com
 wrote:

 okay, it works, but i get an exception:

 [cloudera@quickstart Desktop]$ cd flink-0.9-SNAPSHOT/bin/
 [cloudera@quickstart bin]$ flink run /home/cloudera/Desktop/ma-flink.jar
 bash: flink: command not found
 [cloudera@quickstart bin]$ ./flink run
 /home/cloudera/Desktop/ma-flink.jar
 log4j:WARN No appenders could be found for logger
 (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
 log4j:WARN Please initialize the log4j system properly.
 log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig
 for more info.
 Found YARN properties file
 /home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties
 Using JobManager address from YARN properties quickstart.cloudera/
 127.0.0.1:53874
 java.io.IOException: Mkdirs failed to create /user/cloudera/outputs
 at
 org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:438)
 at
 org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:783)
 at
 mgm.tp.bigdata.ma_commons.commons.Seeding.randomSeeding(Seeding.java:21)
 at
 mgm.tp.bigdata.ma_flink.FlinkMain.getCentroidDataSet(FlinkMain.java:178)
 at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:47)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
 at
 org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
 at org.apache.flink.client.program.Client.run(Client.java:315)
 at
 org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
 at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
 at
 org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
 at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
 org.apache.flink.client.program.ProgramInvocationException: The program
 execution failed: Failed to submit job 934743a5c49c6d5e31c9e8201452e36d
 (KMeans Flink)
 at org.apache.flink.client.program.Client.run(Client.java:412)
 at org.apache.flink.client.program.Client.run(Client.java:355)
 at org.apache.flink.client.program.Client.run(Client.java:348)
 at
 org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
 at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
 at
 

Re: flink k-means on hadoop cluster

2015-06-04 Thread Pa Rö
i try this:

[cloudera@quickstart bin]$ sudo su yarn
bash-4.1$ hadoop fs -chmod 777 /user/cloudera/outputs
chmod: changing permissions of '/user/cloudera/outputs': Permission denied.
user=yarn is not the owner of inode=outputs
bash-4.1$ hadoop fs -chmod 777 /user/cloudera/inputs
chmod: changing permissions of '/user/cloudera/inputs': Permission denied.
user=yarn is not the owner of inode=inputs
bash-4.1$ exit
exit
[cloudera@quickstart bin]$ sudo ./flink run
/home/cloudera/Desktop/ma-flink.jar
log4j:WARN No appenders could be found for logger
(org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
Found YARN properties file
/home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties
Using JobManager address from YARN properties quickstart.cloudera/
127.0.0.1:53874
org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Failed to submit job 2f46ef5dff4ecf5552b3477ed1c6f4b9
(KMeans Flink)
at org.apache.flink.client.program.Client.run(Client.java:412)
at org.apache.flink.client.program.Client.run(Client.java:355)
at org.apache.flink.client.program.Client.run(Client.java:348)
at
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
at org.apache.flink.client.program.Client.run(Client.java:315)
at
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to
submit job 2f46ef5dff4ecf5552b3477ed1c6f4b9 (KMeans Flink)
at org.apache.flink.runtime.jobmanager.JobManager.org
$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:595)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:192)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at
org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
at
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
at
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits
caused an error: File /user/cloudera/inputs does not exist or the user
running Flink ('yarn') has insufficient permissions to access it.
at
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.init(ExecutionJobVertex.java:162)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471)
at org.apache.flink.runtime.jobmanager.JobManager.org
$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:535)
... 21 more
Caused by: java.io.FileNotFoundException: File /user/cloudera/inputs does
not exist or the user running Flink 

Re: flink k-means on hadoop cluster

2015-06-04 Thread Robert Metzger
Hi Paul,

why did running Flink from the regular scripts not work for you?

I'm not an expert on Hue, I would recommend asking in the Hue user forum /
mailing list:
https://groups.google.com/a/cloudera.org/forum/#!forum/hue-user.

On Thu, Jun 4, 2015 at 4:09 PM, Pa Rö paul.roewer1...@googlemail.com
wrote:

 thanks,
 now i want to run my app on the cloudera live vm (single node).
 how can i define my flink job with hue?
 i tried to run the flink script in hdfs, but it does not work.

 best regards,
 paul

 2015-06-02 14:50 GMT+02:00 Robert Metzger rmetz...@apache.org:

 I would recommend using HDFS.
 For that, you need to specify the paths like this: hdfs:///path/to/data.

 On Tue, Jun 2, 2015 at 2:48 PM, Pa Rö paul.roewer1...@googlemail.com
 wrote:

 nice,

 which file system i must use for the cluster? java.io or hadoop.fs or
 flink?

 2015-06-02 14:29 GMT+02:00 Robert Metzger rmetz...@apache.org:

 Hi,
 you can start Flink on YARN on the Cloudera distribution.

 See here for more:
 http://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html

 These are the commands you need to execute

 wget 
 http://stratosphere-bin.s3-website-us-east-1.amazonaws.com/flink-0.9-SNAPSHOT-bin-hadoop2.tgz
 tar xvzf flink-0.9-SNAPSHOT-bin-hadoop2.tgz
 cd flink-0.9-SNAPSHOT/
 ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096





 On Tue, Jun 2, 2015 at 2:03 PM, Pa Rö paul.roewer1...@googlemail.com
 wrote:

 hi community,

 i want to test my flink k-means on a hadoop cluster. i use the cloudera
 live distribution. how can i run flink on this cluster? maybe only the
 java
 dependencies are enough?

 best regards,
 paul

Re: flink k-means on hadoop cluster

2015-06-04 Thread Robert Metzger
As the output of the hadoop tool indicates, it expects two arguments; you
only passed one (777).
The second argument it is expecting is the path to the file you want to
change.

In your case, it is:
hadoop fs -chmod 777 /user/cloudera/outputs


The reason why
hadoop fs -chmod 777 *
does not work is the following: the * is evaluated by your local bash and
expanded to the files which are present in your current, local directory.
The bash expansion is not able to expand to the files in HDFS.
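
Incidentally, the octal mode in "hadoop fs -chmod 777" encodes read/write/execute
bits for owner, group, and other. The following standalone Java sketch (not Hadoop
code; decode is an illustrative helper) shows how the three digits map to permission
strings:

```java
public class ChmodDecode {
    // Turn an octal mode such as 0777 into the familiar "rwxrwxrwx" form.
    static String decode(int mode) {
        StringBuilder sb = new StringBuilder();
        // Walk the owner, group, and other triplets from high bits to low.
        for (int shift = 6; shift >= 0; shift -= 3) {
            int bits = (mode >> shift) & 7;
            sb.append((bits & 4) != 0 ? 'r' : '-');
            sb.append((bits & 2) != 0 ? 'w' : '-');
            sb.append((bits & 1) != 0 ? 'x' : '-');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(decode(0777)); // rwxrwxrwx -- everyone gets full access
        System.out.println(decode(0644)); // rw-r--r-- -- owner writes, others only read
    }
}
```

So mode 777 grants every user full access, which is why it resolves the yarn
user's permission error (at the cost of opening the files to everyone).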


On Thu, Jun 4, 2015 at 5:08 PM, Pa Rö paul.roewer1...@googlemail.com
wrote:

 [cloudera@quickstart bin]$ sudo su yarn
 bash-4.1$ hadoop fs -chmod 777
 -chmod: Not enough arguments: expected 2 but got 1
 Usage: hadoop fs [generic options] -chmod [-R] MODE[,MODE]... |
 OCTALMODE PATH...
 bash-4.1$

 you understand?

 2015-06-04 17:04 GMT+02:00 Robert Metzger rmetz...@apache.org:

 It looks like the user yarn which is running Flink doesn't have
 permission to access the files.

 Can you do sudo su yarn to become the yarn user. Then, you can do
 hadoop fs -chmod 777 to make the files accessible for everyone.


 On Thu, Jun 4, 2015 at 4:59 PM, Pa Rö paul.roewer1...@googlemail.com
 wrote:

 okay, it works, but i get an exception:

 [cloudera@quickstart Desktop]$ cd flink-0.9-SNAPSHOT/bin/
 [cloudera@quickstart bin]$ flink run /home/cloudera/Desktop/ma-flink.jar
 bash: flink: command not found
 [cloudera@quickstart bin]$ ./flink run
 /home/cloudera/Desktop/ma-flink.jar
 log4j:WARN No appenders could be found for logger
 (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
 log4j:WARN Please initialize the log4j system properly.
 log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig
 for more info.
 Found YARN properties file
 /home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties
 Using JobManager address from YARN properties quickstart.cloudera/
 127.0.0.1:53874
 java.io.IOException: Mkdirs failed to create /user/cloudera/outputs
 at
 org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:438)
 at
 org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:783)
 at
 mgm.tp.bigdata.ma_commons.commons.Seeding.randomSeeding(Seeding.java:21)
 at
 mgm.tp.bigdata.ma_flink.FlinkMain.getCentroidDataSet(FlinkMain.java:178)
 at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:47)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
 at
 org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
 at org.apache.flink.client.program.Client.run(Client.java:315)
 at
 org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
 at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
 at
 org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
 at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
 org.apache.flink.client.program.ProgramInvocationException: The program
 execution failed: Failed to submit job 934743a5c49c6d5e31c9e8201452e36d
 (KMeans Flink)
 at org.apache.flink.client.program.Client.run(Client.java:412)
 at org.apache.flink.client.program.Client.run(Client.java:355)
 at org.apache.flink.client.program.Client.run(Client.java:348)
 at
 org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
 at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
 at
 org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
 at org.apache.flink.client.program.Client.run(Client.java:315)
 at
 org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
 at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
 at
 org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
 at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
 Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed
 to submit job 

Re: flink k-means on hadoop cluster

2015-06-04 Thread Robert Metzger
Yes, you have to run these commands in the command line of the Cloudera VM.

On Thu, Jun 4, 2015 at 4:28 PM, Pa Rö paul.roewer1...@googlemail.com
wrote:

 you mean run this command on terminal/shell and not define a hue job?

 2015-06-04 16:25 GMT+02:00 Robert Metzger rmetz...@apache.org:

 It should be certainly possible to run Flink on a cloudera live VM

 I think these are the commands you need to execute:

 wget
 http://stratosphere-bin.s3-website-us-east-1.amazonaws.com/flink-0.9-SNAPSHOT-bin-hadoop2.tgz
 tar xvzf flink-0.9-SNAPSHOT-bin-hadoop2.tgz
 cd flink-0.9-SNAPSHOT/
 *export HADOOP_CONF_DIR=/usr/lib/hadoop/etc/hadoop/*
 ./bin/yarn-session.sh -n 1 -jm 1024 -tm 1024

 If that is not working for you, please post the exact error message you
 are getting and I can help you to get it to run.


 On Thu, Jun 4, 2015 at 4:18 PM, Pa Rö paul.roewer1...@googlemail.com
 wrote:

 hi robert,

 i think the problem is the hue api,
 i had the same problem with spark submit script,
 but on the new hue release, they have a spark submit api.

 i asked the group for the same problem with spark, no reply.

 i want to test my app on a local cluster before i run it on the big cluster;
 for that i use cloudera live. maybe there is another way to test flink
 on a local cluster vm?

 2015-06-04 16:12 GMT+02:00 Robert Metzger rmetz...@apache.org:

 Hi Paul,

 why did running Flink from the regular scripts not work for you?

 I'm not an expert on Hue, I would recommend asking in the Hue user
 forum / mailing list:
 https://groups.google.com/a/cloudera.org/forum/#!forum/hue-user.

 On Thu, Jun 4, 2015 at 4:09 PM, Pa Rö paul.roewer1...@googlemail.com
 wrote:

 thanks,
 now i want to run my app on the cloudera live vm (single node).
 how can i define my flink job with hue?
 i tried to run the flink script in hdfs, but it does not work.

 best regards,
 paul

 2015-06-02 14:50 GMT+02:00 Robert Metzger rmetz...@apache.org:

 I would recommend using HDFS.
 For that, you need to specify the paths like this:
 hdfs:///path/to/data.

 On Tue, Jun 2, 2015 at 2:48 PM, Pa Rö paul.roewer1...@googlemail.com
  wrote:

 nice,

 which file system i must use for the cluster? java.io or hadoop.fs
 or flink?

 2015-06-02 14:29 GMT+02:00 Robert Metzger rmetz...@apache.org:

 Hi,
 you can start Flink on YARN on the Cloudera distribution.

 See here for more:
 http://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html

 These are the commands you need to execute

 wget 
 http://stratosphere-bin.s3-website-us-east-1.amazonaws.com/flink-0.9-SNAPSHOT-bin-hadoop2.tgz
 tar xvzf flink-0.9-SNAPSHOT-bin-hadoop2.tgz
 cd flink-0.9-SNAPSHOT/
 ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096





 On Tue, Jun 2, 2015 at 2:03 PM, Pa Rö 
 paul.roewer1...@googlemail.com wrote:

 hi community,

 i want to test my flink k-means on a hadoop cluster. i use the
 cloudera live distribution. how can i run flink on this cluster?
 maybe only
 the java dependencies are enough?

 best regards,
 paul

Re: Apache Flink transactions

2015-06-04 Thread Chiwan Park
Hi.

Flink is not a DBMS, so there are no equivalent operations for insert, update, and remove.
But you can use the map[1] or filter[2] transformation to create a modified dataset.

I recommend some slides[3][4] to understand Flink's concepts.

Regards,
Chiwan Park

[1] 
http://ci.apache.org/projects/flink/flink-docs-master/apis/dataset_transformations.html#map
[2] 
http://ci.apache.org/projects/flink/flink-docs-master/apis/dataset_transformations.html#filter
[3] 
http://www.slideshare.net/robertmetzger1/introduction-to-apache-flink-palo-alto-meetup
[4] http://www.slideshare.net/dataArtisans/flink-training-dataset-api-basics
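
A minimal sketch of Chiwan's suggestion, using plain Java collections (the class
and method names are illustrative; on a real Flink DataSet the same logic would
be expressed with the filter, map, and union transformations):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DatasetEdits {
    // "Remove": keep every record except the matching one (Flink: filter).
    static List<String> remove(List<String> data, String key) {
        return data.stream().filter(r -> !r.equals(key)).collect(Collectors.toList());
    }

    // "Update": rewrite the matching record, pass the rest through (Flink: map).
    static List<String> update(List<String> data, String key, String newValue) {
        return data.stream().map(r -> r.equals(key) ? newValue : r).collect(Collectors.toList());
    }

    // "Insert": append a new record to the existing ones (Flink: union).
    static List<String> insert(List<String> data, String newRecord) {
        return Stream.concat(data.stream(), Stream.of(newRecord)).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> data = List.of("a", "b", "c");
        System.out.println(remove(data, "b"));      // [a, c]
        System.out.println(update(data, "c", "z")); // [a, b, z]
        System.out.println(insert(data, "d"));      // [a, b, c, d]
    }
}
```

Each call produces a new dataset rather than mutating the input, which matches
Flink's immutable-dataset model.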


 On Jun 4, 2015, at 2:48 PM, Hawin Jiang hawin.ji...@gmail.com wrote:
 
 Hi  Admin
 
 
 
 Do we have insert, update and remove operations on Apache Flink?
 
 For example:  I have 10 million records in my test file.  I want to add one
 record, update one record and remove one record from this test file. 
 
  How can I implement this with Flink?
 
 Thanks.
 
 
 
 
 
 
 
 
 
 Best regards
 
 Email: hawin.ji...@gmail.com
 






Re: coGroup Iterator NoSuchElement

2015-06-04 Thread Mustafa Elbehery
@Stephan, I was trying to follow the concept of a *nested join*. In other
words, I wanted to follow a certain implementation to achieve my goal.

@Fabian, well, solving the exception this way will lead to incorrect
results: since the key will always exist on one side, the iterator of the
other side will continue to emit results.

Thanks for your help, I will dig further and check for alternatives.

On Thu, Jun 4, 2015 at 11:22 AM, Fabian Hueske fhue...@gmail.com wrote:

 I am not sure if I got your question right.

 You can easily prevent the NoSuchElementException by calling next() only
 if hasNext() returns true.

 2015-06-04 11:18 GMT+02:00 Mustafa Elbehery elbeherymust...@gmail.com:

 Yes, it's working now. But my assumption is that I want to join
 different datasets on a common key, so it will be normal to have many
 tuples on one side that do not exist on the other side.

 How can I fix that?

 On Thu, Jun 4, 2015 at 11:00 AM, Fabian Hueske fhue...@gmail.com wrote:

 Hi,

 one of the iterables of a CoGroup function can be empty. Calling
 iterator.next() on an empty iterator raises the NoSuchElementException.
 This is the expected behavior of the function.

 Are you sure your assumption about your data are correct, i.e., that the
 iterator should always have (at least) one element?

 Fabian


 2015-06-04 10:47 GMT+02:00 Mustafa Elbehery elbeherymust...@gmail.com:

 Hi,


 public static class ComputeStudiesProfile
         implements CoGroupFunction<Person, StudentInfo, Person> {

     Person person;

     @Override
     public void coGroup(Iterable<Person> iterable,
             Iterable<StudentInfo> iterable1,
             Collector<Person> collector) throws Exception {

         Iterator<Person> iterator = iterable.iterator();
         person = iterator.next();

         ArrayList<StudentInfo> infos = new ArrayList<StudentInfo>();
         Iterator<StudentInfo> infosIterator = iterable1.iterator();

         while (infosIterator.hasNext()) {
             infos.add(infosIterator.next());
         }

         if (infos.size() > 0) {
             update(person, infos, collector);
         }
     }

     public void update(Person person, Collection<StudentInfo> infos,
             Collector<Person> collector) {
         person.setMajor(infos.iterator().next().getMajor());
         for (StudentInfo info : infos) {
             person.getBestCourse().addAll(info.getCourses());
         }
         collector.collect(person);
     }
 }

 ***

 public static class ComputeJobsProfile
         implements CoGroupFunction<Person, StudentJobs, Person> {

     @Override
     public void coGroup(Iterable<Person> iterable,
             Iterable<StudentJobs> iterable1,
             Collector<Person> collector) throws Exception {

         Person person = iterable.iterator().next();

         ArrayList<StudentJobs> jobs = new ArrayList<StudentJobs>();
         for (StudentJobs job : iterable1) {
             jobs.add(job);
         }
         if (jobs.size() > 0) {
             update(person, jobs, collector);
         }
     }

     public void update(Person person, Collection<StudentJobs> jobs,
             Collector<Person> collector) {

         for (StudentJobs job : jobs) {
             person.getJobs().addAll(job.getJobs());
         }
         collector.collect(person);
     }
 }


 On Wed, Jun 3, 2015 at 11:49 PM, Stephan Ewen se...@apache.org wrote:

 Hi!

 The code snippet is not very revealing. Can you also shot the
 implementations of the CoGroupFunctions?

 Thanks!

 On Wed, Jun 3, 2015 at 3:50 PM, Mustafa Elbehery 
 elbeherymust...@gmail.com wrote:

 Code Snippet :)

 DataSet<Person> updatedPersonOne = inPerson.coGroup(inStudent)
     .where("name").equalTo("name")
     .with(new ComputeStudiesProfile());

 DataSet<Person> updatedPersonTwo = updatedPersonOne.coGroup(inJobs)
     .where("name").equalTo("name")
     .with(new ComputeJobsProfile());

 updatedPersonTwo.print();


 On Wed, Jun 3, 2015 at 3:45 PM, Mustafa Elbehery 
 elbeherymust...@gmail.com wrote:

 Hi,

 I am trying to write two coGroups in sequence in the same ETL. I use a
 common dataset in both of them: in the first coGroup I update the
 initial dataset and retrieve the result in a new dataset object. Then I
 use
 the result in the second coGroup with another new dataset.

 While debugging, I could see that coGroup.next is *false*, however,
 in the next iteration it has elements. I tried to force-enable
 ObjectReuse, and I got *half* of the expected result. I have attached a
 screenshot of the debugger.

 My question is, does this have a relation to the concurrent
 execution of different tasks in Flink? And how can I solve this problem?

 Regards.


 --
 Mustafa Elbehery
 EIT ICT Labs Master School
 http://www.masterschool.eitictlabs.eu/home/
 +49(0)15750363097
 skype: mustafaelbehery87




 --
 Mustafa Elbehery
 EIT ICT Labs Master School
 http://www.masterschool.eitictlabs.eu/home/
 +49(0)15750363097
 skype: mustafaelbehery87





 --
 Mustafa Elbehery
 

Re: coGroup Iterator NoSuchElement

2015-06-04 Thread Fabian Hueske
I am not sure if I got your question right.

You can easily prevent the NoSuchElementException by calling next() only
if hasNext() returns true.
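
The guard looks like this with a plain java.util.Iterator; inside a Flink
CoGroup function the iterables follow the same contract (firstOrNull is an
illustrative helper, not a Flink API):

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class CoGroupGuard {
    // One side of a coGroup may be empty, so guard every next() with hasNext().
    static String firstOrNull(Iterable<String> side) {
        Iterator<String> it = side.iterator();
        return it.hasNext() ? it.next() : null; // never throws NoSuchElementException
    }

    public static void main(String[] args) {
        System.out.println(firstOrNull(List.of("alice")));                // alice
        System.out.println(firstOrNull(Collections.<String>emptyList())); // null
    }
}
```

When the guard returns null, the function can simply skip that group instead
of failing the whole job.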

2015-06-04 11:18 GMT+02:00 Mustafa Elbehery elbeherymust...@gmail.com:

 Yes, it's working now. But my assumption is that I want to join different
 datasets on a common key, so it will be normal to have many tuples on one
 side that do not exist on the other side.

 How can I fix that?

 On Thu, Jun 4, 2015 at 11:00 AM, Fabian Hueske fhue...@gmail.com wrote:

 Hi,

 one of the iterables of a CoGroup function can be empty. Calling
 iterator.next() on an empty iterator raises the NoSuchElementException.
 This is the expected behavior of the function.

 Are you sure your assumptions about your data are correct, i.e., that the
 iterator always has (at least) one element?

 Fabian


 2015-06-04 10:47 GMT+02:00 Mustafa Elbehery elbeherymust...@gmail.com:

 Hi,


 public static class ComputeStudiesProfile implements
 CoGroupFunction<Person, StudentInfo, Person> {

Person person;

@Override
public void coGroup(Iterable<Person> iterable, Iterable<StudentInfo>
 iterable1, Collector<Person> collector) throws Exception {

   Iterator<Person> iterator = iterable.iterator();
   person = iterator.next();

   ArrayList<StudentInfo> infos = new ArrayList<>();
   Iterator<StudentInfo> infosIterator = iterable1.iterator();

   while (infosIterator.hasNext())
  infos.add(infosIterator.next());

   if (infos.size() > 0) {
  update(person, infos, collector);
   }
}

public void update(Person person, Collection<StudentInfo> infos,
 Collector<Person> collector) {
   person.setMajor(infos.iterator().next().getMajor());
   for (StudentInfo info : infos) {
  person.getBestCourse().addAll(info.getCourses());
   }
   collector.collect(person);
}
 }

  
 ***


 public static class ComputeJobsProfile implements
 CoGroupFunction<Person, StudentJobs, Person> {

@Override
public void coGroup(Iterable<Person> iterable, Iterable<StudentJobs>
 iterable1, Collector<Person> collector) throws Exception {

   Person person = iterable.iterator().next();

   ArrayList<StudentJobs> jobs = new ArrayList<>();
   for (StudentJobs job : iterable1) {
  jobs.add(job);
   }
   if (jobs.size() > 0) {
  update(person, jobs, collector);
   }
}

public void update(Person person, CollectionStudentJobs jobs, 
 Collector<Person> collector) {

   for (StudentJobs job : jobs) {
  person.getJobs().addAll(job.getJobs());
   }
   collector.collect(person);
}
 }
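The hasNext() guard discussed earlier in the thread could be applied to this function roughly as follows. This is a sketch only: the Flink types (Iterable sides, Collector) are replaced with plain Java stand-ins so it is self-contained, and the Person stand-in is simplified to just the fields used here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class GuardedCoGroupSketch {
    // Simplified stand-in for the Person type from the thread.
    static class Person {
        final String name;
        final List<String> jobs = new ArrayList<>();
        Person(String name) { this.name = name; }
    }

    // Guarded version of the ComputeJobsProfile.coGroup logic: when the
    // person side is empty for a key, emit nothing instead of letting
    // iterator().next() throw NoSuchElementException.
    static List<Person> coGroup(Iterable<Person> persons, Iterable<List<String>> jobGroups) {
        List<Person> out = new ArrayList<>();
        Iterator<Person> it = persons.iterator();
        if (!it.hasNext()) {
            return out; // no Person for this key on the person side
        }
        Person person = it.next();
        for (List<String> jobs : jobGroups) {
            person.jobs.addAll(jobs);
        }
        out.add(person);
        return out;
    }

    public static void main(String[] args) {
        // Empty person side: no output, no exception.
        System.out.println(coGroup(List.of(), List.of(List.of("tutor"))).size()); // 0
        // Matching person: job groups are merged in.
        Person p = coGroup(List.of(new Person("mustafa")), List.of(List.of("tutor"))).get(0);
        System.out.println(p.jobs); // [tutor]
    }
}
```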



 --
 Mustafa Elbehery
 EIT ICT Labs Master School http://www.masterschool.eitictlabs.eu/home/
 +49(0)15750363097
 skype: mustafaelbehery87





 --
 Mustafa Elbehery
 EIT ICT Labs Master School http://www.masterschool.eitictlabs.eu/home/
 +49(0)15750363097
 skype: mustafaelbehery87