Re: Correct way to reference Hadoop dependencies in Flink 1.4.0
Hi,

You probably need to configure core-site.xml and set the Hadoop conf path in flink-conf.yaml.

core-site.xml:
  fs.s3.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
  fs.s3.buffer.dir: /tmp

I've had a similar issue when I tried to upgrade to Flink 1.4.2.

On Thu, Mar 15, 2018 at 9:39 AM Aljoscha Krettek wrote:
> Hi,
>
> I believe for FileSystems to be correctly picked up they have to be in
> the lib/ folder of Flink. Stephan (cc'ed), please correct me if I'm wrong
> here, you probably know that one best.
>
> Aljoscha
>
> > On 14. Mar 2018, at 18:26, l...@lyft.com wrote:
> >
> > Hi,
> >
> > I am running this on a Hadoop-free cluster (i.e. no YARN etc.). I have
> > the following dependencies packaged in my user application JAR:
> >
> > aws-java-sdk 1.7.4
> > flink-hadoop-fs 1.4.0
> > flink-shaded-hadoop2 1.4.0
> > flink-connector-filesystem_2.11 1.4.0
> > hadoop-common 2.7.4
> > hadoop-aws 2.7.4
> >
> > I have also tried the following conf:
> >
> > classloader.resolve-order: parent-first
> > fs.hdfs.hadoopconf: /srv/hadoop/hadoop-2.7.5/etc/hadoop
> >
> > But no luck. Anything else I could be missing?
> >
> > On 2018/03/14 18:57:47, Francesco Ciuci wrote:
> >> Hi,
> >>
> >> You do not just need the Hadoop dependencies in the jar; you need to
> >> have the Hadoop file system running in your machine/cluster.
> >>
> >> Regards
> >>
> >> On 14 March 2018 at 18:38, l...@lyft.com wrote:
> >>
> >>> I'm trying to use a BucketingSink to write files to S3 in my Flink job.
> >>>
> >>> I have the Hadoop dependencies I need packaged in my user application jar.
> >>> However, on running the job I get the following error (from the
> >>> taskmanager):
> >>>
> >>> java.lang.RuntimeException: Error while creating FileSystem when
> >>> initializing the state of the BucketingSink.
> >>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
> >>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> >>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> >>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> >>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
> >>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
> >>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
> >>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> >>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> >>>     at java.lang.Thread.run(Thread.java:748)
> >>> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> >>> Could not find a file system implementation for scheme 's3a'. The scheme is
> >>> not directly supported by Flink and no Hadoop file system to support this
> >>> scheme could be loaded.
> >>>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
> >>>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
> >>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1125)
> >>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
> >>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
> >>>     ... 9 common frames omitted
> >>> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> >>> Hadoop is not in the classpath/dependencies.
> >>>     at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
> >>>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
> >>>     ... 13 common frames omitted
> >>>
> >>> What's the right way to do this?

--
Sent from Gmail Mobile
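Spelled out as a file, the core-site.xml keys named in the reply above might look like this (a sketch only — the two property values are from the reply, everything else is boilerplate, and the conf path in flink-conf.yaml is a placeholder):

```xml
<?xml version="1.0"?>
<configuration>
  <!-- Route the s3:// scheme through the s3a implementation from hadoop-aws -->
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <!-- Local buffer directory used while staging uploads to S3 -->
  <property>
    <name>fs.s3.buffer.dir</name>
    <value>/tmp</value>
  </property>
</configuration>
```

Then point Flink at the directory containing that file via flink-conf.yaml, e.g. `fs.hdfs.hadoopconf: /path/to/hadoop/conf` (substitute your actual conf directory).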
Re: FailureRate Restart Strategy is not picked from Config file
Hi Till,

yes, I have enabled checkpointing from my job. Thanks for your help :)

On Mon, Sep 26, 2016 at 3:18 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> Hi Deepak,
>
> are you running Flink streaming jobs with checkpointing enabled? In this
> case Flink will check if you've set a restart strategy at your job and if
> not it will set the fixed delay restart strategy. This will effectively
> overwrite the default restart strategy which you define in the
> flink-conf.yaml file.
>
> Cheers,
> Till
>
> On Thu, Sep 22, 2016 at 10:01 PM, Deepak Jha <dkjhan...@gmail.com> wrote:
> > Hi All,
> > I tried to use the FailureRate restart strategy by setting values for it in
> > flink-conf.yaml, but Flink (v1.1.2) did not pick it up.
> >
> > # Flink restart strategy
> > restart-strategy: failure-rate
> > restart-strategy.failure-rate.delay: 120 s
> > restart-strategy.failure-rate.failure-rate-interval: 12 minute
> > restart-strategy.failure-rate.max-failures-per-interval: 300
> >
> > It works when I set it up explicitly in the topology using
> > env.setRestartStrategy.
> >
> > PFA snapshot of the JobManager log.
> > Thanks,
> > Deepak Jha

--
Thanks,
Deepak Jha
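For reference, the env.setRestartStrategy workaround Deepak mentions can be sketched as below, mirroring the flink-conf.yaml values from the thread. This is illustrative only — the exact `Time` class and package can differ between Flink versions:

```scala
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Same policy as the flink-conf.yaml entries above: at most 300 failures
// within a 12-minute interval, with a 120 s delay between restart attempts.
env.setRestartStrategy(
  RestartStrategies.failureRateRestart(300, Time.minutes(12), Time.seconds(120)))
```

A strategy set on the job this way takes precedence over whatever is configured in flink-conf.yaml, which is consistent with Till's explanation of why the config-file setting appeared to be ignored.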
FailureRate Restart Strategy is not picked from Config file
Hi All,
I tried to use the FailureRate restart strategy by setting values for it in flink-conf.yaml, but Flink (v1.1.2) did not pick it up.

# Flink restart strategy
restart-strategy: failure-rate
restart-strategy.failure-rate.delay: 120 s
restart-strategy.failure-rate.failure-rate-interval: 12 minute
restart-strategy.failure-rate.max-failures-per-interval: 300

It works when I set it up explicitly in the topology using env.setRestartStrategy.

PFA snapshot of the JobManager log.

Thanks,
Deepak Jha
Re: Flink HA on AWS: Network related issue
Hi Till,

There is a way to shut down the actor system by setting taskmanager.maxRegistrationDuration to a reasonable duration (e.g. 900 seconds); the default sets it to Inf. In this case I noticed that the TaskManager goes down, runit restarts the service, and it gets connected with the JobManager. As I said earlier, retries to connect to the JobManager do not work even though telnet works at the same time to the same JobManager on port 50050. So the retry does cache something which does not allow it to reconnect. My Flink cluster is on AWS (m4.large instances); not sure if anyone else has observed this behavior.

On Thursday, September 15, 2016, Till Rohrmann <trohrm...@apache.org> wrote:
> Hi Deepak,
>
> it seems that the JobManager's deathwatch declares the TaskManager to be
> unreachable, which will automatically quarantine it. You're right that in
> such a case the TaskManager should shut down and be restarted so that it
> can again reconnect to the JobManager. This is, however, not yet supported
> automatically.
>
> For the moment, I'd recommend making the deathwatch a little bit less
> aggressive via the following settings:
>
> - akka.watch.heartbeat.interval: Heartbeat interval for Akka's DeathWatch
>   mechanism to detect dead TaskManagers. If TaskManagers are wrongly marked
>   dead because of lost or delayed heartbeat messages, then you should
>   increase this value. A thorough description of Akka's DeathWatch can be
>   found here (DEFAULT: akka.ask.timeout/10).
> - akka.watch.heartbeat.pause: Acceptable heartbeat pause for Akka's
>   DeathWatch mechanism. A low value does not allow an irregular heartbeat.
>   A thorough description of Akka's DeathWatch can be found here (DEFAULT:
>   akka.ask.timeout).
> - akka.watch.threshold: Threshold for the DeathWatch failure detector. A
>   low value is prone to false positives, whereas a high value increases the
>   time to detect a dead TaskManager. A thorough description of Akka's
>   DeathWatch can be found here (DEFAULT: 12).
>
> I hope this helps you to work around the problem for the moment until we've
> added the automatic shutdown and restart.
>
> Cheers,
> Till
>
> On Mon, Sep 12, 2016 at 5:55 AM, Deepak Jha <dkjhan...@gmail.com> wrote:
> > Hi Till,
> > One more thing I noticed after looking into the following message in the
> > taskmanager log:
> >
> > 2016-09-11 17:57:25,310 PDT [WARN] ip-10-6-0-15 [flink-akka.actor.default-dispatcher-31] Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@10.6.22.22:50050]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: *The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted*.
> >
> > So in this case ideally the local ActorSystem should go down, so that the
> > service supervisor/runit will restart the system and the TaskManager will
> > again be able to connect to the remote system. If it does not happen
> > automatically, then we have to monitor logs in some way and try to
> > ensure that it restarts. Ideally the Flink TaskManager ActorSystem should go
> > down. Please let me know if my understanding is wrong.
> >
> > On Fri, Sep 9, 2016 at 8:01 AM, Deepak Jha <dkjhan...@gmail.com> wrote:
> > > Hi Till,
> > > I'm getting the following message in the JobManager log:
> > >
> > > 2016-09-09 07:46:55,093 PDT [WARN] ip-10-8-11-249 [flink-akka.actor.default-dispatcher-985] akka.remote.RemoteWatcher - *Detected unreachable: [akka.tcp://flink@10.8.4.57:6121]*
> > > 2016-09-09 07:46:55,094 PDT [INFO] ip-10-8-11-249 [flink-akka.actor.default-dispatcher-985] o.a.f.runtime.jobmanager.JobManager - Task manager akka.tcp://flink@10.8.4.57:6121/user/taskmanager terminated.
> > > 2016-09-09 07:46:55,094 PDT [INFO] ip-10-8-11-249 [flink-akka.actor.default-dispatcher-985] o.a.f.r.instance.InstanceManager - Unregistered task manager akka.tcp://flink@10.8.4.57:6121/user/taskmanager. Number of registered task managers 2. Number of available slots 4.
> > > 2016-09-09 07:46:55,096 PDT [WARN] ip-10-8-11-249 [flink-akka.actor.default-dispatcher-982] Remoting - Association to [akka.tcp://flink@10.8.4.57:6121] having UID [-1223410403] is irrecoverably failed. *UID is now
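Collected into flink-conf.yaml form, Till's suggested deathwatch relaxation might look like the following. The specific values here are illustrative assumptions, not from the thread — they only need to be less aggressive than the defaults he describes:

```yaml
# Relax Akka DeathWatch so transient network hiccups on EC2 do not
# quarantine TaskManagers (example values; tune for your environment)
akka.watch.heartbeat.interval: 10 s
akka.watch.heartbeat.pause: 60 s
akka.watch.threshold: 12
```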
Re: Flink HA on AWS: Network related issue
Hi Till,

One more thing I noticed after looking into the following message in the taskmanager log:

2016-09-11 17:57:25,310 PDT [WARN] ip-10-6-0-15 [flink-akka.actor.default-dispatcher-31] Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@10.6.22.22:50050]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: *The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted*.

So in this case ideally the local ActorSystem should go down, so that the service supervisor/runit will restart the system and the TaskManager will again be able to connect to the remote system. If it does not happen automatically, then we have to monitor logs in some way and try to ensure that it restarts. Ideally the Flink TaskManager ActorSystem should go down. Please let me know if my understanding is wrong.

On Fri, Sep 9, 2016 at 8:01 AM, Deepak Jha <dkjhan...@gmail.com> wrote:
> Hi Till,
> I'm getting the following message in the JobManager log:
>
> 2016-09-09 07:46:55,093 PDT [WARN] ip-10-8-11-249 [flink-akka.actor.default-dispatcher-985] akka.remote.RemoteWatcher - *Detected unreachable: [akka.tcp://flink@10.8.4.57:6121]*
> 2016-09-09 07:46:55,094 PDT [INFO] ip-10-8-11-249 [flink-akka.actor.default-dispatcher-985] o.a.f.runtime.jobmanager.JobManager - Task manager akka.tcp://flink@10.8.4.57:6121/user/taskmanager terminated.
> 2016-09-09 07:46:55,094 PDT [INFO] ip-10-8-11-249 [flink-akka.actor.default-dispatcher-985] o.a.f.r.instance.InstanceManager - Unregistered task manager akka.tcp://flink@10.8.4.57:6121/user/taskmanager. Number of registered task managers 2. Number of available slots 4.
> 2016-09-09 07:46:55,096 PDT [WARN] ip-10-8-11-249 [flink-akka.actor.default-dispatcher-982] Remoting - Association to [akka.tcp://flink@10.8.4.57:6121] having UID [-1223410403] is irrecoverably failed. *UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation.*
> 2016-09-09 07:46:55,097 PDT [INFO] ip-10-8-11-249 [flink-akka.actor.default-dispatcher-982] akka.actor.LocalActorRef - Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://flink/deadLetters] to Actor[akka://flink/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fflink%4010.8.4.57%3A6121-0/endpointWriter/endpointReader-akka.tcp%3A%2F%2Fflink%4010.8.4.57%3A6121-0#393939009] was not delivered. [54] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
> 2016-09-09 07:46:55,098 PDT [INFO] ip-10-8-11-249 [flink-akka.actor.default-dispatcher-985] akka.actor.LocalActorRef - Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://flink/deadLetters] to Actor[akka://flink/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fflink%4010.8.4.57%3A51291-2#1151730456] was not delivered. [55] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
> 2016-09-09 07:46:58,479 PDT [INFO] ip-10-8-11-249 [ForkJoinPool-3-worker-1] o.a.f.r.c.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
>
> Hope it helps. I'm using Flink 1.0.2.
>
> On Fri, Sep 9, 2016 at 12:34 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>> Hi Deepak,
>>
>> could you check the logs whether the JobManager has been quarantined and
>> thus cannot be connected to anymore? The logs should at least contain a
>> hint why the TaskManager lost the connection initially.
>>
>> Cheers,
>> Till
>>
>> On Thu, Sep 8, 2016 at 7:08 PM, Deepak Jha <dkjhan...@gmail.com> wrote:
>> > Hi,
>> > I've set up Flink HA on AWS (3 TaskManagers and 2 JobManagers, each on an
>> > EC2 m4.large instance, with checkpointing enabled on S3). My topology works
>> > fine, but after a few hours I see that TaskManagers get detached from the
>> > JobManager. I tried to reach the JobManager using telnet at the same time
>> > and it worked, but the TaskManager does not succeed in connecting again. It
>> > attaches only after I restart it. I tried the following settings but the
>> > problem still persists.
>> >
>> > akka.ask.timeout: 20 s
>> > akka.lookup.timeout: 20 s
>> > akka.watch.heartbeat.interval: 20 s
>> >
>> > Please find attached a snapshot of one of the TaskManagers. Is there any
>> > setting that I need to do?
>> >
>> > --
>> > Thanks,
>> > Deepak Jha

--
Thanks,
Deepak Jha
Re: Flink HA on AWS: Network related issue
Hi Till,
I'm getting the following message in the JobManager log:

2016-09-09 07:46:55,093 PDT [WARN] ip-10-8-11-249 [flink-akka.actor.default-dispatcher-985] akka.remote.RemoteWatcher - *Detected unreachable: [akka.tcp://flink@10.8.4.57:6121]*
2016-09-09 07:46:55,094 PDT [INFO] ip-10-8-11-249 [flink-akka.actor.default-dispatcher-985] o.a.f.runtime.jobmanager.JobManager - Task manager akka.tcp://flink@10.8.4.57:6121/user/taskmanager terminated.
2016-09-09 07:46:55,094 PDT [INFO] ip-10-8-11-249 [flink-akka.actor.default-dispatcher-985] o.a.f.r.instance.InstanceManager - Unregistered task manager akka.tcp://flink@10.8.4.57:6121/user/taskmanager. Number of registered task managers 2. Number of available slots 4.
2016-09-09 07:46:55,096 PDT [WARN] ip-10-8-11-249 [flink-akka.actor.default-dispatcher-982] Remoting - Association to [akka.tcp://flink@10.8.4.57:6121] having UID [-1223410403] is irrecoverably failed. *UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation.*
2016-09-09 07:46:55,097 PDT [INFO] ip-10-8-11-249 [flink-akka.actor.default-dispatcher-982] akka.actor.LocalActorRef - Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://flink/deadLetters] to Actor[akka://flink/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fflink%4010.8.4.57%3A6121-0/endpointWriter/endpointReader-akka.tcp%3A%2F%2Fflink%4010.8.4.57%3A6121-0#393939009] was not delivered. [54] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
2016-09-09 07:46:55,098 PDT [INFO] ip-10-8-11-249 [flink-akka.actor.default-dispatcher-985] akka.actor.LocalActorRef - Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://flink/deadLetters] to Actor[akka://flink/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fflink%4010.8.4.57%3A51291-2#1151730456] was not delivered. [55] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
2016-09-09 07:46:58,479 PDT [INFO] ip-10-8-11-249 [ForkJoinPool-3-worker-1] o.a.f.r.c.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.

Hope it helps. I'm using Flink 1.0.2.

On Fri, Sep 9, 2016 at 12:34 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> Hi Deepak,
>
> could you check the logs whether the JobManager has been quarantined and
> thus cannot be connected to anymore? The logs should at least contain a
> hint why the TaskManager lost the connection initially.
>
> Cheers,
> Till
>
> On Thu, Sep 8, 2016 at 7:08 PM, Deepak Jha <dkjhan...@gmail.com> wrote:
> > Hi,
> > I've set up Flink HA on AWS (3 TaskManagers and 2 JobManagers, each on an
> > EC2 m4.large instance, with checkpointing enabled on S3). My topology works
> > fine, but after a few hours I see that TaskManagers get detached from the
> > JobManager. I tried to reach the JobManager using telnet at the same time
> > and it worked, but the TaskManager does not succeed in connecting again. It
> > attaches only after I restart it. I tried the following settings but the
> > problem still persists.
> >
> > akka.ask.timeout: 20 s
> > akka.lookup.timeout: 20 s
> > akka.watch.heartbeat.interval: 20 s
> >
> > Please find attached a snapshot of one of the TaskManagers. Is there any
> > setting that I need to do?
> >
> > --
> > Thanks,
> > Deepak Jha

--
Thanks,
Deepak Jha
Flink HA on AWS: Network related issue
Hi,
I've set up Flink HA on AWS (3 TaskManagers and 2 JobManagers, each on an EC2 m4.large instance, with checkpointing enabled on S3). My topology works fine, but after a few hours I see that TaskManagers get detached from the JobManager. I tried to reach the JobManager using telnet at the same time and it worked, but the TaskManager does not succeed in connecting again. It attaches only after I restart it. I tried the following settings but the problem still persists.

akka.ask.timeout: 20 s
akka.lookup.timeout: 20 s
akka.watch.heartbeat.interval: 20 s

Please find attached a snapshot of one of the TaskManagers. Is there any setting that I need to do?

--
Thanks,
Deepak Jha
Issues while interacting with DynamoDB
Hi All,

We have a Flink (1.0.2) HA setup on AWS and are using IAM roles to interact with S3 (s3a, as suggested in the Flink best practices) and DynamoDB. While trying to interact with DynamoDB to perform a key-value lookup from one of the operators, we are running into the following issue.

def putItem() = {
  val id = 126639L
  val item = new Item().withPrimaryKey("Id", "sfsaf12344").withLong("uniqueIdentifier", id)
  table.putItem(item)
}

2016-07-04 17:15:18,379 PDT [INFO] ip-10-6-10-182 [flink-akka.actor.default-dispatcher-29] o.a.f.runtime.jobmanager.JobManager - Status of job 3ec7e145208453b5dbcf6224f373018f (Topology) changed to FAILING.
org.apache.flink.runtime.util.SerializedThrowable: com.amazonaws.services.dynamodbv2.model.PutItemRequest.withExpressionAttributeNames(Ljava/util/Map;)Lcom/amazonaws/services/dynamodbv2/model/PutItemRequest;
    at com.amazonaws.services.dynamodbv2.document.internal.PutItemImpl.doPutItem(PutItemImpl.java:82)
    at com.amazonaws.services.dynamodbv2.document.internal.PutItemImpl.putItem(PutItemImpl.java:41)
    at com.amazonaws.services.dynamodbv2.document.Table.putItem(Table.java:144)
    at com.mix.ingestion.url.dupedetection.DynamoDBIO$.putItem(DynamoDBHandler.scala:38)
    at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetectionBaseImpl.setKey(ABC.scala:143)
    at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetectionBaseImpl.setKeyAndUpdateDupeFlag(ABC.scala:135)
    at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetection$class.updateDupeFlagAndTable(ABC.scala:96)
    at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetection$class.detectDupe(ABC.scala:111)
    at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetection$.detectDupe(ABC.scala:158)
    at com.mix.ingestion.topology.Operators$$anonfun$15$$anonfun$apply$3.apply(Operators.scala:70)
    at com.mix.ingestion.topology.Operators$$anonfun$15$$anonfun$apply$3.apply(Operators.scala:70)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$4.map(DataStream.scala:485)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Serialized representation of java.lang.NoSuchMethodError: com.amazonaws.services.dynamodbv2.model.PutItemRequest.withExpressionAttributeNames(Ljava/util/Map;)Lcom/amazonaws/services/dynamodbv2/model/PutItemRequest;
    ... 18 common frames omitted

It works if I just run it in standalone fashion using "java -cp fatjar.jar:/opt/flink/lib/* a.b.c.d.DynamoDBHandler" on the same EC2 instance, but I run into the error when it tries to interact with DynamoDB from inside an operator. It fails even if I call the same putItem from inside the operator. We have aws-java-sdk-1.7.4.jar and hadoop-aws-2.7.2.jar in the flink/lib folder. We're using a fat jar to deploy the topology, and it contains aws-java-sdk-s3 and aws-java-sdk-dynamodb, both version 1.11.3. I also experimented with using aws-java-sdk in the fat jar as well, but it did not work. I looked into aws-java-sdk-1.7.4.jar and see that com/amazonaws/services/dynamodbv2 exists. Please let me know what I am doing wrong. Any help will be appreciated.

--
Thanks,
Deepak Jha
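The thread does not resolve this, but a NoSuchMethodError like the one above is the classic symptom of two AWS SDK versions on the classpath: the older aws-java-sdk-1.7.4 in flink/lib is loaded by the parent classloader ahead of the 1.11.x classes in the fat jar, and 1.7.4's PutItemRequest lacks withExpressionAttributeNames. One common way out (an assumption, not the list's confirmed fix) is to relocate the SDK inside the fat jar with the Maven Shade plugin so the two versions can no longer collide:

```xml
<!-- pom.xml fragment (illustrative sketch): relocate the AWS SDK bundled in
     the fat jar so it cannot clash with aws-java-sdk-1.7.4 in flink/lib -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>com.amazonaws</pattern>
            <shadedPattern>shaded.com.amazonaws</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

With the relocation in place, the user code compiled into the fat jar references `shaded.com.amazonaws...` and therefore always resolves to the 1.11.x classes it was built against.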
Unique ID of an operator
Hi,
I have a use case where I need to get the unique ID of an operator inside a stream. DataStream's getId() returns the ID of the stream, but I have operators partitioned (say, via partitionByHash) inside the DataStream, so I would like to get a unique ID for each operator instance working in parallel. Is there a way in which I can get it? Also, does DataStream's getId() generate different IDs in case I have parallelism > 1?

--
Thanks,
Deepak
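One way to identify each parallel instance (a sketch, not an answer from the thread — it assumes the Scala DataStream API and uses the rich-function runtime context rather than DataStream.getId):

```scala
import org.apache.flink.api.common.functions.RichMapFunction

// Illustrative sketch: each parallel operator instance can identify itself
// via its RuntimeContext; task name plus subtask index is unique per instance.
class TaggingMapper extends RichMapFunction[String, String] {
  override def map(value: String): String = {
    val ctx = getRuntimeContext
    // e.g. "Map-2/4" for the third of four parallel subtasks
    val id = s"${ctx.getTaskName}-${ctx.getIndexOfThisSubtask}/${ctx.getNumberOfParallelSubtasks}"
    s"[$id] $value"
  }
}
```

The subtask index is stable within one execution but can change across restarts, so it suits logging and debugging more than durable keying.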
Re: Writing multiple streams to multiple kafka
It works... Thanks

On Thu, Mar 31, 2016 at 2:23 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
> yes, you can output the stages to several different Kafka topics. If you
> don't want to call addSink inside the run() method, you somehow have to
> return the handle to your stage3 DataStream, for example:
>
> private val env = StreamExecutionEnvironment.getExecutionEnvironment
> private val src = env.addSource(Source.kafka(streams.abc.topic))
>
> override def run(stream: DataStream[TypeX]) = {
>
>   val stage1 = stream
>     .map(doA)
>     .map(doB)
>     .map(doC)
>
>   val stage2 = stage1.map(doD) // Returns (isTrue: Boolean, somethingElse: TypeT)
>
>   val stage3 = stage2.filter(_.isTrue)
>   val stage4 = stage2.filter(!_.isTrue)
>
>   (stage3, stage4.map(_.toString)) // return both stages
> }
>
> val (stage3, stage4) = run(src)
> stage3.addSink(Write_To_Kafka_Topic_Y)
> stage4.addSink(Write_To_Kafka_Topic_X)
>
> On Wed, 30 Mar 2016 at 20:19 Deepak Jha <dkjhan...@gmail.com> wrote:
> > Hi,
> > I'm building a pipeline using Flink with Kafka as source and sink. As part
> > of this pipeline I have multiple stages in my run method, and I would
> > like to publish some sub-stage output to a separate Kafka topic.
> > My question is: can I write multiple stages of run to multiple Kafka topics?
> >
> > private val env = StreamExecutionEnvironment.getExecutionEnvironment
> > private val src = env.addSource(Source.kafka(streams.abc.topic))
> >
> > override def run(stream: DataStream[TypeX]) = {
> >
> >   val stage1 = stream
> >     .map(doA)
> >     .map(doB)
> >     .map(doC)
> >
> >   val stage2 = stage1.map(doD) // Returns (isTrue: Boolean, somethingElse: TypeT)
> >
> >   val stage3 = stage2.filter(_.isTrue)
> >   stage3.addSink(Write_To_Kafka_Topic_Y) // Can I do it outside the run method?
> >   val stage4 = stage2.filter(!_.isTrue)
> >
> >   stage4.map(_.toString)
> > }
> >
> > run(src).addSink(Write_To_Kafka_Topic_X)
> >
> > Ideally I would prefer not to call the addSink method inside run (as noted
> > in the comment above).
> > --
> > Thanks,
> > Deepak Jha

--
Thanks,
Deepak Jha
Writing multiple streams to multiple kafka
Hi,
I'm building a pipeline using Flink with Kafka as source and sink. As part of this pipeline I have multiple stages in my run method, and I would like to publish some sub-stage output to a separate Kafka topic. My question is: can I write multiple stages of run to multiple Kafka topics?

private val env = StreamExecutionEnvironment.getExecutionEnvironment
private val src = env.addSource(Source.kafka(streams.abc.topic))

override def run(stream: DataStream[TypeX]) = {

  val stage1 = stream
    .map(doA)
    .map(doB)
    .map(doC)

  val stage2 = stage1.map(doD) // Returns (isTrue: Boolean, somethingElse: TypeT)

  val stage3 = stage2.filter(_.isTrue)
  stage3.addSink(Write_To_Kafka_Topic_Y) // Can I do it outside the run method?
  val stage4 = stage2.filter(!_.isTrue)

  stage4.map(_.toString)
}

run(src).addSink(Write_To_Kafka_Topic_X)

Ideally I would prefer not to call the addSink method inside run (as noted in the comment above).

--
Thanks,
Deepak Jha
Re: Flink-1.0.0 JobManager is not running in Docker Container on AWS
Hi Maximilian,
Thanks for the email and for looking into the issue. I'm using Scala 2.11, so that sounds perfect to me... I will be more than happy to test it out.

On Tue, Mar 22, 2016 at 2:48 AM, Maximilian Michels <m...@apache.org> wrote:
> Hi Deepak,
>
> We have looked further into this and have a pretty easy fix. However,
> it will only work with Flink's Scala 2.11 version because newer
> versions of the Akka library are incompatible with Scala 2.10 (Flink's
> default Scala version). Would that be a viable option for you?
>
> We're currently discussing this here:
> https://issues.apache.org/jira/browse/FLINK-2821
>
> Best,
> Max
>
> On Mon, Mar 14, 2016 at 4:49 PM, Deepak Jha <dkjhan...@gmail.com> wrote:
> > Hi Maximilian,
> > Thanks for your response. I will wait for the update.
> >
> > On Monday, March 14, 2016, Maximilian Michels <m...@apache.org> wrote:
> >> Hi Deepak,
> >>
> >> We'll look more into this problem this week. Until now we considered it a
> >> configuration issue if the bind address was not externally reachable.
> >> However, one might not always have the possibility to change this network
> >> configuration.
> >>
> >> Looking further, it is actually possible to let the bind address be
> >> different from the advertised address. From the Akka FAQ at
> >> http://doc.akka.io/docs/akka/2.4.1/additional/faq.html:
> >>
> >> > If you are running an ActorSystem under a NAT or inside a docker container,
> >> > make sure to set akka.remote.netty.tcp.hostname and
> >> > akka.remote.netty.tcp.port to the address it is reachable at from other
> >> > ActorSystems. If you need to bind your network interface to a different
> >> > address - use akka.remote.netty.tcp.bind-hostname and
> >> > akka.remote.netty.tcp.bind-port settings. Also make sure your network is
> >> > configured to translate from the address your ActorSystem is reachable at
> >> > to the address your ActorSystem network interface is bound to.
> >>
> >> It looks like we have to expose this configuration to users who have a
> >> special network setup.
> >>
> >> Best,
> >> Max
> >>
> >> On Mon, Mar 14, 2016 at 5:42 AM, Deepak Jha <dkjhan...@gmail.com> wrote:
> >> > Hi Stephan & Ufuk,
> >> > Thanks for your response.
> >> >
> >> > Yes, there is a way in which you can run Docker (net = host mode) in which
> >> > the guest machine's network stack gets shared by the Docker container.
> >> > Unfortunately it's not supported by AWS ECS.
> >> >
> >> > I do have one more question for you. Can you please explain what
> >> > happens when TaskManagers register themselves with the JobManager in HA
> >> > mode? Does each TaskManager get connected to the JobManager on a separate
> >> > port? The reason I'm asking is because if I run 2 TaskManagers (in separate
> >> > Docker containers), they are able to attach themselves to the JobManager
> >> > (another Docker container) in a Flink HA setup using a remote ZK cluster,
> >> > but soon after that they get disconnected. Logs are not very helpful
> >> > either... I suspect that each TaskManager gets connected on a new port, and
> >> > since by default Docker does not expose all ports, this may happen. I do
> >> > not see this happen when I do not use a Docker container.
> >> >
> >> > Here is the log that I saw in the JobManager:
> >> >
> >> > 2016-03-12 08:55:55,010 PST [INFO] ec2-54-173-231-120.compute-1.a [flink-akka.actor.default-dispatcher-20] o.a.f.r.instance.InstanceManager - Registered TaskManager at 5673db03e679 (akka.tcp://flink@172.17.0.3:6121/user/taskmanager) as 7eafcfddd6bd084f2ec5a32594603f4f. Current number of registered hosts is 1. *Current number of alive task slots is 1.*
> >> > 2016-03-12 08:57:42,676 PST [INFO] ec2-54-173-231-120.compute-1.a [flink-akka.actor.default-dispatcher-20] o.a.f.r.instance.InstanceManager - Registered TaskManager at 7200a7da4da7 (akka.tcp://flink@172.17.0.3:6121/user/taskmanager) as 320338e15a7a44ee64dc03a40f04fcd7. Current number of registered hosts is 2. *Cur
Re: Flink-1.0.0 JobManager is not running in Docker Container on AWS
Hi Maximilian, Thanks for your response. I will wait for the update. On Monday, March 14, 2016, Maximilian Michels <m...@apache.org> wrote: > Hi Deepak, > > We'll look more into this problem this week. Until now we considered it a > configuration issue if the bind address was not externally reachable. > However, one might not always have the possibility to change this network > configuration. > > Looking further, it is actually possible to let the bind address be > different from the advertised address. From the Akka FAQ at > http://doc.akka.io/docs/akka/2.4.1/additional/faq.html: > > If you are running an ActorSystem under a NAT or inside a docker container, > > make sure to set akka.remote.netty.tcp.hostname and > > akka.remote.netty.tcp.port to the address it is reachable at from other > > ActorSystems. If you need to bind your network interface to a different > > address - use akka.remote.netty.tcp.bind-hostname and > > akka.remote.netty.tcp.bind-port settings. Also make sure your network is > > configured to translate from the address your ActorSystem is reachable at > > to the address your ActorSystem network interface is bound to. > > > > It looks like we have to expose this configuration to users who have a > special network setup. > > Best, > Max > > On Mon, Mar 14, 2016 at 5:42 AM, Deepak Jha <dkjhan...@gmail.com > <javascript:;>> wrote: > > > Hi Stephan & Ufuk, > > Thanks for your response. > > > > Yes there is a way in which you can run docker (net = host mode) in which > > guest machine's network stack gets shared by docker container. > > Unfortunately its not supported by AWS ECS. > > > > I do have one more question for you. Can you guys please explain me what > > happens when taskmanager's register themselves to jobmanager in HA mode? > > Does each taskmanager gets connected to jobmanager on separate port ? 
The > > reason I'm asking is because if I run 2 taskmanager's (on separate docker > > container), they are able to attach themselves to the Jobmanager (another > > docker container) ( Flink HA setup using remote zk cluster) but soon > after > > that they get disconnected. Logs are not very helpful either... I suspect > > that each taskmanager gets connected on new port and since by default > > docker does not expose all ports, this may happen I do not see this > > happen when I do not use docker container > > > > Here is the log file that I saw in jobmanager > > > > 2016-03-12 08:55:55,010 PST [INFO] ec2-54-173-231-120.compute-1.a > > [flink-akka.actor.default-dispatcher-20] > o.a.f.r.instance.InstanceManager - > > Registered TaskManager at 5673db03e679 (akka.tcp:// > > flink@172.17.0.3:6121/user/taskmanager) as > > 7eafcfddd6bd084f2ec5a32594603f4f. Current number of registered hosts > > is 1. *Current > > number of alive task slots is 1.* > > 2016-03-12 08:57:42,676 PST [INFO] ec2-54-173-231-120.compute-1.a > > [flink-akka.actor.default-dispatcher-20] > o.a.f.r.instance.InstanceManager - > > Registered TaskManager at 7200a7da4da7 (akka.tcp:// > > flink@172.17.0.3:6121/user/taskmanager) as > > 320338e15a7a44ee64dc03a40f04fcd7. Current number of registered hosts > > is 2. *Current > > number of alive task slots is 2.* > > 2016-03-12 08:57:48,422 PST [INFO] ec2-54-173-231-120.compute-1.a > > [flink-akka.actor.default-dispatcher-20] > > o.a.f.runtime.jobmanager.JobManager - Task manager akka.tcp:// > > flink@172.17.0.3:6121/user/taskmanager terminated. > > 2016-03-12 08:57:48,422 PST [INFO] ec2-54-173-231-120.compute-1.a > > [flink-akka.actor.default-dispatcher-20] o.a.f.r.instance.InstanceManager > > -* > > Unregistered task manager akka.tcp:// > > flink@172.17.0.3:6121/user/taskmanager > > <http://flink@172.17.0.3:6121/user/taskmanager>. Number of registered > task > > managers 1. 
Number of available slots 1.* > > 2016-03-12 08:58:01,417 PST [WARN] ec2-54-173-231-120.compute-1.a > > [flink-akka.actor.default-dispatcher-20] > > a.remote.ReliableDeliverySupervisor - Association with remote system > > [akka.tcp://flink@172.17.0.3:6121] has failed, address is now gated for > > [5000] ms. Reason is: [Disassociated]. > > 2016-03-12 08:58:01,451 PST [INFO] ec2-54-173-231-120.compute-1.a > > [flink-akka.actor.default-dispatcher-20] > > o.a.f.runtime.jobmanager.JobManager - Task manager akka.tcp:// > > flink@172.17.0.3:6121/user/taskmanager wants to disconnect, because > > TaskManager akka://flink/user/taskmanager is disassociating. > > 2016-03-12 08:58:01,451 PST [INFO] ec2-54-173-231-
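For reference, the Akka settings Max quotes map onto configuration along these lines. A HOCON sketch with placeholder addresses; at this point Flink did not yet expose these keys itself, which is exactly the gap under discussion:

```
akka.remote.netty.tcp {
  # Advertised address: what other ActorSystems use to reach this one
  hostname = "1.1.1.1"
  port = 6123
  # Bind address: the interface actually available inside the container
  bind-hostname = "2.2.2.2"
  bind-port = 6123
}
```

With such a split, the JobManager could bind to the container's own interface while registering an externally routable address with the TaskManagers.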
Re: Flink-1.0.0 JobManager is not running in Docker Container on AWS
//flink@172.17.0.3:6121] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. 2016-03-12 08:58:21,388 PST [INFO] ec2-54-173-231-120.compute-1.a [flink-akka.actor.default-dispatcher-20] o.a.f.runtime.jobmanager.JobManager - Task manager akka.tcp:// flink@172.17.0.3:6121/user/taskmanager wants to disconnect, because TaskManager akka://flink/user/taskmanager is disassociating. 2016-03-12 08:58:21,388 PST [INFO] ec2-54-173-231-120.compute-1.a [flink-akka.actor.default-dispatcher-20] o.a.f.r.instance.InstanceManager - Unregistered task manager akka.tcp://flink@172.17.0.3:6121/user/taskmanager. Number of registered task managers 0. Number of available slots 0. 2016-03-12 08:58:21,390 PST [INFO] ec2-54-173-231-120.compute-1.a [flink-akka.actor.default-dispatcher-20] o.a.f.r.instance.InstanceManager - Registered TaskManager at 7200a7da4da7 (akka.tcp:// flink@172.17.0.3:6121/user/taskmanager) as bda61dbd047d40889aa3868d5d4d86a9. Current number of registered hosts is 1. Current number of alive task slots is 1. 2016-03-12 08:58:25,433 PST [INFO] ec2-54-173-231-120.compute-1.a [flink-akka.actor.default-dispatcher-18] o.a.f.runtime.jobmanager.JobManager - Task manager akka.tcp:// flink@172.17.0.3:6121/user/taskmanager terminated. 2016-03-12 08:58:25,434 PST [INFO] ec2-54-173-231-120.compute-1.a [flink-akka.actor.default-dispatcher-18] o.a.f.r.instance.InstanceManager - Unregistered task manager akka.tcp://flink@172.17.0.3:6121/user/taskmanager. Number of registered task managers 0. Number of available slots 0. 2016-03-12 08:58:28,947 PST [INFO] ec2-54-173-231-120.compute-1.a [flink-akka.actor.default-dispatcher-20] o.a.f.r.instance.InstanceManager - Registering TaskManager at akka.tcp://flink@172.17.0.3:6121/user/taskmanager which was marked as dead earlier because of a heart-beat timeout. 
2016-03-12 08:58:28,948 PST [INFO] ec2-54-173-231-120.compute-1.a [flink-akka.actor.default-dispatcher-20] o.a.f.r.instance.InstanceManager - Registered TaskManager at 7200a7da4da7 (akka.tcp:// flink@172.17.0.3:6121/user/taskmanager) as d42ea5c6e0053935a0973d8536f3d8a5. Current number of registered hosts is 1. Current number of alive task slots is 1. On Fri, Mar 11, 2016 at 5:23 AM, Stephan Ewen <se...@apache.org> wrote: > Hi Deepak! > > We can currently not split the bind address and advertised address, because > the Akka library only accepts packages sent explicitly to the bind address > (not sure why Akka has this artificial limitation, but it is there). > > Can you bridge the container IP address to be visible from the outside? > > Stephan > > > On Fri, Mar 11, 2016 at 1:03 PM, Ufuk Celebi <u...@apache.org> wrote: > > > Hey Deepak! > > > > Your description of Flink's behaviour is correct. To summarize: > > > > # Host Address > > > > If you specify a host address as an argument to the JVM (via > > jobmanager.sh or the start-cluster.sh scripts) then that one is used. > > If you don't, it falls back to the value configured in flink-conf.yaml > > (what you describe). > > > > # Ports > > > > Default used random port and publishes via ZooKeeper. You can > > configure a port range only via recovery.jobmanager.port (what you > > describe). > > > > --- > > > > Your proposal would likely solve the issue, but isn't it possible to > > handle this outside of Flink? I've found this stack overflow question, > > which should be related: > > > > > http://stackoverflow.com/questions/26539727/giving-a-docker-container-a-routable-ip-address > > > > What's your opinion? > > > -- Thanks, Deepak Jha
Re: Flink-1.0.0 JobManager is not running in Docker Container on AWS
Hi Stephan, I was able to figure out the issue. Here is my explanation. As I've said, I'm trying to set up a Flink HA cluster in Docker containers managed by Amazon ECS, with a remote ZooKeeper cluster running in AWS. There are a few issues when deploying with Docker: Flink uses *jobmanager.rpc.address* both to bind to and to store in ZooKeeper. This address can be either the host's IP address or the running container's IP address. If I set it to the host IP, the jobmanager cannot bind, because that is not the container's IP address. If I use the container's IP, it can bind, but it then publishes the container's IP address to ZooKeeper, so remote taskmanagers cannot discover it. Ideally *jobmanager.rpc.address* should be split into *jobmanager.bind.address* (for the jobmanager to bind to) and *jobmanager.discovery.address* (to publish in ZooKeeper so that remote taskmanagers can discover it). E.g.: Let's assume EC2_Instance_Ip = 1.1.1.1, Container_Ip = 2.2.2.2 (this container runs on this EC2 instance), recovery.jobmanager.port = 3000, jobmanager.web.port = 8080. I mapped port 3000 on the container to 3000 on the host and 8080 on the container to 8080 on the host. In flink-conf.yaml, assume: *Case 1* jobmanager.rpc.address = 2.2.2.2 (the container's IP address). Now 2.2.2.2 is written to ZooKeeper, so an external taskmanager will use this address to reach the jobmanager, but it will not be able to connect, since 2.2.2.2 is not reachable from outside the EC2 instance. *Case 2* jobmanager.rpc.address = 1.1.1.1 (the EC2 instance's IP address). The container does not own this address, so it cannot bind at all. As you can see, we need two IP addresses: one for binding and one for discovery. In the Docker world we have to expose every port we want to use (in bridged network mode). 
By default the jobmanager uses a random port for communication; since we do not know the port number in advance, we set *recovery.jobmanager.port* and exposed it in the Dockerfile. The same is the case with blob.server.port on the taskmanagers. Hope I clarified it; please let me know if you have any other questions. On Thu, Mar 10, 2016 at 10:47 AM, Stephan Ewen <se...@apache.org> wrote: > Hi! > > Is it possible that the docker container config forbids to open ports? > Flink will try to open some ports and needs the OS or container to permit > that. > > Greetings, > Stephan > > > On Thu, Mar 10, 2016 at 6:27 PM, Deepak Jha <dkjhan...@gmail.com> wrote: > > > Hi Stephan, > > I tried 0.10.2 as well still running into the same issue. > > > > On Thursday, March 10, 2016, Deepak Jha <dkjhan...@gmail.com> wrote: > > > > > Yes. Flink 1.0.0 > > > > > > On Thursday, March 10, 2016, Stephan Ewen <se...@apache.org > > > <javascript:_e(%7B%7D,'cvml','se...@apache.org');>> wrote: > > > > > >> Hi! > > >> > > >> Is this Flink 1.0.0 ? > > >> > > >> Stephan > > >> > > >> > > >> On Thu, Mar 10, 2016 at 6:02 AM, Deepak Jha <dkjhan...@gmail.com> > > wrote: > > >> > > >> > Hi All, > > >> > > > >> > I'm trying to setup Flink 1.0.0 cluster on Docker (separate > containers > > >> for > > >> > jobmanager and taskmanager) inside AWS (Using AWS ECS service). 
I > > >> tested it > > >> > locally and its working fine but on AWS Docker, I am running into > > >> following > > >> > issue > > >> > > > >> > *2016-03-09 18:04:12,114 PST [INFO] ec2-52-3-248-202.compute-1.ama > > >> [main] > > >> > o.a.f.runtime.jobmanager.JobManager - Starting JobManager with > > >> > high-availability* > > >> > *2016-03-09 18:04:12,118 PST [INFO] ec2-52-3-248-202.compute-1.ama > > >> [main] > > >> > o.a.f.runtime.jobmanager.JobManager - Starting JobManager on > > >> > 172.31.63.152:8079 <http://172.31.63.152:8079> with execution mode > > >> CLUSTER* > > >> > *2016-03-09 18:04:12,172 PST [INFO] ec2-52-3-248-202.compute-1.ama > > >> [main] > > >> > o.a.f.runtime.jobmanager.JobManager - Security is not enabled. > > Starting > > >> > non-authenticated JobManager.* > > >> > *2016-03-09 18:04:12,174 PST [DEBUG] ec2-52-3-248-202.compute-1.ama > > >> [main] > > >> > org.apache.flink.util.NetUtils - Trying to open socket on port 8079* > > >> > *2016-03-09 18:04:12,176 PST [DEBUG] ec2-52-3-248-202.compute-1.ama > > >> [main] > > >> &g
Re: Flink-1.0.0 JobManager is not running in Docker Container on AWS
Hi Stephan, I tried 0.10.2 as well still running into the same issue. On Thursday, March 10, 2016, Deepak Jha <dkjhan...@gmail.com> wrote: > Yes. Flink 1.0.0 > > On Thursday, March 10, 2016, Stephan Ewen <se...@apache.org > <javascript:_e(%7B%7D,'cvml','se...@apache.org');>> wrote: > >> Hi! >> >> Is this Flink 1.0.0 ? >> >> Stephan >> >> >> On Thu, Mar 10, 2016 at 6:02 AM, Deepak Jha <dkjhan...@gmail.com> wrote: >> >> > Hi All, >> > >> > I'm trying to setup Flink 1.0.0 cluster on Docker (separate containers >> for >> > jobmanager and taskmanager) inside AWS (Using AWS ECS service). I >> tested it >> > locally and its working fine but on AWS Docker, I am running into >> following >> > issue >> > >> > *2016-03-09 18:04:12,114 PST [INFO] ec2-52-3-248-202.compute-1.ama >> [main] >> > o.a.f.runtime.jobmanager.JobManager - Starting JobManager with >> > high-availability* >> > *2016-03-09 18:04:12,118 PST [INFO] ec2-52-3-248-202.compute-1.ama >> [main] >> > o.a.f.runtime.jobmanager.JobManager - Starting JobManager on >> > 172.31.63.152:8079 <http://172.31.63.152:8079> with execution mode >> CLUSTER* >> > *2016-03-09 18:04:12,172 PST [INFO] ec2-52-3-248-202.compute-1.ama >> [main] >> > o.a.f.runtime.jobmanager.JobManager - Security is not enabled. 
Starting >> > non-authenticated JobManager.* >> > *2016-03-09 18:04:12,174 PST [DEBUG] ec2-52-3-248-202.compute-1.ama >> [main] >> > org.apache.flink.util.NetUtils - Trying to open socket on port 8079* >> > *2016-03-09 18:04:12,176 PST [DEBUG] ec2-52-3-248-202.compute-1.ama >> [main] >> > org.apache.flink.util.NetUtils - Unable to allocate socket on port* >> > *java.net.BindException: Cannot assign requested address* >> > *at java.net.PlainSocketImpl.socketBind(Native Method)* >> > *at >> > java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387)* >> > *at java.net.ServerSocket.bind(ServerSocket.java:375)* >> > *at java.net.ServerSocket.(ServerSocket.java:237)* >> > *at >> > >> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2$$anon$3.createSocket(JobManager.scala:1722)* >> > *at >> > org.apache.flink.util.NetUtils.createSocketFromPorts(NetUtils.java:237)* >> > *at >> > >> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:1719)* >> > *at >> > >> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:1717)* >> > *at >> > >> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:1717)* >> > *at scala.util.Try$.apply(Try.scala:192)* >> > *at >> > >> org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:1772)* >> > *at >> > >> org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:1717)* >> > *at >> > >> org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1653)* >> > *at >> > org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)* >> > *2016-03-09 18:04:12,180 PST [ERROR] ec2-52-3-248-202.compute-1.ama >> [main] >> > o.a.f.runtime.jobmanager.JobManager - Failed to run JobManager.* >> > *java.lang.RuntimeException: Unable to do further retries starting the >> > actor system* >> > *at >> > >> 
org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:1777)* >> > *at >> > >> org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:1717)* >> > *at >> > >> org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1653)* >> > *at >> > org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)* >> > *2016-03-09 18:04:12,991 PST [DEBUG] ec2-52-3-248-202.compute-1.ama >> [main] >> > o.a.h.m.lib.MutableMetricsFactory - field >> > org.apache.hadoop.metrics2.lib.MutableRate >> > org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess >> > with annotation @org.apache.hadoop.metrics2.annotation.Metric(about=, >> > sampleName=Ops, always=false, type=DEFAULT, value=[Rate of successful >> > kerberos logins and latency (milliseconds)], valueName=Time)* >> > >> > >> > Initially Jobmanager tries to bind to port 0 which did not work. On >> > looking further into it, I tried using recovery jobmanager port using >> > different port combinations, but it does not seems to be working... I've >> > exposed the ports in the docker compose file as well >> > >> > >> > PFA the jobmanager log file for details also the jobmanager config >> file... >> > -- >> > Thanks, >> > Deepak Jha >> > >> > >> > > > -- > Sent from Gmail Mobile > -- Sent from Gmail Mobile
Re: Flink-1.0.0 JobManager is not running in Docker Container on AWS
Yes. Flink 1.0.0 On Thursday, March 10, 2016, Stephan Ewen <se...@apache.org> wrote: > Hi! > > Is this Flink 1.0.0 ? > > Stephan > > > On Thu, Mar 10, 2016 at 6:02 AM, Deepak Jha <dkjhan...@gmail.com > <javascript:;>> wrote: > > > Hi All, > > > > I'm trying to setup Flink 1.0.0 cluster on Docker (separate containers > for > > jobmanager and taskmanager) inside AWS (Using AWS ECS service). I tested > it > > locally and its working fine but on AWS Docker, I am running into > following > > issue > > > > *2016-03-09 18:04:12,114 PST [INFO] ec2-52-3-248-202.compute-1.ama > [main] > > o.a.f.runtime.jobmanager.JobManager - Starting JobManager with > > high-availability* > > *2016-03-09 18:04:12,118 PST [INFO] ec2-52-3-248-202.compute-1.ama > [main] > > o.a.f.runtime.jobmanager.JobManager - Starting JobManager on > > 172.31.63.152:8079 <http://172.31.63.152:8079> with execution mode > CLUSTER* > > *2016-03-09 18:04:12,172 PST [INFO] ec2-52-3-248-202.compute-1.ama > [main] > > o.a.f.runtime.jobmanager.JobManager - Security is not enabled. 
Starting > > non-authenticated JobManager.* > > *2016-03-09 18:04:12,174 PST [DEBUG] ec2-52-3-248-202.compute-1.ama > [main] > > org.apache.flink.util.NetUtils - Trying to open socket on port 8079* > > *2016-03-09 18:04:12,176 PST [DEBUG] ec2-52-3-248-202.compute-1.ama > [main] > > org.apache.flink.util.NetUtils - Unable to allocate socket on port* > > *java.net.BindException: Cannot assign requested address* > > *at java.net.PlainSocketImpl.socketBind(Native Method)* > > *at > > java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387)* > > *at java.net.ServerSocket.bind(ServerSocket.java:375)* > > *at java.net.ServerSocket.(ServerSocket.java:237)* > > *at > > > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2$$anon$3.createSocket(JobManager.scala:1722)* > > *at > > org.apache.flink.util.NetUtils.createSocketFromPorts(NetUtils.java:237)* > > *at > > > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:1719)* > > *at > > > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:1717)* > > *at > > > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:1717)* > > *at scala.util.Try$.apply(Try.scala:192)* > > *at > > > org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:1772)* > > *at > > > org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:1717)* > > *at > > > org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1653)* > > *at > > org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)* > > *2016-03-09 18:04:12,180 PST [ERROR] ec2-52-3-248-202.compute-1.ama > [main] > > o.a.f.runtime.jobmanager.JobManager - Failed to run JobManager.* > > *java.lang.RuntimeException: Unable to do further retries starting the > > actor system* > > *at > > > org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:1777)* > > *at > > > 
org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:1717)* > > *at > > > org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1653)* > > *at > > org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)* > > *2016-03-09 18:04:12,991 PST [DEBUG] ec2-52-3-248-202.compute-1.ama > [main] > > o.a.h.m.lib.MutableMetricsFactory - field > > org.apache.hadoop.metrics2.lib.MutableRate > > org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess > > with annotation @org.apache.hadoop.metrics2.annotation.Metric(about=, > > sampleName=Ops, always=false, type=DEFAULT, value=[Rate of successful > > kerberos logins and latency (milliseconds)], valueName=Time)* > > > > > > Initially Jobmanager tries to bind to port 0 which did not work. On > > looking further into it, I tried using recovery jobmanager port using > > different port combinations, but it does not seems to be working... I've > > exposed the ports in the docker compose file as well > > > > > > PFA the jobmanager log file for details also the jobmanager config > file... > > -- > > Thanks, > > Deepak Jha > > > > > -- Sent from Gmail Mobile
Flink-1.0.0 JobManager is not running in Docker Container on AWS
Hi All, I'm trying to set up a Flink 1.0.0 cluster on Docker (separate containers for jobmanager and taskmanager) inside AWS (using the AWS ECS service). I tested it locally and it's working fine, but on AWS Docker I am running into the following issue *2016-03-09 18:04:12,114 PST [INFO] ec2-52-3-248-202.compute-1.ama [main] o.a.f.runtime.jobmanager.JobManager - Starting JobManager with high-availability* *2016-03-09 18:04:12,118 PST [INFO] ec2-52-3-248-202.compute-1.ama [main] o.a.f.runtime.jobmanager.JobManager - Starting JobManager on 172.31.63.152:8079 <http://172.31.63.152:8079> with execution mode CLUSTER* *2016-03-09 18:04:12,172 PST [INFO] ec2-52-3-248-202.compute-1.ama [main] o.a.f.runtime.jobmanager.JobManager - Security is not enabled. Starting non-authenticated JobManager.* *2016-03-09 18:04:12,174 PST [DEBUG] ec2-52-3-248-202.compute-1.ama [main] org.apache.flink.util.NetUtils - Trying to open socket on port 8079* *2016-03-09 18:04:12,176 PST [DEBUG] ec2-52-3-248-202.compute-1.ama [main] org.apache.flink.util.NetUtils - Unable to allocate socket on port* *java.net.BindException: Cannot assign requested address* *at java.net.PlainSocketImpl.socketBind(Native Method)* *at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387)* *at java.net.ServerSocket.bind(ServerSocket.java:375)* *at java.net.ServerSocket.(ServerSocket.java:237)* *at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2$$anon$3.createSocket(JobManager.scala:1722)* *at org.apache.flink.util.NetUtils.createSocketFromPorts(NetUtils.java:237)* *at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:1719)* *at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:1717)* *at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:1717)* *at scala.util.Try$.apply(Try.scala:192)* *at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:1772)* *at 
org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:1717)* *at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1653)* *at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)* *2016-03-09 18:04:12,180 PST [ERROR] ec2-52-3-248-202.compute-1.ama [main] o.a.f.runtime.jobmanager.JobManager - Failed to run JobManager.* *java.lang.RuntimeException: Unable to do further retries starting the actor system* *at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:1777)* *at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:1717)* *at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1653)* *at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)* *2016-03-09 18:04:12,991 PST [DEBUG] ec2-52-3-248-202.compute-1.ama [main] o.a.h.m.lib.MutableMetricsFactory - field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with annotation @org.apache.hadoop.metrics2.annotation.Metric(about=, sampleName=Ops, always=false, type=DEFAULT, value=[Rate of successful kerberos logins and latency (milliseconds)], valueName=Time)* Initially the jobmanager tried to bind to port 0, which did not work. Looking further into it, I tried setting the recovery jobmanager port to different port combinations, but it does not seem to be working... I've exposed the ports in the docker-compose file as well. PFA the jobmanager log file for details, and also the jobmanager config file... 
-- Thanks, Deepak Jha 2016-03-09 18:04:11,887 PST [INFO] ec2-52-3-248-202.compute-1.ama [main] o.a.f.runtime.jobmanager.JobManager - 2016-03-09 18:04:11,888 PST [INFO] ec2-52-3-248-202.compute-1.ama [main] o.a.f.runtime.jobmanager.JobManager - Registered UNIX signal handlers for [TERM, HUP, INT] 2016-03-09 18:04:12,070 PST [INFO] ec2-52-3-248-202.compute-1.ama [main] o.a.f.runtime.jobmanager.JobManager - Loading configuration from /opt/flink-1.0.0/conf 2016-03-09 18:04:12,082 PST [WARN] ec2-52-3-248-202.compute-1.ama [main] o.a.f.c.GlobalConfiguration - Error while reading configuration: Cannot read property 0 2016-03-09 18:04:12,083 PST [WARN] ec2-52-3-248-202.compute-1.ama [main] o.a.f.c.GlobalConfiguration - Error while reading configuration: Cannot read property 1 2016-03-09 18:04:12,083 PST [WARN] ec2-52-3-248-202.compute-1.ama [main] o.a.f.c.GlobalConfiguration - Error while reading configuration: Cannot read property 2 2016-03-09 18:04:12,091 PST [DEBUG] ec2-52-3-248-202.compute-1.ama [main] o.a.f.c.GlobalConfiguration - Loading configuration property: recovery.jobmanager.port, 8079 2016-03-09 18:04:12,095 PST [DEBUG] ec2-52-3-248-202.compute-1.ama [main] o.a.f.c.GlobalConfiguration - Loading configuration pr
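The BindException above is the JVM surfacing EADDRNOTAVAIL: the process asked to bind an address that no local interface owns, which is exactly what happens when jobmanager.rpc.address is set to the host's IP inside a bridged container. A minimal sketch of the same failure outside Flink (assuming 203.0.113.1, a reserved TEST-NET-3 documentation address, is not configured on the machine):

```python
import errno
import socket

def try_bind(host: str, port: int = 0) -> str:
    """Try to bind a TCP server socket; return 'bound' or the errno name."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind((host, port))
        return "bound"
    except OSError as e:
        return errno.errorcode.get(e.errno, str(e.errno))
    finally:
        s.close()

# An address the machine owns binds fine; an address it does not own fails
# with EADDRNOTAVAIL -- the errno behind "Cannot assign requested address".
print(try_bind("0.0.0.0"))
print(try_bind("203.0.113.1"))  # assuming this address is not local
```

The fix inside a container is to bind an address the container actually owns (or 0.0.0.0) and advertise the externally reachable one separately.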
Re: Remote TaskManager Connection Problem
Hi Stephan, Thanks for the response. I was able to resolve the issue: I was using localhost as the jobmanager name instead of the container name... There were a few more issues which I would like to mention - I'm using S3 for storage/checkpoints in Flink HA mode; I realized that I have to set fs.hdfs.hadoopconf in conf/flink-conf.yaml and add core-site.xml under conf/ .. Since I'm deploying it on AWS, I had to place hadoop-aws.jar as well On Fri, Mar 4, 2016 at 1:22 AM, Stephan Ewen <se...@apache.org> wrote: > The pull request https://github.com/apache/flink/pull/1758 should improve > the TaskManager's network interface selection. > > > On Fri, Mar 4, 2016 at 10:19 AM, Stephan Ewen <se...@apache.org> wrote: > > > Hi! > > > > This registration phase means that the TaskManager tries to tell the > > JobManager that it is available. > > If that fails, there can be two reasons > > > > 1) Network communication not possible to the port > > 1.1) JobManager IP really not reachable (not the case, as you > > described) > > 1.2) TaskManager selected a wrong network interface to work with > > 2) JobManager not listening > > > > > > To look into 1.2, can you check the TaskManager log at the beginning, > > where it says what interface/hostname the TaskManager selected to use? > > > > Thanks, > > Stephan > > > > > > > > > > > > > > On Fri, Mar 4, 2016 at 2:48 AM, Deepak Jha <dkjhan...@gmail.com> wrote: > > > >> Hi All, > >> I've created 2 docker containers on my local machine, one running > >> JM(192.168.99.104) and other running TM. I was expecting to see TM in > the > >> JM UI but it did not happen. 
On looking into the TM logs I see following > >> lines > >> > >> > >> 01:29:50,862 DEBUG org.apache.flink.runtime.taskmanager.TaskManager > >> - Starting TaskManager process reaper > >> 01:29:50,868 INFO org.apache.flink.runtime.filecache.FileCache > >> - User file cache uses directory > >> /tmp/flink-dist-cache-be63f351-2bce-48ef-bbc4-fb0f40fecd49 > >> 01:29:51,093 INFO org.apache.flink.runtime.taskmanager.TaskManager > >> - Starting TaskManager actor at > >> akka://flink/user/taskmanager#1222392284. > >> 01:29:51,095 INFO org.apache.flink.runtime.taskmanager.TaskManager > >> - TaskManager data connection information: 140efeb188cc > >> (dataPort=6122) > >> 01:29:51,096 INFO org.apache.flink.runtime.taskmanager.TaskManager > >> - TaskManager has 1 task slot(s). > >> 01:29:51,097 INFO org.apache.flink.runtime.taskmanager.TaskManager > >> - Memory usage stats: [HEAP: 386/494/494 MB, NON HEAP: 30/31/-1 MB > >> (used/committed/max)] > >> 01:29:51,104 INFO org.apache.flink.runtime.taskmanager.TaskManager > >> - Trying to register at JobManager akka.tcp:// > >> flink@192.168.99.104:6123/user/jobmanager (attempt 1, timeout: 500 > >> milliseconds) > >> 01:29:51,633 INFO org.apache.flink.runtime.taskmanager.TaskManager > >> - Trying to register at JobManager akka.tcp:// > >> flink@192.168.99.104:6123/user/jobmanager (attempt 2, timeout: 1000 > >> milliseconds) > >> 01:29:52,652 INFO org.apache.flink.runtime.taskmanager.TaskManager > >> - Trying to register at JobManager akka.tcp:// > >> flink@192.168.99.104:6123/user/jobmanager (attempt 3, timeout: 2000 > >> milliseconds) > >> 01:29:54,672 INFO org.apache.flink.runtime.taskmanager.TaskManager > >> - Trying to register at JobManager akka.tcp:// > >> flink@192.168.99.104:6123/user/jobmanager (attempt 4, timeout: 4000 > >> milliseconds) > >> 01:29:58,693 INFO org.apache.flink.runtime.taskmanager.TaskManager > >> - Trying to register at JobManager akka.tcp:// > >> flink@192.168.99.104:6123/user/jobmanager (attempt 5, timeout: 
8000 > >> milliseconds) > >> 01:30:06,702 INFO org.apache.flink.runtime.taskmanager.TaskManager > >> - Trying to register at JobManager akka.tcp:// > >> flink@192.168.99.104:6123/user/jobmanager (attempt 6, timeout: 16000 > >> milliseconds) > >> > >> > >> However, from TM i am able to reach JM on port 6123 > >> root@140efeb188cc:/# nc -v 192.168.99.104 6123 > >> Connection to 192.168.99.104 6123 port [tcp/*] succeeded! > >> > >> > >> masters file on TM contains > >> 192.168.99.104:8080 > >> > >> Did anyone face this issue with remote JM/TM combination ? > >> > >> -- > >> Thanks, > >> Deepak Jha > >> > > > > > -- Thanks, Deepak Jha
Remote TaskManager Connection Problem
Hi All, I've created 2 Docker containers on my local machine, one running the JM (192.168.99.104) and the other running the TM. I was expecting to see the TM in the JM UI, but it did not happen. Looking into the TM logs, I see the following lines 01:29:50,862 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager process reaper 01:29:50,868 INFO org.apache.flink.runtime.filecache.FileCache - User file cache uses directory /tmp/flink-dist-cache-be63f351-2bce-48ef-bbc4-fb0f40fecd49 01:29:51,093 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor at akka://flink/user/taskmanager#1222392284. 01:29:51,095 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager data connection information: 140efeb188cc (dataPort=6122) 01:29:51,096 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 1 task slot(s). 01:29:51,097 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 386/494/494 MB, NON HEAP: 30/31/-1 MB (used/committed/max)] 01:29:51,104 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp:// flink@192.168.99.104:6123/user/jobmanager (attempt 1, timeout: 500 milliseconds) 01:29:51,633 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp:// flink@192.168.99.104:6123/user/jobmanager (attempt 2, timeout: 1000 milliseconds) 01:29:52,652 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp:// flink@192.168.99.104:6123/user/jobmanager (attempt 3, timeout: 2000 milliseconds) 01:29:54,672 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp:// flink@192.168.99.104:6123/user/jobmanager (attempt 4, timeout: 4000 milliseconds) 01:29:58,693 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp:// flink@192.168.99.104:6123/user/jobmanager (attempt 5, timeout: 8000 
milliseconds) 01:30:06,702 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp:// flink@192.168.99.104:6123/user/jobmanager (attempt 6, timeout: 16000 milliseconds) However, from the TM I am able to reach the JM on port 6123: root@140efeb188cc:/# nc -v 192.168.99.104 6123 Connection to 192.168.99.104 6123 port [tcp/*] succeeded! The masters file on the TM contains 192.168.99.104:8080 Did anyone face this issue with a remote JM/TM combination? -- Thanks, Deepak Jha
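As the log shows, the TaskManager retries registration with a timeout that doubles on each attempt. A sketch of that schedule (the function name is illustrative, not Flink's API):

```python
def registration_timeouts(initial_ms: int = 500, attempts: int = 6) -> list:
    """Per-attempt registration timeout in ms, doubling each attempt."""
    return [initial_ms * 2 ** i for i in range(attempts)]

# Matches the timeouts in the log above: 500, 1000, 2000, 4000, 8000, 16000 ms
print(registration_timeouts())
```

The growing gaps explain why a misconfigured TM can sit "quietly" for long stretches between registration attempts.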
Re: Adding TaskManager's
Hi Ufuk, Sure... I will let you know. I'm planning to use a centralized ZooKeeper; that way Flink and ZK stay separate. On Fri, Feb 19, 2016 at 12:06 PM, Ufuk Celebi <u...@apache.org> wrote: > OK, nice! :-) Then you can just skip the "slaves" file and directly > work with the scripts. > > I'm curious to know if everything works as expected. If you encounter > something that seems wrong, let us know. > > – Ufuk > > > On Fri, Feb 19, 2016 at 9:02 PM, Deepak Jha <dkjhan...@gmail.com> wrote: > > Hi Ufuk, > > I'm planning to build Flink HA cluster and I may need to autoscale > > taskmanager based on the the requirement. I feel it makes more sense for > me > > to start each taskmanager & jobmanager individually using taskmanager.sh > > and jobmanager.sh and let these taskmanager's discover jobmanager's using > > Zookeeper. The reason being that if I go with updating the slaves, then I > > have to push these changes in all the machines in the cluster to come to > a > > consistent state.. I will get more flexibility if I can add taskmanager's > > on-the-fly without any update in rest of the machines in the cluster. I'm > > going to use Docker & Amazon ECS. So, if there is a change in my > artifact, > > I will build a new cluster but as such each node of the cluster will be > > immutable. > > > > > > > > On Wed, Feb 17, 2016 at 11:54 PM, Ufuk Celebi <u...@apache.org> wrote: > > > >> No, the "slaves" file is still used to ssh into the machines and start > >> the task manager processes in the start-cluster.sh script. So you > >> still need password-less ssh into the machines if you want to use > >> that. The task managers discover the job manager via ZooKeeper though > >> (therefore you don't need to configure the jobmanager address in the > >> config). 
> >> > >> In theory, you can also skip the "slaves" file if you ssh manually > >> into the machines and start the task managers via the taskmanger.sh > >> script, but I don't think that this is what you are looking for. Or > >> are you? > >> > >> – Ufuk > >> > >> > >> On Wed, Feb 17, 2016 at 10:27 PM, Deepak Jha <dkjhan...@gmail.com> > wrote: > >> > Hi Max and Stephan, > >> > Does this mean that I can start Flink HA cluster without keeping any > >> entry > >> > in "slaves" file ? I'm asking this because then I should not worry > about > >> > copying public key for password-less ssh in Flink HA cluster > >> > > >> > On Wed, Feb 17, 2016 at 12:38 PM, Deepak Jha <dkjhan...@gmail.com> > >> wrote: > >> > > >> >> Sorry for the typo Stephan > >> >> > >> >> > >> >> On Wednesday, February 17, 2016, Deepak Jha <dkjhan...@gmail.com> > >> wrote: > >> >> > >> >>> Thanks Max and Steven for the response. > >> >>> > >> >>> On Wednesday, February 17, 2016, Stephan Ewen <se...@apache.org> > >> wrote: > >> >>> > >> >>>> Hi Deepak! > >> >>>> > >> >>>> The "slaves" file is only used by the SSH script to start a > standalone > >> >>>> cluster. > >> >>>> > >> >>>> As Max said, TaskManagers register dynamically at the JobManager. > >> >>>> > >> >>>> Discovery works via: > >> >>>>- config in non-HA mode > >> >>>>- ZooKeeper in HA mode > >> >>>> > >> >>>> > >> >>>> > >> >>>> On Wed, Feb 17, 2016 at 10:11 AM, Maximilian Michels < > m...@apache.org> > >> >>>> wrote: > >> >>>> > >> >>>> > Hi Deepak, > >> >>>> > > >> >>>> > The job manager doesn't have to know about task managers. They > will > >> >>>> > simply register at the job manager using the provided > configuration. > >> >>>> > In HA mode, they will lookup the currently leading job manager > first > >> >>>> > and then connect to it. The job manager can then assign work. 
> >> >>>> > > >> >>>> > Cheers, > >> >>>> > Max > >> >>>> > > >> >>>> > On Tue, Feb 16, 2016 at 10:41 PM, Deepak Jha < > dkjhan...@gmail.com> > >> >>>> wrote: > >> >>>> > > Hi All, > >> >>>> > > I have a question on scaling-up/scaling-down flink cluster. > >> >>>> > > As per the documentation, in order to scale-up the cluster, I > can > >> >>>> add a > >> >>>> > new > >> >>>> > > taskmanager on the fly and jobmanager can assign work to it. > >> >>>> Assuming, I > >> >>>> > > have Flink HA , so in the event of master JobManager failure, > how > >> is > >> >>>> this > >> >>>> > > taskmanager detail is going to get transferred ? I believe new > >> master > >> >>>> > will > >> >>>> > > just read the contents from slaves config file. Can anyone give > >> more > >> >>>> > > clarity on how this is done ? Or, Is it union of slaves and the > >> >>>> > > taskmanager's that are added on the fly ? > >> >>>> > > > >> >>>> > > -- > >> >>>> > > Thanks, > >> >>>> > > Deepak > >> >>>> > > >> >>>> > >> >>> > >> >>> > >> >>> -- > >> >>> Sent from Gmail Mobile > >> >>> > >> >> > >> >> > >> >> -- > >> >> Sent from Gmail Mobile > >> >> > >> > > >> > > >> > > >> > -- > >> > Thanks, > >> > Deepak Jha > >> > > > > > > > > -- > > Thanks, > > Deepak Jha > -- Thanks, Deepak Jha
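The script-free workflow discussed in this thread relies on ZooKeeper-based discovery, which is configured in flink-conf.yaml. A hedged sketch using the HA keys from the Flink 0.10/1.0 era (later releases renamed these under the high-availability.* prefix); the quorum addresses and paths are placeholders:

```yaml
# flink-conf.yaml -- HA / ZooKeeper discovery (key names are version-dependent)
recovery.mode: zookeeper
recovery.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
recovery.zookeeper.path.root: /flink
recovery.zookeeper.storageDir: hdfs:///flink/recovery

# With this in place, daemons can be started host by host, with no slaves file:
#   on each master host:  bin/jobmanager.sh start cluster
#   on each worker host:  bin/taskmanager.sh start
```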
Re: Adding TaskManager's
Hi Ufuk, I'm planning to build a Flink HA cluster and I may need to autoscale TaskManagers based on demand. I feel it makes more sense for me to start each TaskManager and JobManager individually using taskmanager.sh and jobmanager.sh and let the TaskManagers discover the JobManagers via ZooKeeper. The reason is that if I go with updating the slaves file, I have to push that change to all machines in the cluster to reach a consistent state. I get more flexibility if I can add TaskManagers on the fly without updating the rest of the machines in the cluster. I'm going to use Docker and Amazon ECS, so if there is a change in my artifact I will build a new cluster, but as such each node of the cluster will be immutable. On Wed, Feb 17, 2016 at 11:54 PM, Ufuk Celebi <u...@apache.org> wrote: > No, the "slaves" file is still used to ssh into the machines and start > the task manager processes in the start-cluster.sh script. So you > still need password-less ssh into the machines if you want to use > that. The task managers discover the job manager via ZooKeeper though > (therefore you don't need to configure the jobmanager address in the > config). > > In theory, you can also skip the "slaves" file if you ssh manually > into the machines and start the task managers via the taskmanager.sh > script, but I don't think that this is what you are looking for. Or > are you? > > – Ufuk -- Thanks, Deepak Jha
Re: Adding TaskManager's
Hi Max and Stephan, Does this mean that I can start a Flink HA cluster without keeping any entry in the "slaves" file? I'm asking this because then I would not have to worry about copying public keys for password-less ssh across the Flink HA cluster. On Wed, Feb 17, 2016 at 12:38 PM, Deepak Jha <dkjhan...@gmail.com> wrote: > Sorry for the typo Stephan -- Thanks, Deepak Jha
Re: Adding TaskManager's
Sorry for the typo Stephan On Wednesday, February 17, 2016, Deepak Jha <dkjhan...@gmail.com> wrote: > Thanks Max and Steven for the response. -- Sent from Gmail Mobile
Re: Adding TaskManager's
Thanks Max and Steven for the response. On Wednesday, February 17, 2016, Stephan Ewen <se...@apache.org> wrote: > Hi Deepak! > > The "slaves" file is only used by the SSH script to start a standalone > cluster. > > As Max said, TaskManagers register dynamically at the JobManager. > > Discovery works via: > - config in non-HA mode > - ZooKeeper in HA mode > > > > On Wed, Feb 17, 2016 at 10:11 AM, Maximilian Michels <m...@apache.org> wrote: > > > Hi Deepak, > > > > The job manager doesn't have to know about task managers. They will > > simply register at the job manager using the provided configuration. > > In HA mode, they will lookup the currently leading job manager first > > and then connect to it. The job manager can then assign work. > > > > Cheers, > > Max > > > > On Tue, Feb 16, 2016 at 10:41 PM, Deepak Jha <dkjhan...@gmail.com> wrote: > > > Hi All, > > > I have a question on scaling-up/scaling-down flink cluster. > > > As per the documentation, in order to scale-up the cluster, I can add a > > new > > > taskmanager on the fly and jobmanager can assign work to it. Assuming, I > > > have Flink HA , so in the event of master JobManager failure, how is this > > > taskmanager detail going to get transferred ? I believe new master > > will > > > just read the contents from slaves config file. Can anyone give more > > > clarity on how this is done ? Or, is it the union of slaves and the > > > taskmanager's that are added on the fly ? > > > > > > -- > > > Thanks, > > > Deepak > > > -- Sent from Gmail Mobile
Adding TaskManager's
Hi All, I have a question on scaling up/scaling down a Flink cluster. As per the documentation, in order to scale up the cluster I can add a new TaskManager on the fly and the JobManager can assign work to it. Assuming I have Flink HA, in the event of a master JobManager failure, how are these TaskManager details going to get transferred? I believe the new master will just read the contents of the slaves config file. Can anyone give more clarity on how this is done? Or is it the union of the slaves file and the TaskManagers that were added on the fly? -- Thanks, Deepak
Kafka As Sink in Flink Streaming
Hi Devs, I just started using Flink and would like to add Kafka as a sink. I went through the documentation but so far I've not succeeded in writing to Kafka from Flink. I'm building the application in Scala. Here is my code snippet:

case class Demo(city: String, country: String, zipcode: Int)

The map stage returns an instance of the Demo type.

val env = StreamExecutionEnvironment.getExecutionEnvironment

val properties = new Properties()
properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
properties.setProperty("zookeeper.connect", "127.0.0.1:2181")
properties.setProperty("group.id", "test_topic")

val mapToDemo: String => Demo = { //Implementation }

val stream = env.addSource(new FlinkKafkaConsumer082[String]("test", new SimpleStringSchema, properties))

stream.map(mapToDemo).addSink(new FlinkKafkaProducer[Demo]("127.0.0.1:9092", "test_topic", new SimpleStringSchema()))

Can anyone explain what I am doing wrong in adding Kafka as a sink? -- Thanks, Deepak Jha
Re: Kafka As Sink in Flink Streaming
Hi Robert, No, it was a compile-time issue. Actually I tried to write a plain String as well but it did not work for me. Just for clarity, my flink-connector-kafka version is 0.10.1. I was able to fix the issue: SimpleStringSchema should be replaced with JavaDefaultStringSchema, as the latter does the conversion from String to byte array. Thanks for the help though. On Fri, Jan 22, 2016 at 1:34 PM, Robert Metzger <rmetz...@apache.org> wrote: > Hi, > > did you get any error message while executing the job? I don't think you > can serialize the "Demo" type with the "SimpleStringSchema". -- Thanks, Deepak Jha
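The root cause generalizes beyond this thread: a string schema can only (de)serialize Strings, so sinking a record type to Kafka needs a schema that turns the record into bytes itself. Below is a self-contained sketch of that idea in Java; note that the SerializationSchema interface here is a local stand-in mirroring the shape of Flink's interface, and Demo/DemoSchema are illustrative names, so the snippet runs without Flink on the classpath:

```java
import java.nio.charset.StandardCharsets;

// Local stand-in mirroring the shape of Flink's SerializationSchema<T>,
// so this sketch compiles and runs without Flink as a dependency.
interface SerializationSchema<T> {
    byte[] serialize(T element);
}

// The record type from the thread, written as a plain Java class.
class Demo {
    final String city;
    final String country;
    final int zipcode;

    Demo(String city, String country, int zipcode) {
        this.city = city;
        this.country = country;
        this.zipcode = zipcode;
    }

    @Override
    public String toString() {
        return city + "," + country + "," + zipcode;
    }
}

// Renders a Demo as CSV text and encodes it as UTF-8 bytes. This is the
// record-to-bytes step that a String-only schema cannot perform.
class DemoSchema implements SerializationSchema<Demo> {
    @Override
    public byte[] serialize(Demo element) {
        return element.toString().getBytes(StandardCharsets.UTF_8);
    }
}

public class Main {
    public static void main(String[] args) {
        byte[] bytes = new DemoSchema().serialize(new Demo("Reno", "US", 89501));
        System.out.println(new String(bytes, StandardCharsets.UTF_8)); // prints Reno,US,89501
    }
}
```

In the actual job, the same serialize logic would live in the schema object passed to the Kafka producer sink in place of SimpleStringSchema.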