[GitHub] tragicjun edited a comment on issue #6508: [Flink-10079] [table] Automatically register sink table from external catalogs

2018-08-24 Thread GitBox
tragicjun edited a comment on issue #6508: [Flink-10079] [table] Automatically 
register sink table from external catalogs 
URL: https://github.com/apache/flink/pull/6508#issuecomment-415720615
 
 
   CC @zentol @xccui  would you please help review this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tragicjun commented on issue #6508: [Flink-10079] [table] Automatically register sink table from external catalogs

2018-08-24 Thread GitBox
tragicjun commented on issue #6508: [Flink-10079] [table] Automatically 
register sink table from external catalogs 
URL: https://github.com/apache/flink/pull/6508#issuecomment-415720615
 
 
   CC @zentol @xccui  would you please review it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10136) Add REPEAT supported in Table API and SQL

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591420#comment-16591420
 ] 

ASF GitHub Bot commented on FLINK-10136:


xccui commented on issue #6597: [FLINK-10136] [table] Add REPEAT supported in 
Table API and SQL
URL: https://github.com/apache/flink/pull/6597#issuecomment-415710651
 
 
   Thanks for your work, @yanghua. The PR looks good to me. +1 to merge.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add REPEAT supported in Table API and SQL
> -
>
> Key: FLINK-10136
> URL: https://issues.apache.org/jira/browse/FLINK-10136
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> Oracle : 
> [https://docs.oracle.com/cd/E17952_01/mysql-5.1-en/string-functions.html#function_repeat]
> MySql: 
> https://dev.mysql.com/doc/refman/5.5/en/string-functions.html#function_repeat
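
For illustration, a minimal sketch of calling REPEAT from the Java Table API once this is merged; the table name, input data, and registration below are assumptions made up for this example and are not part of the PR.

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RepeatExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        DataStream<String> words = env.fromElements("flink", "sql");
        tEnv.registerDataStream("Words", words, "word");

        // REPEAT(str, n) returns str concatenated n times, following the MySQL
        // semantics referenced above, e.g. REPEAT('flink', 2) -> 'flinkflink'.
        Table result = tEnv.sqlQuery("SELECT word, REPEAT(word, 2) AS twice FROM Words");

        tEnv.toAppendStream(result, Row.class).print();
        env.execute("repeat-example");
    }
}
{code}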



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] xccui commented on issue #6597: [FLINK-10136] [table] Add REPEAT supported in Table API and SQL

2018-08-24 Thread GitBox
xccui commented on issue #6597: [FLINK-10136] [table] Add REPEAT supported in 
Table API and SQL
URL: https://github.com/apache/flink/pull/6597#issuecomment-415710651
 
 
   Thanks for your work, @yanghua. The PR looks good to me. +1 to merge.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-10192) SQL Client table visualization mode does not update correctly

2018-08-24 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther reassigned FLINK-10192:


Assignee: Timo Walther

> SQL Client table visualization mode does not update correctly
> -
>
> Key: FLINK-10192
> URL: https://issues.apache.org/jira/browse/FLINK-10192
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Major
>
> The table visualization mode does not seem to update correctly.
> When I run a query that groups and aggregates on a few (6) distinct keys, the 
> client visualizes some keys multiple times. The aggregated values also do not 
> seem to be correct.
> Due to the small number of keys, they get updated frequently.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591396#comment-16591396
 ] 

ASF GitHub Bot commented on FLINK-10204:


StephanEwen commented on issue #6610: [FLINK-10204] - fix serialization/copy 
error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610#issuecomment-415706823
 
 
   Good catch!
   
   Could you undo the changes to `LatencyMarker`? Keeps the file's change 
history clean/meaningful...
   Otherwise +1 to merge this!
   
   Should also be merged into the 1.5 and 1.6 branches...


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Job is marked as FAILED after serialization exception
> -
>
> Key: FLINK-10204
> URL: https://issues.apache.org/jira/browse/FLINK-10204
> Project: Flink
>  Issue Type: Bug
>Reporter: Ben La Monica
>Priority: Major
>  Labels: pull-request-available
>
> We have a long running flink job that eventually fails and is shut down due 
> to an internal serialization exception that we keep on getting. Here is the 
> stack trace:
> {code:java}
> 2018-08-23 18:39:48,199 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation 
> (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED.
> java.io.IOException: Corrupt stream, found tag: 127
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219)
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
> at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
> at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206)
> at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748){code}
>  
> I think I have tracked down the issue to a mismatch in the 
> serialization/deserialization/copy code in the StreamElementSerializer with 
> regard to the LATENCY_MARKER.
> The serialization logic writes 3 LONGs and an INT, but the copy logic only 
> writes (and reads) a LONG and 2 INTs. Adding a test for the LatencyMarker 
> throws an EOFException, and fixing the copy code causes the test to pass again.
> I've written a unit test that highlights the problem, and have written the 
> code to correct it.
> I'll submit a PR that goes along with it.
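
For illustration, a minimal sketch of a copy routine that moves the same fields the serialization logic writes (3 LONGs and an INT); the helper class, method name, and exact field order are assumptions based on the description above, not the actual StreamElementSerializer code.

{code:java}
import java.io.IOException;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

/** Illustrative only; not the actual fix in StreamElementSerializer. */
final class LatencyMarkerCopySketch {

    static void copyLatencyMarkerPayload(DataInputView source, DataOutputView target) throws IOException {
        target.writeLong(source.readLong()); // marked time
        target.writeLong(source.readLong()); // operator id, lower part (assumed order)
        target.writeLong(source.readLong()); // operator id, upper part (assumed order)
        target.writeInt(source.readInt());   // subtask index
    }
}
{code}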



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StephanEwen commented on issue #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records.

2018-08-24 Thread GitBox
StephanEwen commented on issue #6610: [FLINK-10204] - fix serialization/copy 
error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610#issuecomment-415706823
 
 
   Good catch!
   
   Could you undo the changes to `LatencyMarker`? Keeps the file's change 
history clean/meaningful...
   Otherwise +1 to merge this!
   
   Should also be merged into the 1.5 and 1.6 branches...


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9559) The type of a union of CHAR columns of different lengths should be VARCHAR

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591365#comment-16591365
 ] 

ASF GitHub Bot commented on FLINK-9559:
---

pnowojski edited a comment on issue #6519: [FLINK-9559] [table] The type of a 
union of CHAR columns of different lengths should be VARCHAR
URL: https://github.com/apache/flink/pull/6519#issuecomment-415699060
 
 
   @hequn8128 may I ask what use case you are trying to solve? I'm asking 
because I have mixed feelings about this:
   
   1. as it is, it breaks the standard, which is bad
   2. adding a configuration option `shouldConvertRaggedUnionTypesToVarying` 
that defaults to `true` is equally bad
   3. making it default to `false` leaves us with an option whose effect is 
very hard to explain to users. I would guess almost nobody would use it
   4. having more configuration raises the complexity of the system - more 
features that we need to maintain, support, refactor, test etc. This can 
quickly become a nightmare.
   5. it seems like a very cosmetic change. In most cases it doesn't matter, 
since the result is either printed or stored in an external system that has 
its own schema. As long as the column in that external system is defined as 
`VARCHAR(x)`, this issue doesn't matter. Isn't the scope of this limited 
purely to unit tests, where we need to compare against the trailing spaces?
   6. if we agree to introduce this change, should we also agree the next time 
someone reports that he/she would like to have things like:
   
   - `123.456` literal being double or decimal?
   - `'abc'` literal being `CHAR(3)` or `VARCHAR(3)`?
   - `'A  ' || 'B  '` returning `'AB'` or `'A  B  '`?
   - `NULL == 0` being true, false or null?
   - `SUBSTRING('abc' FROM -2 FOR 4)` yielding 'a' or 'bc'?
   
   and the list goes on and on. We definitely cannot provide a switch for all 
such quirks. Having one switch like `mysql-compatibility-mode: true` would 
make more sense, however it's impossible to implement/maintain for various 
reasons.
   
   That's why I don't like changing the default behaviour, and I see more 
problems than benefits in introducing this as a configurable option.
   
   However, I see one change here that would solve this problem. Since Flink 
does not support `CHAR(x)` anywhere besides string literals, and our `CHAR(x)` 
support seems to be broken (for example `'A  ' || 'B  '` should return 
`'A  B  '`), maybe we should treat all string literals as varchars?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The type of a union of CHAR columns of different lengths should be VARCHAR
> --
>
> Key: FLINK-9559
> URL: https://issues.apache.org/jira/browse/FLINK-9559
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, if a case-when expression has two branches that return string 
> literals, redundant white spaces will be appended to the shorter string 
> literal. For example, for the SQL case 1 when 1 then 'a' when 2 then 'bcd' 
> end, the return value will be 'a  ' of type CHAR(3) instead of 'a'.
> Although this follows the behavior of strict SQL standard mode (SQL:2003), 
> we should get the pragmatic return type in real scenarios, without 
> blank-padding. Fortunately, this problem has been addressed by 
> [CALCITE-2321|https://issues.apache.org/jira/browse/CALCITE-2321]: we can 
> upgrade Calcite to the next release (1.17.0) and override 
> {{RelDataTypeSystem}} in Flink to configure the return type, i.e., make 
> {{shouldConvertRaggedUnionTypesToVarying()}} return true.
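
For illustration, a minimal sketch of the override described in the last sentence, assuming Calcite 1.17.0 (where CALCITE-2321 added the method); the class name is made up, and how Flink would wire this type system into its planner is not shown.

{code:java}
import org.apache.calcite.rel.type.RelDataTypeSystemImpl;

/** Illustrative only; Flink would plug its own type system into the planner. */
public class RaggedUnionTypeSystemSketch extends RelDataTypeSystemImpl {

    @Override
    public boolean shouldConvertRaggedUnionTypesToVarying() {
        // Let a union of CHAR(1) and CHAR(3) become VARCHAR(3) instead of
        // blank-padding the shorter literal to CHAR(3).
        return true;
    }
}
{code}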



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski edited a comment on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR

2018-08-24 Thread GitBox
pnowojski edited a comment on issue #6519: [FLINK-9559] [table] The type of a 
union of CHAR columns of different lengths should be VARCHAR
URL: https://github.com/apache/flink/pull/6519#issuecomment-415699060
 
 
   @hequn8128 may I ask what use case you are trying to solve? I'm asking 
because I have mixed feelings about this:
   
   1. as it is, it breaks the standard, which is bad
   2. adding a configuration option `shouldConvertRaggedUnionTypesToVarying` 
that defaults to `true` is equally bad
   3. making it default to `false` leaves us with an option whose effect is 
very hard to explain to users. I would guess almost nobody would use it
   4. having more configuration raises the complexity of the system - more 
features that we need to maintain, support, refactor, test etc. This can 
quickly become a nightmare.
   5. it seems like a very cosmetic change. In most cases it doesn't matter, 
since the result is either printed or stored in an external system that has 
its own schema. As long as the column in that external system is defined as 
`VARCHAR(x)`, this issue doesn't matter. Isn't the scope of this limited 
purely to unit tests, where we need to compare against the trailing spaces?
   6. if we agree to introduce this change, should we also agree the next time 
someone reports that he/she would like to have things like:
   
   - `123.456` literal being double or decimal?
   - `'abc'` literal being `CHAR(3)` or `VARCHAR(3)`?
   - `'A  ' || 'B  '` returning `'AB'` or `'A  B  '`?
   - `NULL == 0` being true, false or null?
   - `SUBSTRING('abc' FROM -2 FOR 4)` yielding 'a' or 'bc'?
   
   and the list goes on and on. We definitely cannot provide a switch for all 
such quirks. Having one switch like `mysql-compatibility-mode: true` would 
make more sense, however it's impossible to implement/maintain for various 
reasons.
   
   That's why I don't like changing the default behaviour, and I see more 
problems than benefits in introducing this as a configurable option.
   
   However, I see one change here that would solve this problem. Since Flink 
does not support `CHAR(x)` anywhere besides string literals, and our `CHAR(x)` 
support seems to be broken (for example `'A  ' || 'B  '` should return 
`'A  B  '`), maybe we should treat all string literals as varchars?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR

2018-08-24 Thread GitBox
pnowojski commented on issue #6519: [FLINK-9559] [table] The type of a union of 
CHAR columns of different lengths should be VARCHAR
URL: https://github.com/apache/flink/pull/6519#issuecomment-415699060
 
 
   @hequn8128 may I ask what use case you are trying to solve? I'm asking 
because I have mixed feelings about this:
   
   1. as it is, it breaks the standard, which is bad
   2. adding a configuration option `shouldConvertRaggedUnionTypesToVarying` 
that defaults to `true` is equally bad
   3. making it default to `false` leaves us with an option whose effect is 
very hard to explain to users. I would guess almost nobody would use it
   4. having more configuration raises the complexity of the system - more 
features that we need to maintain, support, refactor, test etc. This can 
quickly become a nightmare.
   5. it seems like a very cosmetic change. In most cases it doesn't matter, 
since the result is either printed or stored in an external system that has 
its own schema. As long as the column in that external system is defined as 
`VARCHAR(x)`, this issue doesn't matter. Isn't the scope of this limited 
purely to unit tests, where we need to compare against the trailing spaces?
   6. if we agree to introduce this change, should we also agree the next time 
someone reports that he/she would like to have things like:
   
   - `123.456` literal being double or decimal?
   - `'abc'` literal being `CHAR(3)` or `VARCHAR(3)`?
   - `'A  ' || 'B  '` returning `'AB'` or `'A  B  '`?
   - `NULL == 0` being true, false or null?
   - `SUBSTRING('abc' FROM -2 FOR 4)` yielding 'a' or 'bc'?
   
   and the list goes on and on. We definitely cannot provide a switch for all 
such quirks. Having one switch like `mysql-compatibility-mode: true` would 
make more sense, however it's impossible to implement/maintain for various 
reasons.
   
   That's why I don't like changing the default behaviour, and I see more 
problems than benefits in introducing this as a configurable option.
   
   However, I see one change here that would solve this problem. Since Flink 
does not support `CHAR(x)` anywhere besides string literals, and our `CHAR(x)` 
support seems to be broken (for example `'A  ' || 'B  '` should return 
`'A  B  '`), maybe we should treat all string literals as varchars?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9559) The type of a union of CHAR columns of different lengths should be VARCHAR

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591358#comment-16591358
 ] 

ASF GitHub Bot commented on FLINK-9559:
---

pnowojski commented on issue #6519: [FLINK-9559] [table] The type of a union of 
CHAR columns of different lengths should be VARCHAR
URL: https://github.com/apache/flink/pull/6519#issuecomment-415699060
 
 
   @hequn8128 may I ask what use case you are trying to solve? I'm asking 
because I have mixed feelings about this:
   
   1. as it is, it breaks the standard, which is bad
   2. adding a configuration option `shouldConvertRaggedUnionTypesToVarying` 
that defaults to `true` is equally bad
   3. making it default to `false` leaves us with an option whose effect is 
very hard to explain to users. I would guess almost nobody would use it
   4. having more configuration raises the complexity of the system - more 
features that we need to maintain, support, refactor, test etc. This can 
quickly become a nightmare.
   5. it seems like a very cosmetic change. In most cases it doesn't matter, 
since the result is either printed or stored in an external system that has 
its own schema. As long as the column in that external system is defined as 
`VARCHAR(x)`, this issue doesn't matter. Isn't the scope of this limited 
purely to unit tests, where we need to compare against the trailing spaces?
   6. if we agree to introduce this change, should we also agree the next time 
someone reports that he/she would like to have things like:
   
   - `123.456` literal being double or decimal?
   - `'abc'` literal being `CHAR(3)` or `VARCHAR(3)`?
   - `'A  ' || 'B  '` returning `'AB'` or `'A  B  '`?
   - `NULL == 0` being true, false or null?
   - `SUBSTRING('abc' FROM -2 FOR 4)` yielding 'a' or 'bc'?
   
   and the list goes on and on. We definitely cannot provide a switch for all 
such quirks. Having one switch like `mysql-compatibility-mode: true` would 
make more sense, however it's impossible to implement/maintain for various 
reasons.
   
   That's why I don't like changing the default behaviour, and I see more 
problems than benefits in introducing this as a configurable option.
   
   However, I see one change here that would solve this problem. Since Flink 
does not support `CHAR(x)` anywhere besides string literals, and our `CHAR(x)` 
support seems to be broken (for example `'A  ' || 'B  '` should return 
`'A  B  '`), maybe we should treat all string literals as varchars?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The type of a union of CHAR columns of different lengths should be VARCHAR
> --
>
> Key: FLINK-9559
> URL: https://issues.apache.org/jira/browse/FLINK-9559
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, if a case-when expression has two branches that return string 
> literals, redundant white spaces will be appended to the shorter string 
> literal. For example, for the SQL case 1 when 1 then 'a' when 2 then 'bcd' 
> end, the return value will be 'a  ' of type CHAR(3) instead of 'a'.
> Although this follows the behavior of strict SQL standard mode (SQL:2003), 
> we should get the pragmatic return type in real scenarios, without 
> blank-padding. Fortunately, this problem has been addressed by 
> [CALCITE-2321|https://issues.apache.org/jira/browse/CALCITE-2321]: we can 
> upgrade Calcite to the next release (1.17.0) and override 
> {{RelDataTypeSystem}} in Flink to configure the return type, i.e., make 
> {{shouldConvertRaggedUnionTypesToVarying()}} return true.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10209) Exclude jdk.tools dependency from hadoop when running with java 9

2018-08-24 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-10209:


 Summary: Exclude jdk.tools dependency from hadoop when running 
with java 9
 Key: FLINK-10209
 URL: https://issues.apache.org/jira/browse/FLINK-10209
 Project: Flink
  Issue Type: Sub-task
  Components: Build System
Affects Versions: 1.7.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.7.0


{{hadoop-common}} has a {{jdk.tools}} dependency which cannot be resolved on 
Java 9. At least for compiling, we have to exclude this dependency.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10208) Bump mockito to 2.0+

2018-08-24 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-10208:


 Summary: Bump mockito to 2.0+
 Key: FLINK-10208
 URL: https://issues.apache.org/jira/browse/FLINK-10208
 Project: Flink
  Issue Type: Sub-task
  Components: Build System, Tests
Affects Versions: 1.7.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.7.0


Mockito only properly supports Java 9 as of version 2. We have to bump the 
dependency and fix various API incompatibilities.

Additionally, we could investigate whether we still need PowerMock after 
bumping Mockito (otherwise we would have to bump PowerMock as well).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10207) Bump checkstyle-plugin to 8.9

2018-08-24 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-10207:


 Summary: Bump checkstyle-plugin to 8.9
 Key: FLINK-10207
 URL: https://issues.apache.org/jira/browse/FLINK-10207
 Project: Flink
  Issue Type: Sub-task
  Components: Build System
Affects Versions: 1.7.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.7.0


Our current checkstyle version (8.4) is incompatible with Java 9; the earliest 
version that works properly is 8.9.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8033) Build Flink with JDK 9

2018-08-24 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-8033:
---

Assignee: Chesnay Schepler

> Build Flink with JDK 9
> --
>
> Key: FLINK-8033
> URL: https://issues.apache.org/jira/browse/FLINK-8033
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Hai Zhou
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.7.0
>
>
> This is a JIRA to track all issues that found to make Flink compatible with 
> Java 9.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10142) Reduce synchronization overhead for credit notifications

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591331#comment-16591331
 ] 

ASF GitHub Bot commented on FLINK-10142:


zhijiangW commented on issue #6555: [FLINK-10142][network] reduce locking 
around credit notification
URL: https://github.com/apache/flink/pull/6555#issuecomment-415692414
 
 
   Thanks @NicoK for polishing the related processes.
   There are actually two main changes in this PR:
   1. Remove some strict checks to reduce overhead in the common cases.
   2. Move the notifications outside of the locked section.
   
   I reviewed the above modifications and they look good to me.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reduce synchronization overhead for credit notifications
> 
>
> Key: FLINK-10142
> URL: https://issues.apache.org/jira/browse/FLINK-10142
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.2, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> When credit-based flow control was introduced, we also added some checks and 
> optimisations for uncommon code paths that make common code paths 
> unnecessarily more expensive, e.g. checking whether a channel was released 
> before forwarding a credit notification to Netty. Such checks would have to 
> be confirmed by the Netty thread anyway and thus only add additional load for 
> something that happens only once (per channel).
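
For illustration, a generic sketch of the "notify outside the lock" pattern that the comment above describes; the class and field names are made up for this example and do not come from Flink's network stack.

{code:java}
import java.util.ArrayDeque;

/** Illustrative only; not Flink's actual credit-notification code. */
final class CreditTrackerSketch {

    private final Object lock = new Object();
    private final ArrayDeque<Integer> unannouncedCredits = new ArrayDeque<>();
    private final Runnable creditListener;

    CreditTrackerSketch(Runnable creditListener) {
        this.creditListener = creditListener;
    }

    void addCredit(int credit) {
        boolean wasEmpty;
        synchronized (lock) {
            // Only mutate shared state while holding the lock.
            wasEmpty = unannouncedCredits.isEmpty();
            unannouncedCredits.add(credit);
        }
        // Notify after releasing the lock so the listener (e.g. a Netty callback)
        // never runs while the lock is held.
        if (wasEmpty) {
            creditListener.run();
        }
    }
}
{code}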



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591329#comment-16591329
 ] 

ASF GitHub Bot commented on FLINK-10204:


zentol commented on a change in pull request #6610: [FLINK-10204] - fix 
serialization/copy error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610#discussion_r212558074
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
 ##
 @@ -67,31 +67,40 @@ public int getSubtaskIndex() {
// 

 
@Override
-   public boolean equals(Object o) {
-   if (this == o) {
+   public boolean equals(Object obj) {
+   if (this == obj) {
return true;
}
-   if (o == null || getClass() != o.getClass()){
+   if (obj == null) {
return false;
}
-
-   LatencyMarker that = (LatencyMarker) o;
-
-   if (markedTime != that.markedTime) {
+   if (getClass() != obj.getClass()) {
return false;
}
-   if (operatorId != that.operatorId) {
+   LatencyMarker other = (LatencyMarker) obj;
+   if (markedTime != other.markedTime) {
return false;
}
-   return subtaskIndex == that.subtaskIndex;
-
+   if (operatorId == null) {
+   if (other.operatorId != null) {
+   return false;
+   }
+   } else if (!operatorId.equals(other.operatorId)) {
+   return false;
+   }
+   if (subtaskIndex != other.subtaskIndex) {
+   return false;
+   }
+   return true;
}
 
@Override
public int hashCode() {
-   int result = (int) (markedTime ^ (markedTime >>> 32));
 
 Review comment:
   seems unrelated


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Job is marked as FAILED after serialization exception
> -
>
> Key: FLINK-10204
> URL: https://issues.apache.org/jira/browse/FLINK-10204
> Project: Flink
>  Issue Type: Bug
>Reporter: Ben La Monica
>Priority: Major
>  Labels: pull-request-available
>
> We have a long running flink job that eventually fails and is shut down due 
> to an internal serialization exception that we keep on getting. Here is the 
> stack trace:
> {code:java}
> 2018-08-23 18:39:48,199 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation 
> (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED.
> java.io.IOException: Corrupt stream, found tag: 127
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219)
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
> at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
> at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206)
> at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748){code}
>  
> I think I have tracked down the issue to a mismatch in the 
> serialization/deserialization/copy code in the StreamElementSerializer with 
> regard to the LATENCY_MARKER.
> The serialization logic writes 3 LONGs and an INT, but the copy logic only 
> writes (and reads) a LONG and 2 INTs. Adding a test for the LatencyMarker 
> throws an EOFException, and fixing the copy code causes the test to pass again.
> I've written a unit test that highlights the problem, and have written the 
> code to correct it.
> I'll submit a PR that goes along with it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zhijiangW commented on issue #6555: [FLINK-10142][network] reduce locking around credit notification

2018-08-24 Thread GitBox
zhijiangW commented on issue #6555: [FLINK-10142][network] reduce locking 
around credit notification
URL: https://github.com/apache/flink/pull/6555#issuecomment-415692414
 
 
   Thanks @NicoK for polishing the related processes.
   There are actually two main changes in this PR:
   1. Remove some strict checks to reduce overhead in the common cases.
   2. Move the notifications outside of the locked section.
   
   I reviewed the above modifications and they look good to me.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] xccui commented on issue #6605: [FLINK-10201] [table] [test] The batchTestUtil was mistakenly used in some stream sql tests

2018-08-24 Thread GitBox
xccui commented on issue #6605: [FLINK-10201] [table] [test] The batchTestUtil 
was mistakenly used in some stream sql tests
URL: https://github.com/apache/flink/pull/6605#issuecomment-415692162
 
 
   Thanks for your review, @fhueske. I'll merge this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records.

2018-08-24 Thread GitBox
zentol commented on a change in pull request #6610: [FLINK-10204] - fix 
serialization/copy error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610#discussion_r212558074
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
 ##
 @@ -67,31 +67,40 @@ public int getSubtaskIndex() {
// 

 
@Override
-   public boolean equals(Object o) {
-   if (this == o) {
+   public boolean equals(Object obj) {
+   if (this == obj) {
return true;
}
-   if (o == null || getClass() != o.getClass()){
+   if (obj == null) {
return false;
}
-
-   LatencyMarker that = (LatencyMarker) o;
-
-   if (markedTime != that.markedTime) {
+   if (getClass() != obj.getClass()) {
return false;
}
-   if (operatorId != that.operatorId) {
+   LatencyMarker other = (LatencyMarker) obj;
+   if (markedTime != other.markedTime) {
return false;
}
-   return subtaskIndex == that.subtaskIndex;
-
+   if (operatorId == null) {
+   if (other.operatorId != null) {
+   return false;
+   }
+   } else if (!operatorId.equals(other.operatorId)) {
+   return false;
+   }
+   if (subtaskIndex != other.subtaskIndex) {
+   return false;
+   }
+   return true;
}
 
@Override
public int hashCode() {
-   int result = (int) (markedTime ^ (markedTime >>> 32));
 
 Review comment:
   seems unrelated


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591330#comment-16591330
 ] 

ASF GitHub Bot commented on FLINK-10204:


zentol commented on a change in pull request #6610: [FLINK-10204] - fix 
serialization/copy error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610#discussion_r212558104
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
 ##
 @@ -67,31 +67,40 @@ public int getSubtaskIndex() {
// 

 
@Override
-   public boolean equals(Object o) {
-   if (this == o) {
+   public boolean equals(Object obj) {
 
 Review comment:
   was the existing implementation wrong?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Job is marked as FAILED after serialization exception
> -
>
> Key: FLINK-10204
> URL: https://issues.apache.org/jira/browse/FLINK-10204
> Project: Flink
>  Issue Type: Bug
>Reporter: Ben La Monica
>Priority: Major
>  Labels: pull-request-available
>
> We have a long running flink job that eventually fails and is shut down due 
> to an internal serialization exception that we keep on getting. Here is the 
> stack trace:
> {code:java}
> 2018-08-23 18:39:48,199 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation 
> (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED.
> java.io.IOException: Corrupt stream, found tag: 127
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219)
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
> at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
> at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206)
> at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748){code}
>  
> I think I have tracked down the issue to a mismatch in the 
> serialization/deserialization/copy code in the StreamElementSerializer with 
> regard to the LATENCY_MARKER.
> The serialization logic writes 3 LONGs and an INT, but the copy logic only 
> writes (and reads) a LONG and 2 INTs. Adding a test for the LatencyMarker 
> throws an EOFException, and fixing the copy code causes the test to pass again.
> I've written a unit test that highlights the problem, and have written the 
> code to correct it.
> I'll submit a PR that goes along with it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10201) The batchTestUtil was mistakenly used in some stream sql tests

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591328#comment-16591328
 ] 

ASF GitHub Bot commented on FLINK-10201:


xccui commented on issue #6605: [FLINK-10201] [table] [test] The batchTestUtil 
was mistakenly used in some stream sql tests
URL: https://github.com/apache/flink/pull/6605#issuecomment-415692162
 
 
   Thanks for your review, @fhueske. I'll merge this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The batchTestUtil was mistakenly used in some stream sql tests
> --
>
> Key: FLINK-10201
> URL: https://issues.apache.org/jira/browse/FLINK-10201
> Project: Flink
>  Issue Type: Test
>  Components: Table API & SQL
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Minor
>  Labels: pull-request-available
>
> The {{batchTestUtil}} was mistakenly used in stream sql tests 
> {{SetOperatorsTest.testValuesWithCast()}} and 
> {{CorrelateTest.testLeftOuterJoinAsSubQuery()}}. That should be fixed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records.

2018-08-24 Thread GitBox
zentol commented on a change in pull request #6610: [FLINK-10204] - fix 
serialization/copy error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610#discussion_r212558104
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
 ##
 @@ -67,31 +67,40 @@ public int getSubtaskIndex() {
// 

 
@Override
-   public boolean equals(Object o) {
-   if (this == o) {
+   public boolean equals(Object obj) {
 
 Review comment:
   was the existing implementation wrong?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9626) Possible resource leak in FileSystem

2018-08-24 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591323#comment-16591323
 ] 

Piotr Nowojski commented on FLINK-9626:
---

I reported this as a candidate to explain a resource leak reported by a user; 
however, in the meantime we have found a much more likely cause of it: 
https://issues.apache.org/jira/browse/HADOOP-15658, so I'm decreasing the 
priority of this issue.

> Possible resource leak in FileSystem
> 
>
> Key: FLINK-9626
> URL: https://issues.apache.org/jira/browse/FLINK-9626
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> There is a potential resource leak in 
> org.apache.flink.core.fs.FileSystem#getUnguardedFileSystem.
> Inside it there is this code:
>  
> {code:java}
> // this "default" initialization makes sure that the FileSystem class works
> // even when not configured with an explicit Flink configuration, like on
> // JobManager or TaskManager setup
> if (FS_FACTORIES.isEmpty()) {
>    initialize(new Configuration());
> }
> {code}
> which is executed on each cache miss. However, this initialize method also 
> does
>  
>  
> {code:java}
> CACHE.clear();
> {code}
> without closing the file systems in CACHE (this could be problematic for 
> HadoopFileSystem, which is a wrapper around org.apache.hadoop.fs.FileSystem 
> and therefore closeable).
> Now, if for example we are constantly accessing two different file systems 
> (file systems are differentiated by the combination of the [scheme and 
> authority|https://en.wikipedia.org/wiki/Uniform_Resource_Identifier#Generic_syntax]
>  parts of the file system's URI) initialized from FALLBACK_FACTORY, each 
> time we call getUnguardedFileSystem for one of them, that call will clear 
> the CACHE entry for the other one. Thus we will constantly be creating new 
> FileSystems without closing them.
> A solution could be to either not clear the CACHE or to make sure that the 
> FileSystems are properly closed.
>  
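
For illustration, a generic sketch of the "close before clear" option from the last paragraph; the cache type and the Closeable check are made up for this example and are not Flink's actual FileSystem code (Flink's FileSystem itself is not Closeable; only some wrapped implementations such as Hadoop's are).

{code:java}
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;

final class FileSystemCacheSketch {

    /** Closes every cached entry that is closeable before dropping the cache. */
    static void clearSafely(Map<String, Object> cache) {
        for (Object fs : cache.values()) {
            if (fs instanceof Closeable) {
                try {
                    ((Closeable) fs).close();
                } catch (IOException e) {
                    // Best effort: a failed close should not prevent clearing the cache.
                }
            }
        }
        cache.clear();
    }
}
{code}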



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9626) Possible resource leak in FileSystem

2018-08-24 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-9626:
--
Priority: Major  (was: Critical)

> Possible resource leak in FileSystem
> 
>
> Key: FLINK-9626
> URL: https://issues.apache.org/jira/browse/FLINK-9626
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> There is a potential resource leak in 
> org.apache.flink.core.fs.FileSystem#getUnguardedFileSystem.
> Inside it there is this code:
>  
> {code:java}
> // this "default" initialization makes sure that the FileSystem class works
> // even when not configured with an explicit Flink configuration, like on
> // JobManager or TaskManager setup
> if (FS_FACTORIES.isEmpty()) {
>    initialize(new Configuration());
> }
> {code}
> which is executed on each cache miss. However, this initialize method also 
> does
>  
>  
> {code:java}
> CACHE.clear();
> {code}
> without closing the file systems in CACHE (this could be problematic for 
> HadoopFileSystem, which is a wrapper around org.apache.hadoop.fs.FileSystem 
> and therefore closeable).
> Now, if for example we are constantly accessing two different file systems 
> (file systems are differentiated by the combination of the [scheme and 
> authority|https://en.wikipedia.org/wiki/Uniform_Resource_Identifier#Generic_syntax]
>  parts of the file system's URI) initialized from FALLBACK_FACTORY, each 
> time we call getUnguardedFileSystem for one of them, that call will clear 
> the CACHE entry for the other one. Thus we will constantly be creating new 
> FileSystems without closing them.
> A solution could be to either not clear the CACHE or to make sure that the 
> FileSystems are properly closed.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10206) Add hbase stream connector

2018-08-24 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang reassigned FLINK-10206:
---

Assignee: Shimin Yang

> Add hbase stream connector
> --
>
> Key: FLINK-10206
> URL: https://issues.apache.org/jira/browse/FLINK-10206
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Igloo
>Assignee: Shimin Yang
>Priority: Critical
> Fix For: 1.6.1
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Now, there is a hbase connector for batch operation. 
>  
> In some cases, we need to save Streaming result into hbase.  Just like 
> cassandra streaming sink implementations. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591292#comment-16591292
 ] 

ASF GitHub Bot commented on FLINK-9061:
---

StephanEwen commented on issue #6604: [FLINK-9061] Optionally add entropy to 
checkpoint paths better S3 scalability
URL: https://github.com/apache/flink/pull/6604#issuecomment-415683885
 
 
   Test failures fixed - needed to adjust the mocking in some other tests. 
Classic ;-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>  Labels: pull-request-available
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case, checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Has anyone else hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  
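
For illustration, a minimal sketch of the path rewrite described above (inject a hash of the original path as the first key segment); the helper is made up for this example and is not the hook that the PR actually adds to Flink's filesystem code.

{code:java}
import java.net.URI;

import org.apache.flink.core.fs.Path;

final class EntropyPathSketch {

    /** s3://bucket/flink/checkpoints -> s3://bucket/<hash>/flink/checkpoints */
    static Path addEntropy(Path checkpointPath) {
        URI uri = checkpointPath.toUri();
        String hash = Integer.toHexString(uri.getPath().hashCode());
        return new Path(uri.getScheme() + "://" + uri.getAuthority() + "/" + hash + uri.getPath());
    }
}
{code}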



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StephanEwen commented on issue #6604: [FLINK-9061] Optionally add entropy to checkpoint paths better S3 scalability

2018-08-24 Thread GitBox
StephanEwen commented on issue #6604: [FLINK-9061] Optionally add entropy to 
checkpoint paths better S3 scalability
URL: https://github.com/apache/flink/pull/6604#issuecomment-415683885
 
 
   Test failures fixed - needed to adjust the mocking in some other tests. 
Classic ;-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10206) Add hbase stream connector

2018-08-24 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591288#comment-16591288
 ] 

buptljy commented on FLINK-10206:
-

[~igloo1986] I think we should have an HBaseBatchTableSink first. From what I 
see in org.apache.flink.addons.hbase.example.HBaseWriteExample, it is not a 
proper way to sink data to HBase.
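
For illustration, a rough sketch of a streaming HBase sink written as a plain RichSinkFunction, along the lines the issue asks for; the table name, column family, and Tuple2 input type are made up, and the batching, flushing, and error handling a real connector would need are omitted.

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

/** Illustrative only; not the connector this issue proposes to add. */
public class HBaseSinkSketch extends RichSinkFunction<Tuple2<String, String>> {

    private transient Connection connection;
    private transient Table table;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
        table = connection.getTable(TableName.valueOf("results"));
    }

    @Override
    public void invoke(Tuple2<String, String> value) throws Exception {
        // Row key = f0, single column "cf:value" = f1.
        Put put = new Put(Bytes.toBytes(value.f0));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("value"), Bytes.toBytes(value.f1));
        table.put(put);
    }

    @Override
    public void close() throws Exception {
        if (table != null) {
            table.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
{code}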

> Add hbase stream connector
> --
>
> Key: FLINK-10206
> URL: https://issues.apache.org/jira/browse/FLINK-10206
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Igloo
>Priority: Critical
> Fix For: 1.6.1
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Now, there is a hbase connector for batch operation. 
>  
> In some cases, we need to save Streaming result into hbase.  Just like 
> cassandra streaming sink implementations. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-08-24 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212542167
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A subclass of {@link ParquetInputFormat} to read from Parquet files and 
convert to {@link Row}.
+ * It is mainly used to integrate with table API and batch SQL.
+ */
+public class ParquetRowInputFormat extends ParquetInputFormat<Row> implements 
ResultTypeQueryable<Row> {
 
 Review comment:
   This PR is already quite large. A `ParquetTableSource` should be added 
later, IMO.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591254#comment-16591254
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212542167
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A subclass of {@link ParquetInputFormat} to read from Parquet files and 
convert to {@link Row}.
+ * It is mainly used to integrate with table API and batch SQL.
+ */
+public class ParquetRowInputFormat extends ParquetInputFormat<Row> implements 
ResultTypeQueryable<Row> {
 
 Review comment:
   This PR is already quite large. A `ParquetTableSource` should be added 
later, IMO.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-08-24 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591251#comment-16591251
 ] 

Fabian Hueske commented on FLINK-10205:
---

Most jobs are implemented in a way that the split assignment does not affect 
the semantics of the job. 
I don't think we should restrict input split assignment to make the small set 
of jobs with non-deterministic logic behave deterministically.
Also, there are a few more sources of non-determinism (order, partitioning) 
that would need to be removed. The orchestration overhead to make all 
operations behave deterministically is too high.

You can implement a custom 
[InputSplitAssigner|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java]
 if you want to have deterministic input split assignment.

I would close this issue as "Won't Fix"
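
For illustration, a rough sketch of the kind of custom InputSplitAssigner mentioned above, pre-partitioning splits by task index so that a restarted subtask always asks for the same splits; the partitioning scheme is made up, and the missing synchronization makes this an example only, not production code.

{code:java}
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;

public class DeterministicSplitAssignerSketch implements InputSplitAssigner {

    private final Map<Integer, Queue<InputSplit>> splitsPerTask = new HashMap<>();

    public DeterministicSplitAssignerSketch(InputSplit[] splits, int parallelism) {
        // Assign each split to a fixed subtask based on its split number.
        for (InputSplit split : splits) {
            int task = split.getSplitNumber() % parallelism;
            splitsPerTask.computeIfAbsent(task, k -> new ArrayDeque<>()).add(split);
        }
    }

    @Override
    public InputSplit getNextInputSplit(String host, int taskIndex) {
        // Locality (host) is ignored on purpose; every subtask only ever sees
        // the splits that were pre-assigned to its index.
        Queue<InputSplit> queue = splitsPerTask.get(taskIndex);
        return queue == null ? null : queue.poll();
    }
}
{code}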


> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Priority: Major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today a DataSourceTask pulls InputSplits from the JobManager to achieve better 
> performance; however, when a DataSourceTask fails and reruns, it will not get 
> the same splits as its previous attempt. This will introduce inconsistent 
> results or even data corruption.
> Furthermore, if two executions run at the same time (in a batch 
> scenario), these two executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask 
> deterministic. The proposal is to save all splits in the ExecutionVertex, and 
> the DataSourceTask will pull splits from there.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10206) Add hbase stream connector

2018-08-24 Thread Igloo (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igloo updated FLINK-10206:
--
Description: 
Now, there is an HBase connector for batch operations. 

 

In some cases, we need to save streaming results into HBase, just like the 
Cassandra streaming sink implementation. 

 

  was:
Now, there is a  connector for batch operation.  In some cases, we need to save 
Streaming result into hbase. Just like cassandra streaming sink 
implementations. 

 


> Add hbase stream connector
> --
>
> Key: FLINK-10206
> URL: https://issues.apache.org/jira/browse/FLINK-10206
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Igloo
>Priority: Critical
> Fix For: 1.6.1
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Now, there is an HBase connector for batch operations. 
>  
> In some cases, we need to save streaming results into HBase, just like the 
> Cassandra streaming sink implementation. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10206) Add hbase stream connector

2018-08-24 Thread Igloo (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591237#comment-16591237
 ] 

Igloo commented on FLINK-10206:
---

Is that necessary? I would like to do it.

> Add hbase stream connector
> --
>
> Key: FLINK-10206
> URL: https://issues.apache.org/jira/browse/FLINK-10206
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Igloo
>Priority: Critical
> Fix For: 1.6.1
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Now, there is an HBase connector for batch operations. In some cases, we need to 
> save streaming results into HBase, just like the Cassandra streaming sink 
> implementation. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10206) Add hbase stream connector

2018-08-24 Thread Igloo (JIRA)
Igloo created FLINK-10206:
-

 Summary: Add hbase stream connector
 Key: FLINK-10206
 URL: https://issues.apache.org/jira/browse/FLINK-10206
 Project: Flink
  Issue Type: Improvement
  Components: Streaming Connectors
Affects Versions: 1.6.0
Reporter: Igloo
 Fix For: 1.6.1


Now, there is an HBase connector for batch operations. In some cases, we need to save 
streaming results into HBase, just like the Cassandra streaming sink 
implementation. 
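
For illustration, a minimal sketch of what such a streaming HBase sink could look like, loosely modeled on the Cassandra sink idea: connect in open(), write in invoke(), release resources in close(). The table, column family, and qualifier names are placeholders, and the record type is assumed to be a simple key/value tuple.

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Sketch of a streaming HBase sink: one Put per record, no batching or flush tuning.
 */
public class HBaseSinkFunction extends RichSinkFunction<Tuple2<String, String>> {

	private transient Connection connection;
	private transient Table table;

	@Override
	public void open(Configuration parameters) throws Exception {
		// Reads hbase-site.xml from the classpath.
		org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
		connection = ConnectionFactory.createConnection(hbaseConf);
		table = connection.getTable(TableName.valueOf("flink_results"));
	}

	@Override
	public void invoke(Tuple2<String, String> value) throws Exception {
		Put put = new Put(Bytes.toBytes(value.f0));              // row key
		put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("v"),   // family, qualifier
			Bytes.toBytes(value.f1));                            // cell value
		table.put(put);
	}

	@Override
	public void close() throws Exception {
		if (table != null) {
			table.close();
		}
		if (connection != null) {
			connection.close();
		}
	}
}
{code}

A DataStream<Tuple2<String, String>> could then use it via stream.addSink(new HBaseSinkFunction()).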

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-08-24 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591218#comment-16591218
 ] 

vinoyang commented on FLINK-10205:
--

[~isunjin] Can you assign this issue to yourself? If not, you may need to apply 
for the Contributor permission first.

> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Priority: Major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today, a DataSourceTask pulls InputSplits from the JobManager to achieve better 
> performance; however, when a DataSourceTask fails and reruns, it will not get 
> the same splits as its previous attempt. This can introduce inconsistent 
> results or even data corruption.
> Furthermore, if two executions run at the same time (in a batch scenario), 
> these two executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask 
> deterministic. The proposal is to save all splits into the ExecutionVertex, and 
> the DataSourceTask will pull splits from there.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #6609: [hotfix][docs] missing breaking metrics page

2018-08-24 Thread GitBox
asfgit closed pull request #6609: [hotfix][docs] missing  breaking metrics 
page
URL: https://github.com/apache/flink/pull/6609
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index a93853c14dc..7d88a36393c 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1486,6 +1486,7 @@ Thus, in order to infer the metric identifier:
   bytesRequestedPerFetch
   stream, shardId
   The bytes requested (2 Mbps / loopFrequencyHz) in a single call to 
getRecords.
+  
   Gauge
 
   


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591208#comment-16591208
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] 
Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212531176
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
 
 Review comment:
   Should we support filter pushdown?
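
   For reference, a sketch of how a pushed-down predicate could be handed to the Parquet reader via Parquet's own filter API. The column name and predicate are illustrative, and translating Flink expressions into FilterPredicates would still be the input format's job:

{code:java}
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;

public class ParquetFilterSketch {

	// Builds read options with a record filter, e.g. "id > 100".
	public static ParquetReadOptions optionsWithFilter() {
		FilterPredicate predicate = FilterApi.gt(FilterApi.longColumn("id"), 100L);
		return ParquetReadOptions.builder()
			.withRecordFilter(FilterCompat.get(predicate))
			.build();
	}
}
{code}

The resulting options could then be passed to ParquetFileReader when a split is opened.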


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from an Apache Parquet file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-7243) Add ParquetInputFormat

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-7243:
--
Labels: pull-request-available  (was: )

> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from an Apache Parquet file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-7243) Add ParquetInputFormat

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-7243:
--
Labels: pull-request-available  (was: )

> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from an Apache Parquet file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591206#comment-16591206
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] 
Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212530605
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
 ##
 @@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Schema converter converts Parquet schema to and from Flink internal types.
+ */
+public class ParquetSchemaConverter {
+   public static final String MAP_KEY = "key";
+   public static final String MAP_VALUE = "value";
+   public static final String LIST_ELEMENT = "array";
+   public static final String MESSAGE_ROOT = "root";
+   private static final AvroSchemaConverter SCHEMA_CONVERTER = new 
AvroSchemaConverter();
+
+   public static TypeInformation<?> fromParquetType(MessageType type) {
+   return convertFields(type.getFields());
+   }
+
+   public static MessageType toParquetType(TypeInformation<?> typeInformation) {
+   return (MessageType) convertField(null, typeInformation, Type.Repetition.OPTIONAL);
+   }
+
+   private static TypeInformation<?> convertFields(List<Type> parquetFields) {
+   List<TypeInformation<?>> types = new ArrayList<>();
+   List<String> names = new ArrayList<>();
+   for (Type field : parquetFields) {
+   TypeInformation<?> subType = convertField(field);
+   if (subType != null) {
+   types.add(subType);
+   names.add(field.getName());
+   }
+   }
+
+   return new RowTypeInfo(types.toArray(new TypeInformation[types.size()]),
+   names.toArray(new String[names.size()]));
+   }
+
+   private static TypeInformation<?> convertField(final Type fieldType) {
+   TypeInformation<?> typeInfo = null;
+   if (fieldType.isPrimitive()) {
+   PrimitiveType primitiveType = fieldType.asPrimitiveType();
 
 Review comment:
   How about supporting more types, e.g. SqlTimeTypeInfo, Decimal, etc.?
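
   For instance, the primitive branch could map Parquet's timestamp and decimal annotations along these lines. This is a sketch only: the INT96/TIMESTAMP_MILLIS to SqlTimeTypeInfo.TIMESTAMP and DECIMAL to BigDecimal choices are assumptions, and imports for SqlTimeTypeInfo and PrimitiveArrayTypeInfo would need to be added to the converter:

{code:java}
// Sketch of additional primitive mappings, meant to slot into ParquetSchemaConverter.
private static TypeInformation<?> convertPrimitive(PrimitiveType primitiveType) {
	OriginalType originalType = primitiveType.getOriginalType();
	switch (primitiveType.getPrimitiveTypeName()) {
		case INT96:
			// Legacy Parquet timestamp encoding.
			return SqlTimeTypeInfo.TIMESTAMP;
		case INT64:
			if (originalType == OriginalType.TIMESTAMP_MILLIS) {
				return SqlTimeTypeInfo.TIMESTAMP;
			}
			return BasicTypeInfo.LONG_TYPE_INFO;
		case BINARY:
		case FIXED_LEN_BYTE_ARRAY:
			if (originalType == OriginalType.DECIMAL) {
				return BasicTypeInfo.BIG_DEC_TYPE_INFO;
			}
			if (originalType == OriginalType.UTF8) {
				return BasicTypeInfo.STRING_TYPE_INFO;
			}
			return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
		default:
			// Delegate the remaining primitives to the existing mapping.
			return null;
	}
}
{code}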


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from an Apache Parquet file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-7243) Add ParquetInputFormat

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-7243:
--
Labels: pull-request-available  (was: )

> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from an Apache Parquet file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591207#comment-16591207
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] 
Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212530857
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A subclass of {@link ParquetInputFormat} to read from Parquet files and 
convert to {@link Row}.
+ * It is mainly used to integrate with table API and batch SQL.
+ */
+public class ParquetRowInputFormat extends ParquetInputFormat<Row> implements ResultTypeQueryable<Row> {
 
 Review comment:
   Offer a TableSource for the SQL/Table API?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from an Apache Parquet file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-08-24 Thread GitBox
docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] 
Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212530605
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
 ##
 @@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Schema converter converts Parquet schema to and from Flink internal types.
+ */
+public class ParquetSchemaConverter {
+   public static final String MAP_KEY = "key";
+   public static final String MAP_VALUE = "value";
+   public static final String LIST_ELEMENT = "array";
+   public static final String MESSAGE_ROOT = "root";
+   private static final AvroSchemaConverter SCHEMA_CONVERTER = new 
AvroSchemaConverter();
+
+   public static TypeInformation<?> fromParquetType(MessageType type) {
+   return convertFields(type.getFields());
+   }
+
+   public static MessageType toParquetType(TypeInformation<?> typeInformation) {
+   return (MessageType) convertField(null, typeInformation, Type.Repetition.OPTIONAL);
+   }
+
+   private static TypeInformation<?> convertFields(List<Type> parquetFields) {
+   List<TypeInformation<?>> types = new ArrayList<>();
+   List<String> names = new ArrayList<>();
+   for (Type field : parquetFields) {
+   TypeInformation<?> subType = convertField(field);
+   if (subType != null) {
+   types.add(subType);
+   names.add(field.getName());
+   }
+   }
+
+   return new RowTypeInfo(types.toArray(new TypeInformation[types.size()]),
+   names.toArray(new String[names.size()]));
+   }
+
+   private static TypeInformation<?> convertField(final Type fieldType) {
+   TypeInformation<?> typeInfo = null;
+   if (fieldType.isPrimitive()) {
+   PrimitiveType primitiveType = fieldType.asPrimitiveType();
 
 Review comment:
   How about supporting more types, e.g. SqlTimeTypeInfo, Decimal, etc.?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-08-24 Thread GitBox
docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] 
Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212531176
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
 
 Review comment:
   Should we support filter pushdown?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-08-24 Thread GitBox
docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] 
Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212530857
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A subclass of {@link ParquetInputFormat} to read from Parquet files and 
convert to {@link Row}.
+ * It is mainly used to integrate with table API and batch SQL.
+ */
+public class ParquetRowInputFormat extends ParquetInputFormat<Row> implements ResultTypeQueryable<Row> {
 
 Review comment:
   Offer a TableSource for the SQL/Table API?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10202) Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment

2018-08-24 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591188#comment-16591188
 ] 

buptljy commented on FLINK-10202:
-

[~StephanEwen] Here is my code. I think the checkpoint directory in 
CheckpointCoordinator is still empty even if you set the directory in 
FsStateBackend.
{code:java}
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(15000)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.setStateBackend(new FsStateBackend(checkpointDir)){code}
 1. If I don't set state.checkpoint.dir, it will throw the exception above.

 2. If I remove the third line, it will be okay.
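
A possible workaround for local runs, for what it's worth, is to hand the checkpoint directory to the locally started cluster via a Configuration object. This is only a sketch: the key name ("state.checkpoints.dir") and whether the local MiniCluster honors it depend on the Flink version.
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalCheckpointDirExample {
	public static void main(String[] args) throws Exception {
		String checkpointDir = "file:///tmp/flink-checkpoints";

		// Pass the checkpoint directory to the locally started cluster.
		Configuration conf = new Configuration();
		conf.setString("state.checkpoints.dir", checkpointDir);

		StreamExecutionEnvironment env =
			StreamExecutionEnvironment.createLocalEnvironment(1, conf);
		env.enableCheckpointing(15000);
		env.getCheckpointConfig().enableExternalizedCheckpoints(
			ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
		env.setStateBackend(new FsStateBackend(checkpointDir));

		// ... build the pipeline and call env.execute() ...
	}
}
{code}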

> Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment
> ---
>
> Key: FLINK-10202
> URL: https://issues.apache.org/jira/browse/FLINK-10202
> Project: Flink
>  Issue Type: Improvement
>Reporter: buptljy
>Priority: Major
>
> Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
> run a Flink job locally, and we're not able to set state.checkpoint.dir for 
> the background wrapped cluster, which will cause 
> {code:java}
> throw new IllegalStateException("CheckpointConfig says to persist periodic " +
>   "checkpoints, but no checkpoint directory has been configured. You can 
> " +
>   "configure configure one via key '" + 
> ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
> {code}
> I wonder if we could provide a public method in *StreamExecutionEnvironment* 
> so that developers can use it to set state.checkpoint.dir for the job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

