Re: Why the current time of my window never reaches the end time when I use KeyedProcessFunction?

2019-06-18 Thread Piotr Nowojski
Hi,

Isn’t your problem that the source is constantly emitting the data and bumping 
your timers? Keep in mind that the code that you are basing on has the 
following characteristic:

> In the following example a KeyedProcessFunction maintains counts per key, and 
> emits a key/count pair whenever a minute passes without an update for that key
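
In other words, the example only emits when a key has been idle for a full minute. A condensed sketch of that pattern, based on the documentation page linked as [1] below (simplified, but the logic is the same):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Count per key, but only emit when a full minute passes without a new element for that key.
public class CountWithTimeoutFunction
        extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, Long>> {

    public static class CountWithTimestamp {
        public String key;
        public long count;
        public long lastModified;
    }

    private transient ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) {
        state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(Tuple2<String, String> value, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }
        current.count++;
        // every incoming element moves "lastModified" forward ...
        current.lastModified = ctx.timestamp();
        state.update(current);
        // ... and registers a timer one minute after it
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        CountWithTimestamp result = state.value();
        // the timer still fires, but this condition only holds if the key was idle
        if (result != null && timestamp == result.lastModified + 60000) {
            out.collect(new Tuple2<>(result.key, result.count));
        }
    }
}

So with a source that keeps emitting for every key, the emit condition in onTimer is never satisfied.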

Piotrek

> On 17 Jun 2019, at 15:51, Felipe Gutierrez  
> wrote:
> 
> Hi,
> 
> I used this example of KeyedProcessFunction from the Flink website [1] and I 
> have implemented my own KeyedProcessFunction to process some approximation 
> counting [2]. This worked very well. Then I switched the data source to 
> consume strings from Twitter [3]. The data source is consuming the strings 
> because I can see it when I debug. However, the time comparison is always 
> different on the onTimer() method, and I never get the results of the window 
> processing. I don't know the exact reason that this is happening. I guess it 
> is because my state is too heavy. But, still shouldn't the time be correct at 
> some point to finish the evaluation of my window?
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example
>  
> 
> [2] 
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowSocket.java
>  
> 
> [3] 
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java
>  
> 
> 
> Kind Regards,
> Felipe
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com 
> 


Re: Has Flink a kafka processing location strategy?

2019-06-18 Thread Konstantin Knauf
Hi Theo,

no, sorry, the Kafka partitions assigned to each subtask are determined only
by the index of the subtask.
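
To illustrate what index-based assignment means (a deliberate simplification for illustration, not the exact code Flink uses):

import java.util.ArrayList;
import java.util.List;

// Simplified illustration: a partition ends up on the subtask whose index matches a
// deterministic function of the partition id; broker/host locality plays no role.
public final class IndexBasedAssignment {

    static int targetSubtask(int partitionId, int numParallelSubtasks) {
        return partitionId % numParallelSubtasks;
    }

    public static void main(String[] args) {
        int parallelism = 3;
        List<String> layout = new ArrayList<>();
        for (int partition = 0; partition < 8; partition++) {
            layout.add("partition-" + partition + " -> subtask-" + targetSubtask(partition, parallelism));
        }
        layout.forEach(System.out::println);
    }
}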

Best,

Konstantin



On Mon, Jun 17, 2019 at 2:57 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi,
>
>
>
> We have a Hadoop/YARN Cluster with Kafka and Flink/YARN running on the
> same machines.
>
>
>
> In Spark (Streaming), there is a PreferBrokers location strategy, so that
> the executors consume those Kafka partitions which are served by the Kafka
> broker on the same machine. (
> https://spark.apache.org/docs/2.4.0/streaming-kafka-0-10-integration.html#locationstrategies
> )
>
>
>
> I wonder if there is such a thing in Flink as well? I didn’t find anything
> yet.
>
>
>
> Best regards
>
> Theo Diefenthal
>


-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525


Planned Absences: 20. - 21.06.2019, 10.08.2019 - 31.08.2019, 05.09. -
06.09.2019


--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Role of Job Manager

2019-06-18 Thread Pankaj Chand
I am trying to understand the role of Job Manager in Flink, and have come
across two possibly distinct interpretations.

1. The online documentation v1.8 signifies that there is at least one Job
Manager in a cluster, and it is closely tied to the cluster of machines, by
managing all jobs in that cluster of machines.

This signifies that Flink's Job Manager is much like Hadoop's Application
Manager.

2. The book, "Stream Processing with Apache Flink", writes that, "The Job
Manager is the master process that controls the execution of a single
application—each application is controlled by a different Job Manager."

This signifies that Flink defaults to one Job Manager per job, and the Job
Manager is closely tied to that single job, much like Hadoop's Application
Master for each job.

Please let me know which one is correct.

Pankaj


Re: Checkpointing & File stream with

2019-06-18 Thread Sung Gon Yi
It works well now with the following code:
——
TextInputFormat specFileFormat = new TextInputFormat(new Path(specFile));
specFileFormat.setFilesFilter(FilePathFilter.createDefaultFilter());
DataStream<String> specificationFileStream = env
    .readFile(specFileFormat, specFile,
        FileProcessingMode.PROCESS_CONTINUOUSLY, 100L, BasicTypeInfo.STRING_TYPE_INFO);
——

Thanks.

> On 18 Jun 2019, at 3:38 PM, Yun Tang  wrote:
> 
> Hi Sung
> 
> How about using FileProcessingMode.PROCESS_CONTINUOUSLY [1] as the watch type 
> when reading data from HDFS? FileProcessingMode.PROCESS_CONTINUOUSLY would 
> periodically monitor the source, while the default FileProcessingMode.PROCESS_ONCE 
> would process the data only once and then exit.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/datastream_api.html#data-sources
>  
> 
> 
> Best
> Yun Tang
> From: Sung Gon Yi 
> Sent: Tuesday, June 18, 2019 14:13
> To: user@flink.apache.org
> Subject: Checkpointing & File stream with
>  
> Hello,
> 
> I am working on joining two streams, one from Kafka and another from a file 
> (small size).
> Stream processing works well, but checkpointing fails with the following 
> message.
> The file has fewer than 100 lines, and the pipeline stage that reads the file 
> switches to “FINISHED” as soon as the job is deployed.
> 
> After that, checkpointing fails with the following message:
> ——
> 2019-06-17 20:25:13,575 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
> triggering task Source: Custom File Source (1/1) of job 
> d26afe055f249c172c1dcb3311508e83 is not in state RUNNING but FINISHED 
> instead. Aborting checkpoint.
> ——
> 
> The Custom File Source corresponds to the following code:
> ——
> DataStream<String> specificationFileStream = env.readTextFile(specFile);
> ——
> 
> To make checkpointing succeed, I wrote a custom source function that keeps 
> running (it essentially sleeps after reading the file). I wonder whether that 
> is the correct way.
> 
> Sincerely,
> Sung Gon



Re: How to restart/recover on reboot?

2019-06-18 Thread Till Rohrmann
When a single machine fails you should rather call `taskmanager.sh
start`/`jobmanager.sh start` to start a single process. `start-cluster.sh`
will start multiple processes on different machines.

Cheers,
Till

On Mon, Jun 17, 2019 at 4:30 PM John Smith  wrote:

> Well some reasons, machine reboots/maintenance etc... Host/VM crashes and
> restarts. And same goes for the job manager. I don't want/need to have to
> document/remember some start process for sys admins/devops.
>
> So far I have looked at ./start-cluster.sh and all it seems to do is SSH
> into all the specified nodes and starts the processes using the jobmanager
> and taskmanager scripts. I don't see anything special in any of the sh
> scripts.
> I configured passwordless ssh through terraform and all that works great
> only when trying to do the manual start through systemd. I may have
> something missing...
>
>
>
> On Mon, 17 Jun 2019 at 09:41, Till Rohrmann  wrote:
>
>> Hi John,
>>
>> I have not much experience wrt setting Flink up via systemd services. Why
>> do you want to do it like that?
>>
>> 1. In standalone mode, Flink won't automatically restart TaskManagers.
>> This only works on Yarn and Mesos atm.
>> 2. In case of a lost TaskManager, you should run `taskmanager.sh start`.
>> This script simply starts a new TaskManager process.
>> 3. I guess you could use systemd to bring up a Flink TaskManager process
>> on start up.
>>
>> Cheers,
>> Till
>>
>> On Fri, Jun 14, 2019 at 5:56 PM John Smith 
>> wrote:
>>
>>> I looked into the start-cluster.sh and I don't see anything special. So
>>> technically it should be as easy as installing Systemd services to run
>>> jobmanager.sh and taskmanager.sh respectively?
>>>
>>> On Wed, 12 Jun 2019 at 13:02, John Smith  wrote:
>>>
 The installation instructions do not indicate how to create systemd
 services.

 1- When task nodes fail, will the job leader detect this and ssh and
 restart the task node? From my testing it doesn't seem like it.
 2- How do we recover a lost node? Do we simply go back to the master
 node and run start-cluster.sh and the script is smart enough to figure out
 what is missing?
 3- Or do we need to create systemd services and if so on which command
 do we start the service on?

>>>


Re: Need for user class path accessibility on all nodes

2019-06-18 Thread Till Rohrmann
Hi Abdul,

as Biao said the `--classpath` option should only be used if you want to
make dependencies available which are not included in the submitted user
code jar. E.g. if you have installed a large library which is too costly to
ship every time you submit a job. Usually, you would not need to specify
this option if you build an uber jar.
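
For illustration only (the paths, class and jar names below are made up):

# Typical case: everything is bundled in the submitted uber jar, no extra option needed
./bin/flink run -c com.example.MyJob my-uber-job.jar

# Pre-installed shared library, reachable under the same path on every node
./bin/flink run -C file:///opt/flink-libs/big-library.jar -c com.example.MyJob my-thin-job.jar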

Cheers,
Till

On Tue, Jun 18, 2019 at 7:23 AM Biao Liu  wrote:

> Ah, sorry for misunderstanding.
> So what you are asking is why we need "--classpath"? I'm not sure
> what the original author had in mind. I guess the points listed below might
> have been considered.
> 1. Avoid duplicated deployment. If some common jars are deployed in advance
> to each node of the cluster, the jobs that depend on these jars can avoid
> deploying them one by one.
> 2. Support NFS, which is mentioned in the option description of "--classpath".
>
>
> On Tue, Jun 18, 2019 at 11:45 AM, Abdul Qadeer wrote:
>
>> Hi Biao,
>>
>> I am aware of it - that's not my question.
>>
>> On Mon, Jun 17, 2019 at 7:42 PM Biao Liu  wrote:
>>
>>> Hi Abdul, "--classpath " can be used for classes that are not included in
>>> the user jar. If all your classes are included in the jar passed to Flink, you
>>> don't need this "--classpath".
>>>
>>> On Tue, Jun 18, 2019 at 3:08 AM, Abdul Qadeer wrote:
>>>
 Hi!

 I was going through submission of a Flink program through CLI. I see
 that "--classpath " needs to be accessible from all nodes in the
 cluster as per documentation. As I understand the jar files are already
 part of the blob uploaded to JobManager from the CLI. The TaskManagers can
 download this blob when they receive the task and access the classes from
 there. Why is there a need to be able to access these files from every node
 then? It makes sense to use Distributed File System to access these jars if
 the network is not reachable to download blob files. Or if the blob doesn't
 contain metadata to differentiate between child class loader classes and
 the rest. However it seems like the TaskManager always tries to access the
 specified class paths irrespective of Network Partitions.




Re: Why the current time of my window never reaches the end time when I use KeyedProcessFunction?

2019-06-18 Thread Felipe Gutierrez
I am sorry, I meant to point to this reference
https://stackoverflow.com/a/47071833/2096986 which implements a window on a
ProcessFunction in Flink.
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*


On Tue, Jun 18, 2019 at 9:22 AM Piotr Nowojski  wrote:

> Hi,
>
> Isn’t your problem that the source is constantly emitting the data and
> bumping your timers? Keep in mind that the code that you are basing on has
> the following characteristic:
>
> > In the following example a KeyedProcessFunction maintains counts per
> key, and emits a key/count pair whenever a *minute passes without an
> update for that key*
>
> Piotrek
>
> On 17 Jun 2019, at 15:51, Felipe Gutierrez 
> wrote:
>
> Hi,
>
> I used this example of KeyedProcessFunction from the Flink website [1] and
> I have implemented my own KeyedProcessFunction to process some
> approximation counting [2]. This worked very well. Then I switched the data
> source to consume strings from Twitter [3]. The data source is consuming
> the strings because I can see it when I debug. However, the time comparison
> is always different on the onTimer() method, and I never get the results of
> the window processing. I don't know the exact reason that this is
> happening. I guess it is because my state is too heavy. But, still
> shouldn't the time be correct at some point to finish the evaluation of my
> window?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example
> [2]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowSocket.java
> [3]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java
>
> Kind Regards,
> Felipe
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> *
>
>
>


Re: Why the current time of my window never reaches the end time when I use KeyedProcessFunction?

2019-06-18 Thread Felipe Gutierrez
I made some progress based on [1]. My code is here [2]. Basically I
am using "ctx.timerService().registerProcessingTimeTimer(timeoutTime);"
inside the processElement method to trigger the onTimer method, and when
the onTimer method fires I clear the state using
"hllStateTwitter.clear();". However, I still have a question. I set the
timeout to 5000 milliseconds, but the interval at which the onTimer method
fires is slightly different each time. Why is that happening?

process: 1560850703025 - 1560850708025
onTimer: 1560850708025 - 1560850713017 = 4992
3> estimate cardinality: 544
process: 1560850709019 - 1560850714019
onTimer: 1560850714019 - 1560850718942 = 4923
3> estimate cardinality: 485
process: 1560850714027 - 1560850719027
onTimer: 1560850719027 - 1560850723936 = 4909
3> estimate cardinality: 438
process: 1560850719035 - 1560850724035

[1] https://stackoverflow.com/a/53646529/2096986
[2]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java
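
For reference, the pattern I described boils down to roughly the following (heavily simplified from [2]; the HyperLogLog state is replaced by a plain counter here):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Simplified sketch: a processing-time timer emulates a 5-second window and the
// state is cleared when the timer fires (the real code clears hllStateTwitter).
public class PeriodicEmitFunction extends KeyedProcessFunction<String, String, Long> {

    private transient ValueState<Long> countState;   // stand-in for the HLL sketch state

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
        Long count = countState.value();
        if (count == null) {
            count = 0L;
            // first element of this "window": schedule the emission 5 seconds from now
            long timeoutTime = ctx.timerService().currentProcessingTime() + 5000;
            ctx.timerService().registerProcessingTimeTimer(timeoutTime);
        }
        countState.update(count + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        out.collect(countState.value());
        countState.clear();   // equivalent of hllStateTwitter.clear()
    }
}
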
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*


On Tue, Jun 18, 2019 at 11:15 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> I am sorry, I meant to point to this reference
> https://stackoverflow.com/a/47071833/2096986 which implements a window on
> a ProcessFunction in Flink.
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> *
>
>
> On Tue, Jun 18, 2019 at 9:22 AM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> Isn’t your problem that the source is constantly emitting the data and
>> bumping your timers? Keep in mind that the code that you are basing on has
>> the following characteristic:
>>
>> > In the following example a KeyedProcessFunction maintains counts per
>> key, and emits a key/count pair whenever a *minute passes without an
>> update for that key*
>>
>> Piotrek
>>
>> On 17 Jun 2019, at 15:51, Felipe Gutierrez 
>> wrote:
>>
>> Hi,
>>
>> I used this example of KeyedProcessFunction from the Flink website [1]
>> and I have implemented my own KeyedProcessFunction to process some
>> approximation counting [2]. This worked very well. Then I switched the data
>> source to consume strings from Twitter [3]. The data source is consuming
>> the strings because I can see it when I debug. However, the time comparison
>> is always different on the onTimer() method, and I never get the results of
>> the window processing. I don't know the exact reason that this is
>> happening. I guess it is because my state is too heavy. But, still
>> shouldn't the time be correct at some point to finish the evaluation of my
>> window?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example
>> [2]
>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowSocket.java
>> [3]
>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java
>>
>> Kind Regards,
>> Felipe
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> *
>>
>>
>>


Re: Calculate a 4 hour time window for sum,count with sum and count output every 5 mins

2019-06-18 Thread Felipe Gutierrez
Hi Vijay,

I managed it by using
"ctx.timerService().registerProcessingTimeTimer(timeoutTime);" in the
processElement method and clearing the state in the onTimer method. This is
my program [1].

[1]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java
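
The trigger-based approach Rafi suggests below should also work; a rough sketch (assuming a keyed stream of Tuple2<String, Long> where f1 is the value to aggregate):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;

public class FourHourWindowEveryFiveMinutes {

    // Rough sketch: a 4-hour tumbling window whose intermediate result is
    // (re)emitted every 5 minutes by the continuous trigger.
    public static DataStream<Tuple2<String, Long>> apply(DataStream<Tuple2<String, Long>> input) {
        return input
                .keyBy(0)
                .window(TumblingProcessingTimeWindows.of(Time.hours(4)))
                .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5)))
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));
    }
}

Note that ContinuousProcessingTimeTrigger only fires (it does not purge), so each 5-minute output is the running aggregate of the 4-hour window so far.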

Kind Regards,
Felipe
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*


On Mon, Jun 17, 2019 at 8:57 PM Rafi Aroch  wrote:

> Hi Vijay,
>
> When using windows, you may use the 'trigger' to set a Custom Trigger
> which would trigger your *ProcessWindowFunction* accordingly.
>
> In your case, you would probably use:
>
>> *.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5)))*
>>
>
> Thanks,
> Rafi
>
>
> On Mon, Jun 17, 2019 at 9:01 PM Vijay Balakrishnan 
> wrote:
>
>> I am also implementing the ProcessWindowFunction and accessing the
>> windowState to get data but how do i push data out every 5 mins during a 4
>> hr time window ?? I am adding a globalState to handle the 4 hr window ???
>> Or should I still use the context.windowState even for the 4 hr window ?
>>
>> public class MGroupingAggregateClass extends ProcessWindowFunction<> {
>>>
>>> private MapState<String, Object> timedGroupKeyState;
>>> private MapState<String, Object> globalGroupKeyState;
>>> private final MapStateDescriptor<String, Object> timedMapKeyStateDescriptor =
>>>    new MapStateDescriptor<>("timedGroupKeyState", String.class, Object.class);
>>> private final MapStateDescriptor<String, Object> globalMapKeyStateDescriptor =
>>>    new MapStateDescriptor<>("globalGroupKeyState", String.class, Object.class);
>>>
>>>
>>> public void open(Configuration ..) {
>>> timedGroupKeyState =
>>> getRuntimeContext().getMapState(timedMapKeyStateDescriptor);
>>> globalGroupKeyState =
>>> getRuntimeContext().getMapState(globalMapKeyStateDescriptor);
>>> }
>>>
>>> public void process(MonitoringTuple currKey, Context context,
>>> Iterable> elements,
>>>Collector> out) throws
>>> Exception {
>>>logger.info("Entered MGroupingAggregateWindowProcessing -
>>> process interval:{}, currKey:{}", interval, currKey);
>>>timedGroupKeyState =
>>> context.windowState().getMapState(timedMapKeyStateDescriptor);
>>>globalGroupKeyState =
>>> context.globalState().getMapState(globalMapKeyStateDescriptor);
>>> ...
>>> //get data fromm state
>>> Object timedGroupStateObj = timedGroupKeyState.get(groupKey);
>>>
>>> //how do i push the data out every 5 mins to the sink during the 4 hr
>>> window ??
>>>
>>> }
>>>
>>
>>
>>
>>
>>
>>
>>
>> On Mon, Jun 17, 2019 at 10:06 AM Vijay Balakrishnan 
>> wrote:
>>
>>> Hi,
>>> Need to calculate a 4 hour time window for count, sum with current
>>> calculated results being output every 5 mins.
>>> How do i do that ?
>>> Currently, I calculate results for 5 sec and 5 min time windows fine on
>>> the KeyedStream.
>>>
>>> Time timeWindow = getTimeWindowFromInterval(interval);//eg: timeWindow =
 Time.seconds(timeIntervalL);
 KeyedStream, ...> monitoringTupleKeyedStream =
 kinesisStream.keyBy(...);
 final WindowedStream, , TimeWindow>
 windowStream =
 monitoringTupleKeyedStream
 .timeWindow(timeWindow);
 DataStream<> enrichedMGStream = windowStream.aggregate(
 new MGroupingWindowAggregateClass(...),
 new MGroupingAggregateClass())
 .map(new Monitoring...(...));
 enrichedMGStream.addSink(..);

>>>
>>>
>>> TIA,
>>> Vijay
>>>
>>


Re: How to restart/recover on reboot?

2019-06-18 Thread John Smith
Yes, that is understood. But I don't see why we cannot call jobmanager.sh
and taskmanager.sh to build the cluster and have them run as systemd units.

I looked at start-cluster.sh and all it does is SSH and call jobmanager.sh,
which then cascades to taskmanager.sh. I just have to pinpoint what's
missing to get the systemd service working. In fact, calling jobmanager.sh as
a systemd service actually sees the shared masters, slaves and
flink-conf.yaml files, but it binds to localhost.

Maybe one way to do it would be to bootstrap the cluster with
./start-cluster.sh and then install systemd services for jobmanager.sh and
taskmanager.sh

Like I said I don't want to have some process in place to remind admins
they need to manually start a node every time they patch or a host goes
down for whatever reason.

On Tue, 18 Jun 2019 at 04:31, Till Rohrmann  wrote:

> When a single machine fails you should rather call `taskmanager.sh
> start`/`jobmanager.sh start` to start a single process. `start-cluster.sh`
> will start multiple processes on different machines.
>
> Cheers,
> Till
>
> On Mon, Jun 17, 2019 at 4:30 PM John Smith  wrote:
>
>> Well some reasons, machine reboots/maintenance etc... Host/VM crashes and
>> restarts. And same goes for the job manager. I don't want/need to have to
>> document/remember some start process for sys admins/devops.
>>
>> So far I have looked at ./start-cluster.sh and all it seems to do is SSH
>> into all the specified nodes and starts the processes using the jobmanager
>> and taskmanager scripts. I don't see anything special in any of the sh
>> scripts.
>> I configured passwordless ssh through terraform and all that works great
>> only when trying to do the manual start through systemd. I may have
>> something missing...
>>
>>
>>
>> On Mon, 17 Jun 2019 at 09:41, Till Rohrmann  wrote:
>>
>>> Hi John,
>>>
>>> I have not much experience wrt setting Flink up via systemd services.
>>> Why do you want to do it like that?
>>>
>>> 1. In standalone mode, Flink won't automatically restart TaskManagers.
>>> This only works on Yarn and Mesos atm.
>>> 2. In case of a lost TaskManager, you should run `taskmanager.sh start`.
>>> This script simply starts a new TaskManager process.
>>> 3. I guess you could use systemd to bring up a Flink TaskManager process
>>> on start up.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Jun 14, 2019 at 5:56 PM John Smith 
>>> wrote:
>>>
 I looked into the start-cluster.sh and I don't see anything special. So
 technically it should be as easy as installing Systemd services to run
 jobmanager.sh and taskmanager.sh respectively?

 On Wed, 12 Jun 2019 at 13:02, John Smith 
 wrote:

> The installation instructions do not indicate how to create systemd
> services.
>
> 1- When task nodes fail, will the job leader detect this and ssh and
> restart the task node? From my testing it doesn't seem like it.
> 2- How do we recover a lost node? Do we simply go back to the master
> node and run start-cluster.sh and the script is smart enough to figure out
> what is missing?
> 3- Or do we need to create systemd services and if so on which command
> do we start the service on?
>



Side output in ProcessFunction.onTimer

2019-06-18 Thread Frank Wilson
Hi,

Is there a way to emit side outputs from an onTimer callback in
ProcessFunction?

I want to side output events that belong to a session that was below the
minimum duration threshold. Currently these events are just discarded but
I’d like more traceability.
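
Something along these lines is what I have in mind (just a sketch; the event type, thresholds and tag name are made up, and I am using KeyedProcessFunction since timers need a keyed stream):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Sketch: when the session timer fires, the events of a too-short session are sent
// to a side output instead of being discarded.
public class SessionWithSideOutput extends KeyedProcessFunction<String, String, String> {

    public static final OutputTag<String> TOO_SHORT =
            new OutputTag<String>("too-short-session-events") {};

    private static final long MIN_DURATION_MS = 60000;  // minimum session duration
    private static final long GAP_MS = 30000;           // inactivity gap that closes a session

    private transient ValueState<Long> sessionStart;
    private transient ValueState<Long> lastSeen;
    private transient ListState<String> buffered;

    @Override
    public void open(Configuration parameters) {
        sessionStart = getRuntimeContext().getState(new ValueStateDescriptor<>("session-start", Long.class));
        lastSeen = getRuntimeContext().getState(new ValueStateDescriptor<>("last-seen", Long.class));
        buffered = getRuntimeContext().getListState(new ListStateDescriptor<>("buffered-events", String.class));
    }

    @Override
    public void processElement(String event, Context ctx, Collector<String> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        if (sessionStart.value() == null) {
            sessionStart.update(now);
        }
        lastSeen.update(now);
        buffered.add(event);
        // simplified: one timer per element; only the last one will actually close the session
        ctx.timerService().registerProcessingTimeTimer(now + GAP_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        Long last = lastSeen.value();
        if (last == null || timestamp < last + GAP_MS) {
            return;  // session already closed, or a newer element extended it
        }
        List<String> events = new ArrayList<>();
        buffered.get().forEach(events::add);
        if (last - sessionStart.value() < MIN_DURATION_MS) {
            // session too short: side-output its events for traceability
            for (String e : events) {
                ctx.output(TOO_SHORT, e);
            }
        } else {
            out.collect("session with " + events.size() + " events");
        }
        sessionStart.clear();
        lastSeen.clear();
        buffered.clear();
    }
}

The discarded events would then be read via getSideOutput(SessionWithSideOutput.TOO_SHORT) on the resulting stream.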

Thanks,

Frank


Flink error handling

2019-06-18 Thread Steven Nelson


Hello!

We are internally having a debate on how best to handle exceptions within our 
operators. Some advocate wrapping maps/flatMaps inside a ProcessFunction 
and sending the errors to a side output. Another option is returning a custom 
Either type that gets filtered and mapped into different sinks.
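
For concreteness, the side-output variant we are debating looks roughly like this (names and types are made up):

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Sketch: the map logic is wrapped in a ProcessFunction and failures are routed
// to an error tag instead of failing the job.
public class SafeMapFunction extends ProcessFunction<String, Long> {

    public static final OutputTag<String> ERRORS = new OutputTag<String>("map-errors") {};

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out) {
        try {
            out.collect(Long.parseLong(value));  // stand-in for the real map/flatMap logic
        } catch (Exception e) {
            ctx.output(ERRORS, value + " : " + e.getMessage());
        }
    }
}

The error records are then obtained with getSideOutput(SafeMapFunction.ERRORS) on the resulting stream and sent to their own sink.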

Are there any recommendations or standard solutions to this?

-Steve

Re: Role of Job Manager

2019-06-18 Thread Eduardo Winpenny Tejedor
Hi Pankaj,

I have no experience with Hadoop but from the book I gathered there's one
Job Manager per application i.e. per jar (as in the example in the first
chapter). This is not to say there's one Job Manager per job. Actually I
don't think the word Job is defined in the book, I've seen Task defined,
and those do have Task Managers

Hope this is along the right lines

Regards,
Eduardo

On Tue, 18 Jun 2019, 08:42 Pankaj Chand,  wrote:

> I am trying to understand the role of Job Manager in Flink, and have come
> across two possibly distinct interpretations.
>
> 1. The online documentation v1.8 signifies that there is at least one Job
> Manager in a cluster, and it is closely tied to the cluster of machines, by
> managing all jobs in that cluster of machines.
>
> This signifies that Flink's Job Manager is much like Hadoop's Application
> Manager.
>
> 2. The book, "Stream Processing with Apache Flink", writes that, "The Job
> Manager is the master process that controls the execution of a single
> application—each application is controlled by a different Job Manager."
>
> This signifies that Flink defaults to one Job Manager per job, and the Job
> Manager is closely tied to that single job, much like Hadoop's Application
> Master for each job.
>
> Please let me know which one is correct.
>
> Pankaj
>


Re: Need for user class path accessibility on all nodes

2019-06-18 Thread Abdul Qadeer
Thanks Biao/Till, that answers my question.


On Tue, 18 Jun 2019 at 01:41, Till Rohrmann  wrote:

> Hi Abdul,
>
> as Biao said the `--classpath` option should only be used if you want to
> make dependencies available which are not included in the submitted user
> code jar. E.g. if you have installed a large library which is too costly to
> ship every time you submit a job. Usually, you would not need to specify
> this option if you build an uber jar.
>
> Cheers,
> Till
>
> On Tue, Jun 18, 2019 at 7:23 AM Biao Liu  wrote:
>
>> Ah, sorry for misunderstanding.
>> So what you are asking is why we need "--classpath"? I'm not sure
>> what the original author had in mind. I guess the points listed below might
>> have been considered.
>> 1. Avoid duplicated deployment. If some common jars are deployed in
>> advance to each node of the cluster, the jobs that depend on these jars can avoid
>> deploying them one by one.
>> 2. Support NFS, which is mentioned in the option description of "--classpath".
>>
>>
>>> On Tue, Jun 18, 2019 at 11:45 AM, Abdul Qadeer wrote:
>>
>>> Hi Biao,
>>>
>>> I am aware of it - that's not my question.
>>>
>>> On Mon, Jun 17, 2019 at 7:42 PM Biao Liu  wrote:
>>>
 Hi Abdul, "--classpath " can be used for classes that are not included in
 the user jar. If all your classes are included in the jar passed to Flink, you
 don't need this "--classpath".

 On Tue, Jun 18, 2019 at 3:08 AM, Abdul Qadeer wrote:

> Hi!
>
> I was going through submission of a Flink program through CLI. I see
> that "--classpath " needs to be accessible from all nodes in the
> cluster as per documentation. As I understand the jar files are already
> part of the blob uploaded to JobManager from the CLI. The TaskManagers can
 download this blob when they receive the task and access the classes from
> there. Why is there a need to be able to access these files from every 
> node
> then? It makes sense to use Distributed File System to access these jars 
> if
> the network is not reachable to download blob files. Or if the blob 
> doesn't
> contain metadata to differentiate between child class loader classes and
> the rest. However it seems like the TaskManager always tries to access the
> specified class paths irrespective of Network Partitions.
>
>


Re: How to restart/recover on reboot?

2019-06-18 Thread Till Rohrmann
I guess it should work if you installed a systemd service which simply
calls `jobmanager.sh start` or `taskmanager.sh start`.
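
For example, a minimal unit file along these lines (an untested sketch; the paths, the user and the forking type are assumptions on my side):

[Unit]
Description=Apache Flink TaskManager
After=network.target

[Service]
Type=forking
User=flink
ExecStart=/opt/flink/bin/taskmanager.sh start
ExecStop=/opt/flink/bin/taskmanager.sh stop
Restart=on-failure

[Install]
WantedBy=multi-user.target

An analogous unit calling `jobmanager.sh start`/`stop` would cover the JobManager.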

Cheers,
Till

On Tue, Jun 18, 2019 at 4:29 PM John Smith  wrote:

> Yes, that is understood. But I don't see why we cannot call jobmanager.sh
> and taskmanager.sh to build the cluster and have them run as systemd units.
>
> I looked at start-cluster.sh and all it does is SSH and call jobmanager.sh
> which then cascades to taskmanager.sh. I just have to pinpoint what's
> missing to get the systemd service working. In fact, calling jobmanager.sh as
> systemd service actually sees the shared masters, slaves and
> flink-conf.yaml. But it binds to local host.
>
> Maybe one way to do it would be to bootstrap the cluster with
> ./start-cluster.sh and then install systemd services for jobmanager.sh and
> taskmanager.sh
>
> Like I said I don't want to have some process in place to remind admins
> they need to manually start a node every time they patch or a host goes
> down for whatever reason.
>
> On Tue, 18 Jun 2019 at 04:31, Till Rohrmann  wrote:
>
>> When a single machine fails you should rather call `taskmanager.sh
>> start`/`jobmanager.sh start` to start a single process. `start-cluster.sh`
>> will start multiple processes on different machines.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jun 17, 2019 at 4:30 PM John Smith 
>> wrote:
>>
>>> Well some reasons, machine reboots/maintenance etc... Host/VM crashes
>>> and restarts. And same goes for the job manager. I don't want/need to have
>>> to document/remember some start process for sys admins/devops.
>>>
>>> So far I have looked at ./start-cluster.sh and all it seems to do is SSH
>>> into all the specified nodes and starts the processes using the jobmanager
>>> and taskmanager scripts. I don't see anything special in any of the sh
>>> scripts.
>>> I configured passwordless ssh through terraform and all that works great
>>> only when trying to do the manual start through systemd. I may have
>>> something missing...
>>>
>>>
>>>
>>> On Mon, 17 Jun 2019 at 09:41, Till Rohrmann 
>>> wrote:
>>>
 Hi John,

 I have not much experience wrt setting Flink up via systemd services.
 Why do you want to do it like that?

 1. In standalone mode, Flink won't automatically restart TaskManagers.
 This only works on Yarn and Mesos atm.
 2. In case of a lost TaskManager, you should run `taskmanager.sh
 start`. This script simply starts a new TaskManager process.
 3. I guess you could use systemd to bring up a Flink TaskManager
 process on start up.

 Cheers,
 Till

 On Fri, Jun 14, 2019 at 5:56 PM John Smith 
 wrote:

> I looked into the start-cluster.sh and I don't see anything special.
> So technically it should be as easy as installing Systemd services to run
> jobmanager.sh and taskmanager.sh respectively?
>
> On Wed, 12 Jun 2019 at 13:02, John Smith 
> wrote:
>
>> The installation instructions do not indicate how to create systemd
>> services.
>>
>> 1- When task nodes fail, will the job leader detect this and ssh and
>> restart the task node? From my testing it doesn't seem like it.
>> 2- How do we recover a lost node? Do we simply go back to the master
>> node and run start-cluster.sh and the script is smart enough to figure 
>> out
>> what is missing?
>> 3- Or do we need to create systemd services and if so on which
>> command do we start the service on?
>>
>


Re: [EXTERNAL] Re: How to restart/recover on reboot?

2019-06-18 Thread PoolakkalMukkath, Shakir
Hi Till, John,

I do agree with the issue John mentioned and have the same problem.

We can only start a standalone HA cluster with the ./start-cluster.sh script. And 
then, when there are failures, we can restart those components individually by 
calling jobmanager.sh / taskmanager.sh. This works great.

But, like John mentioned, if we want to start the cluster initially by 
running jobmanager.sh on each JobManager node, it is not working. It binds 
to localhost and does not form the HA cluster.

Thanks,
Shakir

From: Till Rohrmann 
Date: Tuesday, June 18, 2019 at 4:23 PM
To: John Smith 
Cc: user 
Subject: [EXTERNAL] Re: How to restart/recover on reboot?

I guess it should work if you installed a systemd service which simply calls 
`jobmanager.sh start` or `taskmanager.sh start`.

Cheers,
Till

On Tue, Jun 18, 2019 at 4:29 PM John Smith 
mailto:java.dev@gmail.com>> wrote:
Yes, that is understood. But I don't see why we cannot call jobmanager.sh and 
taskmanager.sh to build the cluster and have them run as systemd units.

I looked at start-cluster.sh and all it does is SSH and call jobmanager.sh 
which then cascades to taskmanager.sh. I just have to pinpoint what's missing 
to have systemd service working. In fact calling jobmanager.sh as systemd 
service actually sees the shared masters, slaves and flink-conf.yaml. But it 
binds to local host.

Maybe one way to do it would be to bootstrap the cluster with 
./start-cluster.sh and then install systemd services for jobmanager.sh and 
taskmanager.sh

Like I said I don't want to have some process in place to remind admins they 
need to manually start a node every time they patch or a host goes down for 
whatever reason.

On Tue, 18 Jun 2019 at 04:31, Till Rohrmann 
mailto:trohrm...@apache.org>> wrote:
When a single machine fails you should rather call `taskmanager.sh 
start`/`jobmanager.sh start` to start a single process. `start-cluster.sh` will 
start multiple processes on different machines.

Cheers,
Till

On Mon, Jun 17, 2019 at 4:30 PM John Smith 
mailto:java.dev@gmail.com>> wrote:
Well some reasons, machine reboots/maintenance etc... Host/VM crashes and 
restarts. And same goes for the job manager. I don't want/need to have to 
document/remember some start process for sys admins/devops.

So far I have looked at ./start-cluster.sh and all it seems to do is SSH into 
all the specified nodes and starts the processes using the jobmanager and 
taskmanager scripts. I don't see anything special in any of the sh scripts.
I configured passwordless ssh through terraform and all that works great only 
when trying to do the manual start through systemd. I may have something 
missing...


On Mon, 17 Jun 2019 at 09:41, Till Rohrmann 
mailto:trohrm...@apache.org>> wrote:
Hi John,

I have not much experience wrt setting Flink up via systemd services. Why do 
you want to do it like that?

1. In standalone mode, Flink won't automatically restart TaskManagers. This 
only works on Yarn and Mesos atm.
2. In case of a lost TaskManager, you should run `taskmanager.sh start`. This 
script simply starts a new TaskManager process.
3. I guess you could use systemd to bring up a Flink TaskManager process on 
start up.

Cheers,
Till

On Fri, Jun 14, 2019 at 5:56 PM John Smith 
mailto:java.dev@gmail.com>> wrote:
I looked into the start-cluster.sh and I don't see anything special. So 
technically it should be as easy as installing Systemd services to run 
jobmanager.sh and taskmanager.sh respectively?

On Wed, 12 Jun 2019 at 13:02, John Smith 
mailto:java.dev@gmail.com>> wrote:
The installation instructions do not indicate how to create systemd services.

1- When task nodes fail, will the job leader detect this and ssh and restart 
the task node? From my testing it doesn't seem like it.
2- How do we recover a lost node? Do we simply go back to the master node and 
run start-cluster.sh and the script is smart enough to figure out what is 
missing?
3- Or do we need to create systemd services and if so on which command do we 
start the service on?


RE: [EXTERNAL] Re: How to restart/recover on reboot?

2019-06-18 Thread Martin, Nick
Jobmanager.sh takes an optional argument for the hostname to bind to, and 
start-cluster uses it. If you leave it blank, the script will use whatever 
is in flink-conf.yaml (localhost is the default value that ships with flink).

The dockerized version of flink runs pretty much the way you’re trying to 
operate (i.e. each node starts itself), so the entrypoint script out of that is 
probably a good source of information about how to set it up.

From: PoolakkalMukkath, Shakir [mailto:shakir_poolakkalmukk...@comcast.com]
Sent: Tuesday, June 18, 2019 2:15 PM
To: Till Rohrmann ; John Smith 
Cc: user 
Subject: EXT :Re: [EXTERNAL] Re: How to restart/recover on reboot?

Hi Till, John,

I do agree with the issue John mentioned and have the same problem.

We can only start a standalone HA cluster with ./start-cluster.sh script. And 
then when there are failures, we can restart those components individually by 
calling jobmanager.sh / taskmanager.sh. This works great

But , Like John mentioned, If we want to start the cluster initially itself by 
running the jobmanager.sh on each JobManager nodes, it is not working. It binds 
to local and not forming the HA cluster.

Thanks,
Shakir

From: Till Rohrmann mailto:trohrm...@apache.org>>
Date: Tuesday, June 18, 2019 at 4:23 PM
To: John Smith mailto:java.dev@gmail.com>>
Cc: user mailto:user@flink.apache.org>>
Subject: [EXTERNAL] Re: How to restart/recover on reboot?

I guess it should work if you installed a systemd service which simply calls 
`jobmanager.sh start` or `taskmanager.sh start`.

Cheers,
Till

On Tue, Jun 18, 2019 at 4:29 PM John Smith 
mailto:java.dev@gmail.com>> wrote:
Yes, that is understood. But I don't see why we cannot call jobmanager.sh and 
taskmanager.sh to build the cluster and have them run as systemd units.

I looked at start-cluster.sh and all it does is SSH and call jobmanager.sh 
which then cascades to taskmanager.sh. I just have to pinpoint what's missing 
to have systemd service working. In fact calling jobmanager.sh as systemd 
service actually sees the shared masters, slaves and flink-conf.yaml. But it 
binds to local host.

Maybe one way to do it would be to bootstrap the cluster with 
./start-cluster.sh and then install systemd services for jobmanager.sh and 
taskmanager.sh

Like I said I don't want to have some process in place to remind admins they 
need to manually start a node every time they patch or a host goes down for 
whatever reason.

On Tue, 18 Jun 2019 at 04:31, Till Rohrmann 
mailto:trohrm...@apache.org>> wrote:
When a single machine fails you should rather call `taskmanager.sh 
start`/`jobmanager.sh start` to start a single process. `start-cluster.sh` will 
start multiple processes on different machines.

Cheers,
Till

On Mon, Jun 17, 2019 at 4:30 PM John Smith 
mailto:java.dev@gmail.com>> wrote:
Well some reasons, machine reboots/maintenance etc... Host/VM crashes and 
restarts. And same goes for the job manager. I don't want/need to have to 
document/remember some start process for sys admins/devops.

So far I have looked at ./start-cluster.sh and all it seems to do is SSH into 
all the specified nodes and starts the processes using the jobmanager and 
taskmanager scripts. I don't see anything special in any of the sh scripts.
I configured passwordless ssh through terraform and all that works great only 
when trying to do the manual start through systemd. I may have something 
missing...

On Mon, 17 Jun 2019 at 09:41, Till Rohrmann 
mailto:trohrm...@apache.org>> wrote:
Hi John,

I have not much experience wrt setting Flink up via systemd services. Why do 
you want to do it like that?

1. In standalone mode, Flink won't automatically restart TaskManagers. This 
only works on Yarn and Mesos atm.
2. In case of a lost TaskManager, you should run `taskmanager.sh start`. This 
script simply starts a new TaskManager process.
3. I guess you could use systemd to bring up a Flink TaskManager process on 
start up.

Cheers,
Till

On Fri, Jun 14, 2019 at 5:56 PM John Smith 
mailto:java.dev@gmail.com>> wrote:
I looked into the start-cluster.sh and I don't see anything special. So 
technically it should be as easy as installing Systemd services to run 
jobmanager.sh and taskmanager.sh respectively?

On Wed, 12 Jun 2019 at 13:02, John Smith 
mailto:java.dev@gmail.com>> wrote:
The installation instructions do not indicate how to create systemd services.

1- When task nodes fail, will the job leader detect this and ssh and restart 
the task node? From my testing it doesn't seem like it.
2- How do we recover a lost node? Do we simply go back to the master node and 
run start-cluster.sh and the script is smart enough to figure out what is 
missing?
3- Or do we need to create systemd services and if so on which command do we 
start the service on?



Re: How to build dependencies and connections between stream jobs?

2019-06-18 Thread 徐涛
Hi Knauf,
The solution I can think of to coordinate between different stream jobs is the following.
For example, there are two streaming jobs, Job_1 and Job_2:
Job_1: receives data from the original Kafka topic, TOPIC_ORIG for example, and sinks the 
data to another Kafka topic, TOPIC_JOB_1_SINK for example. It should be mentioned that: 
① I implemented a retract Kafka sink; ② I do not use the Kafka exactly-once sink; ③ every 
record in TOPIC_JOB_1_SINK should have one unique key; ④ each record with the same key 
should be sent to the same Kafka partition.
Job_2: receives data from TOPIC_JOB_1_SINK, first groups by the unique key and takes the 
latest value, then goes on with the logic of Job_2, and finally sinks the data to the 
final sink (ES, HBase, MySQL, for example).
Here I group by the unique key first because Job_1 may fail and retry, so some dirty data 
may be included in TOPIC_JOB_1_SINK.

So, as an overview:

Job_1: TOPIC_ORIG -> Logic_Job_1 -> TOPIC_JOB_1_SINK
Job_2: TOPIC_JOB_1_SINK -> GROUP_BY_UNIQUE_KEY_GET_LATEST -> Logic_Job_2 -> FINAL_JOB_2_SINK
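
The GROUP_BY_UNIQUE_KEY_GET_LATEST step in Job_2 would be roughly like this (a very simplified sketch that ignores the retract flag and uses plain String payloads):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Rough sketch: only forward a record when its payload differs from the last one
// seen for that unique key, so duplicates produced by Job_1 retries are dropped.
public class KeepLatestByKey extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<String> latest;

    @Override
    public void open(Configuration parameters) {
        latest = getRuntimeContext().getState(new ValueStateDescriptor<>("latest", String.class));
    }

    @Override
    public void processElement(String record, Context ctx, Collector<String> out) throws Exception {
        String previous = latest.value();
        if (previous == null || !previous.equals(record)) {
            latest.update(record);
            out.collect(record);   // only new/changed values flow into Logic_Job_2
        }
    }
}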


Would you please help review this solution? If there is a better 
approach, kindly let me know. Thank you.


Best 
Henry

> On 3 Jun 2019, at 4:01 PM, Konstantin Knauf wrote:
> 
> Hi Henry, 
> 
> Apache Kafka or other message queue like Apache Pulsar or AWS Kinesis are in 
> general the most common way to connect multiple streaming jobs. The 
> dependencies between streaming jobs are in my experience of a different 
> nature though. For batch jobs, it makes sense to schedule one after the other 
> or having more complicated relationships. Streaming jobs are all processing 
> data continuously, so the "coordination" happens on a different level. 
> 
> To avoid duplication, you can use the Kafka exactly-once sink, but this comes 
> with a latency penalty (transactions are only committed on checkpoint 
> completion). 
> 
> Generally, I would advise to always attach meaningful timestamps to your 
> records, so that you can use watermarking [1] to trade off between latency 
> and completeness. These could also be used to identify late records 
> (resulting from catch up after recovers), which should be ignored by 
> downstream jobs. 
> 
> There are other users, who assign a unique ID to every message going through 
> there system and only use idempotent operations (set operations) within 
> Flink, because messages are sometimes already duplicated before reaching the 
> stream processor. For downstream jobs, where an upstream job might duplicate 
> records, this could be a viable, yet limiting, approach as well. 
> 
> Hope this helps and let me know, what you think. 
> 
> Cheers, 
> 
> Konstantin
> 
> 
> 
> 
> 
> 
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#event-time-and-watermarks
>  
> 
>  
> 
> 
> 
> 
> 
> On Thu, May 30, 2019 at 11:39 AM 徐涛  > wrote:
> Hi Experts,
> In batch computing, there are products like Azkaban or airflow to 
> manage batch job dependencies. By using the dependency management tool, we 
> can build a large-scale system consist of small jobs.
> In stream processing, it is not practical to put all dependencies in 
> one job, because it will make the job being too complicated, and the state is 
> too large. I want to build a large-scale realtime system which is consist of 
> many Kafka sources and many streaming jobs, but the first thing I can think 
> of is how to build the dependencies and connections between streaming jobs. 
> The only method I can think of is using a self-implemented retract 
> Kafka sink, each streaming job is connected by Kafka topic. But because each 
> job may fail and retry, for example, the message in Kafka topic may look like 
> this:
> { “retract”:”false”, “id”:”1”, “amount”:100 }
> { “retract”:”false”, “id”:”2”, “amount”:200 }
> { “retract”:”true”, “id”:”1”, “amount”:100 }
> { “retract”:”true”, “id”:”2”, “amount”:200 }
> { “retract”:”false”, “id”:”1”, “amount”:100 }
> { “retract”:”fa

Re: [EXTERNAL] Re: How to restart/recover on reboot?

2019-06-18 Thread PoolakkalMukkath, Shakir
Hi Nick,

It works that way when explicitly setting the --host option. I got misled by the “only” 
word in the doc and did not try it. Thanks for the help

Thanks,
Shakir
From: "Martin, Nick" 
Date: Tuesday, June 18, 2019 at 6:31 PM
To: "PoolakkalMukkath, Shakir" , Till 
Rohrmann , John Smith 
Cc: user 
Subject: RE: [EXTERNAL] Re: How to restart/recover on reboot?

Jobmanager.sh takes an optional argument for the hostname to bind to, and 
start-cluster uses it. If you leave it blank, the script will use whatever 
is in flink-conf.yaml (localhost is the default value that ships with flink).

The dockerized version of flink runs pretty much the way you’re trying to 
operate (i.e. each node starts itself), so the entrypoint script out of that is 
probably a good source of information about how to set it up.

From: PoolakkalMukkath, Shakir [mailto:shakir_poolakkalmukk...@comcast.com]
Sent: Tuesday, June 18, 2019 2:15 PM
To: Till Rohrmann ; John Smith 
Cc: user 
Subject: EXT :Re: [EXTERNAL] Re: How to restart/recover on reboot?

Hi Till, John,

I do agree with the issue John mentioned and have the same problem.

We can only start a standalone HA cluster with ./start-cluster.sh script. And 
then when there are failures, we can restart those components individually by 
calling jobmanager.sh / taskmanager.sh. This works great

But , Like John mentioned, If we want to start the cluster initially itself by 
running the jobmanager.sh on each JobManager nodes, it is not working. It binds 
to local and not forming the HA cluster.

Thanks,
Shakir

From: Till Rohrmann mailto:trohrm...@apache.org>>
Date: Tuesday, June 18, 2019 at 4:23 PM
To: John Smith mailto:java.dev@gmail.com>>
Cc: user mailto:user@flink.apache.org>>
Subject: [EXTERNAL] Re: How to restart/recover on reboot?

I guess it should work if you installed a systemd service which simply calls 
`jobmanager.sh start` or `taskmanager.sh start`.

Cheers,
Till

On Tue, Jun 18, 2019 at 4:29 PM John Smith 
mailto:java.dev@gmail.com>> wrote:
Yes, that is understood. But I don't see why we cannot call jobmanager.sh and 
taskmanager.sh to build the cluster and have them run as systemd units.

I looked at start-cluster.sh and all it does is SSH and call jobmanager.sh 
which then cascades to taskmanager.sh. I just have to pinpoint what's missing 
to have systemd service working. In fact calling jobmanager.sh as systemd 
service actually sees the shared masters, slaves and flink-conf.yaml. But it 
binds to local host.

Maybe one way to do it would be to bootstrap the cluster with 
./start-cluster.sh and then install systemd services for jobmanager.sh and 
taskmanager.sh

Like I said I don't want to have some process in place to remind admins they 
need to manually start a node every time they patch or a host goes down for 
whatever reason.

On Tue, 18 Jun 2019 at 04:31, Till Rohrmann 
mailto:trohrm...@apache.org>> wrote:
When a single machine fails you should rather call `taskmanager.sh 
start`/`jobmanager.sh start` to start a single process. `start-cluster.sh` will 
start multiple processes on different machines.

Cheers,
Till

On Mon, Jun 17, 2019 at 4:30 PM John Smith 
mailto:java.dev@gmail.com>> wrote:
Well some reasons, machine reboots/maintenance etc... Host/VM crashes and 
restarts. And same goes for the job manager. I don't want/need to have to 
document/remember some start process for sys admins/devops.

So far I have looked at ./start-cluster.sh and all it seems to do is SSH into 
all the specified nodes and starts the processes using the jobmanager and 
taskmanager scripts. I don't see anything special in any of the sh scripts.
I configured passwordless ssh through terraform and all that works great only 
when trying to do the manual start through systemd. I may have something 
missing...

On Mon, 17 Jun 2019 at 09:41, Till Rohrmann 
mailto:trohrm...@apache.org>> wrote:
Hi John,

I have not much experience wrt setting Flink up via systemd services. Why do 
you want to do it like that?

1. In standalone mode, Flink won't automatically restart TaskManagers. This 
only works on Yarn and Mesos atm.
2. In case of a lost TaskManager, you should run `taskmanager.sh start`. This 
script simply starts a new TaskManager process.
3. I guess you could use systemd to bring up a Flink TaskManager process on 
start up.

Cheers,
Till

On Fri, Jun 14, 2019 at 5:56 PM John Smith 
mailto:java.dev@gmail.com>> wrote:
I looked into the start-cluster.sh and I don't see anything special. So 
technically it should be as easy as installing Systemd services to run 
jobmanager.sh and taskmanager.sh respectively?

On Wed, 12 Jun 2019 at 13:02, John Smith 
mailto:java.dev@gmail.com>> wrote:
The installation instructions do not indicate how to create systemd services.

1- When task nodes fail, will the job leader detect this and ssh and restart 
the task node? From my testing it doesn't seem like it.
2- How do we recover a lost node? Do we simply go back to the master node and 
run start-c

Re: [EXTERNAL] Re: How to restart/recover on reboot?

2019-06-18 Thread John Smith
Ah ok, we need to pass --host. The command line help says jobmanager.sh
?!?! if I recall. I have to go check tomorrow...

On Tue., Jun. 18, 2019, 10:05 p.m. PoolakkalMukkath, Shakir, <
shakir_poolakkalmukk...@comcast.com> wrote:

> Hi Nick,
>
>
>
> It works that way when explicitly setting the --host option. I got misled by the
> *“only”* word in the doc and did not try it. Thanks for the help
>
>
>
> Thanks,
>
> Shakir
>
> *From: *"Martin, Nick" 
> *Date: *Tuesday, June 18, 2019 at 6:31 PM
> *To: *"PoolakkalMukkath, Shakir" ,
> Till Rohrmann , John Smith 
> *Cc: *user 
> *Subject: *RE: [EXTERNAL] Re: How to restart/recover on reboot?
>
>
>
> Jobmanager.sh takes an optional argument for the hostname to bind to, and
> start-cluster uses it. If you leave it blank it, the script will use
> whatever is in flink-conf.yaml (localhost is the default value that ships
> with flink).
>
>
>
> The dockerized version of flink runs pretty much the way you’re trying to
> operate (i.e. each node starts itself), so the entrypoint script out of
> that is probably a good source of information about how to set it up.
>
>
>
> *From:* PoolakkalMukkath, Shakir [mailto:
> shakir_poolakkalmukk...@comcast.com]
> *Sent:* Tuesday, June 18, 2019 2:15 PM
> *To:* Till Rohrmann ; John Smith <
> java.dev@gmail.com>
> *Cc:* user 
> *Subject:* EXT :Re: [EXTERNAL] Re: How to restart/recover on reboot?
>
>
>
> Hi Till, John,
>
>
>
> I do agree with the issue John mentioned and have the same problem.
>
>
>
> We can only *start* a standalone HA cluster with ./start-cluster.sh
> script. And then when there are failures, we can *restart* those
> components individually by calling jobmanager.sh / taskmanager.sh. This
> works great
>
> But , Like John mentioned, If we want to start the cluster initially
> itself by running the jobmanager.sh on each JobManager nodes, it is not
> working. It binds to local and not forming the HA cluster.
>
>
>
> Thanks,
>
> Shakir
>
>
>
> *From: *Till Rohrmann 
> *Date: *Tuesday, June 18, 2019 at 4:23 PM
> *To: *John Smith 
> *Cc: *user 
> *Subject: *[EXTERNAL] Re: How to restart/recover on reboot?
>
>
>
> I guess it should work if you installed a systemd service which simply
> calls `jobmanager.sh start` or `taskmanager.sh start`.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Tue, Jun 18, 2019 at 4:29 PM John Smith  wrote:
>
> Yes, that is understood. But I don't see why we cannot call jobmanager.sh
> and taskmanager.sh to build the cluster and have them run as systemd units.
>
> I looked at start-cluster.sh and all it does is SSH and call jobmanager.sh
> which then cascades to taskmanager.sh. I just have to pinpoint what's
> missing to have systemd service working. In fact calling jobmanager.sh as
> systemd service actually sees the shared masters, slaves and
> flink-conf.yaml. But it binds to local host.
>
>
>
> Maybe one way to do it would be to bootstrap the cluster with
> ./start-cluster.sh and then install systemd services for jobmanager.sh and
> taskmanager.sh
>
>
>
> Like I said I don't want to have some process in place to remind admins
> they need to manually start a node every time they patch or a host goes
> down for whatever reason.
>
>
>
> On Tue, 18 Jun 2019 at 04:31, Till Rohrmann  wrote:
>
> When a single machine fails you should rather call `taskmanager.sh
> start`/`jobmanager.sh start` to start a single process. `start-cluster.sh`
> will start multiple processes on different machines.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Mon, Jun 17, 2019 at 4:30 PM John Smith  wrote:
>
> Well some reasons, machine reboots/maintenance etc... Host/VM crashes and
> restarts. And same goes for the job manager. I don't want/need to have to
> document/remember some start process for sys admins/devops.
>
> So far I have looked at ./start-cluster.sh and all it seems to do is SSH
> into all the specified nodes and starts the processes using the jobmanager
> and taskmanager scripts. I don't see anything special in any of the sh
> scripts.
> I configured passwordless ssh through terraform and all that works great
> only when trying to do the manual start through systemd. I may have
> something missing...
>
>
>
> On Mon, 17 Jun 2019 at 09:41, Till Rohrmann  wrote:
>
> Hi John,
>
>
>
> I have not much experience wrt setting Flink up via systemd services. Why
> do you want to do it like that?
>
>
>
> 1. In standalone mode, Flink won't automatically restart TaskManagers.
> This only works on Yarn and Mesos atm.
>
> 2. In case of a lost TaskManager, you should run `taskmanager.sh start`.
> This script simply starts a new TaskManager process.
>
> 3. I guess you could use systemd to bring up a Flink TaskManager process
> on start up.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Fri, Jun 14, 2019 at 5:56 PM John Smith  wrote:
>
> I looked into the start-cluster.sh and I don't see anything special. So
> technically it should be as easy as installing Systemd services to run
> jobmanager.sh and taskmanager.sh respectively?
>
>
>
> On Wed, 12 Jun 2019 at 13:02, John Smi

unsubscribe

2019-06-18 Thread Sheel Pancholi



Re: Role of Job Manager

2019-06-18 Thread Biao Liu
Hi Pankaj,

That's really a good question. There was a refactoring of the architecture a
while ago [1], so some descriptions may still use the outdated concepts.

Before the refactoring, the Job Manager was a centralized role. It controlled the
whole cluster and all jobs, which is what your interpretation 1 describes.

After the refactoring, the old Job Manager was split into several roles:
Resource Manager, Dispatcher, the new Job Manager, etc. The new Job Manager is
responsible for only one job, which is what your interpretation 2 describes.

So the documentation you refer to is outdated. Would you mind telling us the URL
of that document? I think we should update it to avoid misleading more
people.

1. https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077

On Wed, Jun 19, 2019 at 1:12 AM, Eduardo Winpenny Tejedor wrote:

> Hi Pankaj,
>
> I have no experience with Hadoop but from the book I gathered there's one
> Job Manager per application i.e. per jar (as in the example in the first
> chapter). This is not to say there's one Job Manager per job. Actually I
> don't think the word Job is defined in the book, I've seen Task defined,
> and those do have Task Managers
>
> Hope this is along the right lines
>
> Regards,
> Eduardo
>
> On Tue, 18 Jun 2019, 08:42 Pankaj Chand, 
> wrote:
>
>> I am trying to understand the role of Job Manager in Flink, and have come
>> across two possibly distinct interpretations.
>>
>> 1. The online documentation v1.8 signifies that there is at least one Job
>> Manager in a cluster, and it is closely tied to the cluster of machines, by
>> managing all jobs in that cluster of machines.
>>
>> This signifies that Flink's Job Manager is much like Hadoop's Application
>> Manager.
>>
>> 2. The book, "Stream Processing with Apache Flink", writes that, "The
>> Job Manager is the master process that controls the execution of a single
>> application—each application is controlled by a different Job Manager."
>>
>> This signifies that Flink defaults to one Job Manager per job, and the
>> Job Manager is closely tied to that single job, much like Hadoop's
>> Application Master for each job.
>>
>> Please let me know which one is correct.
>>
>> Pankaj
>>
>