event time timezone is not correct

2019-03-03 Thread 孙森
Hi all:
I am using Flink SQL with event time, but the field that acts as the rowtime is not correct in the output. There's an eight-hour time difference. Any suggestion?


My input is (ums_ts_ acts as the rowtime):

{"schema":{"namespace":"en2.*.*.*","fields":[{"name":"ums_id_","type":"long","nullable":false},{"name":"ums_ts_","type":"datetime","nullable":false},{"name":"ums_op_","type":"string","nullable":false},{"name":"key","type":"int","nullable":false},{"name":"value1","type":"string","nullable":true},{"name":"value2","type":"long","nullable":false}]},"payload":[{"tuple":["1","2018-04-11 12:40:01.345123","i","10","aa1","10"]},{"tuple":["22","2018-04-11 12:40:20.345123","u","10","aa2","11"]},{"tuple":["311","2018-04-11 12:40:39.345123","d","10","aa3","12"]}]}


My sql is:

SELECT key,
       COUNT(*) AS count_sen,
       SUM(value2) AS ages,
       TUMBLE_START(ums_ts_, INTERVAL '1' SECOND) AS window_start
FROM sen2
GROUP BY TUMBLE(ums_ts_, INTERVAL '1' SECOND), key;


The output is :

{"schema":{"namespace":"en3.*.*.*","fields":[{"name":"key","type":"int","nullable":true},{"name":"count_sen","type":"long","nullable":true},{"name":"ages","type":"long","nullable":true},{"name":"window_start","type":"datetime","nullable":true}]},"payload":[{"tuple":["10","1","10","2018-04-11 04:40:01.0"]}]}
{"schema":{"namespace":"en3.*.*.*","fields":[{"name":"key","type":"int","nullable":true},{"name":"count_sen","type":"long","nullable":true},{"name":"ages","type":"long","nullable":true},{"name":"window_start","type":"datetime","nullable":true}]},"payload":[{"tuple":["10","1","11","2018-04-11 04:40:20.0"]}]}
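For what it's worth, the eight-hour shift is exactly what you get when a UTC+8 local timestamp is re-rendered in UTC (Flink SQL of that era kept event-time timestamps as epoch milliseconds and printed them in UTC). A minimal sketch of the arithmetic, assuming the input timestamps were produced in China Standard Time (UTC+8):

```python
from datetime import datetime, timezone, timedelta

# Assumption: the input ums_ts_ values are local times in UTC+8.
cst = timezone(timedelta(hours=8))
local = datetime(2018, 4, 11, 12, 40, 1, tzinfo=cst)  # first ums_ts_ from the input

# Re-rendered in UTC, this is the 04:40:01 seen in the output.
print(local.astimezone(timezone.utc))  # 2018-04-11 04:40:01+00:00
```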





flink sql about nested json

2019-03-03 Thread 杨光
Hi,
I am trying the Flink SQL API to read JSON-format data from a Kafka topic.
My JSON schema is a nested one like this:
{
  "type": "object",
  "properties": {
    "table": { "type": "string" },
    "str2": { "type": "string" },
    "obj1": {
      "type": "object",
      "properties": {
        "rkey": { "type": "string" },
        "val": { "type": "string" },
        "lastTime": { "type": "number" }
      },
      "required": ["lastTime", "rkey", "val"]
    },
    "obj2": {
      "type": "object",
      "properties": {
        "val": { "type": "string" },
        "lastTime": { "type": "number" }
      },
      "required": ["lastTime", "val"]
    }
  },
  "required": ["table", "str2", "obj1", "obj2"]
}

I define a table schema like this:

Schema schemaDesc1 = new Schema()
    ...
    .field("tablestr", Types.STRING).from("table")
    ...
    .field("rkey", Types.STRING).from("rkey");


When I run a debug case, I get an error about the "rkey" field (the field nested in obj1):
"SQL validation failed. Table field 'rkey' was resolved to TableSource
return type field 'rkey', but field 'rkey' was not found in the return type
Row".

My question is: does the org.apache.flink.table.descriptors.Json format
support nested JSON schemas? If so, how can I set the right format or
schema? If not, how can I apply the Flink SQL API to a nested JSON data
source?
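If the format turns out not to support nested fields, one generic workaround is to flatten the nested payload upstream (for example in a custom deserialization step) so the table schema stays flat. A minimal sketch of such a flattening step, written in Python purely for illustration, with field names taken from the schema above:

```python
import json

def flatten(obj, prefix=""):
    """Flatten nested dicts into dotted keys, e.g. obj1.rkey."""
    out = {}
    for k, v in obj.items():
        key = prefix + k
        if isinstance(v, dict):
            out.update(flatten(v, key + "."))
        else:
            out[key] = v
    return out

record = json.loads('{"table":"t","str2":"s","obj1":{"rkey":"r","val":"v","lastTime":1}}')
print(flatten(record))
# {'table': 't', 'str2': 's', 'obj1.rkey': 'r', 'obj1.val': 'v', 'obj1.lastTime': 1}
```

A flat schema can then address `obj1.rkey` like any other top-level field.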


Re: Checkpoints and catch-up burst (heavy back pressure)

2019-03-03 Thread LINZ, Arnaud
Hi,

My source checkpoint is actually the file list. But it's not trivially small, as I may have hundreds of thousands of files with long filenames.
My sink checkpoint is a smaller HDFS file list with current sizes.

-------- Original message --------
From: Ken Krugler
Date: Fri, Mar 01, 2019 7:05 PM +0100
To: "LINZ, Arnaud"
CC: zhijiang, user
Subject: Re: Checkpoints and catch-up burst (heavy back pressure)

Hi Arnaud,

1. What’s your checkpoint configuration? Wondering if you’re writing to HDFS, 
and thus the load you’re putting on it while catching up & checkpointing is too 
high.

If so, then you could monitor the TotalLoad metric (FSNamesystem) in your 
source, and throttle back the emitting of file paths when this (empirically) 
gets too high.

2. I’m wondering what all you are checkpointing, and why.

E.g. if this is just an ETL-ish workflow to pull files, parse them, and write 
out (transformed) results, then you could in theory just checkpoint which files 
have been processed.

This means catching up after a failure could take more time, but your 
checkpoint size will be trivially small.

— Ken


On Mar 1, 2019, at 5:04 AM, LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:

Hi,

I think I should go into more detail to explain my use case.
I have one non-parallel source (parallelism = 1) that lists binary files in an
HDFS directory. The DataSet emitted by the source is a set of file names, not
file content. These filenames are rebalanced and sent to workers (parallelism
= 15) that use a flatmapper to open each file, read it, decode it, and send
records (forward mode) to the sinks (with a few 1-to-1 mappings in between).
So the flatmap operation is time-consuming, as the files are more than 200 MB
each; the flatmapper will emit millions of records to the sink for one source
record (filename).

The rebalancing, occurring at the file-name level, does not use much I/O, and
I cannot use one-to-one mode at that point if I want some parallelism, since I
have only one source.

I did not put file decoding directly in the sources because I have no good way
to distribute files to sources without a controller (the input directory is
unique, and filenames are random and cannot be "attributed" to one particular
source instance easily).
Alternatively, I could have used a dispatcher daemon, separate from the
streaming app, that distributes files to various directories, each directory
being associated with a Flink source instance, and put the file reading and
decoding directly in the source; but that seemed more complex to code and
operate than the filename source. Would it have been better from the
checkpointing perspective?

About the ungraceful source sleep(): is there a way, programmatically, to know
the "load" of the app, or to determine whether checkpointing is taking too much
time, so that I throttle only when needed?

Thanks,
Arnaud

From: zhijiang <wangzhijiang...@aliyun.com>
Sent: Friday, March 1, 2019 04:59
To: user <user@flink.apache.org>; LINZ, Arnaud <al...@bouyguestelecom.fr>
Subject: Re: Checkpoints and catch-up burst (heavy back pressure)
Objet : Re: Checkpoints and catch-up burst (heavy back pressure)

Hi Arnaud,

Thanks for the further feedbacks!

For option 1: 40 min still does not make sense, which indicates it might take
even more time to finish a checkpoint in your case. I have also experienced
scenarios where catching up on data took several hours to finish one
checkpoint. If the current checkpoint expires because of a timeout, the next
triggered checkpoint might still fail for the same reason. So it seems better
to wait for the current checkpoint to finish rather than let it expire, unless
you cannot bear this long time for some reason, such as worrying that a
failover would have to restore more data during this time.

For option 2: the default network settings should make sense. Lower values
might cause performance regression, and higher values would increase the
in-flight buffers and delay checkpoints more seriously.

For option 3: if resources are limited, it is still not workable on your side.

Sleeping for some time in the source, as you mentioned, is an option and might
work in your case, although it does not seem a graceful way.

I think there is no data skew in your case causing the backpressure, because
you use rebalance mode as mentioned. Another option might be to use forward
mode, which would be better than rebalance mode if possible in your case.
Because the source and downstream task are one-to-one in forward mode, the
total in-flight buffers are 2 + 2 + 8 for a single downstream task before the
barrier. In rebalance mode, the total in-flight buffers would be
(a*2 + a*2 + 8) for a single downstream task (`a` is the parallelism of the
source vertex), because it is an all-to-all connection. Barrier alignment
takes more time in rebalance mode than in forward mode.
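[Editor's illustration: using zhijiang's estimate as given (2 exclusive buffers per channel on each of the two sides, plus 8 floating buffers), the per-task in-flight buffer count can be made concrete. A small sketch, assuming his formula:]

```python
def inflight_buffers(source_parallelism: int, mode: str) -> int:
    """Per-downstream-task in-flight buffers before a barrier, following
    zhijiang's estimate: 2 exclusive buffers per channel on each of the
    two sides, plus 8 floating buffers."""
    # forward is one-to-one (1 channel); rebalance is all-to-all (one
    # channel per source subtask).
    channels = 1 if mode == "forward" else source_parallelism
    return channels * 2 + channels * 2 + 8

print(inflight_buffers(15, "forward"))    # 12
print(inflight_buffers(15, "rebalance"))  # 68
```

With a single source (a = 1), both modes give the same bound; the gap only opens as source parallelism grows.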

Best,
Zhijiang
--
From: LINZ, Arnaud <al...@bouyguestelecom.fr>
Send Time: Friday, March 1, 2019 00:46

Re: [Flink-Question] In Flink parallel computing, how do different windows receive the data of their own partition, that is, how does Window determine which partition number the current Window belongs

2019-03-03 Thread 刘 文

Sorry, I still don't understand. Can I ask for help again?


For example, a degree of parallelism of 2 produces two window threads:
).setParallelism(2)
). How do these two windows read their own partition's data?
). Input data:
  1 2 3 4 5 6 7 8 9 10
). source -> operator -> RecordWriter.emit calculates the partition by key:
--------------------------------------------------
channel 0 [partition 0]
key:1    partition:0
key:2    partition:0
key:3    partition:0
key:4    partition:0
key:6    partition:0
key:10   partition:0
--------------------------------------------------
channel 1 [partition 1]
key:5    partition:1
key:7    partition:1
key:8    partition:1
key:9    partition:1
). window 0 (1/2)
   How does it determine its current partition?
   How does it get the data in that partition?
). window 1 (2/2)
   How does it determine its current partition?
   How does it get the data in that partition?

---

> On Mar 4, 2019, at 4:19 AM, Rong Rong wrote:
> 
> Hi
> 
> I am not sure if I understand your question correctly, so will try to explain 
> the flow how elements gets into window operators.
> 
> Flink makes the partition assignment before invoking the operator to process 
> element. For the word count example, WindowOperator is invoked by 
> StreamInputProcessor[1] to "setKeyContextElement".
> The actual key is then set by WindowOperator (inherently by 
> AbstractStreamOperator[2]), which ultimately passed to KeyedStateBackend[3].
> 
> So, by the time WindowOperator processes elements, the KeyedStateBackend was 
> already set to the correct key.
> 
> Hope this answers your question.
> 
> --
> Rong
> 
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.html
>  
> 
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.html
>  
> 
> On Sun, Mar 3, 2019 at 5:15 AM 刘 文 wrote:
> ). Environment Flink1.7.2 WordCount local, stream processing
> ).source RecordWriter.emit(), for each element by key, divided into different 
> partitions, the partition location of each element has been determined, the 
> number of partitions is determined by DataStream.setParallelism(2)
>  ). By copyFromSerializerToTargetChannel(int targetChannel) to write data to 
> different channels, it is to send data to the window corresponding to 
> different partitions (data is sent one by one)



Re: Checkpoints and catch-up burst (heavy back pressure)

2019-03-03 Thread zhijiang
Hi Arnaud,

I think I understand your special use case based on your further explanation.
As you said, it is easy for the source to emit the whole list of file names
into the network buffers, because the emitted file names are so small and the
flatmap/sink processing is slow. Then, when a checkpoint is triggered, the
barrier sits behind the whole set of file names, which means the sink cannot
receive the barrier until it has read and written all the corresponding files.

So the proper solution in your case is to control the emit rate on the source
side based on the sink's catch-up progress, in order to avoid many files being
queued in front of barriers. This is the right way to try, and I hope your
solution with 2 parameters works.

Best,
Zhijiang


--
From:LINZ, Arnaud 
Send Time: Saturday, March 2, 2019 16:45
To:zhijiang ; user 
Subject:RE: Checkpoints and catch-up burst (heavy back pressure)

Hello,
When I think about it, I figure out that a barrier for the source sits behind
the whole set of files, and therefore the checkpoint will never complete until
the sinks have caught up.
The simplest way to deal with it without refactoring is to add 2 parameters to
the source: a file-number threshold detecting catch-up mode, and a
max-files-per-second limit applied when this occurs, set slightly lower than
the natural catch-up rate.
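[Editor's illustration: the two-parameter scheme can be sketched as a small rate limiter. The class and method names below are hypothetical, not Arnaud's actual code:]

```python
import time

class CatchupThrottle:
    """Caps the source's file-emission rate once the backlog indicates catch-up mode."""

    def __init__(self, backlog_threshold: int, max_files_per_sec: float):
        self.backlog_threshold = backlog_threshold   # parameter 1: catch-up detector
        self.min_interval = 1.0 / max_files_per_sec  # parameter 2: rate cap
        self.last_emit = float("-inf")

    def before_emit(self, backlog_size: int) -> None:
        """Call before emitting each filename; sleeps only in catch-up mode."""
        if backlog_size <= self.backlog_threshold:
            return  # normal mode: emit at full speed
        wait = self.min_interval - (time.monotonic() - self.last_emit)
        if wait > 0:
            time.sleep(wait)
        self.last_emit = time.monotonic()
```

Setting the cap slightly below the natural catch-up rate keeps barriers from piling up behind an ever-growing queue of filenames.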


Re: [Flink-Question] In Flink parallel computing, how do different windows receive the data of their own partition, that is, how does Window determine which partition number the current Window belongs

2019-03-03 Thread Rong Rong
Hi,

I am not sure if I understand your question correctly, so I will try to
explain the flow of how elements get into window operators.

Flink makes the partition assignment before invoking the operator to
process an element. For the word-count example, WindowOperator is invoked by
StreamInputProcessor [1] to "setKeyContextElement".
The actual key is then set by WindowOperator (inherited from
AbstractStreamOperator [2]), and ultimately passed to the KeyedStateBackend [3].

So, by the time WindowOperator processes elements, the KeyedStateBackend
has already been set to the correct key.

Hope this answers your question.

--
Rong


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.html

On Sun, Mar 3, 2019 at 5:15 AM 刘 文 wrote:

> ). Environment Flink1.7.2 WordCount local, stream processing
> ).source RecordWriter.emit(), for each element by key, divided into
> different partitions, the partition location of each element has been
> determined, the number of partitions is determined by
> DataStream.setParallelism(2)
>  ). By copyFromSerializerToTargetChannel(int targetChannel) to write data
> to different channels, it is to send data to the window corresponding to
> different partitions (data is sent one by one)
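[Editor's illustration: conceptually, the assignment question reduces to deterministic key hashing — each element's key is hashed to pick one of the parallel window instances, so every instance only ever sees its own keys. Flink actually routes keys through key groups using a murmur hash; the CRC32 stand-in below only illustrates the idea:]

```python
import zlib

def assign_partition(key, parallelism: int) -> int:
    """Stable stand-in for Flink's key-group assignment: same key -> same subtask."""
    return zlib.crc32(str(key).encode()) % parallelism

# The elements 1..10 split deterministically between the two window subtasks;
# neither subtask ever "asks" for its partition — records are routed to it.
groups = {0: [], 1: []}
for k in range(1, 11):
    groups[assign_partition(k, 2)].append(k)
print(groups)
```

The window subtask never computes "its" partition; the upstream RecordWriter picks the channel from the key's hash, so each subtask simply processes whatever arrives on its input.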


[Flink-Question] In Flink parallel computing, how do different windows receive the data of their own partition, that is, how does Window determine which partition number the current Window belongs to?

2019-03-03 Thread 刘 文
). Environment: Flink 1.7.2 WordCount, local, stream processing.
). In the source, RecordWriter.emit() divides each element into a partition by
key; the partition location of each element is determined up front, and the
number of partitions is determined by DataStream.setParallelism(2).
). copyFromSerializerToTargetChannel(int targetChannel) writes the data to
different channels, i.e. sends the data to the window corresponding to each
partition (data is sent one element at a time).

Command exited with status 1 in running Flink on marathon

2019-03-03 Thread Mar_zieh
I want to run my Flink program on a Mesos cluster via Marathon. I created an
application with this JSON file in Marathon:

{
  "id": "flink",
  "cmd": "/home/flink-1.7.0/bin/mesos-appmaster.sh -Djobmanager.heap.mb=1024 -Djobmanager.rpc.port=6123 -Drest.port=8081 -Dmesos.resourcemanager.tasks.mem=1024 -Dtaskmanager.heap.mb=1024 -Dtaskmanager.numberOfTaskSlots=2 -Dparallelism.default=2 -Dmesos.resourcemanager.tasks.cpus=1",
  "cpus": 1.0,
  "mem": 1024
}

The task failed with this error:

 I0303 09:41:52.841243  2594 exec.cpp:162] Version: 1.7.0 
I0303 09:41:52.851898  2593 exec.cpp:236] Executor registered on agent
d9a98175-b93c-4600-a41b-fe91fae5486a-S0 
I0303 09:41:52.854436  2594 executor.cpp:182] Received SUBSCRIBED event 
I0303 09:41:52.855284  2594 executor.cpp:186] Subscribed executor on
172.28.10.136 
I0303 09:41:52.855479  2594 executor.cpp:182] Received LAUNCH event 
I0303 09:41:52.855932  2594 executor.cpp:679] Starting task
.933fdd2f-3d98-11e9-bbc4-0242a78449af 
I0303 09:41:52.868172  2594 executor.cpp:499] Running
'/home/mesos-1.7.0/build/src/mesos-containerizer launch
' 
I0303 09:41:52.872699  2594 executor.cpp:693] Forked command at 2599 
I0303 09:41:54.050284  2596 executor.cpp:994] Command exited with status 1
(pid: 2599) 
I0303 09:41:55.052323  2598 process.cpp:926] Stopped the socket accept loop 

I configured Zookeeper, Mesos, Marathon and Flink; they all run in Docker. I
ran a simple program like "echo "hello" >> /home/output.txt" without any
problems.

I really do not know what is going on; I am confused. Could anyone tell me
what is wrong here?

Any help would be appreciated.

Many thanks.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Automate Job submission with container

2019-03-03 Thread Mar_zieh
I want to run my Flink program on a Mesos cluster via Marathon. I created an
application with this JSON file in Marathon:

{
  "id": "flink",
  "cmd": "/home/flink-1.7.0/bin/mesos-appmaster.sh -Djobmanager.heap.mb=1024 -Djobmanager.rpc.port=6123 -Drest.port=8081 -Dmesos.resourcemanager.tasks.mem=1024 -Dtaskmanager.heap.mb=1024 -Dtaskmanager.numberOfTaskSlots=2 -Dparallelism.default=2 -Dmesos.resourcemanager.tasks.cpus=1",
  "cpus": 1.0,
  "mem": 1024
}

The task failed with this error:

 I0303 09:41:52.841243  2594 exec.cpp:162] Version: 1.7.0
I0303 09:41:52.851898  2593 exec.cpp:236] Executor registered on agent
d9a98175-b93c-4600-a41b-fe91fae5486a-S0
I0303 09:41:52.854436  2594 executor.cpp:182] Received SUBSCRIBED event
I0303 09:41:52.855284  2594 executor.cpp:186] Subscribed executor on
172.28.10.136
I0303 09:41:52.855479  2594 executor.cpp:182] Received LAUNCH event
I0303 09:41:52.855932  2594 executor.cpp:679] Starting task
.933fdd2f-3d98-11e9-bbc4-0242a78449af
I0303 09:41:52.868172  2594 executor.cpp:499] Running
'/home/mesos-1.7.0/build/src/mesos-containerizer launch
'
I0303 09:41:52.872699  2594 executor.cpp:693] Forked command at 2599
I0303 09:41:54.050284  2596 executor.cpp:994] Command exited with status 1
(pid: 2599)
I0303 09:41:55.052323  2598 process.cpp:926] Stopped the socket accept loop

I configured Zookeeper, Mesos, Marathon and Flink; they all run in Docker. I
ran a simple program like "echo "hello" >> /home/output.txt" without any
problems.

I really do not know what is going on; I am confused. Could anyone tell me
what is wrong here?

Any help would be appreciated.

Many thanks.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

