[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584676#comment-16584676 ]

ASF GitHub Bot commented on FLINK-9962:
---

kl0u commented on issue #6492: [FLINK-9962] [FS connector] allow users to specify TimeZone in DateTimeBucketer
URL: https://github.com/apache/flink/pull/6492#issuecomment-414036401

Hi @bowenli86! Are you planning to continue working on this issue? If not, I could work on it next week ;)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> allow users to specify TimeZone in DateTimeBucketer
> ---
>
> Key: FLINK-9962
> URL: https://issues.apache.org/jira/browse/FLINK-9962
> Project: Flink
> Issue Type: Improvement
> Components: Streaming Connectors
> Affects Versions: 1.5.1, 1.6.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently {{DateTimeBucketer}} returns a bucket path based on the local
> time zone. We should add a {{timezone}} constructor parameter so that users
> can specify a time zone.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
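For illustration only: a minimal Scala sketch of what a time zone parameter changes, assuming the bucketer formats the current processing time with a date pattern. `ZonedBucketPath.bucketPath` is a hypothetical helper, not the `DateTimeBucketer` API, and `yyyy-MM-dd--HH` is just an example pattern.

```scala
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

// Hypothetical helper (not Flink's DateTimeBucketer): formats a processing-time
// timestamp into a bucket directory name in an explicitly chosen time zone,
// instead of relying on the JVM's default (local) time zone.
object ZonedBucketPath {
  def bucketPath(basePath: String, epochMillis: Long, pattern: String, zone: ZoneId): String = {
    // withZone makes the formatter convert the Instant into date-time fields of `zone`
    val formatter = DateTimeFormatter.ofPattern(pattern).withZone(zone)
    s"$basePath/${formatter.format(Instant.ofEpochMilli(epochMillis))}"
  }
}
```

For epoch millisecond 0 this yields `/base/1970-01-01--00` with `ZoneId.of("UTC")` and `/base/1970-01-01--08` with `ZoneId.of("+08:00")`, so the same instant lands in different buckets depending on the zone unless the zone is made explicit.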
[jira] [Commented] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()
[ https://issues.apache.org/jira/browse/FLINK-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584675#comment-16584675 ]

buptljy commented on FLINK-10168:
-

[~phoenixjiangnan] Thanks! I think this will be a good improvement. We can define some readFile functions based on the prefix and suffix of file names and on the last-modified time. However, is it necessary to expose a generic filter function and let developers define their own file filters? Do we really have that many different use cases for the readFile function? As far as I know, most cases can be covered by the three functions above.

> support filtering files by modified/created time in
> StreamExecutionEnvironment.readFile()
> -
>
> Key: FLINK-10168
> URL: https://issues.apache.org/jira/browse/FLINK-10168
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Affects Versions: 1.6.0
> Reporter: Bowen Li
> Assignee: buptljy
> Priority: Major
> Fix For: 1.7.0
>
>
> Support filtering files by modified/created time in
> {{StreamExecutionEnvironment.readFile()}}.
> For example, in a source directory with lots of files, we only want to read
> files that were created or modified after a specific time.
> This API could expose a generic filter function for files and let users define
> filtering rules. Currently Flink only supports filtering files by path.
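A minimal sketch of the modification-time rule using only JDK file metadata, independent of Flink. `ModifiedAfterFilter.filesModifiedAfter` is hypothetical and not part of `readFile()`; it assumes a flat directory and relies on the last-modified timestamp only, since a creation time is not exposed by every file system.

```scala
import java.io.File

// Hypothetical helper, not a Flink API: keeps only the regular files directly
// inside `dir` whose last-modified time is strictly after `cutoffMillis`.
object ModifiedAfterFilter {
  def filesModifiedAfter(dir: File, cutoffMillis: Long): Seq[File] = {
    // listFiles() returns null if `dir` is not a readable directory
    val entries = Option(dir.listFiles()).getOrElse(Array.empty[File])
    entries.toSeq
      .filter(f => f.isFile && f.lastModified() > cutoffMillis)
      .sortBy(_.getName) // deterministic order for callers and tests
  }
}
```

A prefix or suffix rule, as suggested in the comment, would just be another predicate in the same `filter` call.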
[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584672#comment-16584672 ]

Timo Walther commented on FLINK-10163:
--

[~suez1224] Yes, I think the full implementation, as [~hequn8128] added it to the issue, should be done with the DDL FLIP. We need a proper parser for this that also allows escaping, like {{CREATE VIEW `My Table Name`}}, and specifying optional column names and types, as well as {{DROP VIEW}} and {{REPLACE VIEW}}. This should be integrated with the Table API.
This issue describes just an MVP solution that allows the very basic {{CREATE VIEW name AS query}} in the SQL Client, because the SQL Client currently has no way of defining something similar to {{tableEnv.registerTable("name", ...)}} in the Table API.

> Support CREATE VIEW in SQL Client
> -
>
> Key: FLINK-10163
> URL: https://issues.apache.org/jira/browse/FLINK-10163
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
>
> The possibility to define a name for a subquery would improve the usability
> of the SQL Client. The SQL standard defines {{CREATE VIEW}} for defining a
> virtual table.
>
> Example:
> {code}
> CREATE VIEW viewName
>   [ '(' columnName [, columnName ]* ')' ]
> AS Query
> {code}
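A hypothetical sketch of the MVP scope described in this comment: recognize only `CREATE VIEW name AS query` with a plain identifier, and keep the query text under that name for the rest of the session. `SessionViews` and its regular expression are illustrative assumptions, not the SQL Client's implementation; a real version would go through a proper SQL parser, as noted in the comment.

```scala
import scala.collection.mutable

// Hypothetical MVP sketch, not the SQL Client's implementation: it recognizes
// only `CREATE VIEW name AS query` with a plain identifier (no escaping, no
// column list) and keeps the query text under that name for the session.
class SessionViews {
  // (?is): case-insensitive, and '.' also matches line breaks in multi-line queries
  private val createView = """(?is)\s*CREATE\s+VIEW\s+(\w+)\s+AS\s+(.+?);?\s*""".r
  private val views = mutable.LinkedHashMap.empty[String, String]

  // Returns true if the statement was a CREATE VIEW and the view was stored.
  def execute(statement: String): Boolean = statement match {
    case createView(name, query) =>
      views(name) = query.trim
      true
    case _ => false
  }

  def lookup(name: String): Option[String] = views.get(name)
}
```

Escaped names such as `My Table Name`, column lists, `DROP VIEW` and `REPLACE VIEW` are deliberately not handled, matching the split between this MVP and the DDL FLIP.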
[jira] [Assigned] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()
[ https://issues.apache.org/jira/browse/FLINK-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

buptljy reassigned FLINK-10168:
---

Assignee: buptljy
[jira] [Commented] (FLINK-9407) Support orc rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584667#comment-16584667 ]

ASF GitHub Bot commented on FLINK-9407:
---

phenixmzy edited a comment on issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink writer
URL: https://github.com/apache/flink/pull/6075#issuecomment-414035357

@zhangminglei Can the OrcFileWriter with BucketingSink roll over to a new file when the batch size is reached? It seems that each record ends up in its own file.

The code:
{code}
def orcSchemaMetaInfo = String.format(
  "struct<%s,%s,%s...,%s>",
  "nt:string", "event_time:string", "event_id:string", ..., "appname:string")

def getRowSink(distPath: String) = {
  val sink = new BucketingSink[Row](distPath + "/with-bucket/")
  sink.setBatchSize(1024 * 1024 * 1024)
    .setBucketer(new DateTimeBucketer[Row]("MMdd/HHmm"))
    .setWriter(new OrcFileWriter[Row](orcSchemaMetaInfo))
    .setPartPrefix("sdk-etl")
  sink
}

def getOrcRow(item: sdkItem): Row = {
  val row = Row.of(
    item.getNt, item.getEvent_time, item.getEvent_id, ..., item.getAppid, item.getAppname)
  row
}
...
val kafkaConsumer = new FlinkKafkaConsumer011(inputTopic, new SimpleStringSchema, params.getProperties)
val messageStream = env.addSource(kafkaConsumer)
  .flatMap(in => SDKParse.parseSDK(in, inputTopic))
  .filter(item => item != None)
  .flatMap(item => Some(item).get)
  .map(item => getOrcRow(item))

messageStream.addSink(getRowSink(distPath))
{code}

> Support orc rolling sink writer
> ---
>
> Key: FLINK-9407
> URL: https://issues.apache.org/jira/browse/FLINK-9407
> Project: Flink
> Issue Type: New Feature
> Components: filesystem-connector
> Reporter: zhangminglei
> Assignee: zhangminglei
> Priority: Major
> Labels: pull-request-available
>
>
> Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and
> {{AvroKeyValueSinkWriter}}. I would suggest adding an ORC writer for the
> rolling sink.
> Below, FYI.
> I tested the PR and verified the results with Spark SQL. As expected, we can
> read back the data we had written. I will add more tests in the next couple
> of days, including the performance under compression with short checkpoint
> intervals, and more unit tests.
> {code:java}
> scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21")
> res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
> scala>
> scala> res1.registerTempTable("tablerice")
> warning: there was one deprecation warning; re-run with -deprecation for details
> scala> spark.sql("select * from tablerice")
> res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
> scala> res3.show(3)
> +-----+---+-------+
> | name|age|married|
> +-----+---+-------+
> |Sagar| 26|  false|
> |Sagar| 30|  false|
> |Sagar| 34|  false|
> +-----+---+-------+
> only showing top 3 rows
> {code}
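To reason about the rolling question, a hypothetical simulation of size-based rolling (not `BucketingSink`'s code). It assumes that, before each write, a part file that has already grown past the batch size is closed and a new one is started, so the outcome depends entirely on the size the writer reports.

```scala
// Hypothetical simulation of size-based part-file rolling; it is not
// BucketingSink's code. Before each write, if the current part file already
// exceeds `batchSize` bytes, it is closed and a new one is started.
object SizeBasedRolling {
  def partFileSizes(recordSizes: Seq[Long], batchSize: Long): Seq[Long] = {
    val closed = scala.collection.mutable.ArrayBuffer.empty[Long]
    var current = 0L
    for (size <- recordSizes) {
      if (current > batchSize) { // roll over: close the full part file
        closed += current
        current = 0L
      }
      current += size
    }
    if (current > 0) closed += current // the last, still-open part file
    closed.toList
  }
}
```

Under this assumption, the `setBatchSize(1024 * 1024 * 1024)` in the snippet would only roll after roughly 1 GiB per part file, while a threshold of 0 would produce one record per file. Independently of the batch size, the bucketer pattern `MMdd/HHmm` changes the bucket directory every minute, so records from different minutes would always end up in different part files.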
[jira] [Commented] (FLINK-10068) Add documentation for async/RocksDB-based timers
[ https://issues.apache.org/jira/browse/FLINK-10068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584658#comment-16584658 ]

ASF GitHub Bot commented on FLINK-10068:


twalthr commented on issue #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#issuecomment-414035015

Thanks for the update @StefanRRichter. LGTM % the single comment. Feel free to merge this.

> Add documentation for async/RocksDB-based timers
>
>
> Key: FLINK-10068
> URL: https://issues.apache.org/jira/browse/FLINK-10068
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Labels: pull-request-available
>
>
> Document how to activate RocksDB-based timers, and update the docs to state
> that snapshotting now works asynchronously, except for heap-based timers
> combined with RocksDB incremental snapshots.
[jira] [Commented] (FLINK-10068) Add documentation for async/RocksDB-based timers
[ https://issues.apache.org/jira/browse/FLINK-10068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584657#comment-16584657 ]

ASF GitHub Bot commented on FLINK-10068:


twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#discussion_r211066338

##
File path: docs/dev/stream/operators/process_function.md
##
@@ -277,19 +277,18 @@ Both types of timers (processing-time and event-time) are internally maintained
 The `TimerService` deduplicates timers per key and timestamp, i.e., there is at most one timer per key and timestamp. If multiple timers are registered for the same timestamp, the `onTimer()` method will be called just once.

-**Note:** Flink synchronizes invocations of `onTimer()` and `processElement()`. Hence, users do not have to worry about concurrent modification of state.
+Note Flink synchronizes invocations of `onTimer()` and `processElement()`. Hence, users do not have to worry about concurrent modification of state.

 ### Fault Tolerance

 Timers are fault tolerant and checkpointed along with the state of the application. In case of a failure recovery or when starting an application from a savepoint, the timers are restored.

-**Note:** Checkpointed processing-time timers that were supposed to fire before their restoration, will fire immediately.
+Note Checkpointed processing-time timers that were supposed to fire before their restoration, will fire immediately.
 This might happen when an application recovers from a failure or when it is started from a savepoint.

-**Note:** Timers are always synchronously checkpointed, regardless of the configuration of the state backends.
-Therefore, a large number of timers can significantly increase checkpointing time.
-See the "Timer Coalescing" section for advice on how to reduce the number of timers.
+Note Timers are always asynchronously checkpointed, except for the combination of RocksDB backend / with incremental snapshots / with heap-based timers (will be resolved with `FLINK-10026`).

Review comment:
If there is nothing more to say in your opinion, then I'm fine with this.
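The deduplication rule in the diff (at most one timer per key and timestamp, hence a single `onTimer()` call) can be pictured as an ordered set keyed by `(timestamp, key)`. `DedupTimerQueue` is a hypothetical illustration, not Flink's internal timer service, and it ignores checkpointing entirely.

```scala
import scala.collection.mutable

// Hypothetical illustration of "at most one timer per key and timestamp";
// it is not Flink's internal timer service and has no fault tolerance.
class DedupTimerQueue {
  // Ordered by (timestamp, key); being a set, re-registering the same pair is a no-op.
  private val timers = mutable.TreeSet.empty[(Long, String)]

  def register(key: String, timestamp: Long): Unit = timers += ((timestamp, key))

  // Fires (removes and returns) every timer with timestamp <= time, in timestamp order.
  def advanceTo(time: Long): Seq[(String, Long)] = {
    val due = timers.takeWhile(_._1 <= time).toList
    timers --= due
    due.map { case (ts, key) => (key, ts) }
  }

  def size: Int = timers.size
}
```

Registering the same `(key, timestamp)` twice leaves one entry, and advancing past several pending timestamps fires them all at once, which loosely mirrors the note that overdue processing-time timers fire immediately after a restore.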
[jira] [Commented] (FLINK-10068) Add documentation for async/RocksDB-based timers
[ https://issues.apache.org/jira/browse/FLINK-10068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584656#comment-16584656 ]

ASF GitHub Bot commented on FLINK-10068:


twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#discussion_r211066305

##
File path: docs/dev/stream/operators/process_function.md
##
@@ -277,19 +277,18 @@ Both types of timers (processing-time and event-time) are internally maintained
 The `TimerService` deduplicates timers per key and timestamp, i.e., there is at most one timer per key and timestamp. If multiple timers are registered for the same timestamp, the `onTimer()` method will be called just once.

Review comment:
Writing more docs about this might be a bigger change, but moving 10 lines into a subsection on a different page can be done in this PR.
[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584645#comment-16584645 ]

Shuyi Chen commented on FLINK-10163:


I think we can add this ticket as part of the DDL FLIP. What do you think, [~twalthr]?
[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584631#comment-16584631 ] Hequn Cheng commented on FLINK-10163: - This would be a great feature. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10163) Support CREATE VIEW in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng updated FLINK-10163: Description: The possibility to define a name for a subquery would improve the usability of the SQL Client. The SQL standard defines \{{CREATE VIEW}} for defining a virtual table. Example: {code} CREATE VIEW viewName [ '(' columnName [, columnName ]* ')' ] AS Query {code} was: The possibility to define a name for a subquery would improve the usability of the SQL Client. The SQL standard defines \{{CREATE VIEW}} for defining a virtual table. Example: {code} CREATE VIEW view_name AS SELECT {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584620#comment-16584620 ] Hequn Cheng commented on FLINK-10156: - {{insertInto()}} requires a configured {{TableSink}}, while {{writeToSink()}} doesn't. Wouldn't that make the Table API more cumbersome to use, since users would have to pass fieldNames and fieldTypes explicitly? > Drop the Table.writeToSink() method > --- > > Key: FLINK-10156 > URL: https://issues.apache.org/jira/browse/FLINK-10156 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Fabian Hueske >Priority: Major > > I am proposing to drop the {{Table.writeToSink()}} method. > > *What is the method doing?* > The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a > {{TableSink}}, for example to a Kafka topic, a file, or a database. > > *Why should it be removed?* > The {{writeToSink()}} method was introduced before the Table API supported > the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a > table into a table that was previously registered with a {{TableSink}} in the > catalog. It is the inverse of the {{scan()}} method and the equivalent > of an {{INSERT INTO ... SELECT}} SQL query. > > I think we should remove {{writeToSink()}} for the following reasons: > 1. It offers the same functionality as {{insertInto()}}. Removing it would > reduce duplicated API. > 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks > (and TableSources) should only be registered with the {{TableEnvironment}} > and not be exposed to the "query part" of the Table API / SQL. > 3. Registering tables in a catalog and using them for input and output is > more aligned with SQL. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()
Bowen Li created FLINK-10168: Summary: support filtering files by modified/created time in StreamExecutionEnvironment.readFile() Key: FLINK-10168 URL: https://issues.apache.org/jira/browse/FLINK-10168 Project: Flink Issue Type: Improvement Components: DataStream API Affects Versions: 1.6.0 Reporter: Bowen Li Fix For: 1.7.0 Support filtering files by modified/created time in {{StreamExecutionEnvironment.readFile()}}. For example, in a source directory with lots of files, we may only want to read files that were created or modified after a specific time. This API could expose a generic file filter function and let users define their own filtering rules. Currently, Flink only supports filtering files by path. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
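As a rough illustration of the requested filtering rule only, here is a plain `java.nio` sketch that keeps regular files modified strictly after a cutoff. `ModifiedAfterFilter` is a hypothetical name; it is not wired into `readFile()` or into Flink's `FilePathFilter` (whose `filterPath` method receives just a path, which is the limitation the issue describes), and it handles modification time only. Creation time could be read through `BasicFileAttributes.creationTime()`, but not every file system records it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical filter that keeps regular files modified strictly after a cutoff (not Flink's API). */
public class ModifiedAfterFilter implements Predicate<Path> {

    private final Instant cutoff;

    public ModifiedAfterFilter(Instant cutoff) {
        this.cutoff = cutoff;
    }

    @Override
    public boolean test(Path file) {
        try {
            return Files.isRegularFile(file)
                    && Files.getLastModifiedTime(file).toInstant().isAfter(cutoff);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Lists the direct children of dir that pass the filter, sorted by path. */
    public List<Path> select(Path dir) throws IOException {
        try (Stream<Path> children = Files.list(dir)) {
            return children.filter(this).sorted().collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("readfile-demo");
        Path oldFile = Files.createFile(dir.resolve("old.txt"));
        Path newFile = Files.createFile(dir.resolve("new.txt"));
        Files.setLastModifiedTime(oldFile, FileTime.from(Instant.parse("2018-07-01T00:00:00Z")));
        Files.setLastModifiedTime(newFile, FileTime.from(Instant.parse("2018-08-15T00:00:00Z")));

        List<Path> selected = new ModifiedAfterFilter(Instant.parse("2018-08-01T00:00:00Z")).select(dir);
        System.out.println(selected.get(0).getFileName()); // new.txt
    }
}
```

Expressing the rule as a `Predicate<Path>` keeps it composable (for example with `and(...)` for a path-pattern check), which is roughly the "generic filter function" the issue asks for.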
[jira] [Updated] (FLINK-8868) Support Table Function as Table for Stream Sql
[ https://issues.apache.org/jira/browse/FLINK-8868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruidong Li updated FLINK-8868: -- Description: For stream SQL: support SQL like: SELECT * FROM Lateral TABLE(tf("a")) For batch SQL: a UDTF might produce infinite records; this needs to be discussed. was:support SQL like: SELECT * FROM TABLE(tf("a")) > Support Table Function as Table for Stream Sql > -- > > Key: FLINK-8868 > URL: https://issues.apache.org/jira/browse/FLINK-8868 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li >Priority: Major > Labels: pull-request-available > > For stream SQL: > support SQL like: SELECT * FROM Lateral TABLE(tf("a")) > For batch SQL: > a UDTF might produce infinite records; this needs to be discussed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10167) SessionWindows not compatible with typed DataStreams in scala
Andrew Roberts created FLINK-10167: -- Summary: SessionWindows not compatible with typed DataStreams in scala Key: FLINK-10167 URL: https://issues.apache.org/jira/browse/FLINK-10167 Project: Flink Issue Type: Bug Reporter: Andrew Roberts I'm trying to construct a trivial job that uses session windows, and it looks like the data type parameter is hardcoded to `Object`/`AnyRef`. Because Java-defined generic classes are invariant in Scala, this means that we can't use the provided SessionWindow helper classes in Scala on typed streams. Example job:
{code:scala}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
import org.apache.flink.util.Collector

object TestJob {
  val jobName = "TestJob"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromCollection(Range(0, 100).toList)
      .keyBy(_ / 10)
      .window(ProcessingTimeSessionWindows.withGap(Time.minutes(1)))
      .reduce(
        (a: Int, b: Int) => a + b,
        // The window function must emit through the collector; a returned value is discarded.
        (key: Int, window: Window, items: Iterable[Int], out: Collector[String]) =>
          out.collect(s"${key}: ${items}")
      )
      .map(println(_))

    env.execute(jobName)
  }
}
{code}
Compile error:
{code}
[error] found : org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
[error] required: org.apache.flink.streaming.api.windowing.assigners.WindowAssigner[_ >: Int, ?]
[error] Note: Object <: Any (and org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows <: org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner[Object,org.apache.flink.streaming.api.windowing.windows.TimeWindow]), but Java-defined class WindowAssigner is invariant in type T.
[error] You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
[error] .window(ProcessingTimeSessionWindows.withGap(Time.minutes(1)))
{code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
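For comparison, the Java DataStream API declares `KeyedStream.window(WindowAssigner<? super T, W>)`, which the Scala bound `WindowAssigner[_ >: Int, ?]` in the error mirrors. In Java, a stream of boxed `Integer` satisfies `? super Integer` with an assigner typed on `Object`. Our reading of the Scala error is that `Int` is a value type, so `Object` (`AnyRef`) is not one of its supertypes and the bound cannot be met. The sketch below shows only the Java use-site contravariance, with hypothetical stand-ins (`Assigner`, `ObjectAssigner`, `assignAll`) rather than the real Flink classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-ins illustrating Java use-site variance (not the real Flink classes). */
public class VarianceDemo {

    /** Plays the role of a Java-defined, invariant generic class such as WindowAssigner<T, W>. */
    interface Assigner<T> {
        String assign(T element);
    }

    /** Plays the role of an assigner that, like ProcessingTimeSessionWindows, is typed on Object. */
    static final class ObjectAssigner implements Assigner<Object> {
        @Override
        public String assign(Object element) {
            return "session-window";
        }
    }

    /**
     * Accepts an assigner for T or for any supertype of T (Java's "? super T").
     * Declaring the parameter as plain Assigner<T> would reject an Assigner<Object> when T is Integer.
     */
    static <T> List<String> assignAll(List<T> elements, Assigner<? super T> assigner) {
        List<String> windows = new ArrayList<>();
        for (T element : elements) {
            windows.add(assigner.assign(element));
        }
        return windows;
    }

    public static void main(String[] args) {
        // Compiles because java.lang.Integer is a subtype of Object.
        List<String> windows = assignAll(Arrays.asList(1, 2, 3), new ObjectAssigner());
        System.out.println(windows); // [session-window, session-window, session-window]
    }
}
```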
[jira] [Commented] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections
[ https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584231#comment-16584231 ] Dominik Wosiński commented on FLINK-10052: -- I will try to fix that :) > Tolerate temporarily suspended ZooKeeper connections > > > Key: FLINK-10052 > URL: https://issues.apache.org/jira/browse/FLINK-10052 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.4.2, 1.5.2, 1.6.0 >Reporter: Till Rohrmann >Assignee: Dominik Wosiński >Priority: Major > > This issue results from FLINK-10011, which uncovered a problem with Flink's HA > recovery and proposed the following solution to harden Flink: > The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator > recipe for leader election. The leader latch revokes leadership in case of a > suspended ZooKeeper connection. This can be premature if the system is able to > reconnect to ZooKeeper before its session expires. The effect of the lost > leadership is that all jobs will be canceled and directly restarted after > regaining the leadership. > Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper > connection, it would be better to wait until the ZooKeeper connection is > LOST. That way we would allow the system to reconnect and not lose the > leadership. This could be achieved by using Curator's {{LeaderSelector}} > instead of the {{LeaderLatch}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
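The proposed change boils down to which connection state triggers revocation. The minimal, hypothetical sketch below contrasts the latch-style policy (revoke on SUSPENDED) with the proposed one (wait for LOST). It is not Curator or Flink code; the local enum only mirrors a subset of Curator's connection state names. A real implementation must also stop acting as leader if the session expires while the connection is suspended; here that case is folded into LOST.

```java
/**
 * Hypothetical sketch of the leadership policy proposed in FLINK-10052 (not Curator or Flink code).
 * The enum mirrors a subset of Curator's connection state names.
 */
public class LeadershipPolicy {

    enum ConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

    private final boolean revokeOnSuspend;
    private boolean leader = true; // assume leadership is currently held

    /** revokeOnSuspend = true models the LeaderLatch behaviour described in the issue. */
    public LeadershipPolicy(boolean revokeOnSuspend) {
        this.revokeOnSuspend = revokeOnSuspend;
    }

    public void onStateChanged(ConnectionState state) {
        switch (state) {
            case SUSPENDED:
                // Latch-style: give up immediately. Tolerant: wait, the session may still be alive.
                if (revokeOnSuspend) {
                    leader = false;
                }
                break;
            case LOST:
                // The session has expired, so another contender may already have been elected.
                leader = false;
                break;
            default:
                // CONNECTED / RECONNECTED: nothing to revoke (re-election is not modelled here).
                break;
        }
    }

    public boolean isLeader() {
        return leader;
    }

    public static void main(String[] args) {
        LeadershipPolicy latchStyle = new LeadershipPolicy(true);
        LeadershipPolicy tolerant = new LeadershipPolicy(false);
        ConnectionState[] blip = {ConnectionState.SUSPENDED, ConnectionState.RECONNECTED};
        for (ConnectionState s : blip) {
            latchStyle.onStateChanged(s);
            tolerant.onStateChanged(s);
        }
        System.out.println(latchStyle.isLeader() + " " + tolerant.isLeader()); // false true
    }
}
```

With the latch-style policy, a short connection blip costs leadership (and, per the issue, a cancel-and-restart of all jobs); with the tolerant policy, only an expired session does.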
[jira] [Assigned] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections
[ https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dominik Wosiński reassigned FLINK-10052: Assignee: Dominik Wosiński -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10119) JsonRowDeserializationSchema deserialize kafka message
[ https://issues.apache.org/jira/browse/FLINK-10119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584206#comment-16584206 ] ASF GitHub Bot commented on FLINK-10119: buptljy commented on issue #6571: [FLINK-10119]- Add failure handlers for JsonRowDeserializationSchema URL: https://github.com/apache/flink/pull/6571#issuecomment-413939315 I think the failure is not caused by this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > JsonRowDeserializationSchema deserialize kafka message > -- > > Key: FLINK-10119 > URL: https://issues.apache.org/jira/browse/FLINK-10119 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.5.1 > Environment: None >Reporter: sean.miao >Assignee: buptljy >Priority: Major > Labels: pull-request-available > > Recently, we have been using Kafka010JsonTableSource to process JSON messages from Kafka. We enabled checkpointing and an automatic restart strategy. > We found that whenever a message is not valid JSON, the job fails and cannot be restarted successfully. We understand that this protects exactly-once or at-least-once processing, but it leaves the application unavailable, which has a large impact on us.
> The code is:
> class: JsonRowDeserializationSchema
> function:
> @Override
> public Row deserialize(byte[] message) throws IOException {
>     try {
>         final JsonNode root = objectMapper.readTree(message);
>         return convertRow(root, (RowTypeInfo) typeInfo);
>     } catch (Throwable t) {
>         throw new IOException("Failed to deserialize JSON object.", t);
>     }
> }
> Now I have changed it to:
> public Row deserialize(byte[] message) throws IOException {
>     try {
>         JsonNode root = this.objectMapper.readTree(message);
>         return this.convertRow(root, (RowTypeInfo) this.typeInfo);
>     } catch (Throwable var4) {
>         message = this.objectMapper.writeValueAsBytes("{}");
>         JsonNode root = this.objectMapper.readTree(message);
>         return this.convertRow(root, (RowTypeInfo) this.typeInfo);
>     }
> }
> I think that data format errors are inevitable during network transmission, so could we add a new column to the table for records with an invalid data format, like Spark SQL does?
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
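An alternative to substituting an empty row is to skip the bad record and keep its raw text. As we understand the Flink Kafka connector documentation, returning `null` from `deserialize(...)` lets the consumer skip a corrupted message instead of failing the job. The sketch below is a hypothetical, framework-free wrapper illustrating that idea; `LenientDeserializer` and `Parser` are invented names, not Flink's API, and the toy integer parser stands in for the JSON-to-`Row` conversion. Keeping the raw text loosely resembles Spark SQL's `_corrupt_record` column in PERMISSIVE mode.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical failure-tolerant wrapper around a parser that may throw (not Flink's API). */
public class LenientDeserializer<T> {

    /** Stand-in for the JSON-to-Row conversion; any exception marks the message as corrupt. */
    @FunctionalInterface
    public interface Parser<R> {
        R parse(byte[] message) throws Exception;
    }

    private final Parser<T> parser;
    // Raw text of rejected messages; kept in memory here only for illustration.
    private final List<String> corruptRecords = new ArrayList<>();

    public LenientDeserializer(Parser<T> parser) {
        this.parser = parser;
    }

    /** Returns the parsed value, or null (the "skip this record" signal) if parsing fails. */
    public T deserialize(byte[] message) {
        try {
            return parser.parse(message);
        } catch (Exception e) {
            corruptRecords.add(new String(message, StandardCharsets.UTF_8));
            return null;
        }
    }

    public List<String> corruptRecords() {
        return corruptRecords;
    }

    public static void main(String[] args) {
        LenientDeserializer<Integer> d = new LenientDeserializer<>(
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8).trim()));
        System.out.println(d.deserialize("42".getBytes(StandardCharsets.UTF_8)));    // 42
        System.out.println(d.deserialize("{oops".getBytes(StandardCharsets.UTF_8))); // null
        System.out.println(d.corruptRecords());                                       // [{oops]
    }
}
```

Unlike falling back to an all-null row, skipping makes the failure visible. In a real job the rejected messages would more likely be logged, counted in a metric, or sent to a side output than kept in a list.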
[jira] [Commented] (FLINK-10119) JsonRowDeserializationSchema deserialize kafka message
[ https://issues.apache.org/jira/browse/FLINK-10119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584207#comment-16584207 ] ASF GitHub Bot commented on FLINK-10119: buptljy edited a comment on issue #6571: [FLINK-10119]- Add failure handlers for JsonRowDeserializationSchema URL: https://github.com/apache/flink/pull/6571#issuecomment-413939315 I think the travis testing failure is not caused by this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] buptljy edited a comment on issue #6571: [FLINK-10119]- Add failure handlers for JsonRowDeserializationSchema
buptljy edited a comment on issue #6571: [FLINK-10119]- Add failure handlers for JsonRowDeserializationSchema URL: https://github.com/apache/flink/pull/6571#issuecomment-413939315 I think the travis testing failure is not caused by this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] buptljy commented on issue #6571: [FLINK-10119]- Add failure handlers for JsonRowDeserializationSchema
buptljy commented on issue #6571: [FLINK-10119]- Add failure handlers for JsonRowDeserializationSchema URL: https://github.com/apache/flink/pull/6571#issuecomment-413939315 I think the failure is not caused by this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10153) Add tutorial section to documentation
[ https://issues.apache.org/jira/browse/FLINK-10153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584183#comment-16584183 ] ASF GitHub Bot commented on FLINK-10153: fhueske commented on issue #6565: [FLINK-10153] [docs] Add Tutorials section and rework structure. URL: https://github.com/apache/flink/pull/6565#issuecomment-413933284 Thanks everyone for the comments so far. I fixed broken links, added redirects for pages that I moved, and pushed an update. Let me know what you think. Best, Fabian This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add tutorial section to documentation > - > > Key: FLINK-10153 > URL: https://issues.apache.org/jira/browse/FLINK-10153 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Major > Labels: pull-request-available > > The current documentation does not feature a dedicated tutorials section and > has a few issues that should be fixed to help our (future) users > get started with Flink. > I propose to add a single "Tutorials" section to the documentation where > users find step-by-step guides. The tutorials section helps users with > different goals: > * Get a quick idea of the overall system > * Implement a DataStream/DataSet/Table API/SQL job > * Set up Flink on a local machine (or run a Docker container) > There are already a few guides to get started but they are located at > different places and should be moved into the Tutorials section. Moreover, > some sections such as "Project Setup" contain content that addresses users > with very different intentions. > I propose to > * add a new Tutorials section and move all existing tutorials there (and > later add new ones). 
> * move the "Quickstart" section to "Tutorials". > * remove the "Project Setup" section and move the pages to other sections > (some pages will be split up or adjusted). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] fhueske commented on issue #6565: [FLINK-10153] [docs] Add Tutorials section and rework structure.
fhueske commented on issue #6565: [FLINK-10153] [docs] Add Tutorials section and rework structure. URL: https://github.com/apache/flink/pull/6565#issuecomment-413933284 Thanks everyone for the comments so far. I fixed broken links, added redirects for pages that I moved, and pushed an update. Let me know what you think. Best, Fabian This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10059) Add LTRIM supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584110#comment-16584110 ] ASF GitHub Bot commented on FLINK-10059: yanghua commented on issue #6494: [FLINK-10059] [table] Add LTRIM supported in Table API and SQL URL: https://github.com/apache/flink/pull/6494#issuecomment-413909036 @xccui the travis failure is not caused by this PR's change. I have updated the RTRIM PR : #6509 , please review again thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add LTRIM supported in Table API and SQL > > > Key: FLINK-10059 > URL: https://issues.apache.org/jira/browse/FLINK-10059 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > Refer to the MySQL LTRIM function: > https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ltrim -- This message was sent by Atlassian JIRA (v7.6.3#76005)
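The MySQL reference linked in the issue describes LTRIM as returning the string with leading space characters removed. The plain-Java sketch below shows those semantics. The class `Ltrim` is hypothetical and unrelated to how Flink implements the function, and NULL propagation (null in, null out) is assumed, as is usual for SQL string functions. One subtlety worth covering in tests: only the space character is stripped, whereas Java's `String.trim()` also removes tabs and other characters up to U+0020. In standard SQL the same result can be written as `TRIM(LEADING ' ' FROM str)`.

```java
/** Hypothetical helper mirroring MySQL-style LTRIM semantics (not Flink's implementation). */
public final class Ltrim {

    private Ltrim() {
    }

    /** Removes leading ' ' characters only; null in, null out (assumed SQL NULL propagation). */
    public static String ltrim(String str) {
        if (str == null) {
            return null;
        }
        int start = 0;
        while (start < str.length() && str.charAt(start) == ' ') {
            start++;
        }
        return str.substring(start);
    }

    public static void main(String[] args) {
        System.out.println("[" + ltrim("  barbar  ") + "]"); // [barbar  ]
        System.out.println(ltrim("\tbar").equals("\tbar"));   // true: tabs are not stripped
        System.out.println("\tbar".trim());                    // bar (String.trim() strips the tab)
    }
}
```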
[GitHub] yanghua commented on issue #6494: [FLINK-10059] [table] Add LTRIM supported in Table API and SQL
yanghua commented on issue #6494: [FLINK-10059] [table] Add LTRIM supported in Table API and SQL URL: https://github.com/apache/flink/pull/6494#issuecomment-413909036 @xccui the travis failure is not caused by this PR's change. I have updated the RTRIM PR : #6509 , please review again thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9610) Add Kafka partitioner that uses the key to partition by
[ https://issues.apache.org/jira/browse/FLINK-9610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584046#comment-16584046 ] ASF GitHub Bot commented on FLINK-9610: --- nielsbasjes commented on issue #6181: [FLINK-9610] [flink-connector-kafka-base] Add Kafka Partitioner that uses the hash of the provided key. URL: https://github.com/apache/flink/pull/6181#issuecomment-413893719 @tzulitai I did some more digging and there is in fact a good reason for this patch. Have a look at https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java There you'll find constructors with essentially the following variations in parameters: - Either a SerializationSchema or a KeyedSerializationSchema - Either a FlinkKafkaPartitioner or no partitioner, which means it actually uses FlinkFixedPartitioner as the default. Looking at all of these constructors: 1. If you do not specify a partitioner, then all constructors use the FlinkFixedPartitioner. 2. If you do specify a partitioner, then it will use that partitioner. Even the constructor that uses a KeyedSerializationSchema will NOT use that key for the partitioning (we saw this in production, where it caused problems). Essentially, the current FlinkKafkaProducer API makes it very hard to 'not' specify a partitioner and use the hash(key) partitioning which is natively present in Kafka. So, given the current API, we concluded that an extra partitioner is needed. Because the Flink API works this way, I never looked deeper into the base code to see how it really moves below the surface. Given what I understand now, I see two viable ways forward: 1. We change the behavior of the API so that, if a KeyedSerializationSchema is used, Kafka partitions the data by the hash of the key. This is, however, an impactful change in the way the API behaves, i.e. it breaks the API. 2. 
We simply add the partitioner I created. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add Kafka partitioner that uses the key to partition by > --- > > Key: FLINK-9610 > URL: https://issues.apache.org/jira/browse/FLINK-9610 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector >Reporter: Niels Basjes >Assignee: Niels Basjes >Priority: Major > Labels: pull-request-available > > The Kafka connector package only contains the FlinkFixedPartitioner > implementation of the FlinkKafkaPartitioner. > The most common use case I have seen is the need to spread the records across > the Kafka partitions while keeping all messages with the same key together. > I'll put up a pull request with a very simple implementation that should make > this a lot easier for others to use and extend. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
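The difference between the two partitioning strategies discussed in this thread can be sketched in plain Java. This is a simplified, hypothetical model, not the connector's code. `fixedPartition` approximates how a fixed partitioner pins each parallel sink instance to one partition, and `keyHashPartition` routes by a hash of the serialized key. It uses `Arrays.hashCode`, not the murmur2 hash that Kafka's default partitioner applies, so its assignments will not match Kafka's own key-based partitioning. Masking the sign bit (`hash & 0x7fffffff`) keeps the index non-negative; `Math.abs(hash)` does not, because `Math.abs(Integer.MIN_VALUE)` is still negative.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical comparison of fixed vs. key-hash partition selection (not the connector's code). */
public final class KeyHashPartitioning {

    private KeyHashPartitioning() {
    }

    /** Fixed strategy: every record from one parallel sink instance goes to the same partition. */
    public static int fixedPartition(int parallelInstanceId, int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }

    /** Key-hash strategy: records with equal key bytes always go to the same partition. */
    public static int keyHashPartition(byte[] key, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("Partitions of the target topic are empty.");
        }
        int hash = Arrays.hashCode(key); // 0 for a null key
        // Clearing the sign bit keeps the index non-negative even for Integer.MIN_VALUE.
        return partitions[(hash & 0x7fffffff) % partitions.length];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3};
        byte[] key = "customer-42".getBytes(StandardCharsets.UTF_8);
        // Equal key bytes map to the same partition, whichever sink instance sends them.
        System.out.println(keyHashPartition(key, partitions) == keyHashPartition(key.clone(), partitions)); // true
        System.out.println(Math.abs(Integer.MIN_VALUE)); // -2147483648
    }
}
```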
[GitHub] nielsbasjes commented on issue #6181: [FLINK-9610] [flink-connector-kafka-base] Add Kafka Partitioner that uses the hash of the provided key.
nielsbasjes commented on issue #6181: [FLINK-9610] [flink-connector-kafka-base] Add Kafka Partitioner that uses the hash of the provided key. URL: https://github.com/apache/flink/pull/6181#issuecomment-413893719 @tzulitai I did some more digging and there is in fact a good reason for this patch. Have a look at https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java There you'll find constructors with essentially the following variations in parameters: - Either a SerializationSchema or a KeyedSerializationSchema - Either a FlinkKafkaPartitioner or no partitioner, which means it actually uses FlinkFixedPartitioner as the default. Looking at all of these constructors: 1. If you do not specify a partitioner, then all constructors use the FlinkFixedPartitioner. 2. If you do specify a partitioner, then it will use that partitioner. Even the constructor that uses a KeyedSerializationSchema will NOT use that key for the partitioning (we saw this in production, where it caused problems). Essentially, the current FlinkKafkaProducer API makes it very hard to 'not' specify a partitioner and use the hash(key) partitioning which is natively present in Kafka. So, given the current API, we concluded that an extra partitioner is needed. Because the Flink API works this way, I never looked deeper into the base code to see how it really moves below the surface. Given what I understand now, I see two viable ways forward: 1. We change the behavior of the API so that, if a KeyedSerializationSchema is used, Kafka partitions the data by the hash of the key. This is, however, an impactful change in the way the API behaves, i.e. it breaks the API. 2. We simply add the partitioner I created. This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] nielsbasjes opened a new pull request #6181: [FLINK-9610] [flink-connector-kafka-base] Add Kafka Partitioner that uses the hash of the provided key.
nielsbasjes opened a new pull request #6181: [FLINK-9610] [flink-connector-kafka-base] Add Kafka Partitioner that uses the hash of the provided key. URL: https://github.com/apache/flink/pull/6181 ## What is the purpose of the change Add the simple feature of being able to route records into Kafka using key-based partitioning. ## Brief change log - Added the FlinkKeyHashPartitioner class with some tests. ## Verifying this change This change added tests and can be verified as follows: - Use this instead of the FlinkFixedPartitioner while instantiating the FlinkKafkaProducer. Also add a KeyedSerializationSchema implementation that returns the key to be used. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? JavaDocs This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9610) Add Kafka partitioner that uses the key to partition by
[ https://issues.apache.org/jira/browse/FLINK-9610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584007#comment-16584007 ] ASF GitHub Bot commented on FLINK-9610: --- nielsbasjes opened a new pull request #6181: [FLINK-9610] [flink-connector-kafka-base] Add Kafka Partitioner that uses the hash of the provided key. URL: https://github.com/apache/flink/pull/6181 ## What is the purpose of the change Add the simple feature of being able to route records into Kafka using key-based partitioning. ## Brief change log - Added the FlinkKeyHashPartitioner class with some tests. ## Verifying this change This change added tests and can be verified as follows: - Use this instead of the FlinkFixedPartitioner while instantiating the FlinkKafkaProducer. Also add a KeyedSerializationSchema implementation that returns the key to be used. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? JavaDocs This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9610) Add Kafka partitioner that uses the key to partition by
[ https://issues.apache.org/jira/browse/FLINK-9610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583999#comment-16583999 ]

ASF GitHub Bot commented on FLINK-9610:
---

nielsbasjes commented on issue #6181: [FLINK-9610] [flink-connector-kafka-base] Add Kafka Partitioner that uses the hash of the provided key.
URL: https://github.com/apache/flink/pull/6181#issuecomment-413882146

@tzulitai Thanks for pointing this out.
I did some digging into the code and you are right.
This pull request is needless.
[jira] [Updated] (FLINK-9610) Add Kafka partitioner that uses the key to partition by
[ https://issues.apache.org/jira/browse/FLINK-9610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-9610:
--
Labels: pull-request-available (was: )
[GitHub] nielsbasjes closed pull request #6181: [FLINK-9610] [flink-connector-kafka-base] Add Kafka Partitioner that uses the hash of the provided key.
nielsbasjes closed pull request #6181: [FLINK-9610] [flink-connector-kafka-base] Add Kafka Partitioner that uses the hash of the provided key.
URL: https://github.com/apache/flink/pull/6181

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKeyHashPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKeyHashPartitioner.java
new file mode 100644
index 000..7a10d6a34a8
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKeyHashPartitioner.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+
+/**
+ * A partitioner that uses the hash of the provided key to distribute
+ * the values over the partitions as evenly as possible.
+ * This partitioner ensures that all records with the same key will be sent to
+ * the same Kafka partition.
+ *
+ * Note that this will cause a lot of network connections to be created between
+ * all the Flink instances and all the Kafka brokers.
+ */
+@PublicEvolving
+public class FlinkKeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
+
+	private static final long serialVersionUID = -2006468063065010594L;
+
+	@Override
+	public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+		Preconditions.checkArgument(
+			partitions != null && partitions.length > 0,
+			"Partitions of the target topic is empty.");
+
+		return partitions[Math.abs(hash(key)) % partitions.length];
+	}
+
+	/**
+	 * The overridable implementation of the hashing algorithm.
+	 * @param key The key of the provided record on which the partition selection is based. (key can be null!)
+	 * @return The hash value for the provided key.
+	 */
+	protected int hash(@Nullable byte[] key) {
+		return Arrays.hashCode(key);
+	}
+
+}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKeyHashPartitionerTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKeyHashPartitionerTest.java
new file mode 100644
index 000..cdf8afb7181
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKeyHashPartitionerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKeyHashPartitioner;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+/**
+ * Tests for the {@link F
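A caveat on the `partition` method in the FlinkKeyHashPartitioner diff: `Math.abs(Integer.MIN_VALUE)` is still `Integer.MIN_VALUE`, so a key whose hash happens to equal that value would produce a negative array index whenever the partition count is not a power of two. The sketch below is plain Java with no Flink dependency; `KeyHashIndex` and its methods are hypothetical names. Masking the sign bit (the same trick Kafka's `Utils.toPositive` uses) is one possible fix; `Math.floorMod(hash, n)` would be another.

```java
import java.util.Arrays;

// Hypothetical helper illustrating key-hash partition selection; not a Flink class.
final class KeyHashIndex {

    private KeyHashIndex() {}

    // Same arithmetic as the diff: Math.abs(hash) % n.
    // Math.abs(Integer.MIN_VALUE) is still negative, so this can return a negative index.
    static int naiveIndex(int hash, int numPartitions) {
        return Math.abs(hash) % numPartitions;
    }

    // Clearing the sign bit keeps the dividend non-negative for every int hash.
    static int safeIndex(int hash, int numPartitions) {
        return (hash & 0x7fffffff) % numPartitions;
    }

    // Picks a partition for a (possibly null) key; Arrays.hashCode((byte[]) null) is 0.
    static int partition(byte[] key, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("Partitions of the target topic are empty.");
        }
        return partitions[safeIndex(Arrays.hashCode(key), partitions.length)];
    }

    public static void main(String[] args) {
        System.out.println(naiveIndex(Integer.MIN_VALUE, 3)); // prints -2
        System.out.println(safeIndex(Integer.MIN_VALUE, 3));  // prints 0
        System.out.println(partition(null, new int[] {0, 1, 2})); // prints 0
    }
}
```

Equal keys still map to the same partition, which is the property the PR description asks for; only the handling of negative hashes differs from the diff.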
[jira] [Commented] (FLINK-10142) Reduce synchronization overhead for credit notifications
[ https://issues.apache.org/jira/browse/FLINK-10142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583981#comment-16583981 ]

ASF GitHub Bot commented on FLINK-10142:


pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification
URL: https://github.com/apache/flink/pull/6555#discussion_r210915852

##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
##
@@ -357,19 +357,20 @@ private void writeAndFlushNextMessageIfPossible(Channel channel) {
 				return;
 			}
 
-			//It is no need to notify credit for the released channel.
-			if (!inputChannel.isReleased()) {
-				AddCredit msg = new AddCredit(
-					inputChannel.getPartitionId(),
-					inputChannel.getAndResetUnannouncedCredit(),
-					inputChannel.getInputChannelId());
+			if (inputChannel.isReleased()) {

Review comment:
Are the changes in this file optimising anything? Or is this change unrelated to the rest of the commit?


> Reduce synchronization overhead for credit notifications
>
>
> Key: FLINK-10142
> URL: https://issues.apache.org/jira/browse/FLINK-10142
> Project: Flink
> Issue Type: Bug
> Components: Network
> Affects Versions: 1.5.2, 1.6.0, 1.7.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Major
> Labels: pull-request-available
>
> When credit-based flow control was introduced, we also added some checks and
> optimisations for uncommon code paths that make common code paths
> unnecessarily more expensive, e.g. checking whether a channel was released
> before forwarding a credit notification to Netty. Such checks would have to
> be confirmed by the Netty thread anyway and thus only add additional load for
> something that happens only once (per channel).
[jira] [Commented] (FLINK-10142) Reduce synchronization overhead for credit notifications
[ https://issues.apache.org/jira/browse/FLINK-10142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583985#comment-16583985 ]

ASF GitHub Bot commented on FLINK-10142:


pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification
URL: https://github.com/apache/flink/pull/6555#discussion_r210917887

##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##
@@ -509,33 +514,40 @@ void onSenderBacklog(int backlog) throws IOException {
 	}
 
 	public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
-		boolean success = false;
+		boolean recycleBuffer = true;
 
 		try {
+
+			final boolean wasEmpty;
 			synchronized (receivedBuffers) {
-				if (!isReleased.get()) {
-					if (expectedSequenceNumber == sequenceNumber) {
-						int available = receivedBuffers.size();
+				// Similar to notifyBufferAvailable(), make sure that we never add a buffer
+				// after releaseAllResources() released all buffers from receivedBuffers
+				// (see above for details).
+				if (isReleased.get()) {
+					return;
+				}
 
-						receivedBuffers.add(buffer);
-						expectedSequenceNumber++;
+				if (expectedSequenceNumber != sequenceNumber) {
+					onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
+					return;
+				}
 
-						if (available == 0) {
-							notifyChannelNonEmpty();
-						}
+				wasEmpty = receivedBuffers.isEmpty();
+				receivedBuffers.add(buffer);
+				recycleBuffer = false;
+			}
 
-						success = true;
-					} else {
-						onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
-					}
-				}
+			++expectedSequenceNumber;
+
+			if (wasEmpty) {
+				notifyChannelNonEmpty();

Review comment:
Has moving this line out from under the lock improved performance in any scenario? If not, the commit title

> optimisations reducing lock contention

is misleading and I would change it to something about refactoring/cleanup.

Btw, again, I do not like squashing multiple changes into one commit like this. It actually took me ~10 minutes to realise what this change is all about and to make sure whether there are any other meaningful changes here. If you split this commit into `[hotfix][network] refactor/cleanups` and `[FLINK-10141][network] move notifyChannelNonEmpty outside of synchronised block`, it would be much, much easier to review...

Could you split this before merging?
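The pattern under review, reading `wasEmpty` while holding the lock and making the notification call only after leaving the `synchronized` block, looks roughly like this in isolation. It is a simplified, hypothetical model: `ReceiveQueue` is not a Flink class, strings stand in for buffers, and a counter stands in for `notifyChannelNonEmpty()`. Whether the move measurably reduces contention is exactly the reviewer's open question; the sketch only shows the shape of the change.

```java
import java.util.ArrayDeque;

// Hypothetical, simplified model of the onBuffer() change; not Flink code.
final class ReceiveQueue {

    private final ArrayDeque<String> receivedBuffers = new ArrayDeque<>();

    // Guarded by receivedBuffers.
    private boolean released;

    // Assumes a single receiving thread in this model, so no lock for these two.
    private int expectedSequenceNumber;
    private int notifications;

    /** Returns false if the buffer was not enqueued (the caller would then recycle it). */
    boolean onBuffer(String buffer, int sequenceNumber) {
        final boolean wasEmpty;
        synchronized (receivedBuffers) {
            // Never enqueue after release() has cleared the queue. The real code
            // additionally reports an out-of-order buffer via onError().
            if (released || expectedSequenceNumber != sequenceNumber) {
                return false;
            }
            wasEmpty = receivedBuffers.isEmpty();
            receivedBuffers.add(buffer);
        }
        ++expectedSequenceNumber;

        // In the real code this is a listener call; making it after the lock is
        // released means it no longer extends the time others wait on receivedBuffers.
        if (wasEmpty) {
            notifications++;
        }
        return true;
    }

    void release() {
        synchronized (receivedBuffers) {
            released = true;
            receivedBuffers.clear();
        }
    }

    int nonEmptyNotifications() {
        return notifications;
    }
}
```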
[jira] [Commented] (FLINK-10142) Reduce synchronization overhead for credit notifications
[ https://issues.apache.org/jira/browse/FLINK-10142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583984#comment-16583984 ]

ASF GitHub Bot commented on FLINK-10142:


pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification
URL: https://github.com/apache/flink/pull/6555#discussion_r210916528

##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##
@@ -191,15 +191,15 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException, Int
 		checkError();
 
 		final Buffer next;
-		final int remaining;
+		final boolean moreAvailable;
 
 		synchronized (receivedBuffers) {
 			next = receivedBuffers.poll();
-			remaining = receivedBuffers.size();
+			moreAvailable = !receivedBuffers.isEmpty();
 		}
 
 		numBytesIn.inc(next.getSizeUnsafe());
-		return Optional.of(new BufferAndAvailability(next, remaining > 0, getSenderBacklog()));
+		return Optional.of(new BufferAndAvailability(next, moreAvailable, getSenderBacklog()));

Review comment:
This also has nothing to do with `optimisations reducing lock contention`. Can you extract those changes into a separate hotfix? If someone in the future looks at `git blame` for the real `optimisations reducing lock contention`, they will have the same "wtf" moments as I am having right now ;)
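For comparison, the snippet above boils down to a poll followed by an emptiness check under the same lock. The sketch below uses a hypothetical `PollExample` class and a local `BufferAndAvailability` pair (Flink's class of that name also carries the sender backlog, as the diff shows). It assumes the queue is a `java.util.ArrayDeque`, where `isEmpty()` and `size() > 0` are both constant-time. That is consistent with the reviewer's point that this part is a cleanup rather than a lock-contention optimisation.

```java
import java.util.ArrayDeque;

// Hypothetical sketch of the poll-then-check idiom from the snippet; not Flink code.
final class PollExample {

    // Local result pair: the polled element and whether more remain.
    static final class BufferAndAvailability {
        final String buffer;
        final boolean moreAvailable;

        BufferAndAvailability(String buffer, boolean moreAvailable) {
            this.buffer = buffer;
            this.moreAvailable = moreAvailable;
        }
    }

    private final ArrayDeque<String> receivedBuffers = new ArrayDeque<>();

    void add(String buffer) {
        synchronized (receivedBuffers) {
            receivedBuffers.add(buffer);
        }
    }

    // Returns null when nothing is queued (the reviewed code assumes a buffer is present).
    BufferAndAvailability next() {
        final String next;
        final boolean moreAvailable;
        synchronized (receivedBuffers) {
            next = receivedBuffers.poll();
            // Same answer as size() > 0; for ArrayDeque both are O(1).
            moreAvailable = !receivedBuffers.isEmpty();
        }
        return next == null ? null : new BufferAndAvailability(next, moreAvailable);
    }
}
```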
[jira] [Commented] (FLINK-10142) Reduce synchronization overhead for credit notifications
[ https://issues.apache.org/jira/browse/FLINK-10142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583983#comment-16583983 ]

ASF GitHub Bot commented on FLINK-10142:


pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification
URL: https://github.com/apache/flink/pull/6555#discussion_r210920075

##
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
##
@@ -170,6 +170,65 @@ public Void call() throws Exception {
 		}
 	}
 
+	@Test
+	public void testConcurrentNotifyBufferAvailableAndRelease() throws Exception {

Review comment:
Hmmm, isn't this covered by, for example, `org.apache.flink.streaming.runtime.io.benchmark.StreamNetworkThroughputBenchmarkTest#largeRemoteMode`? Maybe we do not need this test after all?
[jira] [Commented] (FLINK-10142) Reduce synchronization overhead for credit notifications
[ https://issues.apache.org/jira/browse/FLINK-10142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583982#comment-16583982 ]

ASF GitHub Bot commented on FLINK-10142:


pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification
URL: https://github.com/apache/flink/pull/6555#discussion_r210920974

##
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
##
@@ -170,6 +170,65 @@ public Void call() throws Exception {
 		}
 	}
 
+	@Test
+	public void testConcurrentNotifyBufferAvailableAndRelease() throws Exception {
+		// Config
+		// Repeatedly spawn two tasks: one to notify buffer availability and the other to release the channel
+		// concurrently. We do this repeatedly to provoke races.
+		final int numberOfRepetitions = 1024;
+
+		// Setup
+		final ExecutorService executor = Executors.newFixedThreadPool(2);
+		final Buffer buffer = TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE);
+
+		try {
+			// Test
+			final SingleInputGate inputGate = mock(SingleInputGate.class);

Review comment:
Please no Mockito in new tests. Use a real object or create a proper mock.
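One way to follow the no-Mockito request is a small hand-written stub that records calls. The sketch below is a hypothetical model of the race the test targets (`ReleaseRace`, `RecordingGate` and `Channel` are illustrative names, not Flink's `SingleInputGate` or `RemoteInputChannel`). It mirrors the 1024 repetitions and the two-thread pool from the diff and checks that no buffer survives a concurrent release.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the race targeted by the test; not Flink's classes.
final class ReleaseRace {

    // Hand-written stub in place of mock(SingleInputGate.class): it just counts calls.
    static final class RecordingGate {
        private int notifications;

        synchronized void notifyChannelNonEmpty() {
            notifications++;
        }

        synchronized int notifications() {
            return notifications;
        }
    }

    static final class Channel {
        private final List<String> buffers = new ArrayList<>();
        private final RecordingGate gate;
        private boolean released; // guarded by buffers

        Channel(RecordingGate gate) {
            this.gate = gate;
        }

        void notifyBufferAvailable(String buffer) {
            final boolean wasEmpty;
            synchronized (buffers) {
                if (released) {
                    return; // never add a buffer after release cleared the list
                }
                wasEmpty = buffers.isEmpty();
                buffers.add(buffer);
            }
            if (wasEmpty) {
                gate.notifyChannelNonEmpty();
            }
        }

        void releaseAllResources() {
            synchronized (buffers) {
                released = true;
                buffers.clear();
            }
        }

        int bufferCount() {
            synchronized (buffers) {
                return buffers.size();
            }
        }
    }

    // Races notify against release `repetitions` times and counts runs that leaked a buffer.
    static int countLeaks(int repetitions) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        int leaks = 0;
        try {
            for (int i = 0; i < repetitions; i++) {
                Channel channel = new Channel(new RecordingGate());
                CountDownLatch start = new CountDownLatch(1);
                Future<?> notifier = executor.submit(() -> {
                    awaitQuietly(start);
                    channel.notifyBufferAvailable("buffer");
                });
                Future<?> releaser = executor.submit(() -> {
                    awaitQuietly(start);
                    channel.releaseAllResources();
                });
                start.countDown(); // let both tasks run at roughly the same time
                notifier.get(10, TimeUnit.SECONDS);
                releaser.get(10, TimeUnit.SECONDS);
                if (channel.bufferCount() != 0) {
                    leaks++;
                }
            }
        } finally {
            executor.shutdownNow();
        }
        return leaks;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the stub is a plain class, a test can assert on its recorded state directly instead of verifying mock interactions.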
[jira] [Commented] (FLINK-10142) Reduce synchronization overhead for credit notifications
[ https://issues.apache.org/jira/browse/FLINK-10142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583987#comment-16583987 ]

ASF GitHub Bot commented on FLINK-10142:


pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification
URL: https://github.com/apache/flink/pull/6555#discussion_r210920974

##
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
##
@@ -170,6 +170,65 @@ public Void call() throws Exception {
 		}
 	}
 
+	@Test
+	public void testConcurrentNotifyBufferAvailableAndRelease() throws Exception {
+		// Config
+		// Repeatedly spawn two tasks: one to notify buffer availability and the other to release the channel
+		// concurrently. We do this repeatedly to provoke races.
+		final int numberOfRepetitions = 1024;
+
+		// Setup
+		final ExecutorService executor = Executors.newFixedThreadPool(2);
+		final Buffer buffer = TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE);
+
+		try {
+			// Test
+			final SingleInputGate inputGate = mock(SingleInputGate.class);

Review comment:
Please no Mockito in new tests. Use a real object or create a proper mock. It should be easy to replace this one.
[GitHub] pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification
pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification URL: https://github.com/apache/flink/pull/6555#discussion_r210920075 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java ## @@ -170,6 +170,65 @@ public Void call() throws Exception { } } + @Test + public void testConcurrentNotifyBufferAvailableAndRelease() throws Exception { Review comment: Hmmm, isn't this cover by for example `org.apache.flink.streaming.runtime.io.benchmark.StreamNetworkThroughputBenchmarkTest#largeRemoteMode`? Maybe we do not need this test after all? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification
pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification URL: https://github.com/apache/flink/pull/6555#discussion_r210916528 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ## @@ -191,15 +191,15 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException, Int checkError(); final Buffer next; - final int remaining; + final boolean moreAvailable; synchronized (receivedBuffers) { next = receivedBuffers.poll(); - remaining = receivedBuffers.size(); + moreAvailable = !receivedBuffers.isEmpty(); } numBytesIn.inc(next.getSizeUnsafe()); - return Optional.of(new BufferAndAvailability(next, remaining > 0, getSenderBacklog())); + return Optional.of(new BufferAndAvailability(next, moreAvailable, getSenderBacklog())); Review comment: This has also nothing to do with `optimisations reducing lock contention`. Can you extract those changes to separate hotfix? If someone in the future will be looking at `git blame` regarding real `optimisations reducing lock contention` he will have the same "wtf" moments as me right now ;) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification
pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification URL: https://github.com/apache/flink/pull/6555#discussion_r210920974 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java ## @@ -170,6 +170,65 @@ public Void call() throws Exception { } } + @Test + public void testConcurrentNotifyBufferAvailableAndRelease() throws Exception { + // Config + // Repeatedly spawn two tasks: one to notify buffer availability and the other to release the channel + // concurrently. We do this repeatedly to provoke races. + final int numberOfRepetitions = 1024; + + // Setup + final ExecutorService executor = Executors.newFixedThreadPool(2); + final Buffer buffer = TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE); + + try { + // Test + final SingleInputGate inputGate = mock(SingleInputGate.class); Review comment: Please don't use Mockito in new tests. Use a real object or write a proper hand-made mock.
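The test under review repeatedly runs "notify buffer available" and "release" concurrently to provoke races, and the reviewer asks for a hand-written test double instead of a Mockito mock. A minimal sketch of that pattern follows, assuming a hypothetical `FakeChannel` that stands in for the real channel and input gate (it is not Flink's `RemoteInputChannel` API); the invariant checked is that no buffer is still held once release has completed.

```java
import java.util.ArrayDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrentReleaseSketch {

    /** Hypothetical hand-written fake; not Flink's RemoteInputChannel. */
    public static final class FakeChannel {
        private final ArrayDeque<Object> buffers = new ArrayDeque<>();
        private boolean released;

        /** Takes the buffer unless the channel is already released; returns whether it was taken. */
        public boolean notifyBufferAvailable(Object buffer) {
            synchronized (buffers) {
                // Never add a buffer after release, otherwise it would leak.
                if (released) {
                    return false;
                }
                buffers.add(buffer);
                return true;
            }
        }

        public void releaseAllResources() {
            synchronized (buffers) {
                released = true;
                buffers.clear(); // a real channel would recycle these buffers
            }
        }

        public int heldBuffers() {
            synchronized (buffers) {
                return buffers.size();
            }
        }
    }

    /** Runs notify and release concurrently many times; returns the total number of leaked buffers. */
    public static int runRepetitions(int repetitions) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        int leaked = 0;
        try {
            for (int i = 0; i < repetitions; i++) {
                FakeChannel channel = new FakeChannel();
                Future<?> notifyTask = executor.submit(() -> channel.notifyBufferAvailable(new Object()));
                Future<?> releaseTask = executor.submit(channel::releaseAllResources);
                notifyTask.get();
                releaseTask.get();
                // Invariant: once release has completed, the channel holds no buffers.
                leaked += channel.heldBuffers();
            }
        } finally {
            executor.shutdownNow();
        }
        return leaked;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("leaked buffers: " + runRepetitions(1024));
    }
}
```

Because both methods take the same lock and `notifyBufferAvailable` checks the `released` flag first, the leak count should always be zero; removing that check is what the repetitions are meant to expose.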
[GitHub] pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification
pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification URL: https://github.com/apache/flink/pull/6555#discussion_r210917887 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ## @@ -509,33 +514,40 @@ void onSenderBacklog(int backlog) throws IOException { } public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException { - boolean success = false; + boolean recycleBuffer = true; try { + + final boolean wasEmpty; synchronized (receivedBuffers) { - if (!isReleased.get()) { - if (expectedSequenceNumber == sequenceNumber) { - int available = receivedBuffers.size(); + // Similar to notifyBufferAvailable(), make sure that we never add a buffer + // after releaseAllResources() released all buffers from receivedBuffers + // (see above for details). + if (isReleased.get()) { + return; + } - receivedBuffers.add(buffer); - expectedSequenceNumber++; + if (expectedSequenceNumber != sequenceNumber) { + onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber)); + return; + } - if (available == 0) { - notifyChannelNonEmpty(); - } + wasEmpty = receivedBuffers.isEmpty(); + receivedBuffers.add(buffer); + recycleBuffer = false; + } - success = true; - } else { - onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber)); - } - } + ++expectedSequenceNumber; + + if (wasEmpty) { + notifyChannelNonEmpty(); Review comment: Has moving this line out from under the lock improved performance in any case? If not, the commit title > optimisations reducing lock contention is misleading and I would change it to something about refactoring/cleanup. Btw, again, I do not like squashing multiple changes into one commit. It took me ~10 minutes to work out what this change is all about and to make sure there are no other meaningful changes here.
If you split this commit into `[hotfix][network] refactor/cleanups` and `[FLINK-10141][network] move notifyChannelNonEmpty outside of synchronised block`, it would be much easier to review... Could you split it before merging?
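The change under discussion computes "was the queue empty?" while holding the `receivedBuffers` lock, but calls `notifyChannelNonEmpty()` only after the lock is released. A minimal sketch of that "check under the lock, notify outside it" pattern follows; the class, the `String` buffers and the `Runnable` listener are hypothetical simplifications, not Flink's `RemoteInputChannel`. Unlike the diff, the sketch increments the sequence number inside the lock so that it stays correct even if several threads call `onBuffer`.

```java
import java.util.ArrayDeque;

/** Hypothetical, simplified channel; not Flink's RemoteInputChannel. */
public class NotifyOutsideLockSketch {

    private final ArrayDeque<String> receivedBuffers = new ArrayDeque<>();
    private final Runnable channelNonEmptyListener; // stands in for notifyChannelNonEmpty()
    private boolean released;
    private int expectedSequenceNumber;

    public NotifyOutsideLockSketch(Runnable channelNonEmptyListener) {
        this.channelNonEmptyListener = channelNonEmptyListener;
    }

    /** Returns true if the buffer was enqueued; false means the caller must recycle it. */
    public boolean onBuffer(String buffer, int sequenceNumber) {
        final boolean wasEmpty;
        synchronized (receivedBuffers) {
            // Never enqueue after release, and reject out-of-order buffers.
            if (released || sequenceNumber != expectedSequenceNumber) {
                return false;
            }
            wasEmpty = receivedBuffers.isEmpty();
            receivedBuffers.add(buffer);
            expectedSequenceNumber++;
        }
        // The emptiness check had to be atomic with the add, but the listener
        // call does not need the lock, so threads polling or releasing this
        // channel are not blocked while it runs.
        if (wasEmpty) {
            channelNonEmptyListener.run();
        }
        return true;
    }

    public String poll() {
        synchronized (receivedBuffers) {
            return receivedBuffers.poll();
        }
    }

    public void releaseAllResources() {
        synchronized (receivedBuffers) {
            released = true;
            receivedBuffers.clear();
        }
    }

    public static void main(String[] args) {
        int[] notifications = {0};
        NotifyOutsideLockSketch channel = new NotifyOutsideLockSketch(() -> notifications[0]++);
        channel.onBuffer("b0", 0); // queue was empty: listener is called
        channel.onBuffer("b1", 1); // queue already non-empty: no call
        System.out.println("notifications: " + notifications[0]); // notifications: 1
    }
}
```

Whether this actually reduces contention depends on how expensive the listener is, which is exactly the question raised in the review.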
[GitHub] pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification
pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification URL: https://github.com/apache/flink/pull/6555#discussion_r210915852 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java ## @@ -357,19 +357,20 @@ private void writeAndFlushNextMessageIfPossible(Channel channel) { return; } - //It is no need to notify credit for the released channel. - if (!inputChannel.isReleased()) { - AddCredit msg = new AddCredit( - inputChannel.getPartitionId(), - inputChannel.getAndResetUnannouncedCredit(), - inputChannel.getInputChannelId()); + if (inputChannel.isReleased()) { Review comment: Are the changes in this file optimising anything? Or are they unrelated to the rest of the commit?
[jira] [Updated] (FLINK-8357) enable rolling in default log settings
[ https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-8357: -- Labels: pull-request-available (was: ) > enable rolling in default log settings > -- > > Key: FLINK-8357 > URL: https://issues.apache.org/jira/browse/FLINK-8357 > Project: Flink > Issue Type: Improvement > Components: Logging >Reporter: Xu Mingmin >Assignee: zhangminglei >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > The release packages use {{org.apache.log4j.FileAppender}} for log4j and > {{ch.qos.logback.core.FileAppender}} for logback, which can result in very > large log files. > In most, if not all, cases we need to enable rotation in a production > cluster, and I suppose it's a good idea to make rotation the default. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8357) enable rolling in default log settings
[ https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583977#comment-16583977 ] ASF GitHub Bot commented on FLINK-8357: --- GJL commented on issue #5371: [FLINK-8357] [conf] Enable rolling in default log settings URL: https://github.com/apache/flink/pull/5371#issuecomment-413878586 Hi, what's the state of this PR?
[GitHub] GJL commented on issue #5371: [FLINK-8357] [conf] Enable rolling in default log settings
GJL commented on issue #5371: [FLINK-8357] [conf] Enable rolling in default log settings URL: https://github.com/apache/flink/pull/5371#issuecomment-413878586 Hi, what's the state of this PR?
[GitHub] FredTing commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
FredTing commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer URL: https://github.com/apache/flink/pull/6105#discussion_r210898792 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java ## @@ -45,6 +45,22 @@ */ T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException; + /** +* Deserializes the byte message. +* +* @param messageKey the key as a byte array (null if no key has been set). +* @param message The message, as a byte array (null if the message was empty or deleted). +* @param partition The partition the message has originated from. +* @param offset the offset of the message in the original source (for example the Kafka offset). +* @param timestamp the timestamp of the consumer record +* @param timestampType The timestamp type, could be NO_TIMESTAMP, CREATE_TIME or INGEST_TIME. +* +* @return The deserialized message as an object (null if the message cannot be deserialized). +*/ + default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException { Review comment: I have a small problem with the second alternative. When I implement this interface in Scala, I do not want to use mutable objects. I think the `setTimestamp` method forces the `T deserializedRecord` to be mutable (at least for the timestamp field). I have no problem with the first alternative, but I think we are better off throwing an exception with a message explaining that one must implement/override one of the `deserialize` methods.
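One way to realise the suggestion above is to give both overloads default implementations: the timestamp-aware variant delegates to the old one, and the old one throws an exception telling the implementer to override one of them. The sketch below assumes hypothetical names (`KeyedSchema`, `Event`, `EventSchema`) and a local `TimestampType` enum mirroring the constants of Kafka's `TimestampType` to stay self-contained; it is not the API that was eventually merged. It also shows why no setter is needed: the timestamp goes straight into an immutable record.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class DeserializationSchemaSketch {

    /** Self-contained stand-in mirroring the constants of Kafka's TimestampType. */
    public enum TimestampType { NO_TIMESTAMP_TYPE, CREATE_TIME, LOG_APPEND_TIME }

    /** Hypothetical, trimmed-down keyed deserialization schema. */
    public interface KeyedSchema<T> {

        /** Legacy variant; the default throws, so implementations must override one of the two methods. */
        default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
                throws IOException {
            throw new UnsupportedOperationException(
                    "Override either deserialize(messageKey, message, topic, partition, offset) "
                            + "or the variant that also receives the record timestamp.");
        }

        /** Timestamp-aware variant; by default it ignores the timestamp and delegates to the legacy one. */
        default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
                              long timestamp, TimestampType timestampType) throws IOException {
            return deserialize(messageKey, message, topic, partition, offset);
        }
    }

    /** Immutable record: the timestamp is a constructor argument, so no setter is required. */
    public static final class Event {
        public final String payload;
        public final long timestamp;

        public Event(String payload, long timestamp) {
            this.payload = payload;
            this.timestamp = timestamp;
        }
    }

    /** Overrides only the timestamp-aware method, so the record type can stay immutable. */
    public static final class EventSchema implements KeyedSchema<Event> {
        @Override
        public Event deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
                                 long timestamp, TimestampType timestampType) {
            return new Event(new String(message, StandardCharsets.UTF_8), timestamp);
        }
    }

    public static void main(String[] args) throws IOException {
        Event event = new EventSchema().deserialize(
                null, "hello".getBytes(StandardCharsets.UTF_8), "topic", 0, 42L, 1000L, TimestampType.CREATE_TIME);
        System.out.println(event.payload + " @ " + event.timestamp); // hello @ 1000
    }
}
```

Existing implementations that only override the five-argument method keep working, because the seven-argument default delegates to them.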
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583865#comment-16583865 ] ASF GitHub Bot commented on FLINK-8500: --- FredTing commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer URL: https://github.com/apache/flink/pull/6105#discussion_r210898792 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java ## @@ -45,6 +45,22 @@ */ T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException; + /** +* Deserializes the byte message. +* +* @param messageKey the key as a byte array (null if no key has been set). +* @param message The message, as a byte array (null if the message was empty or deleted). +* @param partition The partition the message has originated from. +* @param offset the offset of the message in the original source (for example the Kafka offset). +* @param timestamp the timestamp of the consumer record +* @param timestampType The timestamp type, could be NO_TIMESTAMP, CREATE_TIME or INGEST_TIME. +* +* @return The deserialized message as an object (null if the message cannot be deserialized). +*/ + default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException { Review comment: I have a small problem with the second alternative. When I implement this interface in Scala, I do not want to use mutable objects. I think the `setTimestamp` method forces the `T deserializedRecord` to be mutable (at least for the timestamp field). I have no problem with the first alternative, but I think we are better off throwing an exception with a message explaining that one must implement/override one of the `deserialize` methods.
> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The `deserialize` method of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, > this is useful! >
[jira] [Assigned] (FLINK-7964) Add Apache Kafka 1.0/1.1 connectors
[ https://issues.apache.org/jira/browse/FLINK-7964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-7964: --- Assignee: vinoyang (was: Hai Zhou) > Add Apache Kafka 1.0/1.1 connectors > --- > > Key: FLINK-7964 > URL: https://issues.apache.org/jira/browse/FLINK-7964 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Hai Zhou >Assignee: vinoyang >Priority: Major > Fix For: 1.7.0 > > > Kafka 1.0.0 is no mere bump of the version number. The Apache Kafka Project > Management Committee has packed a number of valuable enhancements into the > release. Here is a summary of a few of them: > * Since its introduction in version 0.10, the Streams API has become hugely > popular among Kafka users, including the likes of Pinterest, Rabobank, > Zalando, and The New York Times. In 1.0, the API continues to evolve at a > healthy pace. To begin with, the builder API has been improved (KIP-120). A > new API has been added to expose the state of active tasks at runtime > (KIP-130). The new cogroup API makes it much easier to deal with partitioned > aggregates with fewer StateStores and fewer moving parts in your code > (KIP-150). Debuggability gets easier with enhancements to the print() and > writeAsText() methods (KIP-160). And if that’s not enough, check out KIP-138 > and KIP-161 too. For more on streams, check out the Apache Kafka Streams > documentation, including some helpful new tutorial videos. > * Operating Kafka at scale requires that the system remain observable, and to > make that easier, we’ve made a number of improvements to metrics. These are > too many to summarize without becoming tedious, but Connect metrics have been > significantly improved (KIP-196), a litany of new health check metrics are > now exposed (KIP-188), and we now have a global topic and partition count > (KIP-168). Check out KIP-164 and KIP-187 for even more.
> * We now support Java 9, leading, among other things, to significantly faster > TLS and CRC32C implementations. Over-the-wire encryption will be faster now, > which will keep Kafka fast and compute costs low when encryption is enabled. > * In keeping with the security theme, KIP-152 cleans up the error handling on > Simple Authentication and Security Layer (SASL) authentication attempts. > Previously, some authentication error conditions were indistinguishable from > broker failures and were not logged in a clear way. This is cleaner now. > * Kafka can now tolerate disk failures better. Historically, JBOD storage > configurations have not been recommended, but the architecture has > nevertheless been tempting: after all, why not rely on Kafka’s own > replication mechanism to protect against storage failure rather than using > RAID? With KIP-112, Kafka now handles disk failure more gracefully. A single > disk failure in a JBOD broker will not bring the entire broker down; rather, > the broker will continue serving any log files that remain on functioning > disks. > * Since release 0.11.0, the idempotent producer (which is the producer used > in the presence of a transaction, which of course is the producer we use for > exactly-once processing) required max.in.flight.requests.per.connection to be > equal to one. As anyone who has written or tested a wire protocol can attest, > this put an upper bound on throughput. Thanks to KAFKA-5949, this can now be > as large as five, relaxing the throughput constraint quite a bit.
[jira] [Commented] (FLINK-7964) Add Apache Kafka 1.0/1.1 connectors
[ https://issues.apache.org/jira/browse/FLINK-7964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583844#comment-16583844 ] vinoyang commented on FLINK-7964: - Since I have waited for a week or so and still haven't received any response, I decided to take over the issue. If you have any questions, you can contact me. > Add Apache Kafka 1.0/1.1 connectors > --- > > Key: FLINK-7964 > URL: https://issues.apache.org/jira/browse/FLINK-7964 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Hai Zhou >Assignee: Hai Zhou >Priority: Major > Fix For: 1.7.0
[jira] [Commented] (FLINK-10166) Dependency problems when executing SQL query in sql-client
[ https://issues.apache.org/jira/browse/FLINK-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583794#comment-16583794 ] Dawid Wysakowicz commented on FLINK-10166: -- I found out that it happens only for the Hadoop-free binary. For the others, the {{commons-codec}} dependency is shipped with the shaded Hadoop uber jar. > Dependency problems when executing SQL query in sql-client > -- > > Key: FLINK-10166 > URL: https://issues.apache.org/jira/browse/FLINK-10166 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.6.0 >Reporter: Dawid Wysakowicz >Priority: Major > > When I tried to run the query: > {code} > select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name) > {code} > in {{sql-client.sh}} I got: > {code} > [ERROR] Could not execute SQL statement. Reason: > org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown > variable or type "org.apache.commons.codec.binary.Base64" > {code}
[jira] [Commented] (FLINK-8954) Escape control characters when outputting on SQL Client CLI
[ https://issues.apache.org/jira/browse/FLINK-8954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583742#comment-16583742 ] Timo Walther commented on FLINK-8954: - [~dmitry_amosov] Thank you. I gave you contributor permissions. You can now assign issues to yourself. > Escape control characters when outputting on SQL Client CLI > --- > > Key: FLINK-8954 > URL: https://issues.apache.org/jira/browse/FLINK-8954 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Priority: Major > > Control characters in the result output of a SQL query influence the behavior > of the CLI. We should escape everything that could cause side effects.
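The issue asks the SQL Client CLI to escape anything in a result value that could have side effects on the terminal. A minimal sketch of one possible policy follows, assuming a hypothetical `escapeControlCharacters` helper (not the SQL Client's actual code): every character for which `Character.isISOControl` is true (ESC, BEL, CR, LF, and so on) is replaced by a visible six-character Java-style escape, and everything else is kept unchanged.

```java
import java.util.Locale;

public class ControlCharEscaper {

    /**
     * Replaces every ISO control character (U+0000 to U+001F and U+007F to U+009F)
     * by a visible backslash-u escape with four upper-case hex digits, so that a
     * query result cannot move the cursor, recolor the terminal, etc.
     * A null value stays null.
     */
    public static String escapeControlCharacters(String value) {
        if (value == null) {
            return null;
        }
        StringBuilder escaped = new StringBuilder(value.length());
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            if (Character.isISOControl(c)) {
                escaped.append(String.format(Locale.ROOT, "\\u%04X", (int) c));
            } else {
                escaped.append(c);
            }
        }
        return escaped.toString();
    }

    public static void main(String[] args) {
        // ESC followed by "[31m" would switch a terminal to red; here it is printed as plain text.
        String withEscape = "red" + (char) 27 + "[31m";
        System.out.println(escapeControlCharacters(withEscape));
    }
}
```

Escaping all control characters, including newlines, keeps the tabular output aligned; a real implementation might choose a different visible representation.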
[jira] [Updated] (FLINK-10166) Dependency problems when executing SQL query in sql-client
[ https://issues.apache.org/jira/browse/FLINK-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz updated FLINK-10166: - Description: When I tried to run the query: {code} select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name) {code} in {{sql-client.sh}} I got: {code} [ERROR] Could not execute SQL statement. Reason: org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown variable or type "org.apache.commons.codec.binary.Base64" {code} was: When I tried to run the query: {{select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name)}} in {{sql-client.sh}} I got: {code} [ERROR] Could not execute SQL statement. Reason: org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown variable or type "org.apache.commons.codec.binary.Base64" {code}
[jira] [Updated] (FLINK-10166) Dependency problems when executing SQL query in sql-client
[ https://issues.apache.org/jira/browse/FLINK-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz updated FLINK-10166: - Description: When I tried to run the query: {{select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name)}} in {{sql-client.sh}} I got: {code} [ERROR] Could not execute SQL statement. Reason: org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown variable or type "org.apache.commons.codec.binary.Base64" {code} was: When I tried to run the query: {{select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name)}} in `sql-client.sh` I got: {code} [ERROR] Could not execute SQL statement. Reason: org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown variable or type "org.apache.commons.codec.binary.Base64" {code}
[jira] [Created] (FLINK-10166) Dependency problems when executing SQL query in sql-client
Dawid Wysakowicz created FLINK-10166: Summary: Dependency problems when executing SQL query in sql-client Key: FLINK-10166 URL: https://issues.apache.org/jira/browse/FLINK-10166 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.6.0 Reporter: Dawid Wysakowicz When I tried to run the query: {{select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name)}} in `sql-client.sh` I got: {code} [ERROR] Could not execute SQL statement. Reason: org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown variable or type "org.apache.commons.codec.binary.Base64" {code}
[jira] [Commented] (FLINK-8954) Escape control characters when outputting on SQL Client CLI
[ https://issues.apache.org/jira/browse/FLINK-8954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583696#comment-16583696 ] Dmitry Amosov commented on FLINK-8954: -- Hello, I'd like to help with this task. Can you assign it to me?
[jira] [Commented] (FLINK-10065) InstantiationUtil.deserializeObject(InputStream in, ClassLoader cl, boolean isFailureTolerant) will close the inputStream
[ https://issues.apache.org/jira/browse/FLINK-10065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583694#comment-16583694 ] ASF GitHub Bot commented on FLINK-10065: klion26 commented on issue #6498: [FLINK-10065] InstantiationUtil.deserializeObject(InputStream in, ClassLoader cl, boolean isFailureTolerant) will close the inputStream URL: https://github.com/apache/flink/pull/6498#issuecomment-413810934 ping @StephanEwen @tzulitai @tillrohrmann > InstantiationUtil.deserializeObject(InputStream in, ClassLoader cl, boolean > isFailureTolerant) will close the inputStream > - > > Key: FLINK-10065 > URL: https://issues.apache.org/jira/browse/FLINK-10065 > Project: Flink > Issue Type: Bug >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > > Currently, the implementation of InstantiationUtil.deserializeObject(InputStream > in, ClassLoader cl, boolean isFailureTolerant) is: > {code:java} > @SuppressWarnings("unchecked") > public static <T> T deserializeObject(InputStream in, ClassLoader cl, boolean > isFailureTolerant) > throws IOException, ClassNotFoundException { > final ClassLoader old = Thread.currentThread().getContextClassLoader(); > // not using resource try to avoid AutoClosable's close() on the given stream > try (ObjectInputStream oois = isFailureTolerant > ? new InstantiationUtil.FailureTolerantObjectInputStream(in, cl) > : new InstantiationUtil.ClassLoaderObjectInputStream(in, cl)) { > Thread.currentThread().setContextClassLoader(cl); > return (T) oois.readObject(); > } > finally { > Thread.currentThread().setContextClassLoader(old); > } > } > {code} > Despite the comment, the try-with-resources statement closes the ObjectInputStream, which in turn > closes the wrapped InputStream, so the caller's stream is closed after this method returns.
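The fix the issue calls for is to keep the context-class-loader handling but not close the `ObjectInputStream`, since closing it also closes the stream the caller still owns. A minimal sketch follows, assuming a simplified, hypothetical `ClassLoaderObjectInputStream` (Flink has its own class of that name, whose exact code is not shown here) and omitting the failure-tolerant variant; the `CloseTrackingInputStream` helper exists only to demonstrate that the caller's stream stays open.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class NonClosingDeserialization {

    /** Simplified stand-in that resolves classes through a given user class loader. */
    public static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            if (classLoader != null) {
                try {
                    return Class.forName(desc.getName(), false, classLoader);
                } catch (ClassNotFoundException e) {
                    // fall back to the default resolution, which also handles primitive types
                }
            }
            return super.resolveClass(desc);
        }
    }

    @SuppressWarnings("unchecked")
    public static <T> T deserializeObject(InputStream in, ClassLoader cl) throws IOException, ClassNotFoundException {
        final ClassLoader old = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(cl);
            // Deliberately not try-with-resources: closing the ObjectInputStream
            // would also close the caller's stream.
            ObjectInputStream ois = new ClassLoaderObjectInputStream(in, cl);
            return (T) ois.readObject();
        } finally {
            Thread.currentThread().setContextClassLoader(old);
        }
    }

    /** Demo helper that records whether close() was called. */
    public static final class CloseTrackingInputStream extends FilterInputStream {
        public boolean closed;

        public CloseTrackingInputStream(InputStream in) {
            super(in);
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    public static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        CloseTrackingInputStream in = new CloseTrackingInputStream(new ByteArrayInputStream(serialize("hello")));
        String value = deserializeObject(in, NonClosingDeserialization.class.getClassLoader());
        System.out.println(value + ", closed=" + in.closed); // hello, closed=false
    }
}
```

The caller remains responsible for closing its own stream, which matches the intent stated in the original code comment.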
[GitHub] klion26 commented on issue #6498: [FLINK-10065] InstantiationUtil.deserializeObject(InputStream in, ClassLoader cl, boolean isFailureTolerant) will close the inputStream
klion26 commented on issue #6498: [FLINK-10065] InstantiationUtil.deserializeObject(InputStream in, ClassLoader cl, boolean isFailureTolerant) will close the inputStream URL: https://github.com/apache/flink/pull/6498#issuecomment-413810934 ping @StephanEwen @tzulitai @tillrohrmann
[jira] [Commented] (FLINK-10059) Add LTRIM supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583686#comment-16583686 ] ASF GitHub Bot commented on FLINK-10059: yanghua commented on issue #6494: [FLINK-10059] [table] Add LTRIM supported in Table API and SQL URL: https://github.com/apache/flink/pull/6494#issuecomment-413808097 @xccui updated, will also update `RTRIM` > Add LTRIM supported in Table API and SQL > > > Key: FLINK-10059 > URL: https://issues.apache.org/jira/browse/FLINK-10059 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > Refer to the MySQL LTRIM function: > https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ltrim
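The issue points to MySQL's `LTRIM`, which returns the string with leading space characters removed (MySQL's documented example is `LTRIM('  barbar')` returning `'barbar'`) and returns NULL for a NULL argument. A minimal Java sketch of those semantics follows; the `ltrim` helper is hypothetical and is not the Table API implementation from the pull request. Note that only the space character is stripped, not tabs or other whitespace.

```java
public class LtrimSketch {

    /**
     * MySQL-style LTRIM: removes leading space characters (' ') only;
     * tabs, newlines and trailing spaces are kept. A null input yields null.
     */
    public static String ltrim(String str) {
        if (str == null) {
            return null;
        }
        int start = 0;
        while (start < str.length() && str.charAt(start) == ' ') {
            start++;
        }
        return str.substring(start);
    }

    public static void main(String[] args) {
        System.out.println("[" + ltrim("  barbar") + "]"); // [barbar]
    }
}
```

`String.trim()` and `String.stripLeading()` are not drop-in replacements here: both also remove other leading whitespace (e.g. tabs), and `trim()` strips trailing characters as well.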
[jira] [Created] (FLINK-10165) JarRunHandler/JarRunRequestBody should allow to pass program arguments as escaped json list
Maciej Prochniak created FLINK-10165: Summary: JarRunHandler/JarRunRequestBody should allow to pass program arguments as escaped json list Key: FLINK-10165 URL: https://issues.apache.org/jira/browse/FLINK-10165 Project: Flink Issue Type: Improvement Components: REST Affects Versions: 1.6.0 Reporter: Maciej Prochniak Currently, program arguments are parsed from a plain string: [https://github.com/apache/flink/blob/master/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java#L106] This does not allow quotes or newlines in arguments; in particular, it's difficult to pass JSON as an argument. I think it would be good to pass the arguments as a JSON list, so that Jackson would handle the escaping. It'd be a bit more problematic for query string parameters... WDYT [~Zentol]?
[jira] [Commented] (FLINK-10059) Add LTRIM supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583608#comment-16583608 ] ASF GitHub Bot commented on FLINK-10059: xccui commented on a change in pull request #6494: [FLINK-10059] [table] Add LTRIM supported in Table API and SQL URL: https://github.com/apache/flink/pull/6494#discussion_r210846147 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala ## @@ -626,6 +626,33 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "-") } + @Test Review comment: Please add a test case (for demonstration) in `org.apache.flink.table.expressions.SqlExpressionTest`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add LTRIM supported in Table API and SQL > > > Key: FLINK-10059 > URL: https://issues.apache.org/jira/browse/FLINK-10059 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > refer to MYSQL ltrim function : > https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ltrim -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10059) Add LTRIM supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583606#comment-16583606 ] ASF GitHub Bot commented on FLINK-10059: xccui commented on a change in pull request #6494: [FLINK-10059] [table] Add LTRIM supported in Table API and SQL URL: https://github.com/apache/flink/pull/6494#discussion_r210845279 ## File path: docs/dev/table/functions.md ## @@ -2482,6 +2482,18 @@ TO_BASE64(string) E.g., TO_BASE64('hello world') returns "aGVsbG8gd29ybGQ=". + + + +{% highlight text %} Review comment: Please add the corresponding docs for Java and Scala. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add LTRIM supported in Table API and SQL > > > Key: FLINK-10059 > URL: https://issues.apache.org/jira/browse/FLINK-10059 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > refer to MYSQL ltrim function : > https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ltrim -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10059) Add LTRIM supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583598#comment-16583598 ] ASF GitHub Bot commented on FLINK-10059: yanghua commented on issue #6494: [FLINK-10059] [table] Add LTRIM supported in Table API and SQL URL: https://github.com/apache/flink/pull/6494#issuecomment-413798852 @xccui Does this PR look good to you? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add LTRIM supported in Table API and SQL > > > Key: FLINK-10059 > URL: https://issues.apache.org/jira/browse/FLINK-10059 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > refer to MYSQL ltrim function : > https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ltrim -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] jrthe42 opened a new pull request #6575: [hotfix][table] Fix bug in RowtimeValidator when getting custom TimestampExtractor
jrthe42 opened a new pull request #6575: [hotfix][table] Fix bug in RowtimeValidator when getting custom TimestampExtractor URL: https://github.com/apache/flink/pull/6575 This pull request fixes a bug in `RowtimeValidator` that may cause an exception when using a custom `TimestampExtractor` in Flink SQL. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-9850. - > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > The print method of {{DataSet}} allows the user to supply a > String to identify the output (see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But > {{DataStream}} doesn't support this yet. It would be valuable to add this feature to > {{DataStream}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski resolved FLINK-9850. --- Resolution: Fixed Fix Version/s: 1.7.0 Merged to master as effa09cb80adc76842952ea78acc746fd1f826e7 > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > The print method of {{DataSet}} allows the user to supply a > String to identify the output (see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But > {{DataStream}} doesn't support this yet. It would be valuable to add this feature to > {{DataStream}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583552#comment-16583552 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski closed pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java new file mode 100644 index 000..de058b69f1f --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.functions.util; + +import org.apache.flink.annotation.Internal; + +import java.io.PrintStream; +import java.io.Serializable; + +/** + * Print sink output writer for DataStream and DataSet print API. + */ +@Internal +public class PrintSinkOutputWriter implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final boolean STD_OUT = false; + private static final boolean STD_ERR = true; + + private final boolean target; + private transient PrintStream stream; + private final String sinkIdentifier; + private transient String completedPrefix; + + public PrintSinkOutputWriter() { + this("", STD_OUT); + } + + public PrintSinkOutputWriter(final boolean stdErr) { + this("", stdErr); + } + + public PrintSinkOutputWriter(final String sinkIdentifier, final boolean stdErr) { + this.target = stdErr; + this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier); + } + + public void open(int subtaskIndex, int numParallelSubtasks) { + // get the target stream + stream = target == STD_OUT ? System.out : System.err; + + completedPrefix = sinkIdentifier; + + if (numParallelSubtasks > 1) { + if (!completedPrefix.isEmpty()) { + completedPrefix += ":"; + } + completedPrefix += (subtaskIndex + 1); + } + + if (!completedPrefix.isEmpty()) { + completedPrefix += "> "; + } + } + + public void write(IN record) { + stream.println(completedPrefix + record.toString()); + } + + @Override + public String toString() { + return "Print to " + (target == STD_OUT ? 
"System.out" : "System.err"); + } +} diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java index 0ab1abb2efb..62eabd0b739 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java @@ -19,45 +19,46 @@ package org.apache.flink.api.java.io; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.util.PrintSinkOutputWriter; import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.configuration.Configuration; -import java.io.PrintStream; - /** * Output format that prints results into either stdout or stderr. - * @param + * + * + * Four possible format options: + * {@code sinkIdentifier}:taskId> output <- {@code sinkIdentifier} provided, parallelism > 1 + * {@code sinkIdentifier}> output <- {@code sinkIdentifier} provided, parallelism == 1 + * taskId> output
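The prefix rule in PrintSinkOutputWriter#open from the diff above yields four output formats: "identifier:taskId> ", "identifier> ", "taskId> ", or no prefix. The standalone Java sketch below re-derives that rule as a pure function so the cases can be checked without a running job. The class and method names are hypothetical, and the sketch uses a StringBuilder instead of the diff's string concatenation. The logic follows the diff: append the identifier, then ":" plus the 1-based subtask index when parallelism is greater than 1, then "> " if anything was written.

```java
public class PrintPrefixSketch {

    // Hypothetical pure-function version of the prefix logic in the diff.
    static String prefix(String sinkIdentifier, int subtaskIndex, int numParallelSubtasks) {
        StringBuilder prefix = new StringBuilder(sinkIdentifier == null ? "" : sinkIdentifier);
        if (numParallelSubtasks > 1) {
            if (prefix.length() > 0) {
                prefix.append(':'); // separate identifier from subtask number
            }
            prefix.append(subtaskIndex + 1); // subtask indices are printed 1-based
        }
        if (prefix.length() > 0) {
            prefix.append("> ");
        }
        return prefix.toString();
    }

    public static void main(String[] args) {
        System.out.println(prefix("debug", 0, 4) + "record"); // prints debug:1> record
        System.out.println(prefix("debug", 0, 1) + "record"); // prints debug> record
        System.out.println(prefix("", 2, 4) + "record");      // prints 3> record
        System.out.println(prefix("", 0, 1) + "record");      // prints record
    }
}
```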
[GitHub] pnowojski closed pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream
pnowojski closed pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java new file mode 100644 index 000..de058b69f1f --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions.util; + +import org.apache.flink.annotation.Internal; + +import java.io.PrintStream; +import java.io.Serializable; + +/** + * Print sink output writer for DataStream and DataSet print API. 
+ */ +@Internal +public class PrintSinkOutputWriter implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final boolean STD_OUT = false; + private static final boolean STD_ERR = true; + + private final boolean target; + private transient PrintStream stream; + private final String sinkIdentifier; + private transient String completedPrefix; + + public PrintSinkOutputWriter() { + this("", STD_OUT); + } + + public PrintSinkOutputWriter(final boolean stdErr) { + this("", stdErr); + } + + public PrintSinkOutputWriter(final String sinkIdentifier, final boolean stdErr) { + this.target = stdErr; + this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier); + } + + public void open(int subtaskIndex, int numParallelSubtasks) { + // get the target stream + stream = target == STD_OUT ? System.out : System.err; + + completedPrefix = sinkIdentifier; + + if (numParallelSubtasks > 1) { + if (!completedPrefix.isEmpty()) { + completedPrefix += ":"; + } + completedPrefix += (subtaskIndex + 1); + } + + if (!completedPrefix.isEmpty()) { + completedPrefix += "> "; + } + } + + public void write(IN record) { + stream.println(completedPrefix + record.toString()); + } + + @Override + public String toString() { + return "Print to " + (target == STD_OUT ? 
"System.out" : "System.err"); + } +} diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java index 0ab1abb2efb..62eabd0b739 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java @@ -19,45 +19,46 @@ package org.apache.flink.api.java.io; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.util.PrintSinkOutputWriter; import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.configuration.Configuration; -import java.io.PrintStream; - /** * Output format that prints results into either stdout or stderr. - * @param + * + * + * Four possible format options: + * {@code sinkIdentifier}:taskId> output <- {@code sinkIdentifier} provided, parallelism > 1 + * {@code sinkIdentifier}> output <- {@code sinkIdentifier} provided, parallelism == 1 + * taskId> output<- no {@code sinkIdentifier} provided, parallelism > 1 + * output<- no {@code sinkIdentifier} provided, parallelism == 1 + * + * + * @param Input record type */ @PublicEvolving publi
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583550#comment-16583550 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#issuecomment-413787139 Thanks for the contribution! @zentol told me his LGTM so merging now :) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The print method of {{DataSet}} allows the user to supply a > String to identify the output (see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But > {{DataStream}} doesn't support this yet. It would be valuable to add this feature to > {{DataStream}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10122) KafkaConsumer should use partitionable state over union state if partition discovery is not active
[ https://issues.apache.org/jira/browse/FLINK-10122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583549#comment-16583549 ] ASF GitHub Bot commented on FLINK-10122: StefanRRichter commented on issue #6537: [FLINK-10122] KafkaConsumer should use partitionable state over union state if partition discovery is not active URL: https://github.com/apache/flink/pull/6537#issuecomment-413787110 Thanks @tzulitai ! I was aware that this would break the behavior for partition discovery. However, the current implementation was already broken for users at large scale, as pointed out in the description. This PR was intended as a quick solution for this case. I think that we can have better non-breaking solutions in the future, like splitting the source into two operators or a different state partitioning scheme. I think that we can close the PR and go for the long-term solution in official releases. Nevertheless, I think that we should cherry-pick two parts of this PR into releases: the hotfix to improve memory utilization and the option to remove operator state (or, even better, states in general). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > KafkaConsumer should use partitionable state over union state if partition > discovery is not active > -- > > Key: FLINK-10122 > URL: https://issues.apache.org/jira/browse/FLINK-10122 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > KafkaConsumer always stores its offset state as union state. I think this is > only required when partition discovery is active.
For jobs with a > very high parallelism, the union state can lead to prohibitively expensive > deployments. For example, a job with 2000 sources and a total of 10MB of > checkpointed offsets in union state would have to ship ~2000 x 10MB = > 20GB of state. With partitionable state, it would have to ship ~10MB. > For now, I would suggest going back to partitionable state when > partition discovery is not active. In the long run, I have some ideas for > more efficient partitioning schemes that would also work for active discovery. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
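The cost argument above comes down to how operator list state is redistributed on restore. Flink's OperatorStateStore offers getListState (even-split redistribution, where each entry goes to one subtask) and getUnionListState (union redistribution, where every subtask receives every entry). The Java sketch below simulates both modes on a toy list of offset entries and then scales to the numbers from the issue. It is an illustration under stated assumptions, not Flink code. The class and method names are hypothetical, and the round-robin assignment was chosen for simplicity, so Flink's actual split of entries may differ. Only the total shipped volume matters here.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetStateRedistributionSketch {

    // Even-split: each entry lands on exactly one subtask (round-robin here).
    static List<List<String>> evenSplit(List<String> entries, int parallelism) {
        List<List<String>> assignment = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            assignment.add(new ArrayList<>());
        }
        for (int i = 0; i < entries.size(); i++) {
            assignment.get(i % parallelism).add(entries.get(i));
        }
        return assignment;
    }

    // Union: every subtask receives the complete list of entries.
    static List<List<String>> union(List<String> entries, int parallelism) {
        List<List<String>> assignment = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            assignment.add(new ArrayList<>(entries));
        }
        return assignment;
    }

    // Total number of entries shipped to all subtasks combined.
    static long entriesShipped(List<List<String>> assignment) {
        long total = 0;
        for (List<String> part : assignment) {
            total += part.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> offsets = new ArrayList<>();
        for (int partition = 0; partition < 8; partition++) {
            offsets.add("topic-" + partition);
        }
        System.out.println(entriesShipped(evenSplit(offsets, 4))); // prints 8
        System.out.println(entriesShipped(union(offsets, 4)));     // prints 32

        // Scaling to the issue's example: 10 MB of offset state, 2000 subtasks.
        long stateMb = 10;
        int parallelism = 2000;
        System.out.println(parallelism * stateMb + " MB with union state"); // prints 20000 MB with union state
        System.out.println(stateMb + " MB with even-split state");          // prints 10 MB with even-split state
    }
}
```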
[jira] [Resolved] (FLINK-10159) TestHarness#initializeState(xyz) calls after TestHarness#open() are being silently ignored
[ https://issues.apache.org/jira/browse/FLINK-10159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski resolved FLINK-10159. Resolution: Fixed Fix Version/s: 1.7.0 > TestHarness#initializeState(xyz) calls after TestHarness#open() are being > silently ignored > -- > > Key: FLINK-10159 > URL: https://issues.apache.org/jira/browse/FLINK-10159 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.6.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > This is an old issue. Calling initializeState and open in the wrong order results in > initializeState being silently ignored, for example in this code: > {code:java} > testHarness = createTestHarness(topic); > testHarness.setup(); > testHarness.open(); > testHarness.initializeState(snapshot1); > {code} > This is misleading both for Flink developers and for users (since we > recommend using the test harness for unit tests). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
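The fix merged in PR #6570 is titled "Fail TestHarness.initializeState if harness has already been initialized". The self-contained Java sketch below models that fail-fast guard. It does not use the real Flink test harness classes. The Harness class is hypothetical, and snapshots are plain strings. One further assumption is labeled in the code: open() is modeled as implicitly initializing with empty state when no restore happened. This follows the diff below, where an explicit initializeState(null) after open() was dropped from FlinkKafkaProducer011ITCase.

```java
public class HarnessLifecycleSketch {

    // Hypothetical, stripped-down stand-in for an operator test harness.
    static final class Harness {
        private boolean initialized;
        private String restoredFrom;

        void setup() {
            // Nothing to do in this sketch.
        }

        void initializeState(String snapshot) {
            if (initialized) {
                // Fail fast instead of silently ignoring the late restore.
                throw new IllegalStateException(
                        "initializeState() called after the harness was already initialized");
            }
            restoredFrom = snapshot;
            initialized = true;
        }

        void open() {
            // Assumption: open() initializes with empty state if no restore happened.
            if (!initialized) {
                initializeState(null);
            }
        }

        String restoredFrom() {
            return restoredFrom;
        }
    }

    public static void main(String[] args) {
        // Correct order: restore first, then open.
        Harness good = new Harness();
        good.setup();
        good.initializeState("snapshot1");
        good.open();
        System.out.println(good.restoredFrom()); // prints snapshot1

        // The order from the issue: the late restore is now rejected.
        Harness bad = new Harness();
        bad.setup();
        bad.open();
        try {
            bad.initializeState("snapshot1");
        } catch (IllegalStateException e) {
            System.out.println("rejected");
        }
    }
}
```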
[jira] [Commented] (FLINK-10159) TestHarness#initializeState(xyz) calls after TestHarness#open() are being silently ignored
[ https://issues.apache.org/jira/browse/FLINK-10159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583499#comment-16583499 ] Piotr Nowojski commented on FLINK-10159: Merged on master as b70f78392424b5bdb4119ee8fdbfe16fee13 > TestHarness#initializeState(xyz) calls after TestHarness#open() are being > silently ignored > -- > > Key: FLINK-10159 > URL: https://issues.apache.org/jira/browse/FLINK-10159 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.6.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > This is an old issue. Calling initializeState and open in the wrong order results in > initializeState being silently ignored, for example in this code: > {code:java} > testHarness = createTestHarness(topic); > testHarness.setup(); > testHarness.open(); > testHarness.initializeState(snapshot1); > {code} > This is misleading both for Flink developers and for users (since we > recommend using the test harness for unit tests). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10159) TestHarness#initializeState(xyz) calls after TestHarness#open() are being silently ignored
[ https://issues.apache.org/jira/browse/FLINK-10159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-10159. -- > TestHarness#initializeState(xyz) calls after TestHarness#open() are being > silently ignored > -- > > Key: FLINK-10159 > URL: https://issues.apache.org/jira/browse/FLINK-10159 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.6.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > This is an old issue. Calling initializeState and open in the wrong order results in > initializeState being silently ignored, for example in this code: > {code:java} > testHarness = createTestHarness(topic); > testHarness.setup(); > testHarness.open(); > testHarness.initializeState(snapshot1); > {code} > This is misleading both for Flink developers and for users (since we > recommend using the test harness for unit tests). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10159) TestHarness#initializeState(xyz) calls after TestHarness#open() are being silently ignored
[ https://issues.apache.org/jira/browse/FLINK-10159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583496#comment-16583496 ]

ASF GitHub Bot commented on FLINK-10159:

pnowojski closed pull request #6570: [FLINK-10159][tests] Fail TestHarness.initializeState if harness has already been initialized
URL: https://github.com/apache/flink/pull/6570

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 74c58ad2891..57b7e77dc7f 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -172,7 +172,6 @@ public void testFlinkKafkaProducer011FailBeforeNotify() throws Exception {
 		testHarness.setup();
 		testHarness.open();
-		testHarness.initializeState(null);
 		testHarness.processElement(42, 0);
 		testHarness.snapshot(0, 1);
 		testHarness.processElement(43, 2);
@@ -225,7 +224,6 @@ public void testFlinkKafkaProducer011FailTransactionCoordinatorBeforeNotify() th
 		testHarness1.setup();
 		testHarness1.open();
-		testHarness1.initializeState(null);
 		testHarness1.processElement(42, 0);
 		testHarness1.snapshot(0, 1);
 		testHarness1.processElement(43, 2);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
index ff0b0fcb172..644ab04fb70 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
@@ -30,6 +30,9 @@
 import java.util.Collections;
 import java.util.List;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Tests for {@link ListCheckpointed}.
  */
@@ -37,35 +40,39 @@
 
 	@Test
 	public void testUDFReturningNull() throws Exception {
-		TestUserFunction userFunction = new TestUserFunction(null);
-		AbstractStreamOperatorTestHarness testHarness =
-			new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0);
-		testHarness.open();
-		OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
-		testHarness.initializeState(snapshot);
-		Assert.assertTrue(userFunction.isRestored());
+		testUDF(new TestUserFunction(null));
 	}
 
 	@Test
 	public void testUDFReturningEmpty() throws Exception {
-		TestUserFunction userFunction = new TestUserFunction(Collections.emptyList());
-		AbstractStreamOperatorTestHarness testHarness =
-			new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0);
-		testHarness.open();
-		OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
-		testHarness.initializeState(snapshot);
-		Assert.assertTrue(userFunction.isRestored());
+		testUDF(new TestUserFunction(Collections.emptyList()));
 	}
 
 	@Test
 	public void testUDFReturningData() throws Exception {
-		TestUserFunction userFunction = new TestUserFunction(Arrays.asList(1, 2, 3));
-		AbstractStreamOperatorTestHarness testHarness =
-			new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0);
-		testHarness.open();
-		OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
-		testHarness.initializeState(snapshot);
-		Assert.assertTrue(userFunction.isRestored());
+		testUDF(new TestUserFunction(Arrays.asList(1, 2, 3)));
+	}
+
+	private static void testUDF(TestUserFunction userFunction) throws Exception {
+		OperatorSubtaskState snapshot;
+		try (AbstractStreamOperatorTestHarness testHarness = createTestHarness(userFunction)) {
+
[jira] [Commented] (FLINK-10159) TestHarness#initializeState(xyz) calls after TestHarness#open() are being silently ignored
[ https://issues.apache.org/jira/browse/FLINK-10159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583497#comment-16583497 ]

ASF GitHub Bot commented on FLINK-10159:

pnowojski commented on issue #6570: [FLINK-10159][tests] Fail TestHarness.initializeState if harness has already been initialized
URL: https://github.com/apache/flink/pull/6570#issuecomment-413782923

Thanks! Merged.

> TestHarness#initializeState(xyz) calls after TestHarness#open() are being
> silently ignored
> --
>
> Key: FLINK-10159
> URL: https://issues.apache.org/jira/browse/FLINK-10159
> Project: Flink
> Issue Type: Bug
> Components: Tests
> Affects Versions: 1.6.0
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
> Priority: Critical
> Labels: pull-request-available
>
> This is an old issue. Calling initializeState and open in the wrong order
> results in initializeState being ignored, for example in this code:
> {code:java}
> testHarness = createTestHarness(topic);
> testHarness.setup();
> testHarness.open();
> testHarness.initializeState(snapshot1);
> {code}
> This is misleading both for Flink developers and for users (since we
> recommend using the test harness for unit tests).
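The PR title describes the fix as failing `initializeState` once the harness has already been initialized, which is also why the diff above drops the `initializeState(null)` calls that followed `open()` in the Kafka 0.11 tests. A minimal sketch of such a lifecycle guard follows. It is a self-contained model, not Flink's `AbstractStreamOperatorTestHarness`: the class `LifecycleGuardedHarness`, its `byte[]` snapshot type, its `put` method, and the assumption that `open()` implicitly initializes empty state are all hypothetical.

```java
import java.util.Arrays;

/**
 * Hypothetical stand-in for a test harness (NOT Flink's AbstractStreamOperatorTestHarness).
 * It only models the lifecycle rule: initializeState(...) must run before open(), at most once.
 */
public class LifecycleGuardedHarness {
    private boolean initialized;
    private boolean restored;
    private byte[] state = new byte[0];

    /** Restores from a snapshot; fails fast instead of being silently ignored. */
    public void initializeState(byte[] snapshot) {
        if (initialized) {
            throw new IllegalStateException(
                "initializeState() called after the harness was already initialized");
        }
        if (snapshot != null) {
            state = Arrays.copyOf(snapshot, snapshot.length);
            restored = true;
        }
        initialized = true;
    }

    /** Assumption: opening an uninitialized harness initializes it with empty state. */
    public void open() {
        if (!initialized) {
            initializeState(null);
        }
    }

    /** Appends one byte of "state" so that snapshots have visible content. */
    public void put(byte b) {
        state = Arrays.copyOf(state, state.length + 1);
        state[state.length - 1] = b;
    }

    public byte[] snapshot() {
        return Arrays.copyOf(state, state.length);
    }

    public boolean isRestored() {
        return restored;
    }

    public static void main(String[] args) {
        // Correct order: snapshot one harness, restore into a fresh one BEFORE open().
        LifecycleGuardedHarness first = new LifecycleGuardedHarness();
        first.open();
        first.put((byte) 42);
        byte[] snapshot = first.snapshot();

        LifecycleGuardedHarness second = new LifecycleGuardedHarness();
        second.initializeState(snapshot);
        second.open();
        System.out.println("restored=" + second.isRestored());

        // Order from the issue: open() first, then initializeState(...), which now fails.
        LifecycleGuardedHarness third = new LifecycleGuardedHarness();
        third.open();
        try {
            third.initializeState(snapshot);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Running `main` prints `restored=true` for the harness restored before `open()`, then `rejected: initializeState() called after the harness was already initialized` for the order quoted in the issue. The two-harness shape mirrors the refactored `testUDF` in the diff: take the snapshot from one harness, restore it into a new one.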
[GitHub] pnowojski commented on issue #6570: [FLINK-10159][tests] Fail TestHarness.initializeState if harness has already been initialized
pnowojski commented on issue #6570: [FLINK-10159][tests] Fail TestHarness.initializeState if harness has already been initialized
URL: https://github.com/apache/flink/pull/6570#issuecomment-413782923

Thanks! Merged.
[GitHub] pnowojski closed pull request #6570: [FLINK-10159][tests] Fail TestHarness.initializeState if harness has already been initialized
pnowojski closed pull request #6570: [FLINK-10159][tests] Fail TestHarness.initializeState if harness has already been initialized
URL: https://github.com/apache/flink/pull/6570

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 74c58ad2891..57b7e77dc7f 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -172,7 +172,6 @@ public void testFlinkKafkaProducer011FailBeforeNotify() throws Exception {
 		testHarness.setup();
 		testHarness.open();
-		testHarness.initializeState(null);
 		testHarness.processElement(42, 0);
 		testHarness.snapshot(0, 1);
 		testHarness.processElement(43, 2);
@@ -225,7 +224,6 @@ public void testFlinkKafkaProducer011FailTransactionCoordinatorBeforeNotify() th
 		testHarness1.setup();
 		testHarness1.open();
-		testHarness1.initializeState(null);
 		testHarness1.processElement(42, 0);
 		testHarness1.snapshot(0, 1);
 		testHarness1.processElement(43, 2);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
index ff0b0fcb172..644ab04fb70 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
@@ -30,6 +30,9 @@
 import java.util.Collections;
 import java.util.List;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Tests for {@link ListCheckpointed}.
  */
@@ -37,35 +40,39 @@
 
 	@Test
 	public void testUDFReturningNull() throws Exception {
-		TestUserFunction userFunction = new TestUserFunction(null);
-		AbstractStreamOperatorTestHarness testHarness =
-			new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0);
-		testHarness.open();
-		OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
-		testHarness.initializeState(snapshot);
-		Assert.assertTrue(userFunction.isRestored());
+		testUDF(new TestUserFunction(null));
 	}
 
 	@Test
 	public void testUDFReturningEmpty() throws Exception {
-		TestUserFunction userFunction = new TestUserFunction(Collections.emptyList());
-		AbstractStreamOperatorTestHarness testHarness =
-			new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0);
-		testHarness.open();
-		OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
-		testHarness.initializeState(snapshot);
-		Assert.assertTrue(userFunction.isRestored());
+		testUDF(new TestUserFunction(Collections.emptyList()));
 	}
 
 	@Test
 	public void testUDFReturningData() throws Exception {
-		TestUserFunction userFunction = new TestUserFunction(Arrays.asList(1, 2, 3));
-		AbstractStreamOperatorTestHarness testHarness =
-			new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0);
-		testHarness.open();
-		OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
-		testHarness.initializeState(snapshot);
-		Assert.assertTrue(userFunction.isRestored());
+		testUDF(new TestUserFunction(Arrays.asList(1, 2, 3)));
+	}
+
+	private static void testUDF(TestUserFunction userFunction) throws Exception {
+		OperatorSubtaskState snapshot;
+		try (AbstractStreamOperatorTestHarness testHarness = createTestHarness(userFunction)) {
+			testHarness.open();
+			snapshot = testHarness.snapshot(0L, 0L);
+			assertFalse(userFunction.isRestored());
+		}
+		try (AbstractStreamOperatorTestHarness testHarness = createTest
[jira] [Commented] (FLINK-8868) Support Table Function as Table for Stream Sql
[ https://issues.apache.org/jira/browse/FLINK-8868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583487#comment-16583487 ]

ASF GitHub Bot commented on FLINK-8868:
---

Xpray commented on issue #6574: [FLINK-8868] [table] Support Table Function as Table Source for Stream Sql
URL: https://github.com/apache/flink/pull/6574#issuecomment-413779455

It should be FLINK-8868, I'll fix this.

> Support Table Function as Table for Stream Sql
> --
>
> Key: FLINK-8868
> URL: https://issues.apache.org/jira/browse/FLINK-8868
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Ruidong Li
> Assignee: Ruidong Li
> Priority: Major
> Labels: pull-request-available
>
> Support SQL such as: SELECT * FROM TABLE(tf("a"))
[jira] [Updated] (FLINK-8868) Support Table Function as Table for Stream Sql
[ https://issues.apache.org/jira/browse/FLINK-8868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-8868:
--
Labels: pull-request-available (was: )

> Support Table Function as Table for Stream Sql
> --
>
> Key: FLINK-8868
> URL: https://issues.apache.org/jira/browse/FLINK-8868
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Ruidong Li
> Assignee: Ruidong Li
> Priority: Major
> Labels: pull-request-available
>
> Support SQL such as: SELECT * FROM TABLE(tf("a"))
[GitHub] Xpray commented on issue #6574: [FLINK-8868] [table] Support Table Function as Table Source for Stream Sql
Xpray commented on issue #6574: [FLINK-8868] [table] Support Table Function as Table Source for Stream Sql
URL: https://github.com/apache/flink/pull/6574#issuecomment-413779455

It should be FLINK-8868, I'll fix this.
[jira] [Commented] (FLINK-8532) RebalancePartitioner should use Random value for its first partition
[ https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583484#comment-16583484 ]

ASF GitHub Bot commented on FLINK-8532:
---

Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#issuecomment-413778059

thanks~ @tillrohrmann

> RebalancePartitioner should use Random value for its first partition
>
> Key: FLINK-8532
> URL: https://issues.apache.org/jira/browse/FLINK-8532
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Reporter: Yuta Morisawa
> Assignee: Guibo Pan
> Priority: Major
> Labels: pull-request-available
>
> Under some conditions, RebalancePartitioner doesn't balance data correctly
> because every instance uses the same value to select the next operator.
> RebalancePartitioner initializes its partition id to the same value in
> every thread, so it does balance data overall, but at any given moment the
> amount of data in each operator is skewed.
> The skew becomes particularly severe when the upstream operators all produce
> data at the same rate.
>
> Example:
> Consider a simple operator chain.
> -> map1 -> rebalance -> map2 ->
> Each map operator (map1, map2) has three subtasks (subtasks 1 to 6 overall):
> map1   map2
> st1    st4
> st2    st5
> st3    st6
>
> At the beginning, every subtask in map1 sends data to st4 in map2 because
> they all use the same initial partition id.
> The next time map1 receives data, st1, st2 and st3 all send it to st5,
> because each of them incremented its partition id after processing the
> previous record.
> In my environment, processing takes twice as long with RebalancePartitioner
> as with other partitioners (rescale, keyBy).
>
> To solve this problem, in my opinion, RebalancePartitioner should use its own
> operator id as the initial value.
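The skew described in the issue comes from every upstream subtask starting its round-robin at the same output channel. The sketch below contrasts a fixed start with the random first partition named in the PR title. `RoundRobinSelector` and `firstRecordLoad` are hypothetical names, not Flink's `RebalancePartitioner`, and the seeded `java.util.Random` is an assumption made only so the sketch is reproducible.

```java
import java.util.Arrays;
import java.util.Random;

/**
 * Hypothetical round-robin channel selector illustrating FLINK-8532.
 * Not Flink's RebalancePartitioner: it only models which output channel
 * each record is sent to.
 */
public class RoundRobinSelector {
    private final int numChannels;
    private int nextChannel;

    /** Starts cycling at firstChannel; 0 for every sender reproduces the reported skew. */
    public RoundRobinSelector(int numChannels, int firstChannel) {
        if (numChannels <= 0) {
            throw new IllegalArgumentException("numChannels must be positive");
        }
        this.numChannels = numChannels;
        this.nextChannel = Math.floorMod(firstChannel, numChannels);
    }

    /** Starts cycling at a random channel, the direction taken by the PR title. */
    public static RoundRobinSelector withRandomStart(int numChannels, Random random) {
        return new RoundRobinSelector(numChannels, random.nextInt(numChannels));
    }

    /** Returns the channel for the next record, then advances round-robin. */
    public int selectChannel() {
        int channel = nextChannel;
        nextChannel = (nextChannel + 1) % numChannels;
        return channel;
    }

    /** Lets each sender route one record and counts records per downstream channel. */
    public static int[] firstRecordLoad(RoundRobinSelector[] senders, int numChannels) {
        int[] load = new int[numChannels];
        for (RoundRobinSelector sender : senders) {
            load[sender.selectChannel()]++;
        }
        return load;
    }

    public static void main(String[] args) {
        int parallelism = 3; // map1 subtasks st1..st3 feeding map2 subtasks st4..st6

        RoundRobinSelector[] fixedStart = new RoundRobinSelector[parallelism];
        for (int i = 0; i < parallelism; i++) {
            fixedStart[i] = new RoundRobinSelector(parallelism, 0);
        }
        System.out.println(Arrays.toString(firstRecordLoad(fixedStart, parallelism)));

        Random random = new Random(42);
        RoundRobinSelector[] randomStart = new RoundRobinSelector[parallelism];
        for (int i = 0; i < parallelism; i++) {
            randomStart[i] = withRandomStart(parallelism, random);
        }
        System.out.println(Arrays.toString(firstRecordLoad(randomStart, parallelism)));
    }
}
```

With a fixed start the first line printed is `[3, 0, 0]`: all three map1 subtasks send their first record to the same map2 subtask, matching the st4 example in the issue. The randomized line depends on the seed; a random start makes such collisions less likely but does not guarantee an even spread at every moment. The reporter's alternative (the subtask's own id as the initial value) could be modelled by passing that index as `firstChannel`.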
[GitHub] Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition
Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#issuecomment-413778059

thanks~ @tillrohrmann