[jira] [Commented] (FLINK-4947) Make all configuration possible via flink-conf.yaml and CLI.
[ https://issues.apache.org/jira/browse/FLINK-4947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137249#comment-16137249 ] Jamie Grier commented on FLINK-4947:

[~gaborhermann] It's more than just that, but yes, I do suggest that you should be able to override what's in the config file on the command line. More importantly, though, all *config* should be configurable via flink-conf.yaml. We shouldn't add features that are only configurable from the *user code*. An example of this used to be the RocksDB state backend: if you wanted to use that backend and configure it in "async" mode, you had to put this in application code, which is not great for separation of concerns between application developers and ops/platform teams.

I know this isn't black-and-white, but we should try to clearly separate configuration from user code by putting everything in flink-conf.yaml. We should *also* make it possible to override any of those values on the command line when submitting a job.

> Make all configuration possible via flink-conf.yaml and CLI.
>
> Key: FLINK-4947
> URL: https://issues.apache.org/jira/browse/FLINK-4947
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Reporter: Jamie Grier
>
> I think it's important to make all configuration possible via the flink-conf.yaml and the command line.
> As an example: to enable "externalizedCheckpoints" you must actually call the StreamExecutionEnvironment#enableExternalizedCheckpoints() method from your Flink program.
> Another example of this would be configuring the RocksDB state backend.
> I think it's important to make deployment flexible and easy to build tools around. For example, the infrastructure teams that make these configuration decisions and provide tools for deploying Flink apps will be different from the teams deploying apps. The team writing apps should not have to set all of this lower-level configuration up in their programs.
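As a rough illustration of what the proposal would look like in practice, the same choices could live in flink-conf.yaml instead of application code. This is only a sketch; the exact key names (taken from later Flink releases) are an assumption and may differ by version:

```yaml
# Hypothetical flink-conf.yaml fragment: state backend and externalized
# checkpoints configured by the ops/platform team, not in user code.
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/external-checkpoints
```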
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-4947) Make all configuration possible via flink-conf.yaml and CLI.
[ https://issues.apache.org/jira/browse/FLINK-4947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16138418#comment-16138418 ] Jamie Grier commented on FLINK-4947:

Sounds good to me :)

> Make all configuration possible via flink-conf.yaml and CLI.
>
> Key: FLINK-4947
> URL: https://issues.apache.org/jira/browse/FLINK-4947
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Reporter: Jamie Grier
>
> I think it's important to make all configuration possible via the flink-conf.yaml and the command line.
> As an example: to enable "externalizedCheckpoints" you must actually call the StreamExecutionEnvironment#enableExternalizedCheckpoints() method from your Flink program.
> Another example of this would be configuring the RocksDB state backend.
> I think it's important to make deployment flexible and easy to build tools around. For example, the infrastructure teams that make these configuration decisions and provide tools for deploying Flink apps will be different from the teams deploying apps. The team writing apps should not have to set all of this lower-level configuration up in their programs.
[jira] [Created] (FLINK-10886) Event time synchronization across sources
Jamie Grier created FLINK-10886:

Summary: Event time synchronization across sources
Key: FLINK-10886
URL: https://issues.apache.org/jira/browse/FLINK-10886
Project: Flink
Issue Type: Improvement
Components: Streaming Connectors
Reporter: Jamie Grier
Assignee: Jamie Grier

When reading from a source with many parallel partitions, especially when reading lots of historical data (or recovering from downtime when there is a backlog to read), it's quite common for an event-time skew to develop across those partitions.

When doing event-time windowing -- or in fact any event-time-driven processing -- the event-time skew across partitions results directly in increased buffering in Flink and, of course, the corresponding state/checkpoint size growth.

As the event-time skew and state size grow larger, this can have a major effect on application performance and in some cases result in a "death spiral" where the application performance gets worse and worse as the state size grows and grows.

So, one solution to this problem, outside of core changes in Flink itself, seems to be to coordinate sources across partitions so that they make progress through event time at roughly the same rate. In fact, if there is large skew, the idea would be to slow or even stop reading from some partitions with newer data while first reading the partitions with older data. To do this we need to share state somehow amongst sub-tasks.
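The throttling idea described above can be sketched as a small decision rule: a source sub-task compares its local watermark to a shared global minimum and pauses reading from partitions that are too far ahead in event time. All names below are hypothetical; this is not Flink's API, just an illustration of the coordination logic:

{code:java}
// Sketch only (names hypothetical): decide whether a partition should pause
// reading so that event-time stragglers can catch up.
final class EventTimeAligner {
    private final long maxAheadMillis;

    EventTimeAligner(long maxAheadMillis) {
        this.maxAheadMillis = maxAheadMillis;
    }

    /** True if this partition is too far ahead of the slowest partition. */
    boolean shouldPause(long localWatermark, long globalMinWatermark) {
        return localWatermark - globalMinWatermark > maxAheadMillis;
    }
}
{code}

The hard part, as the issue says, is obtaining `globalMinWatermark` -- that requires sharing state amongst sub-tasks.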
[jira] [Updated] (FLINK-10887) Add source watermark tracking to the JobMaster
[ https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jamie Grier updated FLINK-10887:

Summary: Add source watermark tracking to the JobMaster (was: Add source watermarking tracking to the JobMaster)

> Add source watermark tracking to the JobMaster
>
> Key: FLINK-10887
> URL: https://issues.apache.org/jira/browse/FLINK-10887
> Project: Flink
> Issue Type: Sub-task
> Components: JobManager
> Reporter: Jamie Grier
> Assignee: Jamie Grier
> Priority: Major
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> We need to add a new RPC to the JobMaster so that the current watermark for every source sub-task can be reported and the current global minimum/maximum watermark can be retrieved, so that each source can adjust its partition read rates in an attempt to keep sources roughly aligned in event time.
[jira] [Created] (FLINK-10887) Add source watermarking tracking to the JobMaster
Jamie Grier created FLINK-10887:

Summary: Add source watermarking tracking to the JobMaster
Key: FLINK-10887
URL: https://issues.apache.org/jira/browse/FLINK-10887
Project: Flink
Issue Type: Sub-task
Components: JobManager
Reporter: Jamie Grier
Assignee: Jamie Grier

We need to add a new RPC to the JobMaster so that the current watermark for every source sub-task can be reported and the current global minimum/maximum watermark can be retrieved, so that each source can adjust its partition read rates in an attempt to keep sources roughly aligned in event time.
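The JobMaster-side bookkeeping behind such an RPC could look roughly like the following. This is a sketch under assumed names (there is no `WatermarkTracker` in the issue); it only illustrates the report-and-aggregate semantics the RPC would need:

{code:java}
import java.util.concurrent.ConcurrentHashMap;

// Sketch (names hypothetical): each source sub-task reports its current
// watermark via RPC; the global min/max across all reporters can be queried.
final class WatermarkTracker {
    private final ConcurrentHashMap<String, Long> watermarks = new ConcurrentHashMap<>();

    /** Called by each source sub-task; returns the current global minimum. */
    long reportWatermark(String subtaskId, long watermark) {
        watermarks.put(subtaskId, watermark);
        return globalMin();
    }

    long globalMin() {
        return watermarks.values().stream().mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
    }

    long globalMax() {
        return watermarks.values().stream().mapToLong(Long::longValue).max().orElse(Long.MIN_VALUE);
    }
}
{code}

Returning the global minimum from the report call keeps the protocol to a single round trip per source sub-task.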
[jira] [Created] (FLINK-10888) Expose new global watermark RPC to sources
Jamie Grier created FLINK-10888:

Summary: Expose new global watermark RPC to sources
Key: FLINK-10888
URL: https://issues.apache.org/jira/browse/FLINK-10888
Project: Flink
Issue Type: Sub-task
Components: Streaming Connectors
Reporter: Jamie Grier
Assignee: Jamie Grier

Expose the new JobMaster RPC for watermark tracking to Source implementations so that it can be used to align reads across sources.
[jira] [Commented] (FLINK-10886) Event time synchronization across sources
[ https://issues.apache.org/jira/browse/FLINK-10886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687333#comment-16687333 ] Jamie Grier commented on FLINK-10886:

ML discussion: [http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-td24489.html]

> Event time synchronization across sources
>
> Key: FLINK-10886
> URL: https://issues.apache.org/jira/browse/FLINK-10886
> Project: Flink
> Issue Type: Improvement
> Components: Streaming Connectors
> Reporter: Jamie Grier
> Assignee: Jamie Grier
> Priority: Major
> Original Estimate: 336h
> Remaining Estimate: 336h
>
> When reading from a source with many parallel partitions, especially when reading lots of historical data (or recovering from downtime when there is a backlog to read), it's quite common for an event-time skew to develop across those partitions.
> When doing event-time windowing -- or in fact any event-time-driven processing -- the event-time skew across partitions results directly in increased buffering in Flink and, of course, the corresponding state/checkpoint size growth.
> As the event-time skew and state size grow larger, this can have a major effect on application performance and in some cases result in a "death spiral" where the application performance gets worse and worse as the state size grows and grows.
> So, one solution to this problem, outside of core changes in Flink itself, seems to be to coordinate sources across partitions so that they make progress through event time at roughly the same rate. In fact, if there is large skew, the idea would be to slow or even stop reading from some partitions with newer data while first reading the partitions with older data. To do this we need to share state somehow amongst sub-tasks.
[jira] [Created] (FLINK-3617) NPE from CaseClassSerializer when dealing with null Option field
Jamie Grier created FLINK-3617:

Summary: NPE from CaseClassSerializer when dealing with null Option field
Key: FLINK-3617
URL: https://issues.apache.org/jira/browse/FLINK-3617
Project: Flink
Issue Type: Bug
Components: Core
Affects Versions: 1.0.0
Reporter: Jamie Grier

This error occurs when serializing a Scala case class with a field of Option[] type where the value is not Some or None, but null. If this is not supported we should have a good error message.

{code}
java.lang.RuntimeException: ConsumerThread threw an exception: null
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:81)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
	at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:158)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
Caused by: java.lang.NullPointerException
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
	at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:107)
	at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)
	at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
	at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:84)
	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
	... 3 more
{code}
[jira] [Created] (FLINK-3627) Task stuck on lock in StreamSource when cancelling
Jamie Grier created FLINK-3627:

Summary: Task stuck on lock in StreamSource when cancelling
Key: FLINK-3627
URL: https://issues.apache.org/jira/browse/FLINK-3627
Project: Flink
Issue Type: Bug
Components: Core
Reporter: Jamie Grier

I've seen this occur a couple of times when the number of network buffers is set too low. The job fails with an appropriate message indicating that the user should increase the number of network buffers. However, some of the task threads then hang with a stack trace similar to the following:

{code}
2016-03-16 13:38:54,017 WARN org.apache.flink.runtime.taskmanager.Task - Task 'Source: EventGenerator -> (Flat Map, blah -> Filter -> Projection -> Flat Map -> Timestamps/Watermarks -> Map) (46/144)' did not react to cancelling signal, but is stuck in method:
org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:317)
flink.benchmark.generator.LoadGeneratorSource.run(LoadGeneratorSource.java:38)
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
java.lang.Thread.run(Thread.java:745)
{code}
[jira] [Created] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input
Jamie Grier created FLINK-3679:

Summary: DeserializationSchema should handle zero or more outputs for every input
Key: FLINK-3679
URL: https://issues.apache.org/jira/browse/FLINK-3679
Project: Flink
Issue Type: Bug
Components: DataStream API
Reporter: Jamie Grier

There are a couple of issues with the DeserializationSchema API that I think should be improved. This request has come to me via an existing Flink user.

The main issue is simply that the API assumes that there is a one-to-one mapping between inputs and outputs. In reality there are scenarios where one input message (say from Kafka) might actually map to zero or more logical elements in the pipeline.

Particularly important here is the case where you receive a message from a source (such as Kafka) and the raw bytes don't deserialize properly. Right now the only recourse is to throw IOException and therefore fail the job. This is definitely not good, since bad data is a reality and failing the job is not the right option. If the job fails we'll just end up replaying the bad data and the whole thing will start again. Instead, in this case it would be best if the user could just return the empty set.

The other case is where one input message should logically be multiple output messages. This case is probably less important, since there are other ways to do this, but in general it might be good to make the DeserializationSchema.deserialize() method return a collection rather than a single element.

Maybe we need to support a DeserializationSchema variant that has semantics more like that of FlatMap.
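A flatMap-style variant of the schema could look roughly like this. The interface name and shape are hypothetical (this is not the DeserializationSchema API as it exists in the issue's timeframe); it only illustrates the zero-or-more-outputs semantics being proposed:

{code:java}
import java.util.function.Consumer;

// Hypothetical flatMap-style schema: one input record may yield zero outputs
// (bad data is dropped) or many, instead of exactly one or an IOException.
interface FlatDeserializationSchema<T> {
    void deserialize(byte[] message, Consumer<T> out);
}

// Example: parse comma-separated integers; a malformed field emits nothing
// rather than failing the job.
class CsvIntSchema implements FlatDeserializationSchema<Integer> {
    @Override
    public void deserialize(byte[] message, Consumer<Integer> out) {
        for (String field : new String(message).split(",")) {
            try {
                out.accept(Integer.parseInt(field.trim()));
            } catch (NumberFormatException ignored) {
                // zero outputs for bad data: skip instead of throwing
            }
        }
    }
}
{code}

With a collector-style callback, the one-to-many case falls out for free, since the implementation can call `out.accept(...)` any number of times per input.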
[jira] [Created] (FLINK-3680) Remove or improve (not set) text in the Job Plan UI
Jamie Grier created FLINK-3680:

Summary: Remove or improve (not set) text in the Job Plan UI
Key: FLINK-3680
URL: https://issues.apache.org/jira/browse/FLINK-3680
Project: Flink
Issue Type: Bug
Components: Webfrontend
Reporter: Jamie Grier

When running streaming jobs, the UI displays "(not set)" in a few different places. This is not the case for batch jobs. To illustrate, I've included screenshots of the UI for the batch and streaming WordCount examples.
[jira] [Updated] (FLINK-3680) Remove or improve (not set) text in the Job Plan UI
[ https://issues.apache.org/jira/browse/FLINK-3680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jamie Grier updated FLINK-3680:

Attachment: Screen Shot 2016-03-29 at 8.12.17 PM.png
            Screen Shot 2016-03-29 at 8.13.12 PM.png

> Remove or improve (not set) text in the Job Plan UI
>
> Key: FLINK-3680
> URL: https://issues.apache.org/jira/browse/FLINK-3680
> Project: Flink
> Issue Type: Bug
> Components: Webfrontend
> Reporter: Jamie Grier
> Attachments: Screen Shot 2016-03-29 at 8.12.17 PM.png, Screen Shot 2016-03-29 at 8.13.12 PM.png
>
> When running streaming jobs, the UI displays "(not set)" in a few different places. This is not the case for batch jobs.
> To illustrate, I've included screenshots of the UI for the batch and streaming WordCount examples.
[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input
[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15224692#comment-15224692 ] Jamie Grier commented on FLINK-3679:

I'm not sure about the locking and operator chaining issues, so I would say if that's unduly complicated because of this change, maybe it's not worth it. However, a DeserializationSchema with more flatMap()-like semantics would certainly be the better API given that bad data issues are a reality. It also seems we could provide this without breaking existing code, but certainly it would add a bit more complexity to the API (having multiple variants for this).

Anyway, I agree you can work around this issue by making a special "sentinel" value and dealing with all of this in a chained flatMap() operator. I imagine that's exactly the approach that people are already using.

> DeserializationSchema should handle zero or more outputs for every input
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Reporter: Jamie Grier
>
> There are a couple of issues with the DeserializationSchema API that I think should be improved. This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one mapping between inputs and outputs. In reality there are scenarios where one input message (say from Kafka) might actually map to zero or more logical elements in the pipeline.
> Particularly important here is the case where you receive a message from a source (such as Kafka) and the raw bytes don't deserialize properly. Right now the only recourse is to throw IOException and therefore fail the job.
> This is definitely not good, since bad data is a reality and failing the job is not the right option. If the job fails we'll just end up replaying the bad data and the whole thing will start again.
> Instead, in this case it would be best if the user could just return the empty set.
> The other case is where one input message should logically be multiple output messages. This case is probably less important, since there are other ways to do this, but in general it might be good to make the DeserializationSchema.deserialize() method return a collection rather than a single element.
> Maybe we need to support a DeserializationSchema variant that has semantics more like that of FlatMap.
[jira] [Comment Edited] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input
[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15224692#comment-15224692 ] Jamie Grier edited comment on FLINK-3679 at 4/4/16 6:12 PM:

I'm not sure about the locking and operator chaining issues, so I would say if that's unduly complicated because of this change, maybe it's not worth it. However, a DeserializationSchema with more flatMap()-like semantics would certainly be the better API given that bad data issues are a reality. It also seems we could provide this without breaking existing code, but certainly it would add a bit more complexity to the API (having multiple variants for this).

Anyway, I agree you can work around this issue by making a special "sentinel" value and dealing with all of this in a chained flatMap() operator. I imagine that's exactly the approach that people are already using.

was (Author: jgrier):
I'm not sure about the locking and operator chaining issues so I would say if that's unduly complicated because of this change maybe it's not worth it. However, a DeserializationSchema with more flatMap() like semantics would certainly the better API given that bad data issues are a reality. It also seems we could provide this without breaking existing code, but certainly it would add a bit more complexity to the API (having multiple variants for this). Anyway, I agree you can work around this issue my making a special "sentinel" value and dealing with all of this is in a chained flatMap() operator. I imagine that's exactly the approach that people are already using.

> DeserializationSchema should handle zero or more outputs for every input
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Reporter: Jamie Grier
>
> There are a couple of issues with the DeserializationSchema API that I think should be improved. This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one mapping between inputs and outputs. In reality there are scenarios where one input message (say from Kafka) might actually map to zero or more logical elements in the pipeline.
> Particularly important here is the case where you receive a message from a source (such as Kafka) and the raw bytes don't deserialize properly. Right now the only recourse is to throw IOException and therefore fail the job.
> This is definitely not good, since bad data is a reality and failing the job is not the right option. If the job fails we'll just end up replaying the bad data and the whole thing will start again.
> Instead, in this case it would be best if the user could just return the empty set.
> The other case is where one input message should logically be multiple output messages. This case is probably less important, since there are other ways to do this, but in general it might be good to make the DeserializationSchema.deserialize() method return a collection rather than a single element.
> Maybe we need to support a DeserializationSchema variant that has semantics more like that of FlatMap.
[jira] [Created] (FLINK-4947) Make all configuration possible via flink-conf.yaml and CLI.
Jamie Grier created FLINK-4947:

Summary: Make all configuration possible via flink-conf.yaml and CLI.
Key: FLINK-4947
URL: https://issues.apache.org/jira/browse/FLINK-4947
Project: Flink
Issue Type: Improvement
Components: DataStream API
Reporter: Jamie Grier
Fix For: 1.2.0

I think it's important to make all configuration possible via the flink-conf.yaml and the command line.

As an example: to enable "externalizedCheckpoints" you must actually call the StreamExecutionEnvironment#enableExternalizedCheckpoints() method from your Flink program. Another example of this would be configuring the RocksDB state backend.

I think it's important to make deployment flexible and easy to build tools around. For example, the infrastructure teams that make these configuration decisions and provide tools for deploying Flink apps will be different from the teams deploying apps. The team writing apps should not have to set all of this lower-level configuration up in their programs.
[jira] [Created] (FLINK-4948) Consider using checksums or similar to detect bad checkpoints
Jamie Grier created FLINK-4948:

Summary: Consider using checksums or similar to detect bad checkpoints
Key: FLINK-4948
URL: https://issues.apache.org/jira/browse/FLINK-4948
Project: Flink
Issue Type: Improvement
Components: DataStream API
Reporter: Jamie Grier
Fix For: 1.2.0

We should consider proactively checking to verify that checkpoints are valid when reading (and maybe writing). This should help prevent any possible state corruption issues that might otherwise go undetected.
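The basic idea can be sketched with a standard CRC32: append a checksum when writing checkpoint bytes and verify it on read, so silent corruption is detected instead of being restored into state. This is an illustration only, not Flink's actual checkpoint format; the class name is hypothetical:

{code:java}
import java.util.zip.CRC32;

// Sketch (not Flink's checkpoint format): checksum bytes on write, verify on read.
final class ChecksummedBlob {
    static long checksum(byte[] data) {
        CRC32 crc = new CRC32();
        crc.update(data, 0, data.length);
        return crc.getValue();
    }

    /** True if the stored checksum still matches the data. */
    static boolean verify(byte[] data, long storedChecksum) {
        return checksum(data) == storedChecksum;
    }
}
{code}

Because the checksum is stored alongside the data it covers, this also satisfies the "self-consistent using only data stored in the checkpoint itself" requirement discussed in the comments.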
[jira] [Commented] (FLINK-4948) Consider using checksums or similar to detect bad checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15613064#comment-15613064 ] Jamie Grier commented on FLINK-4948:

Makes sense. Maybe a scheme where we can verify that the checkpoint is at least self-consistent -- using only data stored in the checkpoint itself.

> Consider using checksums or similar to detect bad checkpoints
>
> Key: FLINK-4948
> URL: https://issues.apache.org/jira/browse/FLINK-4948
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Reporter: Jamie Grier
> Fix For: 1.2.0
>
> We should consider proactively checking to verify that checkpoints are valid when reading (and maybe writing). This should help prevent any possible state corruption issues that might otherwise go undetected.
[jira] [Created] (FLINK-4980) Include example source code in Flink binary distribution
Jamie Grier created FLINK-4980:

Summary: Include example source code in Flink binary distribution
Key: FLINK-4980
URL: https://issues.apache.org/jira/browse/FLINK-4980
Project: Flink
Issue Type: Improvement
Reporter: Jamie Grier

I think we should include the Flink examples source code in the binary distribution of Flink. This would allow people to download Flink and run the examples (as now), but also play around with and modify the examples. Right now they would have to get the Flink source distribution if they wanted the examples' source -- which I think is onerous.
[jira] [Commented] (FLINK-3659) Add ConnectWithBroadcast Operation
[ https://issues.apache.org/jira/browse/FLINK-3659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15626962#comment-15626962 ] Jamie Grier commented on FLINK-3659:

I would like to suggest that rather than adding a new API method, connectWithBroadcast(), we just enable this functionality via the current API and check access to state variants at runtime. In other words, all of the following would work:

{code:java}
DataStream stream = new DataStream();
DataStream keyedStream = new DataStream().keyBy("...");
DataStream broadcastStream = new DataStream().broadcast();

stream.connect(stream);
stream.connect(keyedStream);
stream.connect(broadcastStream);

keyedStream.connect(stream);
keyedStream.connect(keyedStream);
keyedStream.connect(broadcastStream);

broadcastStream.connect(stream);
broadcastStream.connect(keyedStream);
broadcastStream.connect(broadcastStream);
{code}

Based on the actual input types on the LHS and RHS of these connected streams, we check at runtime what they can do. For example:

{code:java}
keyedStream.connect(broadcastStream).flatMap(...)
{code}

In the above, the user can access the keyed and broadcast state from their flatMap1() method (LHS), and can access only broadcast state from their flatMap2() method (RHS).

The reason I suggest this is that it keeps the API simpler and more intuitive, and there aren't any new APIs to learn -- other than the new broadcast state access itself. People are already building things exactly this way -- they are just being forced to use Checkpointed to make their state fault-tolerant. This allows the same API as before, just with some additional capabilities, and it will work properly with re-scalable state.

In a future version of Flink (2.0+) maybe we can start to think about annotation-based APIs, more like the current Beam approach, which I think is very nice. It allows both flexible and dynamic API evolution as well as "static" verification. Anyway, maybe in the future we could do something more like this:

{code:java}
class MyCoFlatMap { // doesn't even need to extend anything

    @FlatMap(input = KeyedStream)
    void dataFunc(@BroadcastState("name") String s, @KeyedState("name") Integer i) { ... }

    @FlatMap(input = BroadcastStream)
    void controlFunc(@BroadcastState("name") String s) { ... }
}
{code}

> Add ConnectWithBroadcast Operation
>
> Key: FLINK-3659
> URL: https://issues.apache.org/jira/browse/FLINK-3659
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 1.0.0
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> We should add a new operation that has a main input that can be keyed (but doesn't have to be) and a second input that is always broadcast. This is similar to a {{CoFlatMap}} or {{CoMap}}, but there either both inputs have to be keyed or non-keyed.
> This builds on FLINK-4940, which aims at adding broadcast/global state. When processing an element from the broadcast input, only access to broadcast state is allowed. When processing an element from the main input, both the regular keyed state and the broadcast state can be accessed.
> I'm proposing this as an intermediate/low-level operation because it will probably take a while until we add support for side-inputs in the API. This new operation would allow expressing new patterns that cannot be expressed with the current operations.
> This is the new proposed API (names are non-final):
> 1) Add {{DataStream.connectWithBroadcast(DataStream)}} and {{KeyedStream.connectWithBroadcast(DataStream)}}
> 2) Add {{ConnectedWithBroadcastStream}}, akin to {{ConnectedStreams}}
> 3) Add {{BroadcastFlatMap}} and {{TimelyBroadcastFlatMap}} as the user functions.
>
> Sketch of the user function:
> {code}
> interface BroadcastFlatMapFunction<IN, BIN, OUT> {
>   void flatMap(IN in, Collector<OUT> out);
>   void processBroadcastInput(BIN in);
> }
> {code}
> The API names and function names are a bit verbose, and we have to add two new different ones, but I don't see a way around this with the current way the Flink API works.
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15629091#comment-15629091 ] Jamie Grier commented on FLINK-4022: I also think this would be a great feature and a few Flink users have asked about this -- both dynamic partition discover within one topic and also dynamic topic discovery. Any progress on this? > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. 
We'll use this callback to let each > subtask commit the last offsets of the partitions it is currently responsible for to > an external store (or Kafka) before a rebalance; after the rebalance, when the > subtasks get the new partitions they'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are the new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of the partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned partitions matching > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions they are holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to the previous global state, so coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. 
Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to subtasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from a fixed list of topics), > KafkaConsumer#subscribe() has a corresponding overload method for a fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit a MAX_VALUE watermark, since they may hold partitions after a rebalance. > Instead, unassigned subtasks should be running a fetcher instance too and > take part in the process pool for the consumer group of the subscribed topics.
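To make the subscription semantics concrete: the user-facing "topic-n*" pattern from the issue corresponds to the Java regex "topic-n.*", which the real consumer would hand to KafkaConsumer#subscribe(Pattern, ConsumerRebalanceListener). The sketch below only illustrates which topic names such a pattern selects; it does not touch the Kafka client:

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternDemo {
    // Returns the subset of topic names that the subscription pattern matches.
    static List<String> matchingTopics(Pattern pattern, List<String> allTopics) {
        return allTopics.stream()
                .filter(t -> pattern.matcher(t).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // "topic-n*" from the issue text, expressed as a Java regex
        Pattern p = Pattern.compile("topic-n.*");
        System.out.println(matchingTopics(p,
                Arrays.asList("topic-n1", "topic-n2", "other-topic")));
    }
}
```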
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15632378#comment-15632378 ] Jamie Grier commented on FLINK-4022: Rather than emitting Long.MAX_VALUE for subtasks without partitions or using a global watermark service, can we not just have subtasks without partitions (or that in general are not making progress) emit a special value for the watermark which means "do not consider this subtask when calculating the current watermark downstream"? The benefit, of course, is that this doesn't require a central service / coordination. Can't this achieve the same thing? When a partition comes online after this, of course, all of its data will be considered late -- but basically, what else could you do?
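The "special value" scheme suggested in the comment above can be sketched as a min-combine over input channels that skips a sentinel. The sentinel value and method names here are assumptions for illustration, not Flink API:

```java
public class IdleWatermarkDemo {
    // Sentinel meaning "do not consider this subtask when calculating the watermark".
    static final long IDLE = Long.MIN_VALUE;

    // Downstream watermark = min over active channels; idle channels are skipped.
    static long combine(long[] channelWatermarks) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (long w : channelWatermarks) {
            if (w == IDLE) continue;   // idle channels don't hold back event time
            anyActive = true;
            min = Math.min(min, w);
        }
        return anyActive ? min : IDLE; // if everything is idle, propagate idleness
    }

    public static void main(String[] args) {
        // channel 2 has no partitions assigned, so only 100 and 42 count
        System.out.println(combine(new long[]{100L, IDLE, 42L})); // 42
    }
}
```

Note the benefit Jamie points out: no central service is needed, since each operator can compute this locally from its inputs.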
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15632537#comment-15632537 ] Jamie Grier commented on FLINK-4022: Yes, definitely get input from Stephan and/or Aljoscha. There may be a good reason why this simple solution won't work.
[jira] [Comment Edited] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15632537#comment-15632537 ] Jamie Grier edited comment on FLINK-4022 at 11/3/16 12:03 PM: -- Yes, definitely get input from Stephan and/or Aljoscha. There may be a good reason why this simple solution won't work. was (Author: jgrier): Yes, definitely get input from Stephan and/or Aljoscha. There may be a good reason why this simple solution wont' work.
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15632608#comment-15632608 ] Jamie Grier commented on FLINK-4545: Big +1! In general I would love to see this improved. In my experience this is the "one thing" that people run into with Flink; everything else "just works", but this one parameter they have to set/tune, and it's very confusing to newcomers. The equation to get this right is complex and the "correct" setting changes based on how they deploy the job, what parallelism they use, how many TMs, etc. It also often happens that things are working, then a user changes their job a bit (adding a keyBy for instance), and then it stops working and they have a hard time understanding why. Is there a way we can set this parameter automatically in a majority of use cases? If folks are running single jobs directly on YARN, for instance, it seems we should have all the information necessary to set this parameter auto-magically, or at least fail fast and tell the user what the parameter should be set to. > Flink automatically manages TM network buffer > - > > Key: FLINK-4545 > URL: https://issues.apache.org/jira/browse/FLINK-4545 > Project: Flink > Issue Type: Wish >Reporter: Zhenzhong Xu > > Currently, the number of network buffers per task manager is preconfigured and > the memory is pre-allocated through the taskmanager.network.numberOfBuffers > config. In a Job DAG with a shuffle phase, this number can go up very high > depending on the TM cluster size. The formula for calculating the buffer count > is documented here > (https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers). > > #slots-per-TM^2 * #TMs * 4 > In a standalone deployment, we may need to control the task manager cluster > size dynamically and then leverage the up-coming Flink feature to support > scaling job parallelism/rescaling at runtime. 
> If the buffer count config is static at runtime and cannot be changed without > restarting the task manager process, this may add latency and complexity to the > scaling process. I am wondering if there is already any discussion around > whether the network buffer count should be automatically managed by Flink, or at > least whether Flink should expose some API to allow it to be reconfigured. Let me know if there is > any existing JIRA that I should follow.
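The sizing heuristic quoted from the configuration docs above (#slots-per-TM^2 * #TMs * 4) is easy to evaluate, which also shows why the "correct" value shifts whenever parallelism or cluster size changes:

```java
public class NetworkBufferEstimate {
    // The documented heuristic: slotsPerTM squared, times the number of TMs, times 4.
    static long requiredBuffers(long slotsPerTm, long numTaskManagers) {
        return slotsPerTm * slotsPerTm * numTaskManagers * 4;
    }

    public static void main(String[] args) {
        // e.g. 8 slots per TM and 20 TMs -> 8*8*20*4 = 5120 buffers
        System.out.println(requiredBuffers(8, 20));
        // doubling slots per TM quadruples the estimate
        System.out.println(requiredBuffers(16, 20));
    }
}
```

If the scheduler already knows the slot count and TM count (as in the single-job-on-YARN case Jamie mentions), a calculation like this could in principle be done at deployment time instead of by hand.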
[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction
[ https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15636266#comment-15636266 ] Jamie Grier commented on FLINK-5012: Definitely +1 to the Context parameter. I've always thought there should be a way to get the timestamp of the current element in any Function. Should we just add this to the RuntimeContext? Is there a good reason not to do this? > Provide Timestamp in TimelyFlatMapFunction > -- > > Key: FLINK-5012 > URL: https://issues.apache.org/jira/browse/FLINK-5012 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Aljoscha Krettek > > Right now, {{TimelyFlatMapFunction}} does not give the timestamp of the > element in {{flatMap()}}. > The signature is currently this: > {code} > void flatMap(I value, TimerService timerService, Collector out) throws > Exception; > {code} > if we add the timestamp it would become this: > {code} > void flatMap(I value, Long timestamp, TimerService timerService, Collector > out) throws Exception; > {code} > The reason why it's a {{Long}} and not a {{long}} is that an element might > not have a timestamp; in that case we should hand in {{null}} here. > This is becoming quite long, so we could add a {{Context}} parameter that > provides access to the timestamp and timer service.
[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction
[ https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15636428#comment-15636428 ] Jamie Grier commented on FLINK-5012: Okay, makes sense about RuntimeContext. I also like your "ideal" solution best -- or maybe: {code:java} void flatMap(I value, Context ctx) throws Exception; interface Context { Long timestamp(); TimerService timerService(); Collector collector(); } {code}
[jira] [Comment Edited] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction
[ https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15636428#comment-15636428 ] Jamie Grier edited comment on FLINK-5012 at 11/4/16 2:05 PM: - Okay, makes sense about RuntimeContext. I also like your "ideal" solution best -- or maybe: {code:java} void flatMap(I value, Context ctx) throws Exception; interface Context { Long timestamp(); TimerService timerService(); Collector collector(); } {code} Maybe the above is "close enough" to what people are used to since it still uses the Collector interface.
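A toy harness can show how the proposed flatMap(value, Context) shape would read from the user's side. Everything here is an assumption built from the signature sketched in the comments (the TimerService is a stub, Context is a plain class a caller fills per record); it is not the actual Flink API:

```java
import java.util.ArrayList;
import java.util.List;

public class ContextFlatMapDemo {
    interface TimerService { void registerEventTimeTimer(long time); }
    interface Collector<T> { void collect(T record); }

    // One object bundling timestamp, timers, and output, as proposed in the issue.
    static class Context {
        private final Long ts; private final TimerService timers; private final Collector<String> out;
        Context(Long ts, TimerService timers, Collector<String> out) {
            this.ts = ts; this.timers = timers; this.out = out;
        }
        Long timestamp() { return ts; }               // Long, so null means "no timestamp"
        TimerService timerService() { return timers; }
        Collector<String> collector() { return out; }
    }

    // A flatMap that tags each element with its timestamp (or "none").
    static void flatMap(String value, Context ctx) {
        Long ts = ctx.timestamp();
        ctx.collector().collect(value + "@" + (ts == null ? "none" : ts));
    }

    static List<String> run() {
        List<String> out = new ArrayList<>();
        TimerService noopTimers = time -> { };
        flatMap("a", new Context(7L, noopTimers, out::add));
        flatMap("b", new Context(null, noopTimers, out::add)); // element without a timestamp
        return out;
    }

    public static void main(String[] args) { System.out.println(run()); }
}
```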
[jira] [Commented] (FLINK-5026) Rename TimelyFlatMap to Process
[ https://issues.apache.org/jira/browse/FLINK-5026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15644807#comment-15644807 ] Jamie Grier commented on FLINK-5026: +1 - I agree that TimelyFlatMap is a cumbersome name. > Rename TimelyFlatMap to Process > --- > > Key: FLINK-5026 > URL: https://issues.apache.org/jira/browse/FLINK-5026 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.2.0 > > > The method on {{KeyedDataStream}} would be called {{process()}} and the > function itself would be called {{ProcessFunction}}. > The reason for this is that {{TimelyFlatMapFunction}} is a bit of a mouthful > and with the additions to the timer API and state the {{ProcessFunction}} > could become the basic, low-level, user-facing API for cases where users > nowadays implement their own operator.
[jira] [Commented] (FLINK-5026) Rename TimelyFlatMap to Process
[ https://issues.apache.org/jira/browse/FLINK-5026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15644931#comment-15644931 ] Jamie Grier commented on FLINK-5026: Another option here would be apply(), rather than process(). Whatever the name, it should imply that this is sort of the basic building block in Flink.
[jira] [Commented] (FLINK-4992) Expose String parameter for timers in Timely functions and TimerService
[ https://issues.apache.org/jira/browse/FLINK-4992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15730286#comment-15730286 ] Jamie Grier commented on FLINK-4992: Another use case here is simply attaching whatever data you need to properly handle the callback -- it might even be the element that you were processing when you registered the timer. Without this you are forced to implement some sort of buffering of data yourself. > Expose String parameter for timers in Timely functions and TimerService > --- > > Key: FLINK-4992 > URL: https://issues.apache.org/jira/browse/FLINK-4992 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.2.0 >Reporter: Gyula Fora >Priority: Minor > > Currently it is very hard to register and execute multiple different types of > timers from the same user function because timers don't carry any metadata. > We propose to extend the timer registration and onTimer logic by attaching a > String argument so users of these features can implement functionality that > depends on this additional metadata. > The proposed new methods: > In the TimerService: > void registerProcessingTimeTimer(long time, String label); > void registerEventTimeTimer(long time, String label); > In the TimelyFunctions: > void onTimer(long timestamp, String label, TimeDomain timeDomain, > TimerService timerService...); > This extended functionality can be mapped to a String namespace for the > internal timer service. I suggest we don't use the term "namespace" here > because it just complicates things for the users; I think "label" or "id" or > "name" is much simpler to understand.
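The point of the label becomes clear in the onTimer body: one callback can dispatch on the label instead of the function keeping side state to remember why each timer was registered. The label names and the dispatch logic below are illustrative assumptions, not the actual Flink API:

```java
import java.util.ArrayList;
import java.util.List;

public class LabeledTimerDemo {
    // With a label, different timer purposes can be told apart in one callback.
    static String onTimer(long timestamp, String label) {
        switch (label) {
            case "session-timeout": return "close session at " + timestamp;
            case "emit-early":      return "early result at " + timestamp;
            default:                return "unknown timer: " + label;
        }
    }

    public static void main(String[] args) {
        List<String> fired = new ArrayList<>();
        // simulate two timers registered with different labels firing
        fired.add(onTimer(1000L, "emit-early"));
        fired.add(onTimer(5000L, "session-timeout"));
        System.out.println(fired);
    }
}
```

Jamie's use case (attaching the triggering element itself) would go one step further than a String, but the dispatch pattern is the same.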
[jira] [Commented] (FLINK-3710) ScalaDocs for org.apache.flink.streaming.scala are missing from the web site
[ https://issues.apache.org/jira/browse/FLINK-3710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15798921#comment-15798921 ] Jamie Grier commented on FLINK-3710: Hi all, I've been asked about these incomplete ScalaDocs by several users and I advocate that we just remove the ScalaDocs links from the Flink website until this is resolved. People look at the ScalaDocs and get confused and think that's all the available Scala API documentation. > ScalaDocs for org.apache.flink.streaming.scala are missing from the web site > > > Key: FLINK-3710 > URL: https://issues.apache.org/jira/browse/FLINK-3710 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.0.1 >Reporter: Elias Levy > Fix For: 1.0.4 > > > The ScalaDocs only include docs for org.apache.flink.scala and sub-packages.
[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.
[ https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15149099#comment-15149099 ] Jamie Grier commented on FLINK-1502: [~eastcirclek] You shouldn't need to do this with counters. Typically you just want to report the value of the counter as-is to the metrics system. The metrics system (e.g. Graphite or Ganglia) should have built-in tools for turning counters into other types of graphs. For example, what you really want here is a "rate" -- how many GC invocations per second, for example (the 1st derivative of the counter). Ganglia and any decent metrics tool should already have this function built in. I think we should just report the raw counters. > Expose metrics to graphite, ganglia and JMX. > > > Key: FLINK-1502 > URL: https://issues.apache.org/jira/browse/FLINK-1502 > Project: Flink > Issue Type: Sub-task > Components: JobManager, TaskManager >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Dongwon Kim >Priority: Minor > Fix For: pre-apache > > > The metrics library makes it easy to expose collected metrics to other > systems such as graphite, ganglia or Java's JVM (VisualVM).
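The "1st derivative of the counter" that Graphite/Ganglia compute from raw counter samples is just the delta divided by the sampling interval. A minimal sketch of that derivation (the numbers are made up for illustration):

```java
public class CounterRate {
    // rate = (later sample - earlier sample) / elapsed seconds
    static double rate(long counterBefore, long counterAfter, double intervalSeconds) {
        return (counterAfter - counterBefore) / intervalSeconds;
    }

    public static void main(String[] args) {
        // a GC-invocation counter went from 120 to 180 over a 30s window -> 2.0/s
        System.out.println(rate(120, 180, 30.0));
    }
}
```

This is why reporting the raw counter is enough: the graphing side can always recover the rate, but a pre-computed rate cannot recover the counter.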
[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.
[ https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15149109#comment-15149109 ] Jamie Grier commented on FLINK-1502: Is there no way to refer to a TaskManager by index in order to solve this problem? It would be nice if we didn't have to send all the metrics through the JobManager but rather just report them via JMX locally on each host. I think I understand the problem you are describing, but wouldn't just having a logical index for each TaskManager solve this problem? I would like to avoid having to send the metrics through a central node if possible as I would like to see the # of total metrics go up dramatically as we instrument the code more and more.
[jira] [Comment Edited] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.
[ https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15149109#comment-15149109 ] Jamie Grier edited comment on FLINK-1502 at 2/16/16 7:09 PM: - Is there no way to refer to a TaskManager by index in order to solve this problem? It would be nice if we didn't have to send all the metrics through the JobManager but rather just report them via JMX locally on each host. I think I understand the problem you are describing but wouldn't just having a logical index for each TaskManager solve this problem. I would like to avoid having to send the metrics through a central node if possible as I would like to see the # of total metrics go up dramatically as we instrument the code more and more and give users more insight into how Flink is running. Maybe we can collaborate on this. I want a general way to instrument both Flink code and user code and make those metrics available easily via JMX at a minimum and maybe directly in Graphite and Ganglia. Once available in JMX there are many tools to integrate with other metrics and alerting systems. was (Author: jgrier): Is there no way to refer to a TaskManager by index in order to solve this problem. It would be nice if we didn't have to send all the metrics through the JobManager but rather just report them via JMX locally on each host. I think I understand the problem you are describing but would just having a logical index for each TaskManager solve this problem. I would like to avoid having to send the metrics through a central node if possible as I would like to see the # of total metrics go up dramatically as we instrument the code more and more. > Expose metrics to graphite, ganglia and JMX. 
> > > Key: FLINK-1502 > URL: https://issues.apache.org/jira/browse/FLINK-1502 > Project: Flink > Issue Type: Sub-task > Components: JobManager, TaskManager >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Dongwon Kim >Priority: Minor > Fix For: pre-apache > > > The metrics library allows to expose collected metrics easily to other > systems such as graphite, ganglia or Java's JVM (VisualVM). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.
[ https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15151004#comment-15151004 ] Jamie Grier commented on FLINK-1502: I understand [~eastcirclek]'s points about using the InstanceID. This is a unique ID that is automatically generated (I believe). As such if you use it to namespace the metrics you will see new metric names whenever new TaskManagers are created. Over time this means the total # of metrics will grow and grow. From my experience it would be better to have a "logical" ID for each TaskManager in the cluster. Literally like (1, 2, 3, 4, etc) and use this value to namespace the metrics. This will provide better continuity over time as TaskManagers come up and down. However, I don't know if this concept actually exists inside Flink at the moment. Does it? I would suggest we use logical IDs/indexes for TaskManager level metrics, as well as task level metrics, etc, as opposed to UUIDs. So rather than: taskmanager..gc_time taskmanager..gc_time and task..flatMap.messagesReceived task..flatMap.messagesReceived I would suggest something like cluster..taskmanager.1.gc_time cluster..taskmanager.2.gc_time and cluster..task.1.flatMap.messagesReceived cluster..task.2.flatMap.messagesReceived I hope that makes sense. The main point is to use logical IDs wherever possible, especially for things that change; otherwise there will be a lack of continuity in the metrics. Also I don't know that we actually have the CLUSTER_NAME concept right now either but we might need this. This would be unique for any given YarnSession if running on YARN for example. Basically we just need some way to group a set of TaskManagers uniquely. I guess this could also be done by using the UUID of the JobManager. Comments? > Expose metrics to graphite, ganglia and JMX. 
> > > Key: FLINK-1502 > URL: https://issues.apache.org/jira/browse/FLINK-1502 > Project: Flink > Issue Type: Sub-task > Components: JobManager, TaskManager >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Dongwon Kim >Priority: Minor > Fix For: pre-apache > > > The metrics library allows to expose collected metrics easily to other > systems such as graphite, ganglia or Java's JVM (VisualVM). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3439) Remove final Long.MAX_VALUE Watermark in StreamSource
[ https://issues.apache.org/jira/browse/FLINK-3439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15152769#comment-15152769 ] Jamie Grier commented on FLINK-3439: [~aljoscha] Just a quick question on this. In the description you said: "At one point we decided to not emit in-flight windows when closing the topology or sources." This is still the case, correct? > Remove final Long.MAX_VALUE Watermark in StreamSource > - > > Key: FLINK-3439 > URL: https://issues.apache.org/jira/browse/FLINK-3439 > Project: Flink > Issue Type: Bug > Components: Streaming >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > > I think this was an oversight. At one point we decided to not emit in-flight > windows when closing the topology or sources. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.
[ https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15152873#comment-15152873 ] Jamie Grier commented on FLINK-1502: [~eastcirclek] Yes, I believe it does. It's implicit in the metric type that gets reported to Ganglia. I believe what we want is Slope.POSITIVE for counters. I imagine the Dropwizard metrics library would already do this correctly for metrics with type Counter (as opposed to gauge) -- but maybe not. See here: http://codeblog.majakorpi.net/post/16281432462/ganglia-xml-slope-attribute Also, is there no query language in Ganglia when building a graph that allows you to graph the rate of change rather than the actual metric? I'm not too familiar with Ganglia. > Expose metrics to graphite, ganglia and JMX. > > > Key: FLINK-1502 > URL: https://issues.apache.org/jira/browse/FLINK-1502 > Project: Flink > Issue Type: Sub-task > Components: JobManager, TaskManager >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Dongwon Kim >Priority: Minor > Fix For: pre-apache > > > The metrics library allows to expose collected metrics easily to other > systems such as graphite, ganglia or Java's JVM (VisualVM). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.
[ https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15152895#comment-15152895 ] Jamie Grier commented on FLINK-1502: I'm suggesting that we use the Dropwizard Metrics library and expose those metrics via JMX at a minimum/default, but also via optional configuration we could let users report metrics via any of the Metrics library's available metrics Reporter classes. Ganglia and Graphite are both supported via the built-in GangliaReporter and GraphiteReporter, but there are integrations with other systems as well. Of particular interest to people running in production would be StatsD, Librato, InfluxDB, etc. https://dropwizard.github.io/metrics/3.1.0/manual/third-party/ What I'm suggesting is that we should expose the ability for people to choose/configure which Reporters to use, but we should default to JMX. Many 3rd party tools will be able to consume/route these metrics if they're available via JMX so that should be the default. > Expose metrics to graphite, ganglia and JMX. > > > Key: FLINK-1502 > URL: https://issues.apache.org/jira/browse/FLINK-1502 > Project: Flink > Issue Type: Sub-task > Components: JobManager, TaskManager >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Dongwon Kim >Priority: Minor > Fix For: pre-apache > > > The metrics library allows to expose collected metrics easily to other > systems such as graphite, ganglia or Java's JVM (VisualVM). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.
[ https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15152917#comment-15152917 ] Jamie Grier commented on FLINK-1502: To be clear what I meant here is to have the indexes assigned to the TaskManagers scoped to the *entire* cluster. Not a particular host like what you're describing here. So, for example, if you spun up a Flink cluster with 10 TaskManagers running on 10 different hosts the TaskManagers would be given a unique INDEX on the cluster. Literally, TaskManager[1-10]. Use this to scope the metrics, e.g.: cluster.MyCluster.taskmanager.1.gc_time cluster.MyCluster.taskmanager.2.gc_time ... ... cluster.MyCluster.taskmanager.10.gc_time It doesn't matter which hosts they are on. These are 10 unique JVMs on some set of hosts. > Expose metrics to graphite, ganglia and JMX. > > > Key: FLINK-1502 > URL: https://issues.apache.org/jira/browse/FLINK-1502 > Project: Flink > Issue Type: Sub-task > Components: JobManager, TaskManager >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Dongwon Kim >Priority: Minor > Fix For: pre-apache > > > The metrics library allows to expose collected metrics easily to other > systems such as graphite, ganglia or Java's JVM (VisualVM). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
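The cluster-scoped logical-index naming proposed in the comment above can be sketched as a small helper. This is a hypothetical illustration of the naming scheme only; the function and cluster name are made up, not Flink code.

```python
# Hypothetical sketch of the proposed scheme: scope TaskManager metrics
# by a cluster name and a stable logical index, never a UUID or hostname.

def tm_metric_name(cluster, tm_index, metric):
    return "cluster.{}.taskmanager.{}.{}".format(cluster, tm_index, metric)

# Ten TaskManagers on any mix of hosts yield exactly ten metric names,
# and a restarted TaskManager that reclaims its index reuses its name,
# so the metric namespace never grows over time.
names = [tm_metric_name("MyCluster", i, "gc_time") for i in range(1, 11)]
```

The design point is continuity: logical indexes keep the set of names fixed as JVMs come and go, where UUID-based scoping mints a fresh name on every restart.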
[jira] [Comment Edited] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.
[ https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15152917#comment-15152917 ] Jamie Grier edited comment on FLINK-1502 at 2/18/16 7:21 PM: - To be clear what I meant here is to have the indexes assigned to the TaskManagers scoped to the *entire* cluster. Not a particular host like what you're describing here. So, for example, if you spun up a Flink cluster with 10 TaskManagers running on 10 different hosts the TaskManager's would be given a unique *index* on the *cluster*. Literally, TaskManager[1-10]. Use this to scope the metrics, e.g.: cluster.MyCluster.taskmanager.1.gc_time cluster.MyCluster.taskmanager.2.gc_time ... ... cluster.MyCluster.taskmanager.10.gc_time It doesn't matter which hosts they are on. These are 10 unique JVMS on some set of hosts. was (Author: jgrier): To be clear what I meant here is to have the indexes assigned to the TaskManagers scoped to the *entire* cluster. Not a particular host like what you're describing here. So, for example, if you spun up a Flink cluster with 10 TaskManagers running on 10 different hosts the TaskManager's would be given a unique INDEX on the cluster. Literally, TaskManager[1-10]. Use this to scope the metrics, e.g.: cluster.MyCluster.taskmanager.1.gc_time cluster.MyCluster.taskmanager.2.gc_time ... ... cluster.MyCluster.taskmanager.10.gc_time It doesn't matter which hosts they are on. These are 10 unique JVMS on some set of hosts. > Expose metrics to graphite, ganglia and JMX. > > > Key: FLINK-1502 > URL: https://issues.apache.org/jira/browse/FLINK-1502 > Project: Flink > Issue Type: Sub-task > Components: JobManager, TaskManager >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Dongwon Kim >Priority: Minor > Fix For: pre-apache > > > The metrics library allows to expose collected metrics easily to other > systems such as graphite, ganglia or Java's JVM (VisualVM). 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.
[ https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15158090#comment-15158090 ] Jamie Grier commented on FLINK-1502: [~eastcirclek] Let's define our terms to make sure we're talking about the same thing. *Session*: A single instance of a Job Manager and some # of TaskManagers working together. A session can be created "on-the-fly" for a single job or it can be a long-running thing. Multiple jobs can start, run, and finish in the same session. Think of the "yarn-session.sh" command. This creates a session outside of any particular job. This is also what I've meant when I've said "cluster". A Yarn session is a "cluster" that we've spun up for some length of time on Yarn. Another example of a cluster would be a standalone install of Flink on some # of machines. *Job*: A single batch or streaming job that runs on a Flink cluster. In the above scenario, and if your definition of sessions is in agreement with mine, you would instead have the following. Note that I've named the cluster according to the "session" name you've given, because in this case each session is really a different (ad-hoc) cluster. When you run a job directly using just "flink run -ytm ..." on YARN you are spinning up an ad-hoc cluster for your job. After Session 1 is finished, Node 1 would have the following metrics: - cluster.session1.taskmanager.1.gc_time After session 2 is finished, Node 1 would have the following metrics: - cluster.session1.taskmanager.1.gc_time - cluster.session2.taskmanager.2.gc_time - cluster.session3.taskmanager.3.gc_time There are many metrics in this case because that's exactly what you want. These are JVM scope metrics we are talking about and those are 3 different JVMs, not the same one so it makes total sense for them to have these different names/scopes. These metrics have nothing to do with each other and it doesn't matter which host they are from. 
They are scoped to the cluster (or session) and logical TaskManager index, not the host. The above should not be confused with any host level metrics we want to report. Host level metrics would be scoped simply by the hostname so they wouldn't grow either. One more example, hopefully to clarify. Let's say I spun up a long-running cluster (or session) using yarn-session.sh -tm 3. Now we have a Flink cluster running on YARN with no jobs running and three TaskManagers. We then run three different jobs one after another on this cluster. The metrics would still simply be: - cluster.yarn-session.taskmanager.1.gc_time - cluster.yarn-session.taskmanager.2.gc_time - cluster.yarn-session.taskmanager.3.gc_time No matter how many jobs you ran this list would not grow, which is natural because there have only been 3 TaskManagers. Now if one of these TaskManagers were to fail and be restarted it would assume the same name -- that's the point of using "logical" indexes so the set of metric names in that case still would not be larger than the above. In the initial case you describe above if you didn't want lots of different metrics over time you could also just give all of your sessions the same name. Your metrics are growing because you're spinning up many different clusters (sessions) over time with different names each time. If you used the same name for the cluster (session) every time this metrics namespace growth would not occur. I hope any of that made sense ;) This is getting a bit hard to describe this way. We could also sync via Hangouts or something if that is easier. > Expose metrics to graphite, ganglia and JMX. 
> > > Key: FLINK-1502 > URL: https://issues.apache.org/jira/browse/FLINK-1502 > Project: Flink > Issue Type: Sub-task > Components: JobManager, TaskManager >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Dongwon Kim >Priority: Minor > Fix For: pre-apache > > > The metrics library allows to expose collected metrics easily to other > systems such as graphite, ganglia or Java's JVM (VisualVM). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.
[ https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15158090#comment-15158090 ] Jamie Grier edited comment on FLINK-1502 at 2/23/16 1:30 AM: - [~eastcirclek] Let's define our terms to make sure we're talking about the same thing. *Session*: A single instance of a Job Manager and some # of TaskManagers working together. A session can be created "on-the-fly" for a single job or it can be a long-running thing. Multiple jobs can start, run, and finish in the same session. Think of the "yarn-session.sh" command. This creates a session outside of any particular job. This is also what I've meant when I've said "cluster". A Yarn session is a "cluster" that we've spun up for some length of time on Yarn. Another example of a cluster would be a standalone install of Flink on some # of machines. *Job*: A single batch or streaming job that runs on a Flink cluster. In the above scenario, and if your definition of sessions is in agreement with mine. You would instead have the following. Note that I've named the cluster according to the "session" name you've given, because in this case each session is really a different (ad-hoc) cluster. When you run a job directly using just "flink run -ytm ..." on YARN you are spinning up an ad-hoc cluster for your job. After Session 1 is finished, Node 1 would have the following metrics: - cluster.session1.taskmanager.1.gc_time After session 2 is finshed, Node 1 would have the following metrics: - cluster.session1.taskmanager.1.gc_time - cluster.session2.taskmanager.2.gc_time - cluster.session3.taskmanager.3.gc_time There are many metrics in this case because that's exactly what you want. These are JVM scope metrics we are talking about and those are 3 different JVMS, not the same one so it makes total sense for them to have these different names/scopes. These metrics have nothing to do with each other and it doesn't matter which host they are from. 
They are scoped to the cluster (or session) and logical TaskManager index, not the host. The above should not be confused with any host level metrics we want to report. Host level metrics would be scoped simply by the hostname so they wouldn't grow either. One more example, hopefully to clarify. Let's say I spun up a long-running cluster (or session) using yarn-session.sh -tm 3. Now we have a Flink cluster running on YARN with no jobs running and three TaskManagers. We then run three different jobs one after another on this cluster. The metrics would still simply be: - cluster.yarn-session.taskmanager.1.gc_time - cluster.yarn-session.taskmanager.2.gc_time - cluster.yarn-session.taskmanager.3.gc_time No matter how many jobs you ran this list would not grow, which is natural because there have only been 3 TaskManagers. Now if one of these TaskManagers were to fail and be restarted it would assume the same name -- that's the point of using "logical" indexes so the set of metrics name in that case still would not be larger than the above. In the initial case you describe above if you didn't want lot's of different metrics over time you could also just give all of your sessions the same name. Your metrics are growing because you're spinning up many different clusters (sessions) over time with different names each time. If you used the same name for the cluster (session) every time this metrics namespace growth would not occur. I hope any of that made sense ;) This is getting a bit hard to describe this way. We could also sync via Hangouts or something if that is easier. was (Author: jgrier): [~eastcirclek] Let's define our terms to make sure we're talking about the same thing. *Session*: A single instance of a Job Manager and some # of TaskManagers working together. A session can be created "on-the-fly" for a single job or it can be a long-running thing. Multiple jobs can start, run, and finish in the same session. Think of the "yarn-session.sh" command. 
This creates a session outside of any particular job. This is also what I've meant when I've said "cluster". A Yarn session is a "cluster" that we've spun up for some length of time on Yarn. Another example of a cluster would be a standalone install of Flink on some # of machines. *Job*: A single batch or streaming job that runs on a Flink cluster. In the above scenario, and if your definition of sessions is in agreement with mine. You would instead have the following. Note that I've named the cluster according to the "session" name you've given, because in this case each session is really a different (ad-hoc) cluster. When you run a job directly using just "flink run -ytm ..." on YARN you are spinning up an ad-hoc cluster for your job. After Session 1 is finished, Node 1 would have the following metrics: - cluster.session1.taskmanager.1.
[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16505461#comment-16505461 ] Jamie Grier commented on FLINK-9061: [~neoeahit] This will affect all versions. > add entropy to s3 path for better scalability > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Jamie Grier >Assignee: Indrajit Roychoudhury >Priority: Critical > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
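The path-rewrite hook proposed in the issue description above (injecting a hash of the original checkpoint path as an S3 key prefix) can be sketched like this. Everything here is hypothetical for illustration: the function name, the hash choice, and the prefix length are not from the actual Flink change.

```python
# Hypothetical sketch of the entropy-injection idea: rewrite
# s3://bucket/flink/checkpoints -> s3://bucket/<hash>/flink/checkpoints
# so writes spread across S3's key-prefix partitions instead of
# hammering one partition.
import hashlib

def add_entropy(path):
    scheme, rest = path.split("://", 1)
    bucket, key = rest.split("/", 1)
    # A short, deterministic hash of the original key keeps the prefix
    # stable for a given job while distributing distinct paths.
    entropy = hashlib.md5(key.encode()).hexdigest()[:4]
    return "{}://{}/{}/{}".format(scheme, bucket, entropy, key)
```

Because the prefix is derived deterministically from the configured path, the rewrite is transparent: the same configuration always maps to the same physical location.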
[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16530389#comment-16530389 ] Jamie Grier commented on FLINK-9061: [~ind_rc] Initial changes look good. Are you going to try to get this into 1.6? > add entropy to s3 path for better scalability > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Jamie Grier >Assignee: Indrajit Roychoudhury >Priority: Critical > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9691) Modify run loop in Kinesis ShardConsumer to not sleep for a fixed fetchIntervalMillis
[ https://issues.apache.org/jira/browse/FLINK-9691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jamie Grier reassigned FLINK-9691: -- Assignee: Jamie Grier > Modify run loop in Kinesis ShardConsumer to not sleep for a fixed > fetchIntervalMillis > - > > Key: FLINK-9691 > URL: https://issues.apache.org/jira/browse/FLINK-9691 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Reporter: Lakshmi Rao >Assignee: Jamie Grier >Priority: Major > > Currently the ShardConsumer in the Kinesis connector sleeps for a fixed > [fetchIntervalMillis|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L210] > resulting in the shard consumer sleeping for more time than necessary and > not optimally reading from Kinesis. It should only be sleeping for > (fetchIntervalMillis - time taken to process records) before making the > subsequent getRecords call. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9691) Modify run loop in Kinesis ShardConsumer to not sleep for a fixed fetchIntervalMillis
[ https://issues.apache.org/jira/browse/FLINK-9691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jamie Grier updated FLINK-9691: --- Affects Version/s: 1.5.0 1.4.2 > Modify run loop in Kinesis ShardConsumer to not sleep for a fixed > fetchIntervalMillis > - > > Key: FLINK-9691 > URL: https://issues.apache.org/jira/browse/FLINK-9691 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.5.0, 1.4.2 >Reporter: Lakshmi Rao >Assignee: Jamie Grier >Priority: Major > > Currently the ShardConsumer in the Kinesis connector sleeps for a fixed > [fetchIntervalMillis|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L210] > resulting in the shard consumer sleeping for more time than necessary and > not optimally reading from Kinesis. It should only be sleeping for > (fetchIntervalMillis - time taken to process records) before making the > subsequent getRecords call. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
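The fix the issue describes, sleeping only for (fetchIntervalMillis - time taken to process records), reduces to a small computation. This is an illustrative sketch under that description, not the actual ShardConsumer run-loop code.

```python
# Illustrative sketch (not the actual ShardConsumer code): compute how
# long the run loop should still sleep after processing a batch.

def remaining_sleep_millis(fetch_interval_millis, processing_time_millis):
    # If processing already consumed the whole interval, skip the sleep
    # entirely and issue the next getRecords call immediately.
    return max(0, fetch_interval_millis - processing_time_millis)
```

Compared with a fixed sleep, this keeps the effective getRecords cadence close to the configured interval regardless of how long each batch takes to process.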
[jira] [Commented] (FLINK-9691) Modify run loop in Kinesis ShardConsumer to not sleep for a fixed fetchIntervalMillis
[ https://issues.apache.org/jira/browse/FLINK-9691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16537834#comment-16537834 ] Jamie Grier commented on FLINK-9691: https://github.com/apache/flink/pull/6290 > Modify run loop in Kinesis ShardConsumer to not sleep for a fixed > fetchIntervalMillis > - > > Key: FLINK-9691 > URL: https://issues.apache.org/jira/browse/FLINK-9691 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.5.0, 1.4.2 >Reporter: Lakshmi Rao >Assignee: Jamie Grier >Priority: Major > > Currently the ShardConsumer in the Kinesis connector sleeps for a fixed > [fetchIntervalMillis|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L210] > resulting in the shard consumer sleeping for more time than necessary and > not optimally reading from Kinesis. It should only be sleeping for > (fetchIntervalMillis - time taken to process records) before making the > subsequent getRecords call. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10484) New latency tracking metrics format causes metrics cardinality explosion
Jamie Grier created FLINK-10484: --- Summary: New latency tracking metrics format causes metrics cardinality explosion Key: FLINK-10484 URL: https://issues.apache.org/jira/browse/FLINK-10484 Project: Flink Issue Type: Bug Components: Metrics Affects Versions: 1.5.4, 1.6.1, 1.6.0 Reporter: Jamie Grier Assignee: Jamie Grier The new metrics format for latency tracking causes huge metrics cardinality explosion due to the format and the fact that there is a metric reported for every combination of source subtask index and operator subtask index. Yikes! This format is actually responsible for basically taking down our metrics system due to DDOSing our metrics servers (at Lyft). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10484) New latency tracking metrics format causes metrics cardinality explosion
[ https://issues.apache.org/jira/browse/FLINK-10484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16637332#comment-16637332 ] Jamie Grier commented on FLINK-10484: - [~Zentol] Great. I didn't see that this had already been addressed in 1.7. What do you think about the difficulty of backporting to 1.5 and 1.6? Currently, it's a pretty big problem for people trying to run Flink at any reasonable scale – and since latency tracking is on by default basically everything breaks as soon as you upgrade a job from 1.4 to 1.5. Also, latency tracking is something that has to be disabled from application code rather than in the flink-conf.yaml file so it's very hard for infra teams supporting Flink to enforce. It's also not just a problem for Flink JM – but in our case we actually caused an observability incident company wide just because of the sheer volume of metrics being thrown at our metrics servers. > New latency tracking metrics format causes metrics cardinality explosion > > > Key: FLINK-10484 > URL: https://issues.apache.org/jira/browse/FLINK-10484 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.6.0, 1.6.1, 1.5.4 >Reporter: Jamie Grier >Assignee: Jamie Grier >Priority: Critical > > The new metrics format for latency tracking causes huge metrics cardinality > explosion due to the format and the fact that there is a metric reported for > a every combination of source subtask index and operator subtask index. > Yikes! > This format is actually responsible for basically taking down our metrics > system due to DDOSing our metrics servers (at Lyft). > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10484) New latency tracking metrics format causes metrics cardinality explosion
[ https://issues.apache.org/jira/browse/FLINK-10484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16638756#comment-16638756 ] Jamie Grier commented on FLINK-10484: - Cool. We should definitely backport [FLINK-10242] as well then. Would you like to do this [~Zentol] or should I do it? > New latency tracking metrics format causes metrics cardinality explosion > > > Key: FLINK-10484 > URL: https://issues.apache.org/jira/browse/FLINK-10484 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.6.0, 1.6.1, 1.5.4 >Reporter: Jamie Grier >Assignee: Jamie Grier >Priority: Critical > > The new metrics format for latency tracking causes huge metrics cardinality > explosion due to the format and the fact that there is a metric reported for > a every combination of source subtask index and operator subtask index. > Yikes! > This format is actually responsible for basically taking down our metrics > system due to DDOSing our metrics servers (at Lyft). > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-10484) New latency tracking metrics format causes metrics cardinality explosion
[ https://issues.apache.org/jira/browse/FLINK-10484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16638756#comment-16638756 ] Jamie Grier edited comment on FLINK-10484 at 10/4/18 7:46 PM: -- Cool. We should definitely backport FLINK-10242 as well then. Are you going to do this [~Zentol] or should I take a crack at it ? was (Author: jgrier): Cool. We should definitely backport [FLINK-10242] as well then. Would you like to do this [~Zentol] or should I do it? > New latency tracking metrics format causes metrics cardinality explosion > > > Key: FLINK-10484 > URL: https://issues.apache.org/jira/browse/FLINK-10484 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.6.0, 1.6.1, 1.5.4 >Reporter: Jamie Grier >Assignee: Jamie Grier >Priority: Critical > > The new metrics format for latency tracking causes huge metrics cardinality > explosion due to the format and the fact that there is a metric reported for > a every combination of source subtask index and operator subtask index. > Yikes! > This format is actually responsible for basically taking down our metrics > system due to DDOSing our metrics servers (at Lyft). > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
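Since this thread turns on whether latency tracking can be controlled from configuration rather than application code, a minimal flink-conf.yaml sketch is worth showing. This assumes the options introduced around FLINK-10242 and the 1.7-era changes (`metrics.latency.interval` and `metrics.latency.granularity`); verify the option names against your Flink version:

```yaml
# Sketch: disable latency tracking entirely from flink-conf.yaml, with no
# application-code change needed; an interval of 0 emits no latency markers.
metrics.latency.interval: 0

# Alternatively, keep latency metrics but reduce their cardinality by
# tracking at a coarser granularity than per (source subtask, operator
# subtask) pair:
# metrics.latency.interval: 30000
# metrics.latency.granularity: single
```

With granularity `single`, latency is tracked per operator rather than per source/operator subtask combination, which avoids the cardinality explosion described in this ticket.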
[jira] [Created] (FLINK-10154) Make sure we always read at least one record in KinesisConnector
Jamie Grier created FLINK-10154: --- Summary: Make sure we always read at least one record in KinesisConnector Key: FLINK-10154 URL: https://issues.apache.org/jira/browse/FLINK-10154 Project: Flink Issue Type: Bug Components: Kinesis Connector Affects Versions: 1.6.0 Reporter: Jamie Grier Assignee: Jamie Grier It's possible in some cases to request zero records from Kinesis in the Kinesis connector. This can happen when the "adaptive reads" feature is enabled. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
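The failure mode can be sketched as follows. This is a hypothetical illustration only – the method name and the byte-budget heuristic are assumptions, not the actual connector code – but it shows how an adaptively computed fetch size can round down to zero and why clamping it to one fixes the bug:

```java
// Hypothetical sketch of the "adaptive reads" sizing bug (FLINK-10154).
// Names and the byte-budget heuristic are assumptions for illustration;
// they are not the actual Kinesis connector code.
public final class AdaptiveFetchSize {

    /** Derive a per-call record limit from a byte budget and the observed average record size. */
    static int adaptiveFetchSize(double avgRecordBytes, long targetBytesPerFetch) {
        // Integer truncation means very large records can yield 0 ...
        int computed = (int) (targetBytesPerFetch / Math.max(1.0, avgRecordBytes));
        // ... hence the fix: always request at least one record per getRecords() call.
        return Math.max(1, computed);
    }

    public static void main(String[] args) {
        // A 5 MB average record against a 2 MB budget used to round down to zero.
        System.out.println(adaptiveFetchSize(5_000_000.0, 2_000_000L)); // 1 after the clamp
        System.out.println(adaptiveFetchSize(1_000.0, 2_000_000L));     // 2000
    }
}
```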
[jira] [Commented] (FLINK-10154) Make sure we always read at least one record in KinesisConnector
[ https://issues.apache.org/jira/browse/FLINK-10154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16581414#comment-16581414 ] Jamie Grier commented on FLINK-10154: - PR: [https://github.com/apache/flink/pull/6564] > Make sure we always read at least one record in KinesisConnector > > > Key: FLINK-10154 > URL: https://issues.apache.org/jira/browse/FLINK-10154 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.6.0 >Reporter: Jamie Grier >Assignee: Jamie Grier >Priority: Minor > Labels: pull-request-available > > It's possible in some cases to request zero records from Kinesis in the > Kinesis connector. This can happen when the "adaptive reads" feature is > enabled. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Issue Comment Deleted] (FLINK-10154) Make sure we always read at least one record in KinesisConnector
[ https://issues.apache.org/jira/browse/FLINK-10154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jamie Grier updated FLINK-10154: Comment: was deleted (was: PR: [https://github.com/apache/flink/pull/6564] ) > Make sure we always read at least one record in KinesisConnector > > > Key: FLINK-10154 > URL: https://issues.apache.org/jira/browse/FLINK-10154 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.6.0 >Reporter: Jamie Grier >Assignee: Jamie Grier >Priority: Minor > Labels: pull-request-available > > It's possible in some cases to request zero records from Kinesis in the > Kinesis connector. This can happen when the "adaptive reads" feature is > enabled. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11984) StreamingFileSink docs do not mention S3 savepoint caveats.
[ https://issues.apache.org/jira/browse/FLINK-11984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16797404#comment-16797404 ] Jamie Grier commented on FLINK-11984: - [~kkl0u] What are the S3 savepoint caveats? > StreamingFileSink docs do not mention S3 savepoint caveats. > --- > > Key: FLINK-11984 > URL: https://issues.apache.org/jira/browse/FLINK-11984 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem, Documentation >Affects Versions: 1.7.2 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Issue Comment Deleted] (FLINK-11984) StreamingFileSink docs do not mention S3 savepoint caveats.
[ https://issues.apache.org/jira/browse/FLINK-11984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jamie Grier updated FLINK-11984: Comment: was deleted (was: [~kkl0u] What are the S3 savepoint caveats?) > StreamingFileSink docs do not mention S3 savepoint caveats. > --- > > Key: FLINK-11984 > URL: https://issues.apache.org/jira/browse/FLINK-11984 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem, Documentation >Affects Versions: 1.7.2 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10484) New latency tracking metrics format causes metrics cardinality explosion
[ https://issues.apache.org/jira/browse/FLINK-10484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jamie Grier closed FLINK-10484. --- Resolution: Duplicate > New latency tracking metrics format causes metrics cardinality explosion > > > Key: FLINK-10484 > URL: https://issues.apache.org/jira/browse/FLINK-10484 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics >Affects Versions: 1.5.4, 1.6.0, 1.6.1 >Reporter: Jamie Grier >Assignee: Jamie Grier >Priority: Critical > > The new metrics format for latency tracking causes huge metrics cardinality > explosion due to the format and the fact that there is a metric reported for > a every combination of source subtask index and operator subtask index. > Yikes! > This format is actually responsible for basically taking down our metrics > system due to DDOSing our metrics servers (at Lyft). > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11617) CLONE - Handle AmazonKinesisException gracefully in Kinesis Streaming Connector
Jamie Grier created FLINK-11617: --- Summary: CLONE - Handle AmazonKinesisException gracefully in Kinesis Streaming Connector Key: FLINK-11617 URL: https://issues.apache.org/jira/browse/FLINK-11617 Project: Flink Issue Type: Improvement Components: Kinesis Connector Reporter: Jamie Grier Assignee: Scott Kidder My Flink job that consumes from a Kinesis stream must be restarted at least once daily due to an uncaught AmazonKinesisException when reading from Kinesis. The complete stacktrace looks like: {noformat} com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: dc1b7a1a-1b97-1a32-8cd5-79a896a55223) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601) at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583) at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447) at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747) at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723) at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858) at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193) at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268) at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) {noformat} It's interesting that the Kinesis endpoint returned a 500 status code, but that's outside the scope of this issue. I think we can handle this exception in the same manner as a ProvisionedThroughputException: performing an exponential backoff and retrying a finite number of times before throwing an exception. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11617) Kinesis Connector getRecords() failure logging is misleading
[ https://issues.apache.org/jira/browse/FLINK-11617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jamie Grier reassigned FLINK-11617: --- Assignee: Jamie Grier (was: Scott Kidder) Affects Version/s: 1.5.6 1.6.3 1.7.1 Summary: Kinesis Connector getRecords() failure logging is misleading (was: CLONE - Handle AmazonKinesisException gracefully in Kinesis Streaming Connector) > Kinesis Connector getRecords() failure logging is misleading > > > Key: FLINK-11617 > URL: https://issues.apache.org/jira/browse/FLINK-11617 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.5.6, 1.6.3, 1.7.1 >Reporter: Jamie Grier >Assignee: Jamie Grier >Priority: Major > > My Flink job that consumes from a Kinesis stream must be restarted at least > once daily due to an uncaught AmazonKinesisException when reading from > Kinesis. The complete stacktrace looks like: > {noformat} > com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: > AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: > dc1b7a1a-1b97-1a32-8cd5-79a896a55223) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583) > at > 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447) > at > com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747) > at > com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723) > at > com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858) > at > org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193) > at > org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268) > at > org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {noformat} > It's interesting that the Kinesis endpoint returned a 500 status code, but > that's outside the scope of this issue. > I think we can handle this exception in the same manner as a > ProvisionedThroughputException: performing an exponential backoff and > retrying a finite number of times before throwing an exception. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11617) Kinesis Connector getRecords() failure logging is misleading
[ https://issues.apache.org/jira/browse/FLINK-11617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jamie Grier updated FLINK-11617: Description: There isn't enough information in the current logging to diagnose a getRecords() failure. Also there is a hardcoded string that states the failure cause was always ProvisionedThroughputExceededException which isn't true. There are many possible causes of failures. This is misleading. (was: My Flink job that consumes from a Kinesis stream must be restarted at least once daily due to an uncaught AmazonKinesisException when reading from Kinesis. The complete stacktrace looks like: {noformat} com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: dc1b7a1a-1b97-1a32-8cd5-79a896a55223) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601) at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583) at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447) at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747) at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723) at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858) at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193) at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268) at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) {noformat} It's interesting that the Kinesis endpoint returned a 500 status code, but that's outside the scope of this issue. I think we can handle this exception in the same manner as a ProvisionedThroughputException: performing an exponential backoff and retrying a finite number of times before throwing an exception.) > Kinesis Connector getRecords() failure logging is misleading > > > Key: FLINK-11617 > URL: https://issues.apache.org/jira/browse/FLINK-11617 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.5.6, 1.6.3, 1.7.1 >Reporter: Jamie Grier >Assignee: Jamie Grier >Priority: Major > > There isn't enough information in the current logging to diagnose a > getRecords() failure. Also there is a hardcoded string that states the > failure cause was always ProvisionedThroughputExceededException which isn't > true. There are many possible causes of failures. This is misleading. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11617) Kinesis Connector getRecords() failure logging is misleading
[ https://issues.apache.org/jira/browse/FLINK-11617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16768330#comment-16768330 ] Jamie Grier commented on FLINK-11617: - Here's an example: Stacktrace is: {{java.lang.RuntimeException: Rate Exceeded for getRecords operation - all 3 retry attempts returned ProvisionedThroughputExceededException.}} {{ at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:234)}} {{ at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:373)}} {{ at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:216)}} {{ at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266)}} {{ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)}} {{ at java.lang.Thread.run(Thread.java:748)}} But the root cause is actually given by this log line: {{Got recoverable SdkClientException. Backing off for 140 millis (null (Service: AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: c49c8e5b-a068-9733-9043-b215d51b0aa1))}} > Kinesis Connector getRecords() failure logging is misleading > > > Key: FLINK-11617 > URL: https://issues.apache.org/jira/browse/FLINK-11617 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.5.6, 1.6.3, 1.7.1 >Reporter: Jamie Grier >Assignee: Jamie Grier >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > There isn't enough information in the current logging to diagnose a > getRecords() failure. Also there is a hardcoded string that states the > failure cause was always ProvisionedThroughputExceededException which isn't > true. There are many possible causes of failures. 
This is misleading. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
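The shape of the fix described above can be sketched as follows. The class and method names here are invented for illustration (the real change lives in the Kinesis connector's KinesisProxy); the point is simply to carry the last exception into the failure message instead of hardcoding a ProvisionedThroughputExceededException claim:

```java
// Hypothetical sketch of the logging fix for FLINK-11617. Class and method
// names are assumptions for illustration, not the actual connector code.
public final class GetRecordsFailure {

    /**
     * Build a failure message that surfaces the real last error instead of a
     * hardcoded claim that every failure was ProvisionedThroughputExceededException.
     */
    static String failureMessage(int attempts, Exception lastException) {
        return "getRecords() failed after " + attempts
                + " retry attempts; last exception: " + lastException.getMessage();
    }

    public static void main(String[] args) {
        Exception last = new RuntimeException(
                "Status Code: 500; Error Code: InternalFailure");
        // The 500 InternalFailure now shows up in the message users actually see.
        System.out.println(failureMessage(3, last));
    }
}
```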
[jira] [Comment Edited] (FLINK-10887) Add source watermark tracking to the JobMaster
[ https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16774612#comment-16774612 ] Jamie Grier edited comment on FLINK-10887 at 2/22/19 12:11 AM: --- [~thw] I'll create a new PR with a solution for the aggregand and result that works similarly to the aggregateFunction. This was as designed but I think it's an oversight. Thanks. was (Author: jgrier): [~thw] I'll update the PR with a solution for the aggregand and result that works similarly to the aggregateFunction. This was a designed but I think it's an oversight. Thanks. > Add source watermark tracking to the JobMaster > -- > > Key: FLINK-10887 > URL: https://issues.apache.org/jira/browse/FLINK-10887 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Jamie Grier >Assignee: Jamie Grier >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Original Estimate: 24h > Time Spent: 50m > Remaining Estimate: 23h 10m > > We need to add a new RPC to the JobMaster such that the current watermark for > every source sub-task can be reported and the current global minimum/maximum > watermark can be retrieved so that each source can adjust their partition > read rates in an attempt to keep sources roughly aligned in event time. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10887) Add source watermark tracking to the JobMaster
[ https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16774612#comment-16774612 ] Jamie Grier commented on FLINK-10887: - [~thw] I'll update the PR with a solution for the aggregand and result that works similarly to the aggregateFunction. This was as designed but I think it's an oversight. Thanks. > Add source watermark tracking to the JobMaster > -- > > Key: FLINK-10887 > URL: https://issues.apache.org/jira/browse/FLINK-10887 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Jamie Grier >Assignee: Jamie Grier >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Original Estimate: 24h > Time Spent: 50m > Remaining Estimate: 23h 10m > > We need to add a new RPC to the JobMaster such that the current watermark for > every source sub-task can be reported and the current global minimum/maximum > watermark can be retrieved so that each source can adjust their partition > read rates in an attempt to keep sources roughly aligned in event time. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
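The mechanism described in the issue can be sketched as a small aggregator. This is a hypothetical illustration of the idea, not the actual JobMaster RPC code – all names here are assumptions: sub-tasks report watermarks, the global minimum is retrievable, and a source throttles when it runs too far ahead of the slowest source:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of JobMaster-side source watermark tracking
// (FLINK-10887). Class and method names are assumptions for illustration.
public final class WatermarkAggregator {
    private final Map<String, Long> watermarks = new HashMap<>();

    /** RPC handler: a source sub-task reports its current watermark. */
    void report(String subtask, long watermark) {
        watermarks.put(subtask, watermark);
    }

    /** Global minimum watermark across all reporting sub-tasks. */
    long globalMin() {
        return watermarks.values().stream()
                .mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
    }

    /** A source throttles when it is more than maxDrift ahead of the slowest source. */
    boolean shouldThrottle(long localWatermark, long maxDrift) {
        return localWatermark > globalMin() + maxDrift;
    }
}
```

Each source periodically calls `report(...)`, reads back the global minimum, and slows its partition reads while `shouldThrottle(...)` is true, keeping sources roughly aligned in event time.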
[jira] [Assigned] (FLINK-19468) Metrics return empty when data stream / operator name contains "+"
[ https://issues.apache.org/jira/browse/FLINK-19468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jamie Grier reassigned FLINK-19468: --- Assignee: Boyang Jerry Peng > Metrics return empty when data stream / operator name contains "+" > -- > > Key: FLINK-19468 > URL: https://issues.apache.org/jira/browse/FLINK-19468 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.9.0, 1.9.2, 1.9.3, 2.0.0 >Reporter: Boyang Jerry Peng >Assignee: Boyang Jerry Peng >Priority: Major > Labels: pull-request-available > > There is an issue in which the special character "+" is not removed from the > data stream / operator name which causes metrics for the operator to not be > properly returned. Code Reference: > [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java#L208] > > For example if the operator name is: > pulsar(url: pulsar+ssl://192.168.1.198:56014) > Metrics for an operator with the above name will always return empty. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19481) Add support for a flink native GCS FileSystem
[ https://issues.apache.org/jira/browse/FLINK-19481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17211477#comment-17211477 ] Jamie Grier commented on FLINK-19481: - I think a native GCS filesystem would be a major benefit to Flink users. The only way to support GCS currently is, as stated, through the Hadoop Filesystem implementation, which brings several problems along with it. The two largest problems I've experienced are: 1) Hadoop has a huge dependency footprint which is a significant headache for Flink application developers dealing with dependency-hell. 2) The total stack of FileSystem abstractions on this path becomes very difficult to tune, understand, and support. By stack I'm referring to Flink's own FileSystem abstraction, then the Hadoop layer, then the GCS libraries. This is very difficult to work with in production as each layer has its own intricacies, connection pools, thread pools, tunable configuration, versions, dependency versions, etc. Having gone down this path with the old-style Hadoop+S3 filesystem approach I know how difficult it can be, and a native implementation should prove to be much simpler to support and easier to tune and modify for performance. This is why the presto-s3-fs filesystem was adopted, for example. > Add support for a flink native GCS FileSystem > - > > Key: FLINK-19481 > URL: https://issues.apache.org/jira/browse/FLINK-19481 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem, FileSystems >Affects Versions: 1.12.0 >Reporter: Ben Augarten >Priority: Major > Fix For: 1.12.0 > > > Currently, GCS is supported but only by using the hadoop connector[1] > > The objective of this improvement is to add support for checkpointing to > Google Cloud Storage with the Flink File System, > > This would allow the `gs://` scheme to be used for savepointing and > checkpointing.
Long term, it would be nice if we could use the GCS FileSystem > as a source and sink in flink jobs as well. > > Long term, I hope that implementing a flink native GCS FileSystem will > simplify usage of GCS because the hadoop FileSystem ends up bringing in many > unshaded dependencies. > > [1] > [https://github.com/GoogleCloudDataproc/hadoop-connectors|https://github.com/GoogleCloudDataproc/hadoop-connectors)] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-29962) Exclude Jamon 2.3.1
[ https://issues.apache.org/jira/browse/FLINK-29962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17633495#comment-17633495 ] Jamie Grier commented on FLINK-29962: - Merged in Flink master: 9572cf6b287d71ee9c307546d8cd8f8898137bdd > Exclude Jamon 2.3.1 > --- > > Key: FLINK-29962 > URL: https://issues.apache.org/jira/browse/FLINK-29962 > Project: Flink > Issue Type: Improvement > Components: Connectors / Hive, Table SQL / Gateway >Reporter: John Roesler >Assignee: John Roesler >Priority: Minor > Labels: pull-request-available > > Hi all, > My Maven mirror is complaining that the pom for jamon-runtime:2.3.1 has a > malformed pom. It looks like it's fixed in jamon-runtime:2.4.1. According to > dependency:tree, Flink already has transitive dependencies on both versions, > so I'm proposing to just exclude the transitive dependency from the > problematic direct dependencies and pin the dependency to 2.4.1. > I'll send a PR shortly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29962) Exclude Jamon 2.3.1
[ https://issues.apache.org/jira/browse/FLINK-29962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jamie Grier updated FLINK-29962: Fix Version/s: 1.17.0 > Exclude Jamon 2.3.1 > --- > > Key: FLINK-29962 > URL: https://issues.apache.org/jira/browse/FLINK-29962 > Project: Flink > Issue Type: Improvement > Components: Connectors / Hive, Table SQL / Gateway >Reporter: John Roesler >Assignee: John Roesler >Priority: Minor > Labels: pull-request-available > Fix For: 1.17.0 > > > Hi all, > My Maven mirror is complaining that the pom for jamon-runtime:2.3.1 has a > malformed pom. It looks like it's fixed in jamon-runtime:2.4.1. According to > dependency:tree, Flink already has transitive dependencies on both versions, > so I'm proposing to just exclude the transitive dependency from the > problematic direct dependencies and pin the dependency to 2.4.1. > I'll send a PR shortly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-29962) Exclude Jamon 2.3.1
[ https://issues.apache.org/jira/browse/FLINK-29962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jamie Grier resolved FLINK-29962. - Resolution: Fixed > Exclude Jamon 2.3.1 > --- > > Key: FLINK-29962 > URL: https://issues.apache.org/jira/browse/FLINK-29962 > Project: Flink > Issue Type: Improvement > Components: Connectors / Hive, Table SQL / Gateway >Reporter: John Roesler >Assignee: John Roesler >Priority: Minor > Labels: pull-request-available > Fix For: 1.17.0 > > > Hi all, > My Maven mirror is complaining that the pom for jamon-runtime:2.3.1 has a > malformed pom. It looks like it's fixed in jamon-runtime:2.4.1. According to > dependency:tree, Flink already has transitive dependencies on both versions, > so I'm proposing to just exclude the transitive dependency from the > problematic direct dependencies and pin the dependency to 2.4.1. > I'll send a PR shortly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-19481) Add support for a flink native GCS FileSystem
[ https://issues.apache.org/jira/browse/FLINK-19481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17345032#comment-17345032 ] Jamie Grier commented on FLINK-19481: - The primary benefits of a native implementation are described earlier in this ticket. This is based on my own experience in production for several years with the other Hadoop-based File Systems – primarily the S3 one though. {noformat} I think a native GCS filesystem would be a major benefit to Flink users. The only way to support GCS currently is, as stated, through the Hadoop Filesystem implementation which brings several problems along with it. The two largest problems I've experienced are: 1) Hadoop has a huge dependency footprint which is a significant headache for Flink application developers dealing with dependency-hell. 2) The total stack of FileSystem abstractions on this path becomes very difficult to tune, understand, and support. By stack I'm referring to Flink's own FileSystem abstraction, then the Hadoop layer, then the GCS libraries. This is very difficult to work with in production as each layer has its own intricacies, connection pools, thread pools, tunable configuration, versions, dependency versions, etc. Having gone down this path with the old-style Hadoop+S3 filesystem approach I know how difficult it can be and a native implementation should prove to be much simpler to support and easier to tune and modify for performance. 
This is why the presto-s3-fs filesystem was adopted, for example.{noformat} > Add support for a flink native GCS FileSystem > - > > Key: FLINK-19481 > URL: https://issues.apache.org/jira/browse/FLINK-19481 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem, FileSystems >Affects Versions: 1.12.0 >Reporter: Ben Augarten >Priority: Minor > Labels: auto-deprioritized-major > > Currently, GCS is supported but only by using the hadoop connector[1] > > The objective of this improvement is to add support for checkpointing to > Google Cloud Storage with the Flink File System, > > This would allow the `gs://` scheme to be used for savepointing and > checkpointing. Long term, it would be nice if we could use the GCS FileSystem > as a source and sink in flink jobs as well. > > Long term, I hope that implementing a flink native GCS FileSystem will > simplify usage of GCS because the hadoop FileSystem ends up bringing in many > unshaded dependencies. > > [1] > [https://github.com/GoogleCloudDataproc/hadoop-connectors|https://github.com/GoogleCloudDataproc/hadoop-connectors)] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-19481) Add support for a flink native GCS FileSystem
[ https://issues.apache.org/jira/browse/FLINK-19481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17345032#comment-17345032 ] Jamie Grier edited comment on FLINK-19481 at 5/15/21, 1:25 PM: --- The primary benefits of a native implementation are described earlier in this ticket. This is based on my own experience in production for several years with the other Hadoop based File Systems – primarily the S3 one though. {noformat} {noformat} was (Author: jgrier): The primary benefits of a native implementation are described earlier in this ticket. This is based on my own experience in production for several years with the other Hadoop based File Systems – primarily the S3 one though. {noformat} I think a native GCS filesytem would be a major benefit to Flink users. The only way to support GCS currently is, as stated, through the Hadoop Filesystem implementation which brings several problems along with it. The two largest problems I've experienced are:1) Hadoop has a huge dependency footprint which is a significant headache for Flink application developers dealing with dependency-hell.2) The total stack of FileSystem abstractions on this path becomes very difficult to tune, understand, and support. By stack I'm referring to Flink's own FileSystem abstraction, then the Hadoop layer, then the GCS libraries. This is very difficult to work with in production as each layer has its own intricacies, connection pools, thread pools, tunable configuration, versions, dependency versions, etc.Having gone down this path with the old-style Hadoop+S3 filesystem approach I know how difficult it can be and a native implementation should prove to be much simpler to support and easier to tune and modify for performance. 
This is why the presto-s3-fs filesystem was adopted, for example.{noformat}
[jira] [Comment Edited] (FLINK-19481) Add support for a flink native GCS FileSystem
[ https://issues.apache.org/jira/browse/FLINK-19481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17345032#comment-17345032 ] Jamie Grier edited comment on FLINK-19481 at 5/15/21, 1:25 PM: --- The primary benefits of a native implementation are described earlier in this ticket. This is based on my own experience in production for several years with the other Hadoop based File Systems – primarily the S3 one though. https://issues.apache.org/jira/browse/FLINK-19481?focusedCommentId=17211477&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17211477 {noformat} {noformat} was (Author: jgrier): The primary benefits of a native implementation are described earlier in this ticket. This is based on my own experience in production for several years with the other Hadoop based File Systems – primarily the S3 one though. {noformat} {noformat}
[jira] [Commented] (FLINK-19481) Add support for a flink native GCS FileSystem
[ https://issues.apache.org/jira/browse/FLINK-19481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17346179#comment-17346179 ] Jamie Grier commented on FLINK-19481: - Yes, [~xintongsong], that's my opinion based on experience. The runtime complexity of having the additional Hadoop layer will likely be strictly worse. This is because each layer has its own configuration and things like thread pooling, pool sizes, buffering, and other non-trivial tuning parameters. It can be very difficult to tune this stuff for production workloads with non-trivial throughput, and having all of those layers makes it (much) worse. The configuration is a leaky abstraction, so you end up having to understand, configure, and tune the Flink, Hadoop, and GCS layers anyway. Again, this is based mostly on my experience with the various flavors of the S3 connector, but it will still apply here. In my experience, the more native (fewer layers of abstraction) you can achieve, the better the result. That said, I have not looked at Galen's PR. It seems from reading the comments here, though, that a good solution would be a hybrid of Ben's work on the native GCS Filesystem combined with Galen's work on the RecoverableWriter.
[jira] [Created] (FLINK-4391) Provide support for asynchronous operations over streams
Jamie Grier created FLINK-4391: -- Summary: Provide support for asynchronous operations over streams Key: FLINK-4391 URL: https://issues.apache.org/jira/browse/FLINK-4391 Project: Flink Issue Type: New Feature Components: DataStream API Reporter: Jamie Grier Many Flink users need to do asynchronous processing driven by data from a DataStream. The classic example would be joining against an external database in order to enrich a stream with extra information. It would be nice to add general support for this type of operation in the Flink API. Ideally this could simply take the form of a new operator that manages async operations, keeps so many of them in flight, and then emits results to downstream operators as the async operations complete. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
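The proposed operator — keep a bounded number of async operations in flight and emit results as they complete — can be illustrated outside Flink. A minimal asyncio sketch of the idea (the `lookup` coroutine and the bound of 10 are hypothetical, not part of the proposal):

```python
import asyncio

async def enrich(record, lookup, sem):
    # The semaphore caps how many async operations are in flight at once.
    async with sem:
        extra = await lookup(record)
    return {**record, **extra}

async def enrich_stream(records, lookup, max_in_flight=10):
    sem = asyncio.Semaphore(max_in_flight)
    tasks = [asyncio.create_task(enrich(r, lookup, sem)) for r in records]
    results = []
    # Emit results downstream as the async operations complete (unordered).
    for fut in asyncio.as_completed(tasks):
        results.append(await fut)
    return results
```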
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15425292#comment-15425292 ] Jamie Grier commented on FLINK-4391: [~maguowei] Do you have any sense of when you might be able to contribute this? Is it in a publicly accessible fork anywhere so I could take a look? There are a few others in the community that I know would benefit from this immediately.
[jira] [Commented] (FLINK-3026) Publish the flink docker container to the docker registry
[ https://issues.apache.org/jira/browse/FLINK-3026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15898438#comment-15898438 ] Jamie Grier commented on FLINK-3026: [~iemejia] [~plucas] I'm definitely also for the #2 option above. We should definitely create an Official Flink Docker Image. I also agree with getting rid of the bluemix files but I'm not so sure about having multiple base images. Is that truly a common practice? I also think your labelling scheme looks reasonable - a full spec plus some defaults to make it simple to just grab the latest Flink image. > Publish the flink docker container to the docker registry > - > > Key: FLINK-3026 > URL: https://issues.apache.org/jira/browse/FLINK-3026 > Project: Flink > Issue Type: Task > Components: Build System, Docker >Reporter: Omer Katz >Assignee: Ismaël Mejía > Labels: Deployment, Docker > > There's a dockerfile that can be used to build a docker container already in > the repository. It'd be awesome to just be able to pull it instead of > building it ourselves. > The dockerfile can be found at > https://github.com/apache/flink/tree/master/flink-contrib/docker-flink > It also doesn't point to the latest version of Flink which I fixed in > https://github.com/apache/flink/pull/1366 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6199) Single outstanding Async I/O operation per key
Jamie Grier created FLINK-6199: -- Summary: Single outstanding Async I/O operation per key Key: FLINK-6199 URL: https://issues.apache.org/jira/browse/FLINK-6199 Project: Flink Issue Type: Improvement Components: DataStream API Reporter: Jamie Grier I would like to propose we extend the Async I/O semantics a bit such that a user can guarantee a single outstanding async request per key. This would allow a user to order async requests per key while still achieving the throughput benefits of using Async I/O in the first place. This is essential for operations where stream order is important but we still need to use Async operations to interact with an external system in a performant way. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
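The per-key guarantee — a single outstanding request per key, with different keys still proceeding concurrently — can be sketched with per-key locks. This is a conceptual sketch only, not Flink's API; the `lookup` coroutine is hypothetical:

```python
import asyncio
from collections import defaultdict

class PerKeyAsync:
    """At most one outstanding async request per key; distinct keys run concurrently."""

    def __init__(self, lookup):
        self.lookup = lookup
        self.locks = defaultdict(asyncio.Lock)

    async def process(self, key, value):
        # Requests for the same key are serialized in arrival order,
        # preserving per-key stream order while keeping cross-key throughput.
        async with self.locks[key]:
            return await self.lookup(key, value)
```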
[jira] [Created] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
Jamie Grier created FLINK-9061: -- Summary: S3 checkpoint data not partitioned well -- causes errors and poor performance Key: FLINK-9061 URL: https://issues.apache.org/jira/browse/FLINK-9061 Project: Flink Issue Type: Bug Components: FileSystem, State Backends, Checkpointing Affects Versions: 1.4.2 Reporter: Jamie Grier I think we need to modify the way we write checkpoints to S3 for high-scale jobs (those with many total tasks). The issue is that we are writing all the checkpoint data under a common key prefix. This is the worst case scenario for S3 performance since the key is used as a partition key. In the worst case checkpoints fail with a 500 status code coming back from S3 and an internal error type of TooBusyException. One possible solution would be to add a hook in the Flink filesystem code that allows me to "rewrite" paths. For example say I have the checkpoint directory set to: s3://bucket/flink/checkpoints I would hook that and rewrite that path to: s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original path This would distribute the checkpoint write load around the S3 cluster evenly. For reference: https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ Any other people hit this issue? Any other ideas for solutions? This is a pretty serious problem for people trying to checkpoint to S3. -Jamie -- This message was sent by Atlassian JIRA (v7.6.3#76005)
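The path-rewrite hook proposed above could look roughly like this. This is a hypothetical helper, not Flink's actual implementation; the 4-character prefix length is an assumption:

```python
import hashlib

def add_entropy(checkpoint_path, prefix_len=4):
    """Rewrite s3://bucket/flink/checkpoints into s3://bucket/<HASH>/flink/checkpoints.

    Hashing the original key spreads checkpoint writes across S3's key-range
    partitions instead of funneling everything under one common prefix.
    """
    scheme, rest = checkpoint_path.split("://", 1)
    bucket, key = rest.split("/", 1)
    entropy = hashlib.md5(key.encode()).hexdigest()[:prefix_len]
    return f"{scheme}://{bucket}/{entropy}/{key}"
```

Because the hash is derived deterministically from the original path, the rewrite is stable across restarts of the same job.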
[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16412737#comment-16412737 ] Jamie Grier commented on FLINK-9061: [~stevenz3wu] Did you contribute those changes back to Flink? I think this will be affecting others as well. Would you consider a contribution back to the project? Otherwise I will do this, but if you already have something working we might as well use it or base changes on it.
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414828#comment-16414828 ] Jamie Grier edited comment on FLINK-9061 at 3/27/18 12:45 AM: -- [~ste...@apache.org] Here's the full stack trace: {quote}"java.lang.Exception: Could not perform checkpoint 465 for operator catch_all.reduce -> (catch_all.late, catch_all.metric) (113\/256). org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:569) org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:380) org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:283) org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:185) org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214) org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264) org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) java.lang.Thread.run(Thread.java:748) Caused by: java.lang.Exception: Could not complete snapshot 465 for operator catch_all.reduce -> (catch_all.late, catch_all.metric) (113\/256). org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379) org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1089) org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1038) org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:671) org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:607) org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:560) ... 
8 common frames omitted Caused by: java.io.IOException: Could not flush and close the file system output stream to s3://BUCKET/SERVICE/flink/checkpoints/fda9a54de951d8cf7a55b5cd833cb0f7/chk-465/02896cca-0d9c-4295-9d30-b0a3b7cc928b in order to obtain the stream state handle org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:105) org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:30) org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStreamToObtainStateHandle(StateSnapshotContextSynchronousImpl.java:126) org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture(StateSnapshotContextSynchronousImpl.java:113) org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:359) ... 13 common frames omitted Caused by: java.io.IOException: org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error. Please try again. 
(Service: Amazon S3; Status Code: 500; Error Code: InternalError; Request ID: BLAH), S3 Extended Request ID: BLAH_ org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.uploadObject(PrestoS3FileSystem.java:1036) org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:987) org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) org.apache.flink.fs.s3presto.shaded.org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52) org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64) org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:319) ... 18 common frames omitted Caused by: org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error. Please try again. (Service: Amazon S3; Status Code: 500; Error Code: InternalError; Request ID: BLAH) org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1587) org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1257) org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpCli
[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416254#comment-16416254 ] Jamie Grier commented on FLINK-9061: Yeah, so I completely agree that should be a 503 but it's not. I already ran this through the AWS channels. The response was essentially that this was "internally" a TooBusyException. Here's their full response: {quote}Based on the information provided, I understand that you are experiencing some internal errors (status code 500) from S3, which is impacting one of your Flink jobs. From the log dive on your provided request IDs, I observe that your PutObject request triggered Internal Error with TooBusyException. This happens when a bucket receives more requests than it can handle or is allowed [1]. By default, S3 limits 100 PUT/LIST/DELETE requests per second or more than 300 GET requests per second. So, if your workload is to exceed this limit, you'd need to scale your bucket through partitioning. Currently, your key space isn't randomized and all your keys include "BUCKET/SERVICE/flink/checkpoints/faa473252e9bf42d07f618923fa22af1/chk-13/". Therefore, your bucket isn't being automatically partitioned by S3 and you received increased error rates after your requests increased. {quote}
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416254#comment-16416254 ] Jamie Grier edited comment on FLINK-9061 at 3/27/18 9:13 PM: - Yeah, so I completely agree that the response should be a 503 or better yet a 429 but it's not. I already ran this through the AWS support channels. The response was essentially that this was "internally" a TooBusyException. Here's their full response: {quote}Based on the information provided, I understand that you are experiencing some internal errors (status code 500) from S3, which is impacting one of your Flink jobs. From the log dive on your provided request IDs, I observe that your PutObject request triggered Internal Error with TooBusyException. This happens when a bucket receives more requests than it can handle or is allowed [1]. By default, S3 limits 100 PUT/LIST/DELETE requests per second or more than 300 GET requests per second. So, if your workload is to exceed this limit, you'd need to scale your bucket through partitioning. Currently, your key space isn't randomized and all your keys include "BUCKET/SERVICE/flink/checkpoints/faa473252e9bf42d07f618923fa22af1/chk-13/". Therefore, your bucket isn't being automatically partitioned by S3 and you received increased error rates after your requests increased. {quote} was (Author: jgrier): Yeah, so I completely agree that should be a 503 but it's not. I already ran this through the AWS channels. The response was essentially that this was "internally" a TooBusyException. Here's their full response: {quote}Based on the information provided, I understand that you are experiencing some internal errors (status code 500) from S3, which is impacting one of your Flink jobs. From the log dive on your provided request IDs, I observe that your PutObject request triggered Internal Error with TooBusyException. This happens when a bucket receives more requests than it can handle or is allowed [1]. 
By default, S3 limits 100 PUT/LIST/DELETE requests per second or more than 300 GET requests per second. So, if your workload is to exceed this limit, you'd need to scale your bucket through partitioning. Currently, your key space isn't randomized and all your keys include "BUCKET/SERVICE/flink/checkpoints/faa473252e9bf42d07f618923fa22af1/chk-13/". Therefore, your bucket isn't being automatically partitioned by S3 and you received increased error rates after your requests increased. {quote}
[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416271#comment-16416271 ] Jamie Grier commented on FLINK-9061: [~StephanEwen] I don't know if the s3a-based connector exhibits the same behavior but I suspect it would, since this is not a great approach to writing checkpoint data to S3 ;) . We could improve on this by changing the retry policy in use as [~ste...@apache.org] said, but in the end it seems like we'll have to introduce the entropy to really solve the problem. Any further ideas about which approach might be best? We could make changes at the FileSystem or StateBackend level. And of course it would make listing files hard. Maybe we could use just two digits for the entropy, like 00-99, and when listing we could just list all of those and merge the results.
For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
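The two-digit-entropy idea can be sketched as follows. This is a hypothetical illustration, not Flink code: writes pick a prefix from 00-99 to spread load across partitions, and a "directory listing" fans out over all 100 prefixes and merges the results (`list_prefix` stands in for an S3 LIST call):

```python
import random

PREFIXES = [f"{i:02d}" for i in range(100)]  # "00" .. "99"

def entropy_path(base_key: str) -> str:
    """Prepend a random two-digit prefix so writes spread across partitions."""
    return f"{random.choice(PREFIXES)}/{base_key}"

def list_all(list_prefix, base_key: str):
    """Emulate a directory listing by listing every entropy prefix and merging.

    `list_prefix` is a hypothetical callable that returns the keys under one
    prefix (e.g. a wrapper around an S3 LIST request).
    """
    merged = []
    for p in PREFIXES:
        merged.extend(list_prefix(f"{p}/{base_key}"))
    return sorted(merged)
```

The cost is 100 LIST requests per logical listing, which is acceptable for an operation as infrequent as browsing checkpoints.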
[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16420885#comment-16420885 ]

Jamie Grier commented on FLINK-9061:

Maybe we should keep this super simple and make a change at the state backend level where we optionally just reverse the path. That should actually work very well.
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16420885#comment-16420885 ]

Jamie Grier edited comment on FLINK-9061 at 3/31/18 1:32 AM:

Maybe we should keep this super simple and make a change at the state backend level where we optionally just reverse the key name. That should actually work very well.

was (Author: jgrier): Maybe we should keep this super simple and make a change at the state backend level where we optionally just reverse the path. That should actually work very well.
[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423072#comment-16423072 ]

Jamie Grier commented on FLINK-9061:

So, what I'm suggesting is that we, optionally, split on '/' and reverse the components, like so:

s3://my_bucket/flink/checkpoints/JOB_ID/chk_000/123456789

becomes

s3://my_bucket/123456789/chk_000/JOB_ID/checkpoints/flink

It's not very ops friendly, but that's because S3 isn't a filesystem; the hierarchy isn't real. The equivalent of a directory listing that would group all the checkpoints for a single job would be:

aws s3 ls --recursive s3://my_bucket | grep "JOB_ID/checkpoints"

I think that would work just fine for our use case. What about others?
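The component-reversal scheme above can be sketched like this. It is an illustration of the idea, not Flink's implementation: the bucket stays put, and the key components are reversed so that the high-entropy leaf component (checkpoint file name) becomes the partition-relevant prefix:

```python
from urllib.parse import urlparse

def reverse_key(path: str) -> str:
    """Reverse the key components of an s3:// path, keeping the bucket.

    s3://my_bucket/flink/checkpoints/JOB_ID/chk_000/123456789
    -> s3://my_bucket/123456789/chk_000/JOB_ID/checkpoints/flink
    """
    parsed = urlparse(path)
    # Split the key into components, dropping empty parts from '//' or
    # leading/trailing slashes, then reverse their order.
    parts = [p for p in parsed.path.split("/") if p]
    return f"{parsed.scheme}://{parsed.netloc}/" + "/".join(reversed(parts))
```

The appeal is that no extra state is needed: the transformation is deterministic and self-inverse, so the original path can be recovered by reversing again.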
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423072#comment-16423072 ]

Jamie Grier edited comment on FLINK-9061 at 4/2/18 8:17 PM:

So, what I'm suggesting is that we, optionally, split on '/' and reverse the components, like so:

s3://my_bucket/flink/checkpoints/JOB_ID/chk_000/123456789

becomes

s3://my_bucket/123456789/chk_000/JOB_ID/checkpoints/flink

It's not very ops friendly, but that's because S3 isn't a filesystem; the hierarchy isn't real. It's a flat keyspace (I know we all know that, of course). The equivalent of a directory listing that would group all the checkpoints for a single job would be:

aws s3 ls --recursive s3://my_bucket | grep "JOB_ID/checkpoints"

I think that would work just fine for our use case. What about others?
[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423089#comment-16423089 ]

Jamie Grier commented on FLINK-9061:

As I understand it, the above doesn't work. Maybe if you ask Amazon to set up bucket partitions for you manually it could be made to work, but in the normal case the very first characters of the key name must introduce the randomness.
[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16426103#comment-16426103 ]

Jamie Grier commented on FLINK-9061:

Okay, this is the best documentation I've found on this: [https://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html] and even it is very vague. It does appear that the entropy doesn't have to be in the very first characters, but that raises an interesting question: what are the exact constraints here? Which parts of the key name are and aren't used for partitioning, exactly? Technically, all of our checkpoint objects already have several characters of uniqueness, since the last part of the full object key name is a GUID. Anyway, not having full info sucks.

[~stevenz3wu] I think your proposal sounds good. Thanks for offering to do the PR :) That should work well, and logical listing of sub-directories should still be possible in this scheme by issuing parallel S3 list requests for each possible prefix and merging the results. Shall we proceed with this approach then?
[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16426132#comment-16426132 ]

Jamie Grier commented on FLINK-9061:

Yup, sounds good to me :)
[jira] [Created] (FLINK-5634) Flink should not always redirect stdout to a file.
Jamie Grier created FLINK-5634:
-----------------------------------

Summary: Flink should not always redirect stdout to a file.
Key: FLINK-5634
URL: https://issues.apache.org/jira/browse/FLINK-5634
Project: Flink
Issue Type: Bug
Components: Docker
Affects Versions: 1.2.0
Reporter: Jamie Grier
Fix For: 1.2.0

Flink always redirects stdout to a file. While often convenient, this isn't always what people want. The most obvious case of this is a Docker deployment. It should be possible to have Flink log to stdout.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
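Logging to stdout is largely a logger-configuration concern. A minimal sketch, assuming the log4j 1.x properties format Flink shipped with at the time (the exact pattern string is illustrative), replaces the file appender with a ConsoleAppender:

```properties
# Route all Flink logs to stdout via a ConsoleAppender (log4j 1.x syntax).
log4j.rootLogger=INFO, console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```

The other half of the problem is that the start-up scripts themselves redirect the JVM's stdout to a .out file, which is what the linked PR addresses.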
[jira] [Updated] (FLINK-5634) Flink should not always redirect stdout to a file.
[ https://issues.apache.org/jira/browse/FLINK-5634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jamie Grier updated FLINK-5634:
-------------------------------
Description: Flink always redirects stdout to a file. While often convenient, this isn't always what people want. The most obvious case of this is a Docker deployment. It should be possible to have Flink log to stdout. Here is a PR for this: https://github.com/apache/flink/pull/3204
[jira] [Created] (FLINK-5635) Improve Docker tooling to make it easier to build images and launch Flink via Docker tools
Jamie Grier created FLINK-5635:
-----------------------------------

Summary: Improve Docker tooling to make it easier to build images and launch Flink via Docker tools
Key: FLINK-5635
URL: https://issues.apache.org/jira/browse/FLINK-5635
Project: Flink
Issue Type: Improvement
Components: Docker
Affects Versions: 1.2.0
Reporter: Jamie Grier
Fix For: 1.2.0

This is a bit of a catch-all ticket for general improvements to the Flink-on-Docker experience. Things to improve:
- Make it possible to build a Docker image from your own flink-dist directory as well as from official releases.
- Make it possible to override the image name so a user can more easily publish these images to their own Docker repository.
- Provide scripts that show how to properly run on Docker Swarm or similar environments with overlay networking (e.g. Kubernetes) without using host networking.
- Log to stdout rather than to files.
- Work properly with docker-compose for local deployment as well as production deployments (Swarm/k8s).
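As a sketch of the docker-compose item above, the following is purely illustrative: the `flink` image name and the `jobmanager`/`taskmanager` commands are assumptions for the sake of example, not a published official image.

```yaml
# Hypothetical docker-compose file of the kind this ticket envisions:
# one JobManager, one TaskManager, communicating over the compose network
# rather than host networking.
version: "2"
services:
  jobmanager:
    image: flink
    command: jobmanager
    ports:
      - "8081:8081"   # Web UI / REST
  taskmanager:
    image: flink
    command: taskmanager
    depends_on:
      - jobmanager
```

With logs going to stdout, `docker-compose logs` then works as expected, and the same topology translates directly to Swarm or Kubernetes manifests.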
[jira] [Assigned] (FLINK-5635) Improve Docker tooling to make it easier to build images and launch Flink via Docker tools
[ https://issues.apache.org/jira/browse/FLINK-5635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jamie Grier reassigned FLINK-5635:
----------------------------------
Assignee: Jamie Grier
[jira] [Assigned] (FLINK-5634) Flink should not always redirect stdout to a file.
[ https://issues.apache.org/jira/browse/FLINK-5634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jamie Grier reassigned FLINK-5634:
----------------------------------
Assignee: Jamie Grier
[jira] [Commented] (FLINK-5635) Improve Docker tooling to make it easier to build images and launch Flink via Docker tools
[ https://issues.apache.org/jira/browse/FLINK-5635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15839036#comment-15839036 ]

Jamie Grier commented on FLINK-5635:

[~greghogan] I actually wasn't aware of [FLINK-4326], but there does seem to be some overlap. There is also [FLINK-5634] and an associated PR, which specifically addresses enabling logging to stdout rather than to a file. However, neither of these two issues addresses running Flink processes in the foreground and avoiding forking as in [FLINK-4326]; I think that remains a separate issue that may need to be addressed. My main motivation in both of these JIRA issues and the associated PRs was providing a better Flink-on-Docker experience and some example scripts showing how to run Flink properly in container-based environments. I would also like to get some "official" Flink Docker images published once we're satisfied with them.
[jira] [Commented] (FLINK-5634) Flink should not always redirect stdout to a file.
[ https://issues.apache.org/jira/browse/FLINK-5634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15851609#comment-15851609 ]

Jamie Grier commented on FLINK-5634:

Agreed. If [FLINK-4326] were merged this would also work fine. Whether Flink runs in the background or foreground is orthogonal to where the logs go -- as long as Flink can be configured to log to stdout, we're good.
[jira] [Issue Comment Deleted] (FLINK-5634) Flink should not always redirect stdout to a file.
[ https://issues.apache.org/jira/browse/FLINK-5634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jamie Grier updated FLINK-5634:
-------------------------------
Comment: was deleted (was: Agreed.. If [FLINK-4326] were merged this would also work fine. Whether or not Flink runs in the background or foreground is orthogonal to where the logs go -- as long as Flink can be configured to log to stdout we're good.)
[jira] [Commented] (FLINK-4326) Flink start-up scripts should optionally start services on the foreground
[ https://issues.apache.org/jira/browse/FLINK-4326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15851616#comment-15851616 ]

Jamie Grier commented on FLINK-4326:

[~StephanEwen] Yup, this will also solve the issue I raised in [FLINK-5634], since my primary concern was running Docker containers and having the option to configure Flink to log to stdout.

> Flink start-up scripts should optionally start services on the foreground
> --------------------------------------------------------------------------
>
> Key: FLINK-4326
> URL: https://issues.apache.org/jira/browse/FLINK-4326
> Project: Flink
> Issue Type: Improvement
> Components: Startup Shell Scripts
> Affects Versions: 1.0.3
> Reporter: Elias Levy
>
> This has previously been mentioned on the mailing list but has not been addressed. Flink's start-up scripts start the job and task managers in the background. This makes it difficult to integrate Flink with most process-supervision tools and init systems, including Docker. One can get around this by hacking the scripts or manually starting the right classes via Java, but that is a brittle solution.
>
> In addition to starting the daemons in the foreground, the start-up scripts should use exec instead of running the commands, so as to avoid forks. Many supervisory tools assume the PID of the process to be monitored is that of the process they first execute, and fork chains make it difficult for the supervisor to figure out which process to monitor. Specifically, jobmanager.sh and taskmanager.sh should exec flink-daemon.sh, and flink-daemon.sh should exec java.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-5634) Flink should not always redirect stdout to a file.
[ https://issues.apache.org/jira/browse/FLINK-5634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15851609#comment-15851609 ]

Jamie Grier edited comment on FLINK-5634 at 2/3/17 3:20 PM:

Agreed.. If [FLINK-4326] were merged this would also work fine. Whether or not Flink runs in the background or foreground is orthogonal to where the logs go -- as long as Flink can be configured to log to stdout we're good.

was (Author: jgrier): Agreed.. If [4326] were merged this would also work fine. Whether or not Flink runs in the background or foreground is orthogonal to where the logs go -- as long as Flink can be configured to log to stdout we're good.