Hi Andreas,

Do the logs of the JobManager (JM) contain any hints about why the BlobServer
refuses connections?

In theory, every task submission to a `TaskExecutor` can trigger a new
connection to the BlobServer. Whether it actually does depends on how large
your TaskInformation is and whether it gets offloaded to the BlobServer.
What you can definitely try is increasing blob.fetch.backlog to see whether
that solves the problem.
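
For reference, a minimal flink-conf.yaml sketch of the settings I mean (the
values are only illustrative starting points, not tuned recommendations):

    # BlobServer fetch settings. The Flink 1.9 defaults are backlog 1000,
    # num-concurrent 50 and retries 5 -- the latter matches the
    # "(retry 5) ... No retries left." sequence in your logs.
    blob.fetch.backlog: 5000
    blob.fetch.num-concurrent: 200
    blob.fetch.retries: 10

Note that the configured backlog is additionally capped by the kernel
(net.core.somaxconn on Linux), so raising blob.fetch.backlog beyond that
limit has no effect.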

How many jobs do you submit to the Flink cluster, and on what timeline?
Maybe you can share a bit more detail about the application you are
running.
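
As for your question of how to tell whether it is the concurrent-connections
limit or the backlog: one thing you could do is watch the kernel's
accept-queue counters on the BlobServer host while the jobs are being
submitted. A rough sketch, assuming a Linux host and the BlobServer port
from your logs (46473):

    # Send-Q shows the configured backlog, Recv-Q the current length of the
    # accept queue of the listening BlobServer socket:
    ss -lnt 'sport = :46473'

    # Cumulative listen-queue overflow/drop counters (the exact wording
    # differs between kernel versions):
    netstat -s | grep -i -E 'listen|overflow'

If those counters stay flat while the connection refusals happen, the
backlog is probably not the culprit.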

Cheers,
Till

On Wed, Sep 30, 2020 at 11:49 PM Hailu, Andreas <andreas.ha...@gs.com>
wrote:

> Hello folks, I’m seeing application failures where our BlobServer is
> refusing connections mid-application:
>
>
>
> 2020-09-30 13:56:06,227 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.TaskExecutor            -
> Un-registering task and sending final execution state FINISHED to
> JobManager for task DataSink (TextOutputFormat
> (hdfs:/user/p2epda/lake/delp_prod/PROD/APPROVED/staging/datastore/MandateTradingLine/tmp_pipeline_collapse)
> - UTF-8) 3d1890b47f4398d805cf0c1b54286f71.
>
> 2020-09-30 13:56:06,423 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Free slot
> TaskSlot(index:0, state:ACTIVE, resource profile:
> ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647,
> directMemoryInMB=2147483647, nativeMemoryInMB=2147483647,
> networkMemoryInMB=2147483647, managedMemoryInMB=3046}, allocationId:
> e8be16ec74f16c795d95b89cd08f5c37, jobId: e808de0373bd515224434b7ec1efe249).
>
> 2020-09-30 13:56:06,424 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.JobLeaderService        - Remove job
> e808de0373bd515224434b7ec1efe249 from job leader monitoring.
>
> 2020-09-30 13:56:06,424 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close
> JobManager connection for job e808de0373bd515224434b7ec1efe249.
>
> 2020-09-30 13:56:06,426 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close
> JobManager connection for job e808de0373bd515224434b7ec1efe249.
>
> 2020-09-30 13:56:06,426 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.JobLeaderService        - Cannot
> reconnect to job e808de0373bd515224434b7ec1efe249 because it is not
> registered.
>
> 2020-09-30 13:56:09,918 INFO  [CHAIN DataSource (dataset | Read Staging
> From File System | AVRO) -> Map (Map at
> readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at
> validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
> handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
> collapsePipelineIfRequired(Task.java:160)) (1/1)]
> org.apache.flink.runtime.blob.BlobClient                      - Downloading
> 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
> from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 3)
>
> 2020-09-30 13:56:09,920 ERROR [CHAIN DataSource (dataset | Read Staging
> From File System | AVRO) -> Map (Map at
> readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at
> validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
> handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
> collapsePipelineIfRequired(Task.java:160)) (1/1)]
> org.apache.flink.runtime.blob.BlobClient                      - Failed to
> fetch BLOB
> 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
> from d43723-430.dc.gs.com/10.48.128.14:46473 and store it under
> /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004
> Retrying...
>
> 2020-09-30 13:56:09,920 INFO  [CHAIN DataSource (dataset | Read Staging
> From File System | AVRO) -> Map (Map at
> readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at
> validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
> handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
> collapsePipelineIfRequired(Task.java:160)) (1/1)]
> org.apache.flink.runtime.blob.BlobClient                      - Downloading
> 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
> from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 4)
>
> 2020-09-30 13:56:09,922 ERROR [CHAIN DataSource (dataset | Read Staging
> From File System | AVRO) -> Map (Map at
> readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at
> validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
> handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
> collapsePipelineIfRequired(Task.java:160)) (1/1)]
> org.apache.flink.runtime.blob.BlobClient                      - Failed to
> fetch BLOB
> 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
> from d43723-430.dc.gs.com/10.48.128.14:46473 and store it under
> /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004
> Retrying...
>
> 2020-09-30 13:56:09,922 INFO  [CHAIN DataSource (dataset | Read Staging
> From File System | AVRO) -> Map (Map at
> readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at
> validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
> handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
> collapsePipelineIfRequired(Task.java:160)) (1/1)]
> org.apache.flink.runtime.blob.BlobClient                      - Downloading
> 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
> from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 5)
>
> 2020-09-30 13:56:09,925 ERROR [CHAIN DataSource (dataset | Read Staging
> From File System | AVRO) -> Map (Map at
> readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at
> validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
> handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
> collapsePipelineIfRequired(Task.java:160)) (1/1)]
> org.apache.flink.runtime.blob.BlobClient                      - Failed to
> fetch BLOB
> 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
> from d43723-430.dc.gs.com/10.48.128.14:46473 and store it under
> /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004
> No retries left.
>
> java.io.IOException: Could not connect to BlobServer at address
> d43723-430.dc.gs.com/10.48.128.14:46473
>
>                 at
> org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)
>
>                 at
> org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)
>
>                 at
> org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
>
>                 at
> org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)
>
>                 at
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>
>                 at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.net.ConnectException: Connection refused (Connection
> refused)
>
>                 at java.net.PlainSocketImpl.socketConnect(Native Method)
>
>                 at
> java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>
>                 at
> java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>
>                 at
> java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>
>                 at
> java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>
>                 at java.net.Socket.connect(Socket.java:589)
>
>                 at
> org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)
>
>                 ... 8 more
>
>
>
> Prior to the above connection refused error, I don’t see any exceptions or
> failures. We’re running this application with v1.9.2 on YARN, 26 Task
> Managers with 2 cores each, and with the default BLOB server
> configuration. The application itself submits many jobs to the
> cluster. Does this sound like a blob.fetch.backlog/concurrent-connections
> config problem (they default to 1000 and 50, respectively)? I wasn’t sure
> how chatty each TM is with the BlobServer. How can we tell whether it’s a
> max concurrent-connections or a backlog problem?
>
>
>
> Best,
>
> Andreas
>
