Re: Correct way to reference Hadoop dependencies in Flink 1.4.0

2018-03-18 Thread Deepak Jha
Hi,
You probably need to set core-site.xml and set the Hadoop conf path in
flink-conf.yaml

core-site.xml:

<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>fs.s3.buffer.dir</name>
    <value>/tmp</value>
  </property>
</configuration>
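
And in flink-conf.yaml, point Flink at the directory containing this file
(the path below is illustrative):

fs.hdfs.hadoopconf: /path/to/hadoop/conf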

I’ve had a similar issue when I tried to upgrade to Flink 1.4.2.

On Thu, Mar 15, 2018 at 9:39 AM Aljoscha Krettek 
wrote:

> Hi,
>
> I believe for FileSystems to be correctly picked up they have to be in
> the lib/ folder of Flink. Stephan (cc'ed), please correct me if I'm wrong
> here, you probably know that one best.
>
> Aljoscha
>
> > On 14. Mar 2018, at 18:26, l...@lyft.com wrote:
> >
> > Hi,
> >
> > I am running this on a Hadoop-free cluster (i.e. no YARN etc.). I have
> the following dependencies packaged in my user application JAR:
> >
> > aws-java-sdk 1.7.4
> > flink-hadoop-fs 1.4.0
> > flink-shaded-hadoop2 1.4.0
> > flink-connector-filesystem_2.11 1.4.0
> > hadoop-common 2.7.4
> > hadoop-aws 2.7.4
> >
> > I have also tried the following conf:
> > classloader.resolve-order: parent-first
> > fs.hdfs.hadoopconf: /srv/hadoop/hadoop-2.7.5/etc/hadoop
> >
> > But no luck. Anything else I could be missing?
> >
> > On 2018/03/14 18:57:47, Francesco Ciuci 
> wrote:
> >> Hi,
> >>
> >> You do not just need the Hadoop dependencies in the jar; you also need to
> >> have the Hadoop file system running on your machine/cluster.
> >>
> >> Regards
> >>
> >> On 14 March 2018 at 18:38, l...@lyft.com  wrote:
> >>
> >>> I'm trying to use a BucketingSink to write files to S3 in my Flink job.
> >>>
> >>> I have the Hadoop dependencies I need packaged in my user application
> jar.
> >>> However, on running the job I get the following error (from the
> >>> taskmanager):
> >>>
> >>> java.lang.RuntimeException: Error while creating FileSystem when
> >>> initializing the state of the BucketingSink.
> >>>    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
> >>>    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> >>>    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> >>>    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> >>>    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
> >>>    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
> >>>    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
> >>>    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> >>>    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> >>>    at java.lang.Thread.run(Thread.java:748)
> >>> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> >>> Could not find a file system implementation for scheme 's3a'. The scheme
> >>> is not directly supported by Flink and no Hadoop file system to support
> >>> this scheme could be loaded.
> >>>    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
> >>>    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
> >>>    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1125)
> >>>    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
> >>>    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
> >>>    ... 9 common frames omitted
> >>> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> >>> Hadoop is not in the classpath/dependencies.
> >>>    at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
> >>>    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
> >>>    ... 13 common frames omitted
> >>>
> >>> What's the right way to do this?
> >>>
> >>
>
> --
Sent from Gmail Mobile


Re: FailureRate Restart Strategy is not picked from Config file

2016-09-28 Thread Deepak Jha
Hi Till,
Yes, I have enabled checkpointing in my job. Thanks for your help :)

On Mon, Sep 26, 2016 at 3:18 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Deepak,
>
> are you running Flink streaming jobs with checkpointing enabled? In that
> case Flink will check whether you've set a restart strategy on your job
> and, if not, it will set the fixed-delay restart strategy. This effectively
> overrides the default restart strategy which you define in the
> flink-conf.yaml file.
>
> Cheers,
> Till
>
> On Thu, Sep 22, 2016 at 10:01 PM, Deepak Jha <dkjhan...@gmail.com> wrote:
>
> > Hi All,
> > I tried to use the FailureRate restart strategy by setting values for it in
> > flink-conf.yaml, but Flink (v1.1.2) did not pick it up.
> >
> > # Flink Restart strategy
> > restart-strategy: failure-rate
> > restart-strategy.failure-rate.delay: 120 s
> > restart-strategy.failure-rate.failure-rate-interval: 12 minute
> > restart-strategy.failure-rate.max-failures-per-interval: 300
> >
> > It works when I set it up explicitly in the topology using
> > *env.setRestartStrategy*.
> >
> > PFA snapshot of the Jobmanager log.
> > Thanks,
> > Deepak Jha
> >
> >
>



-- 
Thanks,
Deepak Jha


FailureRate Restart Strategy is not picked from Config file

2016-09-22 Thread Deepak Jha
Hi All,
I tried to use the FailureRate restart strategy by setting values for it in
flink-conf.yaml, but Flink (v1.1.2) did not pick it up.

# Flink Restart strategy
restart-strategy: failure-rate
restart-strategy.failure-rate.delay: 120 s
restart-strategy.failure-rate.failure-rate-interval: 12 minute
restart-strategy.failure-rate.max-failures-per-interval: 300

It works when I set it up explicitly in the topology using
*env.setRestartStrategy*.
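
For reference, the programmatic setup looks roughly like this (a sketch
against the Flink 1.1 Scala API, mirroring the flink-conf.yaml values above):

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// At most 300 failures per 12-minute interval, 120 s between restart attempts
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  300,                           // max-failures-per-interval
  Time.of(12, TimeUnit.MINUTES), // failure-rate-interval
  Time.of(120, TimeUnit.SECONDS) // delay
))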

PFA snapshot of the Jobmanager log.
Thanks,
Deepak Jha


Re: Flink HA on AWS: Network related issue

2016-09-15 Thread Deepak Jha
Hi Till,
There is a way to shut down the actor system: set
taskmanager.maxRegistrationDuration to a reasonable duration (e.g. 900
seconds); the default sets it to Inf. With this setting I noticed that the
TaskManager goes down, runit restarts the service, and it gets reconnected
to the JobManager.

As I said earlier, retries to connect to the JobManager do not work even
though telnet to the same JobManager on port 50050 works at the same time.
So the retry logic must cache something that prevents it from reconnecting.
My Flink cluster is on AWS (m4.large instances); I am not sure if anyone
else has observed this behavior.



On Thursday, September 15, 2016, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Deepak,
>
> it seems that the JobManager's deathwatch declares the TaskManager to be
> unreachable which will automatically quarantine it. You're right that in
> such a case, the TaskManager should shut down and be restarted so that it
> can again reconnect to the JobManager. This is, however, not yet supported
> automatically.
>
> For the moment, I'd recommend you to make the deathwatch a little bit less
> aggressive via the following settings:
>
> - akka.watch.heartbeat.interval: Heartbeat interval for Akka’s DeathWatch
> mechanism to detect dead TaskManagers. If TaskManagers are wrongly marked
> dead because of lost or delayed heartbeat messages, then you should
> increase this value. A thorough description of Akka’s DeathWatch can be
> found here (DEFAULT: akka.ask.timeout/10).
> - akka.watch.heartbeat.pause: Acceptable heartbeat pause for Akka’s
> DeathWatch mechanism. A low value does not allow an irregular heartbeat. A
> thorough description of Akka’s DeathWatch can be found here (DEFAULT:
> akka.ask.timeout).
> - akka.watch.threshold: Threshold for the DeathWatch failure detector. A
> low value is prone to false positives whereas a high value increases the
> time to detect a dead TaskManager. A thorough description of Akka’s
> DeathWatch can be found here (DEFAULT: 12).
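>
> For example, in flink-conf.yaml (the values below are only illustrative):
>
> akka.watch.heartbeat.interval: 10 s
> akka.watch.heartbeat.pause: 60 s
> akka.watch.threshold: 12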
>
> I hope this helps you to work around the problem for the moment until we've
> added the automatic shut down and restart.
>
> Cheers,
> Till
>
> On Mon, Sep 12, 2016 at 5:55 AM, Deepak Jha <dkjhan...@gmail.com
> <javascript:;>> wrote:
>
> > Hi Till,
> > One more thing I noticed after looking into the following message in the
> > taskmanager log:
> >
> > 2016-09-11 17:57:25,310 PDT [WARN]  ip-10-6-0-15
> > [flink-akka.actor.default-dispatcher-31] Remoting - Tried to associate
> > with
> > unreachable remote address [akka.tcp://flink@10.6.22.22:50050]. Address
> is
> > now gated for 5000 ms, all messages to this address will be delivered to
> > dead letters. Reason: *The remote system has quarantined this system. No
> > further associations to the remote system are possible until this system
> is
> > restarted*.
> >
> > So in this case ideally the local ActorSystem should go down, so that the
> > service supervisor/runit will restart the system and the taskmanager will
> > again be able to connect to the remote system. If that does not happen
> > automatically then we have to monitor the logs in some way and try to
> > ensure that it restarts. Ideally the flink taskmanager ActorSystem should
> > go down. Please let me know if my understanding is wrong.
> >
> >
> >
> >
> >
> > On Fri, Sep 9, 2016 at 8:01 AM, Deepak Jha <dkjhan...@gmail.com
> <javascript:;>> wrote:
> >
> > > Hi Till,
> > > I'm getting following message in Jobmanager log
> > >
> > > 2016-09-09 07:46:55,093 PDT [WARN]  ip-10-8-11-249
> > > [flink-akka.actor.default-dispatcher-985] akka.remote.RemoteWatcher -
> > *Detected
> > > unreachable: [akka.tcp://flink@10.8.4.57:6121
> > > <http://flink@10.8.4.57:6121>]*
> > > 2016-09-09 07:46:55,094 PDT [INFO]  ip-10-8-11-249
> > > [flink-akka.actor.default-dispatcher-985] o.a.f.runtime.jobmanager.
> > JobManager
> > > - Task manager akka.tcp://flink@10.8.4.57:6121/user/taskmanager
> > > terminated.
> > > 2016-09-09 07:46:55,094 PDT [INFO]  ip-10-8-11-249
> > > [flink-akka.actor.default-dispatcher-985] o.a.f.r.instance.
> > InstanceManager
> > > - Unregistered task manager akka.tcp://flink@10.8.4.57 <javascript:;>:
> > > 6121/user/taskmanager. Number of registered task managers 2. Number of
> > > available slots 4.
> > > 2016-09-09 07:46:55,096 PDT [WARN]  ip-10-8-11-249
> > > [flink-akka.actor.default-dispatcher-982] Remoting - Association to
> > > [akka.tcp://flink@10.8.4.57:6121] having UID [-1223410403] is
> > > irrecoverably failed. *UID is now

Re: Flink HA on AWS: Network related issue

2016-09-11 Thread Deepak Jha
Hi Till,
One more thing I noticed after looking into the following message in the
taskmanager log:

2016-09-11 17:57:25,310 PDT [WARN]  ip-10-6-0-15
[flink-akka.actor.default-dispatcher-31] Remoting - Tried to associate with
unreachable remote address [akka.tcp://flink@10.6.22.22:50050]. Address is
now gated for 5000 ms, all messages to this address will be delivered to
dead letters. Reason: *The remote system has quarantined this system. No
further associations to the remote system are possible until this system is
restarted*.

So in this case ideally the local ActorSystem should go down, so that the
service supervisor/runit will restart the system and the taskmanager will
again be able to connect to the remote system. If that does not happen
automatically then we have to monitor the logs in some way and try to ensure
that it restarts. Ideally the flink taskmanager ActorSystem should go down.
Please let me know if my understanding is wrong.





On Fri, Sep 9, 2016 at 8:01 AM, Deepak Jha <dkjhan...@gmail.com> wrote:

> Hi Till,
> I'm getting following message in Jobmanager log
>
> 2016-09-09 07:46:55,093 PDT [WARN]  ip-10-8-11-249
> [flink-akka.actor.default-dispatcher-985] akka.remote.RemoteWatcher - 
> *Detected
> unreachable: [akka.tcp://flink@10.8.4.57:6121
> <http://flink@10.8.4.57:6121>]*
> 2016-09-09 07:46:55,094 PDT [INFO]  ip-10-8-11-249
> [flink-akka.actor.default-dispatcher-985] o.a.f.runtime.jobmanager.JobManager
> - Task manager akka.tcp://flink@10.8.4.57:6121/user/taskmanager
> terminated.
> 2016-09-09 07:46:55,094 PDT [INFO]  ip-10-8-11-249
> [flink-akka.actor.default-dispatcher-985] o.a.f.r.instance.InstanceManager
> - Unregistered task manager akka.tcp://flink@10.8.4.57:
> 6121/user/taskmanager. Number of registered task managers 2. Number of
> available slots 4.
> 2016-09-09 07:46:55,096 PDT [WARN]  ip-10-8-11-249
> [flink-akka.actor.default-dispatcher-982] Remoting - Association to
> [akka.tcp://flink@10.8.4.57:6121] having UID [-1223410403] is
> irrecoverably failed. *UID is now quarantined and all messages to this
> UID will be delivered to dead letters. Remote actorsystem must be restarted
> to recover from this situation.*
> 2016-09-09 07:46:55,097 PDT [INFO]  ip-10-8-11-249
> [flink-akka.actor.default-dispatcher-982] akka.actor.LocalActorRef -
> Message [akka.remote.transport.AssociationHandle$Disassociated] from
> Actor[akka://flink/deadLetters] to Actor[akka://flink/system/
> endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fflink%4010.8.4.57%
> 3A6121-0/endpointWriter/endpointReader-akka.tcp%3A%2F%
> 2Fflink%4010.8.4.57%3A6121-0#393939009] was not delivered. [54] dead
> letters encountered. This logging can be turned off or adjusted with
> configuration settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.
> 2016-09-09 07:46:55,098 PDT [INFO]  ip-10-8-11-249
> [flink-akka.actor.default-dispatcher-985] akka.actor.LocalActorRef -
> Message [akka.remote.transport.AssociationHandle$Disassociated] from
> Actor[akka://flink/deadLetters] to Actor[akka://flink/system/transports/
> akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%
> 2Fflink%4010.8.4.57%3A51291-2#1151730456] was not delivered. [55] dead
> letters encountered. This logging can be turned off or adjusted with
> configuration settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.
> 2016-09-09 07:46:58,479 PDT [INFO]  ip-10-8-11-249
> [ForkJoinPool-3-worker-1] o.a.f.r.c.ZooKeeperCompletedCheckpointStore -
> Recovering checkpoints from ZooKeeper.
>
> Hope it helps. I'm using Flink 1.0.2
>
> On Fri, Sep 9, 2016 at 12:34 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Deepak,
>>
>> could you check the logs whether the JobManager has been quarantined and
>> thus, cannot be connected to anymore? The logs should at least contain a
>> hint why the TaskManager lost the connection initially.
>>
>> Cheers,
>> Till
>>
>> On Thu, Sep 8, 2016 at 7:08 PM, Deepak Jha <dkjhan...@gmail.com> wrote:
>>
>> > Hi,
>> > I've setup Flink HA on AWS ( 3 Taskmanagers and 2 Jobmanagers each are
>> on
>> > EC2 m4.large instance with checkpoint enabled on S3 ). My topology works
>> > fine, but after few hours I do see that Taskmanagers gets detached with
>> > Jobmanager. I tried to reach Jobmanager using telnet at the same time
>> and
>> > it worked but Taskmanager does not succeed in connecting again. It
>> attaches
>> > only after I restart it. I tried following settings but still the
>> problem
>> > persists.
>> >
>> > akka.ask.timeout: 20 s
>> > akka.lookup.timeout: 20 s
>> > akka.watch.heartbeat.interval: 20 s
>> >
>> > Please find attached snapshot on one of the Taskmanager. Is there any
>> > setting that I need to do ?
>> >
>> > --
>> > Thanks,
>> > Deepak Jha
>> >
>> >
>>
>
>
>
> --
> Thanks,
> Deepak Jha
>
>


-- 
Thanks,
Deepak Jha


Re: Flink HA on AWS: Network related issue

2016-09-09 Thread Deepak Jha
Hi Till,
I'm getting the following message in the JobManager log:

2016-09-09 07:46:55,093 PDT [WARN]  ip-10-8-11-249
[flink-akka.actor.default-dispatcher-985] akka.remote.RemoteWatcher - *Detected
unreachable: [akka.tcp://flink@10.8.4.57:6121
<http://flink@10.8.4.57:6121>]*
2016-09-09 07:46:55,094 PDT [INFO]  ip-10-8-11-249
[flink-akka.actor.default-dispatcher-985]
o.a.f.runtime.jobmanager.JobManager - Task manager akka.tcp://
flink@10.8.4.57:6121/user/taskmanager terminated.
2016-09-09 07:46:55,094 PDT [INFO]  ip-10-8-11-249
[flink-akka.actor.default-dispatcher-985] o.a.f.r.instance.InstanceManager
- Unregistered task manager akka.tcp://flink@10.8.4.57:6121/user/taskmanager.
Number of registered task managers 2. Number of available slots 4.
2016-09-09 07:46:55,096 PDT [WARN]  ip-10-8-11-249
[flink-akka.actor.default-dispatcher-982] Remoting - Association to
[akka.tcp://flink@10.8.4.57:6121] having UID [-1223410403] is irrecoverably
failed. *UID is now quarantined and all messages to this UID will be
delivered to dead letters. Remote actorsystem must be restarted to recover
from this situation.*
2016-09-09 07:46:55,097 PDT [INFO]  ip-10-8-11-249
[flink-akka.actor.default-dispatcher-982] akka.actor.LocalActorRef -
Message [akka.remote.transport.AssociationHandle$Disassociated] from
Actor[akka://flink/deadLetters] to
Actor[akka://flink/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fflink%4010.8.4.57%3A6121-0/endpointWriter/endpointReader-akka.tcp%3A%2F%2Fflink%4010.8.4.57%3A6121-0#393939009]
was not delivered. [54] dead letters encountered. This logging can be
turned off or adjusted with configuration settings 'akka.log-dead-letters'
and 'akka.log-dead-letters-during-shutdown'.
2016-09-09 07:46:55,098 PDT [INFO]  ip-10-8-11-249
[flink-akka.actor.default-dispatcher-985] akka.actor.LocalActorRef -
Message [akka.remote.transport.AssociationHandle$Disassociated] from
Actor[akka://flink/deadLetters] to
Actor[akka://flink/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fflink%4010.8.4.57%3A51291-2#1151730456]
was not delivered. [55] dead letters encountered. This logging can be
turned off or adjusted with configuration settings 'akka.log-dead-letters'
and 'akka.log-dead-letters-during-shutdown'.
2016-09-09 07:46:58,479 PDT [INFO]  ip-10-8-11-249
[ForkJoinPool-3-worker-1] o.a.f.r.c.ZooKeeperCompletedCheckpointStore -
Recovering checkpoints from ZooKeeper.

Hope it helps. I'm using Flink 1.0.2

On Fri, Sep 9, 2016 at 12:34 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Deepak,
>
> could you check the logs whether the JobManager has been quarantined and
> thus, cannot be connected to anymore? The logs should at least contain a
> hint why the TaskManager lost the connection initially.
>
> Cheers,
> Till
>
> On Thu, Sep 8, 2016 at 7:08 PM, Deepak Jha <dkjhan...@gmail.com> wrote:
>
> > Hi,
> > I've setup Flink HA on AWS ( 3 Taskmanagers and 2 Jobmanagers each are on
> > EC2 m4.large instance with checkpoint enabled on S3 ). My topology works
> > fine, but after few hours I do see that Taskmanagers gets detached with
> > Jobmanager. I tried to reach Jobmanager using telnet at the same time and
> > it worked but Taskmanager does not succeed in connecting again. It
> attaches
> > only after I restart it. I tried following settings but still the problem
> > persists.
> >
> > akka.ask.timeout: 20 s
> > akka.lookup.timeout: 20 s
> > akka.watch.heartbeat.interval: 20 s
> >
> > Please find attached snapshot on one of the Taskmanager. Is there any
> > setting that I need to do ?
> >
> > --
> > Thanks,
> > Deepak Jha
> >
> >
>



-- 
Thanks,
Deepak Jha


Flink HA on AWS: Network related issue

2016-09-08 Thread Deepak Jha
Hi,
I've set up Flink HA on AWS (3 TaskManagers and 2 JobManagers, each on an
EC2 m4.large instance, with checkpointing enabled on S3). My topology works
fine, but after a few hours I see that the TaskManagers get detached from
the JobManager. I tried to reach the JobManager using telnet at the same
time and it worked, but the TaskManager does not succeed in connecting
again. It attaches only after I restart it. I tried the following settings
but the problem still persists.

akka.ask.timeout: 20 s
akka.lookup.timeout: 20 s
akka.watch.heartbeat.interval: 20 s

Please find attached a snapshot of one of the TaskManagers. Is there any
setting that I need to change?

-- 
Thanks,
Deepak Jha


Issues while interacting with DynamoDB

2016-07-04 Thread Deepak Jha
Hi All,

We have a Flink (1.0.2) HA setup on the AWS cloud and are using IAM roles to
interact with S3 (s3a, as suggested in the Flink best practices) and
DynamoDB. While trying to interact with DynamoDB to perform a key-value pair
lookup from one of the operators, we are running into the following issue.

def putItem() = {
  val id = 126639L
  val item = new Item()
    .withPrimaryKey("Id", "sfsaf12344")
    .withLong("uniqueIdentifier", id)
  table.putItem(item)
}
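
For context, a self-contained version of that snippet would look roughly
like this (the client construction and the table name are assumptions for
illustration; the real DynamoDBHandler wiring may differ):

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item, Table}

object DynamoDBIO {
  // Uses the default credential chain, i.e. the instance's IAM role as above
  private val client = new AmazonDynamoDBClient()
  private val dynamoDB = new DynamoDB(client)
  private val table: Table = dynamoDB.getTable("dupe-detection") // hypothetical name

  def putItem(): Unit = {
    val id = 126639L
    val item = new Item()
      .withPrimaryKey("Id", "sfsaf12344")
      .withLong("uniqueIdentifier", id)
    table.putItem(item)
  }
}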

2016-07-04 17:15:18,379 PDT [INFO]  ip-10-6-10-182 [flink-akka.actor.default-dispatcher-29]
o.a.f.runtime.jobmanager.JobManager - Status of job 3ec7e145208453b5dbcf6224f373018f (Topology) changed to FAILING.
org.apache.flink.runtime.util.SerializedThrowable: com.amazonaws.services.dynamodbv2.model.PutItemRequest.withExpressionAttributeNames(Ljava/util/Map;)Lcom/amazonaws/services/dynamodbv2/model/PutItemRequest;
at com.amazonaws.services.dynamodbv2.document.internal.PutItemImpl.doPutItem(PutItemImpl.java:82)
at com.amazonaws.services.dynamodbv2.document.internal.PutItemImpl.putItem(PutItemImpl.java:41)
at com.amazonaws.services.dynamodbv2.document.Table.putItem(Table.java:144)
at com.mix.ingestion.url.dupedetection.DynamoDBIO$.putItem(DynamoDBHandler.scala:38)
at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetectionBaseImpl.setKey(ABC.scala:143)
at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetectionBaseImpl.setKeyAndUpdateDupeFlag(ABC.scala:135)
at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetection$class.updateDupeFlagAndTable(ABC.scala:96)
at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetection$class.detectDupe(ABC.scala:111)
at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetection$.detectDupe(ABC.scala:158)
at com.mix.ingestion.topology.Operators$$anonfun$15$$anonfun$apply$3.apply(Operators.scala:70)
at com.mix.ingestion.topology.Operators$$anonfun$15$$anonfun$apply$3.apply(Operators.scala:70)
at org.apache.flink.streaming.api.scala.DataStream$$anon$4.map(DataStream.scala:485)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Serialized representation of java.lang.NoSuchMethodError: com.amazonaws.services.dynamodbv2.model.PutItemRequest.withExpressionAttributeNames(Ljava/util/Map;)Lcom/amazonaws/services/dynamodbv2/model/PutItemRequest;
... 18 common frames omitted

It works if I just run it in standalone fashion using "java -cp
fatjar.jar:/opt/flink/lib/* a.b.c.d.DynamoDBHandler" on the same EC2
instance, but I run into the error when it tries to interact with DynamoDB
from inside an operator.
It fails even if I call the same *putItem* from inside the operator.

We have aws-java-sdk-1.7.4.jar and hadoop-aws-2.7.2.jar in the flink/lib
folder. We're using a fat jar to deploy the topology, and it contains
aws-java-sdk-s3 and aws-java-sdk-dynamodb, both version 1.11.3. I also
experimented with using the full aws-java-sdk in the fat jar, but it did not
work. I looked into aws-java-sdk-1.7.4.jar and see that
com/amazonaws/services/dynamodbv2 exists.



Please let me know what I am doing wrong. Any help will be appreciated.

-- 
Thanks,
Deepak Jha


Unique ID of an operator

2016-04-08 Thread Deepak Jha
Hi,
I have a use case where I need to get the unique ID of an operator inside a
stream. DataStream's getId() returns the ID of the stream, but I have a
partitioned operator (say via partitionByHash) inside the datastream, so I
would like to get a unique ID for each parallel instance of the operator. Is
there a way in which I can get it?
Also, does DataStream's getId() generate a different ID in case I have
parallelism > 1?
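
One way to get a per-parallel-instance identifier is the subtask index from
the runtime context; a minimal sketch, assuming a RichMapFunction is an
option (this identifies the parallel instance, which is not the same thing
as DataStream's getId()):

import org.apache.flink.api.common.functions.RichMapFunction

class TaggingMapper extends RichMapFunction[String, (Int, String)] {
  override def map(value: String): (Int, String) = {
    // 0-based index of this parallel subtask, unique within the operator
    val subtaskIndex = getRuntimeContext.getIndexOfThisSubtask
    (subtaskIndex, value)
  }
}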


-- 
Thanks,
Deepak


Re: Writing multiple streams to multiple kafka

2016-03-31 Thread Deepak Jha
It works... Thanks

On Thu, Mar 31, 2016 at 2:23 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> yes you can output the stages to several different Kafka Topics. If you
> don't want to call addSink inside the run() method you somehow have to
> return the handle to your stage3 DataStream, for example:
>
> private val env = StreamExecutionEnvironment.getExecutionEnvironment
> private val src = env.addSource(Source.kafka(streams.abc.topic))
>
> override def run(stream: DataStream[TypeX]) = {
>
>   val stage1 = stream
>.map(doA)
>.map(doB)
>.map(doC)
>
>  val stage2 = stage1.map(doD)  // Returns (isTrue: Boolean, somethingElse: TypeT)
>
> val stage3 = stage2.filter(_.isTrue)
> val stage4 = stage2.filter(! _.isTrue)
>
> (stage3, stage4.map(_.toString)) // return both stages
> }
>
> val (stage3, stage4) = run(src)
> stage3.addSink(Write_To_Kafka_Topic_Y)
> stage4.addSink(Write_To_Kafka_Topic_X)
>
>
> On Wed, 30 Mar 2016 at 20:19 Deepak Jha <dkjhan...@gmail.com> wrote:
>
> > Hi,
> > I'm building a pipeline using Flink using Kafka as source and sink. As
> part
> > of the this pipeline I have multiple stages in my run command and I would
> > like to publish some substages output into separate kafka topic.
> > My question is can I write multiple stages of run to multiple kafka
> topics
> > ?
> >
> > private val env = StreamExecutionEnvironment.getExecutionEnvironment
> > private val src = env.addSource(Source.kafka(streams.abc.topic))
> >
> > override def run(stream: DataStream[TypeX]) : = {
> >
> >   val stage1 = stream
> >.map(doA)
> >.map(doB)
> >.map(doC)
> >
> >  val stage2 = stage1.map(doD)  *// Returns (isTrue: Boolean,
> somethingElse:
> > TypeT)*
> >
> > val stage3 = stage2.filter(_.isTrue)
> > *stage3.addSink(Write_To_Kafka_Topic_Y)  // Can I do it outside run
> method
> > ?*
> > val stage4 = stage2.filter(! _.isTrue)
> >
> > stage4.map(_.toString)
> > }
> >
> > run(src).addSink(Write_To_Kafka_Topic_X)
> >
> >
> > Ideally I will not prefer to call addSink method inside run (as mentioned
> > in bold lines above).
> > --
> > Thanks,
> > Deepak Jha
> >
>



-- 
Thanks,
Deepak Jha


Writing multiple streams to multiple kafka

2016-03-30 Thread Deepak Jha
Hi,
I'm building a pipeline using Flink, with Kafka as source and sink. As part
of this pipeline I have multiple stages in my run method, and I would like
to publish some substages' output to separate Kafka topics.
My question is: can I write multiple stages of run to multiple Kafka topics?

private val env = StreamExecutionEnvironment.getExecutionEnvironment
private val src = env.addSource(Source.kafka(streams.abc.topic))

override def run(stream: DataStream[TypeX]) = {

  val stage1 = stream
    .map(doA)
    .map(doB)
    .map(doC)

  val stage2 = stage1.map(doD)  // Returns (isTrue: Boolean, somethingElse: TypeT)

  val stage3 = stage2.filter(_.isTrue)
  stage3.addSink(Write_To_Kafka_Topic_Y)  // Can I do it outside the run method?
  val stage4 = stage2.filter(! _.isTrue)

  stage4.map(_.toString)
}

run(src).addSink(Write_To_Kafka_Topic_X)


Ideally I would prefer not to call the addSink method inside run (see the
stage3.addSink line marked above).
-- 
Thanks,
Deepak Jha


Re: Flink-1.0.0 JobManager is not running in Docker Container on AWS

2016-03-22 Thread Deepak Jha
Hi Maximilian,
Thanks for the email and for looking into the issue. I'm using Scala 2.11,
so that sounds perfect to me...
I will be more than happy to test it out.

On Tue, Mar 22, 2016 at 2:48 AM, Maximilian Michels <m...@apache.org> wrote:

> Hi Deepak,
>
> We have looked further into this and have a pretty easy fix. However,
> it will only work with Flink's Scala 2.11 version because newer
> versions of the Akka library are incompatible with Scala 2.10 (Flink's
> default Scala version). Would that be a viable option for you?
>
> We're currently discussing this here:
> https://issues.apache.org/jira/browse/FLINK-2821
>
> Best,
> Max
>
> On Mon, Mar 14, 2016 at 4:49 PM, Deepak Jha <dkjhan...@gmail.com> wrote:
> > Hi Maximilian,
> > Thanks for your response. I will wait for the update.
> >
> > On Monday, March 14, 2016, Maximilian Michels <m...@apache.org> wrote:
> >
> >> Hi Deepak,
> >>
> >> We'll look more into this problem this week. Until now we considered it
> a
> >> configuration issue if the bind address was not externally reachable.
> >> However, one might not always have the possibility to change this
> network
> >> configuration.
> >>
> >> Looking further, it is actually possible to let the bind address be
> >> different from the advertised address. From the Akka FAQ at
> >> http://doc.akka.io/docs/akka/2.4.1/additional/faq.html:
> >>
> >> If you are running an ActorSystem under a NAT or inside a docker
> container,
> >> > make sure to set akka.remote.netty.tcp.hostname and
> >> > akka.remote.netty.tcp.port to the address it is reachable at from
> other
> >> > ActorSystems. If you need to bind your network interface to a
> different
> >> > address - use akka.remote.netty.tcp.bind-hostname and
> >> > akka.remote.netty.tcp.bind-port settings. Also make sure your network
> is
> >> > configured to translate from the address your ActorSystem is
> reachable at
> >> > to the address your ActorSystem network interface is bound to.
> >> >
> >>
> >> It looks like we have to expose this configuration to users who have a
> >> special network setup.
> >>
> >> Best,
> >> Max
> >>
> >> On Mon, Mar 14, 2016 at 5:42 AM, Deepak Jha <dkjhan...@gmail.com
> >> <javascript:;>> wrote:
> >>
> >> > Hi Stephan & Ufuk,
> >> > Thanks for your response.
> >> >
> >> > Yes there is a way in which you can run docker (net = host mode) in
> which
> >> > guest machine's network stack gets shared by docker container.
> >> > Unfortunately its not supported by AWS ECS.
> >> >
> >> > I do have one more question for you. Can you guys please explain me
> what
> >> > happens when taskmanager's register themselves to jobmanager in HA
> mode?
> >> > Does each taskmanager gets connected to jobmanager on separate port ?
> The
> >> > reason I'm asking is because if I run 2 taskmanager's (on separate
> docker
> >> > container), they are able to attach themselves to the Jobmanager
> (another
> >> > docker container) ( Flink HA setup using remote zk cluster) but soon
> >> after
> >> > that they get disconnected. Logs are not very helpful either... I
> suspect
> >> > that each taskmanager gets connected on new port and since by default
> >> > docker does not expose all ports, this may happen I do not see
> this
> >> > happen when I do not use docker container
> >> >
> >> > Here is the log file that I saw in jobmanager
> >> >
> >> > 2016-03-12 08:55:55,010 PST [INFO]  ec2-54-173-231-120.compute-1.a
> >> > [flink-akka.actor.default-dispatcher-20]
> >> o.a.f.r.instance.InstanceManager -
> >> > Registered TaskManager at 5673db03e679 (akka.tcp://
> >> > flink@172.17.0.3:6121/user/taskmanager) as
> >> > 7eafcfddd6bd084f2ec5a32594603f4f. Current number of registered hosts
> >> > is 1. *Current
> >> > number of alive task slots is 1.*
> >> > 2016-03-12 08:57:42,676 PST [INFO]  ec2-54-173-231-120.compute-1.a
> >> > [flink-akka.actor.default-dispatcher-20]
> >> o.a.f.r.instance.InstanceManager -
> >> > Registered TaskManager at 7200a7da4da7 (akka.tcp://
> >> > flink@172.17.0.3:6121/user/taskmanager) as
> >> > 320338e15a7a44ee64dc03a40f04fcd7. Current number of registered hosts
> >> > is 2. *Cur

Re: Flink-1.0.0 JobManager is not running in Docker Container on AWS

2016-03-14 Thread Deepak Jha
Hi Maximilian,
Thanks for your response. I will wait for the update.

On Monday, March 14, 2016, Maximilian Michels <m...@apache.org> wrote:

> Hi Deepak,
>
> We'll look more into this problem this week. Until now we considered it a
> configuration issue if the bind address was not externally reachable.
> However, one might not always have the possibility to change this network
> configuration.
>
> Looking further, it is actually possible to let the bind address be
> different from the advertised address. From the Akka FAQ at
> http://doc.akka.io/docs/akka/2.4.1/additional/faq.html:
>
> If you are running an ActorSystem under a NAT or inside a docker container,
> > make sure to set akka.remote.netty.tcp.hostname and
> > akka.remote.netty.tcp.port to the address it is reachable at from other
> > ActorSystems. If you need to bind your network interface to a different
> > address - use akka.remote.netty.tcp.bind-hostname and
> > akka.remote.netty.tcp.bind-port settings. Also make sure your network is
> > configured to translate from the address your ActorSystem is reachable at
> > to the address your ActorSystem network interface is bound to.
> >
>
> It looks like we have to expose this configuration to users who have a
> special network setup.
>
> Best,
> Max
>
> On Mon, Mar 14, 2016 at 5:42 AM, Deepak Jha <dkjhan...@gmail.com
> <javascript:;>> wrote:
>
> > Hi Stephan & Ufuk,
> > Thanks for your response.
> >
> > Yes, there is a way to run docker (net = host mode) in which the host
> > machine's network stack is shared by the docker container. Unfortunately
> > it's not supported by AWS ECS.
> >
> > I do have one more question for you. Can you please explain what happens
> > when taskmanagers register themselves with the jobmanager in HA mode?
> > Does each taskmanager get connected to the jobmanager on a separate port?
> > The reason I'm asking is that if I run 2 taskmanagers (in separate docker
> > containers), they are able to attach themselves to the Jobmanager (another
> > docker container) (Flink HA setup using a remote zk cluster), but soon
> > after that they get disconnected. The logs are not very helpful either...
> > I suspect that each taskmanager gets connected on a new port, and since by
> > default docker does not expose all ports, this may happen... I do not see
> > this happen when I do not use docker containers...
> >
> > Here is the log that I see in the jobmanager:
> >
> > 2016-03-12 08:55:55,010 PST [INFO]  ec2-54-173-231-120.compute-1.a
> > [flink-akka.actor.default-dispatcher-20]
> o.a.f.r.instance.InstanceManager -
> > Registered TaskManager at 5673db03e679 (akka.tcp://
> > flink@172.17.0.3:6121/user/taskmanager) as
> > 7eafcfddd6bd084f2ec5a32594603f4f. Current number of registered hosts
> > is 1. *Current
> > number of alive task slots is 1.*
> > 2016-03-12 08:57:42,676 PST [INFO]  ec2-54-173-231-120.compute-1.a
> > [flink-akka.actor.default-dispatcher-20]
> o.a.f.r.instance.InstanceManager -
> > Registered TaskManager at 7200a7da4da7 (akka.tcp://
> > flink@172.17.0.3:6121/user/taskmanager) as
> > 320338e15a7a44ee64dc03a40f04fcd7. Current number of registered hosts
> > is 2. *Current
> > number of alive task slots is 2.*
> > 2016-03-12 08:57:48,422 PST [INFO]  ec2-54-173-231-120.compute-1.a
> > [flink-akka.actor.default-dispatcher-20]
> > o.a.f.runtime.jobmanager.JobManager - Task manager akka.tcp://
> > flink@172.17.0.3:6121/user/taskmanager terminated.
> > 2016-03-12 08:57:48,422 PST [INFO]  ec2-54-173-231-120.compute-1.a
> > [flink-akka.actor.default-dispatcher-20] o.a.f.r.instance.InstanceManager
> > -*
> > Unregistered task manager akka.tcp://
> > flink@172.17.0.3:6121/user/taskmanager
> > <http://flink@172.17.0.3:6121/user/taskmanager>. Number of registered
> task
> > managers 1. Number of available slots 1.*
> > 2016-03-12 08:58:01,417 PST [WARN]  ec2-54-173-231-120.compute-1.a
> > [flink-akka.actor.default-dispatcher-20]
> > a.remote.ReliableDeliverySupervisor - Association with remote system
> > [akka.tcp://flink@172.17.0.3:6121] has failed, address is now gated for
> > [5000] ms. Reason is: [Disassociated].
> > 2016-03-12 08:58:01,451 PST [INFO]  ec2-54-173-231-120.compute-1.a
> > [flink-akka.actor.default-dispatcher-20]
> > o.a.f.runtime.jobmanager.JobManager - Task manager akka.tcp://
> > flink@172.17.0.3:6121/user/taskmanager wants to disconnect, because
> > TaskManager akka://flink/user/taskmanager is disassociating.
> > 2016-03-12 08:58:01,451 PST [INFO]  ec2-54-173-231-

Re: Flink-1.0.0 JobManager is not running in Docker Container on AWS

2016-03-13 Thread Deepak Jha
//flink@172.17.0.3:6121] has failed, address is now gated for
[5000] ms. Reason is: [Disassociated].
2016-03-12 08:58:21,388 PST [INFO]  ec2-54-173-231-120.compute-1.a
[flink-akka.actor.default-dispatcher-20]
o.a.f.runtime.jobmanager.JobManager - Task manager akka.tcp://
flink@172.17.0.3:6121/user/taskmanager wants to disconnect, because
TaskManager akka://flink/user/taskmanager is disassociating.
2016-03-12 08:58:21,388 PST [INFO]  ec2-54-173-231-120.compute-1.a
[flink-akka.actor.default-dispatcher-20] o.a.f.r.instance.InstanceManager -
Unregistered task manager akka.tcp://flink@172.17.0.3:6121/user/taskmanager.
Number of registered task managers 0. Number of available slots 0.
2016-03-12 08:58:21,390 PST [INFO]  ec2-54-173-231-120.compute-1.a
[flink-akka.actor.default-dispatcher-20] o.a.f.r.instance.InstanceManager -
Registered TaskManager at 7200a7da4da7 (akka.tcp://
flink@172.17.0.3:6121/user/taskmanager) as
bda61dbd047d40889aa3868d5d4d86a9. Current number of registered hosts is 1.
Current number of alive task slots is 1.
2016-03-12 08:58:25,433 PST [INFO]  ec2-54-173-231-120.compute-1.a
[flink-akka.actor.default-dispatcher-18]
o.a.f.runtime.jobmanager.JobManager - Task manager akka.tcp://
flink@172.17.0.3:6121/user/taskmanager terminated.
2016-03-12 08:58:25,434 PST [INFO]  ec2-54-173-231-120.compute-1.a
[flink-akka.actor.default-dispatcher-18] o.a.f.r.instance.InstanceManager -
Unregistered task manager akka.tcp://flink@172.17.0.3:6121/user/taskmanager.
Number of registered task managers 0. Number of available slots 0.
2016-03-12 08:58:28,947 PST [INFO]  ec2-54-173-231-120.compute-1.a
[flink-akka.actor.default-dispatcher-20] o.a.f.r.instance.InstanceManager -
Registering TaskManager at akka.tcp://flink@172.17.0.3:6121/user/taskmanager
which was marked as dead earlier because of a heart-beat timeout.
2016-03-12 08:58:28,948 PST [INFO]  ec2-54-173-231-120.compute-1.a
[flink-akka.actor.default-dispatcher-20] o.a.f.r.instance.InstanceManager -
Registered TaskManager at 7200a7da4da7 (akka.tcp://
flink@172.17.0.3:6121/user/taskmanager) as
d42ea5c6e0053935a0973d8536f3d8a5. Current number of registered hosts is 1.
Current number of alive task slots is 1.


On Fri, Mar 11, 2016 at 5:23 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi Deepak!
>
> We can currently not split the bind address and advertised address, because
> the Akka library only accepts packages sent explicitly to the bind address
> (not sure why Akka has this artificial limitation, but it is there).
>
> Can you bridge the container IP address to be visible from the outside?
>
> Stephan
>
>
> On Fri, Mar 11, 2016 at 1:03 PM, Ufuk Celebi <u...@apache.org> wrote:
>
> > Hey Deepak!
> >
> > Your description of Flink's behaviour is correct. To summarize:
> >
> > # Host Address
> >
> > If you specify a host address as an argument to the JVM (via
> > jobmanager.sh or the start-cluster.sh scripts) then that one is used.
> > If you don't, it falls back to the value configured in flink-conf.yaml
> > (what you describe).
> >
> > # Ports
> >
> > Default used random port and publishes via ZooKeeper. You can
> > configure a port range only via recovery.jobmanager.port (what you
> > describe).
> >
> > ---
> >
> > Your proposal would likely solve the issue, but isn't it possible to
> > handle this outside of Flink? I've found this stack overflow question,
> > which should be related:
> >
> >
> http://stackoverflow.com/questions/26539727/giving-a-docker-container-a-routable-ip-address
> >
> > What's your opinion?
> >
>



-- 
Thanks,
Deepak Jha


Re: Flink-1.0.0 JobManager is not running in Docker Container on AWS

2016-03-10 Thread Deepak Jha
Hi Stephan,
I was able to figure out the issue... Here is my explanation.

As I've said, I'm trying to set up a Flink HA cluster in docker containers
managed by Amazon ECS. I have a remote zookeeper cluster running in AWS.
There are a few issues when we deploy it using docker:

--- Flink uses *jobmanager.rpc.address* both to bind and to store in
zookeeper. Now this address could be the host ip address or the running
container's ip address. If I set it to the host ip address then the
jobmanager is not able to bind, because this is not the container's ip
address. If I use the container's ip address then it is able to bind, but
when it pushes its details to zookeeper, it is the container's ip address,
so remote taskmanagers are not able to discover it. Ideally
*jobmanager.rpc.address* should be split into *jobmanager.bind.address* (to
bind the jobmanager) and *jobmanager.discovery.address* (to publish in
zookeeper so that remote taskmanagers can discover it).

eg: Let's assume

EC2_Instance_Ip = 1.1.1.1
Container_Ip = 2.2.2.2 (This container is running in this EC2_Instance)
recovery.jobmanager.port = 3000
jobmanager.web.port = 8080
I mapped port 3000 on container to 3000 on host and 8080 on container to
8080 on host...

In flink-conf.yaml assume:
*Case 1*
 jobmanager.rpc.address = 2.2.2.2  (the container's ip address)
 Now 2.2.2.2 will be written to zookeeper. An external taskmanager will
try to use this address to communicate with the jobmanager, but it will not
be able to connect, since 2.2.2.2 is not reachable from outside the EC2
instance.

*Case 2*
   jobmanager.rpc.address = 1.1.1.1  (the EC2 instance's ip address)
   The container does not know this address, so it will not be able to bind
at all.

As you can see, we need 2 ip addresses: one for binding and another for
discovery.

 In the docker world we have to expose all the ports we want to use (in
bridged network mode). By default the jobmanager uses a random port for
communication; since we do not know the port number in advance, we set
*recovery.jobmanager.port* and exposed it in the Dockerfile. The same is the
case with blob.server.port on the taskmanagers.
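
For example, with the values from the scenario above (the blob.server.port
value and the Dockerfile line are illustrative):

# flink-conf.yaml
recovery.jobmanager.port: 3000
jobmanager.web.port: 8080
blob.server.port: 6124

# Dockerfile
EXPOSE 3000 8080 6124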

Hope that clarifies it; please let me know if you have any other questions.

On Thu, Mar 10, 2016 at 10:47 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> Is it possible that the docker container config forbids to open ports?
> Flink will try to open some ports and needs the OS or container to permit
> that.
>
> Greetings,
> Stephan
>
>
> On Thu, Mar 10, 2016 at 6:27 PM, Deepak Jha <dkjhan...@gmail.com> wrote:
>
> > Hi Stephan,
> > I tried 0.10.2 as well still running into the same issue.
> >
> > On Thursday, March 10, 2016, Deepak Jha <dkjhan...@gmail.com> wrote:
> >
> > > Yes. Flink 1.0.0
> > >
> > > On Thursday, March 10, 2016, Stephan Ewen <se...@apache.org
> > > <javascript:_e(%7B%7D,'cvml','se...@apache.org');>> wrote:
> > >
> > >> Hi!
> > >>
> > >> Is this Flink 1.0.0 ?
> > >>
> > >> Stephan
> > >>
> > >>
> > >> On Thu, Mar 10, 2016 at 6:02 AM, Deepak Jha <dkjhan...@gmail.com>
> > wrote:
> > >>
> > >> > Hi All,
> > >> >
> > >> > I'm trying to setup Flink 1.0.0 cluster on Docker (separate
> containers
> > >> for
> > >> > jobmanager and taskmanager) inside AWS (Using AWS ECS service). I
> > >> tested it
> > >> > locally and its working fine but on AWS Docker, I am running into
> > >> following
> > >> > issue
> > >> >
> > >> > *2016-03-09 18:04:12,114 PST [INFO]  ec2-52-3-248-202.compute-1.ama
> > >> [main]
> > >> > o.a.f.runtime.jobmanager.JobManager - Starting JobManager with
> > >> > high-availability*
> > >> > *2016-03-09 18:04:12,118 PST [INFO]  ec2-52-3-248-202.compute-1.ama
> > >> [main]
> > >> > o.a.f.runtime.jobmanager.JobManager - Starting JobManager on
> > >> > 172.31.63.152:8079 <http://172.31.63.152:8079> with execution mode
> > >> CLUSTER*
> > >> > *2016-03-09 18:04:12,172 PST [INFO]  ec2-52-3-248-202.compute-1.ama
> > >> [main]
> > >> > o.a.f.runtime.jobmanager.JobManager - Security is not enabled.
> > Starting
> > >> > non-authenticated JobManager.*
> > >> > *2016-03-09 18:04:12,174 PST [DEBUG] ec2-52-3-248-202.compute-1.ama
> > >> [main]
> > >> > org.apache.flink.util.NetUtils - Trying to open socket on port 8079*
> > >> > *2016-03-09 18:04:12,176 PST [DEBUG] ec2-52-3-248-202.compute-1.ama
> > >> [main]
> > >> &g

Re: Flink-1.0.0 JobManager is not running in Docker Container on AWS

2016-03-10 Thread Deepak Jha
Hi Stephan,
I tried 0.10.2 as well and still ran into the same issue.

On Thursday, March 10, 2016, Deepak Jha <dkjhan...@gmail.com> wrote:

> Yes. Flink 1.0.0
>
> On Thursday, March 10, 2016, Stephan Ewen <se...@apache.org
> <javascript:_e(%7B%7D,'cvml','se...@apache.org');>> wrote:
>
>> Hi!
>>
>> Is this Flink 1.0.0 ?
>>
>> Stephan
>>
>>
>> On Thu, Mar 10, 2016 at 6:02 AM, Deepak Jha <dkjhan...@gmail.com> wrote:
>>
>> > Hi All,
>> >
>> > I'm trying to setup Flink 1.0.0 cluster on Docker (separate containers
>> for
>> > jobmanager and taskmanager) inside AWS (Using AWS ECS service). I
>> tested it
>> > locally and its working fine but on AWS Docker, I am running into
>> following
>> > issue
>> >
>> > *2016-03-09 18:04:12,114 PST [INFO]  ec2-52-3-248-202.compute-1.ama
>> [main]
>> > o.a.f.runtime.jobmanager.JobManager - Starting JobManager with
>> > high-availability*
>> > *2016-03-09 18:04:12,118 PST [INFO]  ec2-52-3-248-202.compute-1.ama
>> [main]
>> > o.a.f.runtime.jobmanager.JobManager - Starting JobManager on
>> > 172.31.63.152:8079 <http://172.31.63.152:8079> with execution mode
>> CLUSTER*
>> > *2016-03-09 18:04:12,172 PST [INFO]  ec2-52-3-248-202.compute-1.ama
>> [main]
>> > o.a.f.runtime.jobmanager.JobManager - Security is not enabled. Starting
>> > non-authenticated JobManager.*
>> > *2016-03-09 18:04:12,174 PST [DEBUG] ec2-52-3-248-202.compute-1.ama
>> [main]
>> > org.apache.flink.util.NetUtils - Trying to open socket on port 8079*
>> > *2016-03-09 18:04:12,176 PST [DEBUG] ec2-52-3-248-202.compute-1.ama
>> [main]
>> > org.apache.flink.util.NetUtils - Unable to allocate socket on port*
>> > *java.net.BindException: Cannot assign requested address*
>> > *at java.net.PlainSocketImpl.socketBind(Native Method)*
>> > *at
>> > java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387)*
>> > *at java.net.ServerSocket.bind(ServerSocket.java:375)*
>> > *at java.net.ServerSocket.(ServerSocket.java:237)*
>> > *at
>> >
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2$$anon$3.createSocket(JobManager.scala:1722)*
>> > *at
>> > org.apache.flink.util.NetUtils.createSocketFromPorts(NetUtils.java:237)*
>> > *at
>> >
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:1719)*
>> > *at
>> >
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:1717)*
>> > *at
>> >
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:1717)*
>> > *at scala.util.Try$.apply(Try.scala:192)*
>> > *at
>> >
>> org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:1772)*
>> > *at
>> >
>> org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:1717)*
>> > *at
>> >
>> org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1653)*
>> > *at
>> > org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)*
>> > *2016-03-09 18:04:12,180 PST [ERROR] ec2-52-3-248-202.compute-1.ama
>> [main]
>> > o.a.f.runtime.jobmanager.JobManager - Failed to run JobManager.*
>> > *java.lang.RuntimeException: Unable to do further retries starting the
>> > actor system*
>> > *at
>> >
>> org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:1777)*
>> > *at
>> >
>> org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:1717)*
>> > *at
>> >
>> org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1653)*
>> > *at
>> > org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)*
>> > *2016-03-09 18:04:12,991 PST [DEBUG] ec2-52-3-248-202.compute-1.ama
>> [main]
>> > o.a.h.m.lib.MutableMetricsFactory - field
>> > org.apache.hadoop.metrics2.lib.MutableRate
>> > org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess
>> > with annotation @org.apache.hadoop.metrics2.annotation.Metric(about=,
>> > sampleName=Ops, always=false, type=DEFAULT, value=[Rate of successful
>> > kerberos logins and latency (milliseconds)], valueName=Time)*
>> >
>> >
>> > Initially Jobmanager tries to bind to port 0 which did not work. On
>> > looking further into it, I tried using recovery jobmanager port using
>> > different port combinations, but it does not seems to be working... I've
>> > exposed the ports in the docker compose file as well
>> >
>> >
>> > PFA the jobmanager log file for details also the jobmanager config
>> file...
>> > --
>> > Thanks,
>> > Deepak Jha
>> >
>> >
>>
>
>
> --
> Sent from Gmail Mobile
>


-- 
Sent from Gmail Mobile


Re: Flink-1.0.0 JobManager is not running in Docker Container on AWS

2016-03-10 Thread Deepak Jha
Yes. Flink 1.0.0

On Thursday, March 10, 2016, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> Is this Flink 1.0.0 ?
>
> Stephan
>
>
> On Thu, Mar 10, 2016 at 6:02 AM, Deepak Jha <dkjhan...@gmail.com
> <javascript:;>> wrote:
>
> > Hi All,
> >
> > I'm trying to setup Flink 1.0.0 cluster on Docker (separate containers
> for
> > jobmanager and taskmanager) inside AWS (Using AWS ECS service). I tested
> it
> > locally and its working fine but on AWS Docker, I am running into
> following
> > issue
> >
> > *2016-03-09 18:04:12,114 PST [INFO]  ec2-52-3-248-202.compute-1.ama
> [main]
> > o.a.f.runtime.jobmanager.JobManager - Starting JobManager with
> > high-availability*
> > *2016-03-09 18:04:12,118 PST [INFO]  ec2-52-3-248-202.compute-1.ama
> [main]
> > o.a.f.runtime.jobmanager.JobManager - Starting JobManager on
> > 172.31.63.152:8079 <http://172.31.63.152:8079> with execution mode
> CLUSTER*
> > *2016-03-09 18:04:12,172 PST [INFO]  ec2-52-3-248-202.compute-1.ama
> [main]
> > o.a.f.runtime.jobmanager.JobManager - Security is not enabled. Starting
> > non-authenticated JobManager.*
> > *2016-03-09 18:04:12,174 PST [DEBUG] ec2-52-3-248-202.compute-1.ama
> [main]
> > org.apache.flink.util.NetUtils - Trying to open socket on port 8079*
> > *2016-03-09 18:04:12,176 PST [DEBUG] ec2-52-3-248-202.compute-1.ama
> [main]
> > org.apache.flink.util.NetUtils - Unable to allocate socket on port*
> > *java.net.BindException: Cannot assign requested address*
> > *at java.net.PlainSocketImpl.socketBind(Native Method)*
> > *at
> > java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387)*
> > *at java.net.ServerSocket.bind(ServerSocket.java:375)*
> > *at java.net.ServerSocket.(ServerSocket.java:237)*
> > *at
> >
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2$$anon$3.createSocket(JobManager.scala:1722)*
> > *at
> > org.apache.flink.util.NetUtils.createSocketFromPorts(NetUtils.java:237)*
> > *at
> >
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:1719)*
> > *at
> >
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:1717)*
> > *at
> >
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:1717)*
> > *at scala.util.Try$.apply(Try.scala:192)*
> > *at
> >
> org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:1772)*
> > *at
> >
> org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:1717)*
> > *at
> >
> org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1653)*
> > *at
> > org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)*
> > *2016-03-09 18:04:12,180 PST [ERROR] ec2-52-3-248-202.compute-1.ama
> [main]
> > o.a.f.runtime.jobmanager.JobManager - Failed to run JobManager.*
> > *java.lang.RuntimeException: Unable to do further retries starting the
> > actor system*
> > *at
> >
> org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:1777)*
> > *at
> >
> org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:1717)*
> > *at
> >
> org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1653)*
> > *at
> > org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)*
> > *2016-03-09 18:04:12,991 PST [DEBUG] ec2-52-3-248-202.compute-1.ama
> [main]
> > o.a.h.m.lib.MutableMetricsFactory - field
> > org.apache.hadoop.metrics2.lib.MutableRate
> > org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess
> > with annotation @org.apache.hadoop.metrics2.annotation.Metric(about=,
> > sampleName=Ops, always=false, type=DEFAULT, value=[Rate of successful
> > kerberos logins and latency (milliseconds)], valueName=Time)*
> >
> >
> > Initially Jobmanager tries to bind to port 0 which did not work. On
> > looking further into it, I tried using recovery jobmanager port using
> > different port combinations, but it does not seems to be working... I've
> > exposed the ports in the docker compose file as well
> >
> >
> > PFA the jobmanager log file for details also the jobmanager config
> file...
> > --
> > Thanks,
> > Deepak Jha
> >
> >
>


-- 
Sent from Gmail Mobile


Flink-1.0.0 JobManager is not running in Docker Container on AWS

2016-03-09 Thread Deepak Jha
Hi All,

I'm trying to set up a Flink 1.0.0 cluster on Docker (separate containers
for the jobmanager and taskmanager) inside AWS (using the AWS ECS service).
I tested it locally and it's working fine, but on AWS Docker I am running
into the following issue:

2016-03-09 18:04:12,114 PST [INFO]  ec2-52-3-248-202.compute-1.ama [main]
o.a.f.runtime.jobmanager.JobManager - Starting JobManager with high-availability
2016-03-09 18:04:12,118 PST [INFO]  ec2-52-3-248-202.compute-1.ama [main]
o.a.f.runtime.jobmanager.JobManager - Starting JobManager on 172.31.63.152:8079 with execution mode CLUSTER
2016-03-09 18:04:12,172 PST [INFO]  ec2-52-3-248-202.compute-1.ama [main]
o.a.f.runtime.jobmanager.JobManager - Security is not enabled. Starting non-authenticated JobManager.
2016-03-09 18:04:12,174 PST [DEBUG] ec2-52-3-248-202.compute-1.ama [main]
org.apache.flink.util.NetUtils - Trying to open socket on port 8079
2016-03-09 18:04:12,176 PST [DEBUG] ec2-52-3-248-202.compute-1.ama [main]
org.apache.flink.util.NetUtils - Unable to allocate socket on port
java.net.BindException: Cannot assign requested address
    at java.net.PlainSocketImpl.socketBind(Native Method)
    at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387)
    at java.net.ServerSocket.bind(ServerSocket.java:375)
    at java.net.ServerSocket.<init>(ServerSocket.java:237)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2$$anon$3.createSocket(JobManager.scala:1722)
    at org.apache.flink.util.NetUtils.createSocketFromPorts(NetUtils.java:237)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:1719)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:1717)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:1717)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:1772)
    at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:1717)
    at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1653)
    at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
2016-03-09 18:04:12,180 PST [ERROR] ec2-52-3-248-202.compute-1.ama [main]
o.a.f.runtime.jobmanager.JobManager - Failed to run JobManager.
java.lang.RuntimeException: Unable to do further retries starting the actor system
    at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:1777)
    at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:1717)
    at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1653)
    at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
2016-03-09 18:04:12,991 PST [DEBUG] ec2-52-3-248-202.compute-1.ama [main]
o.a.h.m.lib.MutableMetricsFactory - field org.apache.hadoop.metrics2.lib.MutableRate
org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with
annotation @org.apache.hadoop.metrics2.annotation.Metric(about=,
sampleName=Ops, always=false, type=DEFAULT, value=[Rate of successful
kerberos logins and latency (milliseconds)], valueName=Time)


Initially the JobManager tries to bind to port 0, which did not work. On
looking further into it, I tried setting the recovery jobmanager port with
different port combinations, but it does not seem to be working... I've
exposed the ports in the docker compose file as well.


PFA the jobmanager log file for details, and also the jobmanager config file...
-- 
Thanks,
Deepak Jha
2016-03-09 18:04:11,887 PST [INFO]  ec2-52-3-248-202.compute-1.ama [main] 
o.a.f.runtime.jobmanager.JobManager - 

2016-03-09 18:04:11,888 PST [INFO]  ec2-52-3-248-202.compute-1.ama [main] 
o.a.f.runtime.jobmanager.JobManager - Registered UNIX signal handlers for 
[TERM, HUP, INT]
2016-03-09 18:04:12,070 PST [INFO]  ec2-52-3-248-202.compute-1.ama [main] 
o.a.f.runtime.jobmanager.JobManager - Loading configuration from 
/opt/flink-1.0.0/conf
2016-03-09 18:04:12,082 PST [WARN]  ec2-52-3-248-202.compute-1.ama [main] 
o.a.f.c.GlobalConfiguration - Error while reading configuration: Cannot read 
property 0
2016-03-09 18:04:12,083 PST [WARN]  ec2-52-3-248-202.compute-1.ama [main] 
o.a.f.c.GlobalConfiguration - Error while reading configuration: Cannot read 
property 1
2016-03-09 18:04:12,083 PST [WARN]  ec2-52-3-248-202.compute-1.ama [main] 
o.a.f.c.GlobalConfiguration - Error while reading configuration: Cannot read 
property 2
2016-03-09 18:04:12,091 PST [DEBUG] ec2-52-3-248-202.compute-1.ama [main] 
o.a.f.c.GlobalConfiguration - Loading configuration property: 
recovery.jobmanager.port, 8079
2016-03-09 18:04:12,095 PST [DEBUG] ec2-52-3-248-202.compute-1.ama [main] 
o.a.f.c.GlobalConfiguration - Loading configuration pr

Re: Remote TaskManager Connection Problem

2016-03-07 Thread Deepak Jha
Hi Stephan,
Thanks for the response. I was able to resolve the issue: I was using
localhost as the jobmanager name instead of the container name... There
were a few more issues which I would like to mention:
- I'm using S3 for storage/checkpointing in Flink HA mode. I realized that
I have to set fs.hdfs.hadoopconf in conf/flink-conf.yaml and add
core-site.xml in conf/. Since I'm deploying it on AWS, I had to place
hadoop-aws.jar as well
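
A rough sketch of the flink-conf.yaml entries involved (paths and bucket
names here are made up for illustration; fs.hdfs.hadoopconf points at
whatever directory holds the core-site.xml with the S3 filesystem
settings):

  fs.hdfs.hadoopconf: /opt/flink-1.0.0/conf
  state.backend: filesystem
  state.backend.fs.checkpointdir: s3://some-bucket/flink/checkpoints
  recovery.zookeeper.storageDir: s3://some-bucket/flink/recovery

with hadoop-aws.jar (and a matching aws-java-sdk jar) placed on Flink's
classpath, e.g. in lib/.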


On Fri, Mar 4, 2016 at 1:22 AM, Stephan Ewen <se...@apache.org> wrote:

> The  pull request https://github.com/apache/flink/pull/1758 should improve
> the TaskManager's network interface selection.
>
>
> On Fri, Mar 4, 2016 at 10:19 AM, Stephan Ewen <se...@apache.org> wrote:
>
> > Hi!
> >
> > This registration phase means that the TaskManager tries to tell the
> > JobManager that it is available.
> > If that fails, there can be two reasons
> >
> >   1) Network communication not possible to the port
> >   1.1) JobManager IP really not reachable (not the case, as you
> > described)
> >   1.2) TaskManager selected a wrong network interface to work with
> >   2) JobManager not listening
> >
> >
> > To look into 1.2, can you check the TaskManager log at the beginning,
> > where it says what interface/hostname the TaskManager selected to use?
> >
> > Thanks,
> > Stephan
> >
> >
> >
> >
> >
> >
> > On Fri, Mar 4, 2016 at 2:48 AM, Deepak Jha <dkjhan...@gmail.com> wrote:
> >
> >> Hi All,
> >> I've created 2 docker containers on my local machine, one running
> >> JM (192.168.99.104) and the other running TM. I was expecting to see
> >> the TM in the JM UI but it did not happen. On looking into the TM logs
> >> I see the following lines
> >>
> >>
> >> 01:29:50,862 DEBUG org.apache.flink.runtime.taskmanager.TaskManager
> >>  - Starting TaskManager process reaper
> >> 01:29:50,868 INFO  org.apache.flink.runtime.filecache.FileCache
> >>  - User file cache uses directory
> >> /tmp/flink-dist-cache-be63f351-2bce-48ef-bbc4-fb0f40fecd49
> >> 01:29:51,093 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> >>  - Starting TaskManager actor at
> >> akka://flink/user/taskmanager#1222392284.
> >> 01:29:51,095 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> >>  - TaskManager data connection information: 140efeb188cc
> >> (dataPort=6122)
> >> 01:29:51,096 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> >>  - TaskManager has 1 task slot(s).
> >> 01:29:51,097 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> >>  - Memory usage stats: [HEAP: 386/494/494 MB, NON HEAP: 30/31/-1 MB
> >> (used/committed/max)]
> >> 01:29:51,104 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> >>  - Trying to register at JobManager akka.tcp://
> >> flink@192.168.99.104:6123/user/jobmanager (attempt 1, timeout: 500
> >> milliseconds)
> >> 01:29:51,633 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> >>  - Trying to register at JobManager akka.tcp://
> >> flink@192.168.99.104:6123/user/jobmanager (attempt 2, timeout: 1000
> >> milliseconds)
> >> 01:29:52,652 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> >>  - Trying to register at JobManager akka.tcp://
> >> flink@192.168.99.104:6123/user/jobmanager (attempt 3, timeout: 2000
> >> milliseconds)
> >> 01:29:54,672 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> >>  - Trying to register at JobManager akka.tcp://
> >> flink@192.168.99.104:6123/user/jobmanager (attempt 4, timeout: 4000
> >> milliseconds)
> >> 01:29:58,693 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> >>  - Trying to register at JobManager akka.tcp://
> >> flink@192.168.99.104:6123/user/jobmanager (attempt 5, timeout: 8000
> >> milliseconds)
> >> 01:30:06,702 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> >>  - Trying to register at JobManager akka.tcp://
> >> flink@192.168.99.104:6123/user/jobmanager (attempt 6, timeout: 16000
> >> milliseconds)
> >>
> >>
> >> However, from the TM I am able to reach the JM on port 6123
> >> root@140efeb188cc:/# nc -v 192.168.99.104 6123
> >> Connection to 192.168.99.104 6123 port [tcp/*] succeeded!
> >>
> >>
> >> masters file on TM contains
> >> 192.168.99.104:8080
> >>
> >> Has anyone faced this issue with a remote JM/TM combination?
> >>
> >> --
> >> Thanks,
> >> Deepak Jha
> >>
> >
> >
>



-- 
Thanks,
Deepak Jha


Remote TaskManager Connection Problem

2016-03-03 Thread Deepak Jha
Hi All,
I've created 2 docker containers on my local machine, one running
JM (192.168.99.104) and the other running TM. I was expecting to see the
TM in the JM UI but it did not happen. On looking into the TM logs I see
the following lines


01:29:50,862 DEBUG org.apache.flink.runtime.taskmanager.TaskManager
 - Starting TaskManager process reaper
01:29:50,868 INFO  org.apache.flink.runtime.filecache.FileCache
 - User file cache uses directory
/tmp/flink-dist-cache-be63f351-2bce-48ef-bbc4-fb0f40fecd49
01:29:51,093 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - Starting TaskManager actor at
akka://flink/user/taskmanager#1222392284.
01:29:51,095 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - TaskManager data connection information: 140efeb188cc (dataPort=6122)
01:29:51,096 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - TaskManager has 1 task slot(s).
01:29:51,097 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - Memory usage stats: [HEAP: 386/494/494 MB, NON HEAP: 30/31/-1 MB
(used/committed/max)]
01:29:51,104 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - Trying to register at JobManager akka.tcp://
flink@192.168.99.104:6123/user/jobmanager (attempt 1, timeout: 500
milliseconds)
01:29:51,633 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - Trying to register at JobManager akka.tcp://
flink@192.168.99.104:6123/user/jobmanager (attempt 2, timeout: 1000
milliseconds)
01:29:52,652 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - Trying to register at JobManager akka.tcp://
flink@192.168.99.104:6123/user/jobmanager (attempt 3, timeout: 2000
milliseconds)
01:29:54,672 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - Trying to register at JobManager akka.tcp://
flink@192.168.99.104:6123/user/jobmanager (attempt 4, timeout: 4000
milliseconds)
01:29:58,693 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - Trying to register at JobManager akka.tcp://
flink@192.168.99.104:6123/user/jobmanager (attempt 5, timeout: 8000
milliseconds)
01:30:06,702 INFO  org.apache.flink.runtime.taskmanager.TaskManager
 - Trying to register at JobManager akka.tcp://
flink@192.168.99.104:6123/user/jobmanager (attempt 6, timeout: 16000
milliseconds)


However, from the TM I am able to reach the JM on port 6123
root@140efeb188cc:/# nc -v 192.168.99.104 6123
Connection to 192.168.99.104 6123 port [tcp/*] succeeded!


masters file on TM contains
192.168.99.104:8080

Has anyone faced this issue with a remote JM/TM combination?

-- 
Thanks,
Deepak Jha


Re: Adding TaskManager's

2016-02-19 Thread Deepak Jha
Hi Ufuk,
Sure... I will let you know. I'm planning to use a centralized ZooKeeper.
That way Flink and ZK will be kept separate.
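
For reference, a hedged sketch of what starting each daemon individually
against a shared ZooKeeper looks like in this Flink generation (hostnames
and paths are illustrative):

flink-conf.yaml on every node:

  recovery.mode: zookeeper
  recovery.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
  recovery.zookeeper.path.root: /flink

and then, instead of start-cluster.sh:

  bin/jobmanager.sh start cluster   # on each JobManager container
  bin/taskmanager.sh start          # on each TaskManager container

The TaskManagers then discover the leading JobManager through ZooKeeper, so
no slaves file and no password-less ssh are needed.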

On Fri, Feb 19, 2016 at 12:06 PM, Ufuk Celebi <u...@apache.org> wrote:

> OK, nice! :-) Then you can just skip the "slaves" file and directly
> work with the scripts.
>
> I'm curious to know if everything works as expected. If you encounter
> something that seems wrong, let us know.
>
> – Ufuk
>
>
> On Fri, Feb 19, 2016 at 9:02 PM, Deepak Jha <dkjhan...@gmail.com> wrote:
> > Hi Ufuk,
> > I'm planning to build a Flink HA cluster and I may need to autoscale
> > TaskManagers based on the requirement. I feel it makes more sense for me
> > to start each TaskManager & JobManager individually using taskmanager.sh
> > and jobmanager.sh and let these TaskManagers discover JobManagers using
> > ZooKeeper. The reason is that if I go with updating the slaves file, I
> > have to push these changes to all the machines in the cluster to reach a
> > consistent state. I will get more flexibility if I can add TaskManagers
> > on the fly without any update to the rest of the machines in the
> > cluster. I'm going to use Docker & Amazon ECS. So, if there is a change
> > in my artifact, I will build a new cluster, but as such each node of the
> > cluster will be immutable.
> >
> >
> >
> > On Wed, Feb 17, 2016 at 11:54 PM, Ufuk Celebi <u...@apache.org> wrote:
> >
> >> No, the "slaves" file is still used to ssh into the machines and start
> >> the task manager processes in the start-cluster.sh script. So you
> >> still need password-less ssh into the machines if you want to use
> >> that. The task managers discover the job manager via ZooKeeper though
> >> (therefore you don't need to configure the jobmanager address in the
> >> config).
> >>
> >> In theory, you can also skip the "slaves" file if you ssh manually
> >> into the machines and start the task managers via the taskmanager.sh
> >> script, but I don't think that this is what you are looking for. Or
> >> are you?
> >>
> >> – Ufuk
> >>
> >>
> >> On Wed, Feb 17, 2016 at 10:27 PM, Deepak Jha <dkjhan...@gmail.com>
> wrote:
> >> > Hi Max and Stephan,
> >> > Does this mean that I can start a Flink HA cluster without keeping
> >> > any entry in the "slaves" file? I'm asking because then I would not
> >> > have to worry about copying public keys for password-less ssh in the
> >> > Flink HA cluster
> >> >
> >> > On Wed, Feb 17, 2016 at 12:38 PM, Deepak Jha <dkjhan...@gmail.com>
> >> wrote:
> >> >
> >> >> Sorry for the typo Stephan
> >> >>
> >> >>
> >> >> On Wednesday, February 17, 2016, Deepak Jha <dkjhan...@gmail.com>
> >> wrote:
> >> >>
> >> >>> Thanks Max and Steven for the response.
> >> >>>
> >> >>> On Wednesday, February 17, 2016, Stephan Ewen <se...@apache.org>
> >> wrote:
> >> >>>
> >> >>>> Hi Deepak!
> >> >>>>
> >> >>>> The "slaves" file is only used by the SSH script to start a
> standalone
> >> >>>> cluster.
> >> >>>>
> >> >>>> As Max said, TaskManagers register dynamically at the JobManager.
> >> >>>>
> >> >>>> Discovery works via:
> >> >>>>- config in non-HA mode
> >> >>>>- ZooKeeper in HA mode
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>> On Wed, Feb 17, 2016 at 10:11 AM, Maximilian Michels <
> m...@apache.org>
> >> >>>> wrote:
> >> >>>>
> >> >>>> > Hi Deepak,
> >> >>>> >
> >> >>>> > The job manager doesn't have to know about task managers. They
> >> >>>> > will simply register at the job manager using the provided
> >> >>>> > configuration. In HA mode, they will look up the currently
> >> >>>> > leading job manager first and then connect to it. The job
> >> >>>> > manager can then assign work.
> >> >>>> >
> >> >>>> > Cheers,
> >> >>>> > Max
> >> >>>> >
> >> >>>> > On Tue, Feb 16, 2016 at 10:41 PM, Deepak Jha <
> dkjhan...@gmail.com>
> >> >>>> wrote:
> >> >>>> > > Hi All,
> >> >>>> > > I have a question on scaling up/scaling down a Flink cluster.
> >> >>>> > > As per the documentation, in order to scale up the cluster, I
> >> >>>> > > can add a new taskmanager on the fly and the jobmanager can
> >> >>>> > > assign work to it. Assuming I have Flink HA, in the event of a
> >> >>>> > > master JobManager failure, how is this taskmanager detail going
> >> >>>> > > to get transferred? I believe the new master will just read the
> >> >>>> > > contents of the slaves config file. Can anyone give more
> >> >>>> > > clarity on how this is done? Or is it the union of the slaves
> >> >>>> > > file and the taskmanagers that are added on the fly?
> >> >>>> > >
> >> >>>> > > --
> >> >>>> > > Thanks,
> >> >>>> > > Deepak
> >> >>>> >
> >> >>>>
> >> >>>
> >> >>>
> >> >>> --
> >> >>> Sent from Gmail Mobile
> >> >>>
> >> >>
> >> >>
> >> >> --
> >> >> Sent from Gmail Mobile
> >> >>
> >> >
> >> >
> >> >
> >> > --
> >> > Thanks,
> >> > Deepak Jha
> >>
> >
> >
> >
> > --
> > Thanks,
> > Deepak Jha
>



-- 
Thanks,
Deepak Jha


Re: Adding TaskManager's

2016-02-19 Thread Deepak Jha
Hi Ufuk,
I'm planning to build a Flink HA cluster and I may need to autoscale
TaskManagers based on the requirement. I feel it makes more sense for me
to start each TaskManager & JobManager individually using taskmanager.sh
and jobmanager.sh and let these TaskManagers discover JobManagers using
ZooKeeper. The reason is that if I go with updating the slaves file, I
have to push these changes to all the machines in the cluster to reach a
consistent state. I will get more flexibility if I can add TaskManagers
on the fly without any update to the rest of the machines in the cluster.
I'm going to use Docker & Amazon ECS. So, if there is a change in my
artifact, I will build a new cluster, but as such each node of the cluster
will be immutable.



On Wed, Feb 17, 2016 at 11:54 PM, Ufuk Celebi <u...@apache.org> wrote:

> No, the "slaves" file is still used to ssh into the machines and start
> the task manager processes in the start-cluster.sh script. So you
> still need password-less ssh into the machines if you want to use
> that. The task managers discover the job manager via ZooKeeper though
> (therefore you don't need to configure the jobmanager address in the
> config).
>
> In theory, you can also skip the "slaves" file if you ssh manually
> into the machines and start the task managers via the taskmanager.sh
> script, but I don't think that this is what you are looking for. Or
> are you?
>
> – Ufuk
>
>
> On Wed, Feb 17, 2016 at 10:27 PM, Deepak Jha <dkjhan...@gmail.com> wrote:
> > Hi Max and Stephan,
> > Does this mean that I can start a Flink HA cluster without keeping any
> > entry in the "slaves" file? I'm asking because then I would not have to
> > worry about copying public keys for password-less ssh in the Flink HA
> > cluster
> >
> > On Wed, Feb 17, 2016 at 12:38 PM, Deepak Jha <dkjhan...@gmail.com>
> wrote:
> >
> >> Sorry for the typo Stephan
> >>
> >>
> >> On Wednesday, February 17, 2016, Deepak Jha <dkjhan...@gmail.com>
> wrote:
> >>
> >>> Thanks Max and Steven for the response.
> >>>
> >>> On Wednesday, February 17, 2016, Stephan Ewen <se...@apache.org>
> wrote:
> >>>
> >>>> Hi Deepak!
> >>>>
> >>>> The "slaves" file is only used by the SSH script to start a standalone
> >>>> cluster.
> >>>>
> >>>> As Max said, TaskManagers register dynamically at the JobManager.
> >>>>
> >>>> Discovery works via:
> >>>>- config in non-HA mode
> >>>>- ZooKeeper in HA mode
> >>>>
> >>>>
> >>>>
> >>>> On Wed, Feb 17, 2016 at 10:11 AM, Maximilian Michels <m...@apache.org>
> >>>> wrote:
> >>>>
> >>>> > Hi Deepak,
> >>>> >
> >>>> > The job manager doesn't have to know about task managers. They will
> >>>> > simply register at the job manager using the provided configuration.
> >>>> > In HA mode, they will look up the currently leading job manager first
> >>>> > and then connect to it. The job manager can then assign work.
> >>>> >
> >>>> > Cheers,
> >>>> > Max
> >>>> >
> >>>> > On Tue, Feb 16, 2016 at 10:41 PM, Deepak Jha <dkjhan...@gmail.com>
> >>>> wrote:
> >>>> > > Hi All,
> >>>> > > I have a question on scaling up/scaling down a Flink cluster.
> >>>> > > As per the documentation, in order to scale up the cluster, I can
> >>>> > > add a new taskmanager on the fly and the jobmanager can assign
> >>>> > > work to it. Assuming I have Flink HA, in the event of a master
> >>>> > > JobManager failure, how is this taskmanager detail going to get
> >>>> > > transferred? I believe the new master will just read the contents
> >>>> > > of the slaves config file. Can anyone give more clarity on how
> >>>> > > this is done? Or is it the union of the slaves file and the
> >>>> > > taskmanagers that are added on the fly?
> >>>> > >
> >>>> > > --
> >>>> > > Thanks,
> >>>> > > Deepak
> >>>> >
> >>>>
> >>>
> >>>
> >>> --
> >>> Sent from Gmail Mobile
> >>>
> >>
> >>
> >> --
> >> Sent from Gmail Mobile
> >>
> >
> >
> >
> > --
> > Thanks,
> > Deepak Jha
>



-- 
Thanks,
Deepak Jha


Re: Adding TaskManager's

2016-02-17 Thread Deepak Jha
Hi Max and Stephan,
Does this mean that I can start a Flink HA cluster without keeping any
entry in the "slaves" file? I'm asking because then I would not have to
worry about copying public keys for password-less ssh in the Flink HA
cluster

On Wed, Feb 17, 2016 at 12:38 PM, Deepak Jha <dkjhan...@gmail.com> wrote:

> Sorry for the typo Stephan
>
>
> On Wednesday, February 17, 2016, Deepak Jha <dkjhan...@gmail.com> wrote:
>
>> Thanks Max and Steven for the response.
>>
>> On Wednesday, February 17, 2016, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi Deepak!
>>>
>>> The "slaves" file is only used by the SSH script to start a standalone
>>> cluster.
>>>
>>> As Max said, TaskManagers register dynamically at the JobManager.
>>>
>>> Discovery works via:
>>>- config in non-HA mode
>>>- ZooKeeper in HA mode
>>>
>>>
>>>
>>> On Wed, Feb 17, 2016 at 10:11 AM, Maximilian Michels <m...@apache.org>
>>> wrote:
>>>
>>> > Hi Deepak,
>>> >
>>> > The job manager doesn't have to know about task managers. They will
>>> > simply register at the job manager using the provided configuration.
> >>> > In HA mode, they will look up the currently leading job manager first
>>> > and then connect to it. The job manager can then assign work.
>>> >
>>> > Cheers,
>>> > Max
>>> >
>>> > On Tue, Feb 16, 2016 at 10:41 PM, Deepak Jha <dkjhan...@gmail.com>
>>> wrote:
>>> > > Hi All,
>>> > > I have a question on scaling up/scaling down a Flink cluster.
>>> > > As per the documentation, in order to scale up the cluster, I can
>>> > > add a new taskmanager on the fly and the jobmanager can assign work
>>> > > to it. Assuming I have Flink HA, in the event of a master JobManager
>>> > > failure, how is this taskmanager detail going to get transferred? I
>>> > > believe the new master will just read the contents of the slaves
>>> > > config file. Can anyone give more clarity on how this is done? Or is
>>> > > it the union of the slaves file and the taskmanagers that are added
>>> > > on the fly?
>>> > >
>>> > > --
>>> > > Thanks,
>>> > > Deepak
>>> >
>>>
>>
>>
>> --
>> Sent from Gmail Mobile
>>
>
>
> --
> Sent from Gmail Mobile
>



-- 
Thanks,
Deepak Jha


Re: Adding TaskManager's

2016-02-17 Thread Deepak Jha
Sorry for the typo Stephan

On Wednesday, February 17, 2016, Deepak Jha <dkjhan...@gmail.com> wrote:

> Thanks Max and Steven for the response.
>
> On Wednesday, February 17, 2016, Stephan Ewen <se...@apache.org
> <javascript:_e(%7B%7D,'cvml','se...@apache.org');>> wrote:
>
>> Hi Deepak!
>>
>> The "slaves" file is only used by the SSH script to start a standalone
>> cluster.
>>
>> As Max said, TaskManagers register dynamically at the JobManager.
>>
>> Discovery works via:
>>- config in non-HA mode
>>- ZooKeeper in HA mode
>>
>>
>>
>> On Wed, Feb 17, 2016 at 10:11 AM, Maximilian Michels <m...@apache.org>
>> wrote:
>>
>> > Hi Deepak,
>> >
>> > The job manager doesn't have to know about task managers. They will
>> > simply register at the job manager using the provided configuration.
> >> > In HA mode, they will look up the currently leading job manager first
>> > and then connect to it. The job manager can then assign work.
>> >
>> > Cheers,
>> > Max
>> >
>> > On Tue, Feb 16, 2016 at 10:41 PM, Deepak Jha <dkjhan...@gmail.com>
>> wrote:
>> > > Hi All,
>> > > I have a question on scaling up/scaling down a Flink cluster.
>> > > As per the documentation, in order to scale up the cluster, I can add
>> > > a new taskmanager on the fly and the jobmanager can assign work to
>> > > it. Assuming I have Flink HA, in the event of a master JobManager
>> > > failure, how is this taskmanager detail going to get transferred? I
>> > > believe the new master will just read the contents of the slaves
>> > > config file. Can anyone give more clarity on how this is done? Or is
>> > > it the union of the slaves file and the taskmanagers that are added
>> > > on the fly?
>> > >
>> > > --
>> > > Thanks,
>> > > Deepak
>> >
>>
>
>
> --
> Sent from Gmail Mobile
>


-- 
Sent from Gmail Mobile


Re: Adding TaskManager's

2016-02-17 Thread Deepak Jha
Thanks Max and Steven for the response.

On Wednesday, February 17, 2016, Stephan Ewen <se...@apache.org> wrote:

> Hi Deepak!
>
> The "slaves" file is only used by the SSH script to start a standalone
> cluster.
>
> As Max said, TaskManagers register dynamically at the JobManager.
>
> Discovery works via:
>- config in non-HA mode
>- ZooKeeper in HA mode
>
>
>
> On Wed, Feb 17, 2016 at 10:11 AM, Maximilian Michels <m...@apache.org
> <javascript:;>> wrote:
>
> > Hi Deepak,
> >
> > The job manager doesn't have to know about task managers. They will
> > simply register at the job manager using the provided configuration.
> > > In HA mode, they will look up the currently leading job manager first
> > and then connect to it. The job manager can then assign work.
> >
> > Cheers,
> > Max
> >
> > On Tue, Feb 16, 2016 at 10:41 PM, Deepak Jha <dkjhan...@gmail.com
> <javascript:;>> wrote:
> > > Hi All,
> > > I have a question on scaling up/scaling down a Flink cluster.
> > > As per the documentation, in order to scale up the cluster, I can add
> > > a new taskmanager on the fly and the jobmanager can assign work to it.
> > > Assuming I have Flink HA, in the event of a master JobManager failure,
> > > how is this taskmanager detail going to get transferred? I believe the
> > > new master will just read the contents of the slaves config file. Can
> > > anyone give more clarity on how this is done? Or is it the union of
> > > the slaves file and the taskmanagers that are added on the fly?
> > >
> > > --
> > > Thanks,
> > > Deepak
> >
>


-- 
Sent from Gmail Mobile


Adding TaskManager's

2016-02-16 Thread Deepak Jha
Hi All,
I have a question on scaling up/scaling down a Flink cluster.
As per the documentation, in order to scale up the cluster, I can add a new
taskmanager on the fly and the jobmanager can assign work to it. Assuming I
have Flink HA, in the event of a master JobManager failure, how is this
taskmanager detail going to get transferred? I believe the new master will
just read the contents of the slaves config file. Can anyone give more
clarity on how this is done? Or is it the union of the slaves file and the
taskmanagers that are added on the fly?

-- 
Thanks,
Deepak


Kafka As Sink in Flink Streaming

2016-01-22 Thread Deepak Jha
Hi Devs,
I just started using Flink and would like to add Kafka as a sink. I went
through the documentation but so far I've not succeeded in writing to Kafka
from Flink.

I'm building the application in Scala. Here is my code snippet:

case class *Demo*(city: String, country: String, zipcode: Int)

The map stage returns an instance of the Demo type

 val env = StreamExecutionEnvironment.getExecutionEnvironment

  val properties = new Properties()

  properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
  properties.setProperty("zookeeper.connect", "127.0.0.1:2181")
  properties.setProperty("group.id", "test_topic")
val mapToDemo: String => Demo = {//Implementation}

val stream = env.addSource(new FlinkKafkaConsumer082[String]("test", new
SimpleStringSchema, properties))

stream.map(mapToDemo).addSink(new FlinkKafkaProducer[Demo]("127.0.0.1:9092",
"test_topic", new SimpleStringSchema()))

Can anyone explain what I am doing wrong in adding Kafka as a sink?
-- 
Thanks,
Deepak Jha


Re: Kafka As Sink in Flink Streaming

2016-01-22 Thread Deepak Jha
Hi Robert,
No, it was a compile-time issue. Actually, I tried to write a string as
well, but it did not work for me. Just for clarity, my
flink-connector-kafka version is 0.10.1.

I was able to fix the issue... SimpleStringSchema should be replaced with
JavaDefaultStringSchema, as the latter does the conversion from String to
a byte array.
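
For completeness, a minimal sketch of that fix (hedged: it continues the
snippet from the original mail, assumes the 0.10.x-era connector API plus
the usual org.apache.flink.streaming.api.scala._ import, and the
comma-separated format is made up for illustration). Demo is first mapped
to a plain String so that JavaDefaultStringSchema can turn the String into
a byte array for the producer:

// Demo -> String -> Array[Byte]; JavaDefaultStringSchema does the
// String-to-bytes step that SimpleStringSchema did not.
val demoToString: Demo => String =
  d => s"${d.city},${d.country},${d.zipcode}"

stream
  .map(mapToDemo)
  .map(demoToString)
  .addSink(new FlinkKafkaProducer[String](
    "127.0.0.1:9092", "test_topic", new JavaDefaultStringSchema()))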

Thanks for the help though.

On Fri, Jan 22, 2016 at 1:34 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi,
>
> did you get any error message while executing the job? I don't think you
> can serialize the "Demo" type with the "SimpleStringSchema".
>
>
>
>
> On Fri, Jan 22, 2016 at 8:13 PM, Deepak Jha <dkjhan...@gmail.com> wrote:
>
> > Hi Devs,
> > I just started using Flink and would like to add Kafka as a sink. I went
> > through the documentation but so far I've not succeeded in writing to
> > Kafka from Flink.
> >
> > I'm building the application in Scala. Here is my code snippet:
> >
> > case class *Demo*(city: String, country: String, zipcode: Int)
> >
> > The map stage returns an instance of the Demo type
> >
> >  val env = StreamExecutionEnvironment.getExecutionEnvironment
> >
> >   val properties = new Properties()
> >
> >   properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
> >   properties.setProperty("zookeeper.connect", "127.0.0.1:2181")
> >   properties.setProperty("group.id", "test_topic")
> > val mapToDemo: String => Demo = {//Implementation}
> >
> > val stream = env.addSource(new FlinkKafkaConsumer082[String]("test", new
> > SimpleStringSchema, properties))
> >
> > stream.map(mapToDemo).addSink(new FlinkKafkaProducer[Demo]("
> 127.0.0.1:9092
> > ",
> > "test_topic", new SimpleStringSchema()))
> >
> > Can anyone explain what I am doing wrong in adding Kafka as a sink?
> > --
> > Thanks,
> > Deepak Jha
> >
>



-- 
Thanks,
Deepak Jha