Re: CountTrigger FIRE or FIRE_AND_PURGE

2016-08-29 Thread Fabian Hueske
Hi Paul,

This blog post [1] includes an example of an early trigger that should
pretty much do what you are looking for.
This one [2] explains the windowing mechanics of Flink (window assigner,
trigger, function, etc).

Hope this helps,
Fabian

[1]
https://www.mapr.com/blog/essential-guide-streaming-first-processing-apache-flink
[2] http://flink.apache.org/news/2015/12/04/Introducing-windows.html
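
For reference, a minimal sketch of such an early trigger, written against the Flink 1.1
Trigger API (this is not the blog post's exact code; the class name and the end-of-window
handling are mine). The important part is that it returns TriggerResult.FIRE, which keeps
the window contents, instead of FIRE_AND_PURGE:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EarlyCountTrigger extends Trigger<Object, TimeWindow> {

    private final long maxCount;
    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    public EarlyCountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        // also fire at the end of the window, like the default event-time trigger
        ctx.registerEventTimeTimer(window.maxTimestamp());

        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;   // emit the whole window, keep its contents
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
    }

    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}

Used e.g. as keyedStream.timeWindow(Time.hours(24), Time.hours(1))
.trigger(new EarlyCountTrigger(10)).apply(...), every firing should hand the window
function everything collected in the window so far.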

2016-08-30 0:25 GMT+02:00 Paul Joireman :

> Hi all,
>
>
> I'm attempting to use a long SlidingEventTime window (duration 24 hours), but
> I would like updates more frequently than the 24 hour length.  I
> naively attempted to use a simple CountTrigger(10) to give me the window
> every time 10 samples are collected; however, the window processing
> function I'm using only seems to get the latest 10, not the whole window
> (which is what I was hoping for).  The code looks like it simply fires
> after the count is reached, but it seems like it is doing a FIRE_AND_PURGE:
> I can't seem to use the iterator in the window processing function to get
> more than 10 elements at a time.  Is there something I'm missing in order
> to get at the full content of the window data?
>
>
> Paul
>


CountTrigger FIRE or FIRE_AND_PURGE

2016-08-29 Thread Paul Joireman
Hi all,


I'm attempting to use a long SlidingEventTime window (duration 24 hours), but I
would like updates more frequently than the 24 hour length.  I naively
attempted to use a simple CountTrigger(10) to give me the window every time 10
samples are collected; however, the window processing function I'm using only
seems to get the latest 10, not the whole window (which is what I was hoping
for).  The code looks like it simply fires after the count is reached, but it
seems like it is doing a FIRE_AND_PURGE: I can't seem to use the iterator in
the window processing function to get more than 10 elements at a time.  Is
there something I'm missing in order to get at the full content of the window
data?


Paul


Re: Cannot pass objects with null-valued fields to the next operator

2016-08-29 Thread Stephan Ewen
Hi!

Null is indeed not supported for some basic data types (tuples / case
classes).

Can you use Option for nullable fields?

Stephan


On Mon, Aug 29, 2016 at 8:04 PM, Jack Huang  wrote:

> Hi all,
>
> It seems like flink does not allow passing case class objects with
> null-valued fields to the next operators. I am getting the following error
> message:
>
> Caused by: java.lang.RuntimeException: Could not forward element to next
> operator
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:399)
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:381)
>at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
>at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
>at 
> org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:340)
>at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
>at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:239)
>at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:67)
>at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:32)
>at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:90)
>at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:30)
>at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:90)
>at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:30)
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:371)
>... 9 more
>
> This error goes away when I force all objects to not have fields with null
> values. However, null is a valid value in my use case. Is there a way to
> make it work? I am using flink-1.1.1.
>
>
> Thanks,
> Jack
>


Cannot pass objects with null-valued fields to the next operator

2016-08-29 Thread Jack Huang
Hi all,

It seems like flink does not allow passing case class objects with
null-valued fields to the next operators. I am getting the following error
message:

Caused by: java.lang.RuntimeException: Could not forward element to
next operator
   at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
   at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
   at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:399)
   at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:381)
   at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
   at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
   at 
org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:340)
   at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
   at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:239)
   at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
   at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:67)
   at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:32)
   at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:90)
   at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:30)
   at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:90)
   at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:30)
   at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:371)
   ... 9 more

This error goes away when I force all objects to not have fields with null
values. However, null is a valid value in my use case. Is there a way to
make it work? I am using flink-1.1.1.


Thanks,
Jack


Data Transfer between TM should be encrypted

2016-08-29 Thread vinay patil
Hi Ufuk,

This is regarding this issue
https://issues.apache.org/jira/browse/FLINK-4404

How can we achieve this? I am able to decrypt the data coming in from Kafka,
but I want to make sure that the data is encrypted when flowing between
TMs.

One approach I can think of is to decrypt the data at the start of each
operator and encrypt it at the end of each operator, but I feel this is not
an efficient approach.
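
For illustration, a rough sketch of what that per-operator approach would look like
(the cipher choice and key handling are placeholders, not a recommendation):

import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import org.apache.flink.api.common.functions.MapFunction;

// Decrypts AES-encrypted payloads at the start of an operator chain; a
// mirror-image MapFunction would re-encrypt the output before the next operator.
public class DecryptMap implements MapFunction<byte[], byte[]> {

    private final byte[] key;   // in practice this would come from a proper key store

    public DecryptMap(byte[] key) {
        this.key = key;
    }

    @Override
    public byte[] map(byte[] encrypted) throws Exception {
        Cipher cipher = Cipher.getInstance("AES");
        cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"));
        return cipher.doFinal(encrypted);
    }
}

Initializing a cipher per record like this is part of why the approach feels heavyweight;
FLINK-4404 is about encrypting the data transfer between TaskManagers itself, which would
avoid touching every operator.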

I just want to check whether there are alternatives to this, and whether it
can be achieved through configuration.

Regards,
Vinay Patil




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Data-Transfer-between-TM-should-be-encrypted-tp8781.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Re: Resource isolation in flink among multiple jobs

2016-08-29 Thread Abhishek Agarwal
Hi Robert,
Thanks for your reply. A few follow-up questions:
Is there any timeline for mesos support?

In standalone installations, how about having one Task slot per TaskManager
and multiple TaskManager instances per machine?
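
For reference, that setup would boil down to something like this in each TaskManager's
flink-conf.yaml (a sketch; the heap size is only an example):

# one slot per TaskManager JVM
taskmanager.numberOfTaskSlots: 1
# heap per TaskManager process; with a single slot this is effectively a per-job cap
taskmanager.heap.mb: 4096

With one slot per TaskManager JVM, tasks of different jobs no longer share a heap,
although CPU isolation is still left to the operating system.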

On Mon, Aug 29, 2016 at 7:13 PM, Robert Metzger  wrote:

> Hi,
>
> for isolation, we recommend using YARN, or soon Mesos.
>
> For standalone installations, you'll need to manually set up multiple
> independent Flink clusters within one physical cluster if you want them to
> be isolated.
>
> Regards,
> Robert
>
>
> On Mon, Aug 29, 2016 at 1:41 PM, Abhishek Agarwal 
> wrote:
>
>> In a standalone flink cluster, the TaskManager has multiple slots and can
>> run different applications in different slot but still within same JVM.
>> Given that, two applications are running under same process, how is the cpu
>> and memory isolation achieved?
>>
>> Are there any recommendations to achieve isolation between applications?
>>
>> --
>> Regards,
>> Abhishek Agarwal
>>
>>
>


-- 
Regards,
Abhishek Agarwal


Re: Flink JMX

2016-08-29 Thread Sreejith S
Thank you very much Chesnay, your pointer on "java.rmi.server.hostname"
solved the issue. Now I am able to get Flink metrics in 1.1.1.

The "host" setting in the JMX reporter was because I thought of not exposing
the JMX metrics publicly, since my Flink cluster is in the AWS cloud. So I
tried to bind it to the private IP, and the private-to-public IP mapping for
the JMX port is disabled. Not sure this makes sense :)
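
For anyone who runs into the same problem, the combination that ends up working is
roughly this in flink-conf.yaml (IP and port are placeholders):

env.java.opts: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=XX.XX.XX.XX

The JMX URL is then service:jmx:rmi:///jndi/rmi://XX.XX.XX.XX:9999/jmxrmi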

Thank You @Robert for the comments.

Regards,
Srijith


On Mon, Aug 29, 2016 at 8:16 PM, Chesnay Schepler 
wrote:

> Hello,
>
> That you can't access JMX in 1.0.3 even though you set all the JVM JMX
> options is unrelated to Flink. As such your JMX setup in general is broken.
> Note that in order to remotely access JMX you usually have to set
> "java.rmi.server.hostname" system-property on the host as well.
>
> Regarding the reporter:
>
>- When you manually set all the JVM JMX properties you don't have to
>specify a port for the JMXReporter.
>- There is no "host" setting for the JMX reporter. Out of curiosity,
>what do you think setting it would do?
>
> Please keep us updated about your progress.
>
> Regards,
> Chesnay
>
>
> On 29.08.2016 15:41, Robert Metzger wrote:
>
> Hi,
>
> I think in Flink 1.1.1 JMX will be started on port 8080, 8081 or 8082 (on
> the JM, 8081 is probably occupied by the web interface).
>
> On Mon, Aug 29, 2016 at 1:25 PM, Sreejith S  wrote:
>
>> Hi Chesnay,
>>
>> I added the below configuration in flink-conf in each taskmanagers.
>> (flink 1.0.3 version )
>>
>> # Enable JMX
>>
>> env.java.opts: -Dcom.sun.management.jmxremote
>> -Dcom.sun.management.jmxremote.port=
>> -Dcom.sun.management.jmxremote.authenticate=false
>> -Dcom.sun.management.jmxremote.ssl=false
>> -Dcom.sun.management.jmxremote.host=XX.XX.XX.XX
>>
>> Then tried to access it via 
>> service:jmx:rmi:///jndi/rmi://XX.XX.XX.XX:/jmxrmi
>> from a normal java program using javax.management.remote.JMXConnector
>>
>> But the connection is refused. I checked my port, it's open.
>>
>> Then i added below configs in Flink 1.1.1
>>
>> # JMX Metrics
>> metrics.reporters : jmx_reporter
>> metrics.reporter.jmx_reporter.class: org.apache.flink.metrics.jmx.J
>> MXReporter
>> metrics.reporter.jmx_reporter.host: XX.XX.XX.XX
>> metrics.reporter.jmx_reporter.port: 8080-8082
>>
>> But no luck. Am I missing anything?
>>
>> Thank You
>>
>>
>> On Mon, Aug 29, 2016 at 4:36 PM, Chesnay Schepler < 
>> ches...@apache.org> wrote:
>>
>>> Hello,
>>>
>>> can you post the jmx config entries and give us more details on how you
>>> want to access it?
>>>
>>> Regards,
>>> Chesnay
>>>
>>>
>>> On 29.08.2016 12:09, Sreejith S wrote:
>>>
>>> Hi All,
>>>
>>> I am using Flink-1.1.1 and i enabled JMX metrics in configuration file.
>>> In the task manger log i can see JMX is running.
>>>
>>> Is this metrics  exposed only through Flink Metrics API's ?
>>>
>>> I tried to connect to flink JMX URL using normal javax , but connections
>>> getting refused.
>>>
>>> Thanks,
>>>
>>> --
>>>
>>>
>>> *Sreejith.S*
>>> https://github.com/srijiths/
>>> http://srijiths.wordpress.com/
>>> tweet2sree@twitter 
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>> --
>>
>>
>> *Sreejith.S*
>> https://github.com/srijiths/
>> http://srijiths.wordpress.com/
>> tweet2sree@twitter 
>>
>>
>>
>>
>>
>>
>>
>
>


-- 


*Sreejith.S*
https://github.com/srijiths/
http://srijiths.wordpress.com/
tweet2sree@twitter 


Re: Flink JMX

2016-08-29 Thread Chesnay Schepler

Hello,

That you can't access JMX in 1.0.3 even though you set all the JVM JMX 
options is unrelated to Flink. As such your JMX setup in general is broken.
Note that in order to remotely access JMX you usually have to set 
"java.rmi.server.hostname" system-property on the host as well.


Regarding the reporter:

 * When you manually set all the JVM JMX properties you don't have to
   specify a port for the JMXReporter.
 * There is no "host" setting for the JMX reporter. Out of curiosity,
   what do you think setting it would do?

Please keep us updated about your progress.

Regards,
Chesnay

On 29.08.2016 15:41, Robert Metzger wrote:

Hi,

I think in Flink 1.1.1 JMX will be started on port 8080, 8081 or 8082 
(on the JM, 8081 is probably occupied by the web interface).


On Mon, Aug 29, 2016 at 1:25 PM, Sreejith S > wrote:


Hi Chesnay,

I added the below configuration in flink-conf in each
taskmanagers. (flink 1.0.3 version )

# Enable JMX

env.java.opts: -Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.port=
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.host=XX.XX.XX.XX

Then tried to access it
via service:jmx:rmi:///jndi/rmi://XX.XX.XX.XX:/jmxrmi from a
normal java program using javax.management.remote.JMXConnector

But the connection is refused. I checked my port, it's open.

Then i added below configs in Flink 1.1.1

# JMX Metrics
metrics.reporters : jmx_reporter
metrics.reporter.jmx_reporter.class:
org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx_reporter.host: XX.XX.XX.XX
metrics.reporter.jmx_reporter.port: 8080-8082

But no luck. Am I missing anything?

Thank You


On Mon, Aug 29, 2016 at 4:36 PM, Chesnay Schepler
> wrote:

Hello,

can you post the jmx config entries and give us more details
on how you want to access it?

Regards,
Chesnay


On 29.08.2016 12:09, Sreejith S wrote:

Hi All,

I am using Flink-1.1.1 and i enabled JMX metrics in
configuration file. In the task manger log i can see JMX is
running.

Is this metrics  exposed only through Flink Metrics API's ?

I tried to connect to flink JMX URL using normal javax , but
connections getting refused.

Thanks,

-- 



*Sreejith.S*
https://github.com/srijiths/
http://srijiths.wordpress.com/
**tweet2sree@twitter 












-- 



*Sreejith.S*
https://github.com/srijiths/
http://srijiths.wordpress.com/
**tweet2sree@twitter 












Gracefully Stopping Streaming Job Programmatically in LocalStreamEnvironment

2016-08-29 Thread Konstantin Knauf
Hi everyone,

I have an integration test for which I use a LocalStreamEnvironment.
Currently, the Flink Job is started in a separated thread, which I
interrupt after some time and then do some assertions.
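
For context, the current setup is roughly the following sketch (class and job names
are made up):

// Run the job in a background thread and interrupt it once the test has fed
// its data and waited long enough.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
// ... build the Kafka-reading topology under test ...

Thread jobThread = new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            env.execute("integration-test-job");
        } catch (Exception e) {
            // interruption / cancellation of the local job ends up here
        }
    }
});
jobThread.start();

// ... produce test records into the embedded Kafka, wait for the expected output ...
Thread.sleep(10000);

jobThread.interrupt();   // the crude "stop" that prompted this question
jobThread.join();

// ... assertions ...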

In this situation, is there a better way to stop/cancel a running job in a
LocalStreamEnvironment programmatically? Side info: the job is reading
from a Kafka Cluster, which is programmatically started for each test run.

Cheers,

Konstantin

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082





Re: Submitting watermarks through a Kinesis stream

2016-08-29 Thread Steffen Hausmann
That's just awesome!

Thanks,
Steffen

On August 29, 2016 3:39:52 PM GMT+02:00, Stephan Ewen  wrote:
>You are thinking too complicated here ;-) because Flink internally
>already
>does all the logic of monitoring the minimum watermark across stream
>partitions.
>As long as you match the Flink source parallelism to the number of
>Kinesis
>shards, that part is taken care of for you.
>
>You only need to publish watermarks to the shards that describe that
>shard's particular event time.
>
>On Mon, Aug 29, 2016 at 3:30 PM, Steffen Hausmann <
>stef...@hausmann-family.de> wrote:
>
>> Hi there,
>>
>> I'm feeding a Flink stream with events from a Kinesis stream and I'm
>> looking for some guidance on how to enable event time in the Flink
>stream.
>>
>> I've read through the documentation and it seems like I want to add
>events
>> that carry watermark information to the Kinesis stream and
>subsequently use
>> AssignerWithPunctuatedWatermarks to read and extract the watermark
>> information to the Flink stream. However, as a Kinesis stream is
>composed
>> from potentially multiple shards, which are similar to Kafka
>partitions,
>> using a single event to determine the watermark of the Flink stream
>may
>> affect the semantics of the system:
>>
>> Kinesis guarantees the order within a single shard but not across the
>> entire stream. So if a single watermark event is added to the stream,
>it
>> ends up in a particular shard and this shard may be processed faster
>than
>> others. Accordingly, when the event is read and used to determine the
>> watermark in the Flink stream, there may be still unprocessed events
>in
>> other shards with an event time that is lower than that of the
>already
>> processed watermark event.
>>
>> Therefore, it seems like I should submit a watermark event to every
>shard,
>> keep track of the last watermark event for each shard, and use the
>minimum
>> time of those watermark events to determine the watermark for the
>Flink
>> stream.
>>
>> Am I thinking too complicated here? Any guidance on how to implement
>this
>> correctly is highly appreciated.
>>
>> Thanks,
>> Steffen
>>



Re: Resource isolation in flink among multiple jobs

2016-08-29 Thread Robert Metzger
Hi,

for isolation, we recommend using YARN, or soon Mesos.

For standalone installations, you'll need to manually set up multiple
independent Flink clusters within one physical cluster if you want them to
be isolated.

Regards,
Robert


On Mon, Aug 29, 2016 at 1:41 PM, Abhishek Agarwal 
wrote:

> In a standalone flink cluster, the TaskManager has multiple slots and can
> run different applications in different slot but still within same JVM.
> Given that, two applications are running under same process, how is the cpu
> and memory isolation achieved?
>
> Are there any recommendations to achieve isolation between applications?
>
> --
> Regards,
> Abhishek Agarwal
>
>


Re: Accessing state in connected streams

2016-08-29 Thread aris kol
Any other opinion on this?


Thanks :)

Aris



From: aris kol 
Sent: Sunday, August 28, 2016 12:04 AM
To: user@flink.apache.org
Subject: Re: Accessing state in connected streams


In the implementation I am passing just one CoFlatMapFunction, where flatMap1, 
which operates on EventA, just emits a None (doesn't do anything practically) 
and flatMap2 tries to access the state and throws the NPE.

It wouldn't make sense to use a mapper in this context; I would still want to
flatten afterwards before pushing downstream.


Aris



From: Sameer W 
Sent: Saturday, August 27, 2016 11:40 PM
To: user@flink.apache.org
Subject: Re: Accessing state in connected streams

Ok, sorry about that :-). I misunderstood, as I am not familiar with Scala code.
Just curious though: how are you passing two MapFunctions to the flatMap
function on the connected stream? The interface of ConnectedStreams requires
just one CoMap function:
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.html

Sameer

On Sat, Aug 27, 2016 at 6:13 PM, aris kol 
> wrote:

Let's say I have two types sharing the same trait

trait Event {
def id: Id
}

case class EventA(id: Id, info: InfoA) extends Event
case class EventB(id: Id, info: InfoB) extends Event

Each of these events gets pushed to a Kafka topic and gets consumed by a stream 
in Flink.

Let's say I have two streams

Events of type A create state:

val typeAStream = env.addSource(...)
.flatMap(someUnmarshallerForA)
.keyBy(_.id)
.mapWithState(...)

val typeBStream = env.addSource(...)
.flatMap(someUnmarshallerForB)
.keyBy(_.id)

I want now to process the events in typeBStream using the information stored in 
the State of typeAStream.

One approach would be to use the same stream for the two topics and then 
pattern match, but Event subclasses may grow in numbers and
may have different loads, thus I may want to keep things separate.

Would something along the lines of:

typeAStream.connect(typeBStream).
flatMap(
new IdentityFlatMapFunction(),
new SomeRichFlatMapFunctionForEventB[EventB, O] with StateFulFuntion[EventB, O, 
G[EventA]] { ... }
)

work?

I tried this approach and I ended up in a NPE because the state object was not 
initialized (meaning it was not there).


Thanks,
Aris




Re: Submitting watermarks through a Kinesis stream

2016-08-29 Thread Stephan Ewen
You are thinking too complicated here ;-) because Flink internally already
does all the logic of monitoring the minimum watermark across stream
partitions.
As long as you match the Flink source parallelism to the number of Kinesis
shards, that part is taken care of for you.

You only need to publish watermarks to the shards that describe that
shard's particular event time.
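
A sketch of what the Flink-side handling of such per-shard marker events could look
like (MarkerEvent and its accessors are assumptions about the event model, not part of
the Kinesis connector):

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Emits a watermark whenever a dedicated marker event is read from a shard;
// Flink then tracks the minimum watermark across the parallel source instances.
public class MarkerWatermarkAssigner implements AssignerWithPunctuatedWatermarks<MarkerEvent> {

    @Override
    public long extractTimestamp(MarkerEvent element, long previousElementTimestamp) {
        return element.getEventTime();
    }

    @Override
    public Watermark checkAndGetNextWatermark(MarkerEvent lastElement, long extractedTimestamp) {
        // only the marker events advance event time; returning null means "no watermark"
        return lastElement.isWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
}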

On Mon, Aug 29, 2016 at 3:30 PM, Steffen Hausmann <
stef...@hausmann-family.de> wrote:

> Hi there,
>
> I'm feeding a Flink stream with events from a Kinesis stream and I'm
> looking for some guidance on how to enable event time in the Flink stream.
>
> I've read through the documentation and it seems like I want to add events
> that carry watermark information to the Kinesis stream and subsequently use
> AssignerWithPunctuatedWatermarks to read and extract the watermark
> information to the Flink stream. However, as a Kinesis stream is composed
> from potentially multiple shards, which are similar to Kafka partitions,
> using a single event to determine the watermark of the Flink stream may
> affect the semantics of the system:
>
> Kinesis guarantees the order within a single shard but not across the
> entire stream. So if a single watermark event is added to the stream, it
> ends up in a particular shard and this shard may be processed faster than
> others. Accordingly, when the event is read and used to determine the
> watermark in the Flink stream, there may be still unprocessed events in
> other shards with an event time that is lower than that of the already
> processed watermark event.
>
> Therefore, it seems like I should submit a watermark event to every shard,
> keep track of the last watermark event for each shard, and use the minimum
> time of those watermark events to determine the watermark for the Flink
> stream.
>
> Am I thinking too complicated here? Any guidance on how to implement this
> correctly is highly appreciated.
>
> Thanks,
> Steffen
>


Submitting watermarks through a Kinesis stream

2016-08-29 Thread Steffen Hausmann

Hi there,

I'm feeding a Flink stream with events from a Kinesis stream and I'm 
looking for some guidance on how to enable event time in the Flink stream.


I've read through the documentation and it seems like I want to add 
events that carry watermark information to the Kinesis stream and 
subsequently use AssignerWithPunctuatedWatermarks to read and extract 
the watermark information to the Flink stream. However, as a Kinesis 
stream is composed of potentially multiple shards, which are similar
to Kafka partitions, using a single event to determine the watermark of
the Flink stream may affect the semantics of the system:


Kinesis guarantees the order within a single shard but not across the 
entire stream. So if a single watermark event is added to the stream, it 
ends up in a particular shard and this shard may be processed faster 
than others. Accordingly, when the event is read and used to determine
the watermark in the Flink stream, there may be still unprocessed events 
in other shards with an event time that is lower than that of the 
already processed watermark event.


Therefore, it seems like I should submit a watermark event to every 
shard, keep track of the last watermark event for each shard, and use 
the minimum time of those watermark events to determine the watermark 
for the Flink stream.


Am I thinking too complicated here? Any guidance on how to implement 
this correctly is highly appreciated.


Thanks,
Steffen


Re: Dynamic scaling in flink

2016-08-29 Thread Robert Metzger
Hi,
this JIRA is a good starting point:
https://issues.apache.org/jira/browse/FLINK-3755

If you don't care about processing guarantees and you are using a stateless
streaming job, you can implement a simple Kafka consumer that uses Kafka's
consumer group mechanism. I recently implemented such a Kafka consumer if
you want to use it as a base for your work:
https://github.com/rmetzger/scratch/blob/kafka-group-consumer/src/main/java/com/dataartisans/FlinkKafkaGroupConsumer.java
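
The linked class boils down to roughly the following sketch (simplified, class name and
details are mine; offsets are tracked by Kafka's consumer group mechanism rather than by
Flink checkpoints, so there is no exactly-once guarantee):

import java.util.Collections;
import java.util.Properties;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaGroupSource extends RichParallelSourceFunction<String> {

    // must contain bootstrap.servers, group.id and the key/value deserializers
    private final Properties props;
    private final String topic;
    private volatile boolean running = true;

    public KafkaGroupSource(Properties props, String topic) {
        this.props = props;
        this.topic = topic;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // every parallel subtask joins the same consumer group, so Kafka
            // rebalances the partitions when the parallelism changes
            consumer.subscribe(Collections.singletonList(topic));
            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    ctx.collect(record.value());
                }
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}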

Regards,
Robert

On Mon, Aug 29, 2016 at 11:54 AM, Abhishek Agarwal 
wrote:

> Thanks Stephan. I understand guaranteeing exactly once semantics with the
> dynamic scaling is tough. If I were to let go of the exactly once
> requirement, is it not possible in current version? It would be really
> great if you can point me to the JIRA tracking this work.
>
> On Mon, Aug 29, 2016 at 2:30 PM, Stephan Ewen  wrote:
>
>> Hi!
>>
>> There is a lot of work in progress on that feature, and it looks like you
>> can expect the next version to have some upscale/downscale feature that
>> maintains exactly-once semantics.
>>
>> Stephan
>>
>>
>> On Mon, Aug 29, 2016 at 9:00 AM, Abhishek Agarwal 
>> wrote:
>>
>>> Is it possible to upscale or downscale a flink application without
>>> re-deploying (similar to rebalancing in storm)?
>>>
>>> --
>>> Regards,
>>> Abhishek Agarwal
>>>
>>>
>>
>
>
> --
> Regards,
> Abhishek Agarwal
>
>


Re: different Kafka serialization for keyed and non keyed messages

2016-08-29 Thread Robert Metzger
Hi Rss,

> why Flink implements different serialization schemes for keyed and non
keyed messages for Kafka?

The non-keyed serialization schema is a basic schema, which works for most
use cases.
For advanced users who need access to the key, offsets, the partition or
topic, there's the keyed ser schema.
But the keyed schema is richer and can completely subsume the simple,
non-keyed one.

> As a result I see that a message which are serialized by
TypeInformationKeyValueSerializationSchema may be deserialized by Flink's
SimpleStringSchema() or by Kafka's StringSerializer only with additional
first symbol.

The TypeInformationKeyValueSerializationSchema is only meant to be used for
Flink <--> Flink communication through Kafka, because it depends on Flink's
internal serializers (it might even depend on the exact ExecutionConfig
settings).


> The question, is it correct behavior of Flink? And should I implement own
serializer and partitioner for Flink's Kafka sink if I want to use just
simple String serialization which may be read by all other tools without
Flink?

The behavior is correct. If the SimpleStringSchema is not sufficient for
the other systems, you need to implement your own serializer.
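
Such a serializer can stay very small. A sketch against the 1.1 KeyedSerializationSchema
interface, writing plain UTF-8 strings so that any Kafka consumer can read them (unlike
the TypeInformation-based schema, no length prefix is written):

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

public class PlainStringKeyValueSchema implements KeyedSerializationSchema<Tuple2<String, String>> {

    @Override
    public byte[] serializeKey(Tuple2<String, String> element) {
        return element.f0 == null ? null : element.f0.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(Tuple2<String, String> element) {
        return element.f1.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(Tuple2<String, String> element) {
        return null;   // use the topic configured on the producer
    }
}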

> And second question, why Flink requires to implement a custom partitioner
for serialized byte[] stream instead of using of primary objects as in
Kafka's partitioner? Or instead of just allowing to use Kafka's partitioner
class.

If you are not specifying any Flink partitioner, we'll use the configured
Kafka partitioner.
The advantage of using Flink's own partitioner is that you can access
information like the subtaskId and the number of subtasks.

Regards,
Robert




On Sun, Aug 28, 2016 at 6:16 PM, rss rss  wrote:

> Hello,
>
>   why Flink implements different serialization schemes for keyed and non
> keyed messages for Kafka?
>
>   I'm using two ways of loading of messages to Kafka. First way is on-fly
> loading without Flink by Kafka's means only. In this case I'm using
> something like:
>
> props.put("partitioner.class", KafkaPartitioner.class.getCanonicalName());
> props.put("value.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
> props.put("key.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
>
> producer = new KafkaProducer<>(props);
> String key = event.getUserId();
> String  value = DummyEvent.eventToString(event);
> producer.send(new ProducerRecord<>(topic, key, value));
>
>
>  And from Flink side I can read it without a key by code like:
>
> DataStream<String> dataStream = env
> .addSource(new FlinkKafkaConsumer08(
> "topic",
> new *SimpleStringSchema(),* kafkaProps));
>
> As a result I have pure message without a key. Actually I need a key only
> for partitioning by Kafka and I have an appropriate class
> https://github.com/rssdev10/flink-kafka-streaming/blob/
> master/src/main/java/KafkaPartitioner.java . That is standard java-hash
> for String class.
>
>
>   Also I have other case for messages loading from hadoop to Kafka. I'm
> using Flink for this purpose. All is ok when I'm using
>
> dataStream.addSink(new FlinkKafkaProducer08<>(config.getProperty("topic", 
> Config.INPUT_TOPIC_NAME),
> new SimpleStringSchema(),
> kafkaProps));
>
> But I need partitioning in Kafka and I changed it to
>
> TypeInformation<Tuple2<String, String>> stringStringInfo =
> TypeInfoParser.parse("org.apache.flink.api.java.tuple.Tuple2<String, String>");
>
> KeyedSerializationSchema<Tuple2<String, String>> schema =
> new TypeInformationKeyValueSerializationSchema<>(String.class, 
> String.class, env.getConfig());
>
> dataStream
> .map(json -> {
> Event event = gson.fromJson(json, Event.class);
> return new Tuple2(event.getUserId(), json);
> }).returns(stringStringInfo)
> .setParallelism(partitions)
> .addSink(new FlinkKafkaProducer08<>(config.getProperty("topic", 
> Config.INPUT_TOPIC_NAME),
> schema,
> kafkaProps));
>
> As a result I see that a message which are serialized by
> TypeInformationKeyValueSerializationSchema may be deserialized by Flink's
> SimpleStringSchema() or by Kafka's StringSerializer only with additional
> first symbol. I guess this is a size of String which is added by
> org.apache.flink.types.StringValue#writeString. That is the value of a
> message is not more readable by Spark, Storm, Kafka consumer with standard
> deserialization
>
>The question, is it correct behavior of Flink? And should I implement
> own serializer and partitioner for Flink's Kafka sink if I want to use just
> simple String serialization which may be read by all other tools without
> Flink?
>
>And second question, why Flink requires to implement a custom
> partitioner for serialized byte[] stream instead of using of primary
> objects as in Kafka's partitioner? Or instead of just 

Re: Kafka and Flink's partitions

2016-08-29 Thread rss rss
Hello, thanks for the answer.

1. There is currently no way to avoid the repartitioning. When you do a
> keyBy(), Flink will shuffle the data through the network. What you would
> need is a way to tell Flink that the data is already partitioned. If you
> would use keyed state, you would also need to ensure that the same hash
> function is used for the partitions and the state.
>

Is it an assumption only, or do some examples exist? Yesterday I wrote a
question about the incompatibility of Flink's keyed serializer with Kafka's
deserializer.

2. Why do you assume that this would end up in one partition?


Just an assumption. I don't know how to check it.

3. You can also read old messages from a Kafka topic by setting the
> "auto.offset.reset" to "smallest" (or "latest") and using a new "group.id
> ".
>

Ok, I know about it. But "smallest" is a way to repeat the test with the same data.
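
For completeness, with the 0.8 consumer (for example) those settings go into the
properties passed to the source, roughly like this (broker addresses and group name
are placeholders):

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "broker:9092");
kafkaProps.setProperty("zookeeper.connect", "zookeeper:2181");
// a new group.id plus "smallest" re-reads the topic from the beginning
kafkaProps.setProperty("group.id", "flink-test-run-2");
kafkaProps.setProperty("auto.offset.reset", "smallest");  // "earliest"/"latest" for the 0.9 consumer

DataStream<String> events = env.addSource(
        new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), kafkaProps));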


My general question: is the implementation
https://github.com/rssdev10/flink-kafka-streaming/blob/master/src/main/java/FlinkStreamingConsumer.java
appropriate for the schema in the first email?


Regarding the example: this small autonomous test is based on DIMA's
project, and in this form you can use it, if it is useful.


Thanks,
best regards.

2016-08-29 13:54 GMT+02:00 Robert Metzger :

> Hi rss,
>
> Concerning your questions:
> 1. There is currently no way to avoid the repartitioning. When you do a
> keyBy(), Flink will shuffle the data through the network. What you would
> need is a way to tell Flink that the data is already partitioned. If you
> would use keyed state, you would also need to ensure that the same hash
> function is used for the partitions and the state.
>
> 2. Why do you assume that this would end up in one partition?
>
> 3. You can also read old messages from a Kafka topic by setting the
> "auto.offset.reset" to "smallest" (or "latest") and using a new "group.id
> ".
>
> I'll add Aljoscha and Kostas to the eMail. Maybe they can help with the
> incorrect results of the windowing.
>
> Regards,
> Robert
>
>
> On Thu, Aug 25, 2016 at 8:21 PM, rss rss  wrote:
>
>> Hello,
>>
>>   I want to implement something like a schema of processing which is
>> presented on following diagram. This is calculation of number of unique
>> users per specified time with assumption that we have > 100k events per
>> second and > 100M unique users:
>>
>>
>>
>>  I have one Kafka's topic of events with a partitioner by hash(userId) %
>> partitionsNum  https://github.com/rssdev10/fl
>> ink-kafka-streaming/blob/master/src/main/java/KafkaPartitioner.java. I
>> have prepared a runnable example https://github.com/rssdev10/fl
>> ink-kafka-streaming/blob/master/src/main/java/FlinkStreamingConsumer.java
>>
>>  And the project is available by https://github.com/rssdev10/fl
>> ink-kafka-streaming/ . Also see this page about how to run data
>> generator and run the test.
>>
>>  Basic assumption. I need to calculate a number of unique identifiers, so
>> I need to store them in a memory in Set structure but the size of
>> this data structure is dozens GB. So I need to partitioning data by
>> identifier to reduce size and collect only already calculated numbers per
>> specified time. E.g. every hour.
>>
>>  Questions:
>>
>>1. The logic of Flink is very hidden. Window operator requires keyed
>>stream. Does it means that when I'm doing
>>
>>eventStream.keyBy(event -> event.partition(partNum));
>>
>>with the same partitioner as used for Kafka then Flink saves primary
>>partitions? I want to avoid any repartitioning.
>>2. Then I'm doing
>>
>>WindowedStream uniqUsersWin =
>>userIdKeyed.timeWindow(Time.seconds(windowDurationTime));
>>
>>DataStream uniqUsers = 
>> uniqUsersWin.trigger(ProcessingTimeTrigger.create())
>>.fold(new UniqAggregator(), (FoldFunction) 
>> (accumulator, value) -> {
>>accumulator.uniqIds.add(value.getUserId());
>>
>>accumulator.registerEvent(value);
>>
>>return accumulator;
>>})
>>
>>does it mean that I have only one partition?
>>3. Next, I want to collect partial results of aggregation. I'm using
>>a custom trigger https://github.com/rssdev10/fl
>>ink-kafka-streaming/blob/master/src/main/java/CountOrTimeTrigger.java
>>
>> 
>>which provides firing on collected partial aggregates accordingly to 
>> number
>>of Kafka's partitions of by emergency time if the number of aggregates is
>>not enough. And the following code for aggregation:
>>
>>AllWindowedStream combinedUniqNumStream =
>>uniqUsers
>>.timeWindowAll(Time.seconds(emergencyTriggerTimeout))
>>
>> 

Re: Flink long-running YARN configuration

2016-08-29 Thread Robert Metzger
The JobManager UI starts when running Flink on YARN.
The address of the UI is registered at YARN, so you can also access it
through YARNs command line tools or its web interface.
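
For example, from the command line (the application id below is a placeholder):

# list running applications; the Tracking-URL column points at the Flink JobManager web UI
yarn application -list

# or query a specific application
yarn application -status application_1472400000000_0001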

On Fri, Aug 26, 2016 at 7:28 PM, Trevor Grant 
wrote:

> Stephan,
>
> Will the jobmanager-UI exist?  E.g. if I am running Flink on YARN will I
> be able to submit apps/see logs and DAGs through the web interface?
>
> thanks,
> tg
>
>
>
> Trevor Grant
> Data Scientist
> https://github.com/rawkintrevo
> http://stackexchange.com/users/3002022/rawkintrevo
> http://trevorgrant.org
>
> *"Fortunate is he, who is able to know the causes of things."  -Virgil*
>
>
> On Thu, Aug 25, 2016 at 12:59 PM, Stephan Ewen  wrote:
>
>> Hi Craig!
>>
>> For YARN sessions, Flink will
>>   - (a) register the app master hostname/port/etc at Yarn, so you can get
>> them from example from the yarn UI and tools
>>   - (b) it will create a .yarn-properties file that contain the
>> hostname/ports info. Future calls to the command line pick up the info from
>> there.
>>
>> /cc Robert
>>
>> Greetings,
>> Stephan
>>
>>
>> On Thu, Aug 25, 2016 at 5:02 PM, Foster, Craig 
>> wrote:
>>
>>> I'm trying to understand Flink YARN configuration. The flink-conf.yaml
>>> file is supposedly the way to configure Flink, except when you launch Flink
>>> using YARN since that's determined for the AM. The following is
>>> contradictory or not completely clear:
>>>
>>>
>>>
>>> "The system will use the configuration in conf/flink-config.yaml.
>>> Please follow our configuration guide
>>> 
>>>  if you want to change something.
>>>
>>> Flink on YARN will overwrite the following configuration parameters
>>> jobmanager.rpc.address (because the JobManager is always allocated at
>>> different machines), taskmanager.tmp.dirs (we are using the tmp
>>> directories given by YARN) and parallelism.default if the number of
>>> slots has been specified."
>>>
>>>
>>>
>>> OK, so it will use conf/flink-config.yaml, except for
>>> jobmanager.rpc.address/port which will be decided by YARN and not
>>> necessarily reported to the user since those are dynamically allocated by
>>> YARN. That's fine with me, but if I want to make a "long-running" Flink
>>> cluster available for more than one user, where do I check in Flink for the
>>> Application Master hostname--or do I just have to scrape output of logs
>>> (which would definitely be undesirable)? First, I thought this would be
>>> written by Flink to conf/flink-config.yaml. It is not. Then I thought it
>>> must surely be written to the HDFS configuration directory (under something
>>> like hdfs://$USER/.flink/) for that application but that is merely copied
>>> from the original conf/flink-config.yaml and doesn't have an accurate
>>> configuration for the specified application. So is there an accurate config
>>> somewhere in HDFS or on the ResourceManager--i.e. where could I
>>> programmatically find that (outside of manipulating YARN app names or
>>> scraping)?
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Craig
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>


Re: How to set a custom JAVA_HOME when run flink on YARN?

2016-08-29 Thread Robert Metzger
The "env.java.home" variable is only evaluated by the start scripts, not
the YARN code.

The solution you've mentioned earlier is a good work around in my opinion.

On Fri, Aug 26, 2016 at 3:48 AM, Renkai  wrote:

> It seems that this config variant only affects the local cluster and the
> standalone cluster, not YARN.
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/How-to-set-a-
> custom-JAVA-HOME-when-run-flink-on-YARN-tp8676p8709.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: TimeWindowAll doeesn't assign properly

2016-08-29 Thread Sendoh
Hi,

As I found, the problem was rebalance(), because the data arrives within 1 minute
(it re-processes old events), and it is a bit strange that it worked when the
watermark was configured as 10 minutes.
After removing rebalance(), it works as expected with a smaller watermark
setting.

DataStream streams = env.addSource(
        new FlinkKafkaConsumer09<>(topicList, new JSONSchema(),
                properties))
        .rebalance()                 // <-- removing this call fixed the window assignment
        .assignTimestampsAndWatermarks(new CorrelationWatermark());

Best,

Sendoh



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/TimeWindowAll-doeesn-t-assign-properly-with-EventTime-tp8201p8754.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Programatically collect taskmanagers details from Job Manager

2016-08-29 Thread Stephan Ewen
You should be able to call the Monitor handler of the JobManager:
http://jobmanagerhost:8081/taskmanagers

That gives you a JSON response like this:

{ "taskmanagers" : [
  { "id" : "7c8835b89acf533cb8a5119dbcaf4b4f",
    "path" : "akka.tcp://flink@127.0.1.1:56343/user/taskmanager",
    "dataPort" : 56199,
    "timeSinceLastHeartbeat" : 1472461577563,
    "slotsNumber" : 4
  },
  { "id" : "460bfbf8b540bf28befd616cbc9ea532",
    "path" : "akka.tcp://flink@127.0.1.1:58404/user/taskmanager",
    "dataPort" : 39233,
    "timeSinceLastHeartbeat" : 1472461577967,
    "slotsNumber" : 4
  }
]
}

That should give you the hostname/IP in each TaskManager's "path" entry.
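
A small sketch of how to fetch that programmatically, without extra dependencies
(host and port are placeholders):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class ListTaskManagers {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://jobmanagerhost:8081/taskmanagers");
        StringBuilder json = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                json.append(line);
            }
        }
        // parse with any JSON library; the "path" fields contain the TaskManager hosts
        System.out.println(json);
    }
}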

Hope that helps

Stephan


On Sun, Aug 28, 2016 at 3:03 PM, Sreejith S  wrote:

> Hi All,
>
> Is there any programmatic way to get the taskmanager details from a flink
> clluster ?
>
> At least the IP of taskmanagers ?
>
> Thanks,
>
> --
>
>
> *Sreejith.S*
> https://github.com/srijiths/
> tweet2sree@twitter 
>
>
>
>
>
>
>


Re: Joda exclude in java quickstart maven archetype

2016-08-29 Thread Fabian Hueske
Hi Flavio,

yes, Joda should not be excluded.
This will be fixed in Flink 1.1.2.
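
Until 1.1.2 is released, the workaround is to remove the corresponding entry from the
maven-shade-plugin excludes in the generated pom.xml, so that Joda is bundled into the
job jar. As far as I recall, the entry in the 1.1.1 quickstart looks like this:

<!-- inside maven-shade-plugin / artifactSet / excludes: delete this line -->
<exclude>joda-time:joda-time</exclude>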

Cheers, Fabian


2016-08-29 11:00 GMT+02:00 Flavio Pompermaier :

> Hi to all,
> I've tried to  upgrade from Flink 1.0.2 to 1.1.1 so I've copied the
> excludes of the maven shade plugin from the java quickstart pom but it
> includes the exclude of joda (that is not contained in the flink-dist
> jar).  That causes my job to fail.
> Shouldn't it be removed from the exclude list?
>
> Best,
> Flavio
>


Re: Dynamic scaling in flink

2016-08-29 Thread Stephan Ewen
Hi!

There is a lot of work in progress on that feature, and it looks like you
can expect the next version to have some upscale/downscale feature that
maintains exactly-once semantics.

Stephan


On Mon, Aug 29, 2016 at 9:00 AM, Abhishek Agarwal 
wrote:

> Is it possible to upscale or downscale a flink application without
> re-deploying (similar to rebalancing in storm)?
>
> --
> Regards,
> Abhishek Agarwal
>
>


Re: Apache siddhi into Flink

2016-08-29 Thread Chesnay Schepler

Hello Aparup,

could you provide more information about Siddhi? How mature is it; how 
is the community? How does it compare to Flink's CEP library?


What should this integration look like? Are you proposing to replace the 
current CEP library, or will they co-exist with different use-cases for 
each?


If we used Siddhi in Flink, how exactly would Flink's runtime be 
involved in the processing?


Regards,
Chesnay

On 28.08.2016 23:21, Aparup Banerjee (apbanerj) wrote:

Sorry for the semantic difference.



On Aug 28, 2016, at 12:05 PM, Trevor Grant > wrote:



Thank you for confirming Hao,

Aparup, please don't refer to it as "Apache Siddhi", that is misleading.


Trevor Grant
Data Scientist
https://github.com/rawkintrevo
http://stackexchange.com/users/3002022/rawkintrevo
http://trevorgrant.org

/"Fortunate is he, who is able to know the causes of things."  -Virgil/


On Sun, Aug 28, 2016 at 10:50 AM, Hao Chen > wrote:


Siddhi is not apache project, but licensed under apache license
v2, being open sourced and maintained by wso2.

- Hao

On Sun, Aug 28, 2016 at 11:11 PM, Trevor Grant
> wrote:

Aparup,

Was Siddhi recently added as an incubator project?  I can't
find it in the project directory or on github.com/apache
. The closest thing I can find is
this: https://github.com/wso2/siddhi


tg




Trevor Grant
Data Scientist
https://github.com/rawkintrevo
http://stackexchange.com/users/3002022/rawkintrevo

http://trevorgrant.org

/"Fortunate is he, who is able to know the causes of things."
 -Virgil/


On Sat, Aug 27, 2016 at 5:36 PM, Chen Qin > wrote:

​+1​



On Aug 26, 2016, at 11:23 PM, Aparup Banerjee (apbanerj)
> wrote:

Hi-

Has anyone looked into embedding apache siddhi into Flink.

Thanks,
Aparup









Dynamic scaling in flink

2016-08-29 Thread Abhishek Agarwal
Is it possible to upscale or downscale a flink application without
re-deploying (similar to rebalancing in storm)?

-- 
Regards,
Abhishek Agarwal