[jira] [Created] (FLINK-11187) StreamingFileSink with S3 backend transient socket timeout issues
Addison Higham created FLINK-11187:
--
Summary: StreamingFileSink with S3 backend transient socket timeout issues
Key: FLINK-11187
URL: https://issues.apache.org/jira/browse/FLINK-11187
Project: Flink
Issue Type: Bug
Components: FileSystem, Streaming Connectors
Affects Versions: 1.7.0, 1.7.1
Reporter: Addison Higham
Assignee: Addison Higham
Fix For: 1.7.2

When using the StreamingFileSink with the S3A backend, errors like the following occasionally occur:

{noformat}
Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed. (Service: Amazon S3; Status Code: 400; Error Code: RequestTimeout; Request ID: xxx; S3 Extended Request ID: xxx, S3 Extended Request ID: xxx
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
{noformat}

This causes a restart of the Flink job, which the job is often able to recover from, but under heavy load these restarts can become very frequent.
Turning on debug logs, you can find the following relevant stack trace:

{noformat}
2018-12-17 05:55:46,546 DEBUG org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient - FYI: failed to reset content inputstream before throwing up
java.io.IOException: Resetting to invalid mark
	at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
	at
{noformat}
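The `Resetting to invalid mark` failure in the debug trace comes from the JDK's `BufferedInputStream` mark/reset contract: once more bytes have been read past the mark than the readlimit/internal buffer allows, the mark is invalidated, so the SDK cannot rewind the request body to replay the failed upload part and the RequestTimeout surfaces instead of being retried. A minimal JDK-only sketch (no Flink or AWS code) reproducing the message:

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;

public class MarkResetDemo {
    public static void main(String[] args) throws IOException {
        byte[] data = new byte[1024];
        // Small internal buffer: the mark only survives while the
        // read-ahead stays within the readlimit / buffer capacity.
        BufferedInputStream in =
            new BufferedInputStream(new ByteArrayInputStream(data), 16);
        in.mark(16);            // readlimit of 16 bytes
        in.read(new byte[512]); // read well past the readlimit
        try {
            in.reset();         // the mark has been invalidated
        } catch (IOException e) {
            System.out.println(e.getMessage()); // "Resetting to invalid mark"
        }
    }
}
```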
[jira] [Created] (FLINK-9616) DatadogHttpReporter fails to be created due to missing shaded dependency
Addison Higham created FLINK-9616:
-
Summary: DatadogHttpReporter fails to be created due to missing shaded dependency
Key: FLINK-9616
URL: https://issues.apache.org/jira/browse/FLINK-9616
Project: Flink
Issue Type: Bug
Components: Metrics
Affects Versions: 1.5.0
Reporter: Addison Higham

When using the DatadogHttpReporter, it fails to instantiate with the following exception:

{code:java}
2018-06-19 06:01:19,640 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl - Configuring dghttp with {apikey=, tags=, class=org.apache.flink.metrics.datadog.DatadogHttpReporter}.
2018-06-19 06:01:19,642 ERROR org.apache.flink.runtime.metrics.MetricRegistryImpl - Could not instantiate metrics reporter dghttp. Metrics might not be exposed/reported.
java.lang.NoClassDefFoundError: org/apache/flink/shaded/okhttp3/MediaType
	at org.apache.flink.metrics.datadog.DatadogHttpClient.<init>(DatadogHttpClient.java:45)
	at org.apache.flink.metrics.datadog.DatadogHttpReporter.open(DatadogHttpReporter.java:105)
	at org.apache.flink.runtime.metrics.MetricRegistryImpl.<init>(MetricRegistryImpl.java:150)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:413)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:274)
	at org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.initializeServices(MesosSessionClusterEntrypoint.java:92)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:225)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:189)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1889)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:188)
	at org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.main(MesosSessionClusterEntrypoint.java:181)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.okhttp3.MediaType
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 14 more
{code}

Looking at the pom.xml for `flink-metrics-datadog`, it looks like that dependency is intended to be shaded and included in the jar. However, when we build the jar, we see the following lines:

{noformat}
$ mvn package
[INFO] Scanning for projects...
[INFO]
[INFO]
[INFO] Building flink-metrics-datadog 1.5.0
[INFO]
[INFO] --- maven-shade-plugin:3.0.0:shade (shade-flink) @ flink-metrics-datadog ---
[INFO] Excluding com.squareup.okhttp3:okhttp:jar:3.7.0 from the shaded jar.
[INFO] Excluding com.squareup.okio:okio:jar:1.12.0 from the shaded jar.
[INFO] Including org.apache.flink:force-shading:jar:1.5.0 in the shaded jar.
[INFO] Replacing original artifact with shaded artifact.
{noformat}

And inspecting the built jar:

{noformat}
$ jar tf flink-metrics-datadog-1.5.0.jar
META-INF/
META-INF/MANIFEST.MF
org/
org/apache/
org/apache/flink/
org/apache/flink/metrics/
org/apache/flink/metrics/datadog/
org/apache/flink/metrics/datadog/DatadogHttpClient$EmptyCallback.class
org/apache/flink/metrics/datadog/DMetric.class
org/apache/flink/metrics/datadog/DSeries.class
org/apache/flink/metrics/datadog/DGauge.class
org/apache/flink/metrics/datadog/DatadogHttpReporter.class
org/apache/flink/metrics/datadog/DatadogHttpClient.class
org/apache/flink/metrics/datadog/MetricType.class
org/apache/flink/metrics/datadog/DatadogHttpReporter$DatadogHttpRequest.class
org/apache/flink/metrics/datadog/DMeter.class
org/apache/flink/metrics/datadog/DCounter.class
META-INF/DEPENDENCIES
META-INF/maven/
META-INF/maven/org.apache.flink/
META-INF/maven/org.apache.flink/flink-metrics-datadog/
META-INF/maven/org.apache.flink/flink-metrics-datadog/pom.xml
META-INF/maven/org.apache.flink/flink-metrics-datadog/pom.properties
META-INF/NOTICE
{noformat}

We don't see the included dependencies.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
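One quick way to confirm what the jar listing shows is to probe for the relocated class at runtime (the class name is taken from the NoClassDefFoundError above); on a classpath built from this jar, the probe reports the class missing:

```java
public class ShadedClassProbe {
    // Probe for the relocated dependency: had the shade plugin included
    // and relocated okhttp, this class would be present inside the jar.
    public static void main(String[] args) {
        try {
            Class.forName("org.apache.flink.shaded.okhttp3.MediaType");
            System.out.println("shaded okhttp present");
        } catch (ClassNotFoundException e) {
            System.out.println("shaded okhttp missing");
        }
    }
}
```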
[jira] [Created] (FLINK-9612) Add option for minimal artifacts being pulled in Mesos
Addison Higham created FLINK-9612:
-
Summary: Add option for minimal artifacts being pulled in Mesos
Key: FLINK-9612
URL: https://issues.apache.org/jira/browse/FLINK-9612
Project: Flink
Issue Type: Improvement
Components: Docker, Mesos
Reporter: Addison Higham

NOTE: this assumes Mesos, but this improvement could also be useful for future container deployments.

Currently, in Mesos, the FlinkDistributionOverlay copies the entire `conf`, `bin`, and `lib` folders from the running JobManager/ResourceManager. When using Docker with a pre-installed Flink distribution, this is relatively inefficient, as it pulls jars that are already baked into the container image.

A new option that disables pulling most (if not all?) of the FlinkDistributionOverlay would allow for much faster and more scalable provisioning of TaskManagers. As it currently stands, trying to run a few hundred TaskManagers is likely to result in poor performance pulling all the artifacts from the MesosArtifactServer.
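A sketch of what such an option might look like on the resource manager side (the config key name here is hypothetical, not an existing Flink option, and plain `java.util.Properties` stands in for Flink's `Configuration`):

```java
import java.util.Properties;

public class OverlayGating {
    // Hypothetical option: when false, skip copying conf/bin/lib via the
    // FlinkDistributionOverlay and rely on the distribution already baked
    // into the container image. Defaults to true for backwards compatibility.
    static boolean shouldPullDistribution(Properties config) {
        return Boolean.parseBoolean(config.getProperty(
            "mesos.resourcemanager.tasks.container.pull-distribution", "true"));
    }
}
```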
[jira] [Created] (FLINK-9611) Allow for user-defined artifacts to be specified as part of a mesos overlay
Addison Higham created FLINK-9611:
-
Summary: Allow for user-defined artifacts to be specified as part of a mesos overlay
Key: FLINK-9611
URL: https://issues.apache.org/jira/browse/FLINK-9611
Project: Flink
Issue Type: Improvement
Components: Mesos
Affects Versions: 1.5.0
Reporter: Addison Higham

NOTE: this assumes Mesos, but this improvement could also be useful for future container deployments.

Currently, when deploying to Mesos, the "Overlay" functionality is used to determine which artifacts are downloaded into the container. However, there isn't a way to plug in your own artifacts to be downloaded into the container. This can cause problems with certain deployment models. For example, if you are running Flink in Docker on Mesos, you cannot easily use a private Docker image. Typically with Mesos and private Docker images, you specify credentials as a URI to be downloaded into the container that grant permission to pull the private image. These credentials usually expire after a few days, so baking them into the image isn't a solution.

It would make sense to add a `MesosUserOverlay` that would simply take some new configuration parameters and add any custom artifacts (or possibly also environment variables?). Another solution (or longer-term solution) might be to allow for dynamically loading an overlay class for even further customization of the container specification.
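A sketch of how such a `MesosUserOverlay` might source its artifacts from configuration (the config key and helper are hypothetical; only JDK types are used here, with the parsed URIs standing in for entries the overlay would attach to the container specification):

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

public class UserArtifacts {
    // Hypothetical helper: parse a comma-separated URI list (e.g. from a new
    // config key such as "mesos.resourcemanager.tasks.uris") into artifact
    // URIs that a user overlay could add to the container specification.
    static List<URI> parseArtifactUris(String raw) {
        List<URI> uris = new ArrayList<>();
        if (raw == null || raw.trim().isEmpty()) {
            return uris;
        }
        for (String part : raw.split(",")) {
            uris.add(URI.create(part.trim()));
        }
        return uris;
    }
}
```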
[jira] [Created] (FLINK-7615) Under mesos when using a role, TaskManagers fail to schedule
Addison Higham created FLINK-7615:
-
Summary: Under mesos when using a role, TaskManagers fail to schedule
Key: FLINK-7615
URL: https://issues.apache.org/jira/browse/FLINK-7615
Project: Flink
Issue Type: Bug
Components: Mesos
Affects Versions: 1.3.2
Reporter: Addison Higham

When `mesos.resourcemanager.framework.role` is specified, TaskManagers are unable to start. An error message is given indicating that the requested resources cannot be satisfied. I sadly lost the logs, but essentially it appears that an offer extended by Mesos is accepted, but the request is made for resources under the default role (`*`) while the resources offered all exist under the specified role.

I believe this is because, while the framework properly starts under the specified role (meaning it only gets offers for that role), it isn't building `Protos.Resource` objects with a role defined. This can be seen here: https://github.com/apache/flink/blob/release-1.3.2/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java#L72

The Mesos docs for `Resource.Builder.setRole` (http://mesos.apache.org/api/latest/java/org/apache/mesos/Protos.Resource.Builder.html#setRole-java.lang.String-) allow for a role to be provided. (Note, this method is shown as deprecated for Mesos 1.4.0, but for the version Flink currently uses, 1.0.1, this method is the only mechanism.)

I believe this should mostly be fixed by something like this:

{code:java}
/**
 * Construct a scalar resource value.
 */
public static Protos.Resource scalar(String name, double value, Option<String> role) {
	Protos.Resource.Builder builder = Protos.Resource.newBuilder()
		.setName(name)
		.setType(Protos.Value.Type.SCALAR)
		.setScalar(Protos.Value.Scalar.newBuilder().setValue(value));
	if (role.isDefined()) {
		builder.setRole(role.get());
	}
	return builder.build();
}
{code}

However, perhaps we want to consider upgrading to Mesos 1.4.x, which has the newer API for this (http://mesos.apache.org/api/latest/java/org/apache/mesos/Protos.Resource.ReservationInfo.Builder.html#setRole-java.lang.String-). In looking at the other options for ReservationInfo, I don't see any current need to expose any of those parameters for configuration, but perhaps some FLIP-6 work could benefit.

[~till.rohrmann] any thoughts? I can implement a fix as above against Mesos 1.0.1, but figured I would get your input before submitting a patch for this.
[jira] [Created] (FLINK-5975) Mesos should support adding volumes to launched taskManagers
Addison Higham created FLINK-5975:
-
Summary: Mesos should support adding volumes to launched taskManagers
Key: FLINK-5975
URL: https://issues.apache.org/jira/browse/FLINK-5975
Project: Flink
Issue Type: Improvement
Components: Mesos
Affects Versions: 1.2.0, 1.3.0
Reporter: Addison Higham
Priority: Minor

Flink needs access to shared storage. In many cases this is HDFS, but it would be nice to also support file URIs on a mounted NFS, for example. Mesos exposes APIs for adding volumes, so it should be relatively simple to add this.

As an example, here is the Spark code for supporting volumes: https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala#L35
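For illustration, a volume-spec parser along the lines of the linked Spark helper might look like this (a hypothetical sketch using only JDK types; the `[host_path:]container_path[:ro|rw]` format follows the Docker convention Spark uses, and the mode defaults to read-write):

```java
import java.util.Optional;

public class VolumeSpec {
    final Optional<String> hostPath;
    final String containerPath;
    final boolean readOnly;

    VolumeSpec(Optional<String> hostPath, String containerPath, boolean readOnly) {
        this.hostPath = hostPath;
        this.containerPath = containerPath;
        this.readOnly = readOnly;
    }

    // Hypothetical parser for "[host_path:]container_path[:ro|rw]".
    static VolumeSpec parse(String spec) {
        String[] parts = spec.split(":");
        switch (parts.length) {
            case 1: // container path only
                return new VolumeSpec(Optional.empty(), parts[0], false);
            case 2: // either container_path:mode or host_path:container_path
                if (parts[1].equals("ro") || parts[1].equals("rw")) {
                    return new VolumeSpec(Optional.empty(), parts[0], parts[1].equals("ro"));
                }
                return new VolumeSpec(Optional.of(parts[0]), parts[1], false);
            case 3: // host_path:container_path:mode
                return new VolumeSpec(Optional.of(parts[0]), parts[1], parts[2].equals("ro"));
            default:
                throw new IllegalArgumentException("Bad volume spec: " + spec);
        }
    }
}
```

Each parsed spec would then be translated into a `Protos.Volume` on the launched TaskManager's container info.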