Re: Data Stream union error after upgrading from 0.9 to 0.10.1

2015-12-03 Thread Robert Metzger
Hi Welly,

the fix has been merged and should be available in 0.10-SNAPSHOT.

On Wed, Dec 2, 2015 at 10:12 AM, Maximilian Michels  wrote:

> Hi Welly,
>
> We still have to decide on the next release date but I would expect
> Flink 0.10.2 within the next weeks. If you can't work around the union
> limitation, you may build your own Flink either from the master or the
> release-0.10 branch which will eventually be Flink 0.10.2.
>
> Cheers,
> Max
>
> On Tue, Dec 1, 2015 at 12:04 PM, Welly Tambunan  wrote:
> > Thanks a lot Aljoscha.
> >
> > When will it be released?
> >
> > Cheers
> >
> > On Tue, Dec 1, 2015 at 5:48 PM, Aljoscha Krettek 
> > wrote:
> >>
> >> Hi,
> >> I relaxed the restrictions on union. This should make it into an
> upcoming
> >> 0.10.2 bugfix release.
> >>
> >> Cheers,
> >> Aljoscha
> >> > On 01 Dec 2015, at 11:23, Welly Tambunan  wrote:
> >> >
> >> > Hi All,
> >> >
> >> > After upgrading our system from 0.9 to the latest version, 0.10.1, we
> >> > get the following error.
> >> >
> >> > Exception in thread "main" java.lang.UnsupportedOperationException: A
> >> > DataStream cannot be unioned with itself
> >> >
> >> > I then found the relevant JIRA for this one:
> >> > https://issues.apache.org/jira/browse/FLINK-3080
> >> >
> >> > Is there any plan for which release this fix will be in?
> >> >
> >> >
> >> > Another issue I have after upgrading is that I can't union streams with
> >> > different levels of parallelism.
> >> >
> >> > I think we will need to fall back to 0.9 again for the time being.
> >> >
> >> > Cheers
> >> >
> >> > --
> >> > Welly Tambunan
> >> > Triplelands
> >> >
> >> > http://weltam.wordpress.com
> >> > http://www.triplelands.com
> >>
> >
> >
> >
> > --
> > Welly Tambunan
> > Triplelands
> >
> > http://weltam.wordpress.com
> > http://www.triplelands.com
>
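
For anyone hitting the same restriction on 0.10.1 before 0.10.2 is out, the sketch below shows one commonly used interim workaround: passing one side of the union through an identity map creates a distinct operator, which sidesteps the self-union check. This is only a sketch against the 0.10 Java DataStream API; the class and variable names are illustrative and not from this thread.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SelfUnionWorkaround {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> words = env.fromElements("a", "b", "c");

        // 0.10.1 rejects words.union(words) with
        // "A DataStream cannot be unioned with itself".
        // An identity map on one side yields a separate operator, so the
        // union is accepted; calling setParallelism() on the map is also a
        // way to align parallelism before the union.
        DataStream<String> copy = words.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value;
            }
        });

        words.union(copy).print();
        env.execute("self-union workaround");
    }
}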


HA Mode and standalone containers compatibility ?

2015-12-03 Thread LINZ, Arnaud
Hello,



I have both streaming applications & batch applications. Since the memory needs 
are not the same, I was using a long-living container for my streaming apps and 
new short-lived containers for hosting each batch execution.



For that, I submit streaming jobs with "flink run"  and batch jobs with "flink 
run -m yarn-cluster"



This was working fine until I turned zookeeper HA mode on for my streaming 
applications.

Even though I set these options not in the Flink YAML configuration file but only with -D 
options on the yarn-session.sh command line, my batch jobs now try to run in 
the streaming container and fail because of the lack of resources.



My HA options are :

-Dyarn.application-attempts=10 -Drecovery.mode=zookeeper 
-Drecovery.zookeeper.quorum=h1r1en01:2181 -Drecovery.zookeeper.path.root=/flink 
 -Dstate.backend=filesystem 
-Dstate.backend.fs.checkpointdir=hdfs:///tmp/flink/checkpoints 
-Drecovery.zookeeper.storageDir=hdfs:///tmp/flink/recovery/



Am I missing something ?



Best regards,

Arnaud





Re: Data Stream union error after upgrading from 0.9 to 0.10.1

2015-12-03 Thread Welly Tambunan
Hi Robert,

Thanks for the update.

We will update to this version.

Cheers

On Thu, Dec 3, 2015 at 3:49 PM, Robert Metzger  wrote:

> Hi Welly,
>
> the fix has been merged and should be available in 0.10-SNAPSHOT.
>
> On Wed, Dec 2, 2015 at 10:12 AM, Maximilian Michels 
> wrote:
>
>> Hi Welly,
>>
>> We still have to decide on the next release date but I would expect
>> Flink 0.10.2 within the next weeks. If you can't work around the union
>> limitation, you may build your own Flink either from the master or the
>> release-0.10 branch which will eventually be Flink 0.10.2.
>>
>> Cheers,
>> Max
>>
>> On Tue, Dec 1, 2015 at 12:04 PM, Welly Tambunan 
>> wrote:
>> > Thanks a lot Aljoscha.
>> >
>> > When will it be released?
>> >
>> > Cheers
>> >
>> > On Tue, Dec 1, 2015 at 5:48 PM, Aljoscha Krettek 
>> > wrote:
>> >>
>> >> Hi,
>> >> I relaxed the restrictions on union. This should make it into an
>> upcoming
>> >> 0.10.2 bugfix release.
>> >>
>> >> Cheers,
>> >> Aljoscha
>> >> > On 01 Dec 2015, at 11:23, Welly Tambunan  wrote:
>> >> >
>> >> > Hi All,
>> >> >
>> >> > After upgrading our system from 0.9 to the latest version, 0.10.1, we
>> >> > get the following error.
>> >> >
>> >> > Exception in thread "main" java.lang.UnsupportedOperationException: A
>> >> > DataStream cannot be unioned with itself
>> >> >
>> >> > I then found the relevant JIRA for this one:
>> >> > https://issues.apache.org/jira/browse/FLINK-3080
>> >> >
>> >> > Is there any plan for which release this fix will be in?
>> >> >
>> >> >
>> >> > Another issue I have after upgrading is that I can't union streams with
>> >> > different levels of parallelism.
>> >> >
>> >> > I think we will need to fall back to 0.9 again for the time being.
>> >> >
>> >> > Cheers
>> >> >
>> >> > --
>> >> > Welly Tambunan
>> >> > Triplelands
>> >> >
>> >> > http://weltam.wordpress.com
>> >> > http://www.triplelands.com
>> >>
>> >
>> >
>> >
>> > --
>> > Welly Tambunan
>> > Triplelands
>> >
>> > http://weltam.wordpress.com
>> > http://www.triplelands.com
>>
>
>


-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Re: HA Mode and standalone containers compatibility ?

2015-12-03 Thread Till Rohrmann
Hi Arnaud,

as long as you don't have HA activated for your batch jobs, HA shouldn't
have an influence on the batch execution. If it interferes, then you should
see additional task managers connected to the streaming cluster when you
execute the batch job. Could you check that? Furthermore, could you check
whether a second YARN application is actually started when you run the batch
jobs?

Cheers,
Till

On Thu, Dec 3, 2015 at 9:57 AM, LINZ, Arnaud 
wrote:

> Hello,
>
>
>
> I have both streaming applications & batch applications. Since the memory
> needs are not the same, I was using a long-living container for my
> streaming apps and new short-lived containers for hosting each batch
> execution.
>
>
>
> For that, I submit streaming jobs with "*flink run*"  and batch jobs with
> "*flink run -m yarn-cluster*"
>
>
>
> This was working fine until I turned zookeeper HA mode on for my streaming
> applications.
>
> Even if I don't set it up in the yaml flink configuration file, but with
> -D options on the yarn_session.sh command line, now my batch jobs try to
> run in the streaming container, and fail because of the lack of resources.
>
>
>
> My HA options are :
>
> -Dyarn.application-attempts=10 -Drecovery.mode=zookeeper
> -Drecovery.zookeeper.quorum=h1r1en01:2181
> -Drecovery.zookeeper.path.root=/flink  -Dstate.backend=filesystem
> -Dstate.backend.fs.checkpointdir=hdfs:///tmp/flink/checkpoints
> -Drecovery.zookeeper.storageDir=hdfs:///tmp/flink/recovery/
>
>
>
> Am I missing something ?
>
>
>
> Best regards,
>
> Arnaud
>
>


Re: Including option for starting job and task managers in the foreground

2015-12-03 Thread Maximilian Michels
I think the way supervisor is used in the Docker scripts is a bit hacky. It
is simply started in the foreground and does nothing. Supervisor is
actually a really nice utility to start processes in Docker containers and
monitor them.

Nevertheless, supervisor also expects commands to stay in the foreground. A
common way to work around this is to create a script which monitors the
daemon process's pid. Thinking about this, I think we could actually add the
foreground functionality directly to the jobmanager / taskmanager shell
scripts, like you suggested.

In the meantime, you could also use a simple script like this:

#!/usr/bin/env bash
# daemonize job manager
./bin/jobmanager.sh start cluster
# wait until process goes down
wait $!

Cheers,
Max

On Wed, Dec 2, 2015 at 7:16 PM, Brian Chhun 
wrote:

> Thanks, I'm basing what I'm doing on what I see there. One
> thing that's not clear to me in that example is why supervisor is used to
> keep the container alive, rather than using some simpler means. It doesn't
> look like it's been configured to supervise anything.
>
> On Wed, Dec 2, 2015 at 11:44 AM, Maximilian Michels 
> wrote:
>
>> Have you looked at
>> https://github.com/apache/flink/tree/master/flink-contrib/docker-flink
>> ? This demonstrates how to use Flink with Docker. In particular it
>> states: "Images [..] run Supervisor to stay alive when running
>> containers."
>>
>> Have a look at flink/config-flink.sh.
>>
>> Cheers,
>> Max
>>
>> On Wed, Dec 2, 2015 at 6:29 PM, Brian Chhun
>>  wrote:
>> > Yep, I think this makes sense. I'm currently patching the
>> flink-daemon.sh
>> > script to remove the `&`, but I don't think it's a very robust solution,
>> > particularly when this script changes across versions of Flink. I'm
>> very new
>> > to Docker, but the resources I've found indicates that the process must
>> run
>> > in the foreground, though people seem to get around it with some hacks.
>> >
>> > When I have some time, I can look into refactoring some parts of the
>> scripts
>> > so that it can be started in the foreground.
>> >
>> > Thanks,
>> > Brian
>> >
>> > On Wed, Dec 2, 2015 at 3:22 AM, Maximilian Michels 
>> wrote:
>> >>
>> >> Hi Brian,
>> >>
>> >> I don't recall Docker requires commands to run in the foreground.
>> Still,
>> >> if that is your requirement, simply remove the "&" at the end of this
>> line
>> >> in flink-daemon.sh:
>> >>
>> >> $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}"
>> -classpath
>> >> "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`"
>> >> ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
>> >>
>> >> Cheers,
>> >> Max
>> >>
>> >> On Wed, Dec 2, 2015 at 9:26 AM, Till Rohrmann 
>> >> wrote:
>> >>>
>> >>> Hi Brian,
>> >>>
>> >>> as far as I know this is at the moment not possible with our scripts.
>> >>> However it should be relatively easy to add by simply executing the
>> Java
>> >>> command in flink-daemon.sh in the foreground. Do you want to add this?
>> >>>
>> >>> Cheers,
>> >>> Till
>> >>>
>> >>> On Dec 1, 2015 9:40 PM, "Brian Chhun" 
>> >>> wrote:
>> 
>>  Hi All,
>> 
>>  Is it possible to include a command line flag for starting job and
>> task
>>  managers in the foreground? Currently, `bin/jobmanager.sh` and
>>  `bin/taskmanager.sh` rely on `bin/flink-daemon.sh`, which starts
>> these
>>  things in the background. I'd like to execute these commands inside
>> a docker
>>  container, but it's expected that the process is running in the
>> foreground.
>>  I think it might be useful to have it run in the foreground so that
>> it can
>>  be hooked into some process supervisors. Any suggestions are
>> appreciated.
>> 
>> 
>>  Thanks,
>>  Brian
>> >>
>> >>
>> >
>>
>
>


RE: HA Mode and standalone containers compatibility ?

2015-12-03 Thread LINZ, Arnaud
Yes, it does interfere: I do have additional task managers. My batch 
application shows up in my streaming cluster's Flink GUI instead of creating its 
own container with its own GUI, despite the -m yarn-cluster option.

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Thursday, December 3, 2015 10:36
To: user@flink.apache.org
Subject: Re: HA Mode and standalone containers compatibility ?

Hi Arnaud,

as long as you don't have HA activated for your batch jobs, HA shouldn't have 
an influence on the batch execution. If it interferes, then you should see 
additional task manager connected to the streaming cluster when you execute the 
batch job. Could you check that? Furthermore, could you check that actually a 
second yarn application is started when you run the batch jobs?

Cheers,
Till

On Thu, Dec 3, 2015 at 9:57 AM, LINZ, Arnaud 
mailto:al...@bouyguestelecom.fr>> wrote:

Hello,



I have both streaming applications & batch applications. Since the memory needs 
are not the same, I was using a long-living container for my streaming apps and 
new short-lived containers for hosting each batch execution.



For that, I submit streaming jobs with "flink run"  and batch jobs with "flink 
run -m yarn-cluster"



This was working fine until I turned zookeeper HA mode on for my streaming 
applications.

Even if I don't set it up in the yaml flink configuration file, but with -D 
options on the yarn_session.sh command line, now my batch jobs try to run in 
the streaming container, and fail because of the lack of resources.



My HA options are :

-Dyarn.application-attempts=10 -Drecovery.mode=zookeeper 
-Drecovery.zookeeper.quorum=h1r1en01:2181 -Drecovery.zookeeper.path.root=/flink 
 -Dstate.backend=filesystem 
-Dstate.backend.fs.checkpointdir=hdfs:///tmp/flink/checkpoints 
-Drecovery.zookeeper.storageDir=hdfs:///tmp/flink/recovery/



Am I missing something ?



Best regards,

Arnaud






Re: HA Mode and standalone containers compatibility ?

2015-12-03 Thread Ufuk Celebi
Hey Arnaud,

thanks for reporting this. I think Till’s suggestion will help to debug this 
(checking whether a second YARN application has been started)…

You don’t want to run the batch application in HA mode, correct?

It sounds like the batch job is submitted with the same config keys. Could you 
start the batch job explicitly with -Drecovery.mode=standalone?

If you do want the batch job to be HA as well, you have to configure separate 
Zookeeper root paths:

recovery.zookeeper.path.root: /flink-streaming-1 # for the streaming session

recovery.zookeeper.path.root: /flink-batch # for the batch session
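
As a sketch, the two sessions could then be driven by separate configurations along these lines (the key names and values are the ones already used in this thread; splitting them into two flink-conf.yaml files, one per submission mode, is just one possible setup):

# flink-conf.yaml for the long-running streaming session (HA)
recovery.mode: zookeeper
recovery.zookeeper.quorum: h1r1en01:2181
recovery.zookeeper.path.root: /flink-streaming-1
recovery.zookeeper.storageDir: hdfs:///tmp/flink/recovery/

# flink-conf.yaml for the per-job batch submissions ("flink run -m yarn-cluster")
recovery.mode: standalone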

– Ufuk

> On 03 Dec 2015, at 11:01, LINZ, Arnaud  wrote:
> 
> Yes, it does interfere, I do have additional task managers. My batch 
> application comes in my streaming cluster Flink’s GUI instead of creating its 
> own container with its own GUI despite the –m yarn-cluster option.
>  
> De : Till Rohrmann [mailto:trohrm...@apache.org] 
> Envoyé : jeudi 3 décembre 2015 10:36
> À : user@flink.apache.org
> Objet : Re: HA Mode and standalone containers compatibility ?
>  
> Hi Arnaud,
>  
> as long as you don't have HA activated for your batch jobs, HA shouldn't have 
> an influence on the batch execution. If it interferes, then you should see 
> additional task manager connected to the streaming cluster when you execute 
> the batch job. Could you check that? Furthermore, could you check that 
> actually a second yarn application is started when you run the batch jobs?
>  
> Cheers,
> Till
>  
> On Thu, Dec 3, 2015 at 9:57 AM, LINZ, Arnaud  wrote:
> Hello,
> 
>  
> 
> I have both streaming applications & batch applications. Since the memory 
> needs are not the same, I was using a long-living container for my streaming 
> apps and new short-lived containers for hosting each batch execution.
> 
>  
> 
> For that, I submit streaming jobs with "flink run"  and batch jobs with 
> "flink run -m yarn-cluster"
> 
>  
> 
> This was working fine until I turned zookeeper HA mode on for my streaming 
> applications.
> 
> Even if I don't set it up in the yaml flink configuration file, but with -D 
> options on the yarn_session.sh command line, now my batch jobs try to run in 
> the streaming container, and fail because of the lack of resources.
> 
>  
> 
> My HA options are :
> 
> -Dyarn.application-attempts=10 -Drecovery.mode=zookeeper 
> -Drecovery.zookeeper.quorum=h1r1en01:2181 
> -Drecovery.zookeeper.path.root=/flink  -Dstate.backend=filesystem 
> -Dstate.backend.fs.checkpointdir=hdfs:///tmp/flink/checkpoints 
> -Drecovery.zookeeper.storageDir=hdfs:///tmp/flink/recovery/
> 
>  
> 
> Am I missing something ?
> 
>  
> 
> Best regards,
> 
> Arnaud
> 
>  
> 



RE: HA Mode and standalone containers compatibility ?

2015-12-03 Thread LINZ, Arnaud
More details :

Command =
/usr/lib/flink/bin/flink run -m yarn-cluster -yn 48 -ytm 5120 -yqu batch1 -ys 4 
--class com.bouygtel.kubera.main.segstage.MainGeoSegStage 
/home/voyager/KBR/GOS/lib/KUBERA-GEO-SOURCE-0.0.1-SNAPSHOT-allinone.jar  -j 
/home/voyager/KBR/GOS/log -c /home/voyager/KBR/GOS/cfg/KBR_GOS_Config.cfg


Start of trace is :
Found YARN properties file /tmp/.yarn-properties-voyager
YARN properties set default parallelism to 24
Using JobManager address from YARN properties 
bt1shli3.bpa.bouyguestelecom.fr/172.21.125.28:36700
YARN cluster mode detected. Switching Log4j output to console


The content of /tmp/.yarn-properties-voyager
is related to the streaming cluster:

#Generated YARN properties file
#Thu Dec 03 11:03:06 CET 2015
parallelism=24
dynamicPropertiesString=yarn.heap-cutoff-ratio\=0.6@@yarn.application-attempts\=10@@recovery.mode\=zookeeper@@recovery.zookeeper.quorum\=h1r1en01\:2181@@recovery.zookeeper.path.root\=/flink@@state.backend\=filesystem@@state.backend.fs.checkpointdir\=hdfs\:///tmp/flink/checkpoints@@recovery.zookeeper.storageDir\=hdfs\:///tmp/flink/recovery/
jobManager=172.21.125.28\:36700




From: LINZ, Arnaud
Sent: Thursday, December 3, 2015 11:01
To: user@flink.apache.org
Subject: RE: HA Mode and standalone containers compatibility ?

Yes, it does interfere, I do have additional task managers. My batch 
application comes in my streaming cluster Flink’s GUI instead of creating its 
own container with its own GUI despite the –m yarn-cluster option.

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Thursday, December 3, 2015 10:36
To: user@flink.apache.org
Subject: Re: HA Mode and standalone containers compatibility ?

Hi Arnaud,

as long as you don't have HA activated for your batch jobs, HA shouldn't have 
an influence on the batch execution. If it interferes, then you should see 
additional task manager connected to the streaming cluster when you execute the 
batch job. Could you check that? Furthermore, could you check that actually a 
second yarn application is started when you run the batch jobs?

Cheers,
Till

On Thu, Dec 3, 2015 at 9:57 AM, LINZ, Arnaud 
mailto:al...@bouyguestelecom.fr>> wrote:

Hello,



I have both streaming applications & batch applications. Since the memory needs 
are not the same, I was using a long-living container for my streaming apps and 
new short-lived containers for hosting each batch execution.



For that, I submit streaming jobs with "flink run"  and batch jobs with "flink 
run -m yarn-cluster"



This was working fine until I turned zookeeper HA mode on for my streaming 
applications.

Even if I don't set it up in the yaml flink configuration file, but with -D 
options on the yarn_session.sh command line, now my batch jobs try to run in 
the streaming container, and fail because of the lack of resources.



My HA options are :

-Dyarn.application-attempts=10 -Drecovery.mode=zookeeper 
-Drecovery.zookeeper.quorum=h1r1en01:2181 -Drecovery.zookeeper.path.root=/flink 
 -Dstate.backend=filesystem 
-Dstate.backend.fs.checkpointdir=hdfs:///tmp/flink/checkpoints 
-Drecovery.zookeeper.storageDir=hdfs:///tmp/flink/recovery/



Am I missing something ?



Best regards,

Arnaud






RE: HA Mode and standalone containers compatibility ?

2015-12-03 Thread LINZ, Arnaud
Hi,

The batch job does not need to be HA.
I stopped everything, cleaned the temp files, added -Drecovery.mode=standalone 
and it seems to work now!
Strange, but good for me for now.

Thanks,
Arnaud

-----Original Message-----
From: Ufuk Celebi [mailto:u...@apache.org]
Sent: Thursday, December 3, 2015 11:11
To: user@flink.apache.org
Subject: Re: HA Mode and standalone containers compatibility ?

Hey Arnaud,

thanks for reporting this. I think Till’s suggestion will help to debug this 
(checking whether a second YARN application has been started)…

You don’t want to run the batch application in HA mode, correct?

It sounds like the batch job is submitted with the same config keys. Could you 
start the batch job explicitly with -Drecovery.mode=standalone?

If you do want the batch job to be HA as well, you have to configure separate 
Zookeeper root paths:

recovery.zookeeper.path.root: /flink-streaming-1 # for the streaming session

recovery.zookeeper.path.root: /flink-batch # for the batch session

– Ufuk

> On 03 Dec 2015, at 11:01, LINZ, Arnaud  wrote:
> 
> Yes, it does interfere, I do have additional task managers. My batch 
> application comes in my streaming cluster Flink’s GUI instead of creating its 
> own container with its own GUI despite the –m yarn-cluster option.
>  
> De : Till Rohrmann [mailto:trohrm...@apache.org] Envoyé : jeudi 3 
> décembre 2015 10:36 À : user@flink.apache.org Objet : Re: HA Mode and 
> standalone containers compatibility ?
>  
> Hi Arnaud,
>  
> as long as you don't have HA activated for your batch jobs, HA shouldn't have 
> an influence on the batch execution. If it interferes, then you should see 
> additional task manager connected to the streaming cluster when you execute 
> the batch job. Could you check that? Furthermore, could you check that 
> actually a second yarn application is started when you run the batch jobs?
>  
> Cheers,
> Till
>  
> On Thu, Dec 3, 2015 at 9:57 AM, LINZ, Arnaud  wrote:
> Hello,
> 
>  
> 
> I have both streaming applications & batch applications. Since the memory 
> needs are not the same, I was using a long-living container for my streaming 
> apps and new short-lived containers for hosting each batch execution.
> 
>  
> 
> For that, I submit streaming jobs with "flink run"  and batch jobs with 
> "flink run -m yarn-cluster"
> 
>  
> 
> This was working fine until I turned zookeeper HA mode on for my streaming 
> applications.
> 
> Even if I don't set it up in the yaml flink configuration file, but with -D 
> options on the yarn_session.sh command line, now my batch jobs try to run in 
> the streaming container, and fail because of the lack of resources.
> 
>  
> 
> My HA options are :
> 
> -Dyarn.application-attempts=10 -Drecovery.mode=zookeeper 
> -Drecovery.zookeeper.quorum=h1r1en01:2181 
> -Drecovery.zookeeper.path.root=/flink  -Dstate.backend=filesystem 
> -Dstate.backend.fs.checkpointdir=hdfs:///tmp/flink/checkpoints 
> -Drecovery.zookeeper.storageDir=hdfs:///tmp/flink/recovery/
> 
>  
> 
> Am I missing something ?
> 
>  
> 
> Best regards,
> 
> Arnaud
> 
>  
> 



Re: Flink job on secure Yarn fails after many hours

2015-12-03 Thread Maximilian Michels
Hi Niels,

Just got back from our CI. The build above would fail with a
Checkstyle error. I corrected that. Also I have built the binaries for
your Hadoop version 2.6.0.

Binaries:

https://drive.google.com/file/d/0BziY9U_qva1sZ1FVR3RWeVNrNzA/view?usp=sharing

Source:

https://github.com/mxm/flink/tree/kerberos-yarn-heartbeat-fail-0.10.1

git fetch https://github.com/mxm/flink/ \
kerberos-yarn-heartbeat-fail-0.10.1 && git checkout FETCH_HEAD

https://github.com/mxm/flink/archive/kerberos-yarn-heartbeat-fail-0.10.1.zip

Thanks,
Max

On Wed, Dec 2, 2015 at 6:52 PM, Maximilian Michels  wrote:
> I forgot you're using Flink 0.10.1. The above was for the master.
>
> So here's the commit for Flink 0.10.1:
> https://github.com/mxm/flink/commit/a41f3866f4097586a7b2262093088861b62930cd
>
> git fetch https://github.com/mxm/flink/ \
> a41f3866f4097586a7b2262093088861b62930cd && git checkout FETCH_HEAD
>
> https://github.com/mxm/flink/archive/a41f3866f4097586a7b2262093088861b62930cd.zip
>
> Thanks,
> Max
>
> On Wed, Dec 2, 2015 at 6:39 PM, Maximilian Michels  wrote:
>> Great. Here is the commit to try out:
>> https://github.com/mxm/flink/commit/f49b9635bec703541f19cb8c615f302a07ea88b3
>>
>> If you already have the Flink repository, check it out using
>>
>> git fetch https://github.com/mxm/flink/
>> f49b9635bec703541f19cb8c615f302a07ea88b3 && git checkout FETCH_HEAD
>>
>> Alternatively, here's a direct download link to the sources with the
>> fix included:
>> https://github.com/mxm/flink/archive/f49b9635bec703541f19cb8c615f302a07ea88b3.zip
>>
>> Thanks a lot,
>> Max
>>
>> On Wed, Dec 2, 2015 at 5:44 PM, Niels Basjes  wrote:
>>> Sure, just give me the git repo url to build and I'll give it a try.
>>>
>>> Niels
>>>
>>> On Wed, Dec 2, 2015 at 4:28 PM, Maximilian Michels  wrote:

 I mentioned that the exception gets thrown when requesting container
 status information. We need this to send a heartbeat to YARN but it is
 not very crucial if this fails once for the running job. Possibly, we
 could work around this problem by retrying N times in case of an
 exception.

 Would it be possible for you to deploy a custom Flink 0.10.1 version
 we provide and test again?

 On Wed, Dec 2, 2015 at 4:03 PM, Niels Basjes  wrote:
 > No, I was just asking.
 > No upgrade is possible for the next month or two.
 >
 > This week is our busiest day of the year ...
 > Our shop is doing about 10 orders per second these days ...
 >
 > So they won't upgrade until next January/February
 >
 > Niels
 >
 > On Wed, Dec 2, 2015 at 3:59 PM, Maximilian Michels 
 > wrote:
 >>
 >> Hi Niels,
 >>
 >> You mentioned you have the option to update Hadoop and redeploy the
 >> job. Would be great if you could do that and let us know how it turns
 >> out.
 >>
 >> Cheers,
 >> Max
 >>
 >> On Wed, Dec 2, 2015 at 3:45 PM, Niels Basjes  wrote:
 >> > Hi,
 >> >
 >> > I posted the entire log from the first log line at the moment of
 >> > failure
 >> > to
 >> > the very end of the logfile.
 >> > This is all I have.
 >> >
 >> > As far as I understand the Kerberos and Keytab mechanism in Hadoop
 >> > Yarn
 >> > is
 >> > that it catches the "Invalid Token" and then (if keytab) gets a new
 >> > Kerberos
 >> > ticket (or tgt?).
 >> > When the new ticket has been obtained it retries the call that
 >> > previously
 >> > failed.
 >> > To me it seemed that this call can fail over the invalid Token yet it
 >> > cannot
 >> > be retried.
 >> >
 >> > At this moment I'm thinking a bug in Hadoop.
 >> >
 >> > Niels
 >> >
 >> > On Wed, Dec 2, 2015 at 2:51 PM, Maximilian Michels 
 >> > wrote:
 >> >>
 >> >> Hi Niels,
 >> >>
 >> >> Sorry for hear you experienced this exception. From a first glance,
 >> >> it
 >> >> looks like a bug in Hadoop to me.
 >> >>
 >> >> > "Not retrying because the invoked method is not idempotent, and
 >> >> > unable
 >> >> > to determine whether it was invoked"
 >> >>
 >> >> That is nothing to worry about. This is Hadoop's internal retry
 >> >> mechanism that re-attempts to do actions which previously failed if
 >> >> that's possible. Since the action is not idempotent (it cannot be
 >> >> executed again without risking to change the state of the execution)
 >> >> and it also doesn't track its execution states, it won't be retried
 >> >> again.
 >> >>
 >> >> The main issue is this exception:
 >> >>
 >> >> >org.apache.hadoop.security.token.SecretManager$InvalidToken:
 >> >> > Invalid
 >> >> > AMRMToken from >appattempt_1443166961758_163901_01
 >> >>
 >> >> From the stack trace it is clear that this exception occurs upon
 >> >> requesting container status information from the Resource Manager:
 >> >>
 >> >> >at
 >

RE: HA Mode and standalone containers compatibility ?

2015-12-03 Thread LINZ, Arnaud
Oops... False joy.

In fact, it does start another container, but this container ends immediately 
because the job is not submitted to that container but to the streaming one.

Log details: 

Command = 
#  JVM_ARGS =  -DCluster.Parallelisme=150  -Drecovery.mode=standalone
/usr/lib/flink/bin/flink run -m yarn-cluster -yn 48 -ytm 5120 -yqu batch1 -ys 4 
--class com.bouygtel.kubera.main.segstage.MainGeoSegStage 
/home/voyager/KBR/GOS/lib/KUBERA-GEO-SOURCE-0.0.1-SNAPSHOT-allinone.jar  -j 
/home/voyager/KBR/GOS/log -c /home/voyager/KBR/GOS/cfg/KBR_GOS_Config.cfg 

Log = 
Found YARN properties file /tmp/.yarn-properties-voyager
YARN properties set default parallelism to 24
Using JobManager address from YARN properties 
bt1shli3.bpa.bouyguestelecom.fr/172.21.125.28:36700
YARN cluster mode detected. Switching Log4j output to console
11:39:18,192 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl
 - Timeline service address: 
http://h1r1dn02.bpa.bouyguestelecom.fr:8188/ws/v1/timeline/
11:39:18,349 INFO  org.apache.hadoop.yarn.client.RMProxy
 - Connecting to ResourceManager at 
h1r1nn01.bpa.bouyguestelecom.fr/172.21.125.3:8050
11:39:18,504 INFO  org.apache.flink.client.FlinkYarnSessionCli  
 - No path for the flink jar passed. Using the location of class 
org.apache.flink.yarn.FlinkYarnClient to locate the jar
11:39:18,513 INFO  org.apache.flink.yarn.FlinkYarnClient
 - Using values:
11:39:18,515 INFO  org.apache.flink.yarn.FlinkYarnClient
 -   TaskManager count = 48
11:39:18,515 INFO  org.apache.flink.yarn.FlinkYarnClient
 -   JobManager memory = 1024
11:39:18,515 INFO  org.apache.flink.yarn.FlinkYarnClient
 -   TaskManager memory = 5120
11:39:18,641 WARN  org.apache.flink.yarn.FlinkYarnClient
 - The JobManager or TaskManager memory is below the smallest possible YARN 
Container size. The value of 'yarn.scheduler.minimum-allocation-mb' is '2048'. 
Please increase the memory size.YARN will allocate the smaller containers but 
the scheduler will account for the minimum-allocation-mb, maybe not all 
instances you requested will start.
11:39:19,102 INFO  org.apache.flink.yarn.Utils  
 - Copying from file:/usr/lib/flink/lib/flink-dist_2.11-0.10.0.jar to 
hdfs://h1r1nn01.bpa.bouyguestelecom.fr:8020/user/voyager/.flink/application_1449127732314_0046/flink-dist_2.11-0.10.0.jar
11:39:19,653 INFO  org.apache.flink.yarn.Utils  
 - Copying from /usr/lib/flink/conf/flink-conf.yaml to 
hdfs://h1r1nn01.bpa.bouyguestelecom.fr:8020/user/voyager/.flink/application_1449127732314_0046/flink-conf.yaml
11:39:19,667 INFO  org.apache.flink.yarn.Utils  
 - Copying from file:/usr/lib/flink/conf/logback.xml to 
hdfs://h1r1nn01.bpa.bouyguestelecom.fr:8020/user/voyager/.flink/application_1449127732314_0046/logback.xml
11:39:19,679 INFO  org.apache.flink.yarn.Utils  
 - Copying from file:/usr/lib/flink/conf/log4j.properties to 
hdfs://h1r1nn01.bpa.bouyguestelecom.fr:8020/user/voyager/.flink/application_1449127732314_0046/log4j.properties
11:39:19,698 INFO  org.apache.flink.yarn.FlinkYarnClient
 - Submitting application master application_1449127732314_0046
11:39:19,723 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl
 - Submitted application application_1449127732314_0046
11:39:19,723 INFO  org.apache.flink.yarn.FlinkYarnClient
 - Waiting for the cluster to be allocated
11:39:19,725 INFO  org.apache.flink.yarn.FlinkYarnClient
 - Deploying cluster, current state ACCEPTED
11:39:20,727 INFO  org.apache.flink.yarn.FlinkYarnClient
 - Deploying cluster, current state ACCEPTED
11:39:21,728 INFO  org.apache.flink.yarn.FlinkYarnClient
 - Deploying cluster, current state ACCEPTED
11:39:22,730 INFO  org.apache.flink.yarn.FlinkYarnClient
 - Deploying cluster, current state ACCEPTED
11:39:23,731 INFO  org.apache.flink.yarn.FlinkYarnClient
 - YARN application has been deployed successfully.
11:39:23,734 INFO  org.apache.flink.yarn.FlinkYarnCluster   
 - Start actor system.
11:39:24,192 INFO  org.apache.flink.yarn.FlinkYarnCluster   
 - Start application client.
YARN cluster started
JobManager web interface address 
http://h1r1nn01.bpa.bouyguestelecom.fr:8088/proxy/application_1449127732314_0046/
Waiting until all TaskManagers have connected
11:39:24,202 INFO  org.apache.flink.yarn.ApplicationClient  
 - Notification about new leader address 
akka.tcp://flink@172.21.125.16:59907/user/jobmanager with session ID null.
No status updates from the YARN cluster received so far. Waiting ...
11:39:24,206 INFO  org.apache.fli

Re: HA Mode and standalone containers compatibility ?

2015-12-03 Thread Ufuk Celebi

> On 03 Dec 2015, at 11:47, LINZ, Arnaud  wrote:
> 
> Oopss... False joy. 

OK, I think this is a bug in the YARN Client and the way it uses the 
.properties files to submit jobs.

As a workaround: can you mv the /tmp/.yarn-properties-voyager file and submit 
the batch job?

mv /tmp/.yarn-properties-voyager /tmp/.bak.yarn-properties-voyager

– Ufuk



RE: HA Mode and standalone containers compatibility ?

2015-12-03 Thread LINZ, Arnaud
Hi,
It works fine with that file renamed. Is there a way to specify its path for a 
specific execution, to have a proper workaround?
Thanks,
Arnaud

-----Original Message-----
From: Ufuk Celebi [mailto:u...@apache.org]
Sent: Thursday, December 3, 2015 11:53
To: user@flink.apache.org
Subject: Re: HA Mode and standalone containers compatibility ?


> On 03 Dec 2015, at 11:47, LINZ, Arnaud  wrote:
>
> Oopss... False joy.

OK, I think this is a bug in the YARN Client and the way it uses the 
.properties files to submit jobs.

As a work around: Can you mv the /tmp/.yarn-properties-voyager file and submit 
the batch job?

mv /tmp/.yarn-properties-voyager /tmp/.bak.yarn-properties-voyager

– Ufuk






Re: HA Mode and standalone containers compatibility ?

2015-12-03 Thread Ufuk Celebi
I opened an issue for it, and it will be fixed with the next 0.10.2 release.

@Robert: are you aware of another workaround for the time being?

On Thu, Dec 3, 2015 at 1:20 PM, LINZ, Arnaud 
wrote:

> Hi,
> It works fine with that file renamed.  Is there a way to specify its path
> for a specific execution to have a proper workaround ?
> Thanks,
> Arnaud
>
> -Message d'origine-
> De : Ufuk Celebi [mailto:u...@apache.org]
> Envoyé : jeudi 3 décembre 2015 11:53
> À : user@flink.apache.org
> Objet : Re: HA Mode and standalone containers compatibility ?
>
>
> > On 03 Dec 2015, at 11:47, LINZ, Arnaud  wrote:
> >
> > Oopss... False joy.
>
> OK, I think this is a bug in the YARN Client and the way it uses the
> .properties files to submit jobs.
>
> As a work around: Can you mv the /tmp/.yarn-properties-voyager file and
> submit the batch job?
>
> mv /tmp/.yarn-properties-voyager /tmp/.bak.yarn-properties-voyager
>
> – Ufuk
>
>
> 
>
>


Re: HA Mode and standalone containers compatibility ?

2015-12-03 Thread Robert Metzger
There is a configuration parameter called "yarn.properties-file.location"
which allows setting a custom path for the properties file.
If the batch and streaming jobs are using different configuration files, it
should work.
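
For example, a sketch of that separation (the directory names are invented for illustration, and it assumes each submission points FLINK_CONF_DIR at its own configuration directory so that a different yarn.properties-file.location takes effect):

# /opt/flink-conf-streaming/flink-conf.yaml  (used by the long-running session)
yarn.properties-file.location: /tmp/flink-yarn-props-streaming

# /opt/flink-conf-batch/flink-conf.yaml  (used for per-job batch submissions)
yarn.properties-file.location: /tmp/flink-yarn-props-batch

# batch submission with its own configuration directory
FLINK_CONF_DIR=/opt/flink-conf-batch /usr/lib/flink/bin/flink run -m yarn-cluster ...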

On Thu, Dec 3, 2015 at 1:51 PM, Ufuk Celebi  wrote:

> I opened an issue for it, and it will be fixed with the next 0.10.2 release.
>
> @Robert: are you aware of another workaround for the time being?
>
> On Thu, Dec 3, 2015 at 1:20 PM, LINZ, Arnaud 
> wrote:
>
>> Hi,
>> It works fine with that file renamed.  Is there a way to specify its path
>> for a specific execution to have a proper workaround ?
>> Thanks,
>> Arnaud
>>
>> -Message d'origine-
>> De : Ufuk Celebi [mailto:u...@apache.org]
>> Envoyé : jeudi 3 décembre 2015 11:53
>> À : user@flink.apache.org
>> Objet : Re: HA Mode and standalone containers compatibility ?
>>
>>
>> > On 03 Dec 2015, at 11:47, LINZ, Arnaud 
>> wrote:
>> >
>> > Oopss... False joy.
>>
>> OK, I think this is a bug in the YARN Client and the way it uses the
>> .properties files to submit jobs.
>>
>> As a work around: Can you mv the /tmp/.yarn-properties-voyager file and
>> submit the batch job?
>>
>> mv /tmp/.yarn-properties-voyager /tmp/.bak.yarn-properties-voyager
>>
>> – Ufuk
>>
>>
>> 
>>
>>
>
>


RE: HA Mode and standalone containers compatibility ?

2015-12-03 Thread LINZ, Arnaud
Hi,
I’ve tried to put that parameter in the JVM_ARGS, but not with much success.

# JVM_ARGS :  -DCluster.Parallelisme=150  -Drecovery.mode=standalone 
-Dyarn.properties-file.location=/tmp/flink/batch
(…)
2015:12:03 15:25:42 (ThrdExtrn) - INFO - (...)jobs.exec.ExecutionProcess$1.run 
- > Found YARN properties file /tmp/.yarn-properties-voyager

Arnaud


From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: Thursday, December 3, 2015 14:03
To: user@flink.apache.org
Subject: Re: HA Mode and standalone containers compatibility ?

There is a configuration parameter called "yarn.properties-file.location" which 
allows setting a custom path for the properties file.
If the batch and streaming jobs are using different configuration files, it 
should work.

On Thu, Dec 3, 2015 at 1:51 PM, Ufuk Celebi 
mailto:u...@apache.org>> wrote:
I opened an issue for it, and it will be fixed with the next 0.10.2 release.

@Robert: are you aware of another workaround for the time being?

On Thu, Dec 3, 2015 at 1:20 PM, LINZ, Arnaud 
mailto:al...@bouyguestelecom.fr>> wrote:
Hi,
It works fine with that file renamed.  Is there a way to specify its path for a 
specific execution to have a proper workaround ?
Thanks,
Arnaud

-----Original Message-----
From: Ufuk Celebi [mailto:u...@apache.org]
Sent: Thursday, December 3, 2015 11:53
To: user@flink.apache.org
Subject: Re: HA Mode and standalone containers compatibility ?

> On 03 Dec 2015, at 11:47, LINZ, Arnaud 
> mailto:al...@bouyguestelecom.fr>> wrote:
>
> Oopss... False joy.

OK, I think this is a bug in the YARN Client and the way it uses the 
.properties files to submit jobs.

As a work around: Can you mv the /tmp/.yarn-properties-voyager file and submit 
the batch job?

mv /tmp/.yarn-properties-voyager /tmp/.bak.yarn-properties-voyager

– Ufuk







Read Kafka topic from the beginning

2015-12-03 Thread Vladimir Stoyak
I see that Flink 0.10.1 now supports Keyed Schemas, which allows us to rely on 
Kafka topics set to "compact" retention for data persistence.

In our topology we wanted to enable log compaction on some topics and read 
the topic from the beginning when the topology starts or a component recovers. 
Does the current Kafka consumer implementation allow reading all messages in a 
topic from the beginning or from a specific offset?

Thanks,
Vladimir


Re: Including option for starting job and task managers in the foreground

2015-12-03 Thread Brian Chhun
Thanks Max, I took a look at making this change directly to the scripts. I
was initially thinking about making a separate script whose only
responsibility is to run the command in the foreground, so that the
flink-daemon.sh could delegate to this script. I didn't get very far into it,
though, mostly because I was trying to find a nice way to share the logging
configuration and the passing of parameters.

While doing this, it did occur to me that if the process is run in the
foreground, the logging should be written to standard out. This ended up
being slightly hairy, at least in the job manager case, because the web UI
client expects the standard out of the job manager to be written to a file,
but I seem to have gotten things to work. I just wanted to mention this
detail in case it's the convention for things running in the foreground, in
which case it may need to be implemented alongside foregrounding the
process.

Thanks,
Brian
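
Until a foreground mode exists in the scripts themselves, one stopgap for a container entrypoint could look like the sketch below. Assumptions: a standard Flink 0.10 layout, a job manager started in "cluster" mode, and a log file matching the usual flink-*-jobmanager-*.log naming; tailing the log only approximates true foreground logging.

#!/usr/bin/env bash
# start the job manager in the background (flink-daemon.sh daemonizes it)
./bin/jobmanager.sh start cluster
# give the daemon a moment to create its log file (simplistic)
sleep 5
# stream its log to stdout and stay in the foreground, so Docker or a
# process supervisor sees one long-running process with output
exec tail -F log/flink-*-jobmanager-*.log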

On Thu, Dec 3, 2015 at 3:55 AM, Maximilian Michels  wrote:

> I think the way supervisor is used in the Docker scripts is a bit hacky.
> It is simply started in the foreground and does nothing. Supervisor is
> actually a really nice utility to start processes in Docker containers and
> monitor them.
>
> Nevertheless, supervisor also expects commands to stay in the foreground.
> A common way to work around this, is to create a script which monitors the
> daemon process' pid. Thinking about this, I think we could actually add the
> foreground functionality directly in the jobmanager / taskmanager shell
> script like you suggested.
>
> In the meantime, you could also use a simple script like this:
>
> #!/usr/bin/env bash
> # daemonize job manager
> ./bin/jobmanager.sh start cluster
> # wait until process goes down
> wait $!
>
> Cheers,
> Max
>
> On Wed, Dec 2, 2015 at 7:16 PM, Brian Chhun 
> wrote:
>
>> Thanks, I'm basing the things I'm doing based on what I see there. One
>> thing that's not clear to me in that example is why supervisor is used to
>> keep the container alive, rather than using some simpler means. It doesn't
>> look like it's been configured to supervise anything.
>>
>> On Wed, Dec 2, 2015 at 11:44 AM, Maximilian Michels 
>> wrote:
>>
>>> Have you looked at
>>> https://github.com/apache/flink/tree/master/flink-contrib/docker-flink
>>> ? This demonstrates how to use Flink with Docker. In particular it
>>> states: "Images [..] run Supervisor to stay alive when running
>>> containers."
>>>
>>> Have a look at flink/config-flink.sh.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Wed, Dec 2, 2015 at 6:29 PM, Brian Chhun
>>>  wrote:
>>> > Yep, I think this makes sense. I'm currently patching the
>>> flink-daemon.sh
>>> > script to remove the `&`, but I don't think it's a very robust
>>> solution,
>>> > particularly when this script changes across versions of Flink. I'm
>>> very new
>>> > to Docker, but the resources I've found indicates that the process
>>> must run
>>> > in the foreground, though people seem to get around it with some hacks.
>>> >
>>> > When I have some time, I can look into refactoring some parts of the
>>> scripts
>>> > so that it can be started in the foreground.
>>> >
>>> > Thanks,
>>> > Brian
>>> >
>>> > On Wed, Dec 2, 2015 at 3:22 AM, Maximilian Michels 
>>> wrote:
>>> >>
>>> >> Hi Brian,
>>> >>
>>> >> I don't recall Docker requires commands to run in the foreground.
>>> Still,
>>> >> if that is your requirement, simply remove the "&" at the end of this
>>> line
>>> >> in flink-daemon.sh:
>>> >>
>>> >> $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}"
>>> -classpath
>>> >> "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`"
>>> >> ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
>>> >>
>>> >> Cheers,
>>> >> Max
>>> >>
>>> >> On Wed, Dec 2, 2015 at 9:26 AM, Till Rohrmann 
>>> >> wrote:
>>> >>>
>>> >>> Hi Brian,
>>> >>>
>>> >>> as far as I know this is at the moment not possible with our scripts.
>>> >>> However it should be relatively easy to add by simply executing the
>>> Java
>>> >>> command in flink-daemon.sh in the foreground. Do you want to add
>>> this?
>>> >>>
>>> >>> Cheers,
>>> >>> Till
>>> >>>
>>> >>> On Dec 1, 2015 9:40 PM, "Brian Chhun" 
>>> >>> wrote:
>>> 
>>>  Hi All,
>>> 
>>>  Is it possible to include a command line flag for starting job and
>>> task
>>>  managers in the foreground? Currently, `bin/jobmanager.sh` and
>>>  `bin/taskmanager.sh` rely on `bin/flink-daemon.sh`, which starts
>>> these
>>>  things in the background. I'd like to execute these commands inside
>>> a docker
>>>  container, but it's expected that the process is running in the
>>> foreground.
>>>  I think it might be useful to have it run in the foreground so that
>>> it can
>>>  be hooked into some process supervisors. Any suggestions are
>>> appreciated.
>>> 
>>> 
>>>  Thanks,
>>>  Brian
>>> >>
>>> >>
>>> >
>>>
>>
>>
>


Re: Read Kafka topic from the beginning

2015-12-03 Thread Maximilian Michels
Hi Vladimir,

You may supply Kafka consumer properties when you create the FlinkKafkaConsumer.

Properties props = new Properties();

// start from largest offset - DEFAULT
props.setProperty("auto.offset.reset", "largest");
// start from smallest offset
props.setProperty("auto.offset.reset", "smallest");

I don't think it is possible to start from a specific offset. The
offset is only unique per partition. You could modify the offsets in
the Zookeeper state but you really have to know what you're doing
then.

Best regards,
Max
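
For completeness, a minimal sketch of how those properties are wired into a job (assuming Flink 0.10.x with the Kafka 0.8 connector; the broker and ZooKeeper addresses, group id and topic name are placeholders):

import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ReadTopicFromBeginning {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092");  // placeholder
        props.setProperty("zookeeper.connect", "zk1:2181");      // placeholder
        props.setProperty("group.id", "my-consumer-group");      // placeholder
        // read from the earliest offset when no committed offset exists
        props.setProperty("auto.offset.reset", "smallest");

        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer082<String>("my-topic", new SimpleStringSchema(), props));
        stream.print();
        env.execute("read topic from beginning");
    }
}

Note that auto.offset.reset only kicks in when the consumer group has no valid committed offset, so forcing a full re-read may also require a fresh group.id.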


On Thu, Dec 3, 2015 at 4:01 PM, Vladimir Stoyak  wrote:
> I see that Flink 0.10.1 now supports Keyed Schemas which allows us to rely on 
> Kafka topics set to "compact" retention for data persistence.
>
> In our topology we wanted to set some topics with Log Compactions enabled and 
> read topic from the beginning when the topology starts or component recovers. 
> Does current Kafka Consumer implementation allow to read all messages in a 
> topic from the beginning or from a specific offset.
>
> Thanks,
> Vladimir


Re: Read Kafka topic from the beginning

2015-12-03 Thread Vladimir Stoyak
As far as I know, "auto.offset.reset" only determines what to do if the offset is not available or 
out of bounds?

Vladimir


On Thursday, December 3, 2015 5:58 PM, Maximilian Michels  
wrote:
Hi Vladimir,

You may supply Kafka consumer properties when you create the FlinkKafkaConsumer.

Properties props = new Properties();

// start from largest offset - DEFAULT
props.setProperty("auto.offset.reset", "largest");
// start from smallest offset
props.setProperty("auto.offset.reset", "smallest");

I don't think it is possible to start from a specific offset. The
offset is only unique per partition. You could modify the offsets in
the Zookeeper state but you really have to know what you're doing
then.

Best regards,
Max



On Thu, Dec 3, 2015 at 4:01 PM, Vladimir Stoyak  wrote:
> I see that Flink 0.10.1 now supports Keyed Schemas which allows us to rely on 
> Kafka topics set to "compact" retention for data persistence.
>
> In our topology we wanted to set some topics with Log Compactions enabled and 
> read topic from the beginning when the topology starts or component recovers. 
> Does current Kafka Consumer implementation allow to read all messages in a 
> topic from the beginning or from a specific offset.
>
> Thanks,
> Vladimir


Flink Storm

2015-12-03 Thread Madhire, Naveen
Hi,

I am trying to execute a few Storm topologies using Flink, and I have a question 
related to the documentation.

Can anyone tell me which of the code examples below is correct:

https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/storm_compatibility.html

https://ci.apache.org/projects/flink/flink-docs-master/apis/storm_compatibility.html


I want to use the flink-storm 1.0-SNAPSHOT version, but I don't see any createTopology 
method in the FlinkTopology class.

Ex, cluster.submitTopology("WordCount", conf, 
FlinkTopology.createTopology(builder));

Is the documentation incorrect for 1.0-SNAPSHOT, or am I maybe missing 
something? ;)

Thanks,
Naveen




Re: Read Kafka topic from the beginning

2015-12-03 Thread Vladimir Stoyak
Gave it a try, but does not seem to help. Is it working for you?

Thanks

Sent from my iPhone

> On Dec 3, 2015, at 6:11 PM, Vladimir Stoyak  wrote:
> 
> As far as I know "auto.offset.reset" what to do if offset it not available or 
> out of bound?
> 
> Vladimir
> 
> 
> On Thursday, December 3, 2015 5:58 PM, Maximilian Michels  
> wrote:
> Hi Vladimir,
> 
> You may supply Kafka consumer properties when you create the 
> FlinkKafkaConsumer.
> 
> Properties props = new Properties();
> 
> // start from largest offset - DEFAULT
> props.setProperty("auto.offset.reset", "largest");
> // start from smallest offset
> props.setProperty("auto.offset.reset", "smallest");
> 
> I don't think it is possible to start from a specific offset. The
> offset is only unique per partition. You could modify the offsets in
> the Zookeeper state but you really have to know what you're doing
> then.
> 
> Best regards,
> Max
> 
> 
> 
>> On Thu, Dec 3, 2015 at 4:01 PM, Vladimir Stoyak  wrote:
>> I see that Flink 0.10.1 now supports Keyed Schemas which allows us to rely 
>> on Kafka topics set to "compact" retention for data persistence.
>> 
>> In our topology we wanted to set some topics with Log Compactions enabled 
>> and read topic from the beginning when the topology starts or component 
>> recovers. Does current Kafka Consumer implementation allow to read all 
>> messages in a topic from the beginning or from a specific offset.
>> 
>> Thanks,
>> Vladimir


Re: Read Kafka topic from the beginning

2015-12-03 Thread Maximilian Michels
Hi Vladimir,

Did you pass the properties to the FlinkKafkaConsumer?

Cheers,
Max

On Thu, Dec 3, 2015 at 7:06 PM, Vladimir Stoyak  wrote:
> Gave it a try, but does not seem to help. Is it working for you?
>
> Thanks
>
> Sent from my iPhone
>
>> On Dec 3, 2015, at 6:11 PM, Vladimir Stoyak  wrote:
>>
>> As far as I know "auto.offset.reset" what to do if offset it not available 
>> or out of bound?
>>
>> Vladimir
>>
>>
>> On Thursday, December 3, 2015 5:58 PM, Maximilian Michels  
>> wrote:
>> Hi Vladimir,
>>
>> You may supply Kafka consumer properties when you create the 
>> FlinkKafkaConsumer.
>>
>> Properties props = new Properties();
>>
>> // start from largest offset - DEFAULT
>> props.setProperty("auto.offset.reset", "largest");
>> // start from smallest offset
>> props.setProperty("auto.offset.reset", "smallest");
>>
>> I don't think it is possible to start from a specific offset. The
>> offset is only unique per partition. You could modify the offsets in
>> the Zookeeper state but you really have to know what you're doing
>> then.
>>
>> Best regards,
>> Max
>>
>>
>>
>>> On Thu, Dec 3, 2015 at 4:01 PM, Vladimir Stoyak  wrote:
>>> I see that Flink 0.10.1 now supports Keyed Schemas which allows us to rely 
>>> on Kafka topics set to "compact" retention for data persistence.
>>>
>>> In our topology we wanted to set some topics with Log Compactions enabled 
>>> and read topic from the beginning when the topology starts or component 
>>> recovers. Does current Kafka Consumer implementation allow to read all 
>>> messages in a topic from the beginning or from a specific offset.
>>>
>>> Thanks,
>>> Vladimir


Re: Flink Storm

2015-12-03 Thread Maximilian Michels
Hi Naveen,

I think you're not using the latest 1.0-SNAPSHOT. Did you build from
source? If so, you need to build again because the snapshot API has
been updated recently.

Best regards,
Max

On Thu, Dec 3, 2015 at 6:40 PM, Madhire, Naveen
 wrote:
> Hi,
>
> I am trying to execute few storm topologies using Flink, I have a question
> related to the documentation,
>
> Can anyone tell me which of the below code is correct,
>
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/storm_compatibility.html
>
> https://ci.apache.org/projects/flink/flink-docs-master/apis/storm_compatibility.html
>
>
> I want to use Flink-storm 1.0-SNAPSHOT version, I don’t see any
> createTopology method in FlinkTopology class.
>
> Ex, cluster.submitTopology("WordCount", conf,
> FlinkTopology.createTopology(builder));
>
> Is the documentation incorrect for the 1.0-SNAPSHOT or may be I missing
> something ;)
>
> Thanks,
> Naveen
>
> 
>


Default parallelism for job submitted using RemoteEnvironment

2015-12-03 Thread Truong Duc Kien
Hi,

When I submit a job using RemoteEnvironment without setting the parallelism, it
always uses only one task slot.

Is this a bug or intentional? I thought it was supposed to use the default
configuration of the server (parallelism.default=24 in my case).

I'm using Flink in Standalone cluster mode.
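In case it helps as a workaround, a sketch of setting the parallelism explicitly
on the remote environment (host, port, and jar path below are placeholders):

ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
        "jobmanager-host", 6123, "/path/to/my-job.jar"); // placeholders

// explicitly request the cluster default instead of relying on it being picked up
env.setParallelism(24);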

Best regards,
Kien Truong


Flink-Elasticsearch connector support for elasticsearch 2.0

2015-12-03 Thread Madhukar Thota
Does the current elasticsearch-flink connector support the Elasticsearch 2.x version?

-Madhu


Re: Read Kafka topic from the beginning

2015-12-03 Thread Stephan Ewen
Hi Vladimir!

The Kafka Consumer can start from any offset internally (it does that, for
example, when recovering from a failure).

It should be fairly straightforward to set that offset field initially from a
parameter. The FlinkKafkaConsumer is part of the user jar anyway. If you want,
you can try creating a modified version that accepts that parameter, and then
package that instead of the standard one.
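In the meantime, reading a topic from the beginning (though not from an
arbitrary offset) should already work without a modified consumer, assuming no
offsets have been committed for the consumer group yet — "auto.offset.reset"
only kicks in when there is no committed offset, so a fresh group.id forces it.
A sketch (broker and quorum are placeholders):

Properties props = new Properties();
props.setProperty("zookeeper.connect", "zookeeper-host:2181"); // placeholder
props.setProperty("bootstrap.servers", "kafka-host:9092");     // placeholder
// a group id that has never committed offsets, so "smallest" takes effect
props.setProperty("group.id", "replay-" + System.currentTimeMillis());
props.setProperty("auto.offset.reset", "smallest");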

Greetings,
Stephan


On Thu, Dec 3, 2015 at 7:07 PM, Maximilian Michels  wrote:

> Hi Vladimir,
>
> Did you pass the properties to the FlinkKafkaConsumer?
>
> Cheers,
> Max
>
> On Thu, Dec 3, 2015 at 7:06 PM, Vladimir Stoyak  wrote:
> > Gave it a try, but does not seem to help. Is it working for you?
> >
> > Thanks
> >
> > Sent from my iPhone
> >
> >> On Dec 3, 2015, at 6:11 PM, Vladimir Stoyak  wrote:
> >>
> >> As far as I know, doesn't "auto.offset.reset" only specify what to do if the offset is not
> available or out of bounds?
> >>
> >> Vladimir
> >>
> >>
> >> On Thursday, December 3, 2015 5:58 PM, Maximilian Michels <
> m...@apache.org> wrote:
> >> Hi Vladimir,
> >>
> >> You may supply Kafka consumer properties when you create the
> FlinkKafkaConsumer.
> >>
> >> Properties props = new Properties();
> >>
> >> // start from largest offset - DEFAULT
> >> props.setProperty("auto.offset.reset", "largest");
> >> // start from smallest offset
> >> props.setProperty("auto.offset.reset", "smallest");
> >>
> >> I don't think it is possible to start from a specific offset. The
> >> offset is only unique per partition. You could modify the offsets in
> >> the Zookeeper state but you really have to know what you're doing
> >> then.
> >>
> >> Best regards,
> >> Max
> >>
> >>
> >>
> >>> On Thu, Dec 3, 2015 at 4:01 PM, Vladimir Stoyak 
> wrote:
> >>> I see that Flink 0.10.1 now supports Keyed Schemas which allows us to
> rely on Kafka topics set to "compact" retention for data persistence.
> >>>
> >>> In our topology we wanted to set some topics with Log Compaction
> enabled and read the topic from the beginning when the topology starts or a
> component recovers. Does the current Kafka Consumer implementation allow
> reading all messages in a topic from the beginning or from a specific offset?
> >>>
> >>> Thanks,
> >>> Vladimir
>


Re: Material on Apache flink internals

2015-12-03 Thread madhu phatak
Hi,
Thanks a lot for the resources.
On Dec 1, 2015 9:11 PM, "Fabian Hueske"  wrote:

> Hi Madhu,
>
> checkout the following resources:
>
> - Apache Flink Blog: http://flink.apache.org/blog/index.html
> - Data Artisans Blog: http://data-artisans.com/blog/
> - Flink Forward Conference website (Talk slides & recordings):
> http://flink-forward.org/?post_type=session
> - Flink Meetup talk recordings:
> https://www.youtube.com/channel/UCY8_lgiZLZErZPF47a2hXMA
> - Slim's Flink Knowledge base:
> http://sparkbigdata.com/component/tags/tag/27-flink
>
> Best, Fabian
>
> 2015-12-01 16:23 GMT+01:00 madhu phatak :
>
>> Hi everyone,
>>
>> I am fascinated by the Flink core engine's way of streaming operators
>> rather than the typical map/reduce approach followed by Hadoop or Spark. Is there any
>> good documentation/blog/video available which talks about these internals? I
>> am fine with either a batch or a streaming point of view.
>>
>> It would be great if someone could share this info. Thank you for your
>> excellent work.
>>
>> --
>> Regards,
>> Madhukara Phatak
>> http://datamantra.io/
>>
>
>


Documentation for Fold

2015-12-03 Thread Welly Tambunan
Hi All,

Currently I'm going through the DataStream documentation and found a minor
error in the docs, so I thought I should inform you.

I think fold only works on a keyed data stream.
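For illustration, a sketch of the variant that should compile against 0.10
(fold applied after keyBy; the tuple stream below is made up, and the usual
DataStream imports are assumed):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Integer>> input =
        env.fromElements(new Tuple2<>("hello", 1), new Tuple2<>("world", 1), new Tuple2<>("hello", 1));

input
    .keyBy(0) // fold is only defined on a KeyedStream, not on a plain DataStream
    .fold(0, new FoldFunction<Tuple2<String, Integer>, Integer>() {
        @Override
        public Integer fold(Integer current, Tuple2<String, Integer> value) {
            return current + value.f1; // running count per word
        }
    })
    .print();

env.execute("keyed fold example");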



Cheers
-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Re: Flink Storm

2015-12-03 Thread Madhire, Naveen
Thanks Max. I was able to get through the compilation error after building
it from source.

I am trying to run a simple word count topology in Storm and want to compare
it with Flink to see how the output comes out.

I am running a simple word count Storm topology: read -> split -> count
-> print to the console.

The code is present at
https://github.com/naveenmadhire/flink-storm-example. When I run the
WordCountTopologyFlink.java program, I don't see any messages on the
console. I modified this class in the same way as described in the
Flink documentation.



The detailed job log is at
https://gist.github.com/naveenmadhire/be23d54ed14c5e41ab7c


When you get some time, can you please check why it is not printing
anything on the console in local mode?
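In case it is relevant, a sketch of the local-mode driver pattern I believe is
needed (assuming flink-storm's FlinkLocalCluster mirrors Storm's LocalCluster
API; the sleep is meant to give the topology time to produce output before the
local cluster is torn down):

FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology("WordCount", new Config(), FlinkTopology.createTopology(builder));

// keep the local cluster alive long enough for the spout/bolts to run and print
Utils.sleep(10 * 1000);

cluster.killTopology("WordCount");
cluster.shutdown();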


Thanks,
Naveen



On 12/3/15, 12:11 PM, "Maximilian Michels"  wrote:

>Hi Naveen,
>
>I think you're not using the latest 1.0-SNAPSHOT. Did you build from
>source? If so, you need to build again because the snapshot API has
>been updated recently.
>
>Best regards,
>Max
>
>On Thu, Dec 3, 2015 at 6:40 PM, Madhire, Naveen
> wrote:
>> Hi,
>>
>> I am trying to execute a few Storm topologies using Flink, and I have a
>> question related to the documentation.
>>
>> Can anyone tell me which of the pages below shows the correct code?
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/storm_compatibility.html
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/storm_compatibility.html
>>
>>
>> I want to use the flink-storm 1.0-SNAPSHOT version, but I don't see any
>> createTopology method in the FlinkTopology class.
>>
>> Ex, cluster.submitTopology("WordCount", conf,
>> FlinkTopology.createTopology(builder));
>>
>> Is the documentation incorrect for 1.0-SNAPSHOT, or am I maybe missing
>> something ;)
>>
>> Thanks,
>> Naveen
>>
>> 
>>


