Yes, I will check that, but any pointers on why Flink is taking more time
than gsutil upload?

On Thu, Sep 3, 2020 at 10:14 PM Till Rohrmann <> wrote:

> Hmm then it probably rules GCS out. What about ZooKeeper? Have you
> experienced slow response times from your ZooKeeper cluster?
> Cheers,
> Till
> On Thu, Sep 3, 2020 at 6:23 PM Prakhar Mathur <> wrote:
>> We tried uploading the same blob from Job Manager k8s pod directly to GCS
>> using gsutils and it took 2 seconds. The upload speed was 166.8 MiB/s.
>> Thanks.
>> On Wed, Sep 2, 2020 at 6:14 PM Till Rohrmann <>
>> wrote:
>>> The logs don't look suspicious. Could you maybe check what the write
>>> bandwidth to your GCS bucket is from the machine you are running Flink on?
>>> It should be enough to generate a 200 MB file and write it to GCS. Thanks a
>>> lot for your help in debugging this matter.
>>> Cheers,
>>> Till
>>> On Wed, Sep 2, 2020 at 1:04 PM Prakhar Mathur <>
>>> wrote:
>>>> Hi,
>>>> Thanks for the response. Yes, we are running Flink in HA mode. We
>>>> checked there are no such quota limits for GCS for us. Please find the logs
>>>> below, here you can see the copying of blob started at 11:50:39,455 and it
>>>> got JobGraph submission at 11:50:46,400.
>>>> 2020-09-01 11:50:37,061 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
>>>> Release TaskExecutor 2e20ee286a3fee5831fefc0ab427ba92 because it exceeded
>>>> the idle timeout.
>>>> 2020-09-01 11:50:37,061 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Worker 4e4ae8b90f911787ac112c2847759512 could not be stopped.
>>>> 2020-09-01 11:50:37,062 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
>>>> Release TaskExecutor 032a62eff2a7d8067f25b8fc943f262f because it exceeded
>>>> the idle timeout.
>>>> 2020-09-01 11:50:37,062 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Worker 2532ed667a2566e06c3d9e3bc85c6ed6 could not be stopped.
>>>> 2020-09-01 11:50:37,305 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Trigger heartbeat request.
>>>> 2020-09-01 11:50:37,305 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Trigger heartbeat request.
>>>> 2020-09-01 11:50:37,354 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
>>>> 2020-09-01 11:50:37,354 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
>>>> 2020-09-01 11:50:39,455 DEBUG
>>>> org.apache.flink.runtime.blob.FileSystemBlobStore             - Copying
>>>> from
>>>> /tmp/flink-blobs/blobStore-6e468470-ba5f-4ea0-a8fd-cf31af663f11/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426
>>>> to
>>>> gs://<test-bucket>/blob/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426.
>>>> 2020-09-01 11:50:43,904 DEBUG
>>>>  - Got
>>>> ping response for sessionid: 0x30be3d929102460 after 2ms
>>>> 2020-09-01 11:50:46,400 INFO
>>>>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Received
>>>> JobGraph submission 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
>>>> 2020-09-01 11:50:46,403 INFO
>>>>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Submitting
>>>> job 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
>>>> 2020-09-01 11:50:46,405 DEBUG
>>>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>>>> Adding job graph 980d3ff229b7fbfe889e2bc93e526da0 to
>>>> flink/cluster/jobgraphs/980d3ff229b7fbfe889e2bc93e526da0.
>>>> 2020-09-01 11:50:47,325 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Trigger heartbeat request.
>>>> 2020-09-01 11:50:47,325 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Trigger heartbeat request.
>>>> 2020-09-01 11:50:47,325 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Trigger heartbeat request.
>>>> 2020-09-01 11:50:47,325 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Trigger heartbeat request.
>>>> 2020-09-01 11:50:47,330 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
>>>> 2020-09-01 11:50:47,331 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
>>>> 2020-09-01 11:50:52,880 DEBUG
>>>>  - Got
>>>> notification sessionid:0x30be3d929102460
>>>> 2020-09-01 11:50:52,880 DEBUG
>>>>  - Got
>>>> WatchedEvent state:SyncConnected type:NodeChildrenChanged
>>>> path:/flink/cluster/jobgraphs for sessionid 0x30be3d929102460
>>>> 2020-09-01 11:50:52,882 INFO
>>>>  org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>>>> Added SubmittedJobGraph(980d3ff229b7fbfe889e2bc93e526da0) to ZooKeeper.
>>>> Thank You.
>>>> On Wed, Sep 2, 2020 at 2:06 PM Till Rohrmann <>
>>>> wrote:
>>>>> Hi Prakhar,
>>>>> have you enabled HA for your cluster? If yes, then Flink will try to
>>>>> store the job graph to the configured high-availability.storageDir in 
>>>>> order
>>>>> to be able to recover it. If this operation takes long, then it is either
>>>>> the filesystem which is slow or storing the pointer in ZooKeeper. If it is
>>>>> the filesystem, then I would suggest to check whether you have some
>>>>> read/write quotas which might slow the operation down.
>>>>> If you haven't enabled HA or persisting the jobGraph is not what takes
>>>>> long, then the next most likely candidate is the recovery from a previous
>>>>> checkpoint. Here again, Flink needs to read from the remote storage (in
>>>>> your case GCS). Depending on the size of the checkpoint and the read
>>>>> bandwidth, this can be faster or slower. The best way to figure out what
>>>>> takes long is to share the logs with us so that we can confirm what takes
>>>>> long.
>>>>> To sum it up, the job submission is most likely slow because of the
>>>>> interplay of Flink with the external system (most likely your configured
>>>>> filesystem). If the filesystem is somewhat throttled, then Flink cannot do
>>>>> much about it.
>>>>> What you could try to do is to check whether your jar contains
>>>>> dependencies which are not needed (e.g. Flink dependencies which are
>>>>> usually provided by the system). That way you could decrease the size of
>>>>> the jar a bit.
>>>>> Cheers,
>>>>> Till
>>>>> On Wed, Sep 2, 2020 at 9:48 AM Prakhar Mathur <>
>>>>> wrote:
>>>>>> Hi,
>>>>>> We are currently running Flink 1.9.0. We see a delay of around 20
>>>>>> seconds in order to start a job on a session Flink cluster. We start the
>>>>>> job using Flink's monitoring REST API where our jar is already uploaded 
>>>>>> on
>>>>>> Job Manager. Our jar file size is around 200 MB. We are using memory 
>>>>>> state
>>>>>> backend having GCS as remote storage.
>>>>>> On running the cluster in debug mode, we observed that generating the
>>>>>> plan itself takes around 6 seconds and copying job graph from local to 
>>>>>> the
>>>>>> remote folder takes around 10 seconds.
>>>>>> We were wondering whether this delay is expected or if it can be
>>>>>> reduced via tweaking any configuration?
>>>>>> Thank you. Regards
>>>>>> Prakhar Mathur

Reply via email to