Re: Accessing state in connected streams

2016-08-30 Thread aris kol
Hi Aljoscha,


I removed business objects, logic, etc. I am happy to post it here; I am sure
this is a common issue when you start to seriously mess with state.


Assuming a type for the Output, and assuming that there is a function
(EventA => String) in the mapWithState operator of typeAStream (implying the
state is just a Seq[String] per key):
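For concreteness, the typeAStream side I have in mind looks roughly like this (a sketch only; `f` stands for the (EventA => String) function above, and the other names are illustrative):

```scala
// Sketch of the upstream state: per key, append f(a) to a Seq[String],
// emitting nothing downstream.
val typeAStream = env.addSource(...)
  .flatMap(someUnmarshallerForA)
  .keyBy(_.id)
  .mapWithState[Option[Output], Seq[String]] { (a: EventA, state: Option[Seq[String]]) =>
    (None, Some(state.getOrElse(Seq.empty) :+ f(a)))
  }
```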

def coFun = new CoFlatMapFunction[EventA, EventB, Option[Output]] {

  override def flatMap1(in: EventA, out: Collector[Option[Output]]) =
    out.collect(None)

  override def flatMap2(in: EventB, out: Collector[Option[Output]]) = {

    new RichFlatMapFunction[EventB, Option[Output]]
        with StatefulFunction[EventB, Option[Output], Seq[String]] {

      lazy val stateTypeInfo: TypeInformation[Seq[String]] =
        implicitly[TypeInformation[Seq[String]]]
      lazy val serializer: TypeSerializer[Seq[String]] =
        stateTypeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
      override lazy val stateSerializer: TypeSerializer[Seq[String]] = serializer

      override def flatMap(in: EventB, out: Collector[Option[Output]]): Unit = {
        out.collect(
          applyWithState(
            in,
            (in, state) =>
              (state match {
                case None    => None
                case Some(s) => Some(Output(...))
              }, state)
          )
        )
      }

      // called from the anonymous class body, i.e. on instantiation
      flatMap(in, out)
    }
  }
}

applyWithState throws the exception, and my intuition says I am doing something 
seriously wrong in the instantiation. I tried to make something work using the 
mapWithState implementation as a guide and I ended up here.
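For contrast, I believe the shape this should take is plain managed keyed state, initialised once in open() and updated/read inside a single RichCoFlatMapFunction, since state created by the upstream mapWithState operator is not visible to a downstream operator. A rough sketch only: toName and buildOutput are placeholder helpers, and the exact ValueStateDescriptor constructor varies across Flink versions.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

class StatefulCoFun extends RichCoFlatMapFunction[EventA, EventB, Output] {

  // Initialised once per parallel instance in open(), never per element.
  @transient private var names: ValueState[Seq[String]] = _

  override def open(parameters: Configuration): Unit =
    names = getRuntimeContext.getState(
      new ValueStateDescriptor("names", classOf[Seq[String]], Seq.empty[String]))

  // A-side: fold the event into the per-key state, emit nothing.
  override def flatMap1(a: EventA, out: Collector[Output]): Unit =
    names.update(names.value() :+ toName(a))  // toName: the (EventA => String) above

  // B-side: read the per-key state and emit only when something is there.
  override def flatMap2(b: EventB, out: Collector[Output]): Unit = {
    val seen = names.value()
    if (seen.nonEmpty) out.collect(buildOutput(b, seen))  // buildOutput: placeholder
  }
}
```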

Thanks,
Aris


From: Aljoscha Krettek <aljos...@apache.org>
Sent: Tuesday, August 30, 2016 10:06 AM
To: user@flink.apache.org
Subject: Re: Accessing state in connected streams

Hi Aris,
I think you're on the right track with using a CoFlatMap for this. Could you 
maybe post the code of your CoFlatMapFunction (or you could send it to me 
privately if you have concerns with posting it publicly)? Then I could have a 
look.

Cheers,
Aljoscha

On Mon, 29 Aug 2016 at 15:48 aris kol <gizera...@hotmail.com> wrote:

Any other opinion on this?


Thanks :)

Aris

From: aris kol <gizera...@hotmail.com>
Sent: Sunday, August 28, 2016 12:04 AM

To: user@flink.apache.org
Subject: Re: Accessing state in connected streams

In the implementation I am passing just one CoFlatMapFunction, where flatMap1, 
which operates on EventA, just emits a None (doesn't do anything practically) 
and flatMap2 tries to access the state and throws the NPE.

It wouldn't make sense to use a mapper in this context; I would still want to 
flatten afterwards before pushing downstream.


Aris



From: Sameer W <sam...@axiomine.com>
Sent: Saturday, August 27, 2016 11:40 PM
To: user@flink.apache.org
Subject: Re: Accessing state in connected streams

Ok, sorry about that :-). I misunderstood, as I am not familiar with Scala code. 
Just curious, though: how are you passing two MapFunctions to the flatMap 
function on the connected stream? The interface of ConnectedStreams requires 
just one CoFlatMapFunction: 
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.html

Sameer

On Sat, Aug 27, 2016 at 6:13 PM, aris kol <gizera...@hotmail.com> wrote:

Let's say I have two types sharing the same trait

trait Event {
def id: Id
}

case class EventA(id: Id, info: InfoA) extends Event
case class EventB(id: Id, info: InfoB) extends Event

Each of these events gets pushed to a Kafka topic and gets consumed by a stream 
in Flink.

Let's say I have two streams

Events of type A create state:

val typeAStream = env.addSource(...)
.flatMap(someUnmarshallerForA)
.keyBy(_.id)
.mapWithState(...)

val typeBStream = env.addSource(...)
.flatMap(someUnmarshallerForB)
.keyBy(_.id)

I want now to process the events in typeBStream using the information stored in 
the State of typeAStream.

One approach would be to use the same stream for the two topics and then 
pattern match, but Event subclasses may grow in number and may have different 
loads, so I may want to keep things separate.

Would something along the lines of:

typeAStream.connect(typeBStream).flatMap(
  new IdentityFlatMapFunction(),
  new SomeRichFlatMapFunctionForEventB[EventB, O]
    with StatefulFunction[EventB, O, G[EventA]] { ... }
)

work?

I tried this approach and I ended up with an NPE because the state object was 
not initialized (meaning it was not there).


Thanks,
Aris




Re: Accessing state in connected streams

2016-08-29 Thread aris kol
Any other opinion on this?


Thanks :)

Aris



[quoted text from the messages above trimmed]




Re: Accessing state in connected streams

2016-08-27 Thread aris kol
In the implementation I am passing just one CoFlatMapFunction, where flatMap1, 
which operates on EventA, just emits a None (doesn't do anything practically) 
and flatMap2 tries to access the state and throws the NPE.

It wouldn't make sense to use a mapper in this context, I would still want to 
flatten afterwards before pushing dowstream.


Aris



[quoted text from the messages above trimmed]




Re: Accessing state in connected streams

2016-08-27 Thread aris kol
Hi Sameer,


Thank you for your quick response.


I don't think event ordering is the problem here, the processor doesn't assume 
any ordering.

KeyedStream[EventA] stores a state of type Set[InfoA] on its key, which I would 
like KeyedStream[EventB] to access.

The code operates on an Option[Set[InfoA]] without inviting trouble by invoking 
.get.

applyWithState throws the exception because the private ValueState[S] is never 
initialised.

Since KeyedStream[EventA] successfully updates the state, it could be that:

- There is some wrong config in SomeRichFlatMapFunctionForEventB, which is fine 
and can be fixed

- I am doing something completely wrong that Flink doesn't support.


Thanks,

Aris



From: Sameer W <sam...@axiomine.com>
Sent: Saturday, August 27, 2016 10:17 PM
To: user@flink.apache.org
Subject: Re: Accessing state in connected streams

There is no guarantee about the order in which elements from each stream arrive 
in a connected stream. You have to check whether the elements have arrived from 
stream A before using that information to process elements from stream B. 
Otherwise you have to buffer elements from stream B and check whether there are 
unprocessed elements from stream B when elements arrive from stream A. You 
might need to do that for elements from both streams, depending on how you use 
them.

You will get an NPE if you assume events have arrived from A when they might 
still be lagging behind.
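A sketch of that buffering idea (illustrative names only, assuming Flink's managed keyed state in a RichCoFlatMapFunction; makeOutput is a placeholder):

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

class BufferingCoFlatMap extends RichCoFlatMapFunction[EventA, EventB, Output] {

  @transient private var infoA: ValueState[Option[InfoA]] = _
  @transient private var pendingB: ValueState[Seq[EventB]] = _

  override def open(conf: Configuration): Unit = {
    infoA = getRuntimeContext.getState(
      new ValueStateDescriptor("infoA", classOf[Option[InfoA]], None: Option[InfoA]))
    pendingB = getRuntimeContext.getState(
      new ValueStateDescriptor("pendingB", classOf[Seq[EventB]], Seq.empty[EventB]))
  }

  override def flatMap1(a: EventA, out: Collector[Output]): Unit = {
    infoA.update(Some(a.info))
    // Flush any B events that arrived before this key's A-side info.
    pendingB.value().foreach(b => out.collect(makeOutput(b, a.info)))  // makeOutput: placeholder
    pendingB.update(Seq.empty)
  }

  override def flatMap2(b: EventB, out: Collector[Output]): Unit =
    infoA.value() match {
      case Some(info) => out.collect(makeOutput(b, info))
      case None       => pendingB.update(pendingB.value() :+ b)  // buffer until A arrives
    }
}
```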

On Sat, Aug 27, 2016 at 6:13 PM, aris kol <gizera...@hotmail.com> wrote:

[quoted text from the original message trimmed]




Accessing state in connected streams

2016-08-27 Thread aris kol
Let's say I have two types sharing the same trait

trait Event {
def id: Id
}

case class EventA(id: Id, info: InfoA) extends Event
case class EventB(id: Id, info: InfoB) extends Event

Each of these events gets pushed to a Kafka topic and gets consumed by a stream 
in Flink.

Let's say I have two streams

Events of type A create state:

val typeAStream = env.addSource(...)
.flatMap(someUnmarshallerForA)
.keyBy(_.id)
.mapWithState(...)

val typeBStream = env.addSource(...)
.flatMap(someUnmarshallerForB)
.keyBy(_.id)

I want now to process the events in typeBStream using the information stored in 
the State of typeAStream.

One approach would be to use the same stream for the two topics and then 
pattern match, but Event subclasses may grow in number and may have different 
loads, so I may want to keep things separate.

Would something along the lines of:

typeAStream.connect(typeBStream).flatMap(
  new IdentityFlatMapFunction(),
  new SomeRichFlatMapFunctionForEventB[EventB, O]
    with StatefulFunction[EventB, O, G[EventA]] { ... }
)

work?

I tried this approach and I ended up with an NPE because the state object was 
not initialized (meaning it was not there).


Thanks,
Aris



RE: 1.1 release

2016-07-18 Thread aris kol
Dropping the binary on my QA cluster as we speak. Thanks for the prompt response.

> From: u...@apache.org
> Date: Mon, 18 Jul 2016 15:47:25 +0200
> Subject: Re: 1.1 release
> To: user@flink.apache.org
> 
> We are in the processing of fixing the last issues before starting the
> 1.1 RC1 release vote. The latest discussion can be found in these
> threads:
> 
> RC0 1.1. preview (with known issues):
> https://mail-archives.apache.org/mod_mbox/flink-dev/201607.mbox/%3cCAKiyyaH_USuVD6etfehaRm6GLkGie=dczg_g+5+bt5x+8s5...@mail.gmail.com%3e
> 
> Initial release discussion:
> https://mail-archives.apache.org/mod_mbox/flink-dev/201607.mbox/%3c577637ed.2090...@apache.org%3e
> 
> 
> On Mon, Jul 18, 2016 at 3:30 PM, aris kol <gizera...@hotmail.com> wrote:
> > Hi,
> >
> > Any clues as to when 1.1 will be released?
> >
> > Thank,
> > Aris
> >
  

1.1 release

2016-07-18 Thread aris kol
Hi,

Any clues as to when 1.1 will be released?

Thanks,
Aris

1.1-snapshot issues

2016-05-17 Thread aris kol
Hi guys,

Since yesterday, I am getting this:

[warn]  apache.snapshots: tried
[warn]   http://repository.apache.org/snapshots/org/apache/flink/flink-scala_2.11/1.1-SNAPSHOT/flink-scala_2.11-1.1-SNAPSHOT.pom
[error] SERVER ERROR: Proxy Error url=http://repository.apache.org/snapshots/org/apache/flink/flink-streaming-scala_2.11/1.1-SNAPSHOT/maven-metadata.xml
[error] SERVER ERROR: Proxy Error url=http://repository.apache.org/snapshots/org/apache/flink/flink-streaming-scala_2.11/1.1-SNAPSHOT/flink-streaming-scala_2.11-1.1-SNAPSHOT.pom

It seems that sbt cannot resolve the snapshot dependency.
Any ideas?

Cheers,
Aris
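For reference, a snapshot dependency of this kind is typically wired up in build.sbt roughly as below (a sketch with assumed coordinates; note the "Proxy Error" above looks like a server-side problem at repository.apache.org, so the config itself may well be fine):

```scala
// build.sbt — sketch only; adjust the Flink version/modules as needed
resolvers += "apache-snapshots" at "https://repository.apache.org/snapshots/"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"           % "1.1-SNAPSHOT",
  "org.apache.flink" %% "flink-streaming-scala" % "1.1-SNAPSHOT"
)
```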

RE: classpath issue on yarn

2016-04-28 Thread aris kol
So, I shaded guava. The whole thing works fine locally (standalone local 
Flink), but on YARN (forgot to mention it runs on EMR), I get the following:

org.apache.flink.client.program.ProgramInvocationException: Unknown I/O error while extracting contained jar files.
	at org.apache.flink.client.program.PackagedProgram.extractContainedLibaries(PackagedProgram.java:729)
	at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:192)
	at org.apache.flink.client.CliFrontend.buildProgram(CliFrontend.java:922)
	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:301)
	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: java.util.zip.ZipException: error in opening zip file
	at java.util.zip.ZipFile.open(Native Method)
	at java.util.zip.ZipFile.<init>(ZipFile.java:219)
	at java.util.zip.ZipFile.<init>(ZipFile.java:149)
	at java.util.jar.JarFile.<init>(JarFile.java:166)
	at java.util.jar.JarFile.<init>(JarFile.java:130)
	at org.apache.flink.client.program.PackagedProgram.extractContainedLibaries(PackagedProgram.java:647)
	... 5 more

I removed the shaded dependency and I got back to the previous error. Any clues?

Thanks,
Aris
From: gizera...@hotmail.com
To: user@flink.apache.org
Subject: RE: classpath issue on yarn
Date: Tue, 26 Apr 2016 21:03:50 +




Hi Robert,

Thank you for your prompt response. No, I downloaded it from an Apache mirror. 
I think YARN loads the Hadoop universe before the user classpath by default, so 
I reckon I would get this exception even without Flink in the middle. I can 
still see both the old and the new MoreExecutors class in flink-dist (the old 
as org/apache/flink/hadoop/shaded/com/google/common/util/concurrent, the new as 
org/apache/flink/shaded/com/google/common/util/concurrent). I reckon I should 
try to shade guava on my side, but the Shade plugin in sbt-assembly is quite 
fresh. I will try and report.

Thanks,
Aris
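For reference, shading guava on the application side with sbt-assembly tends to look like the sketch below (assuming sbt-assembly 0.14.x or later, which introduced ShadeRule; the `myshaded` prefix is arbitrary):

```scala
// build.sbt — sketch only. Rewrites all guava classes bundled in the fat jar
// to a private package so they cannot clash with the (older) guava that
// YARN/EMR puts on the classpath first.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.common.**" -> "myshaded.com.google.common.@1").inAll
)
```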


From: rmetz...@apache.org
Date: Tue, 26 Apr 2016 18:42:31 +0200
Subject: Re: classpath issue on yarn
To: user@flink.apache.org

Hi Aris,
Did you build the 1.0.2 flink-dist yourself? If not, which exact version did 
you download? For example this file: 
http://www.apache.org/dyn/closer.lua/flink/flink-1.0.2/flink-1.0.2-bin-hadoop2-scala_2.11.tgz
has a clean flink-dist jar.


On Tue, Apr 26, 2016 at 12:28 PM, aris kol <gizera...@hotmail.com> wrote:



Hi guys,

I ran into a weird classpath issue while running a streaming job on a YARN 
cluster. I have a relatively simple flow that reads data from Kafka, does a few 
manipulations and then indexes the results in Elasticsearch (2.3). I use the 
elasticsearch2 connector (1.1-SNAPSHOT) (bleeding edge, I know). The stream 
works fine in a local Flink node (1.0.2) (reading from remote Kafka and writing 
to remote ES). However, when deployed to the remote YARN cluster (again, Flink 
1.0.2) the following exception is thrown:

04/26/2016 10:07:30  Source: Custom Source -> Flat Map -> Sink: Unnamed(1/8) switched to FAILED
java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
	at org.elasticsearch.threadpool.ThreadPool.<init>(ThreadPool.java:190)
	at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
	at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
	at java.lang.Thread.run(Thread.java:745)

04/26/2016 10:07:30  Job execution switched to status FAILING.
java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
	at org.elasticsearch.threadpool.ThreadPool.<init>(ThreadPool.java:190)
	at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
	at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.ja