Keying a data stream by tenant and performing ML on each sub-stream - help

2023-07-18 Thread Catalin Stavaru
Hello everyone,

Here is my use case: I have an event data stream which I need to key by a
certain field (tenant id) and then for each tenant's events I need to
independently perform ML clustering using FlinkML's OnlineKMeans component.
I am using Java.

I tried different approaches but none of them seems to be correct.

Basically, I try to keep an OnlineKMeansModel instance as per-tenant state
using a keyed processing function on the input DataStream. In the
processing function, if I cannot retrieve the OnlineKMeansModel instance
for the current event's tenant id, I create one and store it as state for
that tenant id, to use in the future.
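To make it concrete, this is roughly the shape of what I am attempting (a
simplified sketch, not my actual code; Event, Prediction and
createModelForTenant are placeholders of mine, imports and the surrounding
class omitted):

events
    .keyBy(Event::getTenantId)
    .process(new KeyedProcessFunction<String, Event, Prediction>() {

        private transient ValueState<OnlineKMeansModel> modelState;

        @Override
        public void open(Configuration parameters) {
            // One model per tenant id, kept as keyed state.
            modelState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("model", OnlineKMeansModel.class));
        }

        @Override
        public void processElement(Event event, Context ctx,
                Collector<Prediction> out) throws Exception {
            OnlineKMeansModel model = modelState.value();
            if (model == null) {
                // First event for this tenant: build a model and remember it.
                // This is where the Table API inputs/outputs and the
                // serialization errors get in the way.
                model = createModelForTenant(event.getTenantId());
                modelState.update(model);
            }
            // ... feed the event to the model and emit predictions ...
        }
    });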

However, this doesn't seem to be the correct way to do it in Flink; I am
facing many hurdles with this approach.

- The OnlineKMeans takes a table (as in Table API) as input; that table is
basically a view of the event data stream, filtered by a certain tenant id.
How do I go about this?
- The OnlineKMeansModel is provided a table to output its predictions to.
How do I go about this table?
- I get many "this class is not serializable" errors, a sign that I am not
using the correct approach.

etc.

In essence, I feel that I am overlooking a fundamental aspect when it comes
to implementing a functional approach for performing FlinkML computations
independently for each key within a keyed data stream.

I hope my use case is clear; I would appreciate guidance on the correct
approach for this scenario.

Thank you !

-- 
Catalin Stavaru


Re: Backpressure due to busy sub-tasks

2022-12-16 Thread Alexis Sarda-Espinosa
Hi Martijn,

Yes, that's what I meant: the throughput in the process function(s) didn't
change, so even though they were busy 100% of the time with parallelism=2,
they were processing data quickly enough.

Regards,
Alexis.

Am Fr., 16. Dez. 2022 um 14:20 Uhr schrieb Martijn Visser <
martijnvis...@apache.org>:

> Hi,
>
> Backpressure implies that it's actually a later operator that is busy. So
> in this case, that would be your process function that can't handle the
> incoming load from your Kafka source.
>
> Best regards,
>
> Martijn
>
> On Tue, Dec 13, 2022 at 7:46 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a Kafka source (the new one) in Flink 1.15 that's followed by a
>> process function with parallelism=2. Some days, I see long periods of
>> backpressure in the source. During those times, the pool-usage metrics of
>> all tasks stay between 0 and 1%, but the process function appears 100% busy.
>>
>> To try to avoid backpressure, I increased parallelism to 3. It seems to
>> help, and busy-time decreased to around 80%, but something that caught my
>> attention is that throughput remained unchanged. Concretely, if X is the
>> number of events being written to the Kafka topic every second, each
>> instance of the process function receives roughly X/2 events/s with
>> parallelism=2, and X/3 with parallelism=3.
>>
>> I'm wondering a couple of things.
>>
>> 1. Is it possible that backpressure in this case is essentially a "false
>> positive" because the function is busy 100% of the time even though it's
>> processing enough data?
>> 2. Does Flink expose any way to tune this type of backpressure mechanism?
>>
>> Regards,
>> Alexis.
>>
>


Re: Backpressure due to busy sub-tasks

2022-12-16 Thread Martijn Visser
Hi,

Backpressure implies that it's actually a later operator that is busy. So
in this case, that would be your process function that can't handle the
incoming load from your Kafka source.

Best regards,

Martijn

On Tue, Dec 13, 2022 at 7:46 PM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hello,
>
> I have a Kafka source (the new one) in Flink 1.15 that's followed by a
> process function with parallelism=2. Some days, I see long periods of
> backpressure in the source. During those times, the pool-usage metrics of
> all tasks stay between 0 and 1%, but the process function appears 100% busy.
>
> To try to avoid backpressure, I increased parallelism to 3. It seems to
> help, and busy-time decreased to around 80%, but something that caught my
> attention is that throughput remained unchanged. Concretely, if X is the
> number of events being written to the Kafka topic every second, each
> instance of the process function receives roughly X/2 events/s with
> parallelism=2, and X/3 with parallelism=3.
>
> I'm wondering a couple of things.
>
> 1. Is it possible that backpressure in this case is essentially a "false
> positive" because the function is busy 100% of the time even though it's
> processing enough data?
> 2. Does Flink expose any way to tune this type of backpressure mechanism?
>
> Regards,
> Alexis.
>


Backpressure due to busy sub-tasks

2022-12-13 Thread Alexis Sarda-Espinosa
Hello,

I have a Kafka source (the new one) in Flink 1.15 that's followed by a
process function with parallelism=2. Some days, I see long periods of
backpressure in the source. During those times, the pool-usage metrics of
all tasks stay between 0 and 1%, but the process function appears 100% busy.

To try to avoid backpressure, I increased parallelism to 3. It seems to
help, and busy-time decreased to around 80%, but something that caught my
attention is that throughput remained unchanged. Concretely, if X is the
number of events being written to the Kafka topic every second, each
instance of the process function receives roughly X/2 events/s with
parallelism=2, and X/3 with parallelism=3.

I'm wondering a couple of things.

1. Is it possible that backpressure in this case is essentially a "false
positive" because the function is busy 100% of the time even though it's
processing enough data?
2. Does Flink expose any way to tune this type of backpressure mechanism?

Regards,
Alexis.


Stateful function with GCP Pub/Sub ingress/egress

2022-03-16 Thread David Dixon
The statefun docs have some nice examples of how to use Kafka and Kinesis
for ingress/egress in conjunction with a function. Is there some
documentation or example code I could reference to do the same with a GCP
Pub/Sub topic? Thanks.

Dave


Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

2021-03-29 Thread David Anderson
Yes, since the two streams have the same type, you can union the two
streams, key the resulting stream, and then apply something like a
RichFlatMapFunction. Or you can connect the two streams (again, they'll
need to be keyed so you can use state), and apply a RichCoFlatMapFunction.
You can use whichever of these approaches is simpler for your use case.
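
A minimal sketch of the union variant (Event, Result and the key accessor
here are just placeholders, imports omitted):

DataStream<Event> combined = streamOne.union(streamTwo);

combined
    .keyBy(Event::getKey)
    .flatMap(new RichFlatMapFunction<Event, Result>() {

        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void flatMap(Event value, Collector<Result> out) throws Exception {
            // Elements from both original streams that share the same key
            // are processed here and see the same keyed state.
            long updated = (count.value() == null ? 0L : count.value()) + 1;
            count.update(updated);
            out.collect(new Result(value.getKey(), updated));
        }
    });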

On Mon, Mar 29, 2021 at 7:56 AM vishalovercome  wrote:

> I've gone through the example as well as the documentation and I still
> couldn't understand whether my use case requires joining. 1. What would
> happen if I didn't join? 2. As the 2 incoming data streams have the same
> type, if joining is absolutely necessary then just a union
> (oneStream.union(anotherStream)) followed by a keyBy should be good enough
> right? I am asking this because I would prefer to use the simple
> RichMapFunction or RichFlatMapFunction as opposed to the
> RichCoFlatMapFunction. Thanks a lot!
> --
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>


Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

2021-03-29 Thread vishalovercome
I've gone through the example as well as the documentation and I still
couldn't understand whether my use case requires joining.

1. What would happen if I didn't join?
2. As the 2 incoming data streams have the same type, if joining is
absolutely necessary then just a union (oneStream.union(anotherStream))
followed by a keyBy should be good enough, right? I am asking this because
I would prefer to use the simple RichMapFunction or RichFlatMapFunction as
opposed to the RichCoFlatMapFunction.

Thanks a lot!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

2021-03-24 Thread vishalovercome
Let me make the example more concrete. Say O1 gets as input a data stream T1
which it splits into two using some function and produces DataStreams of
type T2 and T3, each of which are partitioned by the same key function TK.
Now after O2 processes a stream, it could sometimes send the stream to O3
(T4) using the same key function again. Now I want to know whether: 

1. Data from streams T3 with key K and T4 with key K end up affecting the
state variables for the same key K, or different ones. I would think they
affect the same state, but I wanted confirmation.
2. An explicit join is needed or not, i.e. whether this will achieve what I
want:

result2 = T1.filter(fn2).keyBy(TK).map(richfn2).keyBy(TK).map(whatever O3
does)
result3 = T1.filter(fn3).keyBy(TK).map(whatever O3 does)




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

2021-03-24 Thread David Anderson
For an example of a similar join implemented as a RichCoFlatMap, see [1].
For more background, the Flink docs have a tutorial [2] on how to work with
connected streams.

[1] https://github.com/apache/flink-training/tree/master/rides-and-fares
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/learn-flink/etl.html#connected-streams
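
Very roughly, the connected-streams shape from that exercise looks like
this (a simplified sketch; Ride and Fare stand in for the exercise's
TaxiRide/TaxiFare types, imports omitted):

rides
    .keyBy(ride -> ride.rideId)
    .connect(fares.keyBy(fare -> fare.rideId))
    .flatMap(new RichCoFlatMapFunction<Ride, Fare, Tuple2<Ride, Fare>>() {

        private transient ValueState<Ride> rideState;
        private transient ValueState<Fare> fareState;

        @Override
        public void open(Configuration config) {
            rideState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("ride", Ride.class));
            fareState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("fare", Fare.class));
        }

        @Override
        public void flatMap1(Ride ride, Collector<Tuple2<Ride, Fare>> out)
                throws Exception {
            // Both callbacks see the keyed state for the current rideId.
            Fare fare = fareState.value();
            if (fare != null) {
                fareState.clear();
                out.collect(Tuple2.of(ride, fare));
            } else {
                rideState.update(ride);
            }
        }

        @Override
        public void flatMap2(Fare fare, Collector<Tuple2<Ride, Fare>> out)
                throws Exception {
            Ride ride = rideState.value();
            if (ride != null) {
                rideState.clear();
                out.collect(Tuple2.of(ride, fare));
            } else {
                fareState.update(fare);
            }
        }
    });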

On Wed, Mar 24, 2021 at 8:55 AM Matthias Pohl 
wrote:

> 1. yes - the same key would affect the same state variable
> 2. you need a join to have the same operator process both streams
>
> Matthias
>
> On Wed, Mar 24, 2021 at 7:29 AM vishalovercome 
> wrote:
>
>> Let me make the example more concrete. Say O1 gets as input a data stream
>> T1
>> which it splits into two using some function and produces DataStreams of
>> type T2 and T3, each of which are partitioned by the same key function TK.
>> Now after O2 processes a stream, it could sometimes send the stream to O3
>> (T4) using the same key function again. Now I want to know whether:
>>
>> 1. Data from streams T3 with key K and T4 with key K end up affecting the
>> state variables for the same key K or different. I would think that would
>> be
>> the case but wanted a confirmation
>> 2. An explicit join is needed or not, i.e. whether this will achieve what
>> I
>> want:
>>
>> result2 = T1.filter(fn2).keyBy(TK).map(richfn2).keyBy(TK).map(whatever O3
>> does)
>> result3 = T1.filter(fn3).keyBy(TK).map(whatever O3 does)
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>


Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

2021-03-24 Thread Matthias Pohl
1. yes - the same key would affect the same state variable
2. you need a join to have the same operator process both streams

Matthias

On Wed, Mar 24, 2021 at 7:29 AM vishalovercome  wrote:

> Let me make the example more concrete. Say O1 gets as input a data stream
> T1
> which it splits into two using some function and produces DataStreams of
> type T2 and T3, each of which are partitioned by the same key function TK.
> Now after O2 processes a stream, it could sometimes send the stream to O3
> (T4) using the same key function again. Now I want to know whether:
>
> 1. Data from streams T3 with key K and T4 with key K end up affecting the
> state variables for the same key K or different. I would think that would
> be
> the case but wanted a confirmation
> 2. An explicit join is needed or not, i.e. whether this will achieve what I
> want:
>
> result2 = T1.filter(fn2).keyBy(TK).map(richfn2).keyBy(TK).map(whatever O3
> does)
> result3 = T1.filter(fn3).keyBy(TK).map(whatever O3 does)
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

2021-03-23 Thread vishalovercome
Suppose i have a job with 3 operators with the following job graph:

O1 => O2 // data stream partitioned by keyBy
O1 => O3 // data stream partitioned by keyBy
O2 => O3 // data stream partitioned by keyBy 

If operator O3 receives inputs from two operators and both inputs have the
same type and value for a key then will the two streams end up in the same
sub-task and therefore affect the same state variables keyed to that
particular key? Do the streams themselves have to have the same type or is
it enough that just the keys of each of the input streams have the same type
and value? 

If they're not guaranteed to affect the same state then how can we achieve
the same? I would prefer to use the simple
RichMapFunction/RichFlatmapFunction for modelling my operators as opposed to
any join function.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

2021-03-23 Thread Matthias Pohl
Hi Vishal,
I'm not 100% sure what you're trying to do. But the partitioning by a key
just relies on the key and the used parallelism. So, I guess, what you
propose should work.
You would have to rely on some join function, though, when merging two
input operators into one again.

I hope that was helpful.
Best,
Matthias

On Tue, Mar 23, 2021 at 3:29 PM vishalovercome  wrote:

> Suppose i have a job with 3 operators with the following job graph:
>
> O1 => O2 // data stream partitioned by keyBy
> O1 => O3 // data stream partitioned by keyBy
> O2 => O3 // data stream partitioned by keyBy
>
> If operator O3 receives inputs from two operators and both inputs have the
> same type and value for a key then will the two streams end up in the same
> sub-task and therefore affect the same state variables keyed to that
> particular key? Do the streams themselves have to have the same type or is
> it enough that just the keys of each of the input streams have the same
> type
> and value?
>
> If they're not guaranteed to affect the same state then how can we achieve
> the same? I would prefer to use the simple
> RichMapFunction/RichFlatmapFunction for modelling my operators as opposed
> to
> any join function.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Change in sub-task id assignment from 1.9 to 1.10?

2020-08-12 Thread Zhu Zhu
Hi Ken,

There were no such changes that I am aware of.
And Flink has no logic designed to scatter subtasks of the same operator
across different TaskManagers.

One workaround for your problem could be to increase the parallelism of
your source vertex so that it is no smaller than that of any other
operator, so that each slot can contain a source task. With the config
option cluster.evenly-spread-out-slots set to true, slots are evenly
distributed across all available TaskManagers in most cases.
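
For reference, a sketch of the relevant line in flink-conf.yaml:

cluster.evenly-spread-out-slots: true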

Thanks,
Zhu Zhu

Ken Krugler  于2020年8月7日周五 上午5:28写道:

> Hi all,
>
> Was there any change in how sub-tasks get allocated to TMs, from Flink 1.9
> to 1.10?
>
> Specifically for consecutively numbered sub-tasks (e.g. 0, 1, 2) did it
> become more or less likely that they’d be allocated to the same Task
> Manager?
>
> Asking because a workflow that ran fine in 1.9 now has a “hot” TM that’s
> having trouble keeping up with a Kafka topic.
>
> The most promising explanation is that now there are three sub-tasks on
> the same TM that are reading from that topic, versus previously they’d be
> scattered across multiple TMs.
>
> But I don’t see significant changes in this area post 1.8
>
> Thanks,
>
> — Ken
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
>


Change in sub-task id assignment from 1.9 to 1.10?

2020-08-06 Thread Ken Krugler
Hi all,

Was there any change in how sub-tasks get allocated to TMs, from Flink 1.9 to 
1.10?

Specifically for consecutively numbered sub-tasks (e.g. 0, 1, 2) did it become 
more or less likely that they’d be allocated to the same Task Manager?

Asking because a workflow that ran fine in 1.9 now has a “hot” TM that’s having 
trouble keeping up with a Kafka topic.

The most promising explanation is that now there are three sub-tasks on the 
same TM that are reading from that topic, versus previously they’d be scattered 
across multiple TMs.

But I don’t see significant changes in this area post 1.8

Thanks,

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Re: sub

2020-04-14 Thread Sivaprasanna
Hi,

To subscribe, you have to send a mail to user-subscr...@flink.apache.org

On Wed, 15 Apr 2020 at 7:33 AM, lamber-ken  wrote:

> user@flink.apache.org
>


sub

2020-04-14 Thread lamber-ken
user@flink.apache.org

Re: Colocating Sub-Tasks across jobs / Sharing Task Slots across jobs

2020-02-27 Thread Andrey Zagrebin
Hi Ben,

I think at the moment it is not possible, because of the current scheduling
design which Xintong has already mentioned.
The jobs are completely isolated and there is no synchronisation between
their deployments.
Alignment of tasks by e.g. key groups is in general difficult, as it is up
to the scheduler to decide where to deploy each job subtask.

The restart strategy is only for the failure scenario. Any job changes
require a full job restart at the moment.

I am pulling in Gary and Zhu to add more details in case I missed something
here.

Best,
Andrey

On Tue, Feb 25, 2020 at 1:38 PM Xintong Song  wrote:

> Do you believe the code of the operators of the restarted Region can be
>> changed between restarts?
>
>
> I'm not an expert on the restart strategies, but AFAIK the answer is
> probably not. Sorry I overlooked that you need to modify the job.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Feb 25, 2020 at 6:00 PM Benoît Paris <
> benoit.pa...@centraliens-lille.org> wrote:
>
>> Hi Xintong
>>
>> Thank you for your answer. This seems promising, I'll look into it.
>>
>> Do you believe the code of the operators of the restarted Region can be
>> changed between restarts?
>>
>> Best
>> Benoît
>>
>>
>> On Tue, Feb 25, 2020 at 2:30 AM Xintong Song 
>> wrote:
>>
>>> Hi Ben,
>>>
>>> You can not share slots across jobs. Flink adopts a two-level slot
>>> scheduling mechanism. Slots are firstly allocated to each job, then the
>>> JobMaster decides which tasks should be executed in which slots, i.e. slot
>>> sharing.
>>>
>>> I think what you are looking for is Pipelined Region Restart Strategy
>>> [1], which restarts only the tasks connected by pipelined edges instead of
>>> the whole job graph.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/task_failure_recovery.html#restart-pipelined-region-failover-strategy
>>>
>>>
>>>
>>> On Mon, Feb 24, 2020 at 11:28 PM Benoît Paris <
>>> benoit.pa...@centraliens-lille.org> wrote:
>>>
>>>> Hello all!
>>>>
>>>> I have a setup composed of several streaming pipelines. These have
>>>> different deployment lifecycles: I want to be able to modify and redeploy
>>>> the topology of one while the other is still up. I am thus putting them in
>>>> different jobs.
>>>>
>>>> The problem is I have a Co-Location constraint between one subtask of
>>>> each pipeline; I'd like them to run on the same TaskSlots, much like if
>>>> they were sharing a TaskSlot; or at least have them on the same JVM.
>>>>
>>>> A semi-official feature
>>>> "DataStream.getTransformation().setCoLocationGroupKey(stringKey)" [1]
>>>> exists for this, but seem to be tied to the Sub-Tasks actually being able
>>>> to be co-located on the same Task Slot.
>>>>
>>>> The documentation mentions [2] that it might be impossible to do ("Flink
>>>> allows subtasks to share slots even if they are subtasks of different
>>>> tasks, so long as they are *from the same job*").
>>>>
>>>> The streaming pipelines are numerous (about 10), and I can't afford to
>>>> increase the number of TaskSlots per TaskManager. I also would like to
>>>> avoid putting all the pipelines in the same job, restarting it every time a
>>>> single one changes.
>>>>
>>>> I'd like to have mailing list's informed opinion about this, if there
>>>> are workarounds, or if I could reconsider my problem under another angle.
>>>>
>>>> Cheers
>>>> Ben
>>>>
>>>> [1]
>>>> https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java#L116
>>>>
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-master/concepts/runtime.html#task-slots-and-resources
>>>>
>>>
>>
>> --
>> Benoît Paris
>> Ingénieur Machine Learning Explicable
>> Tél : +33 6 60 74 23 00
>> http://benoit.paris
>> http://explicable.ml
>>
>


Re: Colocating Sub-Tasks across jobs / Sharing Task Slots across jobs

2020-02-25 Thread Xintong Song
>
> Do you believe the code of the operators of the restarted Region can be
> changed between restarts?


I'm not an expert on the restart strategies, but AFAIK the answer is
probably not. Sorry I overlooked that you need to modify the job.

Thank you~

Xintong Song



On Tue, Feb 25, 2020 at 6:00 PM Benoît Paris <
benoit.pa...@centraliens-lille.org> wrote:

> Hi Xintong
>
> Thank you for your answer. This seems promising, I'll look into it.
>
> Do you believe the code of the operators of the restarted Region can be
> changed between restarts?
>
> Best
> Benoît
>
>
> On Tue, Feb 25, 2020 at 2:30 AM Xintong Song 
> wrote:
>
>> Hi Ben,
>>
>> You can not share slots across jobs. Flink adopts a two-level slot
>> scheduling mechanism. Slots are firstly allocated to each job, then the
>> JobMaster decides which tasks should be executed in which slots, i.e. slot
>> sharing.
>>
>> I think what you are looking for is Pipelined Region Restart Strategy
>> [1], which restarts only the tasks connected by pipelined edges instead of
>> the whole job graph.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/task_failure_recovery.html#restart-pipelined-region-failover-strategy
>>
>>
>>
>> On Mon, Feb 24, 2020 at 11:28 PM Benoît Paris <
>> benoit.pa...@centraliens-lille.org> wrote:
>>
>>> Hello all!
>>>
>>> I have a setup composed of several streaming pipelines. These have
>>> different deployment lifecycles: I want to be able to modify and redeploy
>>> the topology of one while the other is still up. I am thus putting them in
>>> different jobs.
>>>
>>> The problem is I have a Co-Location constraint between one subtask of
>>> each pipeline; I'd like them to run on the same TaskSlots, much like if
>>> they were sharing a TaskSlot; or at least have them on the same JVM.
>>>
>>> A semi-official feature
>>> "DataStream.getTransformation().setCoLocationGroupKey(stringKey)" [1]
>>> exists for this, but seem to be tied to the Sub-Tasks actually being able
>>> to be co-located on the same Task Slot.
>>>
>>> The documentation mentions [2] that it might be impossible to do ("Flink
>>> allows subtasks to share slots even if they are subtasks of different
>>> tasks, so long as they are *from the same job*").
>>>
>>> The streaming pipelines are numerous (about 10), and I can't afford to
>>> increase the number of TaskSlots per TaskManager. I also would like to
>>> avoid putting all the pipelines in the same job, restarting it every time a
>>> single one changes.
>>>
>>> I'd like to have mailing list's informed opinion about this, if there
>>> are workarounds, or if I could reconsider my problem under another angle.
>>>
>>> Cheers
>>> Ben
>>>
>>> [1]
>>> https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java#L116
>>>
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-master/concepts/runtime.html#task-slots-and-resources
>>>
>>
>
> --
> Benoît Paris
> Ingénieur Machine Learning Explicable
> Tél : +33 6 60 74 23 00
> http://benoit.paris
> http://explicable.ml
>


Re: Colocating Sub-Tasks across jobs / Sharing Task Slots across jobs

2020-02-25 Thread Benoît Paris
Hi Xintong

Thank you for your answer. This seems promising, I'll look into it.

Do you believe the code of the operators of the restarted Region can be
changed between restarts?

Best
Benoît


On Tue, Feb 25, 2020 at 2:30 AM Xintong Song  wrote:

> Hi Ben,
>
> You can not share slots across jobs. Flink adopts a two-level slot
> scheduling mechanism. Slots are firstly allocated to each job, then the
> JobMaster decides which tasks should be executed in which slots, i.e. slot
> sharing.
>
> I think what you are looking for is Pipelined Region Restart Strategy [1],
> which restarts only the tasks connected by pipelined edges instead of the
> whole job graph.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/task_failure_recovery.html#restart-pipelined-region-failover-strategy
>
>
>
> On Mon, Feb 24, 2020 at 11:28 PM Benoît Paris <
> benoit.pa...@centraliens-lille.org> wrote:
>
>> Hello all!
>>
>> I have a setup composed of several streaming pipelines. These have
>> different deployment lifecycles: I want to be able to modify and redeploy
>> the topology of one while the other is still up. I am thus putting them in
>> different jobs.
>>
>> The problem is I have a Co-Location constraint between one subtask of
>> each pipeline; I'd like them to run on the same TaskSlots, much like if
>> they were sharing a TaskSlot; or at least have them on the same JVM.
>>
>> A semi-official feature
>> "DataStream.getTransformation().setCoLocationGroupKey(stringKey)" [1]
>> exists for this, but seem to be tied to the Sub-Tasks actually being able
>> to be co-located on the same Task Slot.
>>
>> The documentation mentions [2] that it might be impossible to do ("Flink
>> allows subtasks to share slots even if they are subtasks of different
>> tasks, so long as they are *from the same job*").
>>
>> The streaming pipelines are numerous (about 10), and I can't afford to
>> increase the number of TaskSlots per TaskManager. I also would like to
>> avoid putting all the pipelines in the same job, restarting it every time a
>> single one changes.
>>
>> I'd like to have mailing list's informed opinion about this, if there are
>> workarounds, or if I could reconsider my problem under another angle.
>>
>> Cheers
>> Ben
>>
>> [1]
>> https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java#L116
>>
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/concepts/runtime.html#task-slots-and-resources
>>
>

-- 
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml


Re: Colocating Sub-Tasks across jobs / Sharing Task Slots across jobs

2020-02-24 Thread Xintong Song
Hi Ben,

You can not share slots across jobs. Flink adopts a two-level slot
scheduling mechanism. Slots are firstly allocated to each job, then the
JobMaster decides which tasks should be executed in which slots, i.e. slot
sharing.

I think what you are looking for is Pipelined Region Restart Strategy [1],
which restarts only the tasks connected by pipelined edges instead of the
whole job graph.
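
For reference, a sketch of how that strategy is selected in flink-conf.yaml
(in Flink 1.10 it should already be the default):

jobmanager.execution.failover-strategy: region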

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/task_failure_recovery.html#restart-pipelined-region-failover-strategy



On Mon, Feb 24, 2020 at 11:28 PM Benoît Paris <
benoit.pa...@centraliens-lille.org> wrote:

> Hello all!
>
> I have a setup composed of several streaming pipelines. These have
> different deployment lifecycles: I want to be able to modify and redeploy
> the topology of one while the other is still up. I am thus putting them in
> different jobs.
>
> The problem is I have a Co-Location constraint between one subtask of each
> pipeline; I'd like them to run on the same TaskSlots, much like if they
> were sharing a TaskSlot; or at least have them on the same JVM.
>
> A semi-official feature
> "DataStream.getTransformation().setCoLocationGroupKey(stringKey)" [1]
> exists for this, but seem to be tied to the Sub-Tasks actually being able
> to be co-located on the same Task Slot.
>
> The documentation mentions [2] that it might be impossible to do ("Flink
> allows subtasks to share slots even if they are subtasks of different
> tasks, so long as they are *from the same job*").
>
> The streaming pipelines are numerous (about 10), and I can't afford to
> increase the number of TaskSlots per TaskManager. I also would like to
> avoid putting all the pipelines in the same job, restarting it every time a
> single one changes.
>
> I'd like to have mailing list's informed opinion about this, if there are
> workarounds, or if I could reconsider my problem under another angle.
>
> Cheers
> Ben
>
> [1]
> https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java#L116
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/concepts/runtime.html#task-slots-and-resources
>


Colocating Sub-Tasks across jobs / Sharing Task Slots across jobs

2020-02-24 Thread Benoît Paris
Hello all!

I have a setup composed of several streaming pipelines. These have
different deployment lifecycles: I want to be able to modify and redeploy
the topology of one while the other is still up. I am thus putting them in
different jobs.

The problem is I have a Co-Location constraint between one subtask of each
pipeline; I'd like them to run on the same TaskSlots, much like if they
were sharing a TaskSlot; or at least have them on the same JVM.

A semi-official feature,
"DataStream.getTransformation().setCoLocationGroupKey(stringKey)" [1],
exists for this, but it seems to be tied to the sub-tasks actually being
able to be co-located on the same Task Slot.
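
For reference, what I am doing looks roughly like this (simplified sketch;
stream, operator and group names are placeholders):

// Tag the relevant sub-task of each pipeline with the same co-location
// group key, in the hope that they land on the same slot / JVM.
SingleOutputStreamOperator<Event> opA = pipelineA.map(new StepA());
opA.getTransformation().setCoLocationGroupKey("shared-group");

SingleOutputStreamOperator<Event> opB = pipelineB.map(new StepB());
opB.getTransformation().setCoLocationGroupKey("shared-group");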

The documentation mentions [2] that it might be impossible to do ("Flink
allows subtasks to share slots even if they are subtasks of different
tasks, so long as they are *from the same job*").

The streaming pipelines are numerous (about 10), and I can't afford to
increase the number of TaskSlots per TaskManager. I also would like to
avoid putting all the pipelines in the same job, restarting it every time a
single one changes.

I'd like to have mailing list's informed opinion about this, if there are
workarounds, or if I could reconsider my problem under another angle.

Cheers
Ben

[1]
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java#L116

[2]
https://ci.apache.org/projects/flink/flink-docs-master/concepts/runtime.html#task-slots-and-resources


Re: Sub-user

2020-01-02 Thread vino yang
Hi Jary,

All of Flink's mailing list information can be found here [1].

[1]: https://flink.apache.org/community.html#mailing-lists

Best,
Vino

Benchao Li  于2020年1月2日周四 下午4:56写道:

> Hi Jary,
>
> You need to send a email to *user-subscr...@flink.apache.org
> * to subscribe, not user@flink.apache.org
> .
>
> Jary Zhen  于2020年1月2日周四 下午4:53写道:
>
>>
>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


Re: Sub-user

2020-01-02 Thread Benchao Li
Hi Jary,

You need to send an email to user-subscr...@flink.apache.org to subscribe,
not user@flink.apache.org.

Jary Zhen  于2020年1月2日周四 下午4:53写道:

>
>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Sub-user

2020-01-02 Thread Jary Zhen



Re: All sub-tasks stuck in CREATED state after "BlobServerConnection: GET operation failed"

2018-03-29 Thread Nico Kruber
That BlobServerConnection error is caused by a TaskManager which
requested a BLOB (a jar file) but then closed the connection. I guess
that may happen when the job is cancelled and the TaskManager processes
are terminated.

If this is not happening during that scenario, then your TaskManager
probably died from something else, but since you didn't see anything in
the logs there, I don't think this is an issue.


Nico

On 29/03/18 16:24, Gary Yao wrote:
> Hi Juho,
> 
> Thanks for the follow up. Regarding the BlobServerConnection error, Nico
> (cc'ed)
> might have an idea. 
> 
> Best,
> Gary
> 
> On Thu, Mar 29, 2018 at 4:08 PM, Juho Autio <juho.au...@rovio.com> wrote:
> 
> Sorry, my bad. I checked the persisted jobmanager logs and can see
> that job was still being restarted at 15:31 and then at 15:36. If I
> wouldn't have terminated the cluster, I believe the flink job / yarn
> app would've eventually exited as failed.
> 
> 
> On Thu, Mar 29, 2018 at 4:49 PM, Juho Autio <juho.au...@rovio.com> wrote:
> 
> Thanks again, Gary.
> 
> It's true that I only let the job remain in the stuck state for
> something between 10-15 minutes. Then I shut down the cluster.
> 
> But: if restart strategy is being applied, shouldn't I have seen
> those messages in job manager log? In my case it kept all quiet
> since ~2018-03-28 15:27 and I terminated it at ~28-03-2018 15:36.
> 
> Do you happen to know about what that BlobServerConnection error
> means in the code? If it may lead into some unrecoverable state
> (where neither restart is attempted, nor job is failed for good)..
> 
> On Thu, Mar 29, 2018 at 4:39 PM, Gary Yao <g...@data-artisans.com> wrote:
> 
> Hi Juho,
> 
> The log message
> 
>   Could not allocate all requires slots within timeout of
> 30 ms. Slots required: 20, slots allocated: 8
> 
> indicates that you do not have enough resources in your
> cluster left. Can you
> verify that after you started the job submission the YARN
> cluster does not reach
> its maximum capacity? You can also try submitting the job
> with a lower
> parallelism.
> 
> I think the reason why the YARN application is not
> immediately shown as failed
> is that your restart strategy attempts to start the job 3
> times. On every
> attempt the job is blocked on the slot allocation timeout
> for at least 30 ms
> (5 minutes). I have tried submitting
> examples/streaming/WordCount.jar with the
> same restart strategy on EMR, and the CLI only returns after
> around 20 minutes.
> 
> As a side note, beginning from Flink 1.5, you do not need to
> specify -yn -ys
> because resource allocations are dynamic by default
> (FLIP-6). The parameter -yst
> is deprecated and should not be needed either.
> 
> Best,
> Gary
> 
> On Thu, Mar 29, 2018 at 8:59 AM, Juho Autio <juho.au...@rovio.com> wrote:
> 
> I built a new Flink distribution from release-1.5 branch
> yesterday.
> 
> The first time I tried to run a job with it ended up in
> some stalled state so that the job didn't manage to
> (re)start but what makes it worse is that it didn't exit
> as failed either.
> 
>     Next time I tried running the same job (but new EMR
> cluster & all from scratch) it just worked normally.
> 
> On the problematic run, The YARN job was started and
> Flink UI was being served, but Flink UI kept showing
> status CREATED for all sub-tasks and nothing seemed to
> be happening.
> 
> I found this in Job manager log first (could be unrelated) :
> 
> 2018-03-28 15:26:17,449 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph 
>       - Job UniqueIdStream
> (43ed4ace55974d3c486452a45ee5db93) switched from state
> RUNNING to FAILING.
> 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate all requires slots wi

Re: All sub-tasks stuck in CREATED state after "BlobServerConnection: GET operation failed"

2018-03-29 Thread Gary Yao
Hi Juho,

Thanks for the follow up. Regarding the BlobServerConnection error, Nico
(cc'ed)
might have an idea.

Best,
Gary

On Thu, Mar 29, 2018 at 4:08 PM, Juho Autio  wrote:

> Sorry, my bad. I checked the persisted jobmanager logs and can see that
> job was still being restarted at 15:31 and then at 15:36. If I wouldn't
> have terminated the cluster, I believe the flink job / yarn app would've
> eventually exited as failed.
>
>
> On Thu, Mar 29, 2018 at 4:49 PM, Juho Autio  wrote:
>
>> Thanks again, Gary.
>>
>> It's true that I only let the job remain in the stuck state for something
>> between 10-15 minutes. Then I shut down the cluster.
>>
>> But: if restart strategy is being applied, shouldn't I have seen those
>> messages in job manager log? In my case it kept all quiet since ~2018-03-28
>> 15:27 and I terminated it at ~28-03-2018 15:36.
>>
>> Do you happen to know about what that BlobServerConnection error means in
>> the code? If it may lead into some unrecoverable state (where neither
>> restart is attempted, nor job is failed for good)..
>>
>> On Thu, Mar 29, 2018 at 4:39 PM, Gary Yao  wrote:
>>
>>> Hi Juho,
>>>
>>> The log message
>>>
>>>   Could not allocate all requires slots within timeout of 30 ms.
>>> Slots required: 20, slots allocated: 8
>>>
>>> indicates that you do not have enough resources in your cluster left.
>>> Can you
>>> verify that after you started the job submission the YARN cluster does
>>> not reach
>>> its maximum capacity? You can also try submitting the job with a lower
>>> parallelism.
>>>
>>> I think the reason why the YARN application is not immediately shown as
>>> failed
>>> is that your restart strategy attempts to start the job 3 times. On every
>>> attempt the job is blocked on the slot allocation timeout for at least
>>> 30 ms
>>> (5 minutes). I have tried submitting examples/streaming/WordCount.jar
>>> with the
>>> same restart strategy on EMR, and the CLI only returns after around 20
>>> minutes.
>>>
>>> As a side note, beginning from Flink 1.5, you do not need to specify -yn
>>> -ys
>>> because resource allocations are dynamic by default (FLIP-6). The
>>> parameter -yst
>>> is deprecated and should not be needed either.
>>>
>>> Best,
>>> Gary
>>>
>>> On Thu, Mar 29, 2018 at 8:59 AM, Juho Autio 
>>> wrote:
>>>
>>>> I built a new Flink distribution from release-1.5 branch yesterday.
>>>>
>>>> The first time I tried to run a job with it ended up in some stalled
>>>> state so that the job didn't manage to (re)start but what makes it worse is
>>>> that it didn't exit as failed either.
>>>>
>>>> Next time I tried running the same job (but new EMR cluster & all from
>>>> scratch) it just worked normally.
>>>>
>>>> On the problematic run, The YARN job was started and Flink UI was being
>>>> served, but Flink UI kept showing status CREATED for all sub-tasks and
>>>> nothing seemed to be happening.
>>>>
>>>> I found this in Job manager log first (could be unrelated) :
>>>>
>>>> 2018-03-28 15:26:17,449 INFO  
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>>   - Job UniqueIdStream (43ed4ace55974d3c486452a45ee5db93) switched
>>>> from state RUNNING to FAILING.
>>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>>> Could not allocate all requires slots within timeout of 30 ms. Slots
>>>> required: 20, slots allocated: 8
>>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambd
>>>> a$scheduleEager$36(ExecutionGraph.java:984)
>>>> at java.util.concurrent.CompletableFuture.uniExceptionally(Comp
>>>> letableFuture.java:870)
>>>> at java.util.concurrent.CompletableFuture$UniExceptionally.tryF
>>>> ire(CompletableFuture.java:852)
>>>> at java.util.concurrent.CompletableFuture.postComplete(Completa
>>>> bleFuture.java:474)
>>>> at java.util.concurrent.CompletableFuture.completeExceptionally
>>>> (CompletableFuture.java:1977)
>>>> at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjun
>>>> ctFuture.handleCompletedFuture(FutureUtils.java:551)
>>>> at java.util.concurrent.Completab

Re: All sub-tasks stuck in CREATED state after "BlobServerConnection: GET operation failed"

2018-03-29 Thread Juho Autio
Sorry, my bad. I checked the persisted jobmanager logs and can see that job
was still being restarted at 15:31 and then at 15:36. If I wouldn't have
terminated the cluster, I believe the flink job / yarn app would've
eventually exited as failed.

On Thu, Mar 29, 2018 at 4:49 PM, Juho Autio  wrote:

> Thanks again, Gary.
>
> It's true that I only let the job remain in the stuck state for something
> between 10-15 minutes. Then I shut down the cluster.
>
> But: if restart strategy is being applied, shouldn't I have seen those
> messages in job manager log? In my case it kept all quiet since ~2018-03-28
> 15:27 and I terminated it at ~28-03-2018 15:36.
>
> Do you happen to know about what that BlobServerConnection error means in
> the code? If it may lead into some unrecoverable state (where neither
> restart is attempted, nor job is failed for good)..
>
> On Thu, Mar 29, 2018 at 4:39 PM, Gary Yao  wrote:
>
>> Hi Juho,
>>
>> The log message
>>
>>   Could not allocate all requires slots within timeout of 30 ms.
>> Slots required: 20, slots allocated: 8
>>
>> indicates that you do not have enough resources in your cluster left. Can
>> you
>> verify that after you started the job submission the YARN cluster does
>> not reach
>> its maximum capacity? You can also try submitting the job with a lower
>> parallelism.
>>
>> I think the reason why the YARN application is not immediately shown as
>> failed
>> is that your restart strategy attempts to start the job 3 times. On every
>> attempt the job is blocked on the slot allocation timeout for at least
>> 30 ms
>> (5 minutes). I have tried submitting examples/streaming/WordCount.jar
>> with the
>> same restart strategy on EMR, and the CLI only returns after around 20
>> minutes.
>>
>> As a side note, beginning from Flink 1.5, you do not need to specify -yn
>> -ys
>> because resource allocations are dynamic by default (FLIP-6). The
>> parameter -yst
>> is deprecated and should not be needed either.
>>
>> Best,
>> Gary
>>
>> On Thu, Mar 29, 2018 at 8:59 AM, Juho Autio  wrote:
>>
>>> I built a new Flink distribution from release-1.5 branch yesterday.
>>>
>>> The first time I tried to run a job with it ended up in some stalled
>>> state so that the job didn't manage to (re)start but what makes it worse is
>>> that it didn't exit as failed either.
>>>
>>> Next time I tried running the same job (but new EMR cluster & all from
>>> scratch) it just worked normally.
>>>
>>> On the problematic run, The YARN job was started and Flink UI was being
>>> served, but Flink UI kept showing status CREATED for all sub-tasks and
>>> nothing seemed to be happening.
>>>
>>> I found this in Job manager log first (could be unrelated) :
>>>
>>> 2018-03-28 15:26:17,449 INFO  
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>   - Job UniqueIdStream (43ed4ace55974d3c486452a45ee5db93) switched
>>> from state RUNNING to FAILING.
>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>> Could not allocate all requires slots within timeout of 30 ms. Slots
>>> required: 20, slots allocated: 8
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambd
>>> a$scheduleEager$36(ExecutionGraph.java:984)
>>> at java.util.concurrent.CompletableFuture.uniExceptionally(Comp
>>> letableFuture.java:870)
>>> at java.util.concurrent.CompletableFuture$UniExceptionally.tryF
>>> ire(CompletableFuture.java:852)
>>> at java.util.concurrent.CompletableFuture.postComplete(Completa
>>> bleFuture.java:474)
>>> at java.util.concurrent.CompletableFuture.completeExceptionally
>>> (CompletableFuture.java:1977)
>>> at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjun
>>> ctFuture.handleCompletedFuture(FutureUtils.java:551)
>>> at java.util.concurrent.CompletableFuture.uniWhenComplete(Compl
>>> etableFuture.java:760)
>>> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFi
>>> re(CompletableFuture.java:736)
>>> at java.util.concurrent.CompletableFuture.postComplete(Completa
>>> bleFuture.java:474)
>>> at java.util.concurrent.CompletableFuture.completeExceptionally
>>> (CompletableFuture.java:1977)
>>> at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete
>>> (FutureUtils.java:789)
>>> at akka.dispatch.OnComplete.internal(Future.scala:258)

Re: All sub-tasks stuck in CREATED state after "BlobServerConnection: GET operation failed"

2018-03-29 Thread Juho Autio
Thanks again, Gary.

It's true that I only let the job remain in the stuck state for something
between 10-15 minutes. Then I shut down the cluster.

But: if restart strategy is being applied, shouldn't I have seen those
messages in job manager log? In my case it kept all quiet since ~2018-03-28
15:27 and I terminated it at ~28-03-2018 15:36.

Do you happen to know about what that BlobServerConnection error means in
the code? If it may lead into some unrecoverable state (where neither
restart is attempted, nor job is failed for good)..

On Thu, Mar 29, 2018 at 4:39 PM, Gary Yao  wrote:

> Hi Juho,
>
> The log message
>
>   Could not allocate all requires slots within timeout of 30 ms. Slots
> required: 20, slots allocated: 8
>
> indicates that you do not have enough resources in your cluster left. Can
> you
> verify that after you started the job submission the YARN cluster does not
> reach
> its maximum capacity? You can also try submitting the job with a lower
> parallelism.
>
> I think the reason why the YARN application is not immediately shown as
> failed
> is that your restart strategy attempts to start the job 3 times. On every
> attempt the job is blocked on the slot allocation timeout for at least
> 30 ms
> (5 minutes). I have tried submitting examples/streaming/WordCount.jar
> with the
> same restart strategy on EMR, and the CLI only returns after around 20
> minutes.
>
> As a side note, beginning from Flink 1.5, you do not need to specify -yn
> -ys
> because resource allocations are dynamic by default (FLIP-6). The
> parameter -yst
> is deprecated and should not be needed either.
>
> Best,
> Gary
>
> On Thu, Mar 29, 2018 at 8:59 AM, Juho Autio  wrote:
>
>> I built a new Flink distribution from release-1.5 branch yesterday.
>>
>> The first time I tried to run a job with it ended up in some stalled
>> state so that the job didn't manage to (re)start but what makes it worse is
>> that it didn't exit as failed either.
>>
>> Next time I tried running the same job (but new EMR cluster & all from
>> scratch) it just worked normally.
>>
>> On the problematic run, The YARN job was started and Flink UI was being
>> served, but Flink UI kept showing status CREATED for all sub-tasks and
>> nothing seemed to be happening.
>>
>> I found this in Job manager log first (could be unrelated) :
>>
>> 2018-03-28 15:26:17,449 INFO  
>> org.apache.flink.runtime.executiongraph.ExecutionGraph
>>   - Job UniqueIdStream (43ed4ace55974d3c486452a45ee5db93) switched
>> from state RUNNING to FAILING.
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Could not allocate all requires slots within timeout of 30 ms. Slots
>> required: 20, slots allocated: 8
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambd
>> a$scheduleEager$36(ExecutionGraph.java:984)
>> at java.util.concurrent.CompletableFuture.uniExceptionally(Comp
>> letableFuture.java:870)
>> at java.util.concurrent.CompletableFuture$UniExceptionally.
>> tryFire(CompletableFuture.java:852)
>> at java.util.concurrent.CompletableFuture.postComplete(Completa
>> bleFuture.java:474)
>> at java.util.concurrent.CompletableFuture.completeExceptionally
>> (CompletableFuture.java:1977)
>> at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjun
>> ctFuture.handleCompletedFuture(FutureUtils.java:551)
>> at java.util.concurrent.CompletableFuture.uniWhenComplete(Compl
>> etableFuture.java:760)
>> at java.util.concurrent.CompletableFuture$UniWhenComplete.
>> tryFire(CompletableFuture.java:736)
>> at java.util.concurrent.CompletableFuture.postComplete(Completa
>> bleFuture.java:474)
>> at java.util.concurrent.CompletableFuture.completeExceptionally
>> (CompletableFuture.java:1977)
>> at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete
>> (FutureUtils.java:789)
>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>> at org.apache.flink.runtime.concurrent.Executors$DirectExecutio
>> nContext.execute(Executors.java:83)
>> at scala.concurrent.impl.CallbackRunnable.executeWithValue(
>> Promise.scala:44)
>> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Pro
>> mise.scala:252)
>> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupp
>> ort.scala:603)
>> at akka.ac

Re: All sub-tasks stuck in CREATED state after "BlobServerConnection: GET operation failed"

2018-03-29 Thread Gary Yao
Hi Juho,

The log message

  Could not allocate all requires slots within timeout of 30 ms. Slots
required: 20, slots allocated: 8

indicates that you do not have enough resources in your cluster left. Can
you
verify that after you started the job submission the YARN cluster does not
reach
its maximum capacity? You can also try submitting the job with a lower
parallelism.

I think the reason why the YARN application is not immediately shown as
failed
is that your restart strategy attempts to start the job 3 times. On every
attempt the job is blocked on the slot allocation timeout for at least
30 ms
(5 minutes). I have tried submitting examples/streaming/WordCount.jar with
the
same restart strategy on EMR, and the CLI only returns after around 20
minutes.

As a side note, beginning from Flink 1.5, you do not need to specify -yn -ys
because resource allocations are dynamic by default (FLIP-6). The parameter
-yst
is deprecated and should not be needed either.

Best,
Gary

On Thu, Mar 29, 2018 at 8:59 AM, Juho Autio  wrote:

> I built a new Flink distribution from release-1.5 branch yesterday.
>
> The first time I tried to run a job with it ended up in some stalled state
> so that the job didn't manage to (re)start but what makes it worse is that
> it didn't exit as failed either.
>
> Next time I tried running the same job (but new EMR cluster & all from
> scratch) it just worked normally.
>
> On the problematic run, The YARN job was started and Flink UI was being
> served, but Flink UI kept showing status CREATED for all sub-tasks and
> nothing seemed to be happening.
>
> I found this in Job manager log first (could be unrelated) :
>
> 2018-03-28 15:26:17,449 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph
>   - Job UniqueIdStream (43ed4ace55974d3c486452a45ee5db93) switched
> from state RUNNING to FAILING.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate all requires slots within timeout of 30 ms. Slots
> required: 20, slots allocated: 8
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.
> lambda$scheduleEager$36(ExecutionGraph.java:984)
> at java.util.concurrent.CompletableFuture.uniExceptionally(
> CompletableFuture.java:870)
> at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(
> CompletableFuture.java:852)
> at java.util.concurrent.CompletableFuture.postComplete(
> CompletableFuture.java:474)
> at java.util.concurrent.CompletableFuture.completeExceptionally(
> CompletableFuture.java:1977)
> at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.
> handleCompletedFuture(FutureUtils.java:551)
> at java.util.concurrent.CompletableFuture.uniWhenComplete(
> CompletableFuture.java:760)
> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(
> CompletableFuture.java:736)
> at java.util.concurrent.CompletableFuture.postComplete(
> CompletableFuture.java:474)
> at java.util.concurrent.CompletableFuture.completeExceptionally(
> CompletableFuture.java:1977)
> at org.apache.flink.runtime.concurrent.FutureUtils$1.
> onComplete(FutureUtils.java:789)
> at akka.dispatch.OnComplete.internal(Future.scala:258)
> at akka.dispatch.OnComplete.internal(Future.scala:256)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.
> execute(Executors.java:83)
> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.
> scala:44)
> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(
> Promise.scala:252)
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(
> AskSupport.scala:603)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> at scala.concurrent.Future$InternalCallbackExecutor$.
> unbatchedExecute(Future.scala:601)
> at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.
> scala:109)
> at scala.concurrent.Future$InternalCallbackExecutor$.
> execute(Future.scala:599)
> at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(
> LightArrayRevolverScheduler.scala:329)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(
> LightArrayRevolverScheduler.scala:280)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(
> LightArrayRevolverScheduler.scala:284)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.run(
> LightArrayRevolverScheduler.scala:236)
> at java.lang.Thread.run(Thread.java:748)
>
>
> After this there was:
>
> 2018-03-28 15:26:17,521 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph
>   - Restarting the job UniqueIdStre

All sub-tasks stuck in CREATED state after "BlobServerConnection: GET operation failed"

2018-03-28 Thread Juho Autio
I built a new Flink distribution from release-1.5 branch yesterday.

The first time I tried to run a job with it ended up in some stalled state
so that the job didn't manage to (re)start but what makes it worse is that
it didn't exit as failed either.

Next time I tried running the same job (but new EMR cluster & all from
scratch) it just worked normally.

On the problematic run, The YARN job was started and Flink UI was being
served, but Flink UI kept showing status CREATED for all sub-tasks and
nothing seemed to be happening.

I found this in Job manager log first (could be unrelated) :

2018-03-28 15:26:17,449 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job UniqueIdStream (43ed4ace55974d3c486452a45ee5db93) switched from state RUNNING to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate all requires slots within timeout of 30 ms. Slots required: 20, slots allocated: 8
at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$36(ExecutionGraph.java:984)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:551)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:789)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)


After this there was:

2018-03-28 15:26:17,521 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Restarting the job UniqueIdStream (43ed4ace55974d3c486452a45ee5db93).


And some time after that:

2018-03-28 15:27:39,125 ERROR org.apache.flink.runtime.blob.BlobServerConnection - GET operation failed
java.io.EOFException: Premature end of GET request
at org.apache.flink.runtime.blob.BlobServerConnection.get(BlobServerConnection.java:275)
at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:117)


Task manager logs didn't have any errors.


Is that error about BlobServerConnection severe enough to make the job get
stuck like this? It seems like a Flink bug, at least in that the job just gets
stuck and neither restarts nor makes the YARN app exit as failed.



My launch command is basically:

flink-${FLINK_VERSION}/bin/flink run -m yarn-cluster -yn ${NODE_COUNT} -ys
${SLOT_COUNT} -yjm ${JOB_MANAGER_MEMORY} -ytm ${TASK_MANAGER_MEMORY} -yst
-yD restart-strategy=fixed-delay -yD
restart-strategy.fixed-delay.attempts=3 -yD
"restart-strategy.fixed-delay.delay=30 s" -p ${PARALLELISM} $@


I'm also setting this to fix a classloading error (with the previous
build, which still works):
-yD.classloader.resolve-order=parent-first


Cluster was AWS EMR, release 5.12.0.

Thanks.


Re: Re-keying / sub-keying a stream without repartitioning

2017-04-26 Thread Elias Levy
On Wed, Apr 26, 2017 at 5:11 AM, Aljoscha Krettek 
wrote:

> I did spend some time thinking about this and we had the idea for a while
> now to add an operation like “keyByWithoutPartitioning()” (name not final
> ;-) that would allow the user to tell the system that we don’t have to do a
> reshuffle. This would work if the key-type (and keys) would stay exactly
> the same.
>
> I think it wouldn’t work for your case because the key type changes and
> elements for key (A, B) would normally be reshuffled to different instances
> than with key (A), i.e. (1, 1) does not belong to the same key-group as
> (1). Would you agree that this happens in your case?
>

It happens if I use keyBy().  But there is no need for it to happen, which
is why I was asking about rekeying without repartitioning.  The stream is
already partitioned by A, so all elements of a new stream keyed by (A,B)
are already being processed by the local task.  Reshuffling as a result of
rekeying would have no benefit and would double the network traffic.  That is
why I suggested subKey(B) might be a good way to clearly indicate that the new
key just sub-partitions the existing key partition without requiring
reshuffling.

Why would you not be able to use a different key type with
keyByWithoutRepartitioning()?


Re: Re-keying / sub-keying a stream without repartitioning

2017-04-26 Thread Aljoscha Krettek
Hi Elias,
sorry for the delay, this must have fallen under the table after Flink Forward.

I did spend some time thinking about this and we had the idea for a while now 
to add an operation like “keyByWithoutPartitioning()” (name not final ;-) that 
would allow the user to tell the system that we don’t have to do a reshuffle. 
This would work if the key-type (and keys) would stay exactly the same.

I think it wouldn’t work for your case because the key type changes and 
elements for key (A, B) would normally be reshuffled to different instances 
than with key (A), i.e. (1, 1) does not belong to the same key-group as (1). 
Would you agree that this happens in your case?

Best,
Aljoscha 
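
To make the key-group point above concrete, here is a small hedged check using
Flink's internal KeyGroupRangeAssignment class (an internal flink-runtime class,
so treat it purely as an illustration; it also assumes the keys end up as
Tuple1/Tuple2 instances, as with field-position keyBy):

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupCheck {
    public static void main(String[] args) {
        int maxParallelism = 128; // Flink's default max parallelism
        // key (A) vs. composite key (A, B) for A = 1, B = 1
        int groupA  = KeyGroupRangeAssignment.assignToKeyGroup(Tuple1.of(1), maxParallelism);
        int groupAB = KeyGroupRangeAssignment.assignToKeyGroup(Tuple2.of(1, 1), maxParallelism);
        System.out.println("key (1)    -> key group " + groupA);
        System.out.println("key (1, 1) -> key group " + groupAB);
    }
}

In general the two keys hash to different key groups, which is exactly why a
plain keyBy on the composite key would reshuffle.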

> On 25. Apr 2017, at 23:32, Elias Levy  wrote:
> 
> Anyone?
> 
> On Fri, Apr 21, 2017 at 10:15 PM, Elias Levy wrote:
> This is something that has come up before on the list, but in a different 
> context.  I have a need to rekey a stream but would prefer the stream to not 
> be repartitioned.  There is no gain to repartitioning, as the new partition 
> key is a composite of the stream key, going from a key of A to a key of (A, 
> B), so all values for the resulting streams are already being rerouted to the 
> same node and repartitioning them to other nodes would simply generate 
> unnecessary network traffic and serde overhead.
> 
> Unlike previous use cases, I am not trying to perform aggregate operations.  
> Instead I am executing CEP patterns.  Some patterns apply the the stream 
> keyed by A and some on the stream keyed by (A,B).
> 
> The API does not appear to have an obvious solution to this situation. 
> keyBy() will repartition and there is isn't something like subKey() to 
> subpartion a stream without repartitioning (e.g. keyBy(A).subKey(B)).
> 
> I suppose I could accomplish it by using partitionCustom(), ignoring the 
> second element in the key, and delegating to the default partitioner passing 
> it only the first element, thus resulting in no change of task assignment.
> 
> Thoughts?
> 



Re: Re-keying / sub-keying a stream without repartitioning

2017-04-25 Thread Elias Levy
Anyone?

On Fri, Apr 21, 2017 at 10:15 PM, Elias Levy 
wrote:

> This is something that has come up before on the list, but in a different
> context.  I have a need to rekey a stream but would prefer the stream to
> not be repartitioned.  There is no gain to repartitioning, as the new
> partition key is a composite of the stream key, going from a key of A to a
> key of (A, B), so all values for the resulting streams are already being
> rerouted to the same node and repartitioning them to other nodes would
> simply generate unnecessary network traffic and serde overhead.
>
> Unlike previous use cases, I am not trying to perform aggregate
> operations.  Instead I am executing CEP patterns.  Some patterns apply the
> the stream keyed by A and some on the stream keyed by (A,B).
>
> The API does not appear to have an obvious solution to this situation.
> keyBy() will repartition and there is isn't something like subKey() to
> subpartion a stream without repartitioning (e.g. keyBy(A).subKey(B)).
>
> I suppose I could accomplish it by using partitionCustom(), ignoring the
> second element in the key, and delegating to the default partitioner
> passing it only the first element, thus resulting in no change of task
> assignment.
>
> Thoughts?
>


Re-keying / sub-keying a stream without repartitioning

2017-04-21 Thread Elias Levy
This is something that has come up before on the list, but in a different
context.  I have a need to rekey a stream but would prefer the stream to
not be repartitioned.  There is no gain to repartitioning, as the new
partition key is a composite of the stream key, going from a key of A to a
key of (A, B), so all values for the resulting streams are already being
rerouted to the same node and repartitioning them to other nodes would
simply generate unnecessary network traffic and serde overhead.

Unlike previous use cases, I am not trying to perform aggregate
operations.  Instead I am executing CEP patterns.  Some patterns apply to
the stream keyed by A and some to the stream keyed by (A,B).

The API does not appear to have an obvious solution to this situation.
keyBy() will repartition, and there isn't something like subKey() to
sub-partition a stream without repartitioning (e.g. keyBy(A).subKey(B)).

I suppose I could accomplish it by using partitionCustom(), ignoring the
second element in the key, and delegating to the default partitioner
passing it only the first element, thus resulting in no change of task
assignment.

Thoughts?
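
For what it's worth, a rough sketch of the partitionCustom() workaround
described above, routing only on the first key field. The Tuple2<String,
String> events and the plain hash are illustrative assumptions; the hash is
not Flink's internal key-group assignment, so it only keeps routing consistent
among operators that use this same partitioner:

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SubKeyRouting {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // hypothetical (A, B) events; in the real job this would be the stream already keyed by A
        DataStream<Tuple2<String, String>> events = env.fromElements(
                Tuple2.of("key-a-1", "sub-b-1"),
                Tuple2.of("key-a-2", "sub-b-2"));

        // Route physically by A only, even though downstream logic distinguishes (A, B).
        DataStream<Tuple2<String, String>> routedByA = events.partitionCustom(
                new Partitioner<String>() {
                    @Override
                    public int partition(String a, int numPartitions) {
                        return Math.abs(a.hashCode() % numPartitions);
                    }
                },
                new KeySelector<Tuple2<String, String>, String>() {
                    @Override
                    public String getKey(Tuple2<String, String> value) {
                        return value.f0;
                    }
                });

        routedByA.print();
        env.execute("partitionCustom sketch");
    }
}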


Re: fan out parallel-able operator sub-task beyond total slots number

2016-04-18 Thread Till Rohrmann
Hi Chen,

two subtasks of the same operator can never be executed within the same
slot/pipeline. The `slotSharingGroup` only allows you to control which
subtasks of different operators can be executed alongside each other in the same
slot. It basically allows you to break pipelines into smaller ones.
Therefore, you need at least as many slots as the maximum degree of
parallelism in your program (so in your case 1000).

Cheers,
Till
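
A minimal, self-contained sketch of this (the group names and the sleeping map
are made up for illustration): splitting the pipeline into two slot sharing
groups means the slots needed are the per-group maximum parallelisms added up,
here 1 + 100 = 101, because slot sharing never puts two subtasks of the same
operator into one slot.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1L, 2L, 3L)
                .slotSharingGroup("ingest")      // source pipeline stays in its own group
                .setParallelism(1)
                .rebalance()
                .map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) throws Exception {
                        Thread.sleep(1000); // stand-in for a high-latency call
                        return value;
                    }
                })
                .slotSharingGroup("expensive")   // high-latency operator gets its own slots
                .setParallelism(100)
                .print()
                .setParallelism(1);

        env.execute("slot sharing sketch");
    }
}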

On Sun, Apr 17, 2016 at 6:54 PM, Chen Qin  wrote:

> Hi there,
>
>
> I try run large number of subtasks within a task slot using slot sharing
> group. The usage scenario tried to adress operator that makes a network
> call with high latency yet less memory or cpu footprint. (sample code below)
>
> From doc provided, slotsharinggroup seems the place to look at. Yet it
> seems it were not designed to address the scenario above.
>
> https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#workers-slots-resources
>
> My question is, which is best way to fan out large number of sub tasking
> parallel within a task?
>
> public void testFanOut() throws Exception{
> env = StreamExecutionEnvironment.getExecutionEnvironment();
> ...
> 
> env.addSource(...).setParallelism(1).disableChaining().shuffle().flatMap(new 
> FlatMapFunction() {
> @Override
> public void flatMap(DummyFlinkRecord dummyFlinkRecord, 
> Collector collector) throws Exception {
> Thread.sleep(1000); //latency is high, needs to fan out
> collector.collect(1l);
> }
> }).slotSharingGroup("flatmap").setParallelism(100).rebalance().filter(new 
> FilterFunction() {
> @Override
> public boolean filter(Long aLong) throws Exception {
> return true;
> }
> }).setParallelism(10).addSink(new SinkFunction() {
> @Override
> public void invoke(Long aLong) throws Exception {
> System.out.println(aLong);
> }
> });
> env.execute("fan out 100 subtasks for 1s delay mapper");
> }
>
> Thanks,
> Chen Qin
>


fan out parallel-able operator sub-task beyond total slots number

2016-04-17 Thread Chen Qin
Hi there,


I try to run a large number of subtasks within a task slot using a slot sharing
group. The usage scenario is to address an operator that makes a network
call with high latency yet has a small memory or CPU footprint (sample code below).

From the doc provided, slotSharingGroup seems to be the place to look at. Yet it
seems it was not designed to address the scenario above.
https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#workers-slots-resources

My question is: what is the best way to fan out a large number of subtasks in
parallel within a task?

public void testFanOut() throws Exception {
    env = StreamExecutionEnvironment.getExecutionEnvironment();
    ...
    env.addSource(...).setParallelism(1).disableChaining().shuffle()
        .flatMap(new FlatMapFunction<DummyFlinkRecord, Long>() {
            @Override
            public void flatMap(DummyFlinkRecord dummyFlinkRecord,
                                Collector<Long> collector) throws Exception {
                Thread.sleep(1000); // latency is high, needs to fan out
                collector.collect(1L);
            }
        }).slotSharingGroup("flatmap").setParallelism(100).rebalance()
        .filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long aLong) throws Exception {
                return true;
            }
        }).setParallelism(10)
        .addSink(new SinkFunction<Long>() {
            @Override
            public void invoke(Long aLong) throws Exception {
                System.out.println(aLong);
            }
        });
    env.execute("fan out 100 subtasks for 1s delay mapper");
}

Thanks,
Chen Qin


Re: streaming hdfs sub folders

2016-02-26 Thread Stephan Ewen
>>>>>> I forgot to mention I'm using an AvroInputFormat to read the file
>>>>>> (that might be relevant how the flag needs to be applied)
>>>>>> See the code Snipped below:
>>>>>>
>>>>>> DataStream inStream =
>>>>>> env.readFile(new AvroInputFormat(new 
>>>>>> Path(filePath), EndSongCleanedPq.class), filePath);
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 17, 2016 at 7:33 PM, Martin Neumann 
>>>>>> wrote:
>>>>>>
>>>>>>> The program is a DataStream program, it usually it gets the data
>>>>>>> from kafka. It's an anomaly detection program that learns from the 
>>>>>>> stream
>>>>>>> itself. The reason I want to read from files is to test different 
>>>>>>> settings
>>>>>>> of the algorithm and compare them.
>>>>>>>
>>>>>>> I think I don't need to reply things in the exact order (wich is not
>>>>>>> possible with parallel reads anyway) and I have written the program so 
>>>>>>> it
>>>>>>> can deal with out of order events.
>>>>>>> I only need the subfolders to be processed roughly in order. Its
>>>>>>> fine to process some stuff from 01 before everything from 00 is 
>>>>>>> finished,
>>>>>>> if I get records from all 24 subfolders at the same time things will 
>>>>>>> break
>>>>>>> though. If I set the flag will it try to get data from all sub dir's in
>>>>>>> parallel or will it go sub dir by sub dir?
>>>>>>>
>>>>>>> Also can you point me to some documentation or something where I can
>>>>>>> see how to set the Flag?
>>>>>>>
>>>>>>> cheers Martin
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Feb 17, 2016 at 11:49 AM, Stephan Ewen 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi!
>>>>>>>>
>>>>>>>> Going through nested folders is pretty simple, there is a flag on
>>>>>>>> the FileInputFormat that makes sure those are read.
>>>>>>>>
>>>>>>>> Tricky is the part that all "00" files should be read before the
>>>>>>>> "01" files. If you still want parallel reads, that means you need to 
>>>>>>>> sync
>>>>>>>> at some point, wait for all parallel parts to finish with the "00" work
>>>>>>>> before anyone may start with the "01" work.
>>>>>>>>
>>>>>>>> Is your training program a DataStream or a DataSet program?`
>>>>>>>>
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>> On Wed, Feb 17, 2016 at 1:16 AM, Martin Neumann 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I have a streaming machine learning job that usually runs with
>>>>>>>>> input from kafka. To tweak the models I need to run on some old data 
>>>>>>>>> from
>>>>>>>>> HDFS.
>>>>>>>>>
>>>>>>>>> Unfortunately the data on HDFS is spread out over several
>>>>>>>>> subfolders. Basically I have a datum with one subfolder for each hour
>>>>>>>>> within those are the actual input files I'm interested in.
>>>>>>>>>
>>>>>>>>> Basically what I need is a source that goes through the subfolder
>>>>>>>>> in order and streams the files into the program. I'm using event 
>>>>>>>>> timestamps
>>>>>>>>> so all files in 00 need to be processed before 01.
>>>>>>>>>
>>>>>>>>> Has anyone an idea on how to do this?
>>>>>>>>>
>>>>>>>>> cheers Martin
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: streaming hdfs sub folders

2016-02-23 Thread Martin Neumann
I'm not very familiar with the inner workings of the InputFormats. Calling
.open() got rid of the NullPointerException, but the stream still produces no output.

As a temporary solution I wrote a batch job that just unions all the
different datasets and puts them (sorted) into a single folder.

cheers Martin
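
In case it helps others, a rough sketch of that batch workaround, reusing the
AvroInputFormat and folder layout from this thread; the output path/format and
the sorting step are placeholders:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;
// plus the AvroInputFormat / EndSongCleanedPq imports already used in this thread

public class MergeHourlyFolders {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<EndSongCleanedPq> all = null;
        for (int h = 0; h < 24; h++) {
            String hour = String.format("%02d", h);
            DataSet<EndSongCleanedPq> part = env.createInput(new AvroInputFormat<>(
                    new Path("hdfs:///anonym/cleaned/endsong/2016-01-01/" + hour),
                    EndSongCleanedPq.class));
            all = (all == null) ? part : all.union(part);
        }

        // sorting by the event-time field would go here before writing;
        // the sink below is only a placeholder
        all.writeAsText("hdfs:///anonym/cleaned/endsong/2016-01-01-merged").setParallelism(1);
        env.execute("union hourly folders into one");
    }
}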

On Fri, Feb 19, 2016 at 2:39 PM, Robert Metzger  wrote:

> Hi Martin,
>
> where is the null pointer exception thrown?
> I think you didn't call the open() method of the AvroInputFormat. Maybe
> that's the issue.
>
> On Thu, Feb 18, 2016 at 5:01 PM, Martin Neumann  wrote:
>
>> I tried to implement your idea but I'm getting NullPointer exceptions
>> from the AvroInputFormat any Idea what I'm doing wrong?
>> See the code below:
>>
>> public static void main(String[] args) throws Exception {
>>
>> // set up the execution environment
>> final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setParallelism(1);
>>
>> env.fromElements("00", "01", "02","03","22","23")
>> .flatMap(new FileExtractor())
>> .filter(new LocationFiter())
>> .flatMap(new PreProcessEndSongClean())
>> .writeAsCsv(outPath);
>>
>>
>> env.execute("something");
>> }
>>
>> private static class FileExtractor implements 
>> FlatMapFunction{
>>
>> @Override
>> public void flatMap(String s, Collector collector) 
>> throws Exception {
>> AvroInputFormat avroInputFormat = new 
>> AvroInputFormat(new 
>> Path("hdfs:///anonym/cleaned/endsong/2016-01-01/"+s), 
>> EndSongCleanedPq.class);
>> avroInputFormat.setReuseAvroValue(false);
>> while (! avroInputFormat.reachedEnd()){
>> EndSongCleanedPq res = avroInputFormat.nextRecord(new 
>> EndSongCleanedPq());
>> if (res != null) collector.collect(res);
>> }
>> }
>> }
>>
>>
>> On Thu, Feb 18, 2016 at 4:06 PM, Martin Neumann  wrote:
>>
>>> I guess I need to set the parallelism for the FlatMap to 1 to make sure
>>> I read one file at a time. The downside I see with this is that I will be
>>> not able to read in parallel from HDFS (and the files are Huge).
>>>
>>> I give it a try and see how much performance I loose.
>>>
>>> cheers Martin
>>>
>>> On Thu, Feb 18, 2016 at 2:32 PM, Stephan Ewen  wrote:
>>>
>>>> Martin,
>>>>
>>>> I think you can approximate this in an easy way like this:
>>>>
>>>>   - On the client, you traverse your directories to collect all files
>>>> that you need, collect all file paths in a list.
>>>>   - Then you have a source "env.fromElements(paths)".
>>>>   - Then you flatMap and in the FlatMap, run the Avro input format
>>>> (open it per path, then call it to get all elements)
>>>>
>>>> That gives you pretty much full control about in which order the files
>>>> should be processed.
>>>>
>>>> What do you think?
>>>>
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Feb 17, 2016 at 9:42 PM, Martin Neumann 
>>>> wrote:
>>>>
>>>>> I forgot to mention I'm using an AvroInputFormat to read the file
>>>>> (that might be relevant how the flag needs to be applied)
>>>>> See the code Snipped below:
>>>>>
>>>>> DataStream inStream =
>>>>> env.readFile(new AvroInputFormat(new 
>>>>> Path(filePath), EndSongCleanedPq.class), filePath);
>>>>>
>>>>>
>>>>> On Wed, Feb 17, 2016 at 7:33 PM, Martin Neumann 
>>>>> wrote:
>>>>>
>>>>>> The program is a DataStream program, it usually it gets the data from
>>>>>> kafka. It's an anomaly detection program that learns from the stream
>>>>>> itself. The reason I want to read from files is to test different 
>>>>>> settings
>>>>>> of the algorithm and compare them.
>>>>>>
>>>>>> I think I don't need to reply things in the exact order (wich is not
>>>>>> possible with parallel reads anyway) and I have written the program so it
>>>>>> can deal with out of order events.
>>>>>> I only need the s

Re: streaming hdfs sub folders

2016-02-19 Thread Robert Metzger
Hi Martin,

where is the null pointer exception thrown?
I think you didn't call the open() method of the AvroInputFormat. Maybe
that's the issue.
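
For the archives, a hedged sketch of how the FileExtractor.flatMap() below could
create the splits and call open() per split (same classes and path as in the
quoted code; FileInputSplit is org.apache.flink.core.fs.FileInputSplit and
Configuration is org.apache.flink.configuration.Configuration):

@Override
public void flatMap(String s, Collector<EndSongCleanedPq> collector) throws Exception {
    AvroInputFormat<EndSongCleanedPq> format = new AvroInputFormat<>(
            new Path("hdfs:///anonym/cleaned/endsong/2016-01-01/" + s),
            EndSongCleanedPq.class);
    format.setReuseAvroValue(false);
    format.configure(new Configuration());
    // the format can only read after open() has been called on a concrete split
    for (FileInputSplit split : format.createInputSplits(1)) {
        format.open(split);
        while (!format.reachedEnd()) {
            EndSongCleanedPq record = format.nextRecord(new EndSongCleanedPq());
            if (record != null) {
                collector.collect(record);
            }
        }
        format.close();
    }
}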

On Thu, Feb 18, 2016 at 5:01 PM, Martin Neumann  wrote:

> I tried to implement your idea but I'm getting NullPointer exceptions from
> the AvroInputFormat any Idea what I'm doing wrong?
> See the code below:
>
> public static void main(String[] args) throws Exception {
>
> // set up the execution environment
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
>
> env.fromElements("00", "01", "02","03","22","23")
> .flatMap(new FileExtractor())
> .filter(new LocationFiter())
> .flatMap(new PreProcessEndSongClean())
> .writeAsCsv(outPath);
>
>
> env.execute("something");
> }
>
> private static class FileExtractor implements 
> FlatMapFunction{
>
> @Override
> public void flatMap(String s, Collector collector) 
> throws Exception {
> AvroInputFormat avroInputFormat = new 
> AvroInputFormat(new 
> Path("hdfs:///anonym/cleaned/endsong/2016-01-01/"+s), EndSongCleanedPq.class);
> avroInputFormat.setReuseAvroValue(false);
> while (! avroInputFormat.reachedEnd()){
> EndSongCleanedPq res = avroInputFormat.nextRecord(new 
> EndSongCleanedPq());
> if (res != null) collector.collect(res);
> }
> }
> }
>
>
> On Thu, Feb 18, 2016 at 4:06 PM, Martin Neumann  wrote:
>
>> I guess I need to set the parallelism for the FlatMap to 1 to make sure I
>> read one file at a time. The downside I see with this is that I will be not
>> able to read in parallel from HDFS (and the files are Huge).
>>
>> I give it a try and see how much performance I loose.
>>
>> cheers Martin
>>
>> On Thu, Feb 18, 2016 at 2:32 PM, Stephan Ewen  wrote:
>>
>>> Martin,
>>>
>>> I think you can approximate this in an easy way like this:
>>>
>>>   - On the client, you traverse your directories to collect all files
>>> that you need, collect all file paths in a list.
>>>   - Then you have a source "env.fromElements(paths)".
>>>   - Then you flatMap and in the FlatMap, run the Avro input format (open
>>> it per path, then call it to get all elements)
>>>
>>> That gives you pretty much full control about in which order the files
>>> should be processed.
>>>
>>> What do you think?
>>>
>>> Stephan
>>>
>>>
>>> On Wed, Feb 17, 2016 at 9:42 PM, Martin Neumann 
>>> wrote:
>>>
>>>> I forgot to mention I'm using an AvroInputFormat to read the file (that
>>>> might be relevant how the flag needs to be applied)
>>>> See the code Snipped below:
>>>>
>>>> DataStream inStream =
>>>> env.readFile(new AvroInputFormat(new 
>>>> Path(filePath), EndSongCleanedPq.class), filePath);
>>>>
>>>>
>>>> On Wed, Feb 17, 2016 at 7:33 PM, Martin Neumann 
>>>> wrote:
>>>>
>>>>> The program is a DataStream program, it usually it gets the data from
>>>>> kafka. It's an anomaly detection program that learns from the stream
>>>>> itself. The reason I want to read from files is to test different settings
>>>>> of the algorithm and compare them.
>>>>>
>>>>> I think I don't need to reply things in the exact order (wich is not
>>>>> possible with parallel reads anyway) and I have written the program so it
>>>>> can deal with out of order events.
>>>>> I only need the subfolders to be processed roughly in order. Its fine
>>>>> to process some stuff from 01 before everything from 00 is finished, if I
>>>>> get records from all 24 subfolders at the same time things will break
>>>>> though. If I set the flag will it try to get data from all sub dir's in
>>>>> parallel or will it go sub dir by sub dir?
>>>>>
>>>>> Also can you point me to some documentation or something where I can
>>>>> see how to set the Flag?
>>>>>
>>>>> cheers Martin
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Feb 17, 2016 at 11:49 AM, Stephan Ewen 
>>>>> wrote:
>>>>>

Re: streaming hdfs sub folders

2016-02-18 Thread Martin Neumann
I tried to implement your idea but I'm getting NullPointer exceptions from
the AvroInputFormat any Idea what I'm doing wrong?
See the code below:

public static void main(String[] args) throws Exception {

    // set up the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    env.fromElements("00", "01", "02", "03", "22", "23")
            .flatMap(new FileExtractor())
            .filter(new LocationFiter())
            .flatMap(new PreProcessEndSongClean())
            .writeAsCsv(outPath);

    env.execute("something");
}

private static class FileExtractor implements FlatMapFunction<String, EndSongCleanedPq> {

    @Override
    public void flatMap(String s, Collector<EndSongCleanedPq> collector) throws Exception {
        AvroInputFormat<EndSongCleanedPq> avroInputFormat = new AvroInputFormat<>(
                new Path("hdfs:///anonym/cleaned/endsong/2016-01-01/" + s),
                EndSongCleanedPq.class);
        avroInputFormat.setReuseAvroValue(false);
        while (!avroInputFormat.reachedEnd()) {
            EndSongCleanedPq res = avroInputFormat.nextRecord(new EndSongCleanedPq());
            if (res != null) collector.collect(res);
        }
    }
}


On Thu, Feb 18, 2016 at 4:06 PM, Martin Neumann  wrote:

> I guess I need to set the parallelism for the FlatMap to 1 to make sure I
> read one file at a time. The downside I see with this is that I will be not
> able to read in parallel from HDFS (and the files are Huge).
>
> I give it a try and see how much performance I loose.
>
> cheers Martin
>
> On Thu, Feb 18, 2016 at 2:32 PM, Stephan Ewen  wrote:
>
>> Martin,
>>
>> I think you can approximate this in an easy way like this:
>>
>>   - On the client, you traverse your directories to collect all files
>> that you need, collect all file paths in a list.
>>   - Then you have a source "env.fromElements(paths)".
>>   - Then you flatMap and in the FlatMap, run the Avro input format (open
>> it per path, then call it to get all elements)
>>
>> That gives you pretty much full control about in which order the files
>> should be processed.
>>
>> What do you think?
>>
>> Stephan
>>
>>
>> On Wed, Feb 17, 2016 at 9:42 PM, Martin Neumann  wrote:
>>
>>> I forgot to mention I'm using an AvroInputFormat to read the file (that
>>> might be relevant how the flag needs to be applied)
>>> See the code Snipped below:
>>>
>>> DataStream inStream =
>>> env.readFile(new AvroInputFormat(new 
>>> Path(filePath), EndSongCleanedPq.class), filePath);
>>>
>>>
>>> On Wed, Feb 17, 2016 at 7:33 PM, Martin Neumann 
>>> wrote:
>>>
>>>> The program is a DataStream program, it usually it gets the data from
>>>> kafka. It's an anomaly detection program that learns from the stream
>>>> itself. The reason I want to read from files is to test different settings
>>>> of the algorithm and compare them.
>>>>
>>>> I think I don't need to reply things in the exact order (wich is not
>>>> possible with parallel reads anyway) and I have written the program so it
>>>> can deal with out of order events.
>>>> I only need the subfolders to be processed roughly in order. Its fine
>>>> to process some stuff from 01 before everything from 00 is finished, if I
>>>> get records from all 24 subfolders at the same time things will break
>>>> though. If I set the flag will it try to get data from all sub dir's in
>>>> parallel or will it go sub dir by sub dir?
>>>>
>>>> Also can you point me to some documentation or something where I can
>>>> see how to set the Flag?
>>>>
>>>> cheers Martin
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Feb 17, 2016 at 11:49 AM, Stephan Ewen 
>>>> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> Going through nested folders is pretty simple, there is a flag on the
>>>>> FileInputFormat that makes sure those are read.
>>>>>
>>>>> Tricky is the part that all "00" files should be read before the "01"
>>>>> files. If you still want parallel reads, that means you need to sync at
>>>>> some point, wait for all parallel parts to finish with the "00" work 
>>>>> before
>>>>> anyone may start with the "01" work.
>>>>>
>>>>> Is your training program a DataStream or a DataSet program?`
>>>>>
>>>>> Stephan
>>>>>
>>>>> On Wed, Feb 17, 2016 at 1:16 AM, Martin Neumann 
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have a streaming machine learning job that usually runs with input
>>>>>> from kafka. To tweak the models I need to run on some old data from HDFS.
>>>>>>
>>>>>> Unfortunately the data on HDFS is spread out over several subfolders.
>>>>>> Basically I have a datum with one subfolder for each hour within those 
>>>>>> are
>>>>>> the actual input files I'm interested in.
>>>>>>
>>>>>> Basically what I need is a source that goes through the subfolder in
>>>>>> order and streams the files into the program. I'm using event timestamps 
>>>>>> so
>>>>>> all files in 00 need to be processed before 01.
>>>>>>
>>>>>> Has anyone an idea on how to do this?
>>>>>>
>>>>>> cheers Martin
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: streaming hdfs sub folders

2016-02-18 Thread Martin Neumann
I guess I need to set the parallelism for the FlatMap to 1 to make sure I
read one file at a time. The downside I see with this is that I will not be
able to read in parallel from HDFS (and the files are huge).

I'll give it a try and see how much performance I lose.

cheers Martin

On Thu, Feb 18, 2016 at 2:32 PM, Stephan Ewen  wrote:

> Martin,
>
> I think you can approximate this in an easy way like this:
>
>   - On the client, you traverse your directories to collect all files that
> you need, collect all file paths in a list.
>   - Then you have a source "env.fromElements(paths)".
>   - Then you flatMap and in the FlatMap, run the Avro input format (open
> it per path, then call it to get all elements)
>
> That gives you pretty much full control about in which order the files
> should be processed.
>
> What do you think?
>
> Stephan
>
>
> On Wed, Feb 17, 2016 at 9:42 PM, Martin Neumann  wrote:
>
>> I forgot to mention I'm using an AvroInputFormat to read the file (that
>> might be relevant how the flag needs to be applied)
>> See the code Snipped below:
>>
>> DataStream inStream =
>> env.readFile(new AvroInputFormat(new 
>> Path(filePath), EndSongCleanedPq.class), filePath);
>>
>>
>> On Wed, Feb 17, 2016 at 7:33 PM, Martin Neumann  wrote:
>>
>>> The program is a DataStream program, it usually it gets the data from
>>> kafka. It's an anomaly detection program that learns from the stream
>>> itself. The reason I want to read from files is to test different settings
>>> of the algorithm and compare them.
>>>
>>> I think I don't need to reply things in the exact order (wich is not
>>> possible with parallel reads anyway) and I have written the program so it
>>> can deal with out of order events.
>>> I only need the subfolders to be processed roughly in order. Its fine to
>>> process some stuff from 01 before everything from 00 is finished, if I get
>>> records from all 24 subfolders at the same time things will break though.
>>> If I set the flag will it try to get data from all sub dir's in parallel or
>>> will it go sub dir by sub dir?
>>>
>>> Also can you point me to some documentation or something where I can see
>>> how to set the Flag?
>>>
>>> cheers Martin
>>>
>>>
>>>
>>>
>>> On Wed, Feb 17, 2016 at 11:49 AM, Stephan Ewen  wrote:
>>>
>>>> Hi!
>>>>
>>>> Going through nested folders is pretty simple, there is a flag on the
>>>> FileInputFormat that makes sure those are read.
>>>>
>>>> Tricky is the part that all "00" files should be read before the "01"
>>>> files. If you still want parallel reads, that means you need to sync at
>>>> some point, wait for all parallel parts to finish with the "00" work before
>>>> anyone may start with the "01" work.
>>>>
>>>> Is your training program a DataStream or a DataSet program?`
>>>>
>>>> Stephan
>>>>
>>>> On Wed, Feb 17, 2016 at 1:16 AM, Martin Neumann 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a streaming machine learning job that usually runs with input
>>>>> from kafka. To tweak the models I need to run on some old data from HDFS.
>>>>>
>>>>> Unfortunately the data on HDFS is spread out over several subfolders.
>>>>> Basically I have a datum with one subfolder for each hour within those are
>>>>> the actual input files I'm interested in.
>>>>>
>>>>> Basically what I need is a source that goes through the subfolder in
>>>>> order and streams the files into the program. I'm using event timestamps 
>>>>> so
>>>>> all files in 00 need to be processed before 01.
>>>>>
>>>>> Has anyone an idea on how to do this?
>>>>>
>>>>> cheers Martin
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: streaming hdfs sub folders

2016-02-18 Thread Stephan Ewen
Martin,

I think you can approximate this in an easy way like this:

  - On the client, you traverse your directories to collect all files that
you need, collect all file paths in a list.
  - Then you have a source "env.fromElements(paths)".
  - Then you flatMap and in the FlatMap, run the Avro input format (open it
per path, then call it to get all elements)

That gives you pretty much full control over the order in which the files
should be processed.

What do you think?

Stephan
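
A hedged sketch of the first step, collecting the hourly subfolders in order on
the client using Flink's FileSystem abstraction (the base path and the
FileExtractor are the ones from this thread):

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class CollectHourFolders {
    public static void main(String[] args) throws Exception {
        // Collect the hourly subfolders (00 .. 23) of one day, sorted by name,
        // so the downstream flatMap processes them roughly in event-time order.
        Path base = new Path("hdfs:///anonym/cleaned/endsong/2016-01-01");
        FileSystem fs = base.getFileSystem();
        FileStatus[] children = fs.listStatus(base);
        Arrays.sort(children, new Comparator<FileStatus>() {
            @Override
            public int compare(FileStatus a, FileStatus b) {
                return a.getPath().getName().compareTo(b.getPath().getName());
            }
        });

        List<String> hourFolders = new ArrayList<>();
        for (FileStatus child : children) {
            if (child.isDir()) {
                hourFolders.add(child.getPath().getName()); // "00", "01", ...
            }
        }

        // then: env.fromElements(hourFolders.toArray(new String[0]))
        //           .flatMap(new FileExtractor()) ...
        System.out.println(hourFolders);
    }
}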


On Wed, Feb 17, 2016 at 9:42 PM, Martin Neumann  wrote:

> I forgot to mention I'm using an AvroInputFormat to read the file (that
> might be relevant how the flag needs to be applied)
> See the code Snipped below:
>
> DataStream inStream =
> env.readFile(new AvroInputFormat(new 
> Path(filePath), EndSongCleanedPq.class), filePath);
>
>
> On Wed, Feb 17, 2016 at 7:33 PM, Martin Neumann  wrote:
>
>> The program is a DataStream program, it usually it gets the data from
>> kafka. It's an anomaly detection program that learns from the stream
>> itself. The reason I want to read from files is to test different settings
>> of the algorithm and compare them.
>>
>> I think I don't need to reply things in the exact order (wich is not
>> possible with parallel reads anyway) and I have written the program so it
>> can deal with out of order events.
>> I only need the subfolders to be processed roughly in order. Its fine to
>> process some stuff from 01 before everything from 00 is finished, if I get
>> records from all 24 subfolders at the same time things will break though.
>> If I set the flag will it try to get data from all sub dir's in parallel or
>> will it go sub dir by sub dir?
>>
>> Also can you point me to some documentation or something where I can see
>> how to set the Flag?
>>
>> cheers Martin
>>
>>
>>
>>
>> On Wed, Feb 17, 2016 at 11:49 AM, Stephan Ewen  wrote:
>>
>>> Hi!
>>>
>>> Going through nested folders is pretty simple, there is a flag on the
>>> FileInputFormat that makes sure those are read.
>>>
>>> Tricky is the part that all "00" files should be read before the "01"
>>> files. If you still want parallel reads, that means you need to sync at
>>> some point, wait for all parallel parts to finish with the "00" work before
>>> anyone may start with the "01" work.
>>>
>>> Is your training program a DataStream or a DataSet program?`
>>>
>>> Stephan
>>>
>>> On Wed, Feb 17, 2016 at 1:16 AM, Martin Neumann 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a streaming machine learning job that usually runs with input
>>>> from kafka. To tweak the models I need to run on some old data from HDFS.
>>>>
>>>> Unfortunately the data on HDFS is spread out over several subfolders.
>>>> Basically I have a datum with one subfolder for each hour within those are
>>>> the actual input files I'm interested in.
>>>>
>>>> Basically what I need is a source that goes through the subfolder in
>>>> order and streams the files into the program. I'm using event timestamps so
>>>> all files in 00 need to be processed before 01.
>>>>
>>>> Has anyone an idea on how to do this?
>>>>
>>>> cheers Martin
>>>>
>>>>
>>>
>>
>


Re: streaming hdfs sub folders

2016-02-17 Thread Martin Neumann
I forgot to mention that I'm using an AvroInputFormat to read the file (that
might be relevant to how the flag needs to be applied).
See the code snippet below:

DataStream<EndSongCleanedPq> inStream =
        env.readFile(new AvroInputFormat<>(new Path(filePath), EndSongCleanedPq.class), filePath);


On Wed, Feb 17, 2016 at 7:33 PM, Martin Neumann  wrote:

> The program is a DataStream program, it usually it gets the data from
> kafka. It's an anomaly detection program that learns from the stream
> itself. The reason I want to read from files is to test different settings
> of the algorithm and compare them.
>
> I think I don't need to reply things in the exact order (wich is not
> possible with parallel reads anyway) and I have written the program so it
> can deal with out of order events.
> I only need the subfolders to be processed roughly in order. Its fine to
> process some stuff from 01 before everything from 00 is finished, if I get
> records from all 24 subfolders at the same time things will break though.
> If I set the flag will it try to get data from all sub dir's in parallel or
> will it go sub dir by sub dir?
>
> Also can you point me to some documentation or something where I can see
> how to set the Flag?
>
> cheers Martin
>
>
>
>
> On Wed, Feb 17, 2016 at 11:49 AM, Stephan Ewen  wrote:
>
>> Hi!
>>
>> Going through nested folders is pretty simple, there is a flag on the
>> FileInputFormat that makes sure those are read.
>>
>> Tricky is the part that all "00" files should be read before the "01"
>> files. If you still want parallel reads, that means you need to sync at
>> some point, wait for all parallel parts to finish with the "00" work before
>> anyone may start with the "01" work.
>>
>> Is your training program a DataStream or a DataSet program?`
>>
>> Stephan
>>
>> On Wed, Feb 17, 2016 at 1:16 AM, Martin Neumann  wrote:
>>
>>> Hi,
>>>
>>> I have a streaming machine learning job that usually runs with input
>>> from kafka. To tweak the models I need to run on some old data from HDFS.
>>>
>>> Unfortunately the data on HDFS is spread out over several subfolders.
>>> Basically I have a datum with one subfolder for each hour within those are
>>> the actual input files I'm interested in.
>>>
>>> Basically what I need is a source that goes through the subfolder in
>>> order and streams the files into the program. I'm using event timestamps so
>>> all files in 00 need to be processed before 01.
>>>
>>> Has anyone an idea on how to do this?
>>>
>>> cheers Martin
>>>
>>>
>>
>


Re: streaming hdfs sub folders

2016-02-17 Thread Martin Neumann
The program is a DataStream program; it usually gets the data from
Kafka. It's an anomaly detection program that learns from the stream
itself. The reason I want to read from files is to test different settings
of the algorithm and compare them.

I think I don't need to replay things in the exact order (which is not
possible with parallel reads anyway) and I have written the program so it
can deal with out-of-order events.
I only need the subfolders to be processed roughly in order. It's fine to
process some stuff from 01 before everything from 00 is finished; if I get
records from all 24 subfolders at the same time things will break though.
If I set the flag, will it try to get data from all sub dirs in parallel, or
will it go sub dir by sub dir?

Also, can you point me to some documentation or something where I can see
how to set the flag?

cheers Martin




On Wed, Feb 17, 2016 at 11:49 AM, Stephan Ewen  wrote:

> Hi!
>
> Going through nested folders is pretty simple, there is a flag on the
> FileInputFormat that makes sure those are read.
>
> Tricky is the part that all "00" files should be read before the "01"
> files. If you still want parallel reads, that means you need to sync at
> some point, wait for all parallel parts to finish with the "00" work before
> anyone may start with the "01" work.
>
> Is your training program a DataStream or a DataSet program?`
>
> Stephan
>
> On Wed, Feb 17, 2016 at 1:16 AM, Martin Neumann  wrote:
>
>> Hi,
>>
>> I have a streaming machine learning job that usually runs with input from
>> kafka. To tweak the models I need to run on some old data from HDFS.
>>
>> Unfortunately the data on HDFS is spread out over several subfolders.
>> Basically I have a datum with one subfolder for each hour within those are
>> the actual input files I'm interested in.
>>
>> Basically what I need is a source that goes through the subfolder in
>> order and streams the files into the program. I'm using event timestamps so
>> all files in 00 need to be processed before 01.
>>
>> Has anyone an idea on how to do this?
>>
>> cheers Martin
>>
>>
>


Re: streaming hdfs sub folders

2016-02-17 Thread Stephan Ewen
Hi!

Going through nested folders is pretty simple, there is a flag on the
FileInputFormat that makes sure those are read.

The tricky part is that all "00" files should be read before the "01"
files. If you still want parallel reads, that means you need to sync at
some point and wait for all parallel parts to finish with the "00" work before
anyone may start with the "01" work.

Is your training program a DataStream or a DataSet program?

Stephan
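
For reference, a hedged sketch of setting that flag, either directly on the
input format or via the "recursive.file.enumeration" configuration parameter
(availability of the setter depends on the Flink version; the AvroInputFormat
and EndSongCleanedPq classes are the ones used elsewhere in this thread):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
// plus the AvroInputFormat / EndSongCleanedPq imports used elsewhere in this thread

public class NestedEnumerationSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        AvroInputFormat<EndSongCleanedPq> format = new AvroInputFormat<>(
                new Path("hdfs:///anonym/cleaned/endsong/2016-01-01"), EndSongCleanedPq.class);

        // Option 1: enable recursive enumeration on the format itself
        format.setNestedFileEnumeration(true);

        // Option 2 (DataSet API): pass the flag as a configuration parameter
        Configuration parameters = new Configuration();
        parameters.setBoolean("recursive.file.enumeration", true);
        DataSet<EndSongCleanedPq> data = env.createInput(format).withParameters(parameters);

        data.first(10).print();
    }
}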

On Wed, Feb 17, 2016 at 1:16 AM, Martin Neumann  wrote:

> Hi,
>
> I have a streaming machine learning job that usually runs with input from
> kafka. To tweak the models I need to run on some old data from HDFS.
>
> Unfortunately the data on HDFS is spread out over several subfolders.
> Basically I have a datum with one subfolder for each hour within those are
> the actual input files I'm interested in.
>
> Basically what I need is a source that goes through the subfolder in order
> and streams the files into the program. I'm using event timestamps so all
> files in 00 need to be processed before 01.
>
> Has anyone an idea on how to do this?
>
> cheers Martin
>
>


streaming hdfs sub folders

2016-02-16 Thread Martin Neumann
Hi,

I have a streaming machine learning job that usually runs with input from
Kafka. To tweak the models I need to run on some old data from HDFS.

Unfortunately the data on HDFS is spread out over several subfolders.
Basically I have one folder per date with one subfolder for each hour; within
those are the actual input files I'm interested in.

Basically what I need is a source that goes through the subfolders in order
and streams the files into the program. I'm using event timestamps, so all
files in 00 need to be processed before 01.

Has anyone an idea on how to do this?

cheers Martin


Re: Flink Streaming and Google Cloud Pub/Sub?

2015-09-15 Thread Robert Metzger
Hey Martin,
I don't think anybody has used Google Cloud Pub/Sub with Flink yet.

There are no tutorials for implementing streaming sources and sinks, but
Flink has a few connectors that you can use as a reference.
For the sources, you basically have to extend RichSourceFunction (or
RichParallelSourceFunction).
As long as the data rate is low, you can start with a non-parallel source
implementation. This makes the implementation a bit easier in the beginning.

Let us know if you need more help.

Robert
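
For anyone searching the archives, a bare-bones skeleton of such a non-parallel
source. All Pub/Sub specifics are left as comments, since they depend on the
client library you pick:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class PubSubSource extends RichSourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // create and authenticate the Pub/Sub subscriber / pull client here
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            // pull a batch of messages from the subscription (client specific),
            // then for each message:
            //   synchronized (ctx.getCheckpointLock()) {
            //       ctx.collect(messagePayload);
            //   }
            //   and acknowledge it afterwards.
            Thread.sleep(100); // placeholder back-off so the skeleton stays runnable
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        // shut down the Pub/Sub client
    }
}

It would be hooked up with env.addSource(new PubSubSource()); a plain
SourceFunction (as opposed to a ParallelSourceFunction) always runs with
parallelism 1, which matches the low-data-rate starting point described above.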



On Mon, Sep 14, 2015 at 11:04 PM, Martin Neumann  wrote:

> Hej,
>
> Has anyone tried use connect Flink Streaming to Google Cloud Pub/Sub and
> has a code example for me?
> If I have to implement my own sources and sinks are there any good
> tutorials for that?
>
> cheers Martin
>
>
>


Flink Streaming and Google Cloud Pub/Sub?

2015-09-14 Thread Martin Neumann
Hej,

Has anyone tried to connect Flink Streaming to Google Cloud Pub/Sub and
has a code example for me?
If I have to implement my own sources and sinks, are there any good
tutorials for that?

cheers Martin