Re: Flink interactive Scala shell

2015-04-16 Thread Kostas Tzoumas
Great, let us know if you run into any issues.

Can you create a JIRA on the REPL and link to your repository for the
community to track the status?

On Wed, Apr 15, 2015 at 4:23 PM, Nikolaas s 
wrote:

> Thanks for the feedback, guys!
> Apparently the Scala shell compiles the shell input to some kind of virtual
> directory.
> It should be possible to create a jar from its contents and then hand it
> over to Flink for execution in some way.
> I will investigate further.
>
> cheers,
> Nikolaas
>
> 2015-04-15 11:20 GMT+02:00 Stephan Ewen :
>
> > To give a bit of context for the exception:
> >
> > To execute a program, the classes of the user functions need to be
> > available to the executing TaskManagers.
> >
> >  - If you execute locally from the IDE, all classes are on the classpath
> > anyway.
> >  - If you use the remote environment, you need to attach the jar file to
> > the environment.
> >
> >  - In your case (repl), you need to make sure that the generated classes
> > are given to the TaskManager. In that sense, the approach is probably
> > similar to the case of executing with a remote environment - only that you
> > do not have a jar file up front, but need to generate it on the fly. As
> > Robert mentioned, https://github.com/apache/flink/pull/35 may have a first
> > solution to that. Other approaches are also possible, like simply always
> > bundling all classes in the directory where the repl puts its generated
> > classes.
> >
> > Greetings,
> > Stephan
> >
> >
> > On Tue, Apr 14, 2015 at 11:49 PM, Aljoscha Krettek 
> > wrote:
> >
> > > I will look into it once I have some time (end of this week, or next
> > > week probably)
> > >
> > > On Tue, Apr 14, 2015 at 8:51 PM, Robert Metzger 
> > > wrote:
> > > > Hey Nikolaas,
> > > >
> > > > Thank you for posting on the mailing list. I've met Nikolaas today in
> > > > person and we were talking a bit about an interactive shell for Flink,
> > > > potentially also an integration with Zeppelin.
> > > >
> > > > Great stuff that I'm really looking forward to :)
> > > >
> > > > We were wondering if somebody from the list has some experience with
> > > > the Scala shell.
> > > > I've pointed Nikolaas also to this PR:
> > > > https://github.com/apache/flink/pull/35.
> > > >
> > > > Best,
> > > > Robert
> > > >
> > > >
> > > > On Tue, Apr 14, 2015 at 5:26 PM, nse sik <
> > nikolaas.steenber...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > >> Hi!
> > > >> I am trying to implement a Scala shell for Flink.
> > > >>
> > > >> I've started with a simple Scala object whose main function will drop
> > > >> the user into the interactive Scala shell (REPL) at one point:
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> import scala.tools.nsc.interpreter.ILoop
> > > >> import scala.tools.nsc.Settings
> > > >>
> > > >> object Job {
> > > >>   def main(args: Array[String]) {
> > > >>
> > > >>     val repl = new ILoop()
> > > >>     repl.settings = new Settings()
> > > >>
> > > >>     // enable this line to use scala in intellij
> > > >>     repl.settings.usejavacp.value = true
> > > >>
> > > >>     repl.createInterpreter()
> > > >>
> > > >>     // start scala interpreter shell
> > > >>     repl.process(repl.settings)
> > > >>
> > > >>     repl.closeInterpreter()
> > > >>   }
> > > >> }
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> Now I am trying to execute the word count example as in:
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> scala> import org.apache.flink.api.scala._
> > > >>
> > > >> scala> val env = ExecutionEnvironment.getExecutionEnvironment
> > > >>
> > > >> scala> val text = env.fromElements(
> > > >>   "To be, or not to be,--that is the question:--",
> > > >>   "Whether 'tis nobler in the mind to suffer",
> > > >>   "The slings and arrows of outrageous fortune",
> > > >>   "Or to take arms against a sea of troubles,")
> > > >>
> > > >> scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }
> > > >>   .map { (_, 1) }.groupBy(0).sum(1)
> > > >>
> > > >> scala> counts.print()
> > > >>
> > > >> scala> env.execute("Flink Scala Api Skeleton")
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> However, I am running into the following error:
> > > >>
> > > >> env.execute("Flink Scala Api Skeleton")
> > > >> org.apache.flink.runtime.client.JobExecutionException:
> > > >> java.lang.RuntimeException: The initialization of the DataSource's
> > > >> outputs caused an error: The type serializer factory could not load its
> > > >> parameters from the configuration due to missing classes.
> > > >>   at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89)
> > > >>   at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:187)
> > > >>   at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:612)
> > > >>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > >>   at ...
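
A minimal sketch of the approach discussed above: packaging the classes the
REPL compiles into its virtual directory as a jar and shipping that jar via a
remote environment. Dumping the virtual directory to disk and the writeAsJar
helper are assumptions for illustration; only createRemoteEnvironment is
existing Flink API.

import java.io.{File, FileOutputStream}
import java.util.jar.{JarEntry, JarOutputStream}
import org.apache.flink.api.scala.ExecutionEnvironment

// Recursively package a directory of compiled classes into a jar file.
def writeAsJar(classDir: File, target: File): Unit = {
  val jar = new JarOutputStream(new FileOutputStream(target))
  def addFiles(dir: File, prefix: String): Unit =
    for (f <- dir.listFiles()) {
      if (f.isDirectory) addFiles(f, prefix + f.getName + "/")
      else {
        jar.putNextEntry(new JarEntry(prefix + f.getName))
        jar.write(java.nio.file.Files.readAllBytes(f.toPath))
        jar.closeEntry()
      }
    }
  addFiles(classDir, "")
  jar.close()
}

// Assume the REPL's virtual directory has been dumped to /tmp/repl-classes
// (hypothetical path). Attach the resulting jar like any user-code jar:
val jarFile = new File("/tmp/repl-classes.jar")
writeAsJar(new File("/tmp/repl-classes"), jarFile)
val env = ExecutionEnvironment.createRemoteEnvironment(
  "jobmanager-host", 6123, jarFile.getAbsolutePath)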

Re: Flink interactive Scala shell

2015-04-16 Thread Robert Metzger
I would also keep an eye on this issue from the Zeppelin project:
https://issues.apache.org/jira/browse/ZEPPELIN-44
The needed infrastructure is going to be very similar.

On Thu, Apr 16, 2015 at 10:15 AM, Kostas Tzoumas 
wrote:

> [...]

[jira] [Created] (FLINK-1892) Local job execution does not exit.

2015-04-16 Thread Kostas Tzoumas (JIRA)
Kostas Tzoumas created FLINK-1892:
-

 Summary: Local job execution does not exit.
 Key: FLINK-1892
 URL: https://issues.apache.org/jira/browse/FLINK-1892
 Project: Flink
  Issue Type: Bug
  Components: Flink on Tez
Reporter: Kostas Tzoumas
Assignee: Kostas Tzoumas


When using the LocalTezEnvironment to run a job from the IDE, the job fails to
exit after producing data. The following thread keeps running and does not
allow the job to exit:


"Thread-31" #46 prio=5 os_prio=31 tid=0x7fb5d2c43000 nid=0x5507 runnable 
[0x000127319000]
   java.lang.Thread.State: RUNNABLE
at java.lang.Throwable.fillInStackTrace(Native Method)
at java.lang.Throwable.fillInStackTrace(Throwable.java:783)
- locked <0x00076dfda130> (a java.lang.InterruptedException)
at java.lang.Throwable.<init>(Throwable.java:250)
at java.lang.Exception.<init>(Exception.java:54)
at java.lang.InterruptedException.<init>(InterruptedException.java:57)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220)
at 
java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
at 
java.util.concurrent.PriorityBlockingQueue.take(PriorityBlockingQueue.java:545)
at 
org.apache.tez.dag.app.rm.LocalTaskSchedulerService$AsyncDelegateRequestHandler.processRequest(LocalTaskSchedulerService.java:322)
at 
org.apache.tez.dag.app.rm.LocalTaskSchedulerService$AsyncDelegateRequestHandler.run(LocalTaskSchedulerService.java:316)
at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-1893) Add Scala support for Flink on Tez

2015-04-16 Thread Kostas Tzoumas (JIRA)
Kostas Tzoumas created FLINK-1893:
-

 Summary: Add Scala support for Flink on Tez
 Key: FLINK-1893
 URL: https://issues.apache.org/jira/browse/FLINK-1893
 Project: Flink
  Issue Type: Improvement
  Components: Flink on Tez
Reporter: Kostas Tzoumas
Assignee: Kostas Tzoumas


Create Scala versions of LocalTezEnvironment and RemoteTezEnvironment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-1894) Add Tez execution mode to Flink command-line tools

2015-04-16 Thread Kostas Tzoumas (JIRA)
Kostas Tzoumas created FLINK-1894:
-

 Summary: Add Tez execution mode to Flink command-line tools
 Key: FLINK-1894
 URL: https://issues.apache.org/jira/browse/FLINK-1894
 Project: Flink
  Issue Type: Improvement
  Components: Flink on Tez
Reporter: Kostas Tzoumas
Assignee: Kostas Tzoumas
Priority: Minor


To run Flink programs on Tez, users currently need to
(1) Specify the main class via env.registerMainClass
(2) Package the job in a fat jar
(3) Use "hadoop jar" to submit the job to YARN

This is somewhat problematic, and certainly a worse user experience than for
regular Flink jobs. Tez execution mode should be part of Flink's command-line
tools.
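
For reference, a sketch of the current workflow (RemoteTezEnvironment and
registerMainClass are from the flink-tez module; the package name and the
exact shell invocation are assumptions):

{code:scala}
// Current Flink-on-Tez submission (sketch). A Tez mode in the CLI tools
// would replace steps (2) and (3).
import org.apache.flink.tez.client.RemoteTezEnvironment  // package assumed

object MyTezJob {
  def main(args: Array[String]): Unit = {
    val env = RemoteTezEnvironment.create()
    env.fromElements("to be", "or not to be").print()
    env.registerMainClass(getClass)  // (1) specify the main class
    env.execute("My Tez job")
  }
}
// (2) package as a fat jar, then (3) submit to YARN:
//   hadoop jar my-tez-job-fat.jar MyTezJob
{code}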



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-1896) Add broadcast variables feature to Flink on Tez

2015-04-16 Thread Kostas Tzoumas (JIRA)
Kostas Tzoumas created FLINK-1896:
-

 Summary: Add broadcast variables feature to Flink on Tez
 Key: FLINK-1896
 URL: https://issues.apache.org/jira/browse/FLINK-1896
 Project: Flink
  Issue Type: Improvement
  Components: Flink on Tez
Reporter: Kostas Tzoumas
Assignee: Kostas Tzoumas
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-1895) Add task chaining to Flink on Tez

2015-04-16 Thread Kostas Tzoumas (JIRA)
Kostas Tzoumas created FLINK-1895:
-

 Summary: Add task chaining to Flink on Tez
 Key: FLINK-1895
 URL: https://issues.apache.org/jira/browse/FLINK-1895
 Project: Flink
  Issue Type: Improvement
  Components: Flink on Tez
Reporter: Kostas Tzoumas
Assignee: Kostas Tzoumas
Priority: Minor


Currently, every runtime operator is wrapped inside a Tez processor. We should
implement some form of task chaining and measure the performance difference.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-1897) Add accumulators and counters feature to Flink on Tez

2015-04-16 Thread Kostas Tzoumas (JIRA)
Kostas Tzoumas created FLINK-1897:
-

 Summary: Add accumulators and counters feature to Flink on Tez
 Key: FLINK-1897
 URL: https://issues.apache.org/jira/browse/FLINK-1897
 Project: Flink
  Issue Type: Improvement
  Components: Flink on Tez
Reporter: Kostas Tzoumas
Assignee: Kostas Tzoumas






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-1898) Add support for self-joins to Flink on Tez

2015-04-16 Thread Kostas Tzoumas (JIRA)
Kostas Tzoumas created FLINK-1898:
-

 Summary: Add support for self-joins to Flink on Tez
 Key: FLINK-1898
 URL: https://issues.apache.org/jira/browse/FLINK-1898
 Project: Flink
  Issue Type: Bug
  Components: Flink on Tez
Reporter: Kostas Tzoumas
Assignee: Kostas Tzoumas


Self-joins are currently not supported by Flink on Tez due to 
[TEZ-1190|https://issues.apache.org/jira/browse/TEZ-1190]. We should find a 
workaround (e.g., create a dummy node).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: About Operator and OperatorBase

2015-04-16 Thread Timo Walther

I share Stephan's opinion.

By the way, we could also find a common name for operators with two 
inputs. Sometimes it's "TwoInputXXX", "DualInputXXX", 
"BinaryInputXXX"... pretty inconsistent.


On 15.04.2015 17:48, Till Rohrmann wrote:

I would also be in favour of making the distinction between the API and
common API layer more clear by using different names. This will ease the
understanding of the source code.

In the wake of a possible renaming we could also get rid of the legacy code
org.apache.flink.optimizer.dag.MatchNode and
rename org.apache.flink.runtime.operators.MatchDriver into JoinDriver to
make the naming more consistent.

On Wed, Apr 15, 2015 at 3:05 PM, Ufuk Celebi  wrote:


On 15 Apr 2015, at 15:01, Stephan Ewen  wrote:


I think we can rename the base operators.

Renaming the subclass of DataSet would be extremely API-breaking. I think
that is not worth it.

Oh, that's right. We return MapOperator for DataSet operations. Stephan's
point makes sense.




[jira] [Created] (FLINK-1899) Table API Bug

2015-04-16 Thread Felix Neutatz (JIRA)
Felix Neutatz created FLINK-1899:


 Summary: Table API Bug
 Key: FLINK-1899
 URL: https://issues.apache.org/jira/browse/FLINK-1899
 Project: Flink
  Issue Type: Bug
  Components: Expression API
Affects Versions: 0.9
Reporter: Felix Neutatz
Priority: Minor


I want to run the following program

{code:scala}
case class WeightedEdge(src: Int, target: Int, weight: Double)
case class Community(communityID: Int, nodeID: Int)

case class CommunitySumTotal(communityID: Int, sumTotal: Double)

val communities: DataSet[Community]
val weightedEdges: DataSet[WeightedEdge]

val communitiesTable = communities.toTable 
val weightedEdgesTable = weightedEdges.toTable

val sumTotal = communitiesTable.join(weightedEdgesTable)
  .where("nodeID = src")
  .groupBy('communityID)
  .select('communityID, 'weight.sum).toSet[CommunitySumTotal]
{code}

but I get this exception. In my opinion, the outputs do have the same field
types.

{code:xml}
Exception in thread "main" org.apache.flink.api.table.ExpressionException: 
Expression result type org.apache.flink.api.table.Row(communityID: Integer, 
intermediate.1: Double) does not have the same fields as output type 
io.ssc.trackthetrackers.analysis.algorithms.CommunitySumTotal(communityID: 
Integer, sumTotal: Double)
at 
org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:88)
at 
org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
at 
org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
at 
io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection$.detectCommunities(LouvainCommunityDetection.scala:105)
at 
io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection$delayedInit$body.apply(LouvainCommunityDetection.scala:38)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at 
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at 
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
{code}
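
A plausible fix, judging from the error message: the aggregate is given the
generated name "intermediate.1", which does not match the sumTotal field of
the target case class. Aliasing the aggregate explicitly should make the row
type line up (untested sketch):

{code:scala}
val sumTotal = communitiesTable.join(weightedEdgesTable)
  .where("nodeID = src")
  .groupBy('communityID)
  // name the aggregate so it matches CommunitySumTotal(communityID, sumTotal)
  .select('communityID, 'weight.sum as 'sumTotal)
  .toSet[CommunitySumTotal]
{code}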



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-1900) Table API documentation example does not work

2015-04-16 Thread Timo Walther (JIRA)
Timo Walther created FLINK-1900:
---

 Summary: Table API documentation example does not work
 Key: FLINK-1900
 URL: https://issues.apache.org/jira/browse/FLINK-1900
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: Timo Walther


Running the word count example leads to

{code}
Exception in thread "main" org.apache.flink.api.table.ExpressionException: 
Expression result type org.apache.flink.api.table.Row(word: String, 
intermediate.1: Integer) does not have the same fields as output type 
io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection$WC$3(word:
 String, count: Integer)
at 
org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:88)
at 
org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
at 
org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
at 
io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection$.detectCommunities(LouvainCommunityDetection.scala:112)
at 
io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection$delayedInit$body.apply(LouvainCommunityDetection.scala:38)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at 
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at 
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at 
io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection$.main(LouvainCommunityDetection.scala:36)
at 
io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection.main(LouvainCommunityDetection.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


TableAPI - Join on two keys

2015-04-16 Thread Felix Neutatz
Hi,

I want to join two tables in the following way:

case class WeightedEdge(src: Int, target: Int, weight: Double)
case class Community(communityID: Int, nodeID: Int)

case class CommunitySumTotal(communityID: Int, sumTotal: Double)

val communities: DataSet[Community]
val weightedEdges: DataSet[WeightedEdge]

val communitiesTable = communities.toTable
val weightedEdgesTable = weightedEdges.toTable

val sumTotal = communitiesTable.join(weightedEdgesTable)
 .where("nodeID = src && nodeID = target")
 .groupBy('communityID)
 .select("communityID, weight.sum as sumTotal").toSet[CommunitySumTotal]


but I get this exception:

Exception in thread "main"
org.apache.flink.api.common.InvalidProgramException: The types of the key
fields do not match: The number of specified keys is different.
at
org.apache.flink.api.java.operators.JoinOperator.<init>(JoinOperator.java:96)
at
org.apache.flink.api.java.operators.JoinOperator$EquiJoin.<init>(JoinOperator.java:197)
at
org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:310)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
at
org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
at
org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)

Moreover, when I use the following where clause:

.where("nodeID = src || nodeID = target")

I get another error:

Exception in thread "main"
org.apache.flink.api.table.ExpressionException: Could not derive
equi-join predicates for predicate 'nodeID === 'src || 'nodeID ===
'target.

at
org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:296)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
at
org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
at
org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)


Apart from that, the Table API seems really promising. It's a really great tool.

Thank you for your help,

Felix
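
One workaround for the two-predicate equi-join, sketched against the DataSet
API (untested; assumes the case classes and DataSets from the message above):
join on composite keys directly and aggregate afterwards.

import org.apache.flink.api.scala._

val sumTotal: DataSet[CommunitySumTotal] = communities
  .join(weightedEdges)
  .where(c => (c.nodeID, c.nodeID))   // left key used for both predicates
  .equalTo(e => (e.src, e.target))
  .map(pair => CommunitySumTotal(pair._1.communityID, pair._2.weight))
  .groupBy("communityID")
  .reduce((a, b) => CommunitySumTotal(a.communityID, a.sumTotal + b.sumTotal))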


Re: About Operator and OperatorBase

2015-04-16 Thread Fabian Hueske
Renaming the core operators is fine with me, but I would not touch API
facing classes.
A big +1 for Timo's suggestion.

2015-04-16 6:30 GMT-05:00 Timo Walther :

> I share Stephan's opinion.
>
> By the way, we could also find a common name for operators with two
> inputs. Sometimes it's "TwoInputXXX", "DualInputXXX", "BinaryInputXXX"...
> pretty inconsistent.
>
>
> On 15.04.2015 17:48, Till Rohrmann wrote:
>
>> I would also be in favour of making the distinction between the API and
>> common API layer more clear by using different names. This will ease the
>> understanding of the source code.
>>
>> In the wake of a possible renaming we could also get rid of the legacy
>> code
>> org.apache.flink.optimizer.dag.MatchNode and
>> rename org.apache.flink.runtime.operators.MatchDriver into JoinDriver to
>> make the naming more consistent.
>>
>> On Wed, Apr 15, 2015 at 3:05 PM, Ufuk Celebi  wrote:
>>
>>  On 15 Apr 2015, at 15:01, Stephan Ewen  wrote:
>>>
>>>  I think we can rename the base operators.

 Renaming the subclass of DataSet would be extremely API-breaking. I think
 that is not worth it.

>>> Oh, that's right. We return MapOperator for DataSet operations. Stephan's
>>> point makes sense.
>>>
>>
>


Re: Apache Ignite

2015-04-16 Thread Fabian Hueske
Yes, I think that is an interesting idea. The question is at which level the
integration would happen.
Adding Ignite as a data store to read data from and write it to should not
be too difficult.
A tighter integration such as using it as distributed hash table for ML
models might be interesting as well.

2015-04-12 11:53 GMT-05:00 sirinath :

> I am wondering if tighter integration between Apache Ignite and Flink would
> be of benefit to both communities.
>
>
>
> --
> View this message in context:
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Apache-Ignite-tp5115.html
> Sent from the Apache Flink (Incubator) Mailing List archive. mailing list
> archive at Nabble.com.
>


[jira] [Created] (FLINK-1901) Create sample operator for Dataset

2015-04-16 Thread Theodore Vasiloudis (JIRA)
Theodore Vasiloudis created FLINK-1901:
--

 Summary: Create sample operator for Dataset
 Key: FLINK-1901
 URL: https://issues.apache.org/jira/browse/FLINK-1901
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Theodore Vasiloudis


In order to be able to implement Stochastic Gradient Descent and a number of 
other machine learning algorithms, we need to have a way to take a random 
sample from a DataSet.

We need to be able to sample with or without replacement from the DataSet, 
choose the relative size of the sample, and set a seed for reproducibility.
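
Until such an operator exists, a seeded Bernoulli (without-replacement) sample
can be approximated with a rich filter function; a sketch, not the proposed
operator:

{code:scala}
import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.configuration.Configuration
import scala.util.Random

// Keeps each element with probability `fraction`. Seeding with the subtask
// index makes every parallel instance deterministic but distinct.
class BernoulliSample[T](fraction: Double, seed: Long)
    extends RichFilterFunction[T] {
  @transient private var rnd: Random = _
  override def open(parameters: Configuration): Unit = {
    rnd = new Random(seed + getRuntimeContext.getIndexOfThisSubtask)
  }
  override def filter(value: T): Boolean = rnd.nextDouble() < fraction
}

// usage: data.filter(new BernoulliSample[Point](0.1, 42L))
{code}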



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Rework of the window-join semantics

2015-04-16 Thread Asterios Katsifodimos
As far as I can see in [1], Peter's/Gyula's suggestion is what InfoSphere
Streams does: a symmetric hash join.

From [1]:
"When a tuple is received on an input port, it is inserted into the window
corresponding to the input port, which causes the window to trigger. As
part of the trigger processing, the tuple is compared against all tuples
inside the window of the opposing input port. If the tuples match, then an
output tuple will be produced for each match. If at least one output was
generated, a window punctuation will be generated after all the outputs."

Cheers,
Asterios

[1]
http://www-01.ibm.com/support/knowledgecenter/#!/SSCRJU_3.2.1/com.ibm.swg.im.infosphere.streams.spl-standard-toolkit-reference.doc/doc/join.html
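
To make the quoted behavior concrete, a minimal single-process sketch of such
a symmetric join step (hypothetical types, not Flink or Streams API):

import scala.collection.mutable

case class Event(key: Int, payload: String)

// One window per input port; a tuple arriving on the left port is inserted
// into the left window and probed against the right window, and vice versa.
class SymmetricWindowJoin(emit: (Event, Event) => Unit) {
  val leftWindow = mutable.Buffer[Event]()
  val rightWindow = mutable.Buffer[Event]()

  def onLeft(t: Event): Unit = {
    leftWindow += t                        // insert into own window
    rightWindow.filter(_.key == t.key)     // compare against opposing window
      .foreach(r => emit(t, r))            // one output tuple per match
    // eviction of expired tuples omitted
  }

  def onRight(t: Event): Unit = {
    rightWindow += t
    leftWindow.filter(_.key == t.key).foreach(l => emit(l, t))
  }
}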



On Thu, Apr 9, 2015 at 1:30 PM, Matthias J. Sax <
mj...@informatik.hu-berlin.de> wrote:

> Hi Paris,
>
> thanks for the pointer to the Naiad paper. That is quite interesting.
>
> The paper I mentioned [1], does not describe the semantics in detail; it
> is more about the implementation for the stream-joins. However, it uses
> the same semantics (from my understanding) as proposed by Gyula.
>
> -Matthias
>
> [1] Kang, Naughton, Viglas. "Evaluating Window Joins over Unbounded
> Streams". VLDB 2002.
>
>
>
> On 04/07/2015 12:38 PM, Paris Carbone wrote:
> > Hello Matthias,
> >
> > Sure, ordering guarantees are indeed a tricky thing; I recall having
> > that discussion back in TU Berlin. Bear in mind though that DataStream,
> > our abstract data type, represents a *partitioned* unbounded sequence of
> > events. There are no *global* ordering guarantees made whatsoever in that
> > model across partitions. If you see it more generally, there are many “race
> > conditions” in a distributed execution graph of vertices that process
> > multiple inputs asynchronously, especially when you add joins and
> > iterations into the mix (how do you deal with reprocessing “old” tuples
> > that iterate in the graph?). Btw, have you checked the Naiad paper [1]?
> > Stephan cited it a while ago and it is quite relevant to that discussion.
> >
> > Also, can you cite the paper with the joining semantics you are
> referring to? That would be of good help I think.
> >
> > Paris
> >
> > [1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf
> >
> >
> > On 07 Apr 2015, at 11:50, Matthias J. Sax wrote:
> >
> > Hi @all,
> >
> > please keep me in the loop for this work. I am highly interested and I
> > want to help on it.
> >
> > My initial thoughts are as follows:
> >
> > 1) Currently, system timestamps are used and the suggested approach can
> > be seen as state-of-the-art (there is actually a research paper using
> > the exact same join semantics). Of course, the current approach is
> > inherently non-deterministic. The advantage is that there is no
> > overhead in keeping track of the order of records, and the latency should
> > be very low. (Additionally, state recovery is simplified: because the
> > processing is inherently non-deterministic, recovery can be done with
> > relaxed guarantees.)
> >
> > 2) The user should be able to "switch on" deterministic processing,
> > i.e., records are timestamped (either externally when generated, or
> > timestamped at the sources). Because deterministic processing adds some
> > overhead, the user should decide for it actively.
> > In this case, the order must be preserved in each re-distribution step
> > (merging is sufficient if order is preserved within each incoming
> > channel). Furthermore, deterministic processing can be achieved by sound
> > window semantics (and there is a bunch of them). Even for
> > single-stream windows it's a tricky problem; for join windows it's even
> > harder. From my point of view, it is less important which semantics are
> > chosen; however, the user must be aware of how it works. The trickiest
> > part of deterministic processing is dealing with duplicate timestamps
> > (which cannot be avoided). The timestamping of (intermediate) result
> > tuples is also an important question to be answered.
> >
> >
> > -Matthias
> >
> >
> > On 04/07/2015 11:37 AM, Gyula Fóra wrote:
> > Hey,
> >
> > I agree with Kostas: if we define the exact semantics of how this works,
> > this is not more ad-hoc than any other stateful operator with multiple
> > inputs. (And I don't think any other system supports something similar.)
> > (And I don't think any other system support something similar)
> >
> > We need to make some design choices that are similar to the issues we had
> > for windowing. We need to choose how we want to evaluate the windowing
> > policies (global or local) because that affects what kinds of policies can
> > be parallel, but I can work on these things.
> >
> > I think this is an amazing feature, so I wouldn't necessarily rush the
> > implementation for 0.9 though.
> >
> > And thanks for helping writing these down.
> >
> > Gyula
> >
> > On Tue, Apr 7, 2015 at 11:11 A

Re: About Operator and OperatorBase

2015-04-16 Thread Maximilian Michels
+1 for keeping the API. Even though this will not change your initial
concern much, Aljoscha :) I agree with you that it would be more consistent
to call the result of an operator OperatorDataSet.

On Thu, Apr 16, 2015 at 3:16 PM, Fabian Hueske  wrote:

> Renaming the core operators is fine with me, but I would not touch API
> facing classes.
> A big +1 for Timo's suggestion.
>
> 2015-04-16 6:30 GMT-05:00 Timo Walther :
>
> > I share Stephan's opinion.
> >
> > By the way, we could also find a common name for operators with two
> > inputs. Sometimes it's "TwoInputXXX", "DualInputXXX", "BinaryInputXXX"...
> > pretty inconsistent.
> >
> >
> > On 15.04.2015 17:48, Till Rohrmann wrote:
> >
> >> I would also be in favour of making the distinction between the API and
> >> common API layer more clear by using different names. This will ease the
> >> understanding of the source code.
> >>
> >> In the wake of a possible renaming we could also get rid of the legacy
> >> code
> >> org.apache.flink.optimizer.dag.MatchNode and
> >> rename org.apache.flink.runtime.operators.MatchDriver into JoinDriver to
> >> make the naming more consistent.
> >>
> >> On Wed, Apr 15, 2015 at 3:05 PM, Ufuk Celebi  wrote:
> >>
> >>  On 15 Apr 2015, at 15:01, Stephan Ewen  wrote:
> >>>
> >>>  I think we can rename the base operators.
> 
>  Renaming the subclass of DataSet would be extremely API-breaking. I think
>  that is not worth it.
> 
> >>> Oh, that's right. We return MapOperator for DataSet operations.
> Stephan's
> >>> point makes sense.
> >>>
> >>
> >
>


Re: Major Streaming refactoring

2015-04-16 Thread Hermann Gábor
Great! That was really needed.

On Mon, Apr 13, 2015 at 9:36 PM Gyula Fóra  wrote:

> Dear All,
>
> Today I did a major refactoring of some streaming components, with a lot of
> class renamings and some package restructuring.
>
> https://github.com/apache/flink/pull/594
>
> 1. I refactored the internal representation of the Streaming topologies
> (StreamGraph) to a more straightforward and less error-prone graph
> representation
>
> 2. I renamed a lot of internal stream classes and interfaces to have a more
> intuitive name (StreamVertex -> StreamTask, StreamInvokable ->
> StreamOperator etc.) and I also corrected the method and variable names
> everywhere to match this
>
> 3. I did some package restructuring
>
> These classes were getting very messy, as we never really did a large
> refactoring while the project was evolving. I think we should definitely
> merge these changes ASAP, before the release, as it will make future
> contributions much easier for anyone not familiar with the streaming
> internals.
>
> Cheers,
> Gyula
>


[Gelly] Vertex-centric iteration updateVertex does not get called

2015-04-16 Thread Hermann Gábor
Hi all,

I am implementing a simple triangle counting example for a workshop with
vertex-centric iteration and I found that the updateVertex method only gets
called if there are new messages for that vertex. Is it the expected
behavior?

I know that the iteration should stop for a given vertex when we don't
change the vertex value, but (at least in my case) it would be useful if
updateVertex got called with an empty message iterator. I guess receiving
zero messages might have a meaning in other cases too, and the user would
like to update the vertex value.
Does changing the current behavior make sense?

Cheers,
Gabor


Re: [Gelly] Vertex-centric iteration updateVertex does not get called

2015-04-16 Thread Andra Lungu
Hello Gabor,

Yes, currently updateVertex only gets called when a new message was
received.
Could you please describe the logic behind your triangle count? The one I
know is described at the beginning of page 1643 in this article:
http://www.cc.gatech.edu/~bader/papers/GraphBSPonXMT-MTAAP2013.pdf

As you can see, each time (for all three supersteps) a message gets
sent.
Here is my suboptimal implementation of the algorithm in the paper (it's
supposed to prove that high degree nodes overload the system):
https://github.com/andralungu/gelly-partitioning/commit/224cb9b6917c2320e16a657a549b2a0313aeb300

It needs some serious rebasing. I'll get to it this weekend :).
Nevertheless, it should serve as a starting point for your implementation.

Let us know if you have further questions!
Andra

P.S. I'm not sure calling updateVertex with an empty message iterator would
be so straightforward to implement. I'll have to look into it a bit more
once I get some spare time :)
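
To illustrate the current behavior: the runtime only invokes the update
function for vertices that have incoming messages, so vertices with an empty
inbox are skipped entirely. A toy model in plain Scala (not the Gelly API):

case class Vertex(id: Int, var value: Int)

def superstep(
    vertices: Map[Int, Vertex],
    messages: Seq[(Int, Int)],                     // (targetId, message)
    update: (Vertex, Seq[Int]) => Unit): Unit = {
  val inbox = messages.groupBy(_._1)               // group messages per vertex
  for ((id, msgs) <- inbox; v <- vertices.get(id)) // only vertices with mail
    update(v, msgs.map(_._2))
  // vertices absent from `inbox` never see updateVertex -- Gabor's case
}

One workaround under this behavior is to have the messaging function always
send a (possibly dummy) message, so that every vertex stays active in each
superstep.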



On Thu, Apr 16, 2015 at 9:44 PM, Hermann Gábor  wrote:

> Hi all,
>
> I am implementing a simple triangle counting example for a workshop with
> vertex-centric iteration and I found that the updateVertex method only gets
> called if there are new messages for that vertex. Is it the expected
> behavior?
>
> I know that the iteration should stop for a given vertex when we don't
> change the vertex value, but (at least in my case) it would be useful if
> updateVertex got called with an empty message iterator. I guess receiving
> zero messages might have a meaning in other cases too, and the user would
> like to update the vertex value.
> Does changing the current behavior make sense?
>
> Cheers,
> Gabor
>