[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584676#comment-16584676
 ] 

ASF GitHub Bot commented on FLINK-9962:
---

kl0u commented on issue #6492: [FLINK-9962] [FS connector] allow users to 
specify TimeZone in DateTimeBucketer
URL: https://github.com/apache/flink/pull/6492#issuecomment-414036401
 
 
   Hi @bowenli86! Are you planning to continue working on this issue? If not, 
I could work on that next week ;)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> allow users to specify TimeZone in DateTimeBucketer
> ---
>
> Key: FLINK-9962
> URL: https://issues.apache.org/jira/browse/FLINK-9962
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.5.1, 1.6.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently {{DateTimeBucketer}} will return a bucket path by using local 
> timezone. We should add a {{timezone}} constructor param to allow users to 
> specify a timezone.
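
To illustrate why the time zone matters for the bucket path, here is a minimal sketch in plain Java. It does not use Flink's `DateTimeBucketer`; the class `BucketPathSketch`, the helper `bucketPath`, the `/data` base path and the `yyyy-MM-dd--HH` pattern are all assumptions made for illustration.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

class BucketPathSketch {

    // Hypothetical helper (not Flink API): formats a timestamp into a
    // bucket path using an explicit time zone instead of the JVM default.
    static String bucketPath(String basePath, String pattern, ZoneId zone, Instant timestamp) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern).withZone(zone);
        return basePath + "/" + formatter.format(timestamp);
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2018-08-17T00:00:00Z");
        // Same instant, two zones, two different buckets.
        System.out.println(bucketPath("/data", "yyyy-MM-dd--HH", ZoneId.of("UTC"), ts));
        // prints /data/2018-08-17--00
        System.out.println(bucketPath("/data", "yyyy-MM-dd--HH", ZoneId.of("America/Los_Angeles"), ts));
        // prints /data/2018-08-16--17
    }
}
```

The same instant lands in different buckets depending on the zone, which is the behaviour a `timezone` constructor parameter would let users control explicitly.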



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()

2018-08-17 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584675#comment-16584675
 ] 

buptljy commented on FLINK-10168:
-

[~phoenixjiangnan] Thanks! I think this will be a good improvement. We can 
define some readFile functions that filter on the prefix and suffix of 
file names and on the last modified time.

However, is it necessary to expose a generic filter function and let developers 
define their own file filters? Do we really have that many different application 
scenarios for the readFile function? As far as I know, most cases can be covered by 
the three functions above.
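
As a rough sketch of the trade-off being discussed, the three fixed filters (prefix, suffix, last modified time) can each be written as a predicate, and a generic filter is then simply any predicate the user supplies. The code below is plain Java over `java.nio.file`, not Flink's `readFile()` API; `FileFilterSketch` and its method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FileFilterSketch {

    // The three "built-in" filters from the comment, expressed as predicates.
    static Predicate<Path> hasPrefix(String prefix) {
        return p -> p.getFileName().toString().startsWith(prefix);
    }

    static Predicate<Path> hasSuffix(String suffix) {
        return p -> p.getFileName().toString().endsWith(suffix);
    }

    static Predicate<Path> modifiedAfter(Instant cutoff) {
        return p -> {
            try {
                return Files.getLastModifiedTime(p).toInstant().isAfter(cutoff);
            } catch (IOException e) {
                return false; // in this sketch, unreadable files are simply skipped
            }
        };
    }

    // The "generic" variant: the caller passes any predicate, including
    // combinations of the three above.
    static List<Path> listMatching(Path dir, Predicate<Path> filter) throws IOException {
        try (Stream<Path> files = Files.list(dir)) {
            return files.filter(Files::isRegularFile)
                        .filter(filter)
                        .sorted()
                        .collect(Collectors.toList());
        }
    }
}
```

With this shape, the fixed filters stay available as convenience helpers, while a call such as `listMatching(dir, hasPrefix("app-").and(modifiedAfter(cutoff)))` covers the generic case without a separate API.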

> support filtering files by modified/created time in 
> StreamExecutionEnvironment.readFile()
> -
>
> Key: FLINK-10168
> URL: https://issues.apache.org/jira/browse/FLINK-10168
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Assignee: buptljy
>Priority: Major
> Fix For: 1.7.0
>
>
> Support filtering files by modified/created time in 
> {{StreamExecutionEnvironment.readFile()}}.
> For example, in a source dir with lots of files, we only want to read files 
> that were created or modified after a specific time.
> This API can expose a generic filter function for files, and let users define 
> filtering rules. Currently Flink only supports filtering files by path.



[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client

2018-08-17 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584672#comment-16584672
 ] 

Timo Walther commented on FLINK-10163:
--

[~suez1224] Yes, I think the full implementation, as [~hequn8128] added it to 
the issue, should be done with the DDL FLIP. We need a proper parser for this 
that also allows escaping like {{CREATE VIEW `My Table Name`}} and specifying 
optional column names and types, as well as {{DROP VIEW}} and {{REPLACE VIEW}}. 
This should be integrated with the Table API.

This issue describes just an MVP solution that allows the very basic 
{{CREATE_VIEW name AS query}} in the SQL Client, because there is currently 
no way of defining functionality there that is similar to the Table API's 
{{tableEnv.registerTable("name", ...)}}.

> Support CREATE VIEW in SQL Client
> -
>
> Key: FLINK-10163
> URL: https://issues.apache.org/jira/browse/FLINK-10163
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> The possibility to define a name for a subquery would improve the usability 
> of the SQL Client. The SQL standard defines {{CREATE VIEW}} for defining a 
> virtual table.
>  
> Example:
> {code}
>  CREATE VIEW viewName
>  [ '(' columnName [, columnName ]* ')' ]
>  AS Query
> {code}



[jira] [Assigned] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()

2018-08-17 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy reassigned FLINK-10168:
---

Assignee: buptljy

> support filtering files by modified/created time in 
> StreamExecutionEnvironment.readFile()
> -
>
> Key: FLINK-10168
> URL: https://issues.apache.org/jira/browse/FLINK-10168
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Assignee: buptljy
>Priority: Major
> Fix For: 1.7.0
>
>
> Support filtering files by modified/created time in 
> {{StreamExecutionEnvironment.readFile()}}.
> For example, in a source dir with lots of files, we only want to read files 
> that were created or modified after a specific time.
> This API can expose a generic filter function for files, and let users define 
> filtering rules. Currently Flink only supports filtering files by path.



[jira] [Commented] (FLINK-9407) Support orc rolling sink writer

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584667#comment-16584667
 ] 

ASF GitHub Bot commented on FLINK-9407:
---

phenixmzy edited a comment on issue #6075: [FLINK-9407] [hdfs connector] 
Support orc rolling sink writer
URL: https://github.com/apache/flink/pull/6075#issuecomment-414035357
 
 
   @zhangminglei 
   Can the OrcFileWriter with BucketingSink roll files when the batch size is reached?
   It seems that each record ends up in its own file.
   The code:
   ===
   def orcSchemaMetaInfo = String.format(
     "struct<%s,%s,%s...,%s>",
     "nt:string", "event_time:string", "event_id:string", ..., "appname:string")
   
   def getRowSink(distPath: String) = {
     val sink = new BucketingSink[Row](distPath + "/with-bucket/")
     sink.setBatchSize(1024 * 1024 * 1024)
       .setBucketer(new DateTimeBucketer[Row]("MMdd/HHmm"))
       .setWriter(new OrcFileWriter[Row](orcSchemaMetaInfo))
       .setPartPrefix("sdk-etl")
     sink
   }
   
   def getOrcRow(item: sdkItem): Row = {
     val row = Row.of(
       item.getNt, item.getEvent_time, item.getEvent_id, ..., item.getAppid, item.getAppname)
     row
   }
   
   ...
   val kafkaConsumer = new FlinkKafkaConsumer011(inputTopic, new SimpleStringSchema, params.getProperties)
   val messageStream = env.addSource(kafkaConsumer)
     .flatMap(in => SDKParse.parseSDK(in, inputTopic))
     .filter(item => item != None)
     .flatMap(item => Some(item).get)
     .map(item => getOrcRow(item))
   messageStream.addSink(getRowSink(distPath))
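
For reference, the behaviour the question expects from size-based rolling can be sketched as follows: records accumulate in one part file until a byte threshold is reached, and only then is a new part started. This is a simplified model in plain Java, not the `BucketingSink` implementation; `SizeRollingSketch` and its fields are hypothetical names.

```java
class SizeRollingSketch {

    private final long batchSizeBytes;   // threshold, analogous to setBatchSize(...)
    private long bytesInCurrentPart = 0; // bytes written to the open part file
    private int partCounter = 0;         // index of the open part file

    SizeRollingSketch(long batchSizeBytes) {
        this.batchSizeBytes = batchSizeBytes;
    }

    // Records a write of the given size and returns the index of the
    // part file that receives it. A new part is started only when the
    // current one has already reached the threshold.
    int write(long recordBytes) {
        if (bytesInCurrentPart >= batchSizeBytes) {
            partCounter++;
            bytesInCurrentPart = 0;
        }
        bytesInCurrentPart += recordBytes;
        return partCounter;
    }

    public static void main(String[] args) {
        SizeRollingSketch sink = new SizeRollingSketch(100);
        for (int i = 0; i < 4; i++) {
            System.out.println("record " + i + " -> part " + sink.write(40));
        }
    }
}
```

Under this model, a threshold of 1024 * 1024 * 1024 bytes should gather many small records into one part file, so one file per record would not be the expected outcome.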


> Support orc rolling sink writer
> ---
>
> Key: FLINK-9407
> URL: https://issues.apache.org/jira/browse/FLINK-9407
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: zhangminglei
>Assignee: zhangminglei
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and 
> {{AvroKeyValueSinkWriter}}. I would suggest adding an ORC writer for the 
> rolling sink.
> Below, FYI.
> I tested the PR and verified the results with Spark SQL. As expected, we can 
> read back the data that was written before. I will add more tests in the 
> next couple of days, including the performance under compression with 
> short checkpoint intervals, and more unit tests.
> {code:java}
> scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21")
> res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
> scala>
> scala> res1.registerTempTable("tablerice")
> warning: there was one deprecation warning; re-run with -deprecation for details
> scala> spark.sql("select * from tablerice")
> res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
> scala> res3.show(3)
> +-----+---+-------+
> | name|age|married|
> +-----+---+-------+
> |Sagar| 26|  false|
> |Sagar| 30|  false|
> |Sagar| 34|  false|
> +-----+---+-------+
> only showing top 3 rows
> {code}




[jira] [Commented] (FLINK-10068) Add documentation for async/RocksDB-based timers

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584658#comment-16584658
 ] 

ASF GitHub Bot commented on FLINK-10068:


twalthr commented on issue #6504: [FLINK-10068][docs] Add documentation for 
RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#issuecomment-414035015
 
 
   Thanks for the update @StefanRRichter. LGTM % the single comment. Feel free 
to merge this.


> Add documentation for async/RocksDB-based timers
> 
>
> Key: FLINK-10068
> URL: https://issues.apache.org/jira/browse/FLINK-10068
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> Document how to activate RocksDB-based timers, and update the docs to say that 
> snapshotting now works asynchronously (except for heap timers + 
> rocks-incremental-snapshot).



[jira] [Commented] (FLINK-10068) Add documentation for async/RocksDB-based timers

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584657#comment-16584657
 ] 

ASF GitHub Bot commented on FLINK-10068:


twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add 
documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#discussion_r211066338
 
 

 ##
 File path: docs/dev/stream/operators/process_function.md
 ##
 @@ -277,19 +277,18 @@ Both types of timers (processing-time and event-time) 
are internally maintained
 
 The `TimerService` deduplicates timers per key and timestamp, i.e., there is 
at most one timer per key and timestamp. If multiple timers are registered for 
the same timestamp, the `onTimer()` method will be called just once.
 
-**Note:** Flink synchronizes invocations of `onTimer()` and 
`processElement()`. Hence, users do not have to worry about concurrent 
modification of state.
+Note Flink synchronizes invocations of 
`onTimer()` and `processElement()`. Hence, users do not have to worry about 
concurrent modification of state.
 
 ### Fault Tolerance
 
 Timers are fault tolerant and checkpointed along with the state of the 
application. 
 In case of a failure recovery or when starting an application from a 
savepoint, the timers are restored.
 
-**Note:** Checkpointed processing-time timers that were supposed to fire 
before their restoration, will fire immediately. 
+Note Checkpointed processing-time timers 
that were supposed to fire before their restoration, will fire immediately.
 This might happen when an application recovers from a failure or when it is 
started from a savepoint.
 
-**Note:** Timers are always synchronously checkpointed, regardless of the 
configuration of the state backends. 
-Therefore, a large number of timers can significantly increase checkpointing 
time. 
-See the "Timer Coalescing" section for advice on how to reduce the number of 
timers.
+Note Timers are always asynchronously 
checkpointed, except for the combination of RocksDB backend / with incremental 
snapshots / with heap-based timers (will be resolved with `FLINK-10026`).
 
 Review comment:
   If there is nothing more to say in your opinion, then I'm fine with this.


> Add documentation for async/RocksDB-based timers
> 
>
> Key: FLINK-10068
> URL: https://issues.apache.org/jira/browse/FLINK-10068
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> Document how to activate RocksDB-based timers, and update the docs to say that 
> snapshotting now works asynchronously (except for heap timers + 
> rocks-incremental-snapshot).



[jira] [Commented] (FLINK-10068) Add documentation for async/RocksDB-based timers

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584656#comment-16584656
 ] 

ASF GitHub Bot commented on FLINK-10068:


twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add 
documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#discussion_r211066305
 
 

 ##
 File path: docs/dev/stream/operators/process_function.md
 ##
 @@ -277,19 +277,18 @@ Both types of timers (processing-time and event-time) 
are internally maintained
 
 The `TimerService` deduplicates timers per key and timestamp, i.e., there is 
at most one timer per key and timestamp. If multiple timers are registered for 
the same timestamp, the `onTimer()` method will be called just once.
 
 Review comment:
   Writing more docs about this might be a bigger change but moving 10 lines 
into a subsection on a different page can be done in this PR.
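
The deduplication rule quoted in the diff above (at most one timer per key and timestamp) can be modelled with a set keyed on the (key, timestamp) pair. This is only an illustrative sketch in plain Java, not Flink's `TimerService`; `TimerDedupSketch` and its methods are hypothetical.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class TimerDedupSketch {

    // Each timer is identified by its (key, timestamp) pair.
    private final Set<Map.Entry<String, Long>> timers = new HashSet<>();

    // Registers a timer; returns false if an identical timer already
    // exists, in which case the registration has no additional effect.
    boolean register(String key, long timestamp) {
        return timers.add(new SimpleImmutableEntry<>(key, timestamp));
    }

    // Number of distinct timers that would eventually fire.
    int pendingTimers() {
        return timers.size();
    }
}
```

Registering the same key and timestamp twice leaves a single pending timer, which corresponds to `onTimer()` being called just once for that pair.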


> Add documentation for async/RocksDB-based timers
> 
>
> Key: FLINK-10068
> URL: https://issues.apache.org/jira/browse/FLINK-10068
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> Document how to activate RocksDB-based timers, and update the docs to say that 
> snapshotting now works asynchronously (except for heap timers + 
> rocks-incremental-snapshot).



[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client

2018-08-17 Thread Shuyi Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584645#comment-16584645
 ] 

Shuyi Chen commented on FLINK-10163:


I think we can add this ticket as part of the DDL FLIP. What do you think, 
[~twalthr]?

> Support CREATE VIEW in SQL Client
> -
>
> Key: FLINK-10163
> URL: https://issues.apache.org/jira/browse/FLINK-10163
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> The possibility to define a name for a subquery would improve the usability 
> of the SQL Client. The SQL standard defines {{CREATE VIEW}} for defining a 
> virtual table.
>  
> Example:
> {code}
>  CREATE VIEW viewName
>  [ '(' columnName [, columnName ]* ')' ]
>  AS Query
> {code}





[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client

2018-08-17 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584631#comment-16584631
 ] 

Hequn Cheng commented on FLINK-10163:
-

This would be a great feature.



[jira] [Updated] (FLINK-10163) Support CREATE VIEW in SQL Client

2018-08-17 Thread Hequn Cheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng updated FLINK-10163:

Description: 
The possibility to define a name for a subquery would improve the usability of 
the SQL Client. The SQL standard defines {{CREATE VIEW}} for defining a 
virtual table.

 

Example:

{code}
 CREATE VIEW viewName
 [ '(' columnName [, columnName ]* ')' ]
 AS Query
{code}

  was:
The possibility to define a name for a subquery would improve the usability of 
the SQL Client. The SQL standard defines {{CREATE VIEW}} for defining a 
virtual table.

 

Example:

{code}
CREATE VIEW view_name AS SELECT 
{code}




[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-08-17 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584620#comment-16584620
 ] 

Hequn Cheng commented on FLINK-10156:
-

{{insertInto()}} requires a configured {{TableSink}}, while {{writeToSink()}} 
does not. Would dropping it make the Table API more costly to use, since we 
would have to pass fieldNames and fieldTypes?

> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Priority: Major
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  





[jira] [Created] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()

2018-08-17 Thread Bowen Li (JIRA)
Bowen Li created FLINK-10168:


 Summary: support filtering files by modified/created time in 
StreamExecutionEnvironment.readFile()
 Key: FLINK-10168
 URL: https://issues.apache.org/jira/browse/FLINK-10168
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Affects Versions: 1.6.0
Reporter: Bowen Li
 Fix For: 1.7.0


Support filtering files by modified/created time in 
{{StreamExecutionEnvironment.readFile()}}.

For example, in a source directory with lots of files, we may only want to read 
files that were created or modified after a specific time.

This API could expose a generic file filter function and let users define their 
own filtering rules. Currently, Flink only supports filtering files by path.
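
The generic filter proposed above could look roughly like the following stand-alone sketch. It uses only `java.nio.file` and does not touch Flink's API: `FileFilterSketch`, `modifiedAfter` and `listMatching` are hypothetical names, only the modification time is covered, and how such a predicate would be wired into `readFile()` is left open.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch of a user-defined file filter; not Flink code. */
final class FileFilterSketch {

    /** Accepts files whose last-modified time is strictly after the cutoff. */
    static Predicate<Path> modifiedAfter(Instant cutoff) {
        return path -> {
            try {
                return Files.getLastModifiedTime(path).toInstant().isAfter(cutoff);
            } catch (IOException e) {
                // Predicates cannot throw checked exceptions, so wrap it.
                throw new UncheckedIOException(e);
            }
        };
    }

    /** Lists the regular files in a directory that pass the given filter. */
    static List<Path> listMatching(Path dir, Predicate<Path> filter) throws IOException {
        try (Stream<Path> files = Files.list(dir)) {
            return files
                    .filter(Files::isRegularFile)
                    .filter(filter)
                    .sorted()
                    .collect(Collectors.toList());
        }
    }
}
```

Because the filter is an ordinary `Predicate<Path>`, path-based and time-based rules could be combined with `and()` / `or()` without further API surface.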





[jira] [Updated] (FLINK-8868) Support Table Function as Table for Stream Sql

2018-08-17 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-8868:
--
Description: 
For stream SQL:
support SQL like:  SELECT * FROM LATERAL TABLE(tf("a"))
For batch SQL:
a UDTF might produce infinite records; this needs to be discussed

  was:support SQL like:  SELECT * FROM TABLE(tf("a"))


> Support Table Function as Table for Stream Sql
> --
>
> Key: FLINK-8868
> URL: https://issues.apache.org/jira/browse/FLINK-8868
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>
> For stream SQL:
> support SQL like:  SELECT * FROM LATERAL TABLE(tf("a"))
> For batch SQL:
> a UDTF might produce infinite records; this needs to be discussed





[jira] [Created] (FLINK-10167) SessionWindows not compatible with typed DataStreams in scala

2018-08-17 Thread Andrew Roberts (JIRA)
Andrew Roberts created FLINK-10167:
--

 Summary: SessionWindows not compatible with typed DataStreams in 
scala
 Key: FLINK-10167
 URL: https://issues.apache.org/jira/browse/FLINK-10167
 Project: Flink
  Issue Type: Bug
Reporter: Andrew Roberts


I'm trying to construct a trivial job that uses session windows, and it looks 
like the data type parameter is hardcoded to `Object`/`AnyRef`. Due to the 
invariance of java classes in scala, this means that we can't use the provided 
SessionWindow helper classes in scala on typed streams.

 

Example job:
{code:java}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
import org.apache.flink.util.Collector

object TestJob {
  val jobName = "TestJob"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromCollection(Range(0, 100).toList)
      .keyBy(_ / 10)
      .window(ProcessingTimeSessionWindows.withGap(Time.minutes(1)))
      .reduce(
        (a: Int, b: Int) => a + b,
        (key: Int, window: Window, items: Iterable[Int], out: Collector[String]) => s"${key}: ${items}"
      )
      .map(println(_))

    env.execute(jobName)
  }
}{code}
 

Compile error:
{code:java}
[error]  found   : org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
[error]  required: org.apache.flink.streaming.api.windowing.assigners.WindowAssigner[_ >: Int, ?]
[error] Note: Object <: Any (and org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows <: org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner[Object,org.apache.flink.streaming.api.windowing.windows.TimeWindow]), but Java-defined class WindowAssigner is invariant in type T.
[error] You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
[error]       .window(ProcessingTimeSessionWindows.withGap(Time.minutes(1))){code}





[jira] [Commented] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections

2018-08-17 Thread JIRA


[ 
https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584231#comment-16584231
 ] 

Dominik Wosiński commented on FLINK-10052:
--

I will try to fix that :)

 

> Tolerate temporarily suspended ZooKeeper connections
> 
>
> Key: FLINK-10052
> URL: https://issues.apache.org/jira/browse/FLINK-10052
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.2, 1.5.2, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Dominik Wosiński
>Priority: Major
>
> This issue results from FLINK-10011 which uncovered a problem with Flink's HA 
> recovery and proposed the following solution to harden Flink:
> The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator 
> recipe for leader election. The leader latch revokes leadership in case of a 
> suspended ZooKeeper connection. This can be premature in case that the system 
> can reconnect to ZooKeeper before its session expires. The effect of the lost 
> leadership is that all jobs will be canceled and directly restarted after 
> regaining the leadership.
> Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper 
> connection, it would be better to wait until the ZooKeeper connection is 
> LOST. That way we would allow the system to reconnect and not lose the 
> leadership. This could be achievable by using Curator's {{LeaderSelector}} 
> instead of the {{LeaderLatch}}.
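
The difference between the two policies in the description can be sketched as a small state machine. This is neither Curator's nor Flink's code: the enum below only mirrors the names of Curator connection states, and `LeadershipSketch` with its `onStateChange` method is hypothetical. The flag switches between revoking leadership on SUSPENDED (the behaviour the description attributes to the leader latch) and revoking only on LOST (the proposed behaviour).

```java
/** Hypothetical sketch; the enum mirrors the names of Curator connection states. */
final class LeadershipSketch {

    enum ConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

    private final boolean revokeOnSuspended;
    // Assumption for this sketch: leadership has already been granted.
    private boolean leader = true;

    LeadershipSketch(boolean revokeOnSuspended) {
        this.revokeOnSuspended = revokeOnSuspended;
    }

    /** Applies a connection state change and returns whether leadership is still held. */
    boolean onStateChange(ConnectionState newState) {
        switch (newState) {
            case SUSPENDED:
                // Eager policy: give up leadership as soon as the connection is suspended.
                if (revokeOnSuspended) {
                    leader = false;
                }
                break;
            case LOST:
                // Both policies give up leadership once the connection is lost.
                leader = false;
                break;
            default:
                // CONNECTED and RECONNECTED do not change leadership in this sketch.
                break;
        }
        return leader;
    }

    boolean isLeader() {
        return leader;
    }
}
```

With `revokeOnSuspended` set to `false`, a SUSPENDED event followed by RECONNECTED leaves leadership untouched, which is the case where jobs would no longer be cancelled and restarted. Re-election after a loss is deliberately not modelled.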





[jira] [Assigned] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections

2018-08-17 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dominik Wosiński reassigned FLINK-10052:


Assignee: Dominik Wosiński



[jira] [Commented] (FLINK-10119) JsonRowDeserializationSchema deserialize kafka message

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584206#comment-16584206
 ] 

ASF GitHub Bot commented on FLINK-10119:


buptljy commented on issue #6571: [FLINK-10119]- Add failure handlers for 
JsonRowDeserializationSchema
URL: https://github.com/apache/flink/pull/6571#issuecomment-413939315
 
 
   I think the failure is not caused by this PR.




> JsonRowDeserializationSchema deserialize kafka message
> --
>
> Key: FLINK-10119
> URL: https://issues.apache.org/jira/browse/FLINK-10119
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.1
> Environment: None
>Reporter: sean.miao
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> Recently, we have been using Kafka010JsonTableSource to process Kafka's JSON 
> messages. We turned on checkpointing and the auto-restart strategy.
> We found that as soon as a single message is not valid JSON, the job can no 
> longer be brought back up. Of course, this is meant to guarantee exactly-once 
> or at-least-once processing, but the resulting application is unavailable, 
> which has a large impact on us.
> The code is:
> class: JsonRowDeserializationSchema
> function:
> @Override
> public Row deserialize(byte[] message) throws IOException {
>     try {
>         final JsonNode root = objectMapper.readTree(message);
>         return convertRow(root, (RowTypeInfo) typeInfo);
>     } catch (Throwable t) {
>         throw new IOException("Failed to deserialize JSON object.", t);
>     }
> }
> Now, I have changed it to:
> public Row deserialize(byte[] message) throws IOException {
>     try {
>         JsonNode root = this.objectMapper.readTree(message);
>         return this.convertRow(root, (RowTypeInfo) this.typeInfo);
>     } catch (Throwable var4) {
>         message = this.objectMapper.writeValueAsBytes("{}");
>         JsonNode root = this.objectMapper.readTree(message);
>         return this.convertRow(root, (RowTypeInfo) this.typeInfo);
>     }
> }
>  
> I think that data format errors are inevitable during network transmission, 
> so can we add a new column to the table for the wrong data format, like Spark 
> SQL does?
>  
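
One way to read the "failure handlers" idea in the PR title is to make the reaction to a malformed message pluggable instead of always throwing. The sketch below is stand-alone and hypothetical: `TolerantDeserializerSketch`, `FailureHandler` and the toy integer parser are illustrative names rather than Flink or Jackson API, and representing a skipped record as `null` is an assumption of the sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/** Hypothetical sketch of a deserializer with a pluggable failure handler. */
final class TolerantDeserializerSketch<T> {

    /** Decides what to return for a message that could not be parsed. */
    interface FailureHandler<T> {
        T onFailure(byte[] message, Throwable cause);
    }

    private final Function<byte[], T> parser;
    private final FailureHandler<T> handler;

    TolerantDeserializerSketch(Function<byte[], T> parser, FailureHandler<T> handler) {
        this.parser = parser;
        this.handler = handler;
    }

    T deserialize(byte[] message) {
        try {
            return parser.apply(message);
        } catch (RuntimeException e) {
            // Delegate to the handler instead of failing on a single bad record.
            return handler.onFailure(message, e);
        }
    }

    /** Toy parser standing in for JSON parsing: reads a UTF-8 encoded integer. */
    static Integer parseInt(byte[] message) {
        return Integer.parseInt(new String(message, StandardCharsets.UTF_8).trim());
    }
}
```

A handler could rethrow to keep the current fail-fast behaviour, return a placeholder value, or record the raw payload somewhere for later inspection, which is close to the "extra column for malformed data" idea raised in the description.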





[jira] [Commented] (FLINK-10119) JsonRowDeserializationSchema deserialize kafka message

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584207#comment-16584207
 ] 

ASF GitHub Bot commented on FLINK-10119:


buptljy edited a comment on issue #6571: [FLINK-10119]- Add failure handlers 
for JsonRowDeserializationSchema
URL: https://github.com/apache/flink/pull/6571#issuecomment-413939315
 
 
   I think the travis testing failure is not caused by this PR.






[jira] [Commented] (FLINK-10153) Add tutorial section to documentation

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584183#comment-16584183
 ] 

ASF GitHub Bot commented on FLINK-10153:


fhueske commented on issue #6565: [FLINK-10153] [docs] Add Tutorials section 
and rework structure.
URL: https://github.com/apache/flink/pull/6565#issuecomment-413933284
 
 
   Thanks everyone for the comments so far.
   I fixed broken links, added redirects for pages that I moved, and pushed an 
update.
   
   Let me know what you think.
   Best, Fabian




> Add tutorial section to documentation
> -
>
> Key: FLINK-10153
> URL: https://issues.apache.org/jira/browse/FLINK-10153
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> The current documentation does not feature a dedicated tutorials section and 
> has a few issues that should be fixed in order to help our (future) users 
> get started with Flink.
> I propose to add a single "Tutorials" section to the documentation where 
> users find step-by-step guides. The tutorials section helps users with 
> different goals:
>   * Get a quick idea of the overall system
>   * Implement a DataStream/DataSet/Table API/SQL job
>   * Set up Flink on a local machine (or run a Docker container)
> There are already a few guides to get started but they are located at 
> different places and should be moved into the Tutorials section. Moreover, 
> some sections such as "Project Setup" contain content that addresses users 
> with very different intentions.
> I propose to
> * add a new Tutorials section and move all existing tutorials there (and 
> later add new ones).
> * move the "Quickstart" section to "Tutorials".
> * remove the "Project Setup" section and move the pages to other sections 
> (some pages will be split up or adjusted).





[jira] [Commented] (FLINK-10059) Add LTRIM supported in Table API and SQL

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584110#comment-16584110
 ] 

ASF GitHub Bot commented on FLINK-10059:


yanghua commented on issue #6494: [FLINK-10059] [table] Add LTRIM supported in 
Table API and SQL
URL: https://github.com/apache/flink/pull/6494#issuecomment-413909036
 
 
   @xccui the Travis failure is not caused by this PR's change. I have updated 
the RTRIM PR: #6509. Please review again, thanks.




> Add LTRIM supported in Table API and SQL
> 
>
> Key: FLINK-10059
> URL: https://issues.apache.org/jira/browse/FLINK-10059
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> Refer to the MySQL LTRIM function: 
> https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ltrim





[jira] [Commented] (FLINK-9610) Add Kafka partitioner that uses the key to partition by

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584046#comment-16584046
 ] 

ASF GitHub Bot commented on FLINK-9610:
---

nielsbasjes commented on issue #6181: [FLINK-9610] [flink-connector-kafka-base] 
Add Kafka Partitioner that uses the hash of the provided key.
URL: https://github.com/apache/flink/pull/6181#issuecomment-413893719
 
 
   @tzulitai
   I did some more digging and there is in fact a good reason for this 
patch.
   
   Have a look at 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
   
   There you'll find constructors with essentially the following variations in 
parameters:
   
   - Either a SerializationSchema or a KeyedSerializationSchema
   - Either a FlinkKafkaPartitioner or no partitioner which means it actually 
uses FlinkFixedPartitioner as the default.
   
   Looking at all of these constructors
   
   1. If you do not specify a partitioner then all constructors use the 
FlinkFixedPartitioner.
   2. If you do specify a partitioner then it will use that partitioner.
   
   Even the constructor that uses a KeyedSerializationSchema will NOT use that 
key for the partitioning (we saw this in production, where it caused problems).
   Essentially, the current FlinkKafkaProducer API makes it very hard to 'not' 
specify a partitioner and use the hash(key) partitioning that is natively 
present in Kafka.
   
   So given the current API, we came to the conclusion that an extra partitioner 
is needed.
   
   Because the Flink API works this way I never looked deeper into base code to 
see how it really moves below the surface.
   
   Given what I understand now I see two viable ways forward:
   
   1. We change the behavior of the API so that if a KeyedSerializationSchema 
is used, the hash of the key will be used by Kafka to 
partition the data. This is, however, an impactful change in the way the 
API behaves, i.e., it breaks the API.
   2. We simply add the partitioner I created.
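
   The key-based routing discussed above boils down to mapping the serialized key onto one of the available partitions. The following is a stand-alone sketch of that idea, not the `FlinkKeyHashPartitioner` from this PR and not Kafka's built-in partitioner: `KeyHashPartitionSketch` and `partitionFor` are hypothetical names, `Arrays.hashCode` is chosen purely for illustration, and sending key-less records to the first partition is an arbitrary assumption.

```java
import java.util.Arrays;

/** Hypothetical sketch of hash-of-key partition selection; not Flink or Kafka code. */
final class KeyHashPartitionSketch {

    /**
     * Picks a partition for a record so that records with equal serialized keys
     * always map to the same partition.
     */
    static int partitionFor(byte[] serializedKey, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("At least one partition is required");
        }
        if (serializedKey == null) {
            // Assumption for this sketch: key-less records go to the first partition.
            return partitions[0];
        }
        // floorMod keeps the index non-negative even for negative hash codes.
        int index = Math.floorMod(Arrays.hashCode(serializedKey), partitions.length);
        return partitions[index];
    }
}
```

   Note that the mapping changes when the number of partitions changes, so records with the same key only stay together for a fixed partition count.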
   
   




> Add Kafka partitioner that uses the key to partition by
> ---
>
> Key: FLINK-9610
> URL: https://issues.apache.org/jira/browse/FLINK-9610
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector
>Reporter: Niels Basjes
>Assignee: Niels Basjes
>Priority: Major
>  Labels: pull-request-available
>
> The kafka connector package only contains the FlinkFixedPartitioner 
> implementation of the FlinkKafkaPartitioner.
> The most common use case I have seen is the need to spread the records across 
> the Kafka partitions while keeping all messages with the same key together.
> I'll put up a pull request with a very simple implementation that should make 
> this a lot easier for others to use and extend.





[GitHub] nielsbasjes opened a new pull request #6181: [FLINK-9610] [flink-connector-kafka-base] Add Kafka Partitioner that uses the hash of the provided key.

2018-08-17 Thread GitBox
nielsbasjes opened a new pull request #6181: [FLINK-9610] 
[flink-connector-kafka-base] Add Kafka Partitioner that uses the hash of the 
provided key.
URL: https://github.com/apache/flink/pull/6181
 
 
   ## What is the purpose of the change
   
   Add the simple feature of being able to route records into Kafka using 
key-based partitioning.
   
   ## Brief change log
   
   - Added the FlinkKeyHashPartitioner class with some tests.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   - Use this instead of the FlinkFixedPartitioner while instantiating the 
FlinkKafkaProducer. Also add a KeyedSerializationSchema implementation that 
returns the right key that is to be used.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no 
 - The S3 file system connector:  no 
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? JavaDocs
   




[jira] [Commented] (FLINK-9610) Add Kafka partitioner that uses the key to partition by

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584007#comment-16584007
 ] 

ASF GitHub Bot commented on FLINK-9610:
---

nielsbasjes opened a new pull request #6181: [FLINK-9610] 
[flink-connector-kafka-base] Add Kafka Partitioner that uses the hash of the 
provided key.
URL: https://github.com/apache/flink/pull/6181
 
 
   ## What is the purpose of the change
   
   Add the simple feature of being able to route records into Kafka using 
key-based partitioning.
   
   ## Brief change log
   
   - Added the FlinkKeyHashPartitioner class with some tests.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   - Use this instead of the FlinkFixedPartitioner while instantiating the 
FlinkKafkaProducer. Also add a KeyedSerializationSchema implementation that 
returns the key that is to be used.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no 
 - The S3 file system connector:  no 
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? JavaDocs
   




> Add Kafka partitioner that uses the key to partition by
> ---
>
> Key: FLINK-9610
> URL: https://issues.apache.org/jira/browse/FLINK-9610
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector
>Reporter: Niels Basjes
>Assignee: Niels Basjes
>Priority: Major
>  Labels: pull-request-available
>
> The kafka connector package only contains the FlinkFixedPartitioner 
> implementation of the FlinkKafkaPartitioner.
> The most common use case I have seen is the need to spread the records across 
> the Kafka partitions while keeping all messages with the same key together.
> I'll put up a pull request with a very simple implementation that should make 
> this a lot easier for others to use and extend.





[jira] [Commented] (FLINK-9610) Add Kafka partitioner that uses the key to partition by

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584000#comment-16584000
 ] 

ASF GitHub Bot commented on FLINK-9610:
---

nielsbasjes closed pull request #6181: [FLINK-9610] 
[flink-connector-kafka-base] Add Kafka Partitioner that uses the hash of the 
provided key.
URL: https://github.com/apache/flink/pull/6181
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKeyHashPartitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKeyHashPartitioner.java
new file mode 100644
index 000..7a10d6a34a8
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKeyHashPartitioner.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+
+/**
+ * A partitioner that uses the hash of the provided key to distribute
+ * the values over the partitions as evenly as possible.
+ * This partitioner ensures that all records with the same key will be sent to
+ * the same Kafka partition.
+ *
+ * Note that this will cause a lot of network connections to be created between
+ * all the Flink instances and all the Kafka brokers.
+ */
+@PublicEvolving
+public class FlinkKeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
+
+    private static final long serialVersionUID = -2006468063065010594L;
+
+    @Override
+    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+        Preconditions.checkArgument(
+            partitions != null && partitions.length > 0,
+            "Partitions of the target topic is empty.");
+
+        return partitions[Math.abs(hash(key)) % partitions.length];
+    }
+
+    /**
+     * The overridable implementation of the hashing algorithm.
+     * @param key The key of the provided record on which the partition selection is based. (key can be null!)
+     * @return The hash value for the provided key.
+     */
+    protected int hash(@Nullable byte[] key) {
+        return Arrays.hashCode(key);
+    }
+
+}
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKeyHashPartitionerTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKeyHashPartitionerTest.java
new file mode 100644
index 000..cdf8afb7181
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKeyHashPartitionerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package or

[jira] [Commented] (FLINK-9610) Add Kafka partitioner that uses the key to partition by

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583999#comment-16583999
 ] 

ASF GitHub Bot commented on FLINK-9610:
---

nielsbasjes commented on issue #6181: [FLINK-9610] [flink-connector-kafka-base] 
Add Kafka Partitioner that uses the hash of the provided key.
URL: https://github.com/apache/flink/pull/6181#issuecomment-413882146
 
 
   @tzulitai
   Thanks for pointing this out.
   I did some digging into the code and you are right.
   This pull request is needless.




> Add Kafka partitioner that uses the key to partition by
> ---
>
> Key: FLINK-9610
> URL: https://issues.apache.org/jira/browse/FLINK-9610
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector
>Reporter: Niels Basjes
>Assignee: Niels Basjes
>Priority: Major
>  Labels: pull-request-available
>
> The kafka connector package only contains the FlinkFixedPartitioner 
> implementation of the FlinkKafkaPartitioner.
> The most common use case I have seen is the need to spread the records across 
> the Kafka partitions while keeping all messages with the same key together.
> I'll put up a pull request with a very simple implementation that should make 
> this a lot easier for others to use and extend.





[jira] [Updated] (FLINK-9610) Add Kafka partitioner that uses the key to partition by

2018-08-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9610:
--
Labels: pull-request-available  (was: )

> Add Kafka partitioner that uses the key to partition by
> ---
>
> Key: FLINK-9610
> URL: https://issues.apache.org/jira/browse/FLINK-9610
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector
>Reporter: Niels Basjes
>Assignee: Niels Basjes
>Priority: Major
>  Labels: pull-request-available
>
> The kafka connector package only contains the FlinkFixedPartitioner 
> implementation of the FlinkKafkaPartitioner.
> The most common use case I have seen is the need to spread the records across 
> the Kafka partitions while keeping all messages with the same key together.
> I'll put up a pull request with a very simple implementation that should make 
> this a lot easier for others to use and extend.





[GitHub] nielsbasjes closed pull request #6181: [FLINK-9610] [flink-connector-kafka-base] Add Kafka Partitioner that uses the hash of the provided key.

2018-08-17 Thread GitBox
nielsbasjes closed pull request #6181: [FLINK-9610] 
[flink-connector-kafka-base] Add Kafka Partitioner that uses the hash of the 
provided key.
URL: https://github.com/apache/flink/pull/6181
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKeyHashPartitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKeyHashPartitioner.java
new file mode 100644
index 000..7a10d6a34a8
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKeyHashPartitioner.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+
+/**
+ * A partitioner that uses the hash of the provided key to distribute
+ * the values over the partitions as evenly as possible.
+ * This partitioner ensures that all records with the same key will be sent to
+ * the same Kafka partition.
+ *
+ * Note that this will cause a lot of network connections to be created between
+ * all the Flink instances and all the Kafka brokers.
+ */
+@PublicEvolving
+public class FlinkKeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
+
+    private static final long serialVersionUID = -2006468063065010594L;
+
+    @Override
+    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+        Preconditions.checkArgument(
+            partitions != null && partitions.length > 0,
+            "Partitions of the target topic is empty.");
+
+        return partitions[Math.abs(hash(key)) % partitions.length];
+    }
+
+    /**
+     * The overridable implementation of the hashing algorithm.
+     * @param key The key of the provided record on which the partition selection is based. (key can be null!)
+     * @return The hash value for the provided key.
+     */
+    protected int hash(@Nullable byte[] key) {
+        return Arrays.hashCode(key);
+    }
+
+}
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKeyHashPartitionerTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKeyHashPartitionerTest.java
new file mode 100644
index 000..cdf8afb7181
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKeyHashPartitionerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKeyHashPartitioner;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+/**
+ * Tests for the {@link F

[GitHub] nielsbasjes commented on issue #6181: [FLINK-9610] [flink-connector-kafka-base] Add Kafka Partitioner that uses the hash of the provided key.

2018-08-17 Thread GitBox
nielsbasjes commented on issue #6181: [FLINK-9610] [flink-connector-kafka-base] 
Add Kafka Partitioner that uses the hash of the provided key.
URL: https://github.com/apache/flink/pull/6181#issuecomment-413882146
 
 
   @tzulitai
   Thanks for pointing this out.
   I did some digging into the code and you are right.
   This pull request is needless.




[jira] [Commented] (FLINK-10142) Reduce synchronization overhead for credit notifications

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583981#comment-16583981
 ] 

ASF GitHub Bot commented on FLINK-10142:


pnowojski commented on a change in pull request #6555: [FLINK-10142][network] 
reduce locking around credit notification
URL: https://github.com/apache/flink/pull/6555#discussion_r210915852
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
 ##
 @@ -357,19 +357,20 @@ private void writeAndFlushNextMessageIfPossible(Channel channel) {
                 return;
             }
 
-            //It is no need to notify credit for the released channel.
-            if (!inputChannel.isReleased()) {
-                AddCredit msg = new AddCredit(
-                    inputChannel.getPartitionId(),
-                    inputChannel.getAndResetUnannouncedCredit(),
-                    inputChannel.getInputChannelId());
+            if (inputChannel.isReleased()) {
 
 Review comment:
   Are the changes in this file optimising anything? Or are they unrelated to 
the rest of the commit?




> Reduce synchronization overhead for credit notifications
> 
>
> Key: FLINK-10142
> URL: https://issues.apache.org/jira/browse/FLINK-10142
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.2, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> When credit-based flow control was introduced, we also added some checks and 
> optimisations for uncommon code paths that make common code paths 
> unnecessarily more expensive, e.g. checking whether a channel was released 
> before forwarding a credit notification to Netty. Such checks would have to 
> be confirmed by the Netty thread anyway and thus only add additional load for 
> something that happens only once (per channel).





[jira] [Commented] (FLINK-10142) Reduce synchronization overhead for credit notifications

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583985#comment-16583985
 ] 

ASF GitHub Bot commented on FLINK-10142:


pnowojski commented on a change in pull request #6555: [FLINK-10142][network] 
reduce locking around credit notification
URL: https://github.com/apache/flink/pull/6555#discussion_r210917887
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ##
 @@ -509,33 +514,40 @@ void onSenderBacklog(int backlog) throws IOException {
     }
 
     public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
-        boolean success = false;
+        boolean recycleBuffer = true;
 
         try {
+
+            final boolean wasEmpty;
             synchronized (receivedBuffers) {
-                if (!isReleased.get()) {
-                    if (expectedSequenceNumber == sequenceNumber) {
-                        int available = receivedBuffers.size();
+                // Similar to notifyBufferAvailable(), make sure that we never add a buffer
+                // after releaseAllResources() released all buffers from receivedBuffers
+                // (see above for details).
+                if (isReleased.get()) {
+                    return;
+                }
 
-                        receivedBuffers.add(buffer);
-                        expectedSequenceNumber++;
+                if (expectedSequenceNumber != sequenceNumber) {
+                    onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
+                    return;
+                }
 
-                        if (available == 0) {
-                            notifyChannelNonEmpty();
-                        }
+                wasEmpty = receivedBuffers.isEmpty();
+                receivedBuffers.add(buffer);
+                recycleBuffer = false;
+            }
 
-                        success = true;
-                    } else {
-                        onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
-                    }
-                }
-            }
+            ++expectedSequenceNumber;
+
+            if (wasEmpty) {
+                notifyChannelNonEmpty();
 
 Review comment:
   Has moving this line out from under the lock improved performance in any 
case? If not, the commit title
   > optimisations reducing lock contention
   
   is misleading and I would change it to something about refactoring/cleanup.
   
   Btw, again, I do not like squashing multiple changes into one commit like 
this. It actually took me ~10 minutes to work out what this change is all 
about and whether there are any other meaningful changes here. If you split 
this commit into `[hotfix][network] refactor/cleanups` and 
`[FLINK-10141][network] move notifyChannelNonEmpty outside of synchronised 
block`, it would be much easier to review...
   
   Could you split this before merging?
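
   To make the pattern under discussion concrete, here is a minimal stand-alone 
sketch of "decide under the lock, notify after releasing it". It is not the 
actual `RemoteInputChannel` code; the class name, the `Runnable` callback and 
the queue type are assumptions made purely for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for an input channel; names are illustrative, not Flink's API. */
final class NotifyOutsideLockSketch<T> {

    private final Queue<T> received = new ArrayDeque<>();
    private final Runnable onNonEmpty;

    NotifyOutsideLockSketch(Runnable onNonEmpty) {
        this.onNonEmpty = onNonEmpty;
    }

    void add(T element) {
        final boolean wasEmpty;
        synchronized (received) {
            // Only the emptiness check and the queue mutation need the lock.
            wasEmpty = received.isEmpty();
            received.add(element);
        }
        // The callback runs after the lock is released, so a slow listener
        // does not block other threads that add or poll.
        if (wasEmpty) {
            onNonEmpty.run();
        }
    }

    T poll() {
        synchronized (received) {
            return received.poll();
        }
    }
}
```

   Whether this shortens the time spent holding the lock enough to matter 
depends on how expensive the callback is, which is exactly the question raised 
above.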




> Reduce synchronization overhead for credit notifications
> 
>
> Key: FLINK-10142
> URL: https://issues.apache.org/jira/browse/FLINK-10142
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.2, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> When credit-based flow control was introduced, we also added some checks and 
> optimisations for uncommon code paths that make common code paths 
> unnecessarily more expensive, e.g. checking whether a channel was released 
> before forwarding a credit notification to Netty. Such checks would have to 
> be confirmed by the Netty thread anyway and thus only add additional load for 
> something that happens only once (per channel).




[jira] [Commented] (FLINK-10142) Reduce synchronization overhead for credit notifications

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583984#comment-16583984
 ] 

ASF GitHub Bot commented on FLINK-10142:


pnowojski commented on a change in pull request #6555: [FLINK-10142][network] 
reduce locking around credit notification
URL: https://github.com/apache/flink/pull/6555#discussion_r210916528
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ##
 @@ -191,15 +191,15 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException, Int
         checkError();
 
         final Buffer next;
-        final int remaining;
+        final boolean moreAvailable;
 
         synchronized (receivedBuffers) {
             next = receivedBuffers.poll();
-            remaining = receivedBuffers.size();
+            moreAvailable = !receivedBuffers.isEmpty();
         }
 
         numBytesIn.inc(next.getSizeUnsafe());
-        return Optional.of(new BufferAndAvailability(next, remaining > 0, getSenderBacklog()));
+        return Optional.of(new BufferAndAvailability(next, moreAvailable, getSenderBacklog()));
 
 Review comment:
   This also has nothing to do with `optimisations reducing lock contention`. 
Can you extract those changes into a separate hotfix? Anyone looking at 
`git blame` in the future for the real `optimisations reducing lock 
contention` will have the same "wtf" moment as I have right now ;)




> Reduce synchronization overhead for credit notifications
> 
>
> Key: FLINK-10142
> URL: https://issues.apache.org/jira/browse/FLINK-10142
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.2, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> When credit-based flow control was introduced, we also added some checks and 
> optimisations for uncommon code paths that make common code paths 
> unnecessarily more expensive, e.g. checking whether a channel was released 
> before forwarding a credit notification to Netty. Such checks would have to 
> be confirmed by the Netty thread anyway and thus only add additional load for 
> something that happens only once (per channel).





[jira] [Commented] (FLINK-10142) Reduce synchronization overhead for credit notifications

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583983#comment-16583983
 ] 

ASF GitHub Bot commented on FLINK-10142:


pnowojski commented on a change in pull request #6555: [FLINK-10142][network] 
reduce locking around credit notification
URL: https://github.com/apache/flink/pull/6555#discussion_r210920075
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ##
 @@ -170,6 +170,65 @@ public Void call() throws Exception {
         }
     }
 
+    @Test
+    public void testConcurrentNotifyBufferAvailableAndRelease() throws Exception {
 
 Review comment:
   Hmmm, isn't this already covered by, for example, 
`org.apache.flink.streaming.runtime.io.benchmark.StreamNetworkThroughputBenchmarkTest#largeRemoteMode`?
 Maybe we do not need this test after all?




> Reduce synchronization overhead for credit notifications
> 
>
> Key: FLINK-10142
> URL: https://issues.apache.org/jira/browse/FLINK-10142
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.2, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> When credit-based flow control was introduced, we also added some checks and 
> optimisations for uncommon code paths that make common code paths 
> unnecessarily more expensive, e.g. checking whether a channel was released 
> before forwarding a credit notification to Netty. Such checks would have to 
> be confirmed by the Netty thread anyway and thus only add additional load for 
> something that happens only once (per channel).





[jira] [Commented] (FLINK-10142) Reduce synchronization overhead for credit notifications

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583982#comment-16583982
 ] 

ASF GitHub Bot commented on FLINK-10142:


pnowojski commented on a change in pull request #6555: [FLINK-10142][network] 
reduce locking around credit notification
URL: https://github.com/apache/flink/pull/6555#discussion_r210920974
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ##
 @@ -170,6 +170,65 @@ public Void call() throws Exception {
         }
     }
 
+    @Test
+    public void testConcurrentNotifyBufferAvailableAndRelease() throws Exception {
+        // Config
+        // Repeatedly spawn two tasks: one to notify buffer availability and the other to release the channel
+        // concurrently. We do this repeatedly to provoke races.
+        final int numberOfRepetitions = 1024;
+
+        // Setup
+        final ExecutorService executor = Executors.newFixedThreadPool(2);
+        final Buffer buffer = TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE);
+
+        try {
+            // Test
+            final SingleInputGate inputGate = mock(SingleInputGate.class);
 
 Review comment:
   Please no Mockito in new tests. Use a real object or create a proper mock.




> Reduce synchronization overhead for credit notifications
> 
>
> Key: FLINK-10142
> URL: https://issues.apache.org/jira/browse/FLINK-10142
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.2, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> When credit-based flow control was introduced, we also added some checks and 
> optimisations for uncommon code paths that make common code paths 
> unnecessarily more expensive, e.g. checking whether a channel was released 
> before forwarding a credit notification to Netty. Such checks would have to 
> be confirmed by the Netty thread anyway and thus only add additional load for 
> something that happens only once (per channel).





[GitHub] pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification

2018-08-17 Thread GitBox
pnowojski commented on a change in pull request #6555: [FLINK-10142][network] 
reduce locking around credit notification
URL: https://github.com/apache/flink/pull/6555#discussion_r210920974
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ##
 @@ -170,6 +170,65 @@ public Void call() throws Exception {
         }
     }
 
+    @Test
+    public void testConcurrentNotifyBufferAvailableAndRelease() throws Exception {
+        // Config
+        // Repeatedly spawn two tasks: one to notify buffer availability and the other to release the channel
+        // concurrently. We do this repeatedly to provoke races.
+        final int numberOfRepetitions = 1024;
+
+        // Setup
+        final ExecutorService executor = Executors.newFixedThreadPool(2);
+        final Buffer buffer = TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE);
+
+        try {
+            // Test
+            final SingleInputGate inputGate = mock(SingleInputGate.class);
 
 Review comment:
   Please no Mockito in new tests. Use a real object or create a proper mock. 
It should be easy to replace this one.
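
   As a rough illustration of what a hand-written replacement for a Mockito 
mock can look like, here is a minimal sketch. The `ChannelListener` interface 
and the `RecordingChannelListener` class are hypothetical names invented for 
this example; they do not mirror the API of `SingleInputGate`.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical collaborator interface, invented for this sketch. */
interface ChannelListener {
    void notifyChannelNonEmpty();
}

/** Hand-written test double that records calls instead of relying on a mocking framework. */
final class RecordingChannelListener implements ChannelListener {

    // AtomicInteger keeps the count correct when several test threads notify concurrently.
    private final AtomicInteger notifications = new AtomicInteger();

    @Override
    public void notifyChannelNonEmpty() {
        notifications.incrementAndGet();
    }

    int getNotificationCount() {
        return notifications.get();
    }
}
```

   A test would pass the recording listener to the code under test and assert 
on `getNotificationCount()` afterwards, which keeps the expected interaction 
visible in plain Java.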




[jira] [Commented] (FLINK-10142) Reduce synchronization overhead for credit notifications

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583987#comment-16583987
 ] 

ASF GitHub Bot commented on FLINK-10142:


pnowojski commented on a change in pull request #6555: [FLINK-10142][network] 
reduce locking around credit notification
URL: https://github.com/apache/flink/pull/6555#discussion_r210920974
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ##
 @@ -170,6 +170,65 @@ public Void call() throws Exception {
         }
     }
 
+    @Test
+    public void testConcurrentNotifyBufferAvailableAndRelease() throws Exception {
+        // Config
+        // Repeatedly spawn two tasks: one to notify buffer availability and the other to release the channel
+        // concurrently. We do this repeatedly to provoke races.
+        final int numberOfRepetitions = 1024;
+
+        // Setup
+        final ExecutorService executor = Executors.newFixedThreadPool(2);
+        final Buffer buffer = TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE);
+
+        try {
+            // Test
+            final SingleInputGate inputGate = mock(SingleInputGate.class);
 
 Review comment:
   Please do not use Mockito in new tests. Use a real object or write a proper 
mock. It should be easy to replace this one.




> Reduce synchronization overhead for credit notifications
> 
>
> Key: FLINK-10142
> URL: https://issues.apache.org/jira/browse/FLINK-10142
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.2, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> When credit-based flow control was introduced, we also added some checks and 
> optimisations for uncommon code paths that make common code paths 
> unnecessarily more expensive, e.g. checking whether a channel was released 
> before forwarding a credit notification to Netty. Such checks would have to 
> be confirmed by the Netty thread anyway and thus only add additional load for 
> something that happens only once (per channel).





[GitHub] pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification

2018-08-17 Thread GitBox
pnowojski commented on a change in pull request #6555: [FLINK-10142][network] 
reduce locking around credit notification
URL: https://github.com/apache/flink/pull/6555#discussion_r210920075
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ##
 @@ -170,6 +170,65 @@ public Void call() throws Exception {
         }
     }
 
+    @Test
+    public void testConcurrentNotifyBufferAvailableAndRelease() throws Exception {
 
 Review comment:
   Hmmm, isn't this covered by, for example, 
`org.apache.flink.streaming.runtime.io.benchmark.StreamNetworkThroughputBenchmarkTest#largeRemoteMode`?
 Maybe we do not need this test after all?




[GitHub] pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification

2018-08-17 Thread GitBox
pnowojski commented on a change in pull request #6555: [FLINK-10142][network] 
reduce locking around credit notification
URL: https://github.com/apache/flink/pull/6555#discussion_r210916528
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ##
 @@ -191,15 +191,15 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException, Int
         checkError();
 
         final Buffer next;
-        final int remaining;
+        final boolean moreAvailable;
 
         synchronized (receivedBuffers) {
             next = receivedBuffers.poll();
-            remaining = receivedBuffers.size();
+            moreAvailable = !receivedBuffers.isEmpty();
         }
 
         numBytesIn.inc(next.getSizeUnsafe());
-        return Optional.of(new BufferAndAvailability(next, remaining > 0, getSenderBacklog()));
+        return Optional.of(new BufferAndAvailability(next, moreAvailable, getSenderBacklog()));
 
 Review comment:
   This also has nothing to do with `optimisations reducing lock contention`. 
Can you extract those changes into a separate hotfix? If someone in the future 
looks at `git blame` for the real `optimisations reducing lock contention`, 
they will have the same "wtf" moment as I am having right now ;)




[GitHub] pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification

2018-08-17 Thread GitBox
pnowojski commented on a change in pull request #6555: [FLINK-10142][network] 
reduce locking around credit notification
URL: https://github.com/apache/flink/pull/6555#discussion_r210920974
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ##
 @@ -170,6 +170,65 @@ public Void call() throws Exception {
         }
     }
 
+    @Test
+    public void testConcurrentNotifyBufferAvailableAndRelease() throws Exception {
+        // Config
+        // Repeatedly spawn two tasks: one to notify buffer availability and the other to release the channel
+        // concurrently. We do this repeatedly to provoke races.
+        final int numberOfRepetitions = 1024;
+
+        // Setup
+        final ExecutorService executor = Executors.newFixedThreadPool(2);
+        final Buffer buffer = TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE);
+
+        try {
+            // Test
+            final SingleInputGate inputGate = mock(SingleInputGate.class);
 
 Review comment:
   Please do not use Mockito in new tests. Use a real object or write a proper mock.




[GitHub] pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification

2018-08-17 Thread GitBox
pnowojski commented on a change in pull request #6555: [FLINK-10142][network] 
reduce locking around credit notification
URL: https://github.com/apache/flink/pull/6555#discussion_r210917887
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ##
 @@ -509,33 +514,40 @@ void onSenderBacklog(int backlog) throws IOException {
     }
 
     public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
-        boolean success = false;
+        boolean recycleBuffer = true;
 
         try {
+
+            final boolean wasEmpty;
             synchronized (receivedBuffers) {
-                if (!isReleased.get()) {
-                    if (expectedSequenceNumber == sequenceNumber) {
-                        int available = receivedBuffers.size();
+                // Similar to notifyBufferAvailable(), make sure that we never add a buffer
+                // after releaseAllResources() released all buffers from receivedBuffers
+                // (see above for details).
+                if (isReleased.get()) {
+                    return;
+                }
 
-                        receivedBuffers.add(buffer);
-                        expectedSequenceNumber++;
+                if (expectedSequenceNumber != sequenceNumber) {
+                    onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
+                    return;
+                }
 
-                        if (available == 0) {
-                            notifyChannelNonEmpty();
-                        }
+                wasEmpty = receivedBuffers.isEmpty();
+                receivedBuffers.add(buffer);
+                recycleBuffer = false;
+            }
 
-                        success = true;
-                    } else {
-                        onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
-                    }
-                }
+            ++expectedSequenceNumber;
+
+            if (wasEmpty) {
+                notifyChannelNonEmpty();
 
 Review comment:
   Has moving this line out from under the lock improved performance in some 
case? If not, the commit title
   > optimisations reducing lock contention
   
   is misleading and I would change it to something about refactoring/clean-up.
   
   Btw, again, I do not like squashing multiple changes into one commit like 
this. It actually took me ~10 minutes to understand what this change is all 
about and to make sure whether there are any other meaningful changes here. 
If you split this commit into `[hotfix][network] refactor/cleanups` and 
`[FLINK-10141][network] move notifyChannelNonEmpty outside of synchronised 
block`, it would be much, much easier to review...
   
   Could you split this before merging?
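
For readers following the thread, the pattern under discussion (decide under the lock, notify after releasing it) can be sketched in isolation as below. `NotifyingQueue` is a hypothetical class written for this illustration, not the actual `RemoteInputChannel`; a counter stands in for the `notifyChannelNonEmpty()` callback. Whether the change measurably reduces contention is exactly the open question of the review.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical buffer queue illustrating "notify outside the synchronized block". */
final class NotifyingQueue<T> {
    private final ArrayDeque<T> received = new ArrayDeque<>();
    // Stand-in for the listener callback; counts "became non-empty" notifications.
    private final AtomicInteger nonEmptyNotifications = new AtomicInteger();

    void add(T element) {
        final boolean wasEmpty;
        synchronized (received) {
            // Only the emptiness check and the state change need the lock.
            wasEmpty = received.isEmpty();
            received.add(element);
        }
        // The notification runs after the lock is released, so a slow
        // listener cannot block other threads that add or poll.
        if (wasEmpty) {
            nonEmptyNotifications.incrementAndGet();
        }
    }

    T poll() {
        synchronized (received) {
            return received.poll();
        }
    }

    int getNonEmptyNotifications() {
        return nonEmptyNotifications.get();
    }
}
```

The trade-off is that the notification is no longer atomic with the insertion, so a consumer may observe the element before the notification arrives and has to tolerate that ordering.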




[GitHub] pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification

2018-08-17 Thread GitBox
pnowojski commented on a change in pull request #6555: [FLINK-10142][network] 
reduce locking around credit notification
URL: https://github.com/apache/flink/pull/6555#discussion_r210915852
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
 ##
 @@ -357,19 +357,20 @@ private void writeAndFlushNextMessageIfPossible(Channel channel) {
                 return;
             }
 
-            //It is no need to notify credit for the released channel.
-            if (!inputChannel.isReleased()) {
-                AddCredit msg = new AddCredit(
-                    inputChannel.getPartitionId(),
-                    inputChannel.getAndResetUnannouncedCredit(),
-                    inputChannel.getInputChannelId());
+            if (inputChannel.isReleased()) {
 
 Review comment:
   Are the changes in this file optimising anything? Or are they unrelated to 
the rest of the commit?




[jira] [Updated] (FLINK-8357) enable rolling in default log settings

2018-08-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-8357:
--
Labels: pull-request-available  (was: )

> enable rolling in default log settings
> --
>
> Key: FLINK-8357
> URL: https://issues.apache.org/jira/browse/FLINK-8357
> Project: Flink
>  Issue Type: Improvement
>  Components: Logging
>Reporter: Xu Mingmin
>Assignee: zhangminglei
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The release packages use {{org.apache.log4j.FileAppender}} for log4j and 
> {{ch.qos.logback.core.FileAppender}} for logback, which can result in very 
> large log files. 
> In most cases, if not all, we need to enable rotation in a production 
> cluster, so I suppose it's a good idea to make rotation the default.





[jira] [Commented] (FLINK-8357) enable rolling in default log settings

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583977#comment-16583977
 ] 

ASF GitHub Bot commented on FLINK-8357:
---

GJL commented on issue #5371: [FLINK-8357] [conf] Enable rolling in default log 
settings
URL: https://github.com/apache/flink/pull/5371#issuecomment-413878586
 
 
   Hi, what's the state of this PR?




> enable rolling in default log settings
> --
>
> Key: FLINK-8357
> URL: https://issues.apache.org/jira/browse/FLINK-8357
> Project: Flink
>  Issue Type: Improvement
>  Components: Logging
>Reporter: Xu Mingmin
>Assignee: zhangminglei
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The release packages use {{org.apache.log4j.FileAppender}} for log4j and 
> {{ch.qos.logback.core.FileAppender}} for logback, which can result in very 
> large log files. 
> In most cases, if not all, we need to enable rotation in a production 
> cluster, so I suppose it's a good idea to make rotation the default.





[GitHub] GJL commented on issue #5371: [FLINK-8357] [conf] Enable rolling in default log settings

2018-08-17 Thread GitBox
GJL commented on issue #5371: [FLINK-8357] [conf] Enable rolling in default log 
settings
URL: https://github.com/apache/flink/pull/5371#issuecomment-413878586
 
 
   Hi, what's the state of this PR?




[GitHub] FredTing commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer

2018-08-17 Thread GitBox
FredTing commented on a change in pull request #6105: [FLINK-8500] Get the 
timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r210898792
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 ##
 @@ -45,6 +45,22 @@
      */
     T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
 
+    /**
+     * Deserializes the byte message.
+     *
+     * @param messageKey the key as a byte array (null if no key has been set).
+     * @param message The message, as a byte array (null if the message was empty or deleted).
+     * @param partition The partition the message has originated from.
+     * @param offset the offset of the message in the original source (for example the Kafka offset).
+     * @param timestamp the timestamp of the consumer record
+     * @param timestampType The timestamp type, could be NO_TIMESTAMP, CREATE_TIME or INGEST_TIME.
+     *
+     * @return The deserialized message as an object (null if the message cannot be deserialized).
+     */
+    default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException {
 
 Review comment:
   I have a small problem with the second alternative. When I implement this 
interface in Scala, I do not want to use mutable objects. I think the 
`setTimestamp` method forces the `T deserializedRecord` to be mutable 
(at least for the timestamp field).
   
   I have no problem with the first alternative, but I think we are better off 
throwing an exception with a message explaining that one must 
implement/override one of the `deserialize` methods.
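
One way to read this suggestion, as a rough sketch only: both `deserialize` variants get default bodies, the legacy one throws with an explanatory message, and the timestamp-aware one delegates to it. `TimestampAwareSchema`, `TimestampedValue` and `TimestampedStringSchema` are hypothetical names, and the parameter list is simplified (no `TimestampType`) so that the example has no Kafka dependency. It is not the interface from the pull request.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical, simplified schema with two deserialize variants. */
interface TimestampAwareSchema<T> {

    /** Legacy variant without a timestamp; throws unless an implementation overrides it. */
    default T deserialize(byte[] key, byte[] message, String topic, int partition, long offset)
            throws IOException {
        throw new UnsupportedOperationException(
            "Implement or override one of the deserialize methods in " + getClass().getName());
    }

    /** Timestamp-aware variant; by default ignores the timestamp and delegates. */
    default T deserialize(byte[] key, byte[] message, String topic, int partition, long offset,
            long timestamp) throws IOException {
        return deserialize(key, message, topic, partition, offset);
    }
}

/** Immutable record type: the timestamp is passed to the constructor, no setter is needed. */
final class TimestampedValue {
    final String value;
    final long timestamp;

    TimestampedValue(String value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

/** Example implementation that overrides only the timestamp-aware variant. */
final class TimestampedStringSchema implements TimestampAwareSchema<TimestampedValue> {
    @Override
    public TimestampedValue deserialize(byte[] key, byte[] message, String topic, int partition,
            long offset, long timestamp) {
        return new TimestampedValue(new String(message, StandardCharsets.UTF_8), timestamp);
    }
}
```

Because the timestamp arrives as a method argument, the deserialized record can stay immutable, which is the concern raised for Scala implementations. The cost is that a schema overriding neither method fails only at runtime.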




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583865#comment-16583865
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

FredTing commented on a change in pull request #6105: [FLINK-8500] Get the 
timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r210898792
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 ##
 @@ -45,6 +45,22 @@
      */
     T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
 
+    /**
+     * Deserializes the byte message.
+     *
+     * @param messageKey the key as a byte array (null if no key has been set).
+     * @param message The message, as a byte array (null if the message was empty or deleted).
+     * @param partition The partition the message has originated from.
+     * @param offset the offset of the message in the original source (for example the Kafka offset).
+     * @param timestamp the timestamp of the consumer record
+     * @param timestampType The timestamp type, could be NO_TIMESTAMP, CREATE_TIME or INGEST_TIME.
+     *
+     * @return The deserialized message as an object (null if the message cannot be deserialized).
+     */
+    default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException {
 
 Review comment:
   I have a small problem with the second alternative. When I implement this 
interface in Scala, I do not want to use mutable objects. I think the 
`setTimestamp` method forces the `T deserializedRecord` to be mutable 
(at least for the timestamp field).
   
   I have no problem with the first alternative, but I think we are better off 
throwing an exception with a message explaining that one must 
implement/override one of the `deserialize` methods.




> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, 
> this is useful!
>  





[jira] [Assigned] (FLINK-7964) Add Apache Kafka 1.0/1.1 connectors

2018-08-17 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-7964:
---

Assignee: vinoyang  (was: Hai Zhou)

> Add Apache Kafka 1.0/1.1 connectors
> ---
>
> Key: FLINK-7964
> URL: https://issues.apache.org/jira/browse/FLINK-7964
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Hai Zhou
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.7.0
>
>
> Kafka 1.0.0 is no mere bump of the version number. The Apache Kafka Project 
> Management Committee has packed a number of valuable enhancements into the 
> release. Here is a summary of a few of them:
> * Since its introduction in version 0.10, the Streams API has become hugely 
> popular among Kafka users, including the likes of Pinterest, Rabobank, 
> Zalando, and The New York Times. In 1.0, the API continues to evolve at a 
> healthy pace. To begin with, the builder API has been improved (KIP-120). A 
> new API has been added to expose the state of active tasks at runtime 
> (KIP-130). The new cogroup API makes it much easier to deal with partitioned 
> aggregates with fewer StateStores and fewer moving parts in your code 
> (KIP-150). Debuggability gets easier with enhancements to the print() and 
> writeAsText() methods (KIP-160). And if that’s not enough, check out KIP-138 
> and KIP-161 too. For more on streams, check out the Apache Kafka Streams 
> documentation, including some helpful new tutorial videos.
> * Operating Kafka at scale requires that the system remain observable, and to 
> make that easier, we’ve made a number of improvements to metrics. These are 
> too many to summarize without becoming tedious, but Connect metrics have been 
> significantly improved (KIP-196), a litany of new health check metrics are 
> now exposed (KIP-188), and we now have a global topic and partition count 
> (KIP-168). Check out KIP-164 and KIP-187 for even more.
> * We now support Java 9, leading, among other things, to significantly faster 
> TLS and CRC32C implementations. Over-the-wire encryption will be faster now, 
> which will keep Kafka fast and compute costs low when encryption is enabled.
> * In keeping with the security theme, KIP-152 cleans up the error handling on 
> Simple Authentication Security Layer (SASL) authentication attempts. 
> Previously, some authentication error conditions were indistinguishable from 
> broker failures and were not logged in a clear way. This is cleaner now.
> * Kafka can now tolerate disk failures better. Historically, JBOD storage 
> configurations have not been recommended, but the architecture has 
> nevertheless been tempting: after all, why not rely on Kafka’s own 
> replication mechanism to protect against storage failure rather than using 
> RAID? With KIP-112, Kafka now handles disk failure more gracefully. A single 
> disk failure in a JBOD broker will not bring the entire broker down; rather, 
> the broker will continue serving any log files that remain on functioning 
> disks.
> * Since release 0.11.0, the idempotent producer (which is the producer used 
> in the presence of a transaction, which of course is the producer we use for 
> exactly-once processing) required max.in.flight.requests.per.connection to be 
> equal to one. As anyone who has written or tested a wire protocol can attest, 
> this put an upper bound on throughput. Thanks to KAFKA-5949, this can now be 
> as large as five, relaxing the throughput constraint quite a bit.





[jira] [Commented] (FLINK-7964) Add Apache Kafka 1.0/1.1 connectors

2018-08-17 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583844#comment-16583844
 ] 

vinoyang commented on FLINK-7964:
-

I waited for a week or so and still haven't received any response, so I 
decided to take over the issue. If you have any questions, you can contact me.

> Add Apache Kafka 1.0/1.1 connectors
> ---
>
> Key: FLINK-7964
> URL: https://issues.apache.org/jira/browse/FLINK-7964
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Hai Zhou
>Assignee: Hai Zhou
>Priority: Major
> Fix For: 1.7.0
>
>
> Kafka 1.0.0 is no mere bump of the version number. The Apache Kafka Project 
> Management Committee has packed a number of valuable enhancements into the 
> release. Here is a summary of a few of them:
> * Since its introduction in version 0.10, the Streams API has become hugely 
> popular among Kafka users, including the likes of Pinterest, Rabobank, 
> Zalando, and The New York Times. In 1.0, the API continues to evolve at a 
> healthy pace. To begin with, the builder API has been improved (KIP-120). A 
> new API has been added to expose the state of active tasks at runtime 
> (KIP-130). The new cogroup API makes it much easier to deal with partitioned 
> aggregates with fewer StateStores and fewer moving parts in your code 
> (KIP-150). Debuggability gets easier with enhancements to the print() and 
> writeAsText() methods (KIP-160). And if that’s not enough, check out KIP-138 
> and KIP-161 too. For more on streams, check out the Apache Kafka Streams 
> documentation, including some helpful new tutorial videos.
> * Operating Kafka at scale requires that the system remain observable, and to 
> make that easier, we’ve made a number of improvements to metrics. These are 
> too many to summarize without becoming tedious, but Connect metrics have been 
> significantly improved (KIP-196), a litany of new health check metrics are 
> now exposed (KIP-188), and we now have a global topic and partition count 
> (KIP-168). Check out KIP-164 and KIP-187 for even more.
> * We now support Java 9, leading, among other things, to significantly faster 
> TLS and CRC32C implementations. Over-the-wire encryption will be faster now, 
> which will keep Kafka fast and compute costs low when encryption is enabled.
> * In keeping with the security theme, KIP-152 cleans up the error handling on 
> Simple Authentication Security Layer (SASL) authentication attempts. 
> Previously, some authentication error conditions were indistinguishable from 
> broker failures and were not logged in a clear way. This is cleaner now.
> * Kafka can now tolerate disk failures better. Historically, JBOD storage 
> configurations have not been recommended, but the architecture has 
> nevertheless been tempting: after all, why not rely on Kafka’s own 
> replication mechanism to protect against storage failure rather than using 
> RAID? With KIP-112, Kafka now handles disk failure more gracefully. A single 
> disk failure in a JBOD broker will not bring the entire broker down; rather, 
> the broker will continue serving any log files that remain on functioning 
> disks.
> * Since release 0.11.0, the idempotent producer (which is the producer used 
> in the presence of a transaction, which of course is the producer we use for 
> exactly-once processing) required max.in.flight.requests.per.connection to be 
> equal to one. As anyone who has written or tested a wire protocol can attest, 
> this put an upper bound on throughput. Thanks to KAFKA-5949, this can now be 
> as large as five, relaxing the throughput constraint quite a bit.





[jira] [Commented] (FLINK-10166) Dependency problems when executing SQL query in sql-client

2018-08-17 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583794#comment-16583794
 ] 

Dawid Wysakowicz commented on FLINK-10166:
--

I found out that this happens only with the Hadoop-free binary. For the others, 
the {{commons-codec}} dependency is shipped with the shaded Hadoop uber jar.
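
A quick way to confirm such a finding on a given distribution is to probe the classpath for the class named in the error. The helper below is a hypothetical sketch (`ClasspathCheck` is not part of Flink) and it only inspects the class loader that loaded the helper itself, which may differ from the one used to compile the generated code in the SQL client.

```java
/** Hypothetical helper for checking whether a class is visible on the classpath. */
final class ClasspathCheck {
    private ClasspathCheck() {}

    /** Returns true if the named class can be loaded by this class's class loader. */
    static boolean isClassAvailable(String className) {
        try {
            // initialize=false: only check presence, do not run static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }
}
```

Calling `ClasspathCheck.isClassAvailable("org.apache.commons.codec.binary.Base64")` from code running inside the affected distribution would then show whether the class is reachable from that loader.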

> Dependency problems when executing SQL query in sql-client
> --
>
> Key: FLINK-10166
> URL: https://issues.apache.org/jira/browse/FLINK-10166
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Priority: Major
>
> When I tried to run the query:
> {code}
> select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name)
> {code}
> in {{sql-client.sh}} I got:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown 
> variable or type "org.apache.commons.codec.binary.Base64"
> {code}





[jira] [Commented] (FLINK-8954) Escape control characters when outputting on SQL Client CLI

2018-08-17 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583742#comment-16583742
 ] 

Timo Walther commented on FLINK-8954:
-

[~dmitry_amosov] Thank you. I gave you contributor permissions. You can now 
assign issues to yourself.

> Escape control characters when outputting on SQL Client CLI
> ---
>
> Key: FLINK-8954
> URL: https://issues.apache.org/jira/browse/FLINK-8954
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Priority: Major
>
> Control characters in the result output of a SQL query influence the behavior 
> of the CLI. We should escape everything that could cause side effects.
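
As an illustration of the kind of escaping this issue asks for, here is a minimal sketch that replaces ISO control characters with a visible `\uXXXX` form before printing. `ControlCharEscaper` is a hypothetical name, and the choice of the `\uXXXX` notation is an assumption; the issue does not prescribe an output format.

```java
/** Hypothetical helper: makes control characters visible instead of letting the terminal interpret them. */
final class ControlCharEscaper {
    private ControlCharEscaper() {}

    static String escape(String input) {
        StringBuilder sb = new StringBuilder(input.length());
        for (int i = 0; i < input.length(); i++) {
            char c = input.charAt(i);
            if (Character.isISOControl(c)) {
                // Covers U+0000..U+001F and U+007F..U+009F, e.g. ESC, TAB, newline.
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }
}
```

With this approach, a result cell containing an ANSI escape sequence is shown as literal text rather than changing the terminal state.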





[jira] [Updated] (FLINK-10166) Dependency problems when executing SQL query in sql-client

2018-08-17 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz updated FLINK-10166:
-
Description: 
When I tried to run the query:
{code}
select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name)
{code}
in {{sql-client.sh}} I got:
{code}
[ERROR] Could not execute SQL statement. Reason:
org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown 
variable or type "org.apache.commons.codec.binary.Base64"
{code}

  was:
When I tried to run the query:
{{select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name)}}
in {{sql-client.sh}} I got:
{code}
[ERROR] Could not execute SQL statement. Reason:
org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown 
variable or type "org.apache.commons.codec.binary.Base64"
{code}


> Dependency problems when executing SQL query in sql-client
> --
>
> Key: FLINK-10166
> URL: https://issues.apache.org/jira/browse/FLINK-10166
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Priority: Major
>
> When I tried to run the query:
> {code}
> select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name)
> {code}
> in {{sql-client.sh}} I got:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown 
> variable or type "org.apache.commons.codec.binary.Base64"
> {code}





[jira] [Updated] (FLINK-10166) Dependency problems when executing SQL query in sql-client

2018-08-17 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz updated FLINK-10166:
-
Description: 
When I tried to run the query:
{{select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name)}}
in {{sql-client.sh}} I got:
{code}
[ERROR] Could not execute SQL statement. Reason:
org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown 
variable or type "org.apache.commons.codec.binary.Base64"
{code}

  was:
When I tried to run the query:
{{select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name)}}
in `sql-client.sh` I got:
{code}
[ERROR] Could not execute SQL statement. Reason:
org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown 
variable or type "org.apache.commons.codec.binary.Base64"
{code}


> Dependency problems when executing SQL query in sql-client
> --
>
> Key: FLINK-10166
> URL: https://issues.apache.org/jira/browse/FLINK-10166
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Priority: Major
>
> When I tried to run the query:
> {{select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name)}}
> in {{sql-client.sh}} I got:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown 
> variable or type "org.apache.commons.codec.binary.Base64"
> {code}





[jira] [Created] (FLINK-10166) Dependency problems when executing SQL query in sql-client

2018-08-17 Thread Dawid Wysakowicz (JIRA)
Dawid Wysakowicz created FLINK-10166:


 Summary: Dependency problems when executing SQL query in sql-client
 Key: FLINK-10166
 URL: https://issues.apache.org/jira/browse/FLINK-10166
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.6.0
Reporter: Dawid Wysakowicz


When I tried to run the query:
{{select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name)}}
in `sql-client.sh` I got:
{code}
[ERROR] Could not execute SQL statement. Reason:
org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown 
variable or type "org.apache.commons.codec.binary.Base64"
{code}





[jira] [Commented] (FLINK-8954) Escape control characters when outputting on SQL Client CLI

2018-08-17 Thread Dmitry Amosov (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583696#comment-16583696
 ] 

Dmitry Amosov commented on FLINK-8954:
--

Hello,

I'd like to help with this task. Can you assign it to me?

> Escape control characters when outputting on SQL Client CLI
> ---
>
> Key: FLINK-8954
> URL: https://issues.apache.org/jira/browse/FLINK-8954
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Priority: Major
>
> Control characters in the result output of a SQL query influence the behavior 
> of the CLI. We should escape everything that could cause side effects.
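
A minimal sketch of the kind of escaping the issue asks for, assuming plain Java. The helper name `escape` and the choice of a backslash-u hex form are illustrative assumptions, not the approach taken in Flink:

```java
final class ControlCharEscaper {

    /** Hypothetical helper: replaces ISO control characters with a visible hex form. */
    static String escape(String value) {
        StringBuilder sb = new StringBuilder(value.length());
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            if (Character.isISOControl(c)) {
                // Render the character as a literal backslash, 'u' and four hex digits
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // A tab and an ANSI "clear screen" sequence would otherwise affect the terminal.
        System.out.println(escape("a\tb\u001b[2Jc"));
    }
}
```

Escaping at output time keeps the stored query result unchanged and only alters what the terminal receives.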





[jira] [Commented] (FLINK-10065) InstantiationUtil.deserializeObject(InputStream in, ClassLoader cl, boolean isFailureTolerant) will close the inputStream

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583694#comment-16583694
 ] 

ASF GitHub Bot commented on FLINK-10065:


klion26 commented on issue #6498: [FLINK-10065] 
InstantiationUtil.deserializeObject(InputStream in, ClassLoader cl, boolean 
isFailureTolerant) will close the inputStream
URL: https://github.com/apache/flink/pull/6498#issuecomment-413810934
 
 
   ping @StephanEwen @tzulitai @tillrohrmann 




> InstantiationUtil.deserializeObject(InputStream in, ClassLoader cl, boolean 
> isFailureTolerant) will close the inputStream
> -
>
> Key: FLINK-10065
> URL: https://issues.apache.org/jira/browse/FLINK-10065
> Project: Flink
>  Issue Type: Bug
>Reporter: Congxian Qiu
>Assignee: Congxian Qiu
>Priority: Major
>  Labels: pull-request-available
>
> Now, the implementation of InstantiationUtil.deserializeObject(InputStream 
> in, ClassLoader cl, boolean isFailureTolerant) is 
> {code:java}
> @SuppressWarnings("unchecked")
> public static <T> T deserializeObject(InputStream in, ClassLoader cl, boolean 
> isFailureTolerant)
> throws IOException, ClassNotFoundException {
> final ClassLoader old = Thread.currentThread().getContextClassLoader();
> // not using resource try to avoid AutoClosable's close() on the given stream
> try (ObjectInputStream oois = isFailureTolerant
> ? new InstantiationUtil.FailureTolerantObjectInputStream(in, cl)
> : new InstantiationUtil.ClassLoaderObjectInputStream(in, cl)) {
> Thread.currentThread().setContextClassLoader(cl);
> return (T) oois.readObject();
> }
> finally {
> Thread.currentThread().setContextClassLoader(old);
> }
> }
> {code}
> The ObjectInputStream wrapping {{in}} is opened in a try-with-resources block, 
> so the caller's InputStream is closed once this method returns.
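
The behavior can be reproduced with plain `java.io` classes. The sketch below does not use Flink's `InstantiationUtil`; `TrackingStream` and both `read...` methods are hypothetical names introduced for illustration. It relies on the fact that closing an `ObjectInputStream` also closes the stream it wraps.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

final class CloseBehaviorSketch {

    /** Hypothetical stream that records whether close() was called on it. */
    static final class TrackingStream extends ByteArrayInputStream {
        boolean closed = false;

        TrackingStream(byte[] data) {
            super(data);
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Same pattern as in the report: try-with-resources closes the wrapper and thereby `in`. */
    static Object readWithTryResources(InputStream in) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(in)) {
            return ois.readObject();
        }
    }

    /** Alternative: the wrapper is not closed, so the caller stays responsible for `in`. */
    static Object readWithoutClosing(InputStream in) throws IOException, ClassNotFoundException {
        ObjectInputStream ois = new ObjectInputStream(in);
        return ois.readObject();
    }

    public static void main(String[] args) throws Exception {
        TrackingStream first = new TrackingStream(serialize("hello"));
        readWithTryResources(first);
        System.out.println("closed after try-with-resources: " + first.closed);

        TrackingStream second = new TrackingStream(serialize("hello"));
        readWithoutClosing(second);
        System.out.println("closed without try-with-resources: " + second.closed);
    }
}
```

Whether the method should leave the stream open or document that it closes it is a design decision for the pull request; the sketch only shows the difference between the two patterns.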





[GitHub] klion26 commented on issue #6498: [FLINK-10065] InstantiationUtil.deserializeObject(InputStream in, ClassLoader cl, boolean isFailureTolerant) will close the inputStream

2018-08-17 Thread GitBox
klion26 commented on issue #6498: [FLINK-10065] 
InstantiationUtil.deserializeObject(InputStream in, ClassLoader cl, boolean 
isFailureTolerant) will close the inputStream
URL: https://github.com/apache/flink/pull/6498#issuecomment-413810934
 
 
   ping @StephanEwen @tzulitai @tillrohrmann 




[jira] [Commented] (FLINK-10059) Add LTRIM supported in Table API and SQL

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583686#comment-16583686
 ] 

ASF GitHub Bot commented on FLINK-10059:


yanghua commented on issue #6494: [FLINK-10059] [table] Add LTRIM supported in 
Table API and SQL
URL: https://github.com/apache/flink/pull/6494#issuecomment-413808097
 
 
   @xccui updated, will also update `RTRIM`




> Add LTRIM supported in Table API and SQL
> 
>
> Key: FLINK-10059
> URL: https://issues.apache.org/jira/browse/FLINK-10059
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> Refer to the MySQL LTRIM function: 
> https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ltrim
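
A minimal sketch of the semantics described in the MySQL documentation linked above (leading space characters are removed, NULL input yields NULL), written in plain Java. This is an illustration only and not the Flink implementation from the pull request:

```java
final class LtrimSketch {

    /** Removes leading space characters; returns null for null input. */
    static String ltrim(String str) {
        if (str == null) {
            return null;
        }
        int start = 0;
        while (start < str.length() && str.charAt(start) == ' ') {
            start++;
        }
        return str.substring(start);
    }

    public static void main(String[] args) {
        System.out.println("[" + ltrim("  barbar ") + "]");
    }
}
```

Trailing spaces are left untouched, which is the difference from a full trim.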





[jira] [Created] (FLINK-10165) JarRunHandler/JarRunRequestBody should allow to pass program arguments as escaped json list

2018-08-17 Thread Maciej Prochniak (JIRA)
Maciej Prochniak created FLINK-10165:


 Summary: JarRunHandler/JarRunRequestBody should allow to pass 
program arguments as escaped json list
 Key: FLINK-10165
 URL: https://issues.apache.org/jira/browse/FLINK-10165
 Project: Flink
  Issue Type: Improvement
  Components: REST
Affects Versions: 1.6.0
Reporter: Maciej Prochniak


Currently, program arguments are parsed from a plain string: 
[https://github.com/apache/flink/blob/master/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java#L106]

It doesn't allow quotes or newlines in arguments; in particular, it's 
difficult to pass JSON as an argument. I think it would be good to pass arguments 
as a JSON list, so that Jackson would handle the escaping. It'd be a bit more 
problematic for query string parameters... WDYT [~Zentol]?
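
A minimal sketch of why a list is easier to handle than a plain string. It assumes naive whitespace tokenization, which is a simplification and not the actual `JarRunHandler` code; `splitPlainString` is a hypothetical helper:

```java
import java.util.Arrays;
import java.util.List;

final class ProgramArgsSketch {

    /** Assumption: naive whitespace tokenization of a single plain string. */
    static List<String> splitPlainString(String programArgs) {
        return Arrays.asList(programArgs.trim().split("\\s+"));
    }

    public static void main(String[] args) {
        String json = "{\"name\": \"a b\"}";

        // Plain string: the JSON value is torn apart at every space.
        System.out.println(splitPlainString("--config " + json));

        // List form: each argument stays intact, whatever it contains.
        List<String> asList = Arrays.asList("--config", json);
        System.out.println(asList);
    }
}
```

With a list in the request body, the JSON library on each side takes care of quoting, so the argument values never need to be tokenized again.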





[GitHub] yanghua commented on issue #6494: [FLINK-10059] [table] Add LTRIM supported in Table API and SQL

2018-08-17 Thread GitBox
yanghua commented on issue #6494: [FLINK-10059] [table] Add LTRIM supported in 
Table API and SQL
URL: https://github.com/apache/flink/pull/6494#issuecomment-413808097
 
 
   @xccui updated, will also update `RTRIM`




[jira] [Commented] (FLINK-10059) Add LTRIM supported in Table API and SQL

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583608#comment-16583608
 ] 

ASF GitHub Bot commented on FLINK-10059:


xccui commented on a change in pull request #6494: [FLINK-10059] [table] Add 
LTRIM supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6494#discussion_r210846147
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ##
 @@ -626,6 +626,33 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "-")
   }
 
+  @Test
 
 Review comment:
   Please add a test case (for demonstration) in 
`org.apache.flink.table.expressions.SqlExpressionTest`.




> Add LTRIM supported in Table API and SQL
> 
>
> Key: FLINK-10059
> URL: https://issues.apache.org/jira/browse/FLINK-10059
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> Refer to the MySQL LTRIM function: 
> https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ltrim





[GitHub] xccui commented on a change in pull request #6494: [FLINK-10059] [table] Add LTRIM supported in Table API and SQL

2018-08-17 Thread GitBox
xccui commented on a change in pull request #6494: [FLINK-10059] [table] Add 
LTRIM supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6494#discussion_r210846147
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ##
 @@ -626,6 +626,33 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "-")
   }
 
+  @Test
 
 Review comment:
   Please add a test case (for demonstration) in 
`org.apache.flink.table.expressions.SqlExpressionTest`.




[jira] [Commented] (FLINK-10059) Add LTRIM supported in Table API and SQL

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583606#comment-16583606
 ] 

ASF GitHub Bot commented on FLINK-10059:


xccui commented on a change in pull request #6494: [FLINK-10059] [table] Add 
LTRIM supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6494#discussion_r210845279
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -2482,6 +2482,18 @@ TO_BASE64(string)
 E.g., TO_BASE64('hello world') returns 
"aGVsbG8gd29ybGQ=".
   
 
+
+
+  
+{% highlight text %}
 
 Review comment:
   Please add the corresponding docs for Java and Scala.




> Add LTRIM supported in Table API and SQL
> 
>
> Key: FLINK-10059
> URL: https://issues.apache.org/jira/browse/FLINK-10059
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> Refer to the MySQL LTRIM function: 
> https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ltrim





[GitHub] xccui commented on a change in pull request #6494: [FLINK-10059] [table] Add LTRIM supported in Table API and SQL

2018-08-17 Thread GitBox
xccui commented on a change in pull request #6494: [FLINK-10059] [table] Add 
LTRIM supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6494#discussion_r210845279
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -2482,6 +2482,18 @@ TO_BASE64(string)
 E.g., TO_BASE64('hello world') returns 
"aGVsbG8gd29ybGQ=".
   
 
+
+
+  
+{% highlight text %}
 
 Review comment:
   Please add the corresponding docs for Java and Scala.




[jira] [Commented] (FLINK-10059) Add LTRIM supported in Table API and SQL

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583598#comment-16583598
 ] 

ASF GitHub Bot commented on FLINK-10059:


yanghua commented on issue #6494: [FLINK-10059] [table] Add LTRIM supported in 
Table API and SQL
URL: https://github.com/apache/flink/pull/6494#issuecomment-413798852
 
 
   @xccui Does this PR look good to you?




> Add LTRIM supported in Table API and SQL
> 
>
> Key: FLINK-10059
> URL: https://issues.apache.org/jira/browse/FLINK-10059
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> Refer to the MySQL LTRIM function: 
> https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ltrim





[GitHub] yanghua commented on issue #6494: [FLINK-10059] [table] Add LTRIM supported in Table API and SQL

2018-08-17 Thread GitBox
yanghua commented on issue #6494: [FLINK-10059] [table] Add LTRIM supported in 
Table API and SQL
URL: https://github.com/apache/flink/pull/6494#issuecomment-413798852
 
 
   @xccui Does this PR look good to you?




[GitHub] jrthe42 opened a new pull request #6575: [hotfix][table] Fix bug in RowtimeValidator when getting custom TimestampExtractor

2018-08-17 Thread GitBox
jrthe42 opened a new pull request #6575: [hotfix][table] Fix bug in 
RowtimeValidator when getting custom TimestampExtractor
URL: https://github.com/apache/flink/pull/6575
 
 
   This pull request fixes a bug in ```RowtimeValidator``` , which may cause an 
exception when using custom ```TimestampExtractor``` in Flink SQL.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   




[jira] [Closed] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-17 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-9850.
-

> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The output of the print method of {{DataSet}} allows the user to supply a 
> String to identify the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {{DataStream}} doesn't support this yet. It would be valuable to add this 
> feature to {{DataStream}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-17 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski resolved FLINK-9850.
---
   Resolution: Fixed
Fix Version/s: 1.7.0

Merged to master as effa09cb80adc76842952ea78acc746fd1f826e7

> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The output of the print method of {{DataSet}} allows the user to supply a 
> String to identify the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {{DataStream}} doesn't support this yet. It would be valuable to add this 
> feature to {{DataStream}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583552#comment-16583552
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski closed pull request #6367: [FLINK-9850] Add a string to the print 
method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java
new file mode 100644
index 000..de058b69f1f
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions.util;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.PrintStream;
+import java.io.Serializable;
+
+/**
+ * Print sink output writer for DataStream and DataSet print API.
+ */
+@Internal
+public class PrintSinkOutputWriter<IN> implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final boolean STD_OUT = false;
+   private static final boolean STD_ERR = true;
+
+   private final boolean target;
+   private transient PrintStream stream;
+   private final String sinkIdentifier;
+   private transient String completedPrefix;
+
+   public PrintSinkOutputWriter() {
+   this("", STD_OUT);
+   }
+
+   public PrintSinkOutputWriter(final boolean stdErr) {
+   this("", stdErr);
+   }
+
+   public PrintSinkOutputWriter(final String sinkIdentifier, final boolean stdErr) {
+   this.target = stdErr;
+   this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier);
+   }
+
+   public void open(int subtaskIndex, int numParallelSubtasks) {
+   // get the target stream
+   stream = target == STD_OUT ? System.out : System.err;
+
+   completedPrefix = sinkIdentifier;
+
+   if (numParallelSubtasks > 1) {
+   if (!completedPrefix.isEmpty()) {
+   completedPrefix += ":";
+   }
+   completedPrefix += (subtaskIndex + 1);
+   }
+
+   if (!completedPrefix.isEmpty()) {
+   completedPrefix += "> ";
+   }
+   }
+
+   public void write(IN record) {
+   stream.println(completedPrefix + record.toString());
+   }
+
+   @Override
+   public String toString() {
+   return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
+   }
+}
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
index 0ab1abb2efb..62eabd0b739 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
@@ -19,45 +19,46 @@
 package org.apache.flink.api.java.io;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.util.PrintSinkOutputWriter;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.configuration.Configuration;
 
-import java.io.PrintStream;
-
 /**
  * Output format that prints results into either stdout or stderr.
- * @param 
+ *
+ * 
+ * Four possible format options:
+ * {@code sinkIdentifier}:taskId> output  <- {@code sinkIdentifier} 
provided, parallelism > 1
+ * {@code sinkIdentifier}> output <- {@code sinkIdentifier} 
provided, parallelism == 1
+ *  taskId> output  

[GitHub] pnowojski closed pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-17 Thread GitBox
pnowojski closed pull request #6367: [FLINK-9850] Add a string to the print 
method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java
new file mode 100644
index 000..de058b69f1f
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions.util;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.PrintStream;
+import java.io.Serializable;
+
+/**
+ * Print sink output writer for DataStream and DataSet print API.
+ */
+@Internal
+public class PrintSinkOutputWriter<IN> implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final boolean STD_OUT = false;
+   private static final boolean STD_ERR = true;
+
+   private final boolean target;
+   private transient PrintStream stream;
+   private final String sinkIdentifier;
+   private transient String completedPrefix;
+
+   public PrintSinkOutputWriter() {
+   this("", STD_OUT);
+   }
+
+   public PrintSinkOutputWriter(final boolean stdErr) {
+   this("", stdErr);
+   }
+
+   public PrintSinkOutputWriter(final String sinkIdentifier, final boolean stdErr) {
+   this.target = stdErr;
+   this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier);
+   }
+
+   public void open(int subtaskIndex, int numParallelSubtasks) {
+   // get the target stream
+   stream = target == STD_OUT ? System.out : System.err;
+
+   completedPrefix = sinkIdentifier;
+
+   if (numParallelSubtasks > 1) {
+   if (!completedPrefix.isEmpty()) {
+   completedPrefix += ":";
+   }
+   completedPrefix += (subtaskIndex + 1);
+   }
+
+   if (!completedPrefix.isEmpty()) {
+   completedPrefix += "> ";
+   }
+   }
+
+   public void write(IN record) {
+   stream.println(completedPrefix + record.toString());
+   }
+
+   @Override
+   public String toString() {
+   return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
+   }
+}
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
index 0ab1abb2efb..62eabd0b739 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
@@ -19,45 +19,46 @@
 package org.apache.flink.api.java.io;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.util.PrintSinkOutputWriter;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.configuration.Configuration;
 
-import java.io.PrintStream;
-
 /**
  * Output format that prints results into either stdout or stderr.
- * @param <T>
+ *
+ * 
+ * Four possible format options:
+ * {@code sinkIdentifier}:taskId> output  <- {@code sinkIdentifier} provided, parallelism > 1
+ * {@code sinkIdentifier}> output         <- {@code sinkIdentifier} provided, parallelism == 1
+ *  taskId> output                        <- no {@code sinkIdentifier} provided, parallelism > 1
+ *  output                                <- no {@code sinkIdentifier} provided, parallelism == 1
+ * 
+ *
+ * @param <T> Input record type
  */
 @PublicEvolving
 publi
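
The four output formats follow directly from how `PrintSinkOutputWriter.open` builds its prefix in the diff above. A minimal standalone sketch of that rule, with the logic pulled into a static method (the class name `PrintPrefixSketch` and the method `prefix` are hypothetical and not part of the pull request):

```java
final class PrintPrefixSketch {

    /** Builds the line prefix from the sink identifier, subtask index and parallelism. */
    static String prefix(String sinkIdentifier, int subtaskIndex, int numParallelSubtasks) {
        String completedPrefix = (sinkIdentifier == null ? "" : sinkIdentifier);

        // The 1-based subtask number is only shown when there is more than one subtask.
        if (numParallelSubtasks > 1) {
            if (!completedPrefix.isEmpty()) {
                completedPrefix += ":";
            }
            completedPrefix += (subtaskIndex + 1);
        }

        if (!completedPrefix.isEmpty()) {
            completedPrefix += "> ";
        }
        return completedPrefix;
    }

    public static void main(String[] args) {
        System.out.println(prefix("sink", 0, 4) + "output");
        System.out.println(prefix("sink", 0, 1) + "output");
        System.out.println(prefix(null, 2, 4) + "output");
        System.out.println(prefix(null, 0, 1) + "output");
    }
}
```

With no identifier and a parallelism of 1 the prefix is empty, so the record is printed on its own.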

[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583550#comment-16583550
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on issue #6367: [FLINK-9850] Add a string to the print 
method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-413787139
 
 
   Thanks for the contribution! @zentol told me his LGTM so merging now :)




> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The output of the print method of {[DataSet}} allows the user to supply a 
> String to identify the output(see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {[DataStream}} doesn't support now. It is valuable to add this feature for 
> {{DataStream}}





[jira] [Commented] (FLINK-10122) KafkaConsumer should use partitionable state over union state if partition discovery is not active

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583549#comment-16583549
 ] 

ASF GitHub Bot commented on FLINK-10122:


StefanRRichter commented on issue #6537: [FLINK-10122] KafkaConsumer should use 
partitionable state over union state if partition discovery is not active
URL: https://github.com/apache/flink/pull/6537#issuecomment-413787110
 
 
   Thanks @tzulitai ! I was aware that this will break the behavior for 
partition discovery. However, the current implementation was already broken for 
users at large scale, as pointed out in the description. This PR was intended as 
a quick solution for this case. I think that we can have better non-breaking 
solutions in the future like splitting the source into two operators or a 
different state partitioning scheme. I think that we can close the PR and go 
for the long term solution in official releases. Nevertheless I think that we 
should cherry-pick two parts of this PR into releases, the hotfix to improve 
memory utilization and the option to remove operator state (or - even better - 
states in general).




> KafkaConsumer should use partitionable state over union state if partition 
> discovery is not active
> --
>
> Key: FLINK-10122
> URL: https://issues.apache.org/jira/browse/FLINK-10122
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> KafkaConsumer always stores its offsets state as union state. I think this is 
> only required when partition discovery is active. For jobs with a 
> very high parallelism, the union state can lead to prohibitively expensive 
> deployments. For example, a job with 2000 sources and a total of 10MB of 
> checkpointed union offsets state would have to ship ~ 2000 x 10MB = 
> 20GB of state. With partitionable state, it would have to ship ~10MB.
> For now, I would suggest going back to partitionable state when 
> partition discovery is not active. In the long run, I have some ideas for 
> more efficient partitioning schemes that would also work for active discovery.
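
The arithmetic in the description above can be sketched as follows. This is
only an illustration of the estimate, assuming (as the description states)
that union state is shipped in full to every source subtask while
partitionable state is shipped once in total. The class and method names are
hypothetical and are not Flink APIs.

```java
// Illustrative arithmetic only; class and method names are hypothetical, not Flink APIs.
final class StateShippingEstimate {

    /** Union redistribution: every subtask receives the complete state on restore. */
    static long unionShippedMb(int parallelism, long totalStateMb) {
        return (long) parallelism * totalStateMb;
    }

    /** Partitionable redistribution: each subtask receives only its share, so the total is shipped once. */
    static long partitionableShippedMb(int parallelism, long totalStateMb) {
        return totalStateMb;
    }

    public static void main(String[] args) {
        int sources = 2000;   // parallelism from the example in the issue
        long stateMb = 10;    // total checkpointed offsets state in MB
        System.out.println("union: ~" + unionShippedMb(sources, stateMb) + " MB");
        System.out.println("partitionable: ~" + partitionableShippedMb(sources, stateMb) + " MB");
    }
}
```

With the numbers from the issue, the union case comes to roughly 20000 MB
(about 20GB), against roughly 10 MB for the partitionable case.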



[GitHub] pnowojski commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-17 Thread GitBox
pnowojski commented on issue #6367: [FLINK-9850] Add a string to the print 
method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-413787139
 
 
   Thanks for the contribution! @zentol told me his LGTM so merging now :)


[jira] [Resolved] (FLINK-10159) TestHarness#initializeState(xyz) calls after TestHarness#open() are being silently ignored

2018-08-17 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski resolved FLINK-10159.

   Resolution: Fixed
Fix Version/s: 1.7.0

> TestHarness#initializeState(xyz) calls after TestHarness#open() are being 
> silently ignored
> --
>
> Key: FLINK-10159
> URL: https://issues.apache.org/jira/browse/FLINK-10159
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.6.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> This is an old issue. An incorrect order of initializeState and open results in 
> initializeState being ignored. For example, in this code:
> {code:java}
> testHarness = createTestHarness(topic);
> testHarness.setup();
> testHarness.open();
> testHarness.initializeState(snapshot1);
> {code}
> This is misleading both for Flink developers and for users (since we 
> recommend using the test harness for unit tests).
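
The fail-fast behavior named in the PR title ("Fail
TestHarness.initializeState if harness has already been initialized") can be
sketched roughly as below. This is a minimal stand-in under stated
assumptions, not Flink's AbstractStreamOperatorTestHarness: the class
GuardedHarness, its String snapshot type, and the assumption that open()
initializes empty state when none was restored are all hypothetical.

```java
// Hypothetical stand-in for a test harness; not Flink's AbstractStreamOperatorTestHarness.
final class GuardedHarness {
    private boolean initialized;
    private String restoredState;

    /** Restores state; must be called before open(), and at most once. */
    void initializeState(String snapshot) {
        if (initialized) {
            // Fail loudly instead of silently ignoring the late call.
            throw new IllegalStateException(
                    "initializeState() called after the harness was already initialized");
        }
        restoredState = snapshot;
        initialized = true;
    }

    /** Opens the operator, initializing with empty state if the caller restored none (assumption). */
    void open() {
        if (!initialized) {
            initializeState(null);
        }
    }

    String restoredState() {
        return restoredState;
    }

    public static void main(String[] args) {
        // Correct order: restore first, then open.
        GuardedHarness ok = new GuardedHarness();
        ok.initializeState("snapshot1");
        ok.open();
        System.out.println("restored: " + ok.restoredState());

        // Wrong order from the issue: open first, then try to restore.
        GuardedHarness wrong = new GuardedHarness();
        wrong.open();
        try {
            wrong.initializeState("snapshot1");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With such a guard, the ordering shown in the issue raises an exception in the
test rather than leaving the snapshot unused.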



[jira] [Commented] (FLINK-10159) TestHarness#initializeState(xyz) calls after TestHarness#open() are being silently ignored

2018-08-17 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583499#comment-16583499
 ] 

Piotr Nowojski commented on FLINK-10159:


Merged on master as b70f78392424b5bdb4119ee8fdbfe16fee13


[jira] [Closed] (FLINK-10159) TestHarness#initializeState(xyz) calls after TestHarness#open() are being silently ignored

2018-08-17 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-10159.
--


[jira] [Commented] (FLINK-10159) TestHarness#initializeState(xyz) calls after TestHarness#open() are being silently ignored

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583496#comment-16583496
 ] 

ASF GitHub Bot commented on FLINK-10159:


pnowojski closed pull request #6570: [FLINK-10159][tests] Fail 
TestHarness.initializeState if harness has already been initialized
URL: https://github.com/apache/flink/pull/6570
 
 
   


[jira] [Commented] (FLINK-10159) TestHarness#initializeState(xyz) calls after TestHarness#open() are being silently ignored

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583497#comment-16583497
 ] 

ASF GitHub Bot commented on FLINK-10159:


pnowojski commented on issue #6570: [FLINK-10159][tests] Fail 
TestHarness.initializeState if harness has already been initialized
URL: https://github.com/apache/flink/pull/6570#issuecomment-413782923
 
 
   Thanks! Merged.


[GitHub] pnowojski closed pull request #6570: [FLINK-10159][tests] Fail TestHarness.initializeState if harness has already been initialized

2018-08-17 Thread GitBox
pnowojski closed pull request #6570: [FLINK-10159][tests] Fail 
TestHarness.initializeState if harness has already been initialized
URL: https://github.com/apache/flink/pull/6570
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 74c58ad2891..57b7e77dc7f 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -172,7 +172,6 @@ public void testFlinkKafkaProducer011FailBeforeNotify() throws Exception {
 
testHarness.setup();
testHarness.open();
-   testHarness.initializeState(null);
testHarness.processElement(42, 0);
testHarness.snapshot(0, 1);
testHarness.processElement(43, 2);
@@ -225,7 +224,6 @@ public void testFlinkKafkaProducer011FailTransactionCoordinatorBeforeNotify() th
 
testHarness1.setup();
testHarness1.open();
-   testHarness1.initializeState(null);
testHarness1.processElement(42, 0);
testHarness1.snapshot(0, 1);
testHarness1.processElement(43, 2);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
index ff0b0fcb172..644ab04fb70 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
@@ -30,6 +30,9 @@
 import java.util.Collections;
 import java.util.List;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Tests for {@link ListCheckpointed}.
  */
@@ -37,35 +40,39 @@
 
@Test
public void testUDFReturningNull() throws Exception {
-   TestUserFunction userFunction = new TestUserFunction(null);
-   AbstractStreamOperatorTestHarness testHarness =
-   new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0);
-   testHarness.open();
-   OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
-   testHarness.initializeState(snapshot);
-   Assert.assertTrue(userFunction.isRestored());
+   testUDF(new TestUserFunction(null));
}
 
@Test
public void testUDFReturningEmpty() throws Exception {
-   TestUserFunction userFunction = new TestUserFunction(Collections.emptyList());
-   AbstractStreamOperatorTestHarness testHarness =
-   new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0);
-   testHarness.open();
-   OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
-   testHarness.initializeState(snapshot);
-   Assert.assertTrue(userFunction.isRestored());
+   testUDF(new TestUserFunction(Collections.emptyList()));
}
 
@Test
public void testUDFReturningData() throws Exception {
-   TestUserFunction userFunction = new TestUserFunction(Arrays.asList(1, 2, 3));
-   AbstractStreamOperatorTestHarness testHarness =
-   new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0);
-   testHarness.open();
-   OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
-   testHarness.initializeState(snapshot);
-   Assert.assertTrue(userFunction.isRestored());
+   testUDF(new TestUserFunction(Arrays.asList(1, 2, 3)));
+   }
+
+   private static void testUDF(TestUserFunction userFunction) throws Exception {
+   OperatorSubtaskState snapshot;
+   try (AbstractStreamOperatorTestHarness testHarness = createTestHarness(userFunction)) {
+   testHarness.open();
+   snapshot = testHarness.snapshot(0L, 0L);
+   assertFalse(userFunction.isRestored());
+   }
+   try (AbstractStreamOperatorTestHarness testHarness = createTest

[jira] [Commented] (FLINK-8868) Support Table Function as Table for Stream Sql

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583487#comment-16583487
 ] 

ASF GitHub Bot commented on FLINK-8868:
---

Xpray commented on issue #6574: [FLINK-8868] [table] Support Table Function as 
Table Source for Stream Sql
URL: https://github.com/apache/flink/pull/6574#issuecomment-413779455
 
 
   It should be FLINK-8868, I'll fix this.


> Support Table Function as Table for Stream Sql
> --
>
> Key: FLINK-8868
> URL: https://issues.apache.org/jira/browse/FLINK-8868
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>
> support SQL like:  SELECT * FROM TABLE(tf("a"))



[jira] [Updated] (FLINK-8868) Support Table Function as Table for Stream Sql

2018-08-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-8868:
--
Labels: pull-request-available  (was: )


[jira] [Commented] (FLINK-8532) RebalancePartitioner should use Random value for its first partition

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583484#comment-16583484
 ] 

ASF GitHub Bot commented on FLINK-8532:
---

Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify 
RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#issuecomment-413778059
 
 
   thanks~ @tillrohrmann 


> RebalancePartitioner should use Random value for its first partition
> 
>
> Key: FLINK-8532
> URL: https://issues.apache.org/jira/browse/FLINK-8532
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Yuta Morisawa
>Assignee: Guibo Pan
>Priority: Major
>  Labels: pull-request-available
>
> Under some conditions, RebalancePartitioner doesn't balance data correctly 
> because it uses the same value for selecting the next operators.
> RebalancePartitioner initializes its partition id using the same value in 
> every thread, so it does balance data over time, but at any given moment the 
> amount of data in each operator is skewed.
> In particular, when the data rates of the upstream operators are equal, the 
> data skew becomes severe.
>  
>  
> Example:
> Consider a simple operator chain.
> -> map1 -> rebalance -> map2 ->
> Each map operator (map1, map2) contains three subtasks (subtasks 1, 2, 3 in 
> map1 and subtasks 4, 5, 6 in map2).
> map1          map2
>  st1              st4
>  st2              st5
>  st3              st6
>  
> At the beginning, every subtask in map1 sends data to st4 in map2 because 
> they all use the same initial partition id.
> The next time map1 receives data, st1, st2, and st3 all send data to st5 
> because each incremented its partition id when it processed the previous data.
> In my environment, it takes twice as long to process data with 
> RebalancePartitioner as with other partitioners (rescale, keyby).
>  
> To solve this problem, in my opinion, RebalancePartitioner should use its own 
> operator id for the initial value.
>  
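
The skew described above can be reproduced with a toy simulation. This is a
sketch under simplifying assumptions (every sender emits exactly one record
per round, channels are plain array indices), and the class RoundRobinSkew
and its method are hypothetical names, not Flink's RebalancePartitioner. It
compares senders that all start at the same channel with senders that start
at different channels, which is the effect a random first partition aims for.

```java
import java.util.Arrays;
import java.util.Random;

// Toy simulation; names are hypothetical and this is not Flink's RebalancePartitioner.
final class RoundRobinSkew {

    /**
     * Each sender emits one record per round, round-robin over the channels,
     * starting from its own initial channel. Returns the largest number of
     * records any single channel receives within one round.
     */
    static int maxPerRound(int[] initialChannels, int numChannels, int rounds) {
        int worst = 0;
        int[] next = initialChannels.clone();
        for (int r = 0; r < rounds; r++) {
            int[] received = new int[numChannels];
            for (int s = 0; s < next.length; s++) {
                received[next[s]]++;
                next[s] = (next[s] + 1) % numChannels;
            }
            worst = Math.max(worst, Arrays.stream(received).max().getAsInt());
        }
        return worst;
    }

    public static void main(String[] args) {
        // All three senders start at channel 0: each round hits a single receiver.
        System.out.println("same start: " + maxPerRound(new int[] {0, 0, 0}, 3, 6));
        // Distinct starts spread each round evenly over the receivers.
        System.out.println("distinct starts: " + maxPerRound(new int[] {0, 1, 2}, 3, 6));
        // Random starts, in the spirit of the PR title; the result varies per run.
        Random rnd = new Random();
        int[] random = {rnd.nextInt(3), rnd.nextInt(3), rnd.nextInt(3)};
        System.out.println("random starts " + Arrays.toString(random) + ": "
                + maxPerRound(random, 3, 6));
    }
}
```

With identical starts, one receiver gets all three records in every round
while the other two sit idle; with distinct starts, each receiver gets one.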


