[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2016-01-19 Thread nltran
Github user nltran commented on the pull request:

https://github.com/apache/flink/pull/967#issuecomment-172786594
  
An ecosystem of 3rd party libraries for Flink makes sense to me. It would 
be a good place for experimental features (à la Google Labs back in the day) 
that are backed by the Flink community in some way. It would also be a good 
place for the parameter server code in its current state, as it is not a 
finished product like the third-party packages listed on 
http://flink.apache.org/community.html#third-party-packages

The code for the parameter server has been split out into 
https://github.com/apache/flink/pull/1102, behind a generic interface.
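A backend-independent parameter server interface of the kind described here might look like the minimal sketch below. The names `ParameterStore` and `InMemoryParameterStore` are hypothetical and are not the interface from PR #1102; keys are `long` IDs, and updates are combined with a caller-supplied merge function. An Ignite-backed or other distributed implementation would sit behind the same interface.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BinaryOperator;

/** Hypothetical backend-independent parameter store with long parameter IDs. */
interface ParameterStore<V> {
    /** Returns the current value, or empty if the parameter has never been written. */
    Optional<V> get(long id);

    /** Overwrites the value of a parameter. */
    void put(long id, V value);

    /** Combines delta into the current value using merge; stores delta as-is if the key is absent. */
    void update(long id, V delta, BinaryOperator<V> merge);
}

/** Single-JVM implementation for tests and examples only (not distributed, not persistent). */
class InMemoryParameterStore<V> implements ParameterStore<V> {
    private final Map<Long, V> values = new ConcurrentHashMap<>();

    @Override
    public Optional<V> get(long id) {
        return Optional.ofNullable(values.get(id));
    }

    @Override
    public void put(long id, V value) {
        values.put(id, value);
    }

    @Override
    public void update(long id, V delta, BinaryOperator<V> merge) {
        values.merge(id, delta, merge); // performed atomically per key by ConcurrentHashMap
    }
}
```

For example, `store.update(1L, 0.25, Double::sum)` adds a gradient-style delta to parameter 1.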


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2016-01-15 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/967#issuecomment-171970520
  
I think the SSP code is in good shape.

Concerning the parameter server code, I think we should not add it to 
Flink's repository; it should be hosted in an external repository. We 
have to be a bit cautious at this point not to overload the Flink code base.

I would very much like to get a culture started where people also build 
and offer libraries and tools for Flink in their own (3rd party) repositories. 
This has the advantage that the main authors are in control of the code: bugfixes 
can be applied quickly and releases can be made frequently (no Apache processes). 
The core repository stays slim, which makes it easier to maintain.

  - One possible place for the parameter server code would be a repository 
under https://github.com/project-flink . We can create one and grant you access 
rights.

  - We can link the parameter server code under 
http://flink.apache.org/community.html#third-party-packages

What we would need to do is split this code into the SSP part (which goes into 
the Flink code) and the parameter server part. The SSP part can probably be 
kept independent of any use with parameter servers (even though its most 
common and meaningful application is in conjunction with a parameter server).

Any thoughts on this?



[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2016-01-05 Thread nltran
Github user nltran commented on the pull request:

https://github.com/apache/flink/pull/967#issuecomment-168965380
  
Hello guys,

Do you have any plans or updates on this one?



[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2016-01-05 Thread hsaputra
Github user hsaputra commented on the pull request:

https://github.com/apache/flink/pull/967#issuecomment-169143768
  
@nltran It looks like the PR currently has conflicts. Would you mind 
rebasing it onto the latest master?
Thx!



[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2015-09-07 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/967#issuecomment-138416633
  
@nltran, I went through your code and, if I understand correctly, the 
slack-related synchronization is done by the runtime itself using events. I 
like that idea more than mine, which involved synchronization by the parameter 
server itself and is, well, sort of messy.
Could you rebase this onto the current master?
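The slack-based synchronization described above can be illustrated with a small clock tracker. This is a minimal sketch assuming one common formulation of SSP: a worker's clock is the number of supersteps it has completed, and it may start its next superstep only while it is at most `slack` clocks ahead of the slowest worker. `SspClock` and its methods are hypothetical and are not the PR's event-based runtime code.

```java
import java.util.Arrays;

/**
 * Hypothetical SSP clock tracker (not the PR's runtime code). Each worker's clock
 * is the number of supersteps it has completed; a worker may start its next
 * superstep only while it is at most `slack` clocks ahead of the slowest worker.
 */
class SspClock {
    private final int[] clocks;
    private final int slack;

    SspClock(int numWorkers, int slack) {
        if (numWorkers <= 0 || slack < 0) {
            throw new IllegalArgumentException("need numWorkers > 0 and slack >= 0");
        }
        this.clocks = new int[numWorkers];
        this.slack = slack;
    }

    /** Records that a worker finished a superstep (the role a clock event plays). */
    synchronized void tick(int worker) {
        clocks[worker]++;
        notifyAll(); // waiting workers re-check the slack condition
    }

    /** True if the worker may start its next superstep now. */
    synchronized boolean canProceed(int worker) {
        int slowest = Arrays.stream(clocks).min().getAsInt();
        return clocks[worker] - slowest <= slack;
    }

    /** Blocks the calling thread until the worker is within the slack bound. */
    synchronized void awaitTurn(int worker) throws InterruptedException {
        while (!canProceed(worker)) {
            wait();
        }
    }
}
```

With a slack of 0, the same tracker makes every worker wait until all others have completed the same number of supersteps, i.e. it degenerates to a BSP barrier.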



[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2015-08-31 Thread nltran
Github user nltran commented on a diff in the pull request:

https://github.com/apache/flink/pull/967#discussion_r38295439
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/iterative/event/EventWithAggregatorsTest.java ---
@@ -123,6 +169,31 @@ private IterationEventWithAggregators pipeThroughSerialization(IterationEventWit
             return null;
         }
     }
+
+    private ClockTaskEvent pipeThroughSerialization2(ClockTaskEvent event) {
+        try {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutputStream out = new DataOutputStream(baos);
+            event.write(new OutputViewDataOutputStreamWrapper(out));
+            out.flush();
+
+            byte[] data = baos.toByteArray();
+            out.close();
+            baos.close();
+
+            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
+            ClockTaskEvent newEvent = event.getClass().newInstance();
+            newEvent.read(new InputViewDataInputStreamWrapper(in));
+            in.close();
+
+            return newEvent;
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            e.printStackTrace();
+            Assert.fail("Test threw an exception: " + e.getMessage());
+            return null;
--- End diff --

I actually reproduced the same pattern as in 
`testSerializationOfEventWithAggregateValues`. What you say makes sense in 
both cases, though.



[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2015-08-25 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/967#discussion_r37883572
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/iterative/event/EventWithAggregatorsTest.java
 ---
--- End diff --

`null` will never be returned. Why do you catch the exception here instead of 
letting it bubble up?
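One way to follow this suggestion is to declare `throws Exception` on the helper, so any failure propagates and the test runner reports it with its full stack trace. The sketch below is plain Java: `ClockEvent` is a hypothetical stand-in that writes to `java.io.DataOutput` directly, rather than Flink's `ClockTaskEvent` with the view wrappers. Try-with-resources replaces the manual `close()` calls, and `getDeclaredConstructor().newInstance()` replaces `Class.newInstance()`, which is deprecated since Java 9.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical stand-in for ClockTaskEvent: a worker index and its clock. */
class ClockEvent {
    int workerIndex;
    int clock;

    ClockEvent() {} // no-arg constructor, needed for the reflective instantiation below

    ClockEvent(int workerIndex, int clock) {
        this.workerIndex = workerIndex;
        this.clock = clock;
    }

    void write(DataOutput out) throws IOException {
        out.writeInt(workerIndex);
        out.writeInt(clock);
    }

    void read(DataInput in) throws IOException {
        workerIndex = in.readInt();
        clock = in.readInt();
    }
}

class EventSerialization {
    /** Round-trips the event through bytes; any exception propagates to the caller. */
    static ClockEvent pipeThroughSerialization(ClockEvent event) throws Exception {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            event.write(out);
        } // closing the stream flushes all bytes into baos

        ClockEvent copy = event.getClass().getDeclaredConstructor().newInstance();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
            copy.read(in);
        }
        return copy;
    }
}
```

In a JUnit test, the test method itself would then simply be declared `throws Exception` as well, with no `try`/`catch` and no unreachable `return null`.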



[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2015-08-20 Thread nltran
Github user nltran commented on the pull request:

https://github.com/apache/flink/pull/967#issuecomment-132935694
  
@StephanEwen 
The interfaces for the parameter and the parameter server are quite 
similar in both PRs. The things I notice are the following:
- The parameter server code is tightly woven into the core
- A lot of synchronization and registration still happens via sleeps
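Replacing sleep-based registration usually means waiting on an explicit condition instead. A minimal sketch, assuming a fixed and known number of workers that each register exactly once, uses `java.util.concurrent.CountDownLatch` with a timeout; `WorkerRegistry` is a hypothetical name, not code from either PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical registration barrier: each of a known number of workers calls
 * register() once, and the coordinator waits for all of them with a timeout
 * instead of sleeping for a fixed amount of time and hoping they are ready.
 */
class WorkerRegistry {
    private final CountDownLatch allRegistered;

    WorkerRegistry(int expectedWorkers) {
        this.allRegistered = new CountDownLatch(expectedWorkers);
    }

    /** Called exactly once by each worker after it has connected. */
    void register() {
        allRegistered.countDown();
    }

    /** Returns true once every worker has registered, or false if the timeout elapses first. */
    boolean awaitAll(long timeout, TimeUnit unit) throws InterruptedException {
        return allRegistered.await(timeout, unit);
    }
}
```

The coordinator returns as soon as the last worker registers, and a missing worker surfaces as a `false` result rather than as a race after an arbitrary sleep.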



[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2015-08-15 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/967#issuecomment-131403208
  
@nltran How would your use of the parameter server work together with what 
is proposed in #1003 ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2015-08-14 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/967#issuecomment-131030134
  
Hi @nltran,

thanks for updating the PR! 
I don't have time for a thorough review now, but I'll answer your last 
question.
The ASF homepage states: *[The ASF desires that all contributors of ideas, 
code, or documentation to the Apache projects complete, sign, and submit [...] 
an Individual Contributor License Agreement.](http://www.apache.org/licenses)*

So yes, it would be very nice if you would sign and submit an ICLA. In 
case your employer assigned you to work on an Apache project, a CCLA is also 
necessary (see link above). 
Thank you very much, Fabian



[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2015-08-13 Thread nltran
Github user nltran commented on the pull request:

https://github.com/apache/flink/pull/967#issuecomment-130734997
  
The code now passes all the checks :smiley: I have addressed your previous 
comments. @StephanEwen, @fhueske, could you give it another review? Namely:
* The SSP slack configuration is now in TaskConfig instead of being job-wide. At 
some point we might want to unify both iteration strategies, since the BSP 
iteration mode is a special case of SSP with the slack equal to zero.

* The parameter server is now completely orthogonal to Flink core. It is up 
to the user to set it up and call it. I'm preparing a sample job that uses both 
SSP and calls to a parameter server.

* Corrected the license headers. Will I have to fill in a Contributor License 
Agreement at some point?
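The point in the first bullet, that BSP is the slack-zero special case of SSP, can be stated precisely. Under one common formulation of the SSP condition (notation introduced here, not taken from the PR), let $c_i$ be the number of supersteps worker $i$ has completed and $s \ge 0$ the slack:

```latex
\[
  \text{worker } i \text{ may start superstep } c_i + 1
  \iff c_i - \min_j c_j \le s .
\]
Since $c_i \ge \min_j c_j$ always holds, setting $s = 0$ gives
\[
  c_i - \min_j c_j \le 0 \iff c_i = \min_j c_j ,
\]
so no worker may run ahead of the slowest one, which is exactly the BSP superstep barrier.
```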



[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2015-08-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/967#discussion_r36280020
  
--- Diff: flink-core/src/main/java/org/apache/flink/ps/impl/ParameterServerIgniteImpl.java ---
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2015 EURA NOVA.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ps.impl;
+
+import org.apache.flink.ps.model.ParameterElement;
+import org.apache.flink.ps.model.ParameterServer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of {@link org.apache.flink.ps.model.ParameterServer} using Apache Ignite
+ */
+public class ParameterServerIgniteImpl implements ParameterServer {
+
+    private static final Logger log = LoggerFactory.getLogger(ParameterServerIgniteImpl.class);
+
+    public final static String CACHE_NAME = ParameterServerIgniteImpl.class.getSimpleName();
+
+    public final static String GRID_NAME = "FLINK_PARAMETER_SERVER";
+
+    public static CacheConfiguration<String, ParameterElement> getParameterCacheConfiguration() {
+        CacheConfiguration<String, ParameterElement> parameterCacheCfg = new CacheConfiguration<String, ParameterElement>();
+        parameterCacheCfg.setCacheMode(CacheMode.PARTITIONED);
+        parameterCacheCfg.setName(CACHE_NAME + "_parameter");
+//        parameterCacheCfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+        return parameterCacheCfg;
+    }
+
+    public static CacheConfiguration<String, ParameterElement> getSharedCacheConfiguration() {
+        CacheConfiguration<String, ParameterElement> sharedCacheCfg = new CacheConfiguration<String, ParameterElement>();
+        sharedCacheCfg.setCacheMode(CacheMode.REPLICATED);
+        sharedCacheCfg.setName(CACHE_NAME + "_SHARED");
+        return sharedCacheCfg;
+    }
+
+    private Ignite ignite = null;
+    private IgniteCache<String, ParameterElement> parameterCache = null;
+    private IgniteCache<String, ParameterElement> sharedCache = null;
+
+    public ParameterServerIgniteImpl(String name, boolean client) {
+        try {
+            CacheConfiguration<String, ParameterElement> parameterCacheCfg = getParameterCacheConfiguration();
+            CacheConfiguration<String, ParameterElement> sharedCacheCfg =
+                    getSharedCacheConfiguration();
+
+            IgniteConfiguration cfg1 = new IgniteConfiguration();
+            cfg1.setGridName(name);
+            cfg1.setPeerClassLoadingEnabled(true);
+            cfg1.setCacheConfiguration(parameterCacheCfg, sharedCacheCfg);
+
+            // Beware that client mode in Ignite has a whole different meaning than what it usually means
+            // You still need to have a grid instantiated to access the values, though you won't store anything
+            if (client) {
+                cfg1.setClientMode(true);
+                Ignition.setClientMode(true);
+            }
+            if (log.isInfoEnabled()) {
+                String mode = client ? "client" : "server";
+                log.info("Starting parameter server " + name + " in " + mode + " mode");
+            }
+            this.ignite = Ignition.start(cfg1);
+
+            parameterCache = ignite.getOrCreateCache(parameterCacheCfg).withAsync();
+            sharedCache = ignite.getOrCreateCache(sharedCacheCfg).withAsync();
+
+            log.debug("I hereby confirm that parameter cache is async enabled: 

[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2015-08-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/967#discussion_r36280063
  
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java ---
@@ -920,15 +928,33 @@ private JobVertex createBulkIterationHead(BulkPartialSolutionPlanNode pspn) {
             }
 
             // reset the vertex type to iteration head
-            headVertex.setInvokableClass(IterationHeadPactTask.class);
+            if(s.equals(BulkIterationStrategy.PLAIN) ) {
+                headVertex.setInvokableClass(IterationHeadPactTask.class);
+            }
+            else if(s.equals(BulkIterationStrategy.SSP)) {
+//              headVertex.setInvokableClass(ABSPIterationHeadPactTask.class);
+                headVertex.setInvokableClass(SSPIterationHeadPactTask.class);
+            }
             headConfig = new TaskConfig(headVertex.getConfiguration());
             toReturn = null;
         } else {
             // instantiate the head vertex and give it a no-op driver as the driver strategy.
             // everything else happens in the post visit, after the input (the initial partial solution)
             // is connected.
+//<<<<<<< HEAD
             headVertex = new JobVertex("PartialSolution ("+iteration.getNodeName()+")");
-            headVertex.setInvokableClass(IterationHeadPactTask.class);
+//          headVertex = new AbstractJobVertex("PartialSolution ("+iteration.getNodeName()+")");
+
+            if(s.equals(BulkIterationStrategy.PLAIN) ) {
+                headVertex.setInvokableClass(IterationHeadPactTask.class);
+            }
+            else if(s.equals(BulkIterationStrategy.SSP)) {
+//              headVertex.setInvokableClass(ABSPIterationHeadPactTask.class);
+                headVertex.setInvokableClass(SSPIterationHeadPactTask.class);
+            }
+//=======
+//          headVertex.setInvokableClass(IterationHeadPactTask.class);
+//>>>>>>> upstream/master
--- End diff --

merge conflict leftover?
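Once the conflict markers and the commented-out `ABSPIterationHeadPactTask` lines are dropped, the duplicated `if`/`else if` chain in this diff could be collapsed into a single helper. In the sketch below, the two task classes are empty hypothetical stand-ins, and the return type is `Class<?>` rather than the bounded class type Flink's `setInvokableClass` expects. The `default` branch makes an unhandled strategy fail loudly instead of silently skipping the assignment.

```java
/** The two strategies named in the diff. */
enum BulkIterationStrategy { PLAIN, SSP }

/** Empty hypothetical stand-ins for the head task classes named in the diff. */
class IterationHeadPactTask {}
class SSPIterationHeadPactTask {}

class HeadTaskSelector {
    /** Picks the iteration head task class; an unhandled strategy throws instead of being skipped. */
    static Class<?> headTaskFor(BulkIterationStrategy strategy) {
        switch (strategy) {
            case PLAIN:
                return IterationHeadPactTask.class;
            case SSP:
                return SSPIterationHeadPactTask.class;
            default:
                throw new IllegalArgumentException("Unsupported iteration strategy: " + strategy);
        }
    }
}
```

Both branches of `createBulkIterationHead` could then set the invokable class once from the helper's result instead of repeating the chain.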



[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2015-08-05 Thread nltran
Github user nltran commented on a diff in the pull request:

https://github.com/apache/flink/pull/967#discussion_r36280226
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
 ---
--- End diff --

Indeed, I am currently preparing a cleaner commit, sorry about that.



[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2015-08-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/967#discussion_r36280632
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
 ---
--- End diff --

Okay ;)



[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2015-08-03 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/967#issuecomment-127236312
  
We are thinking about more general tooling for parameter servers (with 
implementations other than Ignite), because requests for that have come 
up. Whether we want to ship a parameter server with Flink, or simply add 
interfaces that help connect to a PS, is still undecided. 

For that reason, we should probably not overdesign this now. Let's add SSP 
to the runtime, take Ignite out, and make a nice self-contained example where 
Ignite is used purely in the user code.

I think we can make a self-contained example in the following way:
The example MapFunction that communicates with the parameter server 
instantiates its Ignite instance in the `open()` function if it is in the first 
superstep. It then proceeds as in your code. 

To make this applicable to real-world use, I assume one would need dedicated 
Ignite processes anyway. In the example, the parameters are lost after the 
server is torn down, but the same is true of a parameter server running inside 
the TaskManager for all YARN users (where TMs are shut down after a job).

The example here serves the purpose of demonstrating the capabilities and 
use of SSP, and of giving people a blueprint of how to connect this to a 
parameter server.

What do you think about this?

Here are some other comments:
  - Using Longs as parameter IDs is probably quite a bit cheaper than using 
Strings.
  - The pull request still contains merge-conflict markers.
  - Some constants are hard-wired into the code. It would be good to pull them 
into static final fields at least, so it is very visible that these are 
predefined constants.
  - You seem to re-initialize the caches in every superstep. In general, 
initialization in `open()` in iterations needs to be done carefully, because 
`open()` is called at the beginning of each superstep. You can always check 
which superstep you are in using 
`getIterationRuntimeContext().getSuperstepNumber()`.
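The superstep guard suggested above can be sketched without a Flink dependency by passing the superstep number into `open()` explicitly. `ParameterClientFunction` is a hypothetical class; in a real `RichMapFunction`, the number would come from Flink's `IterationRuntimeContext` via `getSuperstepNumber()`, which counts supersteps from 1. The sketch also pulls the first-superstep value into a named constant and keys the cache by `long` IDs, in line with the other comments.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Plain-Java sketch (hypothetical class, no Flink dependency): open() runs at the
 * start of every superstep, so expensive setup such as connecting to a parameter
 * server is guarded to run only once.
 */
class ParameterClientFunction {
    // Named constant instead of a hard-wired literal; supersteps are numbered from 1.
    static final int FIRST_SUPERSTEP = 1;

    private Map<Long, Double> cache;   // stands in for an expensive client or cache handle
    private int initializations = 0;

    /** The superstep number is passed in here; in Flink it comes from the iteration runtime context. */
    void open(int superstepNumber) {
        if (superstepNumber == FIRST_SUPERSTEP) {
            cache = new HashMap<>();
            initializations++;
        }
        // Later supersteps reuse the existing cache. A `cache == null` check is an
        // equivalent guard as long as the same function instance is reused across supersteps.
    }

    Double lookup(long parameterId) {
        return cache.get(parameterId);
    }

    int initializations() {
        return initializations;
    }
}
```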



[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2015-08-03 Thread nltran
Github user nltran commented on the pull request:

https://github.com/apache/flink/pull/967#issuecomment-127257276
  
Thank you for your feedback!

I think this is a fine way to go. I will then provide the parameter server 
related code in a different module and include an example of how to run a SSP 
job with calls to the parameter server.

> You seem to re-initialize the caches in every superstep. In general, 
> initialization in `open()` in iterations needs to be done carefully, because 
> `open()` is called at the beginning of each superstep. You can always check 
> which superstep you are in using `getIterationRuntimeContext().getSuperstepNumber()`

We initialize the caches in the first superstep by checking whether the 
reference is null. The assumption is that, since the same task instance is 
reused across iterations, the cache is initialized only in the first superstep 
and the reference is no longer null in subsequent supersteps. Would you 
recommend checking the superstep number instead?



[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2015-08-03 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/967#issuecomment-127258102
  
Ah, my bad. Concerning the initialization, I overlooked the null-check. It 
is good the way it is.



[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2015-08-02 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/967#issuecomment-127050940
  
Wow, impressive contribution! I would very much like to add SSP to Flink.

This is quite a lot, so it may take a bit to review and merge.

Here are some initial thoughts:

  - The SSP slack is defined as a job-wide parameter. Can you make it a 
parameter of the respective bulk iteration (which also has the PLAIN/SSP mode)?

  - The code includes Apache Ignite as a parameter server. Is it possible 
to make this independent of Ignite? I know that SSP is usually used in 
parameter server ML settings, but I would like to not tie Ignite (another big 
dependency) into the Flink codebase.
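Making the slack a property of the iteration, as requested above, could look like the following sketch: a small immutable settings object that carries both the mode and the slack and rejects negative slack. `IterationSettings` and `IterationMode` are hypothetical names modeled on the PLAIN/SSP modes mentioned in this thread, not the PR's API.

```java
/** Iteration modes as named in this thread (PLAIN and SSP). */
enum IterationMode { PLAIN, SSP }

/**
 * Hypothetical per-iteration settings: the slack travels with the bulk iteration
 * that also carries the mode, instead of being a job-wide parameter.
 */
final class IterationSettings {
    private final IterationMode mode;
    private final int slack;

    private IterationSettings(IterationMode mode, int slack) {
        this.mode = mode;
        this.slack = slack;
    }

    /** Plain (BSP) iteration, which behaves like SSP with zero slack. */
    static IterationSettings plain() {
        return new IterationSettings(IterationMode.PLAIN, 0);
    }

    /** SSP iteration with the given slack, which must be non-negative. */
    static IterationSettings ssp(int slack) {
        if (slack < 0) {
            throw new IllegalArgumentException("slack must be >= 0, got " + slack);
        }
        return new IterationSettings(IterationMode.SSP, slack);
    }

    IterationMode mode() { return mode; }

    int slack() { return slack; }
}
```

Because the constructor is private, the only ways to obtain settings are `plain()` and `ssp(int)`, so an SSP iteration can never end up with a negative slack.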






[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2015-07-31 Thread nltran
GitHub user nltran opened a pull request:

https://github.com/apache/flink/pull/967

Stale Synchronous Parallel Iterations

Here is a pull request containing our development on [Stale Synchronous 
Parallel (SSP)](http://reports-archive.adm.cs.cmu.edu/anon/ml2013/CMU-ML-13-103.pdf) 
iterations. The pull request contains:

* Code supporting SSP
* API to configure and enable SSP from the ExecutionEnvironment

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nltran/flink SSP

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/967.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #967


commit 0435bb9114f48aeefb330e7cf92610be84f79c81
Author: Nam-Luc Tran namluc.t...@euranova.eu
Date:   2015-07-31T15:21:08Z

* Added the model for parameter server and parameter element
* Added parameter server implementation based on Apache Ignite
* Started instance of parameter server in TaskManager
* Added Apache Ignite as dependency in pom.xml

commit decc66db856d3d640e7e29128e2696c0941091dd
Author: Nam-Luc Tran namluc.t...@euranova.eu
Date:   2015-07-31T15:24:32Z

* Extended RichMapFunction with methods to access parameter server

commit 55398da553ce697af9bea881ea7131818edb82d2
Author: Nam-Luc Tran namluc.t...@euranova.eu
Date:   2015-07-31T15:25:08Z

* Extended DataSet API to enable SSP and configuration

commit 6ef80ceac5e6ef897769e1883823d152d0c04070
Author: Nam-Luc Tran namluc.t...@euranova.eu
Date:   2015-07-31T15:25:55Z

* Extended ExecutionEnvironment and ExecutionConfig to enable SSP and SPP 
configuration

commit 368ca1c101034aefa8b6ac0ce4791133976831e0
Author: Nam-Luc Tran namluc.t...@euranova.eu
Date:   2015-07-31T15:29:29Z

* Added drop-in control structures for Stale Synchronous Parallel iterations

commit 23fb6518fe7042ad859bb9474266131f2bdb669c
Author: Nam-Luc Tran namluc.t...@euranova.eu
Date:   2015-07-31T15:30:31Z

* Added the events used by the control structures for Stale Synchronous 
Parallel iterations

commit 48810bde7783610112751f2221126b69a1ac9b56
Author: Nam-Luc Tran namluc.t...@euranova.eu
Date:   2015-07-31T15:32:02Z

* Extended the job translation to take into account the control structures
  related to Stale Synchronous Parallel Iterations



