It would also be good to know how many slots you have on each task executor.
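
That is the per-TaskManager setting in flink-conf.yaml, e.g. (the value shown is only a placeholder, not a recommendation):

  # flink-conf.yaml -- number of slots each TaskExecutor offers
  taskmanager.numberOfTaskSlots: 2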

On 10/1/2020 11:21 AM, Till Rohrmann wrote:
Hi Andreas,

do the logs of the JM contain any information?

Theoretically, each task submission to a `TaskExecutor` can trigger a new connection to the BlobServer. This depends a bit on how large your TaskInformation is and whether this information is offloaded to the BlobServer. What you can definitely try is increasing blob.fetch.backlog to see whether this solves the problem.
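
For example, something along these lines in flink-conf.yaml (the new value is only an illustration; pick one that fits your cluster):

  # defaults to 1000; a larger backlog lets more pending fetch connections queue at the BlobServer
  blob.fetch.backlog: 5000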

How many jobs do you submit to the Flink cluster, and on what timeline? Maybe you can share a few more details about the application you are running.

Cheers,
Till

On Wed, Sep 30, 2020 at 11:49 PM Hailu, Andreas <andreas.ha...@gs.com> wrote:

    Hello folks, I’m seeing application failures where our BlobServer
    is refusing connections mid-application:

    2020-09-30 13:56:06,227 INFO [flink-akka.actor.default-dispatcher-18] org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FINISHED to JobManager for task DataSink (TextOutputFormat (hdfs:/user/p2epda/lake/delp_prod/PROD/APPROVED/staging/datastore/MandateTradingLine/tmp_pipeline_collapse) - UTF-8) 3d1890b47f4398d805cf0c1b54286f71.

    2020-09-30 13:56:06,423 INFO [flink-akka.actor.default-dispatcher-18] org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647, directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, networkMemoryInMB=2147483647, managedMemoryInMB=3046}, allocationId: e8be16ec74f16c795d95b89cd08f5c37, jobId: e808de0373bd515224434b7ec1efe249).

    2020-09-30 13:56:06,424 INFO [flink-akka.actor.default-dispatcher-18] org.apache.flink.runtime.taskexecutor.JobLeaderService - Remove job e808de0373bd515224434b7ec1efe249 from job leader monitoring.

    2020-09-30 13:56:06,424 INFO [flink-akka.actor.default-dispatcher-18] org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job e808de0373bd515224434b7ec1efe249.

    2020-09-30 13:56:06,426 INFO [flink-akka.actor.default-dispatcher-18] org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job e808de0373bd515224434b7ec1efe249.

    2020-09-30 13:56:06,426 INFO [flink-akka.actor.default-dispatcher-18] org.apache.flink.runtime.taskexecutor.JobLeaderService - Cannot reconnect to job e808de0373bd515224434b7ec1efe249 because it is not registered.

    2020-09-30 13:56:09,918 INFO  [CHAIN DataSource (dataset | Read Staging From File System | AVRO) -> Map (Map at readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)] org.apache.flink.runtime.blob.BlobClient - Downloading 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39 from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 3)

    2020-09-30 13:56:09,920 ERROR [CHAIN DataSource (dataset | Read Staging From File System | AVRO) -> Map (Map at readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)] org.apache.flink.runtime.blob.BlobClient - Failed to fetch BLOB 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39 from d43723-430.dc.gs.com/10.48.128.14:46473 and store it under /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004 Retrying...

    2020-09-30 13:56:09,920 INFO  [CHAIN DataSource (dataset | Read Staging From File System | AVRO) -> Map (Map at readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)] org.apache.flink.runtime.blob.BlobClient - Downloading 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39 from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 4)

    2020-09-30 13:56:09,922 ERROR [CHAIN DataSource (dataset | Read Staging From File System | AVRO) -> Map (Map at readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)] org.apache.flink.runtime.blob.BlobClient - Failed to fetch BLOB 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39 from d43723-430.dc.gs.com/10.48.128.14:46473 and store it under /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004 Retrying...

    2020-09-30 13:56:09,922 INFO  [CHAIN DataSource (dataset | Read Staging From File System | AVRO) -> Map (Map at readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)] org.apache.flink.runtime.blob.BlobClient - Downloading 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39 from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 5)

    2020-09-30 13:56:09,925 ERROR [CHAIN DataSource (dataset | Read Staging From File System | AVRO) -> Map (Map at readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)] org.apache.flink.runtime.blob.BlobClient - Failed to fetch BLOB 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39 from d43723-430.dc.gs.com/10.48.128.14:46473 and store it under /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004 No retries left.

    java.io.IOException: Could not connect to BlobServer at address d43723-430.dc.gs.com/10.48.128.14:46473
        at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)
        at org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)
        at org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
        at org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)
        at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
        at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.net.ConnectException: Connection refused (Connection refused)
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)
        ... 8 more

    Prior to the connection refused error above, I don’t see any
    exceptions or failures. We’re running this application with v1.9.2
    on YARN, 26 Task Managers with 2 cores each, and with the default
    BLOB server configurations. The application itself submits many
    jobs to the cluster. Does this sound like a blob.fetch.backlog /
    blob.fetch.num-concurrent config problem (they default to 1000 and
    50 respectively)? I wasn’t sure how chatty each TM is with the
    BlobServer. How can we tell whether it’s a concurrent-connection
    limit or a backlog problem?
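
    For reference, these are the settings I mean, shown here at what I
    believe are their defaults (we haven’t overridden them):

        # BlobServer fetch concurrency and pending-connection backlog
        blob.fetch.num-concurrent: 50
        blob.fetch.backlog: 1000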

    Best,

    Andreas

