[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5982 @StephanEwen I guess this PR is ready for another look now... ---
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476945#comment-16476945 ] ASF GitHub Bot commented on FLINK-9325: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5982 @StephanEwen I guess this PR is ready for another look now... > generate the _meta file for checkpoint only when the writing is truly > successful > > > Key: FLINK-9325 > URL: https://issues.apache.org/jira/browse/FLINK-9325 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > > We should generate the _meta file for checkpoint only when the writing is > totally successful. We should write the metadata file first to a temp file > and then atomically rename it (with an equivalent workaround for S3). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
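The fix described in FLINK-9325 boils down to the classic write-to-temp-then-rename pattern. Below is a minimal Java sketch of that pattern, independent of Flink's own FileSystem abstraction; the file names and the helper class are illustrative, not the PR's actual code.

{code:java}
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public final class AtomicMetadataWriter {

    /**
     * Writes the checkpoint metadata to a temporary file first and only renames it
     * to "_metadata" after the write has completed successfully, so readers never
     * observe a partially written metadata file.
     */
    public static void writeMetadata(Path checkpointDir, byte[] serializedMetadata) throws IOException {
        Path tempFile = checkpointDir.resolve("_metadata.inprogress");
        Path finalFile = checkpointDir.resolve("_metadata");

        try (OutputStream out = Files.newOutputStream(tempFile)) {
            out.write(serializedMetadata);
        }

        // Atomic on POSIX file systems; object stores such as S3 need an
        // equivalent workaround, as the issue description notes.
        Files.move(tempFile, finalFile, StandardCopyOption.ATOMIC_MOVE);
    }
}
{code}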
[jira] [Commented] (FLINK-9091) Failure while enforcing releasability in building flink-json module
[ https://issues.apache.org/jira/browse/FLINK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476972#comment-16476972 ] Timo Walther commented on FLINK-9091: - [~yew1eb] can we close this issue or do you still get this error? > Failure while enforcing releasability in building flink-json module > --- > > Key: FLINK-9091 > URL: https://issues.apache.org/jira/browse/FLINK-9091 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: Ted Yu >Assignee: Hai Zhou >Priority: Major > Attachments: f-json.out > > > Got the following when building flink-json module: > {code} > [WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence > failed with message: > Failed while enforcing releasability. See above detailed error message. > ... > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M1:enforce > (dependency-convergence) on project flink-json: Some Enforcer rules have > failed. Look above for specific messages explaining why the rule failed. -> > [Help 1] > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6001: [FLINK-9299] ProcessWindowFunction documentation J...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6001#discussion_r188529928 --- Diff: docs/dev/stream/operators/windows.md --- @@ -797,7 +797,7 @@ DataStream input = ...; input .keyBy() - .timeWindow() + .timeWindow() --- End diff -- How about using "duration" instead of "time size". I think "time size" is not a commonly used term... ---
[jira] [Commented] (FLINK-9299) ProcessWindowFunction documentation Java examples have errors
[ https://issues.apache.org/jira/browse/FLINK-9299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476984#comment-16476984 ] ASF GitHub Bot commented on FLINK-9299: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6001#discussion_r188529928 --- Diff: docs/dev/stream/operators/windows.md --- @@ -797,7 +797,7 @@ DataStream input = ...; input .keyBy() - .timeWindow() + .timeWindow() --- End diff -- How about using "duration" instead of "time size". I think "time size" is not a commonly used term... > ProcessWindowFunction documentation Java examples have errors > - > > Key: FLINK-9299 > URL: https://issues.apache.org/jira/browse/FLINK-9299 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.4.2 >Reporter: Ken Krugler >Assignee: vinoyang >Priority: Minor > > In looking at > [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation], > I noticed a few errors... > * "This allows to incrementally compute windows" should be "This allows it > to incrementally compute windows" > * DataStream input = ...; should be > DataStream> input = ...; > * The getResult() method needs to cast one of the accumulator values to a > double, if that's what it is going to return. > * MyProcessWindowFunction needs to extend, not implement > ProcessWindowFunction > * MyProcessWindowFunction needs to implement a process() method, not an > apply() method. > * The call to .timeWindow takes a Time parameter, not a window assigner. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
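Putting the fixes listed in the issue description together, the corrected documentation example could look roughly like the sketch below. It assumes the standard Flink 1.4 windowing APIs (`AggregateFunction`, `ProcessWindowFunction`, `TimeWindow`); the concrete accumulator layout is an illustrative choice, not necessarily what the docs page ends up using.

{code:java}
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Averages the Long field of the input; the accumulator is (sum, count).
class AverageAggregate
        implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {

    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> acc) {
        return new Tuple2<>(acc.f0 + value.f1, acc.f1 + 1L);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> acc) {
        // cast one operand to double, as pointed out in the issue
        return ((double) acc.f0) / acc.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
    }
}

// extends (not implements) ProcessWindowFunction and overrides process(), not apply()
class MyProcessWindowFunction
        extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {

    @Override
    public void process(String key,
                        Context context,
                        Iterable<Double> averages,
                        Collector<Tuple2<String, Double>> out) {
        // with incremental aggregation there is exactly one pre-aggregated value
        Double average = averages.iterator().next();
        out.collect(new Tuple2<>(key, average));
    }
}

// Usage, with .timeWindow(...) taking a Time parameter rather than a window assigner:
//   DataStream<Tuple2<String, Long>> input = ...;
//   input.keyBy(<key selector>)
//        .timeWindow(Time.minutes(5))
//        .aggregate(new AverageAggregate(), new MyProcessWindowFunction());
{code}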
[GitHub] flink pull request #6001: [FLINK-9299] ProcessWindowFunction documentation J...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6001#discussion_r188530473 --- Diff: docs/dev/stream/operators/windows.md --- @@ -797,7 +797,7 @@ DataStream input = ...; input .keyBy() - .timeWindow() + .timeWindow() --- End diff -- accept, hold on... ---
[jira] [Commented] (FLINK-9299) ProcessWindowFunction documentation Java examples have errors
[ https://issues.apache.org/jira/browse/FLINK-9299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476987#comment-16476987 ] ASF GitHub Bot commented on FLINK-9299: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6001#discussion_r188530473 --- Diff: docs/dev/stream/operators/windows.md --- @@ -797,7 +797,7 @@ DataStream input = ...; input .keyBy() - .timeWindow() + .timeWindow() --- End diff -- accept, hold on... > ProcessWindowFunction documentation Java examples have errors > - > > Key: FLINK-9299 > URL: https://issues.apache.org/jira/browse/FLINK-9299 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.4.2 >Reporter: Ken Krugler >Assignee: vinoyang >Priority: Minor > > In looking at > [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation], > I noticed a few errors... > * "This allows to incrementally compute windows" should be "This allows it > to incrementally compute windows" > * DataStream input = ...; should be > DataStream> input = ...; > * The getResult() method needs to cast one of the accumulator values to a > double, if that's what it is going to return. > * MyProcessWindowFunction needs to extend, not implement > ProcessWindowFunction > * MyProcessWindowFunction needs to implement a process() method, not an > apply() method. > * The call to .timeWindow takes a Time parameter, not a window assigner. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6020: [FLINK-9373][state] Always call RocksIterator.stat...
GitHub user sihuazhou opened a pull request: https://github.com/apache/flink/pull/6020 [FLINK-9373][state] Always call RocksIterator.status() to check the internal error of RocksDB ## What is the purpose of the change Currently, when using RocksIterator we only use `iterator.isValid()` to check whether we have reached the end of the iterator. But that is not enough: if we refer to RocksDB's wiki https://github.com/facebook/rocksdb/wiki/Iterator#error-handling, we find that even if `iterator.isValid()` returns true, there may still be internal errors. A safer way to use the `RocksIterator` is to always call `iterator.status()` to check for internal errors of RocksDB. There is one case from a user email that seems to have lost data because of this: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html (I'm not so sure yet) ## Brief change log - *Always call RocksIterator.status() to check the internal error of RocksDB* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/sihuazhou/flink FLINK-9373 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6020.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6020 commit b1d531949d521bb7d72cd389dfe72a4a5cf1bfc9 Author: sihuazhou Date: 2018-05-16T07:47:05Z Always call RocksIterator.status() to check the internal error of RocksDB ---
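For reference, the usage pattern the PR advocates looks roughly as follows against the plain RocksDB Java API (not Flink's internal wrappers). The sketch assumes a RocksDB version in which `RocksIterator` is `AutoCloseable`.

{code:java}
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

public final class IteratorStatusCheck {

    /**
     * Iterates over all entries and calls status() afterwards, because
     * isValid() == false can mean either "end of data" or "internal error".
     */
    static void scanAll(RocksDB db) throws RocksDBException {
        try (RocksIterator iter = db.newIterator()) {
            for (iter.seekToFirst(); iter.isValid(); iter.next()) {
                byte[] key = iter.key();
                byte[] value = iter.value();
                // ... process key/value ...
            }
            // Throws RocksDBException if the iteration stopped due to an internal error.
            iter.status();
        }
    }
}
{code}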
[jira] [Commented] (FLINK-9373) Always call RocksIterator.status() to check the internal error of RocksDB
[ https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476989#comment-16476989 ] ASF GitHub Bot commented on FLINK-9373: --- GitHub user sihuazhou opened a pull request: https://github.com/apache/flink/pull/6020 [FLINK-9373][state] Always call RocksIterator.status() to check the internal error of RocksDB ## What is the purpose of the change Currently, when using RocksIterator we only use the `iterator.isValid()` to check whether we have reached the end of the iterator. But that is not enough, if we refer to RocksDB's wiki https://github.com/facebook/rocksdb/wiki/Iterator#error-handling we should find that even if `iterator.isValid()=true`, there may also exist some internal errors. A safer way to use the `RocksIterator` is to always call the `iterator.status()` to check the internal error of RocksDB. There is one case from user email seems to lost data because of this http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html (I'm not so sure yet) ## Brief change log - *Always call RocksIterator.status() to check the internal error of RocksDB* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/sihuazhou/flink FLINK-9373 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6020.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6020 commit b1d531949d521bb7d72cd389dfe72a4a5cf1bfc9 Author: sihuazhou Date: 2018-05-16T07:47:05Z Always call RocksIterator.status() to check the internal error of RocksDB > Always call RocksIterator.status() to check the internal error of RocksDB > - > > Key: FLINK-9373 > URL: https://issues.apache.org/jira/browse/FLINK-9373 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Blocker > > Currently, when using RocksIterator we only use the _iterator.isValid()_ to > check whether we have reached the end of the iterator. But that is not > enough, if we refer to RocksDB's wiki > https://github.com/facebook/rocksdb/wiki/Iterator#error-handling we should > find that even if _iterator.isValid()=true_, there may also exist some > internal error. A safer way to use the _RocksIterator_ is to always call the > _iterator.status()_ to check the internal error of _RocksDB_. There is a case > from user email seems to lost data because of this > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r188531694 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -60,6 +60,9 @@ class DataStreamJoin( override def needsUpdatesAsRetraction: Boolean = true + // outer join will generate retractions + override def producesRetractions: Boolean = joinType != JoinRelType.INNER --- End diff -- Thanks, now I understand the terminology between producing and just forwarding retractions. ---
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476994#comment-16476994 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r188531785 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -176,14 +179,34 @@ class DataStreamJoin( body, returnType) -val coMapFun = - new NonWindowInnerJoin( -leftSchema.typeInfo, -rightSchema.typeInfo, -CRowTypeInfo(returnType), -genFunction.name, -genFunction.code, -queryConfig) +val coMapFun = joinType match { + case JoinRelType.INNER => +new NonWindowInnerJoin( + leftSchema.typeInfo, + rightSchema.typeInfo, + CRowTypeInfo(returnType), + genFunction.name, + genFunction.code, + queryConfig) + case JoinRelType.LEFT if joinInfo.isEqui => +new NonWindowLeftRightJoin( + leftSchema.typeInfo, + rightSchema.typeInfo, + CRowTypeInfo(returnType), + genFunction.name, + genFunction.code, + joinType == JoinRelType.LEFT, + queryConfig) + case JoinRelType.LEFT => --- End diff -- We can also do it as part of FLINK-8429. > Implement stream-stream non-window left outer join > -- > > Key: FLINK-8428 > URL: https://issues.apache.org/jira/browse/FLINK-8428 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > Implement stream-stream non-window left outer join for sql/table-api. A > simple design doc can be found > [here|https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r188531785 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -176,14 +179,34 @@ class DataStreamJoin( body, returnType) -val coMapFun = - new NonWindowInnerJoin( -leftSchema.typeInfo, -rightSchema.typeInfo, -CRowTypeInfo(returnType), -genFunction.name, -genFunction.code, -queryConfig) +val coMapFun = joinType match { + case JoinRelType.INNER => +new NonWindowInnerJoin( + leftSchema.typeInfo, + rightSchema.typeInfo, + CRowTypeInfo(returnType), + genFunction.name, + genFunction.code, + queryConfig) + case JoinRelType.LEFT if joinInfo.isEqui => +new NonWindowLeftRightJoin( + leftSchema.typeInfo, + rightSchema.typeInfo, + CRowTypeInfo(returnType), + genFunction.name, + genFunction.code, + joinType == JoinRelType.LEFT, + queryConfig) + case JoinRelType.LEFT => --- End diff -- We can also do it as part of FLINK-8429. ---
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476992#comment-16476992 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r188531694 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -60,6 +60,9 @@ class DataStreamJoin( override def needsUpdatesAsRetraction: Boolean = true + // outer join will generate retractions + override def producesRetractions: Boolean = joinType != JoinRelType.INNER --- End diff -- Thanks, now I understand the terminology between producing and just forwarding retractions. > Implement stream-stream non-window left outer join > -- > > Key: FLINK-8428 > URL: https://issues.apache.org/jira/browse/FLINK-8428 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > Implement stream-stream non-window left outer join for sql/table-api. A > simple design doc can be found > [here|https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r188532278 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala --- @@ -53,4 +53,8 @@ trait DataStreamRel extends FlinkRelNode { */ def consumesRetractions: Boolean = false + /** +* Whether the [[DataStreamRel]] produces retraction messages. +*/ + def producesRetractions: Boolean = false --- End diff -- Thanks for the explanation. ---
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476998#comment-16476998 ] ASF GitHub Bot commented on FLINK-8428: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r188532278 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala --- @@ -53,4 +53,8 @@ trait DataStreamRel extends FlinkRelNode { */ def consumesRetractions: Boolean = false + /** +* Whether the [[DataStreamRel]] produces retraction messages. +*/ + def producesRetractions: Boolean = false --- End diff -- Thanks for the explanation. > Implement stream-stream non-window left outer join > -- > > Key: FLINK-8428 > URL: https://issues.apache.org/jira/browse/FLINK-8428 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > Implement stream-stream non-window left outer join for sql/table-api. A > simple design doc can be found > [here|https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6008: [FLINK-9354][travis] Print execution times for nightly E2...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6008 nice catch @tzulitai, will rework this into a run_test method that takes a description and command to execute. ---
[jira] [Commented] (FLINK-9354) print execution times for end-to-end tests
[ https://issues.apache.org/jira/browse/FLINK-9354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477003#comment-16477003 ] ASF GitHub Bot commented on FLINK-9354: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/6008 nice catch @tzulitai, will rework this into a run_test method that takes a description and command to execute. > print execution times for end-to-end tests > -- > > Key: FLINK-9354 > URL: https://issues.apache.org/jira/browse/FLINK-9354 > Project: Flink > Issue Type: Improvement > Components: Tests, Travis >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.5.0 > > > We need to modify the end-to-end scripts to include the time it takes for a > test to run. > We currently don't have any clue how long a test actually runs for. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9374) Flink Kinesis Producer does not backpressure
Franz Thoma created FLINK-9374: -- Summary: Flink Kinesis Producer does not backpressure Key: FLINK-9374 URL: https://issues.apache.org/jira/browse/FLINK-9374 Project: Flink Issue Type: Bug Reporter: Franz Thoma Attachments: after.png, before.png The {{FlinkKinesisProducer}} just accepts records and forwards it to a {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL internally holds an unbounded queue of records that have not yet been sent. Since Kinesis is rate-limited to 1MB per second per shard, this queue may grow indefinitely if Flink sends records faster than the KPL can forward them to Kinesis. One way to circumvent this problem is to set a record TTL, so that queued records are dropped after a certain amount of time, but this will lead to data loss under high loads. Currently the only time the queue is flushed is during checkpointing: {{FlinkKinesisProducer}} consumes records at arbitrary rate, either until a checkpoint is reached (and will wait until the queue is flushed), or until out-of-memory, whichever is reached first. (This gets worse due to the fact that the Java KPL is only a thin wrapper around a C++ process, so it is not even the Java process that runs out of memory, but the C++ process.) The implicit rate-limit due to checkpointing leads to a ragged throughput graph like this (the periods with zero throughput are the wait times before a checkpoint): !file:///home/fthoma/projects/flink/before.png!!before.png! Throughput limited by checkpointing only My proposed solution is to add a config option {{queueLimit}} to set a maximum number of records that may be waiting in the KPL queue. If this limit is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and wait (blocking) until the queue length is below the limit again. This automatically leads to backpressuring, since the {{FlinkKinesisProducer}} cannot accept records while waiting. For compatibility, {{queueLimit}} is set to {{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a client explicitly sets the value. Setting a »sane« default value is not possible unfortunately, since sensible values for the limit depend on the record size (the limit should be chosen so that about 10–100MB of records per shard are accumulated before flushing, otherwise the maximum Kinesis throughput may not be reached). !after.png! Throughput with a queue limit of 10 records (the spikes are checkpoints, where the queue is still flushed completely) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
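A minimal sketch of the proposed backpressure loop. Only `KinesisProducer.getOutstandingRecordsCount()` and `flush()` are assumed from the Kinesis Producer Library; the surrounding class and method names are illustrative, not the proposal's actual code.

{code:java}
import com.amazonaws.services.kinesis.producer.KinesisProducer;

final class QueueLimitEnforcer {

    private final KinesisProducer producer;
    private final long queueLimit; // chosen so roughly 10-100MB per shard are buffered

    QueueLimitEnforcer(KinesisProducer producer, long queueLimit) {
        this.producer = producer;
        this.queueLimit = queueLimit;
    }

    /**
     * Called before handing a record to the KPL; blocking here is what
     * propagates backpressure to the upstream Flink operators.
     */
    void enforceQueueLimit() throws InterruptedException {
        while (producer.getOutstandingRecordsCount() >= queueLimit) {
            // ask the KPL to send what it has buffered, then re-check the queue length
            producer.flush();
            Thread.sleep(500);
        }
    }
}
{code}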
[jira] [Updated] (FLINK-9374) Flink Kinesis Producer does not backpressure
[ https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Franz Thoma updated FLINK-9374: --- Description: The {{FlinkKinesisProducer}} just accepts records and forwards it to a {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL internally holds an unbounded queue of records that have not yet been sent. Since Kinesis is rate-limited to 1MB per second per shard, this queue may grow indefinitely if Flink sends records faster than the KPL can forward them to Kinesis. One way to circumvent this problem is to set a record TTL, so that queued records are dropped after a certain amount of time, but this will lead to data loss under high loads. Currently the only time the queue is flushed is during checkpointing: {{FlinkKinesisProducer}} consumes records at arbitrary rate, either until a checkpoint is reached (and will wait until the queue is flushed), or until out-of-memory, whichever is reached first. (This gets worse due to the fact that the Java KPL is only a thin wrapper around a C++ process, so it is not even the Java process that runs out of memory, but the C++ process.) The implicit rate-limit due to checkpointing leads to a ragged throughput graph like this (the periods with zero throughput are the wait times before a checkpoint): !file:///home/fthoma/projects/flink/before.png!!before.png! Throughput limited by checkpointing only My proposed solution is to add a config option {{queueLimit}} to set a maximum number of records that may be waiting in the KPL queue. If this limit is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and wait (blocking) until the queue length is below the limit again. This automatically leads to backpressuring, since the {{FlinkKinesisProducer}} cannot accept records while waiting. For compatibility, {{queueLimit}} is set to {{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a client explicitly sets the value. Setting a »sane« default value is not possible unfortunately, since sensible values for the limit depend on the record size (the limit should be chosen so that about 10–100MB of records per shard are accumulated before flushing, otherwise the maximum Kinesis throughput may not be reached). !after.png! Throughput with a queue limit of 10 records (the spikes are checkpoints, where the queue is still flushed completely) was: The {{FlinkKinesisProducer}} just accepts records and forwards it to a {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL internally holds an unbounded queue of records that have not yet been sent. Since Kinesis is rate-limited to 1MB per second per shard, this queue may grow indefinitely if Flink sends records faster than the KPL can forward them to Kinesis. One way to circumvent this problem is to set a record TTL, so that queued records are dropped after a certain amount of time, but this will lead to data loss under high loads. Currently the only time the queue is flushed is during checkpointing: {{FlinkKinesisProducer}} consumes records at arbitrary rate, either until a checkpoint is reached (and will wait until the queue is flushed), or until out-of-memory, whichever is reached first. (This gets worse due to the fact that the Java KPL is only a thin wrapper around a C++ process, so it is not even the Java process that runs out of memory, but the C++ process.) 
The implicit rate-limit due to checkpointing leads to a ragged throughput graph like this (the periods with zero throughput are the wait times before a checkpoint): !file:///home/fthoma/projects/flink/before.png!!before.png! Throughput limited by checkpointing only My proposed solution is to add a config option {{queueLimit}} to set a maximum number of records that may be waiting in the KPL queue. If this limit is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and wait (blocking) until the queue length is below the limit again. This automatically leads to backpressuring, since the {{FlinkKinesisProducer}} cannot accept records while waiting. For compatibility, {{queueLimit}} is set to {{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a client explicitly sets the value. Setting a »sane« default value is not possible unfortunately, since sensible values for the limit depend on the record size (the limit should be chosen so that about 10–100MB of records per shard are accumulated before flushing, otherwise the maximum Kinesis throughput may not be reached). !after.png! Throughput with a queue limit of 10 records (the spikes are checkpoints, where the queue is still flushed completely) > Flink Kinesis Producer does not backpressure > > > Key: FLINK-9374 > URL: https://issues.apache.org/jira/browse/FLINK-9374 >
[GitHub] flink issue #6019: [FLINK-9182]async checkpoints for timer service
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6019 I wonder whether you could introduce a `HeapState` scoped to the `key group` to support the timer service. That way the timer service would be backed by the keyed state backend, which looks like a beautiful thing. ---
[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5995 I would actually keep the package name for now. It makes sense, because the connection to the registry is avro-specific at the moment... ---
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477009#comment-16477009 ] ASF GitHub Bot commented on FLINK-9182: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6019 I wonder can you introduce a `HeapState` which scoped to `key group` to support timer service. This way timer service is backed by keyed state backend, which looks like a beautiful things. > async checkpoints for timer service > --- > > Key: FLINK-9182 > URL: https://issues.apache.org/jira/browse/FLINK-9182 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: makeyang >Assignee: makeyang >Priority: Minor > Fix For: 1.4.3, 1.5.1 > > > # problem description: > ## with the increase in the number of 'InternalTimer' object the checkpoint > more and more slowly > # improvement desgin > ## maintain a stateTableVersion, which is exactly the same thing as > CopyOnWriteStateTable and snapshotVersions which is exactly the same thing as > CopyOnWriteStateTable in InternalTimeServiceManager. one more thing: a > readwrite lock, which is used to protect snapshotVersions and > stateTableVersion > ## for each InternalTimer, add 2 more properties: create version and delete > version beside 3 existing properties: timestamp, key and namespace. each time > a Timer is registered in timerservice, it is created with stateTableVersion > as its create version while delete version is -1. each time when timer is > deleted in timerservice, it is marked delete for giving it a delete verison > equals to stateTableVersion without physically delete it from timerservice. > ## each time when try to snapshot timers, InternalTimeServiceManager > increase its stateTableVersion and add this stateTableVersion in > snapshotVersions. these 2 operators are protected by write lock of > InternalTimeServiceManager. that current stateTableVersion take as snapshot > version of this snapshot > ## shallow copy tuples > ## then use a another thread asynchronous snapshot whole things: > keyserialized, namespaceserializer and timers. for timers which is not > deleted(delete version is -1) and create version less than snapshot version, > serialized it. for timers whose delete version is not -1 and is bigger than > or equals snapshot version, serialized it. otherwise, it will not be > serialized by this snapshot. > ## when everything is serialized, remove snapshot version in > snapshotVersions, which is still in another thread and this action is guarded > by write lock. > ## last thing: timer physical deletion. 2 places to physically delete > timers: each time when timer is deleted in timerservice, it is marked delete > for giving it a delete verison equals to stateTableVersion without physically > delete it from timerservice. after this, check if snapshotVersions size is 0 > (which means there is no running snapshot) and if true, delete timer .the > other place to delete is in snapshot timer's iterat: when timer's delete > version is less than min value of snapshotVersions, which means the timer is > deleted and no running snapshot should keep it. > ## some more additions: processingTimeTimers and eventTimeTimers for each > group used to be hashset and now it is changed to concurrenthashmap with > key+namesapce+timestamp as its hash key. 
> # related mail list thread > ## > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Slow-flink-checkpoint-td18946.html > # github pull request > ## //coming soon -- This message was sent by Atlassian JIRA (v7.6.3#76005)
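The create/delete version rules in the issue description above can be condensed into a single visibility predicate. A sketch with illustrative class and field names, not the actual patch:

{code:java}
/** A timer that carries the versions described in the issue. */
final class VersionedTimer {

    final long timestamp;
    final Object key;
    final Object namespace;
    final long createVersion;    // stateTableVersion at registration time
    volatile long deleteVersion; // -1 while the timer is still live

    VersionedTimer(long timestamp, Object key, Object namespace, long createVersion) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
        this.createVersion = createVersion;
        this.deleteVersion = -1L;
    }

    /**
     * A snapshot taken at snapshotVersion serializes this timer iff it was visible
     * at that version: created before the snapshot and either not deleted yet or
     * deleted only at/after the snapshot version.
     */
    boolean includeInSnapshot(long snapshotVersion) {
        boolean createdBeforeSnapshot = createVersion < snapshotVersion;
        boolean deletedBeforeSnapshot = deleteVersion != -1L && deleteVersion < snapshotVersion;
        return createdBeforeSnapshot && !deletedBeforeSnapshot;
    }
}
{code}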
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477008#comment-16477008 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5995 I would actually keep the package name for now. It makes sense, because the connection to the registry is avro-specific at the moment... > Implement AvroDeserializationSchema > --- > > Key: FLINK-9337 > URL: https://issues.apache.org/jira/browse/FLINK-9337 > Project: Flink > Issue Type: New Feature >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...
GitHub user fmthoma opened a pull request: https://github.com/apache/flink/pull/6021 [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpressuring ## What is the purpose of the change The `FlinkKinesisProducer` just accepts records and forwards it to a `KinesisProducer` from the Amazon Kinesis Producer Library (KPL). The KPL internally holds an unbounded queue of records that have not yet been sent. Since Kinesis is rate-limited to 1MB per second per shard, this queue may grow indefinitely if Flink sends records faster than the KPL can forward them to Kinesis. One way to circumvent this problem is to set a record TTL, so that queued records are dropped after a certain amount of time, but this will lead to data loss under high loads. Currently the only time the queue is flushed is during checkpointing: `FlinkKinesisProducer` consumes records at arbitrary rate, either until a checkpoint is reached (and will wait until the queue is flushed), or until out-of-memory, whichever is reached first. (This gets worse due to the fact that the Java KPL is only a thin wrapper around a C++ process, so it is not even the Java process that runs out of memory, but the C++ process.) My proposed solution is to add a config option `queueLimit` to set a maximum number of records that may be waiting in the KPL queue. If this limit is reached, the `FlinkKinesisProducer` should trigger a `flush()` and wait (blocking) until the queue length is below the limit again. This automatically leads to backpressuring, since the `FlinkKinesisProducer` cannot accept records while waiting. For compatibility, `queueLimit` is set to `Integer.MAX_VALUE` by default, so the behavior is unchanged unless a client explicitly sets the value. Setting a »sane« default value is not possible unfortunately, since sensible values for the limit depend on the record size (the limit should be chosen so that about 10â100MB of records per shard are accumulated before flushing, otherwise the maximum Kinesis throughput may not be reached). ## Brief change log * Add a `queueLimit` setting to `FlinkKinesisProducer` to limit the number of in-flight records in the Kinesis Producer Library, and enable backpressuring if the limit is exceeded ## Verifying this change This change added tests and can be verified as follows: * Added unit test * Manually verified the change by running a job that produces to a 2-shard Kinesis stream. The input rate is limited by Kinesis (verified that the Kinesis stream is indeed at maximum capacity). ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes, but backwards compatible (option was added) - The serializers: no - The runtime per-record code paths (performance sensitive): don't know - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: don't know - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? 
JavaDocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/fmthoma/flink queueLimit Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6021.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6021 commit 9a2930cbbec4cd6979e6bfacb741da820cdbb284 Author: Franz Thoma Date: 2018-05-09T06:27:47Z [FLINK-9374] [kinesis] Add hardcoded queue size limit of 10 records commit e41037eb5e07efb73ded7f945111d0d5f6e9b18b Author: Franz Thoma Date: 2018-05-09T06:56:53Z [FLINK-9374] [kinesis] Expose queueLimit option commit 9222849869da0018718072c33b32d8d935f3dec4 Author: Franz Thoma Date: 2018-05-09T07:08:11Z [FLINK-9374] [kinesis] Refactor test: Mock implementation of flush() only flushes *some*, not *all* records commit f062c5b9cd2e572da9fef0cdb5c8ea89af2a228c Author: Franz Thoma Date: 2018-05-09T11:59:05Z [FLINK-9374] [kinesis] adapt tests ---
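From the user's point of view the proposed option would be a single setter on the producer. A hedged usage sketch: the setter name `setQueueLimit` and the chosen limit are assumptions based on this PR description, and `producerConfig` must contain the usual AWS region/credentials settings.

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

public class QueueLimitUsage {

    public static FlinkKinesisProducer<String> createProducer(Properties producerConfig) {
        FlinkKinesisProducer<String> kinesis =
                new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);

        // Proposed option: cap the number of records buffered inside the KPL.
        // The value should correspond to roughly 10-100MB of records per shard
        // so that the maximum Kinesis throughput can still be reached.
        kinesis.setQueueLimit(100_000);

        return kinesis;
    }
}
{code}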
[jira] [Commented] (FLINK-9374) Flink Kinesis Producer does not backpressure
[ https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477014#comment-16477014 ] ASF GitHub Bot commented on FLINK-9374: --- GitHub user fmthoma opened a pull request: https://github.com/apache/flink/pull/6021 [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpressuring ## What is the purpose of the change The `FlinkKinesisProducer` just accepts records and forwards it to a `KinesisProducer` from the Amazon Kinesis Producer Library (KPL). The KPL internally holds an unbounded queue of records that have not yet been sent. Since Kinesis is rate-limited to 1MB per second per shard, this queue may grow indefinitely if Flink sends records faster than the KPL can forward them to Kinesis. One way to circumvent this problem is to set a record TTL, so that queued records are dropped after a certain amount of time, but this will lead to data loss under high loads. Currently the only time the queue is flushed is during checkpointing: `FlinkKinesisProducer` consumes records at arbitrary rate, either until a checkpoint is reached (and will wait until the queue is flushed), or until out-of-memory, whichever is reached first. (This gets worse due to the fact that the Java KPL is only a thin wrapper around a C++ process, so it is not even the Java process that runs out of memory, but the C++ process.) My proposed solution is to add a config option `queueLimit` to set a maximum number of records that may be waiting in the KPL queue. If this limit is reached, the `FlinkKinesisProducer` should trigger a `flush()` and wait (blocking) until the queue length is below the limit again. This automatically leads to backpressuring, since the `FlinkKinesisProducer` cannot accept records while waiting. For compatibility, `queueLimit` is set to `Integer.MAX_VALUE` by default, so the behavior is unchanged unless a client explicitly sets the value. Setting a »sane« default value is not possible unfortunately, since sensible values for the limit depend on the record size (the limit should be chosen so that about 10–100MB of records per shard are accumulated before flushing, otherwise the maximum Kinesis throughput may not be reached). ## Brief change log * Add a `queueLimit` setting to `FlinkKinesisProducer` to limit the number of in-flight records in the Kinesis Producer Library, and enable backpressuring if the limit is exceeded ## Verifying this change This change added tests and can be verified as follows: * Added unit test * Manually verified the change by running a job that produces to a 2-shard Kinesis stream. The input rate is limited by Kinesis (verified that the Kinesis stream is indeed at maximum capacity). ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes, but backwards compatible (option was added) - The serializers: no - The runtime per-record code paths (performance sensitive): don't know - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: don't know - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? 
JavaDocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/fmthoma/flink queueLimit Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6021.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6021 commit 9a2930cbbec4cd6979e6bfacb741da820cdbb284 Author: Franz Thoma Date: 2018-05-09T06:27:47Z [FLINK-9374] [kinesis] Add hardcoded queue size limit of 10 records commit e41037eb5e07efb73ded7f945111d0d5f6e9b18b Author: Franz Thoma Date: 2018-05-09T06:56:53Z [FLINK-9374] [kinesis] Expose queueLimit option commit 9222849869da0018718072c33b32d8d935f3dec4 Author: Franz Thoma Date: 2018-05-09T07:08:11Z [FLINK-9374] [kinesis] Refactor test: Mock implementation of flush() only flushes *some*, not *all* records commit f062c5b9cd2e572da9fef0cdb5c8ea89af2a228c Author: Franz Thoma Date: 2018-05-09T11:59:05Z [FLINK-9374] [kinesis] adapt tests > Flink Kinesis Producer does not backpressure > > > Key: FLINK-9374 > URL: https://issues.apache.org/jira/browse/FLINK-9374 > Project: Flink > Issue Type: Bug >Reporter: Fr
[GitHub] flink pull request #6022: [FLINK-9283] Update cluster execution docs
GitHub user yuqi1129 opened a pull request: https://github.com/apache/flink/pull/6022 [FLINK-9283] Update cluster execution docs ## What is the purpose of the change This pull request is to fix port error in example about using `RemoteStreamEnvironment` ## Brief change log - Change port from 6123 to 8081 in example ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yuqi1129/flink FLINK-9283 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6022.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6022 commit 7adf511f21e14797610fccd8b6e4dd9ba72410e4 Author: hzyuqi1 Date: 2018-05-16T08:06:33Z [FLINK-9283] Update cluster execution docs This closes #9283. ---
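For context, the corrected docs example boils down to pointing the remote environment at the REST port. A sketch assuming the standard `StreamExecutionEnvironment.createRemoteEnvironment` API; host name and jar path are placeholders.

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteExecutionExample {

    public static void main(String[] args) throws Exception {
        // In Flink 1.5 the client submits jobs through the REST endpoint,
        // so the example must use port 8081 instead of the old 6123.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "flink-master", 8081, "/path/to/your-job.jar");

        DataStream<String> data = env.fromElements("a", "b", "c");
        data.print();

        env.execute("Remote execution example");
    }
}
{code}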
[jira] [Commented] (FLINK-9283) Update cluster execution docs
[ https://issues.apache.org/jira/browse/FLINK-9283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477017#comment-16477017 ] ASF GitHub Bot commented on FLINK-9283: --- GitHub user yuqi1129 opened a pull request: https://github.com/apache/flink/pull/6022 [FLINK-9283] Update cluster execution docs ## What is the purpose of the change This pull request is to fix port error in example about using `RemoteStreamEnvironment` ## Brief change log - Change port from 6123 to 8081 in example ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yuqi1129/flink FLINK-9283 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6022.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6022 commit 7adf511f21e14797610fccd8b6e4dd9ba72410e4 Author: hzyuqi1 Date: 2018-05-16T08:06:33Z [FLINK-9283] Update cluster execution docs This closes #9283. > Update cluster execution docs > - > > Key: FLINK-9283 > URL: https://issues.apache.org/jira/browse/FLINK-9283 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: yuqi >Priority: Major > Fix For: 1.5.0 > > > The [Cluster > Execution|https://ci.apache.org/projects/flink/flink-docs-master/dev/cluster_execution.html#cluster-execution] > page must be updated for 1.5. > The [RemoteEnvironment > example|https://ci.apache.org/projects/flink/flink-docs-master/dev/cluster_execution.html#example] > should use the port {{8081}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6009: [FLINK-9357][tests][yarn] Add margins to exception excerp...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6009 merging. ---
[jira] [Commented] (FLINK-9357) Add margins to yarn exception excerpts
[ https://issues.apache.org/jira/browse/FLINK-9357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477022#comment-16477022 ] ASF GitHub Bot commented on FLINK-9357: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/6009 merging. > Add margins to yarn exception excerpts > -- > > Key: FLINK-9357 > URL: https://issues.apache.org/jira/browse/FLINK-9357 > Project: Flink > Issue Type: Improvement > Components: Tests, YARN >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.5.0 > > > The yarn tests check the log files for exceptions to detect test failures. If > detected a test will fail and an excerpt from the logs will be printed. > The excerpt content is currently the stack of the detected exception. This > only works correctly if the stacktrace follows a specific formatting style; > for example if an exception message contains an empty line the output will be > cut off. > I propose including the 10 before/after the found exception to make this a > bit more reliable. As a side-effect we also get a little contextual > information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8918) Introduce Runtime Filter Join
[ https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-8918: -- Description: In general, stream join is one of the most performance cost task. For every record from both side, we need to query the state from the other side, this will lead to poor performance when the state size if huge. So, in production, we always need to spend a lot slots to handle stream join. But, indeed, we can improve this in somehow, there a phenomenon of stream join can be found in production. That's the `joined ratio` of the stream join is often very low, for example. - stream join in promotion analysis: Job need to join the promotion log with the action(click, view, payment, collection, retweet) log with the `promotion_id` to analysis the effect of the promotion. - stream join in AD(advertising) attribution: Job need to join the AD click log with the item payment log on the `click_id` to find which click of which AD that brings the payment to do attribution. - stream join in click log analysis of doc: Job need to join viewed log(doc viewed by users) with the click log (doc clicked by users) to analysis the reason of the click and the property of the users. - ….so on All these cases have one common property, that is the _joined ratio_ is very low. Here is a example to describe it, imagine that, we have 1 records from the left stream, and 1 records from the right stream, and we execute _select * from leftStream l join rightStream r on l.id = r.id_ , we only got 100 record from the result, that is the case for low _joined ratio_, this is an example for inner join, but it can also apply to left & right join. there are more example I can come up with low _joined ratio_ , but the most important point I want to expressed is that, the low _joined ratio_ of stream join in production is a very common phenomenon(maybe the almost common phenomenon in some companies, at least in our company that is the case). *Then how to improve it?* We can see from the above case, 1 record join 1 record we only got 100 result, that means, we query the state 2 times (1 for the left stream and 1 for the right stream) but only 100 of them are meaningful!!! If we could reduce the useless query times, then we can definitely improve the performance of stream join. the way we used to improve this is to introduce the _Runtime Filter Join_, the mainly ideal is that, we build a _filter_ for the state on each side (left stream & right stream). When we need to query the state on that side we first check the corresponding _filter_ whether the _key_ is possible in the state, if the _filter_ say "not, it impossible in the state", then we stop querying the state, if it say "hmm, it maybe in state", then we need to query the state. As you can see, the best choose of the _filter_ is _Bloom Filter_, it has all the feature that we expected: _extremely good performance_, _non-existence of false negative_. *the simplest pseudo code for _Runtime Filter Join_(the comments inline are based on RocksDBBackend)* {code:java} void performJoinNormally(Record recordFromLeftStream) { Iterator rightIterator = rigthStreamState.iterator(); // perform the `seek()` on the RocksDB, and iterator one by one, // this is an expensive operation especially when the key can't be found in RocksDB. for (Record recordFromRightState : rightIterator) { ... 
} } void performRuntimeFilterJoin(Record recordFromLeftStream) { Iterator rightIterator = EMPTY_ITERATOR; if (rigthStreamfilter.containsCurrentKey()) { rightIterator = rigthStreamState.iterator(); } // perform the `seek()` only when filter.containsCurrentKey() return true for (Record recordFromRightState : rightIterator) { ... } // add the current key into the filter of left stream. leftStreamFilter.addCurrentKey(); } {code} A description of Runtime Filter Join for batch join can be found [here|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html] (even though it not for stream join original, but we can easily refer it to `stream join`) was: In general, stream join is one of the most performance cost task. For every record from both side, we need to query the state from the other side, this will lead to poor performance when the state size if huge. So, in production, we always need to spend a lot slots to handle stream join. But, indeed, we can improve this in somehow, there a phenomenon of stream join can be found in production. That's the `joined ratio` of the stream join is often very low, for example. - stream join in promotion analysis: Job need to join the promotion log with the action(click, view, payment, collection, retweet) log with the `promotion_id` to analysis the effect of the promotion.
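Below is a compilable rendering of the pseudocode in the issue above. A plain in-memory map stands in for the RocksDB-backed keyed state, and the filter is kept abstract (a real implementation would be a Bloom filter); all names are illustrative.

{code:java}
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class RuntimeFilterJoinSketch {

    /** Minimal filter abstraction; a Bloom filter gives "maybe" / "definitely not". */
    interface KeyFilter {
        boolean mightContain(String key);
        void add(String key);
    }

    private final Map<String, List<String>> rightStreamState; // right-side records by join key
    private final KeyFilter rightStreamFilter;                // keys seen on the right side
    private final KeyFilter leftStreamFilter;                 // keys seen on the left side

    RuntimeFilterJoinSketch(Map<String, List<String>> rightStreamState,
                            KeyFilter rightStreamFilter,
                            KeyFilter leftStreamFilter) {
        this.rightStreamState = rightStreamState;
        this.rightStreamFilter = rightStreamFilter;
        this.leftStreamFilter = leftStreamFilter;
    }

    void processLeftRecord(String joinKey, String leftRecord) {
        Iterator<String> rightMatches = Collections.emptyIterator();

        // Only touch the expensive state if the filter says the key might exist there.
        if (rightStreamFilter.mightContain(joinKey)) {
            List<String> candidates = rightStreamState.get(joinKey);
            if (candidates != null) {
                rightMatches = candidates.iterator();
            }
        }

        while (rightMatches.hasNext()) {
            String rightRecord = rightMatches.next();
            // ... emit the joined result of leftRecord and rightRecord ...
        }

        // Remember that the left side has seen this key, for lookups from the right side.
        leftStreamFilter.add(joinKey);
    }
}
{code}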
[GitHub] flink issue #6022: [FLINK-9283] Update cluster execution docs
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6022 merging. ---
[jira] [Created] (FLINK-9375) Introduce AbortCheckpoint message from JM to TMs
Stefan Richter created FLINK-9375: - Summary: Introduce AbortCheckpoint message from JM to TMs Key: FLINK-9375 URL: https://issues.apache.org/jira/browse/FLINK-9375 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing Reporter: Stefan Richter We should introduce an {{AbortCheckpoint}} message that a jobmanager can send to taskmanagers if a checkpoint is canceled so that the operators can eagerly stop their alignment phase and continue to normal processing. This can reduce some backpressure issues in the context of canceled and restarted checkpoints. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
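Since this is only a proposal, no such message exists in Flink yet; the following is a purely hypothetical sketch of what it could look like.

{code:java}
import java.io.Serializable;

/** Hypothetical shape of the proposed JobManager-to-TaskManager message. */
public class AbortCheckpoint implements Serializable {

    private static final long serialVersionUID = 1L;

    private final long checkpointId;

    public AbortCheckpoint(long checkpointId) {
        this.checkpointId = checkpointId;
    }

    public long getCheckpointId() {
        return checkpointId;
    }

    // A handler on the task manager side would look up the barrier alignment for
    // this checkpoint id, release the blocked input channels, and let the operators
    // resume normal processing instead of waiting for the remaining barriers.
}
{code}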
[jira] [Commented] (FLINK-9283) Update cluster execution docs
[ https://issues.apache.org/jira/browse/FLINK-9283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477026#comment-16477026 ] ASF GitHub Bot commented on FLINK-9283: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/6022 merging. > Update cluster execution docs > - > > Key: FLINK-9283 > URL: https://issues.apache.org/jira/browse/FLINK-9283 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: yuqi >Priority: Major > Fix For: 1.5.0 > > > The [Cluster > Execution|https://ci.apache.org/projects/flink/flink-docs-master/dev/cluster_execution.html#cluster-execution] > page must be updated for 1.5. > The [RemoteEnvironment > example|https://ci.apache.org/projects/flink/flink-docs-master/dev/cluster_execution.html#example] > should use the port {{8081}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6013: [FLINK-9284] [doc] update the cli page,change rest port f...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6013 merging. ---
[jira] [Commented] (FLINK-9284) Update CLI page
[ https://issues.apache.org/jira/browse/FLINK-9284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477033#comment-16477033 ] ASF GitHub Bot commented on FLINK-9284: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/6013 merging. > Update CLI page > --- > > Key: FLINK-9284 > URL: https://issues.apache.org/jira/browse/FLINK-9284 > Project: Flink > Issue Type: Improvement > Components: Client, Documentation >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Triones Deng >Priority: Critical > Fix For: 1.5.0 > > > The [CLI|https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html] > page must be updated for 1.5. > The > [examples|https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html#examples] > using the {{-m}} option must be updated to use {{8081}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6020: [FLINK-9373][state] Fix potential data losing for RocksDB...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6020 cc @StefanRRichter ---
[jira] [Commented] (FLINK-9373) Always call RocksIterator.status() to check the internal error of RocksDB
[ https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477035#comment-16477035 ] ASF GitHub Bot commented on FLINK-9373: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6020 cc @StefanRRichter > Always call RocksIterator.status() to check the internal error of RocksDB > - > > Key: FLINK-9373 > URL: https://issues.apache.org/jira/browse/FLINK-9373 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Blocker > > Currently, when using RocksIterator we only use the _iterator.isValid()_ to > check whether we have reached the end of the iterator. But that is not > enough, if we refer to RocksDB's wiki > https://github.com/facebook/rocksdb/wiki/Iterator#error-handling we should > find that even if _iterator.isValid()=true_, there may also exist some > internal error. A safer way to use the _RocksIterator_ is to always call the > _iterator.status()_ to check the internal error of _RocksDB_. There is a case > from user email seems to lost data because of this > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6005: [FLINK-9176][tests] Remove category annotations
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6005 merging. ---
[GitHub] flink issue #6010: [FLINK-9359][docs] Update quickstart docs to only mention...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6010 merging. ---
[jira] [Commented] (FLINK-9176) Category annotations are unused
[ https://issues.apache.org/jira/browse/FLINK-9176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477038#comment-16477038 ] ASF GitHub Bot commented on FLINK-9176: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/6005 merging. > Category annotations are unused > --- > > Key: FLINK-9176 > URL: https://issues.apache.org/jira/browse/FLINK-9176 > Project: Flink > Issue Type: Bug > Components: Build System, Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: mingleizhang >Priority: Critical > Fix For: 1.5.0 > > > The {{LegacyAndNew}} and {{New}} annotations, that were previously used to > disable tests based on whether the {{legacyCode}} profile is active, are > effectively unused. > While several tests are annotated with them they are never used in the actual > {{surefire}} configuration. > We should either re-introduce them into the {{surefire}} configuration, or > remove them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9359) Update quickstart docs to only mention Java 8
[ https://issues.apache.org/jira/browse/FLINK-9359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477037#comment-16477037 ] ASF GitHub Bot commented on FLINK-9359: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/6010 merging. > Update quickstart docs to only mention Java 8 > - > > Key: FLINK-9359 > URL: https://issues.apache.org/jira/browse/FLINK-9359 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.5.0, 1.4.2, 1.6.0 >Reporter: David Anderson >Assignee: David Anderson >Priority: Major > Fix For: 1.5.0, 1.4.2, 1.6.0 > > > Java 7 support was dropped from Flink 1.4, and Java 9 and 10 aren't yet > supported, but the quickstart docs still say "the only requirement is to have > a working *Java 7.x* (or higher) installation". -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9360) HA end-to-end nightly test takes more than 15 min in Travis CI
[ https://issues.apache.org/jira/browse/FLINK-9360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477040#comment-16477040 ] Chesnay Schepler commented on FLINK-9360: - I will push a hotfix to reduce the sleep to 30 seconds. > HA end-to-end nightly test takes more than 15 min in Travis CI > -- > > Key: FLINK-9360 > URL: https://issues.apache.org/jira/browse/FLINK-9360 > Project: Flink > Issue Type: Test > Components: Tests >Affects Versions: 1.5.0 >Reporter: Andrey Zagrebin >Priority: Major > Labels: E2E, Nightly > Fix For: 1.5.0 > > > We have not discussed how long the nightly tests should run. Currently > overall testing build time is around the limit of free Travis CI VM (50 min). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9333) QuickStart Docs Spelling fix and some info regarding IntelliJ JVM Options
[ https://issues.apache.org/jira/browse/FLINK-9333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477043#comment-16477043 ] ASF GitHub Bot commented on FLINK-9333: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5989 merging. > QuickStart Docs Spelling fix and some info regarding IntelliJ JVM Options > - > > Key: FLINK-9333 > URL: https://issues.apache.org/jira/browse/FLINK-9333 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.4.1, 1.4.2 >Reporter: Yazdan Shirvany >Assignee: Yazdan Shirvany >Priority: Trivial > Labels: document, spelling > > - Spelling fix for QuickStart Project Template for Java > - Adding more details regarding changing JVM options in IntelliJ IDEA -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5989: [FLINK-9333] [Docs] QuickStart Docs Spelling fix and some...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5989 merging. ---
[jira] [Assigned] (FLINK-9375) Introduce AbortCheckpoint message from JM to TMs
[ https://issues.apache.org/jira/browse/FLINK-9375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-9375: --- Assignee: vinoyang > Introduce AbortCheckpoint message from JM to TMs > > > Key: FLINK-9375 > URL: https://issues.apache.org/jira/browse/FLINK-9375 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: vinoyang >Priority: Major > > We should introduce an {{AbortCheckpoint}} message that a jobmanager can send > to taskmanagers if a checkpoint is canceled so that the operators can eagerly > stop their alignment phase and continue to normal processing. This can reduce > some backpressure issues in the context of canceled and restarted checkpoints. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9341) Oracle: "Type is not supported: Date"
[ https://issues.apache.org/jira/browse/FLINK-9341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477066#comment-16477066 ] Fabian Hueske commented on FLINK-9341: -- Thanks for investigating the issue [~kgeis] and [~Sergey Nuyanzin]. I completely agree that we should improve the error message. Would one of you mind to create a corresponding JIRA issue and maybe also be interested in contributing the improvement? Thanks, Fabian > Oracle: "Type is not supported: Date" > - > > Key: FLINK-9341 > URL: https://issues.apache.org/jira/browse/FLINK-9341 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.2 >Reporter: Ken Geis >Priority: Major > > When creating a Table from an Oracle JDBCInputFormat with a date column, I > get the error "Type is not supported: Date". This happens with as simple a > query as > {code:java} > SELECT SYSDATE FROM DUAL{code} > Stack trace: > {noformat} > Caused by: org.apache.flink.table.api.TableException: Type is not supported: > Date > at > org.apache.flink.table.api.TableException$.apply(exceptions.scala:53) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:336) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.calcite.FlinkTypeFactory.createTypeFromTypeInfo(FlinkTypeFactory.scala:68) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.calcite.FlinkTypeFactory$$anonfun$buildLogicalRowType$1.apply(FlinkTypeFactory.scala:198) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.calcite.FlinkTypeFactory$$anonfun$buildLogicalRowType$1.apply(FlinkTypeFactory.scala:195) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > ~[scala-library-2.11.11.jar:na] > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > ~[scala-library-2.11.11.jar:na] > at > org.apache.flink.table.calcite.FlinkTypeFactory.buildLogicalRowType(FlinkTypeFactory.scala:195) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.plan.schema.InlineTable.getRowType(InlineTable.scala:105) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.api.TableEnvironment.scanInternal(TableEnvironment.scala:499) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.api.TableEnvironment.scan(TableEnvironment.scala:485) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.api.java.BatchTableEnvironment.fromDataSet(BatchTableEnvironment.scala:61) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.api.java.BatchTableEnvironment$fromDataSet$0.call(Unknown > Source) ~[na:na] > (at my code...) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
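A plausible workaround for the error above (an assumption based on the stack trace, not a verified fix) is to declare the date column as Flink's SQL date type, SqlTimeTypeInfo.DATE (backed by java.sql.Date), in the RowTypeInfo given to the input format, since the Table API only maps the SQL date/time types. The column and field names below are made up for illustration:

{code:java}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public final class OracleDateRowType {

    /**
     * Row schema for a query like "SELECT some_id, SYSDATE FROM some_table":
     * the date column is declared as SqlTimeTypeInfo.DATE (java.sql.Date)
     * rather than the java.util.Date based BasicTypeInfo.DATE_TYPE_INFO.
     */
    public static RowTypeInfo rowTypeWithSqlDate() {
        return new RowTypeInfo(
                new TypeInformation<?>[] {BasicTypeInfo.LONG_TYPE_INFO, SqlTimeTypeInfo.DATE},
                new String[] {"some_id", "sys_date"});
    }
}
{code}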
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5964#discussion_r188545921 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java --- @@ -381,6 +393,9 @@ protected void sanityCheck() { if (query == null || query.length() == 0) { throw new IllegalArgumentException("Query must not be null or empty."); } + if (keyspace != null && keyspace.length() != 0) { --- End diff -- we only have to check for non-null. ---
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5964#discussion_r188545761 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java --- @@ -381,6 +393,9 @@ protected void sanityCheck() { if (query == null || query.length() == 0) { throw new IllegalArgumentException("Query must not be null or empty."); } + if (keyspace != null && keyspace.length() != 0) { + throw new IllegalArgumentException("Specifying a keyspace is only allowed when using a Pojo-Stream as input."); --- End diff -- "default keyspace" ---
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5964#discussion_r188546038 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java --- @@ -470,6 +488,9 @@ protected void sanityCheck() { if (query == null || query.length() == 0) { throw new IllegalArgumentException("Query must not be null or empty."); } + if (keyspace != null && keyspace.length() != 0) { + throw new IllegalArgumentException("Specifying a keyspace is only allowed when using a Pojo-Stream as input."); --- End diff -- same as above ---
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5964#discussion_r188546432 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java --- @@ -429,6 +429,25 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception { Assert.assertEquals(20, rs.all().size()); } + @Test + public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws Exception { + session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test2")); --- End diff -- let's use a more descriptive table name, to avoid conflicts in the future. ---
[jira] [Commented] (FLINK-8655) Add a default keyspace to CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-8655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477070#comment-16477070 ] ASF GitHub Bot commented on FLINK-8655: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5964#discussion_r188546432 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java --- @@ -429,6 +429,25 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception { Assert.assertEquals(20, rs.all().size()); } + @Test + public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws Exception { + session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test2")); --- End diff -- let's use a more descriptive table name, to avoid conflicts in the future. > Add a default keyspace to CassandraSink > --- > > Key: FLINK-8655 > URL: https://issues.apache.org/jira/browse/FLINK-8655 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Affects Versions: 1.4.0 >Reporter: Christopher Hughes >Priority: Minor > Labels: features > Fix For: 1.6.0 > > > Currently, to use the CassandraPojoSink, it is necessary for a user to > provide keyspace information on the desired POJOs using datastax annotations. > This allows various POJOs to be written to multiple keyspaces while sinking > messages, but prevent runtime flexibility. > For many developers, non-production environments may all share a single > Cassandra instance differentiated by keyspace names. I propose adding a > `defaultKeyspace(String keyspace)` to the ClusterBuilder. POJOs lacking a > definitive keyspace would attempt to be loaded to the provided default. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5964#discussion_r188545817 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java --- @@ -410,6 +425,9 @@ protected void sanityCheck() { if (query == null || query.length() == 0) { throw new IllegalArgumentException("Query must not be null or empty."); } + if (keyspace != null && keyspace.length() != 0) { + throw new IllegalArgumentException("Specifying a keyspace is only allowed when using a Pojo-Stream as input."); --- End diff -- same as above ---
[jira] [Commented] (FLINK-8655) Add a default keyspace to CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-8655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477069#comment-16477069 ] ASF GitHub Bot commented on FLINK-8655: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5964#discussion_r188545721 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java --- @@ -258,6 +259,17 @@ public CassandraSinkBuilder(DataStream input, TypeInformation typeInfo, return this; } + /** +* Sets the keyspace to be used. +* +* @param keyspace keyspace to use +* @return this builder +*/ + public CassandraSinkBuilder setKeyspace(String keyspace) { --- End diff -- rename to `setDefaultKeyspace` > Add a default keyspace to CassandraSink > --- > > Key: FLINK-8655 > URL: https://issues.apache.org/jira/browse/FLINK-8655 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Affects Versions: 1.4.0 >Reporter: Christopher Hughes >Priority: Minor > Labels: features > Fix For: 1.6.0 > > > Currently, to use the CassandraPojoSink, it is necessary for a user to > provide keyspace information on the desired POJOs using datastax annotations. > This allows various POJOs to be written to multiple keyspaces while sinking > messages, but prevent runtime flexibility. > For many developers, non-production environments may all share a single > Cassandra instance differentiated by keyspace names. I propose adding a > `defaultKeyspace(String keyspace)` to the ClusterBuilder. POJOs lacking a > definitive keyspace would attempt to be loaded to the provided default. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5964#discussion_r188545721 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java --- @@ -258,6 +259,17 @@ public CassandraSinkBuilder(DataStream input, TypeInformation typeInfo, return this; } + /** +* Sets the keyspace to be used. +* +* @param keyspace keyspace to use +* @return this builder +*/ + public CassandraSinkBuilder setKeyspace(String keyspace) { --- End diff -- rename to `setDefaultKeyspace` ---
[jira] [Commented] (FLINK-8655) Add a default keyspace to CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-8655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477072#comment-16477072 ] ASF GitHub Bot commented on FLINK-8655: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5964#discussion_r188545817 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java --- @@ -410,6 +425,9 @@ protected void sanityCheck() { if (query == null || query.length() == 0) { throw new IllegalArgumentException("Query must not be null or empty."); } + if (keyspace != null && keyspace.length() != 0) { + throw new IllegalArgumentException("Specifying a keyspace is only allowed when using a Pojo-Stream as input."); --- End diff -- same as above > Add a default keyspace to CassandraSink > --- > > Key: FLINK-8655 > URL: https://issues.apache.org/jira/browse/FLINK-8655 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Affects Versions: 1.4.0 >Reporter: Christopher Hughes >Priority: Minor > Labels: features > Fix For: 1.6.0 > > > Currently, to use the CassandraPojoSink, it is necessary for a user to > provide keyspace information on the desired POJOs using datastax annotations. > This allows various POJOs to be written to multiple keyspaces while sinking > messages, but prevent runtime flexibility. > For many developers, non-production environments may all share a single > Cassandra instance differentiated by keyspace names. I propose adding a > `defaultKeyspace(String keyspace)` to the ClusterBuilder. POJOs lacking a > definitive keyspace would attempt to be loaded to the provided default. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8655) Add a default keyspace to CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-8655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477073#comment-16477073 ] ASF GitHub Bot commented on FLINK-8655: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5964#discussion_r188546038 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java --- @@ -470,6 +488,9 @@ protected void sanityCheck() { if (query == null || query.length() == 0) { throw new IllegalArgumentException("Query must not be null or empty."); } + if (keyspace != null && keyspace.length() != 0) { + throw new IllegalArgumentException("Specifying a keyspace is only allowed when using a Pojo-Stream as input."); --- End diff -- same as above > Add a default keyspace to CassandraSink > --- > > Key: FLINK-8655 > URL: https://issues.apache.org/jira/browse/FLINK-8655 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Affects Versions: 1.4.0 >Reporter: Christopher Hughes >Priority: Minor > Labels: features > Fix For: 1.6.0 > > > Currently, to use the CassandraPojoSink, it is necessary for a user to > provide keyspace information on the desired POJOs using datastax annotations. > This allows various POJOs to be written to multiple keyspaces while sinking > messages, but prevent runtime flexibility. > For many developers, non-production environments may all share a single > Cassandra instance differentiated by keyspace names. I propose adding a > `defaultKeyspace(String keyspace)` to the ClusterBuilder. POJOs lacking a > definitive keyspace would attempt to be loaded to the provided default. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8655) Add a default keyspace to CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-8655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477071#comment-16477071 ] ASF GitHub Bot commented on FLINK-8655: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5964#discussion_r188545921 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java --- @@ -381,6 +393,9 @@ protected void sanityCheck() { if (query == null || query.length() == 0) { throw new IllegalArgumentException("Query must not be null or empty."); } + if (keyspace != null && keyspace.length() != 0) { --- End diff -- we only have to check for non-null. > Add a default keyspace to CassandraSink > --- > > Key: FLINK-8655 > URL: https://issues.apache.org/jira/browse/FLINK-8655 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Affects Versions: 1.4.0 >Reporter: Christopher Hughes >Priority: Minor > Labels: features > Fix For: 1.6.0 > > > Currently, to use the CassandraPojoSink, it is necessary for a user to > provide keyspace information on the desired POJOs using datastax annotations. > This allows various POJOs to be written to multiple keyspaces while sinking > messages, but prevent runtime flexibility. > For many developers, non-production environments may all share a single > Cassandra instance differentiated by keyspace names. I propose adding a > `defaultKeyspace(String keyspace)` to the ClusterBuilder. POJOs lacking a > definitive keyspace would attempt to be loaded to the provided default. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8655) Add a default keyspace to CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-8655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477074#comment-16477074 ] ASF GitHub Bot commented on FLINK-8655: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5964#discussion_r188545761 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java --- @@ -381,6 +393,9 @@ protected void sanityCheck() { if (query == null || query.length() == 0) { throw new IllegalArgumentException("Query must not be null or empty."); } + if (keyspace != null && keyspace.length() != 0) { + throw new IllegalArgumentException("Specifying a keyspace is only allowed when using a Pojo-Stream as input."); --- End diff -- "default keyspace" > Add a default keyspace to CassandraSink > --- > > Key: FLINK-8655 > URL: https://issues.apache.org/jira/browse/FLINK-8655 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Affects Versions: 1.4.0 >Reporter: Christopher Hughes >Priority: Minor > Labels: features > Fix For: 1.6.0 > > > Currently, to use the CassandraPojoSink, it is necessary for a user to > provide keyspace information on the desired POJOs using datastax annotations. > This allows various POJOs to be written to multiple keyspaces while sinking > messages, but prevent runtime flexibility. > For many developers, non-production environments may all share a single > Cassandra instance differentiated by keyspace names. I propose adding a > `defaultKeyspace(String keyspace)` to the ClusterBuilder. POJOs lacking a > definitive keyspace would attempt to be loaded to the provided default. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
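Putting the review comments above together, the sanity check in the non-POJO builders would presumably end up looking roughly like the following (a simplified sketch of the suggested outcome, not the merged code; the class name and the surrounding builder code are stripped down):

{code:java}
/**
 * Simplified sketch of the builder sanity check after applying the review
 * comments: a plain non-null check is enough, and the message refers to the
 * *default* keyspace.
 */
public abstract class SimplifiedCassandraSinkBuilder {

    protected String query;
    protected String keyspace;

    /** Renamed from setKeyspace() as suggested in the review. */
    public SimplifiedCassandraSinkBuilder setDefaultKeyspace(String keyspace) {
        this.keyspace = keyspace;
        return this;
    }

    /** Check used by the builders that do not consume a POJO stream. */
    protected void sanityCheck() {
        if (query == null || query.length() == 0) {
            throw new IllegalArgumentException("Query must not be null or empty.");
        }
        if (keyspace != null) {
            throw new IllegalArgumentException(
                    "Specifying a default keyspace is only allowed when using a Pojo-Stream as input.");
        }
    }
}
{code}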
[GitHub] flink issue #5946: [FLINK-9285][REST][docs] Update REST API docs
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5946 merging. ---
[jira] [Commented] (FLINK-9285) Update REST API page
[ https://issues.apache.org/jira/browse/FLINK-9285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477075#comment-16477075 ] ASF GitHub Bot commented on FLINK-9285: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5946 merging. > Update REST API page > > > Key: FLINK-9285 > URL: https://issues.apache.org/jira/browse/FLINK-9285 > Project: Flink > Issue Type: Improvement > Components: Documentation, REST >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Critical > Fix For: 1.5.0 > > > The [REST > API|https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html] > must be updated for 1.5. > The [Available > requests|https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html#available-requests] > section still predominantly lists legacy calls. These should be either > removed or moved to the bottom, and explicitly marked as legacy. > The [developing > section|https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html#developing] > must be updated. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6008: [FLINK-9354][travis] Print execution times for nig...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6008#discussion_r188548653 --- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh --- @@ -33,8 +33,8 @@ else NUM_SLOTS=$NEW_DOP fi -STATE_BACKEND_TYPE=${STATE_BACKEND_TYPE:-file} -STATE_BACKEND_FILE_ASYNC=${STATE_BACKEND_FILE_ASYNC:-true} +STATE_BACKEND_TYPE=${3:-file} +STATE_BACKEND_FILE_ASYNC=${4:-true} --- End diff -- We should also update the usage message at the beginning of this file to reflect these extra parameters. ---
[jira] [Commented] (FLINK-9354) print execution times for end-to-end tests
[ https://issues.apache.org/jira/browse/FLINK-9354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477086#comment-16477086 ] ASF GitHub Bot commented on FLINK-9354: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6008#discussion_r188548653 --- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh --- @@ -33,8 +33,8 @@ else NUM_SLOTS=$NEW_DOP fi -STATE_BACKEND_TYPE=${STATE_BACKEND_TYPE:-file} -STATE_BACKEND_FILE_ASYNC=${STATE_BACKEND_FILE_ASYNC:-true} +STATE_BACKEND_TYPE=${3:-file} +STATE_BACKEND_FILE_ASYNC=${4:-true} --- End diff -- We should also update the usage message at the beginning of this file to reflect these extra parameters. > print execution times for end-to-end tests > -- > > Key: FLINK-9354 > URL: https://issues.apache.org/jira/browse/FLINK-9354 > Project: Flink > Issue Type: Improvement > Components: Tests, Travis >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.5.0 > > > We need to modify the end-to-end scripts to include the time it takes for a > test to run. > We currently don't have any clue how long a test actually runs for. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5928: [hotfix][doc] fix doc of externalized checkpoint
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5928#discussion_r188548955 --- Diff: docs/ops/state/checkpoints.md --- @@ -35,60 +35,62 @@ the same semantics as a failure-free execution. See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) for how to enable and configure checkpoints for your program. -## Externalized Checkpoints +## Retain The Checkpoints --- End diff -- "Retained Checkpoints" ---
[GitHub] flink pull request #5928: [hotfix][doc] fix doc of externalized checkpoint
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5928#discussion_r188548729 --- Diff: docs/_includes/generated/checkpointing_configuration.html --- @@ -40,7 +40,7 @@ state.checkpoints.dir (none) -The default directory used for checkpoints. Used by the state backends that write checkpoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). +The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. Note: the storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). --- End diff -- typo: "Note: The" ---
[GitHub] flink pull request #5928: [hotfix][doc] fix doc of externalized checkpoint
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5928#discussion_r188549219 --- Diff: docs/ops/state/checkpoints.md --- @@ -35,60 +35,62 @@ the same semantics as a failure-free execution. See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) for how to enable and configure checkpoints for your program. -## Externalized Checkpoints +## Retain The Checkpoints Checkpoints are by default not persisted externally and are only used to resume a job from failures. They are deleted when a program is cancelled. You can, however, configure periodic checkpoints to be persisted externally -similarly to [savepoints](savepoints.html). These *externalized checkpoints* -write their meta data out to persistent storage and are *not* automatically -cleaned up when the job fails. This way, you will have a checkpoint around -to resume from if your job fails. +similarly to [savepoints](savepoints.html). This way, you will have a persisted +checkpoint around to resume from if your job fails. {% highlight java %} CheckpointConfig config = env.getCheckpointConfig(); config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); {% endhighlight %} -The `ExternalizedCheckpointCleanup` mode configures what happens with externalized checkpoints when you cancel the job: +The `ExternalizedCheckpointCleanup` mode configures what happens with checkpoints when you cancel the job: -- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the externalized checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case. +- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case. -- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the externalized checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails. +- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails. ### Directory Structure -Similarly to [savepoints](savepoints.html), an externalized checkpoint consists -of a meta data file and, depending on the state back-end, some additional data -files. The **target directory** for the externalized checkpoint's meta data is -determined from the configuration key `state.checkpoints.dir` which, currently, -can only be set via the configuration files. +Similarly to [savepoints](savepoints.html), an checkpoint consists --- End diff -- typo: "a checkpoint" ---
[GitHub] flink pull request #6008: [FLINK-9354][travis] Print execution times for nig...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6008#discussion_r188550129 --- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh --- @@ -33,8 +33,8 @@ else NUM_SLOTS=$NEW_DOP fi -STATE_BACKEND_TYPE=${STATE_BACKEND_TYPE:-file} -STATE_BACKEND_FILE_ASYNC=${STATE_BACKEND_FILE_ASYNC:-true} +STATE_BACKEND_TYPE=${3:-file} +STATE_BACKEND_FILE_ASYNC=${4:-true} --- End diff -- will do 👍 ---

[jira] [Commented] (FLINK-9354) print execution times for end-to-end tests
[ https://issues.apache.org/jira/browse/FLINK-9354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477097#comment-16477097 ] ASF GitHub Bot commented on FLINK-9354: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6008#discussion_r188550129 --- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh --- @@ -33,8 +33,8 @@ else NUM_SLOTS=$NEW_DOP fi -STATE_BACKEND_TYPE=${STATE_BACKEND_TYPE:-file} -STATE_BACKEND_FILE_ASYNC=${STATE_BACKEND_FILE_ASYNC:-true} +STATE_BACKEND_TYPE=${3:-file} +STATE_BACKEND_FILE_ASYNC=${4:-true} --- End diff -- will do 👍 > print execution times for end-to-end tests > -- > > Key: FLINK-9354 > URL: https://issues.apache.org/jira/browse/FLINK-9354 > Project: Flink > Issue Type: Improvement > Components: Tests, Travis >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.5.0 > > > We need to modify the end-to-end scripts to include the time it takes for a > test to run. > We currently don't have any clue how long a test actually runs for. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6001: [FLINK-9299] ProcessWindowFunction documentation Java exa...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6001 @StephanEwen fixed based on your suggestion ---
[GitHub] flink issue #5961: [FLINK-8255][DataSet API, DataStream API] key expressions...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5961 Thanks for the update @snuyanzin. I'll try to have a look at the changes in the next days. Best, Fabian ---
[jira] [Commented] (FLINK-8255) Key expressions on named row types do not work
[ https://issues.apache.org/jira/browse/FLINK-8255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477104#comment-16477104 ] ASF GitHub Bot commented on FLINK-8255: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5961 Thanks for the update @snuyanzin. I'll try to have a look at the changes in the next days. Best, Fabian > Key expressions on named row types do not work > -- > > Key: FLINK-8255 > URL: https://issues.apache.org/jira/browse/FLINK-8255 > Project: Flink > Issue Type: Bug > Components: DataSet API, DataStream API >Affects Versions: 1.4.0, 1.5.0 >Reporter: Timo Walther >Assignee: Sergey Nuyanzin >Priority: Major > > The following program fails with a {{ClassCastException}}. It seems that key > expressions and rows are not tested well. We should add more tests for them. > {code} > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > TypeInformation[] types = new TypeInformation[] {Types.INT, Types.INT}; > String[] fieldNames = new String[]{"id", "value"}; > RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames); > env.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo) > .keyBy("id").sum("value").print(); > env.execute("Streaming WordCount"); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
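Until the fix for FLINK-8255 is merged, a common workaround (a hedged sketch, not part of the pull request) is to avoid string key expressions on Row streams altogether and key and aggregate with explicit functions instead:

{code:java}
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

import java.util.Collections;

public final class RowKeySelectorWorkaround {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        TypeInformation<?>[] types = {BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO};
        RowTypeInfo rowTypeInfo = new RowTypeInfo(types, new String[] {"id", "value"});

        Row row = new Row(2);
        row.setField(0, 1);
        row.setField(1, 42);

        env.fromCollection(Collections.singleton(row), rowTypeInfo)
            // Key by the "id" field explicitly instead of keyBy("id").
            .keyBy(new KeySelector<Row, Integer>() {
                @Override
                public Integer getKey(Row r) {
                    return (Integer) r.getField(0);
                }
            })
            // Sum the "value" field explicitly instead of sum("value").
            .reduce(new ReduceFunction<Row>() {
                @Override
                public Row reduce(Row r1, Row r2) {
                    Row out = new Row(2);
                    out.setField(0, r1.getField(0));
                    out.setField(1, (Integer) r1.getField(1) + (Integer) r2.getField(1));
                    return out;
                }
            })
            .print();

        env.execute("Row KeySelector workaround");
    }
}
{code}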
[jira] [Commented] (FLINK-9299) ProcessWindowFunction documentation Java examples have errors
[ https://issues.apache.org/jira/browse/FLINK-9299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477103#comment-16477103 ] ASF GitHub Bot commented on FLINK-9299: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6001 @StephanEwen fixed based on your suggestion > ProcessWindowFunction documentation Java examples have errors > - > > Key: FLINK-9299 > URL: https://issues.apache.org/jira/browse/FLINK-9299 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.4.2 >Reporter: Ken Krugler >Assignee: vinoyang >Priority: Minor > > In looking at > [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation], > I noticed a few errors... > * "This allows to incrementally compute windows" should be "This allows it > to incrementally compute windows" > * DataStream input = ...; should be > DataStream> input = ...; > * The getResult() method needs to cast one of the accumulator values to a > double, if that's what it is going to return. > * MyProcessWindowFunction needs to extend, not implement > ProcessWindowFunction > * MyProcessWindowFunction needs to implement a process() method, not an > apply() method. > * The call to .timeWindow takes a Time parameter, not a window assigner. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-766) Web interface progress monitoring for DataSources and DataSinks with splits
[ https://issues.apache.org/jira/browse/FLINK-766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477115#comment-16477115 ] Flavio Pompermaier commented on FLINK-766: -- +1 for this feature! > Web interface progress monitoring for DataSources and DataSinks with splits > --- > > Key: FLINK-766 > URL: https://issues.apache.org/jira/browse/FLINK-766 > Project: Flink > Issue Type: Sub-task > Components: Metrics >Reporter: GitHub Import >Priority: Minor > Labels: github-import > Fix For: pre-apache > > > The progress monitoring for DataSources and DataSinks can be improved by > including the number of processed vs total splits into the progress. > Imported from GitHub > Url: https://github.com/stratosphere/stratosphere/issues/766 > Created by: [rmetzger|https://github.com/rmetzger] > Labels: enhancement, gui, simple-issue, > Milestone: Release 0.6 (unplanned) > Created at: Wed May 07 12:05:54 CEST 2014 > State: open -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477127#comment-16477127 ] Fabian Hueske commented on FLINK-8866: -- Regarding (3), originally, {{TableSink}} was designed to be flexible with regard to the output schema. A query that should be emitted was planned by the optimizer. Based on the resulting type, the TableSink was internally configured for the result schema. The configuration method produces a configured copy of the sink that is used to emit the result. So, the TableSink was not known to Calcite and only handled by the TableEnvironment. When we added support for {{INSERT INTO}} this didn't work anymore because Calcite validates that the schema of the target table is compatible with the result schema of the SELECT query. Hence, we added the field names and types to the registration, configure the TableSink, and register the newly configured TableSink in Calcite's catalog. By doing it this way, we did not have to change the interface of the TableSink which did not only mean backwards compatibility but also that all TableSinks can be used in either way. > Create unified interfaces to configure and instatiate TableSinks > > > Key: FLINK-8866 > URL: https://issues.apache.org/jira/browse/FLINK-8866 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Shuyi Chen >Priority: Major > > Similar to the efforts done in FLINK-8240. We need unified ways to configure > and instantiate TableSinks. Among other applications, this is necessary in > order to declare table sinks in an environment file of the SQL client. Such > that the sink can be used for {{INSERT INTO}} statements. > Below are a few major changes in mind. > 1) Add TableSinkFactory/TableSinkFactoryService similar to > TableSourceFactory/TableSourceFactoryService > 2) Add a common property called "type" with values (source, sink and both) > for both TableSource and TableSink. > 3) in yaml file, replace "sources" with "tables", and use tableType to > identify whether it's source or sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
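The "configured copy" behaviour Fabian describes can be illustrated with a deliberately tiny, hypothetical class (it mirrors the pattern only and is not the real TableSink interface): the unconfigured instance carries no schema, and configuration returns a new instance bound to the result schema of the query.

{code:java}
import java.util.Arrays;

/** Hypothetical illustration: configuring does not mutate the sink, it returns a copy. */
public final class ConfiguredCopySink {

    private final String[] fieldNames;   // null until configured
    private final Class<?>[] fieldTypes; // null until configured

    public ConfiguredCopySink() {
        this(null, null);
    }

    private ConfiguredCopySink(String[] fieldNames, Class<?>[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }

    /** Called by the environment once the result schema of the query is known. */
    public ConfiguredCopySink configure(String[] fieldNames, Class<?>[] fieldTypes) {
        return new ConfiguredCopySink(fieldNames.clone(), fieldTypes.clone());
    }

    public void emitRow(Object[] row) {
        if (fieldNames == null) {
            throw new IllegalStateException("Sink has not been configured with a schema yet.");
        }
        System.out.println(Arrays.toString(fieldNames) + " -> " + Arrays.toString(row));
    }
}
{code}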
[jira] [Updated] (FLINK-9373) Fix potential data losing for RocksDBBackend
[ https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-9373: -- Summary: Fix potential data losing for RocksDBBackend (was: Always call RocksIterator.status() to check the internal error of RocksDB) > Fix potential data losing for RocksDBBackend > > > Key: FLINK-9373 > URL: https://issues.apache.org/jira/browse/FLINK-9373 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Blocker > > Currently, when using RocksIterator we only use the _iterator.isValid()_ to > check whether we have reached the end of the iterator. But that is not > enough, if we refer to RocksDB's wiki > https://github.com/facebook/rocksdb/wiki/Iterator#error-handling we should > find that even if _iterator.isValid()=true_, there may also exist some > internal error. A safer way to use the _RocksIterator_ is to always call the > _iterator.status()_ to check the internal error of _RocksDB_. There is a case > from user email seems to lost data because of this > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9373) Fix potential data losing for RocksDBBackend
[ https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-9373: -- Fix Version/s: 1.5.0 > Fix potential data losing for RocksDBBackend > > > Key: FLINK-9373 > URL: https://issues.apache.org/jira/browse/FLINK-9373 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Blocker > Fix For: 1.5.0 > > > Currently, when using RocksIterator we only use the _iterator.isValid()_ to > check whether we have reached the end of the iterator. But that is not > enough, if we refer to RocksDB's wiki > https://github.com/facebook/rocksdb/wiki/Iterator#error-handling we should > find that even if _iterator.isValid()=true_, there may also exist some > internal error. A safer way to use the _RocksIterator_ is to always call the > _iterator.status()_ to check the internal error of _RocksDB_. There is a case > from user email seems to lost data because of this > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9376) Allow upgrading to incompatible state serializers (state schema evolution)
Tzu-Li (Gordon) Tai created FLINK-9376: -- Summary: Allow upgrading to incompatible state serializers (state schema evolution) Key: FLINK-9376 URL: https://issues.apache.org/jira/browse/FLINK-9376 Project: Flink Issue Type: New Feature Components: State Backends, Checkpointing, Type Serialization System Reporter: Tzu-Li (Gordon) Tai Fix For: 1.6.0 Currently, users have access to upgrade state serializers on the restore run of a stateful job, as long as the upgraded new serializer remains backwards compatible with all previous written data in the savepoint (i.e. it can read all previous and current schema of serialized state objects). What is still lacking is the ability to upgrade to incompatible serializers. Upon being registered an incompatible serializer for existing restored state, that state needs to go through the process of - 1. read serialized state with the previous serializer 2. passing each deserialized state object through a “migration map function”, and 3. writing back the state with the new serializer This should be strictly limited to state registrations that occur before the actual processing begins (e.g. in the `open` or `initializeState` methods), so that we avoid performing these operations during processing. Procedure 2. will allow even state type migrations, but that is out-of-scope of this JIRA. This ticket focuses only on procedures 1. and 3., where we try to enable schema evolution without state type changes. This is an umbrella JIRA ticket that overlooks this feature, including a few preliminary tasks that work towards enabling it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
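The three-step process in FLINK-9376 can be made concrete with a generic sketch (purely illustrative; the StateReader/StateWriter interfaces are invented here and are not the API proposed in the ticket):

{code:java}
import java.util.function.Function;

/** Hypothetical helpers, only to make the three-step process concrete. */
interface StateReader<T> {
    /** Returns the next state object read with the OLD serializer, or null at the end. */
    T readNext() throws Exception;
}

interface StateWriter<T> {
    /** Writes a state object with the NEW serializer. */
    void write(T value) throws Exception;
}

public final class StateMigrationSketch {

    /**
     * 1. read with the previous serializer,
     * 2. pass through a migration function (identity for pure schema evolution),
     * 3. write back with the new serializer.
     */
    public static <OLD, NEW> long migrate(
            StateReader<OLD> reader,
            Function<OLD, NEW> migration,
            StateWriter<NEW> writer) throws Exception {

        long migrated = 0;
        for (OLD value = reader.readNext(); value != null; value = reader.readNext()) {
            writer.write(migration.apply(value));
            migrated++;
        }
        return migrated;
    }
}
{code}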
[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...
Github user EAlexRojas commented on a diff in the pull request: https://github.com/apache/flink/pull/5991#discussion_r188564095 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java --- @@ -221,7 +221,8 @@ private FlinkKafkaConsumer08( getLong( checkNotNull(props, "props"), KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED), - !getBoolean(props, KEY_DISABLE_METRICS, false)); + !getBoolean(props, KEY_DISABLE_METRICS, false), + getBoolean(props, KEY_CHECK_UNAVAILABLE_TOPICS, false)); --- End diff -- You're right, I'll change it ---
[jira] [Commented] (FLINK-9303) Unassign partitions from Kafka client if partitions become unavailable
[ https://issues.apache.org/jira/browse/FLINK-9303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477171#comment-16477171 ] ASF GitHub Bot commented on FLINK-9303: --- Github user EAlexRojas commented on a diff in the pull request: https://github.com/apache/flink/pull/5991#discussion_r188564095 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java --- @@ -221,7 +221,8 @@ private FlinkKafkaConsumer08( getLong( checkNotNull(props, "props"), KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED), - !getBoolean(props, KEY_DISABLE_METRICS, false)); + !getBoolean(props, KEY_DISABLE_METRICS, false), + getBoolean(props, KEY_CHECK_UNAVAILABLE_TOPICS, false)); --- End diff -- You're right, I'll change it > Unassign partitions from Kafka client if partitions become unavailable > -- > > Key: FLINK-9303 > URL: https://issues.apache.org/jira/browse/FLINK-9303 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Priority: Major > Fix For: 1.6.0 > > > Originally reported in ML: > [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamically-deleting-kafka-topics-does-not-remove-partitions-from-kafkaConsumer-td19946.html] > The problem is that the Kafka consumer has no notion of "closed" partitions > at the moment, so statically assigned partitions to the Kafka client is never > removed and is always continuously requested for records. > This causes LOG noises as reported in the reported mail thread. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
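From a user's point of view, enabling the behaviour added by the pull request would presumably follow the usual consumer-property pattern (a sketch that assumes the KEY_CHECK_UNAVAILABLE_TOPICS constant from the diff is publicly accessible; the actual property key string is not shown in the diff, and the broker, ZooKeeper, group, and topic values below are placeholders):

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;

import java.util.Properties;

public final class UnavailableTopicsConsumerExample {

    public static FlinkKafkaConsumer08<String> createConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("zookeeper.connect", "localhost:2181");
        props.setProperty("group.id", "example-group");

        // Assumed to be the flag added by the PR; the key string is defined by
        // the KEY_CHECK_UNAVAILABLE_TOPICS constant referenced in the diff.
        props.setProperty(FlinkKafkaConsumer08.KEY_CHECK_UNAVAILABLE_TOPICS, "true");

        return new FlinkKafkaConsumer08<>("example-topic", new SimpleStringSchema(), props);
    }
}
{code}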
[jira] [Created] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information
Tzu-Li (Gordon) Tai created FLINK-9377: -- Summary: Remove writing serializers as part of the checkpoint meta information Key: FLINK-9377 URL: https://issues.apache.org/jira/browse/FLINK-9377 Project: Flink Issue Type: Sub-task Components: State Backends, Checkpointing Reporter: Tzu-Li (Gordon) Tai Assignee: Tzu-Li (Gordon) Tai Fix For: 1.6.0 When writing meta information of a state in savepoints, we currently write both the state serializer as well as the state serializer's configuration snapshot. Writing both is actually redundant, as most of the time they have identical information. Moreover, the fact that we use Java serialization to write the serializer and rely on it to be re-readable on the restore run, already poses problems for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202). The proposal here is to leave only the config snapshot as meta information, and use that as the single source of truth of information about the schema of serialized state. The config snapshot should be treated as a factory (or provided to a factory) to re-create serializers capable of reading old, serialized state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information
[ https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-9377: --- Description: When writing meta information of a state in savepoints, we currently write both the state serializer as well as the state serializer's configuration snapshot. Writing both is actually redundant, as most of the time they have identical information. Moreover, the fact that we use Java serialization to write the serializer and rely on it to be re-readable on the restore run, already poses problems for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) to perform even a compatible upgrade. The proposal here is to leave only the config snapshot as meta information, and use that as the single source of truth of information about the schema of serialized state. The config snapshot should be treated as a factory (or provided to a factory) to re-create serializers capable of reading old, serialized state. was: When writing meta information of a state in savepoints, we currently write both the state serializer as well as the state serializer's configuration snapshot. Writing both is actually redundant, as most of the time they have identical information. Moreover, the fact that we use Java serialization to write the serializer and rely on it to be re-readable on the restore run, already poses problems for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202). The proposal here is to leave only the config snapshot as meta information, and use that as the single source of truth of information about the schema of serialized state. The config snapshot should be treated as a factory (or provided to a factory) to re-create serializers capable of reading old, serialized state. > Remove writing serializers as part of the checkpoint meta information > - > > Key: FLINK-9377 > URL: https://issues.apache.org/jira/browse/FLINK-9377 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.6.0 > > > When writing meta information of a state in savepoints, we currently write > both the state serializer as well as the state serializer's configuration > snapshot. > Writing both is actually redundant, as most of the time they have identical > information. > Moreover, the fact that we use Java serialization to write the serializer > and rely on it to be re-readable on the restore run, already poses problems > for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) > to perform even a compatible upgrade. > The proposal here is to leave only the config snapshot as meta information, > and use that as the single source of truth of information about the schema of > serialized state. > The config snapshot should be treated as a factory (or provided to a > factory) to re-create serializers capable of reading old, serialized state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9376) Allow upgrading to incompatible state serializers (state schema evolution)
[ https://issues.apache.org/jira/browse/FLINK-9376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-9376: --- Description: Currently, users have access to upgrade state serializers on the restore run of a stateful job, as long as the upgraded new serializer remains backwards compatible with all previous written data in the savepoint (i.e. it can read all previous and current schema of serialized state objects). What is still lacking is the ability to upgrade to incompatible serializers. Upon being registered an incompatible serializer for existing restored state, that state needs to go through the process of - 1. read serialized state with the previous serializer 2. passing each deserialized state object through a “migration map function”, and 3. writing back the state with the new serializer This should be strictly limited to state registrations that occur before the actual processing begins (e.g. in the {{open}} or {{initializeState}} methods), so that we avoid performing these operations during processing. Procedure 2. will allow even state type migrations, but that is out-of-scope of this JIRA. This ticket focuses only on procedures 1. and 3., where we try to enable schema evolution without state type changes. This is an umbrella JIRA ticket that overlooks this feature, including a few preliminary tasks that work towards enabling it. was: Currently, users have access to upgrade state serializers on the restore run of a stateful job, as long as the upgraded new serializer remains backwards compatible with all previous written data in the savepoint (i.e. it can read all previous and current schema of serialized state objects). What is still lacking is the ability to upgrade to incompatible serializers. Upon being registered an incompatible serializer for existing restored state, that state needs to go through the process of - 1. read serialized state with the previous serializer 2. passing each deserialized state object through a “migration map function”, and 3. writing back the state with the new serializer This should be strictly limited to state registrations that occur before the actual processing begins (e.g. in the `open` or `initializeState` methods), so that we avoid performing these operations during processing. Procedure 2. will allow even state type migrations, but that is out-of-scope of this JIRA. This ticket focuses only on procedures 1. and 3., where we try to enable schema evolution without state type changes. This is an umbrella JIRA ticket that overlooks this feature, including a few preliminary tasks that work towards enabling it. > Allow upgrading to incompatible state serializers (state schema evolution) > -- > > Key: FLINK-9376 > URL: https://issues.apache.org/jira/browse/FLINK-9376 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing, Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.6.0 > > > Currently, users have access to upgrade state serializers on the restore run > of a stateful job, as long as the upgraded new serializer remains backwards > compatible with all previous written data in the savepoint (i.e. it can read > all previous and current schema of serialized state objects). > What is still lacking is the ability to upgrade to incompatible serializers. > Upon being registered an incompatible serializer for existing restored state, > that state needs to go through the process of - > 1. 
read serialized state with the previous serializer > 2. passing each deserialized state object through a “migration map > function”, and > 3. writing back the state with the new serializer > This should be strictly limited to state registrations that occur before the > actual processing begins (e.g. in the {{open}} or {{initializeState}} > methods), so that we avoid performing these operations during processing. > Procedure 2. will allow even state type migrations, but that is out-of-scope > of this JIRA. > This ticket focuses only on procedures 1. and 3., where we try to enable > schema evolution without state type changes. > This is an umbrella JIRA ticket that overlooks this feature, including a few > preliminary tasks that work towards enabling it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9376) Allow upgrading to incompatible state serializers (state schema evolution)
[ https://issues.apache.org/jira/browse/FLINK-9376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-9376: --- Description: Currently, users have access to upgrade state serializers on the restore run of a stateful job, as long as the upgraded new serializer remains backwards compatible with all previous written data in the savepoint (i.e. it can read all previous and current schema of serialized state objects). What is still lacking is the ability to upgrade to incompatible serializers. Upon being registered an incompatible serializer for existing restored state, that state needs to go through the process of - 1. read serialized state with the previous serializer 2. passing each deserialized state object through a “migration map function”, and 3. writing back the state with the new serializer The availability of this process should be strictly limited to state registrations that occur before the actual processing begins (e.g. in the {{open}} or {{initializeState}} methods), so that we avoid performing these operations during processing. Procedure 2. will allow even state type migrations, but that is out-of-scope of this JIRA. This ticket focuses only on procedures 1. and 3., where we try to enable schema evolution without state type changes. This is an umbrella JIRA ticket that overlooks this feature, including a few preliminary tasks that work towards enabling it. was: Currently, users have access to upgrade state serializers on the restore run of a stateful job, as long as the upgraded new serializer remains backwards compatible with all previous written data in the savepoint (i.e. it can read all previous and current schema of serialized state objects). What is still lacking is the ability to upgrade to incompatible serializers. Upon being registered an incompatible serializer for existing restored state, that state needs to go through the process of - 1. read serialized state with the previous serializer 2. passing each deserialized state object through a “migration map function”, and 3. writing back the state with the new serializer This should be strictly limited to state registrations that occur before the actual processing begins (e.g. in the {{open}} or {{initializeState}} methods), so that we avoid performing these operations during processing. Procedure 2. will allow even state type migrations, but that is out-of-scope of this JIRA. This ticket focuses only on procedures 1. and 3., where we try to enable schema evolution without state type changes. This is an umbrella JIRA ticket that overlooks this feature, including a few preliminary tasks that work towards enabling it. > Allow upgrading to incompatible state serializers (state schema evolution) > -- > > Key: FLINK-9376 > URL: https://issues.apache.org/jira/browse/FLINK-9376 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing, Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.6.0 > > > Currently, users have access to upgrade state serializers on the restore run > of a stateful job, as long as the upgraded new serializer remains backwards > compatible with all previous written data in the savepoint (i.e. it can read > all previous and current schema of serialized state objects). > What is still lacking is the ability to upgrade to incompatible serializers. > Upon being registered an incompatible serializer for existing restored state, > that state needs to go through the process of - > 1. 
read serialized state with the previous serializer > 2. passing each deserialized state object through a “migration map > function”, and > 3. writing back the state with the new serializer > The availability of this process should be strictly limited to state > registrations that occur before the actual processing begins (e.g. in the > {{open}} or {{initializeState}} methods), so that we avoid performing these > operations during processing. > Procedure 2. will allow even state type migrations, but that is out-of-scope > of this JIRA. > This ticket focuses only on procedures 1. and 3., where we try to enable > schema evolution without state type changes. > This is an umbrella JIRA ticket that overlooks this feature, including a few > preliminary tasks that work towards enabling it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
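To make the three-step procedure above concrete, here is a minimal, hedged sketch of the read/migrate/rewrite loop. This is not the API being proposed in this ticket; the method and parameter names (migrate, previousSerializer, newSerializer, migrationMapFunction, numEntries) are assumptions used purely for illustration.
{code:java}
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;
import java.util.function.Function;

// Illustration of the migration procedure: read each state object with the
// previous serializer, pass it through a "migration map function", and write
// it back with the new serializer. In the proposal this would only run for
// state registered before processing starts (open() / initializeState()).
final class StateMigrationSketch {

    static <T> void migrate(
            DataInputView oldStateIn,
            DataOutputView newStateOut,
            TypeSerializer<T> previousSerializer,
            TypeSerializer<T> newSerializer,
            Function<T, T> migrationMapFunction,
            int numEntries) throws IOException {

        for (int i = 0; i < numEntries; i++) {
            T restored = previousSerializer.deserialize(oldStateIn);  // step 1
            T migrated = migrationMapFunction.apply(restored);        // step 2 (type changes are out of scope here)
            newSerializer.serialize(migrated, newStateOut);           // step 3
        }
    }
}
{code}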
[jira] [Created] (FLINK-9378) Improve TableException message with TypeName usage
Sergey Nuyanzin created FLINK-9378: -- Summary: Improve TableException message with TypeName usage Key: FLINK-9378 URL: https://issues.apache.org/jira/browse/FLINK-9378 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Sergey Nuyanzin Assignee: Sergey Nuyanzin Currently, TableException uses only the simple class name of the type in its messages. It is not clear what the actual issue is when the error message looks like {noformat} Exception in thread "main" org.apache.flink.table.api.TableException: Result field does not match requested type. Requested: Date; Actual: Date at org.apache.flink.table.api.TableEnvironment$$anonfun$generateRowConverterFunction$1.apply(TableEnvironment.scala:953) {noformat} or {noformat}Caused by: org.apache.flink.table.api.TableException: Type is not supported: Date at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53){noformat} For more details, have a look at FLINK-9341. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
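As a quick, standalone illustration of why the simple name is ambiguous (the two Date classes below stand in for whatever types the Table API actually reports; printing fully qualified names is only one possible fix):
{code:java}
// Both java.sql.Date and java.util.Date share the simple name "Date",
// which is exactly what makes "Requested: Date; Actual: Date" unreadable.
public class SimpleNameAmbiguity {
    public static void main(String[] args) {
        Class<?> requested = java.sql.Date.class;
        Class<?> actual = java.util.Date.class;

        System.out.println(requested.getSimpleName());     // Date
        System.out.println(actual.getSimpleName());        // Date
        System.out.println(requested.getCanonicalName());  // java.sql.Date
        System.out.println(actual.getCanonicalName());     // java.util.Date
    }
}
{code}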
[jira] [Commented] (FLINK-9341) Oracle: "Type is not supported: Date"
[ https://issues.apache.org/jira/browse/FLINK-9341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477191#comment-16477191 ] Sergey Nuyanzin commented on FLINK-9341: issue created FLINK-9378 will be able to do it in the next days > Oracle: "Type is not supported: Date" > - > > Key: FLINK-9341 > URL: https://issues.apache.org/jira/browse/FLINK-9341 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.2 >Reporter: Ken Geis >Priority: Major > > When creating a Table from an Oracle JDBCInputFormat with a date column, I > get the error "Type is not supported: Date". This happens with as simple a > query as > {code:java} > SELECT SYSDATE FROM DUAL{code} > Stack trace: > {noformat} > Caused by: org.apache.flink.table.api.TableException: Type is not supported: > Date > at > org.apache.flink.table.api.TableException$.apply(exceptions.scala:53) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:336) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.calcite.FlinkTypeFactory.createTypeFromTypeInfo(FlinkTypeFactory.scala:68) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.calcite.FlinkTypeFactory$$anonfun$buildLogicalRowType$1.apply(FlinkTypeFactory.scala:198) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.calcite.FlinkTypeFactory$$anonfun$buildLogicalRowType$1.apply(FlinkTypeFactory.scala:195) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > ~[scala-library-2.11.11.jar:na] > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > ~[scala-library-2.11.11.jar:na] > at > org.apache.flink.table.calcite.FlinkTypeFactory.buildLogicalRowType(FlinkTypeFactory.scala:195) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.plan.schema.InlineTable.getRowType(InlineTable.scala:105) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.api.TableEnvironment.scanInternal(TableEnvironment.scala:499) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.api.TableEnvironment.scan(TableEnvironment.scala:485) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.api.java.BatchTableEnvironment.fromDataSet(BatchTableEnvironment.scala:61) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.api.java.BatchTableEnvironment$fromDataSet$0.call(Unknown > Source) ~[na:na] > (at my code...) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9376) Allow upgrading to incompatible state serializers (state schema evolution)
[ https://issues.apache.org/jira/browse/FLINK-9376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-9376: --- Description: Currently, users have access to upgrade state serializers on the restore run of a stateful job, as long as the upgraded new serializer remains backwards compatible with all previous written data in the savepoint (i.e. it can read all previous and current schema of serialized state objects). What is still lacking is the ability to upgrade to incompatible serializers. Upon being registered an incompatible serializer for existing restored state, that state needs to go through the process of - 1. read serialized state with the previous serializer 2. passing each deserialized state object through a “migration map function”, and 3. writing back the state with the new serializer The availability of this process should be strictly limited to state registrations that occur before the actual processing begins (e.g. in the {{open}} or {{initializeState}} methods), so that we avoid performing these operations during processing. How this procedure actually occurs, differs across different types of state backends. For example, for state backends that eagerly deserialize / lazily serialize state (e.g. {{HeapStateBackend}}), the job execution itself can be seen as a "migration"; everything is deserialized to state objects on restore, and is only serialized again, with the new serializer, on checkpoints. Therefore, for these state backends, the above process is irrelevant. On the other hand, for state backends that lazily deserialize / eagerly serialize state (e.g. {{RocksDBStateBackend}}), the state evolution process needs to happen for every state with a newly registered incompatible serializer. Procedure 2. will allow even state type migrations, but that is out-of-scope of this JIRA. This ticket focuses only on procedures 1. and 3., where we try to enable schema evolution without state type changes. This is an umbrella JIRA ticket that overlooks this feature, including a few preliminary tasks that work towards enabling it. was: Currently, users have access to upgrade state serializers on the restore run of a stateful job, as long as the upgraded new serializer remains backwards compatible with all previous written data in the savepoint (i.e. it can read all previous and current schema of serialized state objects). What is still lacking is the ability to upgrade to incompatible serializers. Upon being registered an incompatible serializer for existing restored state, that state needs to go through the process of - 1. read serialized state with the previous serializer 2. passing each deserialized state object through a “migration map function”, and 3. writing back the state with the new serializer The availability of this process should be strictly limited to state registrations that occur before the actual processing begins (e.g. in the {{open}} or {{initializeState}} methods), so that we avoid performing these operations during processing. Procedure 2. will allow even state type migrations, but that is out-of-scope of this JIRA. This ticket focuses only on procedures 1. and 3., where we try to enable schema evolution without state type changes. This is an umbrella JIRA ticket that overlooks this feature, including a few preliminary tasks that work towards enabling it. 
> Allow upgrading to incompatible state serializers (state schema evolution) > -- > > Key: FLINK-9376 > URL: https://issues.apache.org/jira/browse/FLINK-9376 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing, Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.6.0 > > > Currently, users have access to upgrade state serializers on the restore run > of a stateful job, as long as the upgraded new serializer remains backwards > compatible with all previous written data in the savepoint (i.e. it can read > all previous and current schema of serialized state objects). > What is still lacking is the ability to upgrade to incompatible serializers. > Upon being registered an incompatible serializer for existing restored state, > that state needs to go through the process of - > 1. read serialized state with the previous serializer > 2. passing each deserialized state object through a “migration map > function”, and > 3. writing back the state with the new serializer > The availability of this process should be strictly limited to state > registrations that occur before the actual processing begins (e.g. in the > {{open}} or {{initializeState}} methods), so that we avoid performing these > operations during processing. > How this procedure
[jira] [Commented] (FLINK-6944) Introduce DefaultTypeSerializerConfigSnapshot as a base implementation for serializer compatibility checks
[ https://issues.apache.org/jira/browse/FLINK-6944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477194#comment-16477194 ] Tzu-Li (Gordon) Tai commented on FLINK-6944: I'll change this to be a subtask of FLINK-9376, so that all things related to state evolution is consolidated there for easier tracking. Will also mark this as a blocker for 1.6.0. > Introduce DefaultTypeSerializerConfigSnapshot as a base implementation for > serializer compatibility checks > -- > > Key: FLINK-6944 > URL: https://issues.apache.org/jira/browse/FLINK-6944 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing, Type Serialization System >Affects Versions: 1.3.0, 1.3.1 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Major > Fix For: 1.6.0 > > > Currently, we store both the {{TypeSerializer}} and its corresponding > {{TypeSerializerConfigSnapshot}} in checkpoints of managed state. This, in > most cases, are actually duplicate information. > This JIRA proposes to change this by only storing the > {{TypeSerializerConfigSnapshot}}, while at the same time, letting > {{TypeSerializer.snapshotConfiguration}} return a default > {{DefaultTypeSerializerConfigSnapshot}}. > This default simply serializes the serializer instance using Java > serialization. > The {{DefaultTypeSerializerConfigSnapshot}} should wrap the serializer bytes, > the serialVersionUID of the serializer class, and the serializer class' > classname. The latter two will be used to check compatibility in the default > implementation of {{TypeSerializer.ensureCompatibility}}. Specifically, if > classname / serialVersionUID has changed, the default implementation of > {{TypeSerializer.ensureCompatibility}} will simply return > {{CompatibilityResult.requiresMigration}} with the deserialized serializer as > the convert deserializer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-6944) Introduce DefaultTypeSerializerConfigSnapshot as a base implementation for serializer compatibility checks
[ https://issues.apache.org/jira/browse/FLINK-6944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-6944: --- Issue Type: Sub-task (was: Improvement) Parent: FLINK-9376 > Introduce DefaultTypeSerializerConfigSnapshot as a base implementation for > serializer compatibility checks > -- > > Key: FLINK-6944 > URL: https://issues.apache.org/jira/browse/FLINK-6944 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing, Type Serialization System >Affects Versions: 1.3.0, 1.3.1 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Major > Fix For: 1.6.0 > > > Currently, we store both the {{TypeSerializer}} and its corresponding > {{TypeSerializerConfigSnapshot}} in checkpoints of managed state. This, in > most cases, are actually duplicate information. > This JIRA proposes to change this by only storing the > {{TypeSerializerConfigSnapshot}}, while at the same time, letting > {{TypeSerializer.snapshotConfiguration}} return a default > {{DefaultTypeSerializerConfigSnapshot}}. > This default simply serializes the serializer instance using Java > serialization. > The {{DefaultTypeSerializerConfigSnapshot}} should wrap the serializer bytes, > the serialVersionUID of the serializer class, and the serializer class' > classname. The latter two will be used to check compatibility in the default > implementation of {{TypeSerializer.ensureCompatibility}}. Specifically, if > classname / serialVersionUID has changed, the default implementation of > {{TypeSerializer.ensureCompatibility}} will simply return > {{CompatibilityResult.requiresMigration}} with the deserialized serializer as > the convert deserializer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-6944) Introduce DefaultTypeSerializerConfigSnapshot as a base implementation for serializer compatibility checks
[ https://issues.apache.org/jira/browse/FLINK-6944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-6944: --- Priority: Blocker (was: Major) > Introduce DefaultTypeSerializerConfigSnapshot as a base implementation for > serializer compatibility checks > -- > > Key: FLINK-6944 > URL: https://issues.apache.org/jira/browse/FLINK-6944 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing, Type Serialization System >Affects Versions: 1.3.0, 1.3.1 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.6.0 > > > Currently, we store both the {{TypeSerializer}} and its corresponding > {{TypeSerializerConfigSnapshot}} in checkpoints of managed state. This, in > most cases, are actually duplicate information. > This JIRA proposes to change this by only storing the > {{TypeSerializerConfigSnapshot}}, while at the same time, letting > {{TypeSerializer.snapshotConfiguration}} return a default > {{DefaultTypeSerializerConfigSnapshot}}. > This default simply serializes the serializer instance using Java > serialization. > The {{DefaultTypeSerializerConfigSnapshot}} should wrap the serializer bytes, > the serialVersionUID of the serializer class, and the serializer class' > classname. The latter two will be used to check compatibility in the default > implementation of {{TypeSerializer.ensureCompatibility}}. Specifically, if > classname / serialVersionUID has changed, the default implementation of > {{TypeSerializer.ensureCompatibility}} will simply return > {{CompatibilityResult.requiresMigration}} with the deserialized serializer as > the convert deserializer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-6944) Introduce DefaultTypeSerializerConfigSnapshot as a base implementation for serializer compatibility checks
[ https://issues.apache.org/jira/browse/FLINK-6944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-6944: --- Description: FLINK-9377 proposes to remove writing serializers as part of checkpoint meta info, and only write its configuration snapshot. Since then serializer config snapshots will be the single source of truth for previous serializer schema, this JIRA proposes to follow up the change in FLINK-9377 by having a base default implementation for letting {{TypeSerializer.snapshotConfiguration}} that returns a {{DefaultTypeSerializerConfigSnapshot}}. The {{DefaultTypeSerializerConfigSnapshot}} should wrap the serialVersionUID of the serializer class, and the serializer class' classname. The latter two will be used to check compatibility in the default implementation of {{TypeSerializer.ensureCompatibility}}. Specifically, if classname / serialVersionUID has changed, the default implementation of {{TypeSerializer.ensureCompatibility}} should return {{CompatibilityResult.requiresMigration}}. was: Currently, we store both the {{TypeSerializer}} and its corresponding {{TypeSerializerConfigSnapshot}} in checkpoints of managed state. This, in most cases, are actually duplicate information. This JIRA proposes to change this by only storing the {{TypeSerializerConfigSnapshot}}, while at the same time, letting {{TypeSerializer.snapshotConfiguration}} return a default {{DefaultTypeSerializerConfigSnapshot}}. This default simply serializes the serializer instance using Java serialization. The {{DefaultTypeSerializerConfigSnapshot}} should wrap the serializer bytes, the serialVersionUID of the serializer class, and the serializer class' classname. The latter two will be used to check compatibility in the default implementation of {{TypeSerializer.ensureCompatibility}}. Specifically, if classname / serialVersionUID has changed, the default implementation of {{TypeSerializer.ensureCompatibility}} will simply return {{CompatibilityResult.requiresMigration}} with the deserialized serializer as the convert deserializer. > Introduce DefaultTypeSerializerConfigSnapshot as a base implementation for > serializer compatibility checks > -- > > Key: FLINK-6944 > URL: https://issues.apache.org/jira/browse/FLINK-6944 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing, Type Serialization System >Affects Versions: 1.3.0, 1.3.1 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.6.0 > > > FLINK-9377 proposes to remove writing serializers as part of checkpoint meta > info, and only write its configuration snapshot. > Since then serializer config snapshots will be the single source of truth for > previous serializer schema, this JIRA proposes to follow up the change in > FLINK-9377 by having a base default implementation for letting > {{TypeSerializer.snapshotConfiguration}} that returns a > {{DefaultTypeSerializerConfigSnapshot}}. > The {{DefaultTypeSerializerConfigSnapshot}} should wrap the serialVersionUID > of the serializer class, and the serializer class' classname. The latter two > will be used to check compatibility in the default implementation of > {{TypeSerializer.ensureCompatibility}}. Specifically, if classname / > serialVersionUID has changed, the default implementation of > {{TypeSerializer.ensureCompatibility}} should return > {{CompatibilityResult.requiresMigration}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
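A simplified, hypothetical sketch of the idea described above (not the actual DefaultTypeSerializerConfigSnapshot class): the snapshot captures only the serializer's class name and serialVersionUID, and a mismatch on restore is treated as requiring migration.
{code:java}
import java.io.Serializable;
import java.util.Objects;

// Simplified stand-in for the proposed default config snapshot: it records the
// serializer class name and serialVersionUID. On restore, if either value no
// longer matches the newly registered serializer, the default compatibility
// check would answer with the equivalent of CompatibilityResult.requiresMigration().
final class DefaultSerializerSnapshotSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String serializerClassName;
    private final long serializerSerialVersionUID;

    DefaultSerializerSnapshotSketch(String serializerClassName, long serializerSerialVersionUID) {
        this.serializerClassName = serializerClassName;
        this.serializerSerialVersionUID = serializerSerialVersionUID;
    }

    boolean requiresMigration(String newSerializerClassName, long newSerialVersionUID) {
        return !Objects.equals(serializerClassName, newSerializerClassName)
                || serializerSerialVersionUID != newSerialVersionUID;
    }
}
{code}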
[jira] [Updated] (FLINK-9370) Activate distributed cache end-to-end test
[ https://issues.apache.org/jira/browse/FLINK-9370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-9370: Affects Version/s: (was: 1.5.0) 1.6.0 > Activate distributed cache end-to-end test > -- > > Key: FLINK-9370 > URL: https://issues.apache.org/jira/browse/FLINK-9370 > Project: Flink > Issue Type: Bug > Components: Local Runtime, Tests >Affects Versions: 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Critical > Fix For: 1.5.0 > > > The distributed cache end-to-end test is not run at the moment; it was never > added to {{run-pre-commit-tests.sh}} or {{run-nightly-tests.sh}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9370) Activate distributed cache end-to-end test
[ https://issues.apache.org/jira/browse/FLINK-9370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-9370: Fix Version/s: (was: 1.5.0) 1.6.0 > Activate distributed cache end-to-end test > -- > > Key: FLINK-9370 > URL: https://issues.apache.org/jira/browse/FLINK-9370 > Project: Flink > Issue Type: Bug > Components: Local Runtime, Tests >Affects Versions: 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Critical > Fix For: 1.6.0 > > > The distributed cache end-to-end test is not run at the moment; it was never > added to {{run-pre-commit-tests.sh}} or {{run-nightly-tests.sh}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...
Github user EAlexRojas commented on a diff in the pull request: https://github.com/apache/flink/pull/5991#discussion_r188596584 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java --- @@ -80,6 +82,9 @@ /** The queue of unassigned partitions that we need to assign to the Kafka consumer. */ private final ClosableBlockingQueue> unassignedPartitionsQueue; + /** The list of partitions to be removed from kafka consumer. */ + private final List partitionsToBeRemoved; --- End diff -- You are right, a Set should be better for all the calls to the `contains()` method. ---
[jira] [Commented] (FLINK-9303) Unassign partitions from Kafka client if partitions become unavailable
[ https://issues.apache.org/jira/browse/FLINK-9303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477294#comment-16477294 ] ASF GitHub Bot commented on FLINK-9303: --- Github user EAlexRojas commented on a diff in the pull request: https://github.com/apache/flink/pull/5991#discussion_r188596584 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java --- @@ -80,6 +82,9 @@ /** The queue of unassigned partitions that we need to assign to the Kafka consumer. */ private final ClosableBlockingQueue> unassignedPartitionsQueue; + /** The list of partitions to be removed from kafka consumer. */ + private final List partitionsToBeRemoved; --- End diff -- You are right, a Set should be better for all the calls to the `contains()` method. > Unassign partitions from Kafka client if partitions become unavailable > -- > > Key: FLINK-9303 > URL: https://issues.apache.org/jira/browse/FLINK-9303 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Priority: Major > Fix For: 1.6.0 > > > Originally reported in ML: > [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamically-deleting-kafka-topics-does-not-remove-partitions-from-kafkaConsumer-td19946.html] > The problem is that the Kafka consumer has no notion of "closed" partitions > at the moment, so statically assigned partitions to the Kafka client is never > removed and is always continuously requested for records. > This causes LOG noises as reported in the reported mail thread. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
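The point of the review comment above, as a small standalone sketch (the field and element types are illustrative, not the actual ones in KafkaConsumerThread): when the dominant operation is membership checking, a HashSet gives O(1) contains() where a List would scan linearly.
{code:java}
import org.apache.kafka.common.TopicPartition;

import java.util.HashSet;
import java.util.Set;

public class PartitionsToRemoveSketch {
    public static void main(String[] args) {
        // A HashSet makes the frequent contains() checks O(1); TopicPartition
        // implements equals()/hashCode(), so it is safe to use as a set element.
        Set<TopicPartition> partitionsToBeRemoved = new HashSet<>();
        partitionsToBeRemoved.add(new TopicPartition("my-topic", 0));

        TopicPartition candidate = new TopicPartition("my-topic", 0);
        if (partitionsToBeRemoved.contains(candidate)) {
            System.out.println("Unassign " + candidate + " from the Kafka consumer");
        }
    }
}
{code}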
[jira] [Created] (FLINK-9379) HA end-to-end test failing locally
Till Rohrmann created FLINK-9379: Summary: HA end-to-end test failing locally Key: FLINK-9379 URL: https://issues.apache.org/jira/browse/FLINK-9379 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.5.0 Reporter: Till Rohrmann The HA end-to-end test fails sometimes with {code} The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: Could not submit job 797547d5fd619ea240d4c6690adc9101. at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:247) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464) at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:410) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020) at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096) Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$5(RestClusterClient.java:357) at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.rest.util.RestClientException: [Service temporarily unavailable due to an ongoing leader election. Please refresh.] at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338) at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ... 4 more Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Service temporarily unavailable due to an ongoing leader election. Please refresh.] at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:225) at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:209) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) ... 
5 more {code} when executing it locally. I assume that the test does not properly wait until the cluster is ready for a job submission. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9380) Failing end-to-end tests should not clean up logs
Till Rohrmann created FLINK-9380: Summary: Failing end-to-end tests should not clean up logs Key: FLINK-9380 URL: https://issues.apache.org/jira/browse/FLINK-9380 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.5.0 Reporter: Till Rohrmann Some of the end-to-end tests clean up their logs even in the failure case. This makes debugging and understanding the problem extremely difficult. Ideally, each script would say where it stored the respective logs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9381) BlobServer data for a job is not getting cleaned up at JM
Amit Jain created FLINK-9381: Summary: BlobServer data for a job is not getting cleaned up at JM Key: FLINK-9381 URL: https://issues.apache.org/jira/browse/FLINK-9381 Project: Flink Issue Type: Bug Affects Versions: 1.5.0 Environment: Flink 1.5.0 RC3 Commit e725269 Reporter: Amit Jain We are running Flink 1.5.0 RC3 with YARN as the cluster manager and found that the JobManager is getting killed due to an out-of-disk error. Upon further analysis, we found that the BlobServer data for a job is not getting cleaned up. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9373) Fix potential data losing for RocksDBBackend
[ https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477309#comment-16477309 ] ASF GitHub Bot commented on FLINK-9373: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6020 The reason that Travis gave a red light is unrelated... > Fix potential data losing for RocksDBBackend > > > Key: FLINK-9373 > URL: https://issues.apache.org/jira/browse/FLINK-9373 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Blocker > Fix For: 1.5.0 > > > Currently, when using RocksIterator we only use the _iterator.isValid()_ to > check whether we have reached the end of the iterator. But that is not > enough: if we refer to RocksDB's wiki > https://github.com/facebook/rocksdb/wiki/Iterator#error-handling we find > that even if _iterator.isValid()=true_, there may still be an internal error. > A safer way to use the _RocksIterator_ is to always call the > _iterator.status()_ to check for internal errors of _RocksDB_. There is a case > from a user email that seems to have lost data because of this: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
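A short sketch of the safer iteration pattern described in the issue, using the RocksDB Java API directly; the variable names and surrounding setup are illustrative and not the actual Flink backend code.
{code:java}
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

// isValid() == false can mean either "end of data" or "an internal error
// occurred"; status() is what surfaces the error, so it must always be
// checked after the loop, otherwise entries can be silently missed.
public class SafeRocksIteration {

    public static void iterate(RocksDB db) throws RocksDBException {
        try (RocksIterator iterator = db.newIterator()) {
            iterator.seekToFirst();
            while (iterator.isValid()) {
                byte[] key = iterator.key();
                byte[] value = iterator.value();
                // ... process key/value ...
                iterator.next();
            }
            iterator.status(); // throws RocksDBException if RocksDB hit an internal error
        }
    }
}
{code}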
[GitHub] flink issue #6020: [FLINK-9373][state] Fix potential data losing for RocksDB...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6020 The reason that Travis gave a red light is unrelated... ---
[jira] [Commented] (FLINK-8918) Introduce Runtime Filter Join
[ https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477312#comment-16477312 ] Fabian Hueske commented on FLINK-8918: -- Hi [~sihuazhou], thanks for this proposal! I see a few challenges with using a bloom filter as an early filter to avoid look ups in the state. *1. Bloom Filter Data Structure*: It is not possible to remove a key from a bloom filter. This doesn't matter in batch queries (which is the use case that you linked to) but in long running streaming applications with possibly evolving key spaces, this might lead to a saturation of the bloom filter and effectively make it useless. *2. Checkpointing, Recovery, Rescaling*: The bloom filter would need to be held as operator state because its information spans several keys. While we can checkpoint operator state, Flink does not guarantee that operator state is recovered on the same operator instance. If you'd like to rescale an application, this would not be possible at all. We would need a new type of state for this which is scoped to a key-group. I am not aware of any discussions in that direction though. What do you think? Best, Fabian > Introduce Runtime Filter Join > - > > Key: FLINK-8918 > URL: https://issues.apache.org/jira/browse/FLINK-8918 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0 > > > In general, stream join is one of the most performance cost task. For every > record from both side, we need to query the state from the other side, this > will lead to poor performance when the state size if huge. So, in production, > we always need to spend a lot slots to handle stream join. But, indeed, we > can improve this in somehow, there a phenomenon of stream join can be found > in production. That's the `joined ratio` of the stream join is often very > low, for example. > - stream join in promotion analysis: Job need to join the promotion log with > the action(click, view, payment, collection, retweet) log with the > `promotion_id` to analysis the effect of the promotion. > - stream join in AD(advertising) attribution: Job need to join the AD click > log with the item payment log on the `click_id` to find which click of which > AD that brings the payment to do attribution. > - stream join in click log analysis of doc: Job need to join viewed log(doc > viewed by users) with the click log (doc clicked by users) to analysis the > reason of the click and the property of the users. > - ….so on > All these cases have one common property, that is the _joined ratio_ is very > low. Here is a example to describe it, imagine that, we have 1 records > from the left stream, and 1 records from the right stream, and we execute > _select * from leftStream l join rightStream r on l.id = r.id_ , we only got > 100 record from the result, that is the case for low _joined ratio_, this is > an example for inner join, but it can also apply to left & right join. > there are more example I can come up with low _joined ratio_ , but the most > important point I want to expressed is that, the low _joined ratio_ of stream > join in production is a very common phenomenon(maybe the almost common > phenomenon in some companies, at least in our company that is the case). 
> *Then how to improve it?* > We can see from the above case, 1 record join 1 record we only got > 100 result, that means, we query the state 2 times (1 for the left > stream and 1 for the right stream) but only 100 of them are meaningful!!! > If we could reduce the useless query times, then we can definitely improve > the performance of stream join. > the way we used to improve this is to introduce the _Runtime Filter Join_, > the mainly ideal is that, we build a _filter_ for the state on each side > (left stream & right stream). When we need to query the state on that side we > first check the corresponding _filter_ whether the _key_ is possible in the > state, if the _filter_ say "not, it impossible in the state", then we stop > querying the state, if it say "hmm, it maybe in state", then we need to query > the state. As you can see, the best choose of the _filter_ is _Bloom Filter_, > it has all the feature that we expected: _extremely good performance_, > _non-existence of false negative_. > > *the simplest pseudo code for _Runtime Filter Join_(the comments inline are > based on RocksDBBackend)* > {code:java} > void performJoinNormally(Record recordFromLeftStream) { > Iterator rightIterator = rigthStreamState.iterator(); > // perform the `seek()` on the RocksDB, and iterator one by one, > // this is an expensive operation especially when the key
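To make the proposal concrete, here is a minimal, hypothetical sketch of the "check a Bloom filter before touching state" pattern, using Guava's BloomFilter purely for illustration. It only shows the happy path and ignores the checkpointing/rescaling and key-removal concerns raised in the comment above, which would still need to be solved.
{code:java}
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

import java.nio.charset.StandardCharsets;

// Keys seen on the right stream are added to a Bloom filter; a probe from the
// left stream only queries the (expensive) keyed state if the filter says the
// key might exist. False positives just cost one extra state lookup; false
// negatives cannot happen.
public class RuntimeFilterJoinSketch {

    private final BloomFilter<CharSequence> rightKeys =
            BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 1_000_000, 0.01);

    void onRightRecord(String key) {
        rightKeys.put(key);
        // ... also update the right-side keyed state as usual ...
    }

    void onLeftRecord(String key) {
        if (!rightKeys.mightContain(key)) {
            return; // definitely no match: skip the state lookup entirely
        }
        // ... perform the normal (expensive) lookup in the right-side state ...
    }
}
{code}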
[jira] [Commented] (FLINK-9381) BlobServer data for a job is not getting cleaned up at JM
[ https://issues.apache.org/jira/browse/FLINK-9381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477313#comment-16477313 ] Chesnay Schepler commented on FLINK-9381: - Searching for {{BlobServer#cleanupJob}} only yields results in the legacy {{JobManager}}; it is never called by the {{JobMaster}} or {{Dispatcher}}. [~till.rohrmann] Is this simply missing or are we cleaning up things in another way? > BlobServer data for a job is not getting cleaned up at JM > - > > Key: FLINK-9381 > URL: https://issues.apache.org/jira/browse/FLINK-9381 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0 > Environment: Flink 1.5.0 RC3 Commit e725269 >Reporter: Amit Jain >Priority: Blocker > > We are running Flink 1.5.0 RC3 with YARN as the cluster manager and found > that the JobManager is getting killed due to an out-of-disk error. > > Upon further analysis, we found that the BlobServer data for a job is not > getting cleaned up. -- This message was sent by Atlassian JIRA (v7.6.3#76005)