[jira] [Created] (FLINK-7646) Restart failed jobs with configurable parallelism range
Elias Levy created FLINK-7646: - Summary: Restart failed jobs with configurable parallelism range Key: FLINK-7646 URL: https://issues.apache.org/jira/browse/FLINK-7646 Project: Flink Issue Type: Improvement Components: DataStream API Affects Versions: 1.3.2 Reporter: Elias Levy Currently, if a TaskManager fails, the whole job is terminated and then, depending on the restart policy, a restart may be attempted. If the failed TaskManager has not been replaced and there are no spare task slots in the cluster, the restart will fail. There are situations where restoring or adding a new TaskManager may take a while. For instance, in AWS an Auto Scaling Group can only be used to manage a group of instances in a single availability zone. If you have a cluster of TaskManagers that spans multiple AZs, managed by one ASG per AZ, and an AZ goes dark, the other ASGs won't scale automatically to make up for the lost TaskManagers. To resolve the situation, the healthy ASGs will need to be modified manually or by systems external to AWS. With that in mind, it would be useful if you could specify a range for the parallelism parameter. Under normal circumstances the job would execute with the maximum parallelism of the range. But if TaskManagers were lost and not replaced after some time, the job would accept being executed with some lower parallelism within the range. I understand that this may not be feasible with checkpoints, as savepoints are supposed to be the mechanism used to change the parallelism of a stateful job. Therefore, this proposal may need to wait until the implementation of the periodic savepoint feature (FLINK-4511). This feature would aid the availability of Flink jobs. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
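To make the proposal concrete, here is a purely hypothetical sketch of what a range-based setting could look like from user code. Neither {{ParallelismRange}} nor a range-based setter exists in Flink; the names are invented for illustration only, and today only a single fixed parallelism can be set.

{code:title=ParallelismRangeSketch.scala|borderStyle=solid}
import org.apache.flink.streaming.api.scala._

// Hypothetical illustration only: no range-based parallelism API exists in Flink today.
// ParallelismRange and the comments below merely sketch the proposed behavior.
case class ParallelismRange(min: Int, max: Int) {
  require(min > 0 && min <= max, "min must be positive and must not exceed max")
}

object ParallelismRangeSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val range = ParallelismRange(min = 4, max = 12)

    // Today only a single fixed value can be configured:
    env.setParallelism(range.max)

    // Proposed behavior: if, on restart, fewer than range.max but at least
    // range.min task slots are available, the job would be rescaled
    // (e.g. from a savepoint, see FLINK-4511) instead of failing the restart.

    env.fromElements(1L, 2L, 3L).map(_ * 2).print()
    env.execute("parallelism range sketch")
  }
}
{code}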
[jira] [Closed] (FLINK-6733) Remove commented out AvgAggregationFunction.java
[ https://issues.apache.org/jira/browse/FLINK-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-6733. - Resolution: Fixed > Remove commented out AvgAggregationFunction.java > > > Key: FLINK-6733 > URL: https://issues.apache.org/jira/browse/FLINK-6733 > Project: Flink > Issue Type: Improvement > Components: Java API >Reporter: Dawid Wysakowicz >Assignee: Mikhail Lipkovich >Priority: Trivial > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Reopened] (FLINK-6733) Remove commented out AvgAggregationFunction.java
[ https://issues.apache.org/jira/browse/FLINK-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young reopened FLINK-6733: --- Assignee: Mikhail Lipkovich (was: Kurt Young) > Remove commented out AvgAggregationFunction.java > > > Key: FLINK-6733 > URL: https://issues.apache.org/jira/browse/FLINK-6733 > Project: Flink > Issue Type: Improvement > Components: Java API >Reporter: Dawid Wysakowicz >Assignee: Mikhail Lipkovich >Priority: Trivial > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4685: [hotfix] Fix typo for English spells in Debugging ...
GitHub user YMorisawa opened a pull request: https://github.com/apache/flink/pull/4685 [hotfix] Fix typo for English spells in Debugging Classloading documentation Fixed some basic spelling mistakes in the docs. --- devided -> divided deploymens -> deployments posible -> possible starting a the Flink cluster -> starting a Flink cluster beginnign -> beginning --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/YMorisawa/flink hotfix-doc-classloading Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4685.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4685 commit 729ec1fbb8237a5aff811d88b5b826fbd0de9974 Author: desktop Date: 2017-09-20T03:38:26Z [hotfix] Fix typo in Debugging Classloading documentation ---
[jira] [Updated] (FLINK-6733) Remove commented out AvgAggregationFunction.java
[ https://issues.apache.org/jira/browse/FLINK-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-6733: -- Priority: Trivial (was: Major) Issue Type: Improvement (was: Bug) > Remove commented out AvgAggregationFunction.java > > > Key: FLINK-6733 > URL: https://issues.apache.org/jira/browse/FLINK-6733 > Project: Flink > Issue Type: Improvement > Components: Java API >Reporter: Dawid Wysakowicz >Assignee: Kurt Young >Priority: Trivial > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-6733) Remove commented out AvgAggregationFunction.java
[ https://issues.apache.org/jira/browse/FLINK-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-6733. - Resolution: Fixed > Remove commented out AvgAggregationFunction.java > > > Key: FLINK-6733 > URL: https://issues.apache.org/jira/browse/FLINK-6733 > Project: Flink > Issue Type: Improvement > Components: Java API >Reporter: Dawid Wysakowicz >Assignee: Kurt Young >Priority: Trivial > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Reopened] (FLINK-6733) Remove commented out AvgAggregationFunction.java
[ https://issues.apache.org/jira/browse/FLINK-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young reopened FLINK-6733: --- Assignee: Kurt Young (was: Mikhail Lipkovich) > Remove commented out AvgAggregationFunction.java > > > Key: FLINK-6733 > URL: https://issues.apache.org/jira/browse/FLINK-6733 > Project: Flink > Issue Type: Bug > Components: Java API >Reporter: Dawid Wysakowicz >Assignee: Kurt Young > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6173) flink-table not pack-in com.fasterxml.jackson.* in after #FLINK-5414
[ https://issues.apache.org/jira/browse/FLINK-6173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16172649#comment-16172649 ] Kent Murra commented on FLINK-6173: --- The guidance to use a previous version of Maven seems to be for building flink. I'm getting this issue when using the built-and-published flink libraries. > flink-table not pack-in com.fasterxml.jackson.* in after #FLINK-5414 > > > Key: FLINK-6173 > URL: https://issues.apache.org/jira/browse/FLINK-6173 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Zhenghua Gao > > Currently, flink-table will pack-in com.fasterxml.jackson.* and rename them > to org.apache.flink.shaded.calcite.com.fasterxml.jackson.* > If a project depends on flink-table, and uses fasterxml as follows(function > explain uses fasterxml indirectly): > {code:title=WordCount.scala|borderStyle=solid} > object WordCountWithTable { > def main(args: Array[String]): Unit = { > // set up execution environment > val env = ExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env) > val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", > 1)) > val expr = input.toTable(tEnv) > val result = expr > .groupBy('word) > .select('word, 'frequency.sum as 'frequency) > .filter('frequency === 2) > println(tEnv.explain(result)) > result.toDataSet[WC].print() > } > case class WC(word: String, frequency: Long) > } > {code} > It actually uses org.apache.flink.shaded.calcite.com.fasterxml.jackson.* > I found after FLINK-5414, flink-table didn't pack-in com.fasterxml.jackson.* > and the project would throw class not found exception. > {code:borderStyle=solid} > Exception in thread "main" java.lang.NoClassDefFoundError: > org/apache/flink/shaded/calcite/com/fasterxml/jackson/databind/ObjectMapper > at > org.apache.flink.table.explain.PlanJsonParser.getSqlExecutionPlan(PlanJsonParser.java:32) > at > org.apache.flink.table.api.BatchTableEnvironment.explain(BatchTableEnvironment.scala:143) > at > org.apache.flink.table.api.BatchTableEnvironment.explain(BatchTableEnvironment.scala:164) > at > org.apache.flink.quickstart.WordCountWithTable$.main(WordCountWithTable.scala:34) > at > org.apache.flink.quickstart.WordCountWithTable.main(WordCountWithTable.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) > Caused by: java.lang.ClassNotFoundException: > org.apache.flink.shaded.calcite.com.fasterxml.jackson.databind.ObjectMapper > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > ... 10 more > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7645) Modify system-metrics part show in the document
Hai Zhou_UTC+8 created FLINK-7645: - Summary: Modify system-metrics part show in the document Key: FLINK-7645 URL: https://issues.apache.org/jira/browse/FLINK-7645 Project: Flink Issue Type: Improvement Components: Documentation, Metrics Reporter: Hai Zhou_UTC+8 Fix For: 1.4.0 Currently, the system-metrics content structure is: {noformat} ├── System metrics ├── Latency tracking ├── Dashboard integration {noformat} I think the following would be more reasonable: {noformat} ├── System metrics ├── CPU ├── Memory ├── Threads ├── GarbageCollection ├── ClassLoader ├── Network ├── Cluster ├── Availability ├── Checkpointing ├── IO └── Connectors └── Kafka Connectors ├── Latency tracking ├── Dashboard integration {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6733) Remove commented out AvgAggregationFunction.java
[ https://issues.apache.org/jira/browse/FLINK-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16172577#comment-16172577 ] ASF GitHub Bot commented on FLINK-6733: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4630 > Remove commented out AvgAggregationFunction.java > > > Key: FLINK-6733 > URL: https://issues.apache.org/jira/browse/FLINK-6733 > Project: Flink > Issue Type: Bug > Components: Java API >Reporter: Dawid Wysakowicz >Assignee: Mikhail Lipkovich > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4630: [FLINK-6733] Remove commented out AvgAggregationFu...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4630 ---
[jira] [Closed] (FLINK-6733) Remove commented out AvgAggregationFunction.java
[ https://issues.apache.org/jira/browse/FLINK-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-6733. - Resolution: Fixed Fix Version/s: 1.4.0 > Remove commented out AvgAggregationFunction.java > > > Key: FLINK-6733 > URL: https://issues.apache.org/jira/browse/FLINK-6733 > Project: Flink > Issue Type: Bug > Components: Java API >Reporter: Dawid Wysakowicz >Assignee: Mikhail Lipkovich > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7644) Line the extra semicolon in the source code
[ https://issues.apache.org/jira/browse/FLINK-7644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hai Zhou_UTC+8 updated FLINK-7644: -- Description: eg. final TestDuplicateSerializer keySerializer = new TestDuplicateSerializer()*;;* Unnecessary semicolon at the end of the line. was: eg. final TestDuplicateSerializer keySerializer = new TestDuplicateSerializer();; Unnecessary semicolon at the end of the line. > Line the extra semicolon in the source code > --- > > Key: FLINK-7644 > URL: https://issues.apache.org/jira/browse/FLINK-7644 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Hai Zhou_UTC+8 >Assignee: Hai Zhou_UTC+8 >Priority: Minor > Fix For: 1.4.0, 1.3.3 > > > eg. > final TestDuplicateSerializer keySerializer = new > TestDuplicateSerializer()*;;* > Unnecessary semicolon at the end of the line. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7644) Line the extra semicolon in the source code
Hai Zhou_UTC+8 created FLINK-7644: - Summary: Line the extra semicolon in the source code Key: FLINK-7644 URL: https://issues.apache.org/jira/browse/FLINK-7644 Project: Flink Issue Type: Improvement Components: Build System Reporter: Hai Zhou_UTC+8 Assignee: Hai Zhou_UTC+8 Priority: Minor Fix For: 1.4.0, 1.3.3 eg. final TestDuplicateSerializer keySerializer = new TestDuplicateSerializer();; Unnecessary semicolon at the end of the line. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-6444) Add a check that '@VisibleForTesting' methods are only used in tests
[ https://issues.apache.org/jira/browse/FLINK-6444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hai Zhou_UTC+8 reassigned FLINK-6444: - Assignee: Hai Zhou_UTC+8 > Add a check that '@VisibleForTesting' methods are only used in tests > > > Key: FLINK-6444 > URL: https://issues.apache.org/jira/browse/FLINK-6444 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Stephan Ewen >Assignee: Hai Zhou_UTC+8 > > Some methods are annotated with {{@VisibleForTesting}}. These methods should > only be called from tests. > This is currently not enforced / checked during the build. We should add such > a check. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
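For illustration, a small sketch of the pattern such a check would flag. Only the {{@VisibleForTesting}} annotation ({{org.apache.flink.annotation.VisibleForTesting}}) is real Flink API; the class and method names are invented:

{code:title=VisibleForTestingSketch.scala|borderStyle=solid}
import org.apache.flink.annotation.VisibleForTesting

// Illustrative names; only the annotation itself is part of Flink.
class BufferPoolLike {
  private var available = 0

  def request(): Unit = { available -= 1 }

  // Intended to be called from unit tests only, e.g. for assertions.
  @VisibleForTesting
  def availableBuffers: Int = available
}

object ProductionCaller {
  def report(pool: BufferPoolLike): Int = {
    // The proposed build check should reject this call, because it invokes a
    // @VisibleForTesting member from non-test code.
    pool.availableBuffers
  }
}
{code}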
[jira] [Closed] (FLINK-7626) Add some metric description about checkpoints
[ https://issues.apache.org/jira/browse/FLINK-7626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hai Zhou_UTC+8 closed FLINK-7626. - Resolution: Fixed > Add some metric description about checkpoints > - > > Key: FLINK-7626 > URL: https://issues.apache.org/jira/browse/FLINK-7626 > Project: Flink > Issue Type: Bug > Components: Documentation, Metrics >Affects Versions: 1.3.2 >Reporter: Hai Zhou_UTC+8 >Assignee: Hai Zhou_UTC+8 > Fix For: 1.4.0, 1.3.3 > > > I exported the metrics to the logfile via > Slf4jReporter (https://issues.apache.org/jira/browse/FLINK-4831) and found > that some checkpoint metrics are not described in the > document, so I added them. > {noformat} > //Number of total checkpoints (in progress, completed, failed) > totalNumberOfCheckpoints > //Number of in progress checkpoints. > numberOfInProgressCheckpoints > //Number of successfully completed checkpoints > numberOfCompletedCheckpoints > //Number of failed checkpoints. > numberOfFailedCheckpoints > //Timestamp when the checkpoint was restored at the coordinator. > lastCheckpointRestoreTimestamp > //Buffered bytes during alignment over all subtasks. > lastCheckpointAlignmentBuffered > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7634) Add option to create a savepoint while canceling a job in the dashboard
[ https://issues.apache.org/jira/browse/FLINK-7634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16172561#comment-16172561 ] Hai Zhou_UTC+8 commented on FLINK-7634: --- +1 > Add option to create a savepoint while canceling a job in the dashboard > > > Key: FLINK-7634 > URL: https://issues.apache.org/jira/browse/FLINK-7634 > Project: Flink > Issue Type: Improvement > Components: JobManager >Affects Versions: 1.3.2 >Reporter: Elias Levy >Priority: Minor > > Currently there appears to be no way to trigger a savepoint in the dashboard, > to cancel a job while taking a savepoint, to list savepoints, or to list > external checkpoints. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7613) Fix documentation error in QuickStart
[ https://issues.apache.org/jira/browse/FLINK-7613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16172533#comment-16172533 ] ASF GitHub Bot commented on FLINK-7613: --- Github user raymondtay commented on a diff in the pull request: https://github.com/apache/flink/pull/4666#discussion_r139849283 --- Diff: docs/dev/datastream_api.md --- @@ -497,7 +497,9 @@ env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis); LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment env.setBufferTimeout(timeoutMillis) -env.genereateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis) +def myMap : Long => Long = _ + 1 // `generateSequence` returns a `DataStream[Long]` type --- End diff -- I'm taking the perspective of the absolute beginner since this doc was linked from the QuickStart section; when i started with the QuickStart section most parts of the doc were good enough (imo) to guide me w/o referring to the javadocs (which i think makes perfect sense) so i thought it might be a good idea to continue this tradition and make things explicitly clear. What do you think @zentol ? > Fix documentation error in QuickStart > - > > Key: FLINK-7613 > URL: https://issues.apache.org/jira/browse/FLINK-7613 > Project: Flink > Issue Type: Task > Components: Documentation >Affects Versions: 1.4.0 >Reporter: Raymond Tay >Priority: Minor > > In the `QuickStart => Run The Example` section, there's a typographical error > which points the reader to `*jobmanager* but it should be `*taskmanager*` in > Apache Flink 1.4.x. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4666: [FLINK-7613][Documentation] Fixed typographical er...
Github user raymondtay commented on a diff in the pull request: https://github.com/apache/flink/pull/4666#discussion_r139849283 --- Diff: docs/dev/datastream_api.md --- @@ -497,7 +497,9 @@ env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis); LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment env.setBufferTimeout(timeoutMillis) -env.genereateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis) +def myMap : Long => Long = _ + 1 // `generateSequence` returns a `DataStream[Long]` type --- End diff -- I'm taking the perspective of the absolute beginner since this doc was linked from the QuickStart section; when i started with the QuickStart section most parts of the doc were good enough (imo) to guide me w/o referring to the javadocs (which i think makes perfect sense) so i thought it might be a good idea to continue this tradition and make things explicitly clear. What do you think @zentol ? ---
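For readers following along without the surrounding docs page, here is a self-contained sketch of the example under discussion, showing where the explicit {{myMap}} definition fits (the job itself is illustrative):

{code:title=BufferTimeoutExample.scala|borderStyle=solid}
import org.apache.flink.streaming.api.scala._

object BufferTimeoutExample {
  def main(args: Array[String]): Unit = {
    val timeoutMillis = 100L
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    env.setBufferTimeout(timeoutMillis)

    // generateSequence returns a DataStream[Long], so the mapper is Long => Long.
    def myMap: Long => Long = _ + 1

    env.generateSequence(1, 10)
      .map(myMap)
      .setBufferTimeout(timeoutMillis)
      .print()

    env.execute("buffer timeout example")
  }
}
{code}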
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16172532#comment-16172532 ] ASF GitHub Bot commented on FLINK-6233: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139849018 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute row(event) time bounded stream inner-join. + */ +class RowTimeBoundedStreamInnerJoin( +leftLowerBound: Long, +leftUpperBound: Long, +allowedLateness: Long, +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String, +leftTimeIdx: Int, +rightTimeIdx: Int) +extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode, + leftTimeIdx, + rightTimeIdx, + JoinTimeIndicator.ROWTIME) { + + override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = { +timeForRow <= watermark - allowedLateness + } + + override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { +rightOperatorTime = + if (ctx.timerService().currentWatermark() > 0) ctx.timerService().currentWatermark() --- End diff -- Totally understand 😄 > Support rowtime inner equi-join between two streams in the SQL API > -- > > Key: FLINK-6233 > URL: https://issues.apache.org/jira/browse/FLINK-6233 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: hongyuhong >Assignee: Xingcan Cui > > The goal of this issue is to add support for inner equi-join on proc time > streams to the SQL interface. 
> Queries similar to the following should be supported: > {code} > SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime > FROM Orders AS o > JOIN Shipments AS s > ON o.orderId = s.orderId > AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR; > {code} > The following restrictions should initially apply: > * The join hint only supports inner joins > * The ON clause should include an equi-join condition > * The time-condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL > '1' HOUR}} can only use rowtime, which is a system attribute; the time > condition only supports bounded time ranges like {{o.rowtime BETWEEN s.rowtime > - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, does not support > unbounded conditions like {{o.rowtime < s.rowtime}}, and should include both > streams' rowtime attributes; {{o.rowtime between rowtime () and rowtime () + > 1}} should also not be supported. > A row-time stream join will not be able to handle late data, because this > would mean that inserting a row into a sorted order would shift all other > computations. This would be too expensive to maintain. Therefore, we will throw an error if > a user tries to use a row-time stream join with late data handling. > This issue includes: > * Design of the DataStream operator to deal with the stream join > * Translation from Calcite's RelNode representation (LogicalJoin). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139849018 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute row(event) time bounded stream inner-join. + */ +class RowTimeBoundedStreamInnerJoin( +leftLowerBound: Long, +leftUpperBound: Long, +allowedLateness: Long, +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String, +leftTimeIdx: Int, +rightTimeIdx: Int) +extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode, + leftTimeIdx, + rightTimeIdx, + JoinTimeIndicator.ROWTIME) { + + override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = { +timeForRow <= watermark - allowedLateness + } + + override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { +rightOperatorTime = + if (ctx.timerService().currentWatermark() > 0) ctx.timerService().currentWatermark() --- End diff -- Totally understand 😄 ---
[jira] [Commented] (FLINK-7613) Fix documentation error in QuickStart
[ https://issues.apache.org/jira/browse/FLINK-7613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16172526#comment-16172526 ] ASF GitHub Bot commented on FLINK-7613: --- Github user raymondtay commented on a diff in the pull request: https://github.com/apache/flink/pull/4666#discussion_r139848500 --- Diff: docs/dev/execution_configuration.md --- @@ -42,44 +42,44 @@ var executionConfig = env.getConfig The following configuration options are available: (the default is bold) -- **`enableClosureCleaner()`** / `disableClosureCleaner()`. The closure cleaner is enabled by default. The closure cleaner removes unneeded references to the surrounding class of anonymous functions inside Flink programs. +- **`enableClosureCleaner()`** / **`disableClosureCleaner()`**. The closure cleaner is enabled by default. The closure cleaner removes unneeded references to the surrounding class of anonymous functions inside Flink programs. --- End diff -- @zentol I wasn't aware of that convention in all honesty, thank you for pointing it out. > Fix documentation error in QuickStart > - > > Key: FLINK-7613 > URL: https://issues.apache.org/jira/browse/FLINK-7613 > Project: Flink > Issue Type: Task > Components: Documentation >Affects Versions: 1.4.0 >Reporter: Raymond Tay >Priority: Minor > > In the `QuickStart => Run The Example` section, there's a typographical error > which points the reader to `*jobmanager* but it should be `*taskmanager*` in > Apache Flink 1.4.x. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4666: [FLINK-7613][Documentation] Fixed typographical er...
Github user raymondtay commented on a diff in the pull request: https://github.com/apache/flink/pull/4666#discussion_r139848500 --- Diff: docs/dev/execution_configuration.md --- @@ -42,44 +42,44 @@ var executionConfig = env.getConfig The following configuration options are available: (the default is bold) -- **`enableClosureCleaner()`** / `disableClosureCleaner()`. The closure cleaner is enabled by default. The closure cleaner removes unneeded references to the surrounding class of anonymous functions inside Flink programs. +- **`enableClosureCleaner()`** / **`disableClosureCleaner()`**. The closure cleaner is enabled by default. The closure cleaner removes unneeded references to the surrounding class of anonymous functions inside Flink programs. --- End diff -- @zentol I wasn't aware of that convention in all honesty, thank you for pointing it out. ---
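For reference, a minimal sketch of how the option pair discussed above is used from user code ({{enableClosureCleaner}}/{{disableClosureCleaner}} are real {{ExecutionConfig}} methods; the tiny job is illustrative):

{code:title=ClosureCleanerConfigExample.scala|borderStyle=solid}
import org.apache.flink.api.scala._

object ClosureCleanerConfigExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val executionConfig = env.getConfig

    // The closure cleaner is enabled by default. Disabling it means user
    // functions must be serializable without Flink removing unneeded
    // references to their surrounding classes.
    executionConfig.disableClosureCleaner()
    // executionConfig.enableClosureCleaner() // restores the default

    env.fromElements(1, 2, 3).map(_ * 2).print()
  }
}
{code}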
[jira] [Commented] (FLINK-7531) Move existing REST handler to flink-runtime
[ https://issues.apache.org/jira/browse/FLINK-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16172471#comment-16172471 ] ASF GitHub Bot commented on FLINK-7531: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4600 > Move existing REST handler to flink-runtime > --- > > Key: FLINK-7531 > URL: https://issues.apache.org/jira/browse/FLINK-7531 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Fix For: 1.4.0 > > > Since the new REST endpoints live in {{flink-runtime}} we should move the > existing rest handlers to {{flink-runtime}} as well. The static web server > content remains in {{flink-runtime-web}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4600: [FLINK-7531] Move Flink legacy rest handler to fli...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4600 ---
[jira] [Closed] (FLINK-7531) Move existing REST handler to flink-runtime
[ https://issues.apache.org/jira/browse/FLINK-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-7531. Resolution: Fixed Fix Version/s: 1.4.0 Fixed via 4fc019a96a08446d7ba5f57664904abcd585e31c > Move existing REST handler to flink-runtime > --- > > Key: FLINK-7531 > URL: https://issues.apache.org/jira/browse/FLINK-7531 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Fix For: 1.4.0 > > > Since the new REST endpoints live in {{flink-runtime}} we should move the > existing rest handlers to {{flink-runtime}} as well. The static web server > content remains in {{flink-runtime-web}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7529) Retrieve complete REST address from RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-7529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-7529. Resolution: Fixed Fix Version/s: 1.4.0 Added via 32770103253e01cd61c8634378cfa1b26707e19a > Retrieve complete REST address from RestfulGateway > -- > > Key: FLINK-7529 > URL: https://issues.apache.org/jira/browse/FLINK-7529 > Project: Flink > Issue Type: Improvement > Components: REST, Webfrontend >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > Fix For: 1.4.0 > > > At the moment we only retrieve the web port from the > {{JobManager}}/{{Dispatcher}}. Instead it would be better to retrieve the > complete REST address from these components, including the protocol > (http/https) and the hostname. That way we wouldn't have to pass information > like whether https is enabled to all REST handlers where it is used for the > generation of the redirection address. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4599: [FLINK-7529] Retrieve complete REST address from g...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4599 ---
[jira] [Commented] (FLINK-7529) Retrieve complete REST address from RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-7529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16172461#comment-16172461 ] ASF GitHub Bot commented on FLINK-7529: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4599 > Retrieve complete REST address from RestfulGateway > -- > > Key: FLINK-7529 > URL: https://issues.apache.org/jira/browse/FLINK-7529 > Project: Flink > Issue Type: Improvement > Components: REST, Webfrontend >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > Fix For: 1.4.0 > > > At the moment we only retrieve the web port from the > {{JobManager}}/{{Dispatcher}}. Instead it would be better to retrieve the > complete REST address from these components, including the protocol > (http/https) and the hostname. That way we wouldn't have to pass information > like whether https is enabled to all REST handlers where it is used for the > generation of the redirection address. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7528) Create Dispatcher REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16172458#comment-16172458 ] ASF GitHub Bot commented on FLINK-7528: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4598 > Create Dispatcher REST endpoint > --- > > Key: FLINK-7528 > URL: https://issues.apache.org/jira/browse/FLINK-7528 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > Fix For: 1.4.0 > > > Create and integrate the {{DispatcherRestEndpoint}} with the {{Dispatcher}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4598: [FLINK-7528] Create DispatcherRestEndpoint and int...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4598 ---
[jira] [Closed] (FLINK-7528) Create Dispatcher REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-7528. Resolution: Fixed Fix Version/s: 1.4.0 Added via 6a62f1455313ee8fae0ff79945da61fb67ec8edb > Create Dispatcher REST endpoint > --- > > Key: FLINK-7528 > URL: https://issues.apache.org/jira/browse/FLINK-7528 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > Fix For: 1.4.0 > > > Create and integrate the {{DispatcherRestEndpoint}} with the {{Dispatcher}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7527) Add redirection logic to AbstractRestHandler
[ https://issues.apache.org/jira/browse/FLINK-7527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-7527. Resolution: Fixed Fix Version/s: 1.4.0 Added via 75e84e04f5a3e2766e331fd05ddb725fe9b00d99 > Add redirection logic to AbstractRestHandler > > > Key: FLINK-7527 > URL: https://issues.apache.org/jira/browse/FLINK-7527 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > The {{AbstractRestHandler}} should extend the {{RedirectHandler}} introduced > with FLINK-7459 in order to add redirection functionality to the > {{AbstractRestHandler}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7527) Add redirection logic to AbstractRestHandler
[ https://issues.apache.org/jira/browse/FLINK-7527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16172445#comment-16172445 ] ASF GitHub Bot commented on FLINK-7527: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4597 > Add redirection logic to AbstractRestHandler > > > Key: FLINK-7527 > URL: https://issues.apache.org/jira/browse/FLINK-7527 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > The {{AbstractRestHandler}} should extend the {{RedirectHandler}} introduced > with FLINK-7459 in order to add redirection functionality to the > {{AbstractRestHandler}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4597 ---
[jira] [Commented] (FLINK-7643) HadoopFileSystem always reloads GlobalConfiguration
[ https://issues.apache.org/jira/browse/FLINK-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16172361#comment-16172361 ] Bowen Li commented on FLINK-7643: - Looks like it's related to FLINK-7365 > HadoopFileSystem always reloads GlobalConfiguration > --- > > Key: FLINK-7643 > URL: https://issues.apache.org/jira/browse/FLINK-7643 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Ufuk Celebi > > HadoopFileSystem always reloads GlobalConfiguration, which potentially leads > to a lot of noise in the logs, because this happens on each checkpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7643) HadoopFileSystem always reloads GlobalConfiguration
[ https://issues.apache.org/jira/browse/FLINK-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-7643: - Affects Version/s: 1.4.0 > HadoopFileSystem always reloads GlobalConfiguration > --- > > Key: FLINK-7643 > URL: https://issues.apache.org/jira/browse/FLINK-7643 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Ufuk Celebi > > HadoopFileSystem always reloads GlobalConfiguration, which potentially leads > to a lot of noise in the logs, because this happens on each checkpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7514) fix BackPressureStatsTrackerITCase releasing buffers twice
[ https://issues.apache.org/jira/browse/FLINK-7514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16172286#comment-16172286 ] ASF GitHub Bot commented on FLINK-7514: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4591 @zentol unfortunately, when changing to using Netty's buffer counting, it will check any form of illegal reference count usages, and a double-free is one of them. Even without Netty, this pattern could result from an invalid use and may be guarded the same way. > fix BackPressureStatsTrackerITCase releasing buffers twice > -- > > Key: FLINK-7514 > URL: https://issues.apache.org/jira/browse/FLINK-7514 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > {{BackPressureStatsTrackerITCase#testBackPressuredProducer()}} is releasing > its buffers twice which should be fixed. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4591: [FLINK-7514][tests] fix BackPressureStatsTrackerITCase re...
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4591 @zentol unfortunately, when changing to using Netty's buffer counting, it will check any form of illegal reference count usages, and a double-free is one of them. Even without Netty, this pattern could result from an invalid use and may be guarded the same way. ---
[jira] [Commented] (FLINK-7527) Add redirection logic to AbstractRestHandler
[ https://issues.apache.org/jira/browse/FLINK-7527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16172278#comment-16172278 ] ASF GitHub Bot commented on FLINK-7527: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4597#discussion_r139805450 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java --- @@ -38,6 +40,10 @@ public ErrorResponseBody(String error) { this(Collections.singletonList(error)); } + public ErrorResponseBody(Throwable throwable) { --- End diff -- True. Will remove it. > Add redirection logic to AbstractRestHandler > > > Key: FLINK-7527 > URL: https://issues.apache.org/jira/browse/FLINK-7527 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > The {{AbstractRestHandler}} should extend the {{RedirectHandler}} introduced > with FLINK-7459 in order to add redirection functionality to the > {{AbstractRestHandler}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4597#discussion_r139805450 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java --- @@ -38,6 +40,10 @@ public ErrorResponseBody(String error) { this(Collections.singletonList(error)); } + public ErrorResponseBody(Throwable throwable) { --- End diff -- True. Will remove it. ---
[jira] [Commented] (FLINK-7603) Support within clause in MatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-7603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16172085#comment-16172085 ] ASF GitHub Bot commented on FLINK-7603: --- GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/4684 [FLINK-7603][savepoint/doc] Document how to take a savepoint on YARN ## What is the purpose of the change The documentation should have a separate entry for savepoint related CLI commands in combination with YARN. It is currently not documented that you have to supply the application id, nor how you can pass it. ## Brief change log - *add instruction of taking savepoints on YARN to both Savepoint and CLI doc* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-7603 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4684.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4684 commit 6386983239bd3024b395c865ec4fd33e232ca5a3 Author: Bowen Li Date: 2017-08-30T16:35:03Z FLINK-7422 Upgrade Kinesis Client Library (KCL) and AWS SDK in flink-connector-kinesis commit 381cd4156b84673a1d32d2db3f7b2d748d90d980 Author: Bowen Li Date: 2017-09-07T06:33:37Z Merge remote-tracking branch 'upstream/master' commit dcf40bd821187b848d924f7f4df6805b1b924c16 Author: Bowen Li Date: 2017-09-15T18:00:03Z Merge remote-tracking branch 'upstream/master' commit 169ea0a3bee1ba315d39fa49c16e9bd7c71d1338 Author: Bowen Li Date: 2017-09-18T06:25:26Z Merge remote-tracking branch 'upstream/master' commit 659e91c18ade8eb65d355b5b85ae2d402a61ff5e Author: Bowen Li Date: 2017-09-18T23:50:48Z Merge remote-tracking branch 'upstream/master' commit 990c4648a1427ca7c3c27453fe2a40cd5cac3734 Author: Bowen Li Date: 2017-09-19T17:18:54Z Merge remote-tracking branch 'upstream/master' commit ae4b5647e25d6e6915486c6ac4a42e887a53101c Author: Bowen Li Date: 2017-09-19T17:44:16Z FLINK-7603 Document how to take a savepoint on YARN commit f82db156e6cabab9f43b7c3ec658b0ddc1a0637c Author: Bowen Li Date: 2017-09-19T17:48:48Z update doc > Support within clause in MatchRecognize > --- > > Key: FLINK-7603 > URL: https://issues.apache.org/jira/browse/FLINK-7603 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API & SQL >Reporter: Dian Fu >Assignee: Dian Fu > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4684: [FLINK-7603][savepoint/doc] Document how to take a...
GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/4684 [FLINK-7603][savepoint/doc] Document how to take a savepoint on YARN ## What is the purpose of the change The documentation should have a separate entry for savepoint related CLI commands in combination with YARN. It is currently not documented that you have to supply the application id, nor how you can pass it. ## Brief change log - *add instruction of taking savepoints on YARN to both Savepoint and CLI doc* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-7603 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4684.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4684 commit 6386983239bd3024b395c865ec4fd33e232ca5a3 Author: Bowen Li Date: 2017-08-30T16:35:03Z FLINK-7422 Upgrade Kinesis Client Library (KCL) and AWS SDK in flink-connector-kinesis commit 381cd4156b84673a1d32d2db3f7b2d748d90d980 Author: Bowen Li Date: 2017-09-07T06:33:37Z Merge remote-tracking branch 'upstream/master' commit dcf40bd821187b848d924f7f4df6805b1b924c16 Author: Bowen Li Date: 2017-09-15T18:00:03Z Merge remote-tracking branch 'upstream/master' commit 169ea0a3bee1ba315d39fa49c16e9bd7c71d1338 Author: Bowen Li Date: 2017-09-18T06:25:26Z Merge remote-tracking branch 'upstream/master' commit 659e91c18ade8eb65d355b5b85ae2d402a61ff5e Author: Bowen Li Date: 2017-09-18T23:50:48Z Merge remote-tracking branch 'upstream/master' commit 990c4648a1427ca7c3c27453fe2a40cd5cac3734 Author: Bowen Li Date: 2017-09-19T17:18:54Z Merge remote-tracking branch 'upstream/master' commit ae4b5647e25d6e6915486c6ac4a42e887a53101c Author: Bowen Li Date: 2017-09-19T17:44:16Z FLINK-7603 Document how to take a savepoint on YARN commit f82db156e6cabab9f43b7c3ec658b0ddc1a0637c Author: Bowen Li Date: 2017-09-19T17:48:48Z update doc ---
[jira] [Closed] (FLINK-6549) Improve error message for type mismatches with side outputs
[ https://issues.apache.org/jira/browse/FLINK-6549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li closed FLINK-6549. --- Resolution: Fixed > Improve error message for type mismatches with side outputs > --- > > Key: FLINK-6549 > URL: https://issues.apache.org/jira/browse/FLINK-6549 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.3.0, 1.4.0 >Reporter: Chesnay Schepler >Assignee: Bowen Li >Priority: Minor > > A type mismatch when using side outputs causes a ClassCastException to be > thrown. It would be neat to include the name of the OutputTags in the > exception message. > This can occur when multiple {{OutputTag}}s with different types but > identical names are being used. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
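A short sketch (illustrative job, Scala API) of the situation described in the issue: two {{OutputTag}}s that share a name but carry different element types, where reading the side output with the wrongly typed tag currently surfaces as a bare ClassCastException:

{code:title=SideOutputTypeMismatchSketch.scala|borderStyle=solid}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

object SideOutputTypeMismatchSketch {
  // Same name, different element types.
  val stringTag: OutputTag[String] = OutputTag[String]("rejected")
  val intTag: OutputTag[Int] = OutputTag[Int]("rejected")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val mainStream = env.fromElements(1, 2, 3).process(new ProcessFunction[Int, Int] {
      override def processElement(value: Int,
                                  ctx: ProcessFunction[Int, Int]#Context,
                                  out: Collector[Int]): Unit = {
        out.collect(value)
        // The side output actually holds Strings.
        ctx.output(stringTag, s"rejected-$value")
      }
    })

    // Reading the same-named side output with the Int-typed tag fails at
    // runtime with a ClassCastException; the improved message should mention
    // the tag name to make this easier to diagnose.
    mainStream.getSideOutput(intTag).print()

    env.execute("side output type mismatch sketch")
  }
}
{code}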
[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/4665#discussion_r139742368 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -405,6 +411,8 @@ public void merge(W mergeResult, // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) { sideOutput(element); + } else if (isSkippedElement) { --- End diff -- I think when `isSkippedElement` is true, `isElementLate(element)` is always true as well. `isSkippedElement` is true when, for every assigned window, window.endtime + allowLateness < currentLowWatermark, and `isElementLate` is true when element.time + allowLateness < currentLowWatermark; element.time is <= the biggest window.endtime. So is `isElementLate` always true when `isSkippedElement` is true? And if I want to rule out the situation where the element was skipped **because no windows were assigned to it**, I just need to check whether the variable `Collection elementWindows` is empty? ---
[jira] [Commented] (FLINK-7630) Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()
[ https://issues.apache.org/jira/browse/FLINK-7630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171912#comment-16171912 ] ASF GitHub Bot commented on FLINK-7630: --- Github user tony810430 closed the pull request at: https://github.com/apache/flink/pull/4678 > Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile() > > > Key: FLINK-7630 > URL: https://issues.apache.org/jira/browse/FLINK-7630 > Project: Flink > Issue Type: Improvement > Components: Java API >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei >Priority: Minor > Fix For: 1.4.0, 1.3.3 > > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-user-ParameterTool-fromPropertiesFile-to-get-resource-file-inside-my-jar-tp15482.html > From this discussion, it seems that the current functionality of > {{ParameterTool.fromPropertiesFile}} is not enough. > It will be more useful if {{ParameterTool.fromPropertiesFile}} can provide > more kinds of parameter type such as {{File}} and {{InputStream}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4678: [FLINK-7630] [Java API] Allow passing a File or an...
Github user tony810430 closed the pull request at: https://github.com/apache/flink/pull/4678 ---
[jira] [Commented] (FLINK-7630) Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()
[ https://issues.apache.org/jira/browse/FLINK-7630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171911#comment-16171911 ] ASF GitHub Bot commented on FLINK-7630: --- Github user tony810430 commented on the issue: https://github.com/apache/flink/pull/4678 I thought it would be closed automatically. But it's okay, I will close it. Thanks for your quick review. > Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile() > > > Key: FLINK-7630 > URL: https://issues.apache.org/jira/browse/FLINK-7630 > Project: Flink > Issue Type: Improvement > Components: Java API >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei >Priority: Minor > Fix For: 1.4.0, 1.3.3 > > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-user-ParameterTool-fromPropertiesFile-to-get-resource-file-inside-my-jar-tp15482.html > From this discussion, it seems that the current functionality of > {{ParameterTool.fromPropertiesFile}} is not enough. > It will be more useful if {{ParameterTool.fromPropertiesFile}} can provide > more kinds of parameter type such as {{File}} and {{InputStream}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4678: [FLINK-7630] [Java API] Allow passing a File or an InputS...
Github user tony810430 commented on the issue: https://github.com/apache/flink/pull/4678 I thought it would be closed automatically. But it's okay, I will close it. Thanks for your quick review. ---
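A sketch of the usage this change enables. The {{String}} overload of {{ParameterTool.fromPropertiesFile}} already exists; the {{File}} and {{InputStream}} overloads are the ones added by FLINK-7630, and the classpath-resource case is the scenario from the linked mailing-list thread (paths and keys are illustrative):

{code:title=PropertiesFromJarSketch.scala|borderStyle=solid}
import java.io.File

import org.apache.flink.api.java.utils.ParameterTool

object PropertiesFromJarSketch {
  def main(args: Array[String]): Unit = {
    // Existing overload: a path on the local file system.
    val fromPath = ParameterTool.fromPropertiesFile("/etc/myjob/job.properties")

    // New overload: a java.io.File.
    val fromFile = ParameterTool.fromPropertiesFile(new File("/etc/myjob/job.properties"))

    // New overload: an InputStream, e.g. a properties file bundled inside the job jar.
    val in = getClass.getResourceAsStream("/job.properties")
    val fromStream = ParameterTool.fromPropertiesFile(in)

    println(fromStream.get("checkpoint.interval", "60000"))
  }
}
{code}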
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171909#comment-16171909 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4625 Btw, I noticed I did not reply to your comments. I think it would be good to have the eager state cleaning in the initial version. Shouldn't be too much effort. Basically, getting the condition right and calling `remove()` on the `Map.Entry`. What do you mean by "distinguish the < and <=signs"? If there is an off-by-one issue in the computation of the window boundaries, it needs to be fixed with this PR. We shouldn't merge a semantically incorrect operator (of course there might be bugs...). Performance issues are OK but the semantics must be correct. Regarding the `"misc"` test failures, yes that can happen. No need to worry about that as long as the `""` libraries build passes. I'll run the tests anyway again before merging ;-) > Support rowtime inner equi-join between two streams in the SQL API > -- > > Key: FLINK-6233 > URL: https://issues.apache.org/jira/browse/FLINK-6233 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: hongyuhong >Assignee: Xingcan Cui > > The goal of this issue is to add support for inner equi-join on proc time > streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime > FROM Orders AS o > JOIN Shipments AS s > ON o.orderId = s.orderId > AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR; > {code} > The following restrictions should initially apply: > * The join hint only support inner join > * The ON clause should include equi-join condition > * The time-condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL > '1' HOUR}} only can use rowtime that is a system attribute, the time > condition only support bounded time range like {{o.rowtime BETWEEN s.rowtime > - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not support > unbounded like {{o.rowtime < s.rowtime}} , and should include both two > stream's rowtime attribute, {{o.rowtime between rowtime () and rowtime () + > 1}} should also not be supported. > An row-time streams join will not be able to handle late data, because this > would mean in insert a row into a sorted order shift all other computations. > This would be too expensive to maintain. Therefore, we will throw an error if > a user tries to use an row-time stream join with late data handling. > This issue includes: > * Design of the DataStream operator to deal with stream join > * Translation from Calcite's RelNode representation (LogicalJoin). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4625: [FLINK-6233] [table] Support time-bounded stream inner jo...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4625 Btw, I noticed I did not reply to your comments. I think it would be good to have the eager state cleaning in the initial version. Shouldn't be too much effort. Basically, getting the condition right and calling `remove()` on the `Map.Entry`. What do you mean by "distinguish the < and <=signs"? If there is an off-by-one issue in the computation of the window boundaries, it needs to be fixed with this PR. We shouldn't merge a semantically incorrect operator (of course there might be bugs...). Performance issues are OK but the semantics must be correct. Regarding the `"misc"` test failures, yes that can happen. No need to worry about that as long as the `""` libraries build passes. I'll run the tests anyway again before merging ;-) ---
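A rough sketch of the kind of eager cleanup discussed above, under the simplifying assumption that cached rows are kept in a {{MapState}} keyed by timestamp; the helper name and surrounding details are invented, and the real operator may organize its state differently:

{code:title=EagerStateCleanupSketch.scala|borderStyle=solid}
import java.util.{List => JList}

import org.apache.flink.api.common.state.MapState
import org.apache.flink.types.Row

object EagerStateCleanupSketch {

  /**
   * Drops all cached rows whose timestamp can no longer join with any future
   * row from the other input. `expirationTime` would be derived from the
   * current operator time and the join window bounds.
   */
  def removeExpiredRows(cache: MapState[java.lang.Long, JList[Row]], expirationTime: Long): Unit = {
    val iterator = cache.iterator()
    while (iterator.hasNext) {
      val entry = iterator.next()
      if (entry.getKey.longValue() < expirationTime) {
        // Remove eagerly while probing instead of waiting for a cleanup timer.
        iterator.remove()
      }
    }
  }
}
{code}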
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139728162 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute row(event) time bounded stream inner-join. + */ +class RowTimeBoundedStreamInnerJoin( +leftLowerBound: Long, +leftUpperBound: Long, +allowedLateness: Long, +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String, +leftTimeIdx: Int, +rightTimeIdx: Int) +extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode, + leftTimeIdx, + rightTimeIdx, + JoinTimeIndicator.ROWTIME) { + + override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = { +timeForRow <= watermark - allowedLateness + } + + override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { +rightOperatorTime = + if (ctx.timerService().currentWatermark() > 0) ctx.timerService().currentWatermark() --- End diff -- OK, let's keep it here. Changing the value of watermarks won't be possible as it is built into the DataStream API and some users rely on the current behavior. The curse of public APIs ;-) ---
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171900#comment-16171900 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139728162 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute row(event) time bounded stream inner-join. + */ +class RowTimeBoundedStreamInnerJoin( +leftLowerBound: Long, +leftUpperBound: Long, +allowedLateness: Long, +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String, +leftTimeIdx: Int, +rightTimeIdx: Int) +extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode, + leftTimeIdx, + rightTimeIdx, + JoinTimeIndicator.ROWTIME) { + + override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = { +timeForRow <= watermark - allowedLateness + } + + override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { +rightOperatorTime = + if (ctx.timerService().currentWatermark() > 0) ctx.timerService().currentWatermark() --- End diff -- OK, let's keep it here. Changing the value of watermarks won't be possible as it is built into the DataStream API and some users rely on the current behavior. The curse of public APIs ;-) > Support rowtime inner equi-join between two streams in the SQL API > -- > > Key: FLINK-6233 > URL: https://issues.apache.org/jira/browse/FLINK-6233 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: hongyuhong >Assignee: Xingcan Cui > > The goal of this issue is to add support for inner equi-join on proc time > streams to the SQL interface. 
> Queries similar to the following should be supported: > {code} > SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime > FROM Orders AS o > JOIN Shipments AS s > ON o.orderId = s.orderId > AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR; > {code} > The following restrictions should initially apply: > * The join hint only support inner join > * The ON clause should include equi-join condition > * The time-condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL > '1' HOUR}} only can use rowtime that is a system attribute, the time > condition only support bounded time range like {{o.rowtime BETWEEN s.rowtime > - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not support > unbounded like {{o.rowtime < s.rowtime}} , and should include both two > stream's rowtime attribute, {{o.rowtime between rowtime () and rowtime () + > 1}} should also not be supported. > An row-time streams join will not be able to handle late data, because this > would mean in insert a row into a sorted order shift all other computations. > This would be too expensive to maintain. Therefore, we will throw an error if > a user tries to use an row-time stream join with late data handling. > This issue includes: > * Design of the DataStream operator to deal with stream join > * Translation from Calcite's RelNode representation (LogicalJoin).
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171894#comment-16171894 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139727416 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute processing time bounded stream inner-join. + */ +class ProcTimeBoundedStreamInnerJoin( +leftLowerBound: Long, +leftUpperBound: Long, +allowedLateness: Long, +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String) +extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode, + leftTimeIdx = -1, + rightTimeIdx = -1, + JoinTimeIndicator.PROCTIME) { + + override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = false + + override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { +rightOperatorTime = ctx.timerService().currentProcessingTime() +leftOperatorTime = ctx.timerService().currentProcessingTime() + } + + override def getTimeForLeftStream( + context: CoProcessFunction[CRow, CRow, CRow]#Context, + row: CRow): Long = { +context.timerService().currentProcessingTime() --- End diff -- To be honest, I would not put too much effort into the processing time case, especially not if it affects the performance of event-time processing. Processing time is non-deterministic anyway. The reason I brought this up is because I wasn't sure of the side effects if the the row proctime > operator time. If this is not an issue, we can keep it like this. Otherwise, the easiest solution would be to just add a comment to the invocations of `updateOperatorTime` that this call must be the first call in all processing methods (`processElement()`, `onTimer()`). Since this is just internal API, this should be fine. 
> Support rowtime inner equi-join between two streams in the SQL API > -- > > Key: FLINK-6233 > URL: https://issues.apache.org/jira/browse/FLINK-6233 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: hongyuhong >Assignee: Xingcan Cui > > The goal of this issue is to add support for inner equi-join on proc time > streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime > FROM Orders AS o > JOIN Shipments AS s > ON o.orderId = s.orderId > AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR; > {code} > The following restrictions should initially apply: > * The join hint only support inner join > * The ON clause should include equi-join condition > * The time-condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL > '1' HOUR}} only can use rowtime that is a system attribute, the time > condition only support bounded time range like {{o.rowtime BETWEEN s.rowtime > - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not support > unbounded like {{o.rowtime < s.rowtime}} , and should include both two > strea
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139727416 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * The function to execute processing time bounded stream inner-join. + */ +class ProcTimeBoundedStreamInnerJoin( +leftLowerBound: Long, +leftUpperBound: Long, +allowedLateness: Long, +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String) +extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode, + leftTimeIdx = -1, + rightTimeIdx = -1, + JoinTimeIndicator.PROCTIME) { + + override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = false + + override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { +rightOperatorTime = ctx.timerService().currentProcessingTime() +leftOperatorTime = ctx.timerService().currentProcessingTime() + } + + override def getTimeForLeftStream( + context: CoProcessFunction[CRow, CRow, CRow]#Context, + row: CRow): Long = { +context.timerService().currentProcessingTime() --- End diff -- To be honest, I would not put too much effort into the processing time case, especially not if it affects the performance of event-time processing. Processing time is non-deterministic anyway. The reason I brought this up is because I wasn't sure of the side effects if the the row proctime > operator time. If this is not an issue, we can keep it like this. Otherwise, the easiest solution would be to just add a comment to the invocations of `updateOperatorTime` that this call must be the first call in all processing methods (`processElement()`, `onTimer()`). Since this is just internal API, this should be fine. ---
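If the alternative mentioned above is taken, the convention is simply that `updateOperatorTime(ctx)` is the first statement of every processing method. A minimal processing-time sketch of that calling order; the class and method bodies are illustrative, not the PR's implementation.

```
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.util.Collector;

// Sketch of the call-order convention only; the real join logic is elided.
public class ProcTimeCallOrderSketch extends CoProcessFunction<CRow, CRow, CRow> {

    private long leftOperatorTime;
    private long rightOperatorTime;

    private void updateOperatorTime(Context ctx) {
        // Processing time: both sides read the current processing time.
        long now = ctx.timerService().currentProcessingTime();
        leftOperatorTime = now;
        rightOperatorTime = now;
    }

    @Override
    public void processElement1(CRow value, Context ctx, Collector<CRow> out) {
        // Must be the first call so that every bound computed below sees a
        // consistent operator time (see the review comment above).
        updateOperatorTime(ctx);
        // ... cache lookup and join emission elided ...
    }

    @Override
    public void processElement2(CRow value, Context ctx, Collector<CRow> out) {
        updateOperatorTime(ctx);
        // ... cache lookup and join emission elided ...
    }
}
```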
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171867#comment-16171867 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139723927 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativ
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139723927 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
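The hunk quoted above (and repeated in the following comments) shows how the join registers its row caches and timer state in open(). Because the Scala diff is hard to read in this flattened form, here is a compact Java rendering of the same setup for the left input; the right input mirrors it. State names follow the diff, everything else is a sketch.

```
import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.types.Row;

// Illustrative Java rendering of the state setup in the Scala diff above.
public abstract class JoinStateSetupSketch extends CoProcessFunction<CRow, CRow, CRow> {

    private final String timeIndicator = "rowtime"; // assumption for the sketch
    private final TypeInformation<Row> leftType;

    private transient MapState<Long, List<Row>> leftCache;
    private transient ValueState<Long> leftTimerState;

    protected JoinStateSetupSketch(TypeInformation<Row> leftType) {
        this.leftType = leftType;
    }

    @Override
    public void open(Configuration parameters) {
        // Cache: row timestamp -> rows buffered for that timestamp.
        MapStateDescriptor<Long, List<Row>> leftStateDescriptor = new MapStateDescriptor<>(
                timeIndicator + "InnerJoinLeftCache",
                BasicTypeInfo.LONG_TYPE_INFO,
                new ListTypeInfo<>(leftType));
        leftCache = getRuntimeContext().getMapState(leftStateDescriptor);

        // Timer state: remembers the registered cleanup timer for this key.
        ValueStateDescriptor<Long> leftTimerStateDesc =
                new ValueStateDescriptor<>(timeIndicator + "InnerJoinLeftTimerState", Long.class);
        leftTimerState = getRuntimeContext().getState(leftTimerStateDesc);
    }
}
```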
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139723011 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171860#comment-16171860 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139723011 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativ
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171856#comment-16171856 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139722338 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativ
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139722338 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[jira] [Commented] (FLINK-7630) Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()
[ https://issues.apache.org/jira/browse/FLINK-7630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171852#comment-16171852 ] ASF GitHub Bot commented on FLINK-7630: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4678 I merged, could you please close this PR? And thanks for working on this! 😃 > Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile() > > > Key: FLINK-7630 > URL: https://issues.apache.org/jira/browse/FLINK-7630 > Project: Flink > Issue Type: Improvement > Components: Java API >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei >Priority: Minor > Fix For: 1.4.0, 1.3.3 > > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-user-ParameterTool-fromPropertiesFile-to-get-resource-file-inside-my-jar-tp15482.html > From this discussion, it seems that the current functionality of > {{ParameterTool.fromPropertiesFile}} is not enough. > It will be more useful if {{ParameterTool.fromPropertiesFile}} can provide > more kinds of parameter type such as {{File}} and {{InputStream}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4678: [FLINK-7630] [Java API] Allow passing a File or an InputS...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4678 I merged, could you please close this PR? And thanks for working on this! 😃 ---
[jira] [Closed] (FLINK-7630) Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()
[ https://issues.apache.org/jira/browse/FLINK-7630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-7630. --- Resolution: Fixed Implemented on release-1.3 in 100951e279ad352f3b8921970217cd8edf14ac20 Implemented on master in a66315a5cc52a2596e76f09641867be6ce22242c > Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile() > > > Key: FLINK-7630 > URL: https://issues.apache.org/jira/browse/FLINK-7630 > Project: Flink > Issue Type: Improvement > Components: Java API >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei >Priority: Minor > Fix For: 1.4.0, 1.3.3 > > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-user-ParameterTool-fromPropertiesFile-to-get-resource-file-inside-my-jar-tp15482.html > From this discussion, it seems that the current functionality of > {{ParameterTool.fromPropertiesFile}} is not enough. > It will be more useful if {{ParameterTool.fromPropertiesFile}} can provide > more kinds of parameter type such as {{File}} and {{InputStream}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
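The mailing list thread that motivated this issue asks how to read a properties file packaged inside the job jar. With the InputStream overload added here, that can be done roughly as in the sketch below; the resource name and the "host" key are assumptions for the example.

```
import java.io.InputStream;

import org.apache.flink.api.java.utils.ParameterTool;

public class LoadEmbeddedProperties {

    public static void main(String[] args) throws Exception {
        // "config.properties" is a hypothetical resource bundled in the jar.
        try (InputStream in = LoadEmbeddedProperties.class
                .getClassLoader()
                .getResourceAsStream("config.properties")) {
            if (in == null) {
                throw new IllegalStateException("config.properties not found on the classpath");
            }
            ParameterTool parameters = ParameterTool.fromPropertiesFile(in);
            System.out.println("host = " + parameters.get("host", "localhost"));
        }
    }
}
```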
[jira] [Commented] (FLINK-6733) Remove commented out AvgAggregationFunction.java
[ https://issues.apache.org/jira/browse/FLINK-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171814#comment-16171814 ] ASF GitHub Bot commented on FLINK-6733: --- Github user mlipkovich commented on the issue: https://github.com/apache/flink/pull/4630 @zentol thanks for your comment. As I understand I can't merge it without a formal review. May I ask you to review it? > Remove commented out AvgAggregationFunction.java > > > Key: FLINK-6733 > URL: https://issues.apache.org/jira/browse/FLINK-6733 > Project: Flink > Issue Type: Bug > Components: Java API >Reporter: Dawid Wysakowicz >Assignee: Mikhail Lipkovich > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4630: [FLINK-6733] Remove commented out AvgAggregationFunction....
Github user mlipkovich commented on the issue: https://github.com/apache/flink/pull/4630 @zentol thanks for your comment. As I understand I can't merge it without a formal review. May I ask you to review it? ---
[GitHub] flink pull request #4638: [FLINK-6563] [table] Add time indicator support to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4638#discussion_r139714565 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -62,10 +88,107 @@ DeserializationSchema deserializationSchema, TypeInformation typeInfo) { - this.topic = Preconditions.checkNotNull(topic, "Topic"); - this.properties = Preconditions.checkNotNull(properties, "Properties"); - this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema"); - this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information"); + this.topic = Preconditions.checkNotNull(topic, "Topic must not be null."); + this.properties = Preconditions.checkNotNull(properties, "Properties must not be null."); + this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema must not be null."); + this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information must not be null."); + } + + /** +* Adds processing time attribute to the table. The attribute is appended to each row. +* +* @param proctime The name of the added processing time attribute. +*/ + public void addProcTimeAttribute(String proctime) { + Preconditions.checkNotNull(proctime, "Processing time attribute must not be null."); + this.procTimeAttribute = proctime; + } + + /** +* Adds an ingestion time attribute to the table. The attribute is append at the end of each row. +* +* For each row, the ingestion time attribute is initialized with the current time when the row +* is read from Kafka. From there on, it behaves as an event time attribute. +* +* @param ingestionTime The name of the added ingestion time attribute. +*/ + public void addIngestionTimeAttribute(String ingestionTime) { + Preconditions.checkNotNull(ingestionTime, "Ingestion time attribute must not be null."); + if (this.rowTimeAttribute != null) { + throw new ValidationException( + "You can only specify an ingestion time attribute OR a row time attribute."); + } + this.rowTimeAttribute = ingestionTime; --- End diff -- Yes, that's inconsistently handled. Thanks for pointing this out. I removed `this.ingestionTimeAttribute` ---
[jira] [Commented] (FLINK-6563) Expose time indicator attributes in the KafkaTableSource
[ https://issues.apache.org/jira/browse/FLINK-6563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171806#comment-16171806 ] ASF GitHub Bot commented on FLINK-6563: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4638#discussion_r139714565 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -62,10 +88,107 @@ DeserializationSchema deserializationSchema, TypeInformation typeInfo) { - this.topic = Preconditions.checkNotNull(topic, "Topic"); - this.properties = Preconditions.checkNotNull(properties, "Properties"); - this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema"); - this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information"); + this.topic = Preconditions.checkNotNull(topic, "Topic must not be null."); + this.properties = Preconditions.checkNotNull(properties, "Properties must not be null."); + this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema must not be null."); + this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information must not be null."); + } + + /** +* Adds processing time attribute to the table. The attribute is appended to each row. +* +* @param proctime The name of the added processing time attribute. +*/ + public void addProcTimeAttribute(String proctime) { + Preconditions.checkNotNull(proctime, "Processing time attribute must not be null."); + this.procTimeAttribute = proctime; + } + + /** +* Adds an ingestion time attribute to the table. The attribute is append at the end of each row. +* +* For each row, the ingestion time attribute is initialized with the current time when the row +* is read from Kafka. From there on, it behaves as an event time attribute. +* +* @param ingestionTime The name of the added ingestion time attribute. +*/ + public void addIngestionTimeAttribute(String ingestionTime) { + Preconditions.checkNotNull(ingestionTime, "Ingestion time attribute must not be null."); + if (this.rowTimeAttribute != null) { + throw new ValidationException( + "You can only specify an ingestion time attribute OR a row time attribute."); + } + this.rowTimeAttribute = ingestionTime; --- End diff -- Yes, that's inconsistently handled. Thanks for pointing this out. I removed `this.ingestionTimeAttribute` > Expose time indicator attributes in the KafkaTableSource > > > Key: FLINK-6563 > URL: https://issues.apache.org/jira/browse/FLINK-6563 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Haohui Mai >Assignee: Haohui Mai >Priority: Critical > Fix For: 1.4.0 > > > This is a follow up for FLINK-5884. > After FLINK-5884 requires the {{TableSource}} interfaces to expose the > processing time and the event time for the data stream. This jira proposes to > expose these two information in the Kafka table source. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
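For readers following the diff, the IngestionTimeWatermarkAssigner above is an ordinary AssignerWithPeriodicWatermarks and would be attached with the standard DataStream API. A standalone sketch of the same idea, with an illustrative class name and logic modeled on the diff:

```
import javax.annotation.Nullable;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.types.Row;

// Standalone sketch of an ingestion-time assigner, modeled on the diff above.
public class IngestionTimeAssignerSketch implements AssignerWithPeriodicWatermarks<Row> {

    private long curTime = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(Row element, long previousElementTimestamp) {
        // Ingestion time: stamp each row with a monotonically held wall-clock time.
        long t = System.currentTimeMillis();
        if (t > curTime) {
            curTime = t;
        }
        return curTime;
    }

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        // The periodic watermark trails the last emitted timestamp by 1 ms.
        return new Watermark(curTime - 1);
    }
}
```

Attaching it to a DataStream<Row> would look like rows.assignTimestampsAndWatermarks(new IngestionTimeAssignerSketch()), with the auto-watermark interval configured on the ExecutionConfig.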
[jira] [Commented] (FLINK-5944) Flink should support reading Snappy Files
[ https://issues.apache.org/jira/browse/FLINK-5944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171805#comment-16171805 ] Mikhail Lipkovich commented on FLINK-5944: -- The build has failed again but errors seem to be unrelated (kafka-connector and scala examples). Locally it was built and tested. [~Zentol] could you please review changes? > Flink should support reading Snappy Files > - > > Key: FLINK-5944 > URL: https://issues.apache.org/jira/browse/FLINK-5944 > Project: Flink > Issue Type: New Feature > Components: Batch Connectors and Input/Output Formats >Reporter: Ilya Ganelin >Assignee: Mikhail Lipkovich > Labels: features > > Snappy is an extremely performant compression format that's widely used > offering fast decompression/compression. > This can be easily implemented by creating a SnappyInflaterInputStreamFactory > and updating the initDefaultInflateInputStreamFactories in FileInputFormat. > Flink already includes the Snappy dependency in the project. > There is a minor gotcha in this. If we wish to use this with Hadoop, then we > must provide two separate implementations since Hadoop uses a different > version of the snappy format than Snappy Java (which is the xerial/snappy > included in Flink). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
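As a rough illustration of the approach described in the issue, a Snappy factory could implement Flink's InflaterInputStreamFactory the same way the existing gzip and xz factories do, assuming the xerial snappy-java dependency that Flink already includes. This is a sketch, not the final implementation:

```
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.common.io.InflaterInputStreamFactory;
import org.xerial.snappy.SnappyInputStream;

// Sketch of the factory described in the issue, using xerial snappy-java.
public class SnappyInflaterInputStreamFactory implements InflaterInputStreamFactory<SnappyInputStream> {

    private static final SnappyInflaterInputStreamFactory INSTANCE = new SnappyInflaterInputStreamFactory();

    public static SnappyInflaterInputStreamFactory getInstance() {
        return INSTANCE;
    }

    @Override
    public SnappyInputStream create(InputStream in) throws IOException {
        // Decorate the raw file stream with snappy-java's decompressor.
        return new SnappyInputStream(in);
    }

    @Override
    public Collection<String> getCommonFileExtensions() {
        return Collections.singleton("snappy");
    }
}
```

The remaining step described in the issue is to register this factory for the .snappy extension where FileInputFormat initializes its default inflater factories; as the issue also notes, Hadoop's snappy framing differs from snappy-java's and would need a separate factory.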
[jira] [Commented] (FLINK-7575) Dashboard jobs/tasks metrics display 0 when metrics are not yet available
[ https://issues.apache.org/jira/browse/FLINK-7575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171797#comment-16171797 ] ASF GitHub Bot commented on FLINK-7575: --- Github user jameslafa commented on the issue: https://github.com/apache/flink/pull/4647 @zentol I agree, it make it a lot easier to read. I just pushed the update. Thanks for your feedback. > Dashboard jobs/tasks metrics display 0 when metrics are not yet available > - > > Key: FLINK-7575 > URL: https://issues.apache.org/jira/browse/FLINK-7575 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 1.3.2 >Reporter: James Lafa >Assignee: James Lafa >Priority: Minor > > The web frontend is currently displaying "0" when a metric is not available > yet (ex: records-in/out, bytes-in/out). > 0 is misleading and it's preferable to display no value while the value is > still unknown. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4647: [FLINK-7575] [WEB-DASHBOARD] Display "Fetching..." instea...
Github user jameslafa commented on the issue: https://github.com/apache/flink/pull/4647 @zentol I agree, it makes it a lot easier to read. I just pushed the update. Thanks for your feedback. ---
[jira] [Commented] (FLINK-6563) Expose time indicator attributes in the KafkaTableSource
[ https://issues.apache.org/jira/browse/FLINK-6563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171795#comment-16171795 ] ASF GitHub Bot commented on FLINK-6563: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4638#discussion_r139711376 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -106,8 +240,191 @@ return deserializationSchema; } - @Override - public String explainSource() { - return ""; + /** +* Assigns ingestion time timestamps and watermarks. +*/ + public static class IngestionTimeWatermarkAssigner implements AssignerWithPeriodicWatermarks { + + private long curTime = Long.MIN_VALUE; + + @Override + public long extractTimestamp(Row element, long previousElementTimestamp) { + long t = System.currentTimeMillis(); + if (t > curTime) { + curTime = t; + } + return curTime; + } + + @Nullable + @Override + public Watermark getCurrentWatermark() { + return new Watermark(curTime - 1); + } + } + + protected AssignerWithPeriodicWatermarks getAssigner() { + return this.timestampAssigner; + } + + /** +* Checks that the provided row time attribute is valid, determines its position in the schema, +* and adjusts the return type. +* +* @param rowtime The attribute to check. +*/ + private void configureRowTimeAttribute(String rowtime) { + Preconditions.checkNotNull(rowtime, "Row time attribute must not be null."); + + if (this.ingestionTimeAttribute != null) { + throw new ValidationException( + "You can only specify a row time attribute OR an ingestion time attribute."); + } + + if (this.rowTimeAttribute != null) { + throw new ValidationException( + "Row time attribute can only be specified once."); + } + + // get current fields + String[] fieldNames = ((RowTypeInfo) this.getReturnType()).getFieldNames(); + TypeInformation[] fieldTypes = ((RowTypeInfo) this.getReturnType()).getFieldTypes(); + + // check if the rowtime field exists and remember position + this.rowtimeFieldPos = -1; + for (int i = 0; i < fieldNames.length; i++) { + if (fieldNames[i].equals(rowtime)) { + if (fieldTypes[i] != Types.LONG) { + throw new IllegalArgumentException("Specified rowtime field must be of type BIGINT. " + + "Available fields: " + toSchemaString(fieldNames, fieldTypes)); + } + this.rowtimeFieldPos = i; + break; + } + } + if (this.rowtimeFieldPos < 0) { + throw new IllegalArgumentException("Specified rowtime field must be present in data. " + + "Available fields: " + toSchemaString(fieldNames, fieldTypes)); + } + this.rowTimeAttribute = rowtime; + + // adjust result type by removing rowtime field (will be added later) + String[] newNames = new String[fieldNames.length - 1]; + TypeInformation[] newTypes = new TypeInformation[fieldTypes.length - 1]; + for (int i = 0; i < rowtimeFieldPos; i++) { + newNames[i] = fieldNames[i]; + newTypes[i] = fieldTypes[i]; + } + for (int i = rowtimeFieldPos + 1; i < fieldNames.length; i++) { + newNames[i - 1] = fieldNames[i]; + newTypes[i - 1] = fieldTypes[i]; + } + this.typeInfo = new RowTypeInfo(newTypes, newNames); + } + + /** +* Util method to create a schema description. +* +* @param fieldNames The names of the fields. +* @param fieldTypes The types of the fields. +* @return A string describing the schema of the given field information. +*/ + private String toSchemaString(String[] fieldNames, TypeInformation[] fieldTypes) { + Preconditions.checkArgument(fieldNames.length == fieldTypes.length); +
[GitHub] flink pull request #4638: [FLINK-6563] [table] Add time indicator support to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4638#discussion_r139711376 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -106,8 +240,191 @@ return deserializationSchema; } - @Override - public String explainSource() { - return ""; + /** +* Assigns ingestion time timestamps and watermarks. +*/ + public static class IngestionTimeWatermarkAssigner implements AssignerWithPeriodicWatermarks { + + private long curTime = Long.MIN_VALUE; + + @Override + public long extractTimestamp(Row element, long previousElementTimestamp) { + long t = System.currentTimeMillis(); + if (t > curTime) { + curTime = t; + } + return curTime; + } + + @Nullable + @Override + public Watermark getCurrentWatermark() { + return new Watermark(curTime - 1); + } + } + + protected AssignerWithPeriodicWatermarks getAssigner() { + return this.timestampAssigner; + } + + /** +* Checks that the provided row time attribute is valid, determines its position in the schema, +* and adjusts the return type. +* +* @param rowtime The attribute to check. +*/ + private void configureRowTimeAttribute(String rowtime) { + Preconditions.checkNotNull(rowtime, "Row time attribute must not be null."); + + if (this.ingestionTimeAttribute != null) { + throw new ValidationException( + "You can only specify a row time attribute OR an ingestion time attribute."); + } + + if (this.rowTimeAttribute != null) { + throw new ValidationException( + "Row time attribute can only be specified once."); + } + + // get current fields + String[] fieldNames = ((RowTypeInfo) this.getReturnType()).getFieldNames(); + TypeInformation[] fieldTypes = ((RowTypeInfo) this.getReturnType()).getFieldTypes(); + + // check if the rowtime field exists and remember position + this.rowtimeFieldPos = -1; + for (int i = 0; i < fieldNames.length; i++) { + if (fieldNames[i].equals(rowtime)) { + if (fieldTypes[i] != Types.LONG) { + throw new IllegalArgumentException("Specified rowtime field must be of type BIGINT. " + + "Available fields: " + toSchemaString(fieldNames, fieldTypes)); + } + this.rowtimeFieldPos = i; + break; + } + } + if (this.rowtimeFieldPos < 0) { + throw new IllegalArgumentException("Specified rowtime field must be present in data. " + + "Available fields: " + toSchemaString(fieldNames, fieldTypes)); + } + this.rowTimeAttribute = rowtime; + + // adjust result type by removing rowtime field (will be added later) + String[] newNames = new String[fieldNames.length - 1]; + TypeInformation[] newTypes = new TypeInformation[fieldTypes.length - 1]; + for (int i = 0; i < rowtimeFieldPos; i++) { + newNames[i] = fieldNames[i]; + newTypes[i] = fieldTypes[i]; + } + for (int i = rowtimeFieldPos + 1; i < fieldNames.length; i++) { + newNames[i - 1] = fieldNames[i]; + newTypes[i - 1] = fieldTypes[i]; + } + this.typeInfo = new RowTypeInfo(newTypes, newNames); + } + + /** +* Util method to create a schema description. +* +* @param fieldNames The names of the fields. +* @param fieldTypes The types of the fields. +* @return A string describing the schema of the given field information. +*/ + private String toSchemaString(String[] fieldNames, TypeInformation[] fieldTypes) { + Preconditions.checkArgument(fieldNames.length == fieldTypes.length); + + StringBuilder sb = new StringBuilder("["); + for (int i = 0; i < fieldNames.length - 1; i++) { + sb.append(fieldNames[i]); + sb.append(": "); + if (fieldTypes[i
[jira] [Commented] (FLINK-7630) Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()
[ https://issues.apache.org/jira/browse/FLINK-7630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171778#comment-16171778 ] ASF GitHub Bot commented on FLINK-7630: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4678 The changes look good! 👍 I especially like the added tests and documentation. > Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile() > > > Key: FLINK-7630 > URL: https://issues.apache.org/jira/browse/FLINK-7630 > Project: Flink > Issue Type: Improvement > Components: Java API >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei >Priority: Minor > Fix For: 1.4.0, 1.3.3 > > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-user-ParameterTool-fromPropertiesFile-to-get-resource-file-inside-my-jar-tp15482.html > From this discussion, it seems that the current functionality of > {{ParameterTool.fromPropertiesFile}} is not enough. > It will be more useful if {{ParameterTool.fromPropertiesFile}} can provide > more kinds of parameter type such as {{File}} and {{InputStream}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4678: [FLINK-7630] [Java API] Allow passing a File or an InputS...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4678 The changes look good! 👍 I especially like the added tests and documentation. ---
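The use case behind FLINK-7630, loading a properties file that is packaged inside the job jar, could then look roughly as follows, assuming the `InputStream` overload of `fromPropertiesFile()` added by this change; the resource name and key are made up.

```java
// Sketch of the use case from the linked mailing-list thread: reading job parameters from a
// properties file bundled inside the jar, assuming the InputStream overload of
// fromPropertiesFile() added by this change. Resource name and key are made up.
import org.apache.flink.api.java.utils.ParameterTool;

import java.io.InputStream;

public class ParamsFromClasspath {

    public static void main(String[] args) throws Exception {
        try (InputStream in = ParamsFromClasspath.class.getResourceAsStream("/job.properties")) {
            ParameterTool params = ParameterTool.fromPropertiesFile(in);
            String topic = params.get("kafka.topic", "events");   // default if the key is unset
            System.out.println("kafka.topic = " + topic);
        }
    }
}
```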
[jira] [Commented] (FLINK-7357) HOP_START() HOP_END() does not work when using HAVING clause with GROUP BY HOP window
[ https://issues.apache.org/jira/browse/FLINK-7357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171774#comment-16171774 ] ASF GitHub Bot commented on FLINK-7357: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4521 Thanks for the PR @walterddr. We also need to replace group aux functions in the expressions of the filter predicate to support something like this: ``` HAVING SUM(a) > 0 AND QUARTER(HOP_START(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE)) = 1 ``` But that's just a minor change. I'll fix that, extend the tests to cover the case, and do some refactorings before I merge this PR. Cheers, Fabian > HOP_START() HOP_END() does not work when using HAVING clause with GROUP BY > HOP window > - > > Key: FLINK-7357 > URL: https://issues.apache.org/jira/browse/FLINK-7357 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.1 >Reporter: Rong Rong >Assignee: Rong Rong > > The following SQL does not compile: > {code:title=invalid_having_hop_start_sql} > SELECT > c AS k, > COUNT(a) AS v, > HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS > windowStart, > HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowEnd > FROM > T1 > GROUP BY > HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE), > c > HAVING > SUM(b) > 1 > {code} > While individually keeping HAVING clause or HOP_START field compiles and runs > without issue. > more details: > https://github.com/apache/flink/compare/master...walterddr:having_does_not_work_with_hop_start_end -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4521: [FLINK-7357] [table] Created extended rules for WindowSta...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4521 Thanks for the PR @walterddr. We also need to replace group aux functions in the expressions of the filter predicate to support something like this: ``` HAVING SUM(a) > 0 AND QUARTER(HOP_START(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE)) = 1 ``` But that's just a minor change. I'll fix that, extend the tests to cover the case, and do some refactorings before I merge this PR. Cheers, Fabian ---
[jira] [Commented] (FLINK-7635) Support sideOutput in ProcessWindowFunciton & ProcessAllWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171770#comment-16171770 ] Aljoscha Krettek commented on FLINK-7635: - Sounds good! +1 The important thing is also to have unit tests for the changes. A good starting point for that is {{ProcessOperatorTest}} and {{SideOutputITCase}}. > Support sideOutput in ProcessWindowFunciton & ProcessAllWindowFunction > -- > > Key: FLINK-7635 > URL: https://issues.apache.org/jira/browse/FLINK-7635 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Scala API >Reporter: Chen Qin >Assignee: Bowen Li >Priority: Minor > Fix For: 1.4.0, 1.3.3 > > > [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only > implemented output to ProcessFunction Context. It would be nice to add > support to ProcessWindow and ProcessAllWindow functions as well. [email > threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html] > [~aljoscha] I thought this is good warm up task for ppl to learn how window > function works in general. Otherwise feel free to assign back to me. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
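For context, side outputs in `ProcessFunction` (added by FLINK-4460) already work via `OutputTag`; the sketch below shows that existing mechanism, which this ticket proposes to also expose from the `Context` of `ProcessWindowFunction` and `ProcessAllWindowFunction`. Names here are illustrative.

```java
// Roughly how side outputs already look in ProcessFunction (FLINK-4460); this ticket proposes
// exposing the same OutputTag mechanism from the Context of ProcessWindowFunction and
// ProcessAllWindowFunction. Tag and method names here are illustrative.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSketch {

    // anonymous subclass so that the element type is captured
    static final OutputTag<String> REJECTED = new OutputTag<String>("rejected") {};

    public static SingleOutputStreamOperator<Integer> parse(DataStream<String> input) {
        return input.process(new ProcessFunction<String, Integer>() {
            @Override
            public void processElement(String value, Context ctx, Collector<Integer> out) {
                try {
                    out.collect(Integer.parseInt(value));   // main output
                } catch (NumberFormatException e) {
                    ctx.output(REJECTED, value);            // side output
                }
            }
        });
    }
}

// usage: parse(stream).getSideOutput(SideOutputSketch.REJECTED) yields the rejected records
```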
[jira] [Updated] (FLINK-7635) Support sideOutput in ProcessWindowFunciton & ProcessAllWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-7635: Summary: Support sideOutput in ProcessWindowFunciton & ProcessAllWindowFunction (was: support sideOutput in ProcessWindowFunciton & ProcessAllWindowFunction) > Support sideOutput in ProcessWindowFunciton & ProcessAllWindowFunction > -- > > Key: FLINK-7635 > URL: https://issues.apache.org/jira/browse/FLINK-7635 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Scala API >Reporter: Chen Qin >Assignee: Bowen Li >Priority: Minor > Fix For: 1.4.0, 1.3.3 > > > [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only > implemented output to ProcessFunction Context. It would be nice to add > support to ProcessWindow and ProcessAllWindow functions as well. [email > threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html] > [~aljoscha] I thought this is good warm up task for ppl to learn how window > function works in general. Otherwise feel free to assign back to me. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4665#discussion_r139702816 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -405,6 +411,8 @@ public void merge(W mergeResult, // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) { sideOutput(element); + } else if (isSkippedElement) { --- End diff -- I think we also need to check whether it's late. An element could also be skipped because no windows were assigned to it. ---
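One possible reading of that review comment, written out as a sketch of the relevant branch in `WindowOperator.processElement()`; this is not the PR's actual code, and the counter name merely follows the naming suggestion made later in this thread.

```java
// Sketch of the reviewer's point (not the PR's code): an element may be skipped either because
// it is late or because no window was assigned to it, so the lateness check has to stay on the
// branch that side-outputs or counts dropped late data. The counter name follows the naming
// suggestion made later in this thread.
if (isSkippedElement && isElementLate(element)) {
    if (lateDataOutputTag != null) {
        sideOutput(element);               // route late data to the configured side output
    } else {
        numLateRecordsDropped.inc();       // otherwise drop it and count it
    }
}
```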
[jira] [Commented] (FLINK-7552) Extend SinkFunction interface with SinkContext
[ https://issues.apache.org/jira/browse/FLINK-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171753#comment-16171753 ] ASF GitHub Bot commented on FLINK-7552: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4616 @pnowojski and @EronWright I changed the name of the `Context`, I added a test in `StreamSinkOperatorTest`, I added methods or querying current processing time/watermark to the context. I changed the `timestamp()` method to return a primitive `long` and I added a method `hasTimestamp()`. Also, `timestamp()` now throws an exception if no timestamp is available. What do you think? > Extend SinkFunction interface with SinkContext > -- > > Key: FLINK-7552 > URL: https://issues.apache.org/jira/browse/FLINK-7552 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.4.0 > > > Now that we require Java 8 we can extend the {{SinkFunction}} interface > without breaking backwards compatibility. I'm proposing this: > {code} > /** > * Interface for implementing user defined sink functionality. > * > * @param Input type parameter. > */ > @Public > public interface SinkFunction extends Function, Serializable { > /** >* Function for standard sink behaviour. This function is called for > every record. >* >* @param value The input record. >* @throws Exception >* @deprecated Use {@link #invoke(SinkContext, Object)}. >*/ > @Deprecated > default void invoke(IN value) throws Exception { > } > /** >* Writes the given value to the sink. This function is called for > every record. >* >* @param context Additional context about the input record. >* @param value The input record. >* @throws Exception >*/ > default void invoke(SinkContext context, IN value) throws Exception { > invoke(value); > } > /** >* Context that {@link SinkFunction SinkFunctions } can use for getting > additional data about >* an input record. >* >* @param The type of elements accepted by the sink. >*/ > @Public // Interface might be extended in the future with additional > methods. > interface SinkContext { > /** >* Returns the timestamp of the current input record. >*/ > long timestamp(); > } > } > {code} > For now, this only allows access to the element timestamp. This would allow > us to fix the abomination that is {{FlinkKafkaProducer010}}, which is a > hybrid {{SinkFunction}}/{{StreamOperator}} only because it needs access to > timestamps. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / Use in ...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4616 @pnowojski and @EronWright I changed the name of the `Context`, I added a test in `StreamSinkOperatorTest`, I added methods for querying current processing time/watermark to the context. I changed the `timestamp()` method to return a primitive `long` and I added a method `hasTimestamp()`. Also, `timestamp()` now throws an exception if no timestamp is available. What do you think? ---
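Against the interface sketched in the issue description (plus the `hasTimestamp()` method mentioned above), a user-defined sink might look roughly like this. It is only illustrative; the naming and shape of the context were still being discussed at this point, so the code compiles only against that proposed API.

```java
// Illustrative sink written against the interface proposed in FLINK-7552, including the
// hasTimestamp() method mentioned above. The proposal was still being revised (even the
// context's name changed), so this compiles only against that sketched API.
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class TimestampLoggingSink<T> implements SinkFunction<T> {

    @Override
    public void invoke(SinkFunction.SinkContext context, T value) throws Exception {
        if (context.hasTimestamp()) {
            System.out.println(value + " @ " + context.timestamp());
        } else {
            System.out.println(value + " (no timestamp attached)");
        }
    }
}
```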
[jira] [Created] (FLINK-7643) HadoopFileSystem always reloads GlobalConfiguration
Ufuk Celebi created FLINK-7643: -- Summary: HadoopFileSystem always reloads GlobalConfiguration Key: FLINK-7643 URL: https://issues.apache.org/jira/browse/FLINK-7643 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Reporter: Ufuk Celebi HadoopFileSystem always reloads GlobalConfiguration, which potentially leads to a lot of noise in the logs, because this happens on each checkpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7581) Name netty threads of rest components
[ https://issues.apache.org/jira/browse/FLINK-7581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-7581. --- Resolution: Fixed 1.4: 06d8e83009628a325e012635339b7e0b2821ddea > Name netty threads of rest components > - > > Key: FLINK-7581 > URL: https://issues.apache.org/jira/browse/FLINK-7581 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7583) Create singleton instance for the content type header
[ https://issues.apache.org/jira/browse/FLINK-7583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-7583. --- Resolution: Fixed 1.4: ad4c41532ac22a0accf82c7ea9427aed9b71aaf4 > Create singleton instance for the content type header > - > > Key: FLINK-7583 > URL: https://issues.apache.org/jira/browse/FLINK-7583 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > > The content type header for all rest requests/responses is always the same, > but we currently allocate a separate string for each request/response. We > should instead use a static constant. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4665 Would be good to rename the metric to `numLateRecordsDropped` to be more consistent with existing metrics. @aljoscha will have to comment on whether the counting is correct or not. It may also be interesting to count the number of late elements that are NOT dropped. ---
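For reference, operator and function metrics of this kind are registered through the metric group; below is a minimal sketch using the suggested counter name. The class and the lateness stand-in are illustrative, not the PR's code.

```java
// Illustrative only (not the PR's code): registering a counter with the suggested name
// through the function's metric group and bumping it when a record is dropped.
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;

public class DropCountingFlatMap extends RichFlatMapFunction<Long, Long> {

    private final long cutoffTimestamp;            // illustrative stand-in for a lateness check
    private transient Counter numLateRecordsDropped;

    public DropCountingFlatMap(long cutoffTimestamp) {
        this.cutoffTimestamp = cutoffTimestamp;
    }

    @Override
    public void open(Configuration parameters) {
        numLateRecordsDropped = getRuntimeContext()
                .getMetricGroup()
                .counter("numLateRecordsDropped");
    }

    @Override
    public void flatMap(Long timestamp, Collector<Long> out) {
        if (timestamp < cutoffTimestamp) {
            numLateRecordsDropped.inc();           // drop the record and count it
        } else {
            out.collect(timestamp);
        }
    }
}
```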
[jira] [Commented] (FLINK-5944) Flink should support reading Snappy Files
[ https://issues.apache.org/jira/browse/FLINK-5944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171711#comment-16171711 ] ASF GitHub Bot commented on FLINK-5944: --- GitHub user mlipkovich opened a pull request: https://github.com/apache/flink/pull/4683 [FLINK-5944] Support reading of Snappy files ## What is the purpose of the change Support reading of Snappy compressed text files (both Xerial and Hadoop snappy) ## Brief change log - *Added InputStreamFactories for Xerial and Hadoop snappy* - *Added config parameter to control whether Xerial or Hadoop snappy should be used* ## Verifying this change - *Manually verified the change by running word count for text files compressed using different Snappy versions* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? JavaDocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/mlipkovich/flink FLINK-5944 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4683.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4683 commit c4d4016f1e6b44833d24994c97532b4c5243e4d2 Author: Mikhail Lipkovich Date: 2017-09-19T13:34:10Z [FLINK-5944] Support reading of Snappy files > Flink should support reading Snappy Files > - > > Key: FLINK-5944 > URL: https://issues.apache.org/jira/browse/FLINK-5944 > Project: Flink > Issue Type: New Feature > Components: Batch Connectors and Input/Output Formats >Reporter: Ilya Ganelin >Assignee: Mikhail Lipkovich > Labels: features > > Snappy is an extremely performant compression format that's widely used > offering fast decompression/compression. > This can be easily implemented by creating a SnappyInflaterInputStreamFactory > and updating the initDefaultInflateInputStreamFactories in FileInputFormat. > Flink already includes the Snappy dependency in the project. > There is a minor gotcha in this. If we wish to use this with Hadoop, then we > must provide two separate implementations since Hadoop uses a different > version of the snappy format than Snappy Java (which is the xerial/snappy > included in Flink). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4683: [FLINK-5944] Support reading of Snappy files
GitHub user mlipkovich opened a pull request: https://github.com/apache/flink/pull/4683 [FLINK-5944] Support reading of Snappy files ## What is the purpose of the change Support reading of Snappy compressed text files (both Xerial and Hadoop snappy) ## Brief change log - *Added InputStreamFactories for Xerial and Hadoop snappy* - *Added config parameter to control whether Xerial or Hadoop snappy should be used* ## Verifying this change - *Manually verified the change by running word count for text files compressed using different Snappy versions* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? JavaDocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/mlipkovich/flink FLINK-5944 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4683.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4683 commit c4d4016f1e6b44833d24994c97532b4c5243e4d2 Author: Mikhail Lipkovich Date: 2017-09-19T13:34:10Z [FLINK-5944] Support reading of Snappy files ---
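A factory for the Xerial (snappy-java) format could look roughly like the sketch below, modeled on Flink's existing gzip/deflate `InflaterInputStreamFactory` implementations; the PR then wires such factories up in `FileInputFormat.initDefaultInflateInputStreamFactories()`, so this is not necessarily identical to the contributed code.

```java
// Hedged sketch of a factory for the Xerial (snappy-java) format, modeled on Flink's existing
// gzip/deflate InflaterInputStreamFactory implementations; not necessarily identical to the
// code in this PR.
import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
import org.xerial.snappy.SnappyInputStream;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Collections;

public class XerialSnappyInflaterInputStreamFactory
        implements InflaterInputStreamFactory<SnappyInputStream> {

    @Override
    public SnappyInputStream create(InputStream in) throws IOException {
        return new SnappyInputStream(in);
    }

    @Override
    public Collection<String> getCommonFileExtensions() {
        return Collections.singleton("snappy");
    }
}
```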
[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints
[ https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171686#comment-16171686 ] ASF GitHub Bot commented on FLINK-7449: --- Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139691558 --- Diff: docs/ops/state/checkpoints.md --- @@ -99,3 +99,296 @@ above). ```sh $ bin/flink run -s :checkpointMetaDataPath [:runArgs] ``` + +## Incremental Checkpoints + +### Synopsis + +Incremental checkpoints can significantly reduce checkpointing time in comparison to full checkpoints, at the cost of a +(potentially) longer recovery time. The core idea is that incremental checkpoints only record changes in state since the +previously-completed checkpoint instead of producing a full, self-contained backup of the state backend. In this way, +incremental checkpoints can build upon previous checkpoints. + +RocksDBStateBackend is currently the only backend that supports incremental checkpoints. + +Flink leverages RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the +incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and +pruned automatically. + +``While we strongly encourage the use of incremental checkpoints for Flink jobs with large state, please note that this is +a new feature and currently not enabled by default``. + +To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the +constructor set to `true`, e.g.: + +```java + RocksDBStateBackend backend = + new RocksDBStateBackend(filebackend, true); +``` + +### Use-case for Incremental Checkpoints + +Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and each checkpoint represents a consistent +snapshot of the distributed state of a Flink job from which the system can recover in case of a software or machine +failure (see [here]({{ site.baseurl }}/internals/stream_checkpointing.html). + +Flink creates checkpoints periodically to track the progress of a job so that, in case of failure, only those +(hopefully few) *events that have been processed after the last completed checkpoint* must be reprocessed from the data +source. The number of events that must be reprocessed has implications for recovery time, and so for fastest recovery, +we want to *take checkpoints as often as possible*. + +However, checkpoints are not without performance cost and can introduce *considerable overhead* to the system. This +overhead can lead to lower throughput and higher latency during the time that checkpoints are created. One reason is +that, traditionally, each checkpoint in Flink always represented the *complete state* of the job at the time of the +checkpoint, and all of the state had to be written to stable storage (typically some distributed file system) for every +single checkpoint. Writing multiple terabytes (or more) of state data for each checkpoint can obviously create +significant load for the I/O and network subsystems, on top of the normal load from pipeline’s data processing work. + +Before incremental checkpoints, users were stuck with a suboptimal tradeoff between recovery time and checkpointing +overhead. Fast recovery and low checkpointing overhead were conflicting goals. And this is exactly the problem that +incremental checkpoints solve. 
+ + +### Basics of Incremental Checkpoints + +In this section, for the sake of getting the concept across, we will briefly discuss the idea behind incremental +checkpoints in a simplified manner. + +Our motivation for incremental checkpointing stemmed from the observation that it is often wasteful to write the full +state of a job for every single checkpoint. In most cases, the state between two checkpoints is not drastically +different, and only a fraction of the state data is modified and some new data added. Instead of writing the full state +into each checkpoint again and again, we could record only changes in state since the previous checkpoint. As long as we +have the previous checkpoint and the state changes for the current checkpoint, we can restore the full, current state +for the job. This is the basic principle of incremental checkpoints, that each checkpoint can build upon a history of +previous checkpoints to avoid writing redundant information. + +Figure 1 illustrates the basic idea of incremental checkpointing in comparison to full checkpointing. + +The state of the job evolves over time and for checkpoints ``CP 1`` to ``CP 2``
[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints
[ https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171696#comment-16171696 ] ASF GitHub Bot commented on FLINK-7449: --- Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139687800 --- Diff: docs/ops/state/checkpoints.md --- @@ -99,3 +99,296 @@ above). ```sh $ bin/flink run -s :checkpointMetaDataPath [:runArgs] ``` + +## Incremental Checkpoints + +### Synopsis + +Incremental checkpoints can significantly reduce checkpointing time in comparison to full checkpoints, at the cost of a +(potentially) longer recovery time. The core idea is that incremental checkpoints only record changes in state since the +previously-completed checkpoint instead of producing a full, self-contained backup of the state backend. In this way, +incremental checkpoints can build upon previous checkpoints. + +RocksDBStateBackend is currently the only backend that supports incremental checkpoints. + +Flink leverages RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the +incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and +pruned automatically. + +``While we strongly encourage the use of incremental checkpoints for Flink jobs with large state, please note that this is +a new feature and currently not enabled by default``. + +To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the +constructor set to `true`, e.g.: + +```java + RocksDBStateBackend backend = + new RocksDBStateBackend(filebackend, true); +``` + +### Use-case for Incremental Checkpoints + +Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and each checkpoint represents a consistent +snapshot of the distributed state of a Flink job from which the system can recover in case of a software or machine +failure (see [here]({{ site.baseurl }}/internals/stream_checkpointing.html). + +Flink creates checkpoints periodically to track the progress of a job so that, in case of failure, only those +(hopefully few) *events that have been processed after the last completed checkpoint* must be reprocessed from the data +source. The number of events that must be reprocessed has implications for recovery time, and so for fastest recovery, +we want to *take checkpoints as often as possible*. + +However, checkpoints are not without performance cost and can introduce *considerable overhead* to the system. This +overhead can lead to lower throughput and higher latency during the time that checkpoints are created. One reason is +that, traditionally, each checkpoint in Flink always represented the *complete state* of the job at the time of the +checkpoint, and all of the state had to be written to stable storage (typically some distributed file system) for every +single checkpoint. Writing multiple terabytes (or more) of state data for each checkpoint can obviously create +significant load for the I/O and network subsystems, on top of the normal load from pipeline’s data processing work. + +Before incremental checkpoints, users were stuck with a suboptimal tradeoff between recovery time and checkpointing +overhead. Fast recovery and low checkpointing overhead were conflicting goals. And this is exactly the problem that +incremental checkpoints solve. 
+ + +### Basics of Incremental Checkpoints + +In this section, for the sake of getting the concept across, we will briefly discuss the idea behind incremental +checkpoints in a simplified manner. + +Our motivation for incremental checkpointing stemmed from the observation that it is often wasteful to write the full +state of a job for every single checkpoint. In most cases, the state between two checkpoints is not drastically +different, and only a fraction of the state data is modified and some new data added. Instead of writing the full state +into each checkpoint again and again, we could record only changes in state since the previous checkpoint. As long as we +have the previous checkpoint and the state changes for the current checkpoint, we can restore the full, current state +for the job. This is the basic principle of incremental checkpoints, that each checkpoint can build upon a history of +previous checkpoints to avoid writing redundant information. + +Figure 1 illustrates the basic idea of incremental checkpointing in comparison to full checkpointing. + +The state of the job evolves over time and for checkpoints ``CP 1`` to ``CP 2``
[jira] [Commented] (FLINK-6563) Expose time indicator attributes in the KafkaTableSource
[ https://issues.apache.org/jira/browse/FLINK-6563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171704#comment-16171704 ] ASF GitHub Bot commented on FLINK-6563: --- Github user uybhatti commented on a diff in the pull request: https://github.com/apache/flink/pull/4638#discussion_r139692943 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -62,10 +88,107 @@ DeserializationSchema deserializationSchema, TypeInformation typeInfo) { - this.topic = Preconditions.checkNotNull(topic, "Topic"); - this.properties = Preconditions.checkNotNull(properties, "Properties"); - this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema"); - this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information"); + this.topic = Preconditions.checkNotNull(topic, "Topic must not be null."); + this.properties = Preconditions.checkNotNull(properties, "Properties must not be null."); + this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema must not be null."); + this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information must not be null."); + } + + /** +* Adds processing time attribute to the table. The attribute is appended to each row. +* +* @param proctime The name of the added processing time attribute. +*/ + public void addProcTimeAttribute(String proctime) { + Preconditions.checkNotNull(proctime, "Processing time attribute must not be null."); + this.procTimeAttribute = proctime; + } + + /** +* Adds an ingestion time attribute to the table. The attribute is append at the end of each row. +* +* For each row, the ingestion time attribute is initialized with the current time when the row +* is read from Kafka. From there on, it behaves as an event time attribute. +* +* @param ingestionTime The name of the added ingestion time attribute. +*/ + public void addIngestionTimeAttribute(String ingestionTime) { + Preconditions.checkNotNull(ingestionTime, "Ingestion time attribute must not be null."); + if (this.rowTimeAttribute != null) { + throw new ValidationException( + "You can only specify an ingestion time attribute OR a row time attribute."); + } + this.rowTimeAttribute = ingestionTime; --- End diff -- it should be `this.ingestionTimeAttribute = ingestionTime;` Otherwise no need of `ingestionTimeAttribute` variable > Expose time indicator attributes in the KafkaTableSource > > > Key: FLINK-6563 > URL: https://issues.apache.org/jira/browse/FLINK-6563 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Haohui Mai >Assignee: Haohui Mai >Priority: Critical > Fix For: 1.4.0 > > > This is a follow up for FLINK-5884. > After FLINK-5884 requires the {{TableSource}} interfaces to expose the > processing time and the event time for the data stream. This jira proposes to > expose these two information in the Kafka table source. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
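With that fix applied, the method from the diff would read roughly as follows (sketch only; the merged code may differ).

```java
// The addIngestionTimeAttribute() method from the diff with the reviewer's fix applied
// (sketch only; the merged code may differ): store the name in ingestionTimeAttribute
// instead of accidentally writing it into rowTimeAttribute.
public void addIngestionTimeAttribute(String ingestionTime) {
    Preconditions.checkNotNull(ingestionTime, "Ingestion time attribute must not be null.");
    if (this.rowTimeAttribute != null) {
        throw new ValidationException(
            "You can only specify an ingestion time attribute OR a row time attribute.");
    }
    this.ingestionTimeAttribute = ingestionTime;
}
```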
[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints
[ https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171694#comment-16171694 ] ASF GitHub Bot commented on FLINK-7449: --- Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139691882 --- Diff: docs/ops/state/checkpoints.md --- @@ -99,3 +99,296 @@ above). ```sh $ bin/flink run -s :checkpointMetaDataPath [:runArgs] ``` + +## Incremental Checkpoints + +### Synopsis + +Incremental checkpoints can significantly reduce checkpointing time in comparison to full checkpoints, at the cost of a +(potentially) longer recovery time. The core idea is that incremental checkpoints only record changes in state since the +previously-completed checkpoint instead of producing a full, self-contained backup of the state backend. In this way, +incremental checkpoints can build upon previous checkpoints. + +RocksDBStateBackend is currently the only backend that supports incremental checkpoints. + +Flink leverages RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the +incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and +pruned automatically. + +``While we strongly encourage the use of incremental checkpoints for Flink jobs with large state, please note that this is +a new feature and currently not enabled by default``. + +To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the +constructor set to `true`, e.g.: + +```java + RocksDBStateBackend backend = + new RocksDBStateBackend(filebackend, true); +``` + +### Use-case for Incremental Checkpoints + +Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and each checkpoint represents a consistent +snapshot of the distributed state of a Flink job from which the system can recover in case of a software or machine +failure (see [here]({{ site.baseurl }}/internals/stream_checkpointing.html). + +Flink creates checkpoints periodically to track the progress of a job so that, in case of failure, only those +(hopefully few) *events that have been processed after the last completed checkpoint* must be reprocessed from the data +source. The number of events that must be reprocessed has implications for recovery time, and so for fastest recovery, +we want to *take checkpoints as often as possible*. + +However, checkpoints are not without performance cost and can introduce *considerable overhead* to the system. This +overhead can lead to lower throughput and higher latency during the time that checkpoints are created. One reason is +that, traditionally, each checkpoint in Flink always represented the *complete state* of the job at the time of the +checkpoint, and all of the state had to be written to stable storage (typically some distributed file system) for every +single checkpoint. Writing multiple terabytes (or more) of state data for each checkpoint can obviously create +significant load for the I/O and network subsystems, on top of the normal load from pipeline’s data processing work. + +Before incremental checkpoints, users were stuck with a suboptimal tradeoff between recovery time and checkpointing +overhead. Fast recovery and low checkpointing overhead were conflicting goals. And this is exactly the problem that +incremental checkpoints solve. 
+ + +### Basics of Incremental Checkpoints + +In this section, for the sake of getting the concept across, we will briefly discuss the idea behind incremental +checkpoints in a simplified manner. + +Our motivation for incremental checkpointing stemmed from the observation that it is often wasteful to write the full +state of a job for every single checkpoint. In most cases, the state between two checkpoints is not drastically +different, and only a fraction of the state data is modified and some new data added. Instead of writing the full state +into each checkpoint again and again, we could record only changes in state since the previous checkpoint. As long as we +have the previous checkpoint and the state changes for the current checkpoint, we can restore the full, current state +for the job. This is the basic principle of incremental checkpoints, that each checkpoint can build upon a history of +previous checkpoints to avoid writing redundant information. + +Figure 1 illustrates the basic idea of incremental checkpointing in comparison to full checkpointing. + +The state of the job evolves over time and for checkpoints ``CP 1`` to ``CP 2``
[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints
[ https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171695#comment-16171695 ] ASF GitHub Bot commented on FLINK-7449: --- Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139692206 --- Diff: docs/ops/state/checkpoints.md --- @@ -99,3 +99,296 @@ above). ```sh $ bin/flink run -s :checkpointMetaDataPath [:runArgs] ``` + +## Incremental Checkpoints + +### Synopsis + +Incremental checkpoints can significantly reduce checkpointing time in comparison to full checkpoints, at the cost of a +(potentially) longer recovery time. The core idea is that incremental checkpoints only record changes in state since the +previously-completed checkpoint instead of producing a full, self-contained backup of the state backend. In this way, +incremental checkpoints can build upon previous checkpoints. + +RocksDBStateBackend is currently the only backend that supports incremental checkpoints. + +Flink leverages RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the +incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and +pruned automatically. + +``While we strongly encourage the use of incremental checkpoints for Flink jobs with large state, please note that this is +a new feature and currently not enabled by default``. + +To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the +constructor set to `true`, e.g.: + +```java + RocksDBStateBackend backend = + new RocksDBStateBackend(filebackend, true); +``` + +### Use-case for Incremental Checkpoints + +Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and each checkpoint represents a consistent +snapshot of the distributed state of a Flink job from which the system can recover in case of a software or machine +failure (see [here]({{ site.baseurl }}/internals/stream_checkpointing.html). + +Flink creates checkpoints periodically to track the progress of a job so that, in case of failure, only those +(hopefully few) *events that have been processed after the last completed checkpoint* must be reprocessed from the data +source. The number of events that must be reprocessed has implications for recovery time, and so for fastest recovery, +we want to *take checkpoints as often as possible*. + +However, checkpoints are not without performance cost and can introduce *considerable overhead* to the system. This +overhead can lead to lower throughput and higher latency during the time that checkpoints are created. One reason is +that, traditionally, each checkpoint in Flink always represented the *complete state* of the job at the time of the +checkpoint, and all of the state had to be written to stable storage (typically some distributed file system) for every +single checkpoint. Writing multiple terabytes (or more) of state data for each checkpoint can obviously create +significant load for the I/O and network subsystems, on top of the normal load from pipeline’s data processing work. + +Before incremental checkpoints, users were stuck with a suboptimal tradeoff between recovery time and checkpointing +overhead. Fast recovery and low checkpointing overhead were conflicting goals. And this is exactly the problem that +incremental checkpoints solve. 
+ + +### Basics of Incremental Checkpoints + +In this section, for the sake of getting the concept across, we will briefly discuss the idea behind incremental +checkpoints in a simplified manner. + +Our motivation for incremental checkpointing stemmed from the observation that it is often wasteful to write the full +state of a job for every single checkpoint. In most cases, the state between two checkpoints is not drastically +different, and only a fraction of the state data is modified and some new data added. Instead of writing the full state +into each checkpoint again and again, we could record only changes in state since the previous checkpoint. As long as we +have the previous checkpoint and the state changes for the current checkpoint, we can restore the full, current state +for the job. This is the basic principle of incremental checkpoints, that each checkpoint can build upon a history of +previous checkpoints to avoid writing redundant information. + +Figure 1 illustrates the basic idea of incremental checkpointing in comparison to full checkpointing. + +The state of the job evolves over time and for checkpoints ``CP 1`` to ``CP 2``
[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints
[ https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171692#comment-16171692 ] ASF GitHub Bot commented on FLINK-7449: --- Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139689544 --- Diff: docs/ops/state/checkpoints.md --- @@ -99,3 +99,296 @@ above). ```sh $ bin/flink run -s :checkpointMetaDataPath [:runArgs] ``` + +## Incremental Checkpoints + +### Synopsis + +Incremental checkpoints can significantly reduce checkpointing time in comparison to full checkpoints, at the cost of a +(potentially) longer recovery time. The core idea is that incremental checkpoints only record changes in state since the +previously-completed checkpoint instead of producing a full, self-contained backup of the state backend. In this way, +incremental checkpoints can build upon previous checkpoints. + +RocksDBStateBackend is currently the only backend that supports incremental checkpoints. + +Flink leverages RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the +incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and +pruned automatically. + +``While we strongly encourage the use of incremental checkpoints for Flink jobs with large state, please note that this is +a new feature and currently not enabled by default``. + +To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the +constructor set to `true`, e.g.: + +```java + RocksDBStateBackend backend = + new RocksDBStateBackend(filebackend, true); +``` + +### Use-case for Incremental Checkpoints + +Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and each checkpoint represents a consistent +snapshot of the distributed state of a Flink job from which the system can recover in case of a software or machine +failure (see [here]({{ site.baseurl }}/internals/stream_checkpointing.html). + +Flink creates checkpoints periodically to track the progress of a job so that, in case of failure, only those +(hopefully few) *events that have been processed after the last completed checkpoint* must be reprocessed from the data +source. The number of events that must be reprocessed has implications for recovery time, and so for fastest recovery, +we want to *take checkpoints as often as possible*. + +However, checkpoints are not without performance cost and can introduce *considerable overhead* to the system. This +overhead can lead to lower throughput and higher latency during the time that checkpoints are created. One reason is +that, traditionally, each checkpoint in Flink always represented the *complete state* of the job at the time of the +checkpoint, and all of the state had to be written to stable storage (typically some distributed file system) for every +single checkpoint. Writing multiple terabytes (or more) of state data for each checkpoint can obviously create +significant load for the I/O and network subsystems, on top of the normal load from pipeline’s data processing work. + +Before incremental checkpoints, users were stuck with a suboptimal tradeoff between recovery time and checkpointing +overhead. Fast recovery and low checkpointing overhead were conflicting goals. And this is exactly the problem that +incremental checkpoints solve. 
+ + +### Basics of Incremental Checkpoints + +In this section, for the sake of getting the concept across, we will briefly discuss the idea behind incremental +checkpoints in a simplified manner. + +Our motivation for incremental checkpointing stemmed from the observation that it is often wasteful to write the full +state of a job for every single checkpoint. In most cases, the state between two checkpoints is not drastically +different, and only a fraction of the state data is modified and some new data added. Instead of writing the full state +into each checkpoint again and again, we could record only changes in state since the previous checkpoint. As long as we +have the previous checkpoint and the state changes for the current checkpoint, we can restore the full, current state +for the job. This is the basic principle of incremental checkpoints, that each checkpoint can build upon a history of +previous checkpoints to avoid writing redundant information. + +Figure 1 illustrates the basic idea of incremental checkpointing in comparison to full checkpointing. + +The state of the job evolves over time and for checkpoints ``CP 1`` to ``CP 2``
[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints
[ https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171693#comment-16171693 ] ASF GitHub Bot commented on FLINK-7449: --- Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139684257 --- Diff: docs/ops/state/checkpoints.md --- @@ -99,3 +99,296 @@ above). ```sh $ bin/flink run -s :checkpointMetaDataPath [:runArgs] ``` + +## Incremental Checkpoints + +### Synopsis + +Incremental checkpoints can significantly reduce checkpointing time in comparison to full checkpoints, at the cost of a +(potentially) longer recovery time. The core idea is that incremental checkpoints only record changes in state since the +previously-completed checkpoint instead of producing a full, self-contained backup of the state backend. In this way, +incremental checkpoints can build upon previous checkpoints. + +RocksDBStateBackend is currently the only backend that supports incremental checkpoints. + +Flink leverages RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the +incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and +pruned automatically. + +``While we strongly encourage the use of incremental checkpoints for Flink jobs with large state, please note that this is +a new feature and currently not enabled by default``. + +To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the +constructor set to `true`, e.g.: + +```java + RocksDBStateBackend backend = + new RocksDBStateBackend(filebackend, true); +``` + +### Use-case for Incremental Checkpoints + +Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and each checkpoint represents a consistent +snapshot of the distributed state of a Flink job from which the system can recover in case of a software or machine +failure (see [here]({{ site.baseurl }}/internals/stream_checkpointing.html). + +Flink creates checkpoints periodically to track the progress of a job so that, in case of failure, only those +(hopefully few) *events that have been processed after the last completed checkpoint* must be reprocessed from the data +source. The number of events that must be reprocessed has implications for recovery time, and so for fastest recovery, +we want to *take checkpoints as often as possible*. + +However, checkpoints are not without performance cost and can introduce *considerable overhead* to the system. This +overhead can lead to lower throughput and higher latency during the time that checkpoints are created. One reason is +that, traditionally, each checkpoint in Flink always represented the *complete state* of the job at the time of the +checkpoint, and all of the state had to be written to stable storage (typically some distributed file system) for every +single checkpoint. Writing multiple terabytes (or more) of state data for each checkpoint can obviously create +significant load for the I/O and network subsystems, on top of the normal load from pipeline’s data processing work. --- End diff -- on top of the normal load from the pipeline’s data processing work. 
(add "the") > Improve and enhance documentation for incremental checkpoints > - > > Key: FLINK-7449 > URL: https://issues.apache.org/jira/browse/FLINK-7449 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.4.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Minor > > We should provide more details about incremental checkpoints in the > documentation. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints
[ https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171698#comment-16171698 ] ASF GitHub Bot commented on FLINK-7449: --- Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139690107 --- Diff: docs/ops/state/checkpoints.md --- @@ -99,3 +99,296 @@ above). ```sh $ bin/flink run -s :checkpointMetaDataPath [:runArgs] ``` + +## Incremental Checkpoints + +### Synopsis + +Incremental checkpoints can significantly reduce checkpointing time in comparison to full checkpoints, at the cost of a +(potentially) longer recovery time. The core idea is that incremental checkpoints only record changes in state since the +previously-completed checkpoint instead of producing a full, self-contained backup of the state backend. In this way, +incremental checkpoints can build upon previous checkpoints. + +RocksDBStateBackend is currently the only backend that supports incremental checkpoints. + +Flink leverages RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the +incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and +pruned automatically. + +``While we strongly encourage the use of incremental checkpoints for Flink jobs with large state, please note that this is +a new feature and currently not enabled by default``. + +To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the +constructor set to `true`, e.g.: + +```java + RocksDBStateBackend backend = + new RocksDBStateBackend(filebackend, true); +``` + +### Use-case for Incremental Checkpoints + +Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and each checkpoint represents a consistent +snapshot of the distributed state of a Flink job from which the system can recover in case of a software or machine +failure (see [here]({{ site.baseurl }}/internals/stream_checkpointing.html). + +Flink creates checkpoints periodically to track the progress of a job so that, in case of failure, only those +(hopefully few) *events that have been processed after the last completed checkpoint* must be reprocessed from the data +source. The number of events that must be reprocessed has implications for recovery time, and so for fastest recovery, +we want to *take checkpoints as often as possible*. + +However, checkpoints are not without performance cost and can introduce *considerable overhead* to the system. This +overhead can lead to lower throughput and higher latency during the time that checkpoints are created. One reason is +that, traditionally, each checkpoint in Flink always represented the *complete state* of the job at the time of the +checkpoint, and all of the state had to be written to stable storage (typically some distributed file system) for every +single checkpoint. Writing multiple terabytes (or more) of state data for each checkpoint can obviously create +significant load for the I/O and network subsystems, on top of the normal load from pipeline’s data processing work. + +Before incremental checkpoints, users were stuck with a suboptimal tradeoff between recovery time and checkpointing +overhead. Fast recovery and low checkpointing overhead were conflicting goals. And this is exactly the problem that +incremental checkpoints solve. 
+ + +### Basics of Incremental Checkpoints + +In this section, for the sake of getting the concept across, we will briefly discuss the idea behind incremental +checkpoints in a simplified manner. + +Our motivation for incremental checkpointing stemmed from the observation that it is often wasteful to write the full +state of a job for every single checkpoint. In most cases, the state between two checkpoints is not drastically +different, and only a fraction of the state data is modified and some new data added. Instead of writing the full state +into each checkpoint again and again, we could record only changes in state since the previous checkpoint. As long as we +have the previous checkpoint and the state changes for the current checkpoint, we can restore the full, current state +for the job. This is the basic principle of incremental checkpoints, that each checkpoint can build upon a history of +previous checkpoints to avoid writing redundant information. + +Figure 1 illustrates the basic idea of incremental checkpointing in comparison to full checkpointing. + +The state of the job evolves over time and for checkpoints ``CP 1`` to ``CP 2``
[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints
[ https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171687#comment-16171687 ] ASF GitHub Bot commented on FLINK-7449: --- Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139687994 --- Diff: docs/ops/state/checkpoints.md --- @@ -99,3 +99,296 @@ above). ```sh $ bin/flink run -s :checkpointMetaDataPath [:runArgs] ``` + +## Incremental Checkpoints + +### Synopsis + +Incremental checkpoints can significantly reduce checkpointing time in comparison to full checkpoints, at the cost of a +(potentially) longer recovery time. The core idea is that incremental checkpoints only record changes in state since the +previously-completed checkpoint instead of producing a full, self-contained backup of the state backend. In this way, +incremental checkpoints can build upon previous checkpoints. + +RocksDBStateBackend is currently the only backend that supports incremental checkpoints. + +Flink leverages RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the +incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and +pruned automatically. + +``While we strongly encourage the use of incremental checkpoints for Flink jobs with large state, please note that this is +a new feature and currently not enabled by default``. + +To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the +constructor set to `true`, e.g.: + +```java + RocksDBStateBackend backend = + new RocksDBStateBackend(filebackend, true); +``` + +### Use-case for Incremental Checkpoints + +Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and each checkpoint represents a consistent +snapshot of the distributed state of a Flink job from which the system can recover in case of a software or machine +failure (see [here]({{ site.baseurl }}/internals/stream_checkpointing.html). + +Flink creates checkpoints periodically to track the progress of a job so that, in case of failure, only those +(hopefully few) *events that have been processed after the last completed checkpoint* must be reprocessed from the data +source. The number of events that must be reprocessed has implications for recovery time, and so for fastest recovery, +we want to *take checkpoints as often as possible*. + +However, checkpoints are not without performance cost and can introduce *considerable overhead* to the system. This +overhead can lead to lower throughput and higher latency during the time that checkpoints are created. One reason is +that, traditionally, each checkpoint in Flink always represented the *complete state* of the job at the time of the +checkpoint, and all of the state had to be written to stable storage (typically some distributed file system) for every +single checkpoint. Writing multiple terabytes (or more) of state data for each checkpoint can obviously create +significant load for the I/O and network subsystems, on top of the normal load from pipeline’s data processing work. + +Before incremental checkpoints, users were stuck with a suboptimal tradeoff between recovery time and checkpointing +overhead. Fast recovery and low checkpointing overhead were conflicting goals. And this is exactly the problem that +incremental checkpoints solve. 
+ + +### Basics of Incremental Checkpoints + +In this section, for the sake of getting the concept across, we will briefly discuss the idea behind incremental +checkpoints in a simplified manner. + +Our motivation for incremental checkpointing stemmed from the observation that it is often wasteful to write the full +state of a job for every single checkpoint. In most cases, the state between two checkpoints is not drastically +different, and only a fraction of the state data is modified and some new data added. Instead of writing the full state +into each checkpoint again and again, we could record only changes in state since the previous checkpoint. As long as we +have the previous checkpoint and the state changes for the current checkpoint, we can restore the full, current state +for the job. This is the basic principle of incremental checkpoints, that each checkpoint can build upon a history of +previous checkpoints to avoid writing redundant information. + +Figure 1 illustrates the basic idea of incremental checkpointing in comparison to full checkpointing. + +The state of the job evolves over time and for checkpoints ``CP 1`` to ``CP 2``
[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints
[ https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171689#comment-16171689 ] ASF GitHub Bot commented on FLINK-7449: --- Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139687055 --- Diff: docs/ops/state/checkpoints.md --- @@ -99,3 +99,296 @@ above). ```sh $ bin/flink run -s :checkpointMetaDataPath [:runArgs] ``` + +## Incremental Checkpoints + +### Synopsis + +Incremental checkpoints can significantly reduce checkpointing time in comparison to full checkpoints, at the cost of a +(potentially) longer recovery time. The core idea is that incremental checkpoints only record changes in state since the +previously-completed checkpoint instead of producing a full, self-contained backup of the state backend. In this way, +incremental checkpoints can build upon previous checkpoints. + +RocksDBStateBackend is currently the only backend that supports incremental checkpoints. + +Flink leverages RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the +incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and +pruned automatically. + +``While we strongly encourage the use of incremental checkpoints for Flink jobs with large state, please note that this is +a new feature and currently not enabled by default``. + +To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the +constructor set to `true`, e.g.: + +```java + RocksDBStateBackend backend = + new RocksDBStateBackend(filebackend, true); +``` + +### Use-case for Incremental Checkpoints + +Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and each checkpoint represents a consistent +snapshot of the distributed state of a Flink job from which the system can recover in case of a software or machine +failure (see [here]({{ site.baseurl }}/internals/stream_checkpointing.html). + +Flink creates checkpoints periodically to track the progress of a job so that, in case of failure, only those +(hopefully few) *events that have been processed after the last completed checkpoint* must be reprocessed from the data +source. The number of events that must be reprocessed has implications for recovery time, and so for fastest recovery, +we want to *take checkpoints as often as possible*. + +However, checkpoints are not without performance cost and can introduce *considerable overhead* to the system. This +overhead can lead to lower throughput and higher latency during the time that checkpoints are created. One reason is +that, traditionally, each checkpoint in Flink always represented the *complete state* of the job at the time of the +checkpoint, and all of the state had to be written to stable storage (typically some distributed file system) for every +single checkpoint. Writing multiple terabytes (or more) of state data for each checkpoint can obviously create +significant load for the I/O and network subsystems, on top of the normal load from pipeline’s data processing work. + +Before incremental checkpoints, users were stuck with a suboptimal tradeoff between recovery time and checkpointing +overhead. Fast recovery and low checkpointing overhead were conflicting goals. And this is exactly the problem that +incremental checkpoints solve. 
### Basics of Incremental Checkpoints

In this section, for the sake of getting the concept across, we will briefly discuss the idea behind incremental checkpoints in a simplified manner.

Our motivation for incremental checkpointing stemmed from the observation that it is often wasteful to write the full state of a job for every single checkpoint. In most cases, the state between two checkpoints is not drastically different: only a fraction of the state data is modified and some new data is added. Instead of writing the full state into each checkpoint again and again, we could record only the changes in state since the previous checkpoint. As long as we have the previous checkpoint and the state changes for the current checkpoint, we can restore the full, current state of the job. This is the basic principle of incremental checkpoints: each checkpoint can build upon a history of previous checkpoints to avoid writing redundant information.

Figure 1 illustrates the basic idea of incremental checkpointing in comparison to full checkpointing.

The state of the job evolves over time and for checkpoints ``CP 1`` to ``CP 2``
[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints
[ https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171684#comment-16171684 ]

ASF GitHub Bot commented on FLINK-7449:
---

Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4543#discussion_r139691118

--- Diff: docs/ops/state/checkpoints.md ---
[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints
[ https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171683#comment-16171683 ]

ASF GitHub Bot commented on FLINK-7449:
---

Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4543#discussion_r139686793

--- Diff: docs/ops/state/checkpoints.md ---