[GitHub] tragicjun edited a comment on issue #6508: [Flink-10079] [table] Automatically register sink table from external catalogs
tragicjun edited a comment on issue #6508: [Flink-10079] [table] Automatically register sink table from external catalogs URL: https://github.com/apache/flink/pull/6508#issuecomment-415720615 CC @zentol @xccui would you please help review this? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10136) Add REPEAT supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591420#comment-16591420 ] ASF GitHub Bot commented on FLINK-10136: xccui commented on issue #6597: [FLINK-10136] [table] Add REPEAT supported in Table API and SQL URL: https://github.com/apache/flink/pull/6597#issuecomment-415710651 Thanks for your work, @yanghua. The PR looks good to me. +1 to merge. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add REPEAT supported in Table API and SQL > - > > Key: FLINK-10136 > URL: https://issues.apache.org/jira/browse/FLINK-10136 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > Oracle : > [https://docs.oracle.com/cd/E17952_01/mysql-5.1-en/string-functions.html#function_repeat] > MySql: > https://dev.mysql.com/doc/refman/5.5/en/string-functions.html#function_repeat -- This message was sent by Atlassian JIRA (v7.6.3#76005)
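The expected semantics of the new function can be sketched from the MySQL documentation linked above. The following is an illustrative model only, not Flink's implementation: NULL arguments propagate NULL, and a count below 1 yields an empty string.

```python
# Illustrative model of REPEAT(str, count) semantics per the MySQL docs
# linked above; NOT Flink's actual implementation.
def repeat(s, count):
    if s is None or count is None:
        return None  # SQL NULL propagation
    return "" if count < 1 else s * count

print(repeat("ab", 3))   # -> ababab
print(repeat("ab", 0))   # -> (empty string)
```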
[GitHub] xccui commented on issue #6597: [FLINK-10136] [table] Add REPEAT supported in Table API and SQL
xccui commented on issue #6597: [FLINK-10136] [table] Add REPEAT supported in Table API and SQL URL: https://github.com/apache/flink/pull/6597#issuecomment-415710651 Thanks for your work, @yanghua. The PR looks good to me. +1 to merge. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-10192) SQL Client table visualization mode does not update correctly
[ https://issues.apache.org/jira/browse/FLINK-10192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther reassigned FLINK-10192: Assignee: Timo Walther > SQL Client table visualization mode does not update correctly > - > > Key: FLINK-10192 > URL: https://issues.apache.org/jira/browse/FLINK-10192 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.6.0 >Reporter: Fabian Hueske >Assignee: Timo Walther >Priority: Major > > The table visualization mode does not seem to update correctly. > When I run a query that groups and aggregates on a few (6) distinct keys, the > client visualizes some keys multiple times. Also, the aggregated values do not > seem to be correct. > Due to the small number of keys, these get updated frequently. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception
[ https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591396#comment-16591396 ] ASF GitHub Bot commented on FLINK-10204: StephanEwen commented on issue #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records. URL: https://github.com/apache/flink/pull/6610#issuecomment-415706823 Good catch! Could you undo the changes to `LatencyMarker`? Keeps the file's change history clean/meaningful... Otherwise +1 to merge this! Should also be merged into the 1.5 and 1.6 branches... This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Job is marked as FAILED after serialization exception > - > > Key: FLINK-10204 > URL: https://issues.apache.org/jira/browse/FLINK-10204 > Project: Flink > Issue Type: Bug >Reporter: Ben La Monica >Priority: Major > Labels: pull-request-available > > We have a long running flink job that eventually fails and is shut down due > to an internal serialization exception that we keep on getting. Here is the > stack trace: > {code:java} > 2018-08-23 18:39:48,199 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation > (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED. 
> java.io.IOException: Corrupt stream, found tag: 127
> at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219)
> at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
> at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206)
> at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748){code}
>
> I think I have tracked down the issue to a mismatch in the serialization/deserialization/copy code in the StreamElementSerializer with regards to the LATENCY_MARKER.
> The serialization logic writes 3 LONGs and an INT. The copy logic only writes (and reads) a LONG and 2 INTs. Adding a test for the LatencyMarker throws an EOFException, and fixing the copy code causes the test to pass again.
> I've written a unit test that highlights the problem, and have written the code to correct it.
> I'll submit a PR that goes along with it.
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
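The failure mode described above (serialize writes 3 LONGs plus an INT, but copy transfers only a LONG plus 2 INTs) can be reproduced with a minimal byte-level sketch. This is a hypothetical illustration of the mismatch, not Flink's actual serializer code:

```python
import io
import struct

# Hypothetical sketch (NOT Flink's classes): serialize writes 3 longs and an
# int tag (28 bytes), but a buggy copy transfers only 1 long + 2 ints
# (16 bytes), so a later read starts in the middle of a record and sees a
# bogus tag -- analogous to "Corrupt stream, found tag: 127".
def serialize(marker, out):
    out.write(struct.pack(">qqqi", *marker))  # 3 longs + 1 int = 28 bytes

def copy_buggy(src, dst):
    dst.write(src.read(8))      # one long
    dst.write(src.read(4 + 4))  # two ints -- 12 bytes short

def copy_fixed(src, dst):
    dst.write(src.read(8 * 3 + 4))  # exactly what serialize wrote

buf = io.BytesIO()
serialize((1, 2, 3, 127), buf)
bad = io.BytesIO()
copy_buggy(io.BytesIO(buf.getvalue()), bad)
print(len(buf.getvalue()), len(bad.getvalue()))  # -> 28 16
```

The fix is simply to make the copy path read and write exactly the fields the serialization path produces.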
[GitHub] StephanEwen commented on issue #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records.
StephanEwen commented on issue #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records. URL: https://github.com/apache/flink/pull/6610#issuecomment-415706823 Good catch! Could you undo the changes to `LatencyMarker`? Keeps the file's change history clean/meaningful... Otherwise +1 to merge this! Should also be merged into the 1.5 and 1.6 branches... This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9559) The type of a union of CHAR columns of different lengths should be VARCHAR
[ https://issues.apache.org/jira/browse/FLINK-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591365#comment-16591365 ] ASF GitHub Bot commented on FLINK-9559: --- pnowojski edited a comment on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR URL: https://github.com/apache/flink/pull/6519#issuecomment-415699060 @hequn8128 may I ask what use case you are trying to solve? I'm asking because I have mixed feelings about this:
1. As it is, it breaks the standard, which is bad.
2. Adding a configuration option `shouldConvertRaggedUnionTypesToVarying` that defaults to `true` is equally bad.
3. Making it default to `false` leaves us with an option whose effect is very hard to explain to users. I would guess almost nobody would use it.
4. More configuration raises the complexity of the system - more features that we need to maintain, support, refactor, test, etc. This can quickly become a nightmare.
5. It seems like a very cosmetic change. In most cases it doesn't matter, since the result is either printed or stored in an external system that has its own schema. As long as the column in that external system is defined as `VARCHAR(x)`, this issue doesn't matter. Isn't the scope of this limited purely to unit tests, where we need to compare padded spaces?
6. If we agree to introduce this change, should we also agree the next time someone reports that he/she would like things like:
   - `123.456` literal being double or decimal?
   - `'abc'` literal being `CHAR(3)` or `VARCHAR(3)`?
   - `'A ' || 'B '` returning `'AB'` or `'A B '`?
   - `NULL == 0` being true, false, or null?
   - `SUBSTRING('abc' FROM -2 FOR 4)` yielding 'a' or 'bc'?
   And the list goes on and on. We definitely cannot provide a switch for all such quirks.
Having one switch like `mysql-compatibility-mode: true` would make more sense, but it's impossible to implement/maintain for different reasons. That's why I don't like changing the default behaviour, and I see more problems than positives in introducing this as a configurable option. However, I see one approach that would solve this problem: since Flink does not support `CHAR(x)` anywhere besides string literals, and our `CHAR(x)` support seems to be broken (for example, `'A ' || 'B '` should return `'A B '`), maybe we should treat all string literals as varchars? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
> The type of a union of CHAR columns of different lengths should be VARCHAR
> --
>
> Key: FLINK-9559
> URL: https://issues.apache.org/jira/browse/FLINK-9559
> Project: Flink
> Issue Type: Bug
> Components: Table API SQL
> Reporter: Hequn Cheng
> Assignee: Hequn Cheng
> Priority: Major
> Labels: pull-request-available
>
> Currently, if a case-when expression has two branches that return string literals, redundant white spaces will be appended to the shorter string literal. For example, for the SQL {{case 1 when 1 then 'a' when 2 then 'bcd' end}}, the return value will be 'a ' of CHAR(3) instead of 'a'. Although this follows the behavior of strict SQL standard mode (SQL:2003), we should get the pragmatic return type in real scenarios, without blank-padding. Fortunately, this problem has been fixed by [CALCITE-2321|https://issues.apache.org/jira/browse/CALCITE-2321]; we can upgrade Calcite to the next release (1.17.0) and override {{RelDataTypeSystem}} in Flink to configure the return type, i.e., making {{shouldConvertRaggedUnionTypesToVarying()}} return true.
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
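The effect of `shouldConvertRaggedUnionTypesToVarying()` on the example in the issue can be sketched with a simplified model. This is a hypothetical illustration of the SQL CHAR/VARCHAR semantics under discussion, not Calcite's or Flink's API:

```python
# Hypothetical model (NOT Calcite's API): deriving the result type of a
# union of string branches with different lengths. With ragged-union
# conversion off, the type is CHAR(max_len) and shorter values are
# blank-padded; with it on, the type is VARCHAR(max_len) and values are
# kept as-is.
def ragged_union(values, convert_to_varying):
    n = max(len(v) for v in values)
    if convert_to_varying:
        return f"VARCHAR({n})", list(values)
    return f"CHAR({n})", [v.ljust(n) for v in values]

# CASE 1 WHEN 1 THEN 'a' WHEN 2 THEN 'bcd' END has branches 'a' and 'bcd':
print(ragged_union(["a", "bcd"], convert_to_varying=False))
# -> ('CHAR(3)', ['a  ', 'bcd'])
print(ragged_union(["a", "bcd"], convert_to_varying=True))
# -> ('VARCHAR(3)', ['a', 'bcd'])
```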
[GitHub] pnowojski edited a comment on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR
pnowojski edited a comment on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR URL: https://github.com/apache/flink/pull/6519#issuecomment-415699060 @hequn8128 may I ask what use case you are trying to solve? I'm asking because I have mixed feelings about this:
1. As it is, it breaks the standard, which is bad.
2. Adding a configuration option `shouldConvertRaggedUnionTypesToVarying` that defaults to `true` is equally bad.
3. Making it default to `false` leaves us with an option whose effect is very hard to explain to users. I would guess almost nobody would use it.
4. More configuration raises the complexity of the system - more features that we need to maintain, support, refactor, test, etc. This can quickly become a nightmare.
5. It seems like a very cosmetic change. In most cases it doesn't matter, since the result is either printed or stored in an external system that has its own schema. As long as the column in that external system is defined as `VARCHAR(x)`, this issue doesn't matter. Isn't the scope of this limited purely to unit tests, where we need to compare padded spaces?
6. If we agree to introduce this change, should we also agree the next time someone reports that he/she would like things like:
   - `123.456` literal being double or decimal?
   - `'abc'` literal being `CHAR(3)` or `VARCHAR(3)`?
   - `'A ' || 'B '` returning `'AB'` or `'A B '`?
   - `NULL == 0` being true, false, or null?
   - `SUBSTRING('abc' FROM -2 FOR 4)` yielding 'a' or 'bc'?
   And the list goes on and on. We definitely cannot provide a switch for all such quirks.
Having one switch like `mysql-compatibility-mode: true` would make more sense, but it's impossible to implement/maintain for different reasons. That's why I don't like changing the default behaviour, and I see more problems than positives in introducing this as a configurable option. However, I see one approach that would solve this problem: since Flink does not support `CHAR(x)` anywhere besides string literals, and our `CHAR(x)` support seems to be broken (for example, `'A ' || 'B '` should return `'A B '`), maybe we should treat all string literals as varchars? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-10209) Exclude jdk.tools dependency from hadoop when running with java 9
Chesnay Schepler created FLINK-10209: Summary: Exclude jdk.tools dependency from hadoop when running with java 9 Key: FLINK-10209 URL: https://issues.apache.org/jira/browse/FLINK-10209 Project: Flink Issue Type: Sub-task Components: Build System Affects Versions: 1.7.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.7.0 {{hadoop-common}} has a {{jdk.tools}} dependency which cannot be resolved on java 9. At least for compiling we have to exclude this dependency. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
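The usual Maven pattern for this kind of fix is an explicit exclusion on the Hadoop dependency. The snippet below is illustrative only; the exact coordinates, version property, and location in Flink's POMs may differ:

```xml
<!-- Illustrative sketch: exclude the unresolvable jdk.tools dependency
     pulled in transitively by hadoop-common on Java 9. -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>${hadoop.version}</version>
  <exclusions>
    <exclusion>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```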
[jira] [Created] (FLINK-10208) Bump mockito to 2.0+
Chesnay Schepler created FLINK-10208: Summary: Bump mockito to 2.0+ Key: FLINK-10208 URL: https://issues.apache.org/jira/browse/FLINK-10208 Project: Flink Issue Type: Sub-task Components: Build System, Tests Affects Versions: 1.7.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.7.0 Mockito only properly supports Java 9 as of version 2. We have to bump the dependency and fix various API incompatibilities. Additionally, we could investigate whether we still need PowerMock after bumping the dependency (which we'd otherwise also have to bump). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10207) Bump checkstyle-plugin to 8.9
Chesnay Schepler created FLINK-10207: Summary: Bump checkstyle-plugin to 8.9 Key: FLINK-10207 URL: https://issues.apache.org/jira/browse/FLINK-10207 Project: Flink Issue Type: Sub-task Components: Build System Affects Versions: 1.7.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.7.0 Our current checkstyle version (8.4) is incompatible with Java 9; the earliest version to work properly is 8.9. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-8033) Build Flink with JDK 9
[ https://issues.apache.org/jira/browse/FLINK-8033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-8033: --- Assignee: Chesnay Schepler > Build Flink with JDK 9 > -- > > Key: FLINK-8033 > URL: https://issues.apache.org/jira/browse/FLINK-8033 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.4.0 >Reporter: Hai Zhou >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.7.0 > > > This is a JIRA to track all issues that found to make Flink compatible with > Java 9. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10142) Reduce synchronization overhead for credit notifications
[ https://issues.apache.org/jira/browse/FLINK-10142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591331#comment-16591331 ] ASF GitHub Bot commented on FLINK-10142: zhijiangW commented on issue #6555: [FLINK-10142][network] reduce locking around credit notification URL: https://github.com/apache/flink/pull/6555#issuecomment-415692414 Thanks @NicoK for polishing the related processes. There are two main changes in this PR: 1. Remove some strict checks to reduce overhead in common cases. 2. Move notifications outside of the locked section. I reviewed the above modifications and they look good to me. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Reduce synchronization overhead for credit notifications > > > Key: FLINK-10142 > URL: https://issues.apache.org/jira/browse/FLINK-10142 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.2, 1.6.0, 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > When credit-based flow control was introduced, we also added some checks and > optimisations for uncommon code paths that make common code paths > unnecessarily more expensive, e.g. checking whether a channel was released > before forwarding a credit notification to Netty. Such checks would have to > be confirmed by the Netty thread anyway and thus only add additional load for > something that happens only once (per channel). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception
[ https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591329#comment-16591329 ] ASF GitHub Bot commented on FLINK-10204: zentol commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records. URL: https://github.com/apache/flink/pull/6610#discussion_r212558074
## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
## @@ -67,31 +67,40 @@ public int getSubtaskIndex() {
 	//
 	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
+	public boolean equals(Object obj) {
+		if (this == obj) {
 			return true;
 		}
-		if (o == null || getClass() != o.getClass()){
+		if (obj == null) {
 			return false;
 		}
-
-		LatencyMarker that = (LatencyMarker) o;
-
-		if (markedTime != that.markedTime) {
+		if (getClass() != obj.getClass()) {
 			return false;
 		}
-		if (operatorId != that.operatorId) {
+		LatencyMarker other = (LatencyMarker) obj;
+		if (markedTime != other.markedTime) {
 			return false;
 		}
-		return subtaskIndex == that.subtaskIndex;
-
+		if (operatorId == null) {
+			if (other.operatorId != null) {
+				return false;
+			}
+		} else if (!operatorId.equals(other.operatorId)) {
+			return false;
+		}
+		if (subtaskIndex != other.subtaskIndex) {
+			return false;
+		}
+		return true;
 	}

 	@Override
 	public int hashCode() {
-		int result = (int) (markedTime ^ (markedTime >>> 32));
Review comment: seems unrelated
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
> Job is marked as FAILED after serialization exception
> -
>
> Key: FLINK-10204
> URL: https://issues.apache.org/jira/browse/FLINK-10204
> Project: Flink
> Issue Type: Bug
> Reporter: Ben La Monica
> Priority: Major
> Labels: pull-request-available
>
> We have a long running flink job that eventually fails and is shut down due to an internal serialization exception that we keep on getting. Here is the stack trace:
> {code:java}
> 2018-08-23 18:39:48,199 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED.
> java.io.IOException: Corrupt stream, found tag: 127
> at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219)
> at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
> at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206)
> at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748){code}
>
> I think I have tracked down the issue to a mismatch in the serialization/deserialization/copy code in the StreamElementSerializer with regards to the LATENCY_MARKER.
> The serialization logic writes 3 LONGs and an INT. The copy logic only writes (and reads) a LONG and 2 INTs. Adding a test for the LatencyMarker throws an EOFException, and fixing the copy code causes the test to pass again.
> I've written a unit test that highlights the problem, and have written the code to correct it.
> I'll submit a PR that goes along with it.
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zhijiangW commented on issue #6555: [FLINK-10142][network] reduce locking around credit notification
zhijiangW commented on issue #6555: [FLINK-10142][network] reduce locking around credit notification URL: https://github.com/apache/flink/pull/6555#issuecomment-415692414 Thanks @NicoK for polishing the related processes. There are two main changes in this PR: 1. Remove some strict checks to reduce overhead in common cases. 2. Move notifications outside of the locked section. I reviewed the above modifications and they look good to me. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
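The second change discussed above (issuing notifications after releasing the lock) is a general concurrency pattern: mutate shared state under the lock, but invoke the potentially slow callback outside it, so the lock hold time stays minimal. The sketch below is a simplified, hypothetical model of that pattern, not Flink's network stack:

```python
import threading

# Hypothetical sketch (NOT Flink's code): update the credit count under the
# lock, then call the notification callback after releasing the lock so
# other threads are not blocked while the callback runs.
class CreditTracker:
    def __init__(self, on_credit):
        self._lock = threading.Lock()
        self._credits = 0
        self._on_credit = on_credit

    def add_credit(self, n):
        with self._lock:
            self._credits += n
            total = self._credits  # snapshot while still holding the lock
        self._on_credit(total)    # notify OUTSIDE the lock

seen = []
tracker = CreditTracker(seen.append)
tracker.add_credit(2)
tracker.add_credit(3)
print(seen)  # -> [2, 5]
```

The snapshot taken inside the lock keeps the notification consistent even though it is delivered after the lock is released.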
[GitHub] xccui commented on issue #6605: [FLINK-10201] [table] [test] The batchTestUtil was mistakenly used in some stream sql tests
xccui commented on issue #6605: [FLINK-10201] [table] [test] The batchTestUtil was mistakenly used in some stream sql tests URL: https://github.com/apache/flink/pull/6605#issuecomment-415692162 Thanks for your review, @fhueske. I'll merge this. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records.
zentol commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records. URL: https://github.com/apache/flink/pull/6610#discussion_r212558074
## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
## @@ -67,31 +67,40 @@ public int getSubtaskIndex() {
 	//
 	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
+	public boolean equals(Object obj) {
+		if (this == obj) {
 			return true;
 		}
-		if (o == null || getClass() != o.getClass()){
+		if (obj == null) {
 			return false;
 		}
-
-		LatencyMarker that = (LatencyMarker) o;
-
-		if (markedTime != that.markedTime) {
+		if (getClass() != obj.getClass()) {
 			return false;
 		}
-		if (operatorId != that.operatorId) {
+		LatencyMarker other = (LatencyMarker) obj;
+		if (markedTime != other.markedTime) {
 			return false;
 		}
-		return subtaskIndex == that.subtaskIndex;
-
+		if (operatorId == null) {
+			if (other.operatorId != null) {
+				return false;
+			}
+		} else if (!operatorId.equals(other.operatorId)) {
+			return false;
+		}
+		if (subtaskIndex != other.subtaskIndex) {
+			return false;
+		}
+		return true;
 	}

 	@Override
 	public int hashCode() {
-		int result = (int) (markedTime ^ (markedTime >>> 32));
Review comment: seems unrelated
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
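The equals/hashCode contract being reviewed in the diff above (equal objects must have equal hashes, and both methods must cover the same fields) can be illustrated with a simplified analogue. The `Marker` class below is hypothetical, mirroring LatencyMarker's three fields; it is not the actual Flink class:

```python
# Hypothetical analogue (NOT Flink's LatencyMarker): __eq__ and __hash__
# must be defined over the same fields so that equal markers hash equally.
class Marker:
    def __init__(self, marked_time, operator_id, subtask_index):
        self.marked_time = marked_time
        self.operator_id = operator_id
        self.subtask_index = subtask_index

    def __eq__(self, other):
        return (type(other) is type(self)
                and self.marked_time == other.marked_time
                and self.operator_id == other.operator_id
                and self.subtask_index == other.subtask_index)

    def __hash__(self):
        # Same fields as __eq__, keeping the contract consistent.
        return hash((self.marked_time, self.operator_id, self.subtask_index))

a = Marker(1, "op-1", 0)
b = Marker(1, "op-1", 0)
print(a == b, hash(a) == hash(b))  # -> True True
```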
[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception
[ https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591330#comment-16591330 ] ASF GitHub Bot commented on FLINK-10204: zentol commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records. URL: https://github.com/apache/flink/pull/6610#discussion_r212558104 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java ## @@ -67,31 +67,40 @@ public int getSubtaskIndex() { // @Override - public boolean equals(Object o) { - if (this == o) { + public boolean equals(Object obj) { Review comment: was the existing implementation wrong? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Job is marked as FAILED after serialization exception > - > > Key: FLINK-10204 > URL: https://issues.apache.org/jira/browse/FLINK-10204 > Project: Flink > Issue Type: Bug >Reporter: Ben La Monica >Priority: Major > Labels: pull-request-available > > We have a long running flink job that eventually fails and is shut down due > to an internal serialization exception that we keep on getting. Here is the > stack trace: > {code:java} > 2018-08-23 18:39:48,199 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation > (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED. 
> java.io.IOException: Corrupt stream, found tag: 127 > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219) > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49) > at > org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748){code} > > I think I have tracked down the issue to a mismatch in the > serialization/deserialization/copy code in the StreamElementSerializer with > regards to the LATENCY_MARKER. > The Serialization logic writes 3 LONGs and an INT. The copy logic only writes > (and reads) a LONG and 2 INTs. Adding a test for the LatencyMarker throws an > EOFException, and fixing the copy code causes the test to pass again. > I've written a unit test that highlights the problem, and have written the > code to correct it. > I'll submit a PR that goes along with it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
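The layout mismatch the report describes (the serialize path writes 3 longs and an int, while the copy path wrote a long and 2 ints) can be made concrete with a minimal round-trip sketch. The class below is a hypothetical stand-in, not Flink's StreamElementSerializer; the two-long OperatorID modeling is an assumption for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Stand-in for the invariant in the bug report: the serialize path and the
// copy path of a latency marker must emit exactly the same field layout
// (here: 3 longs + 1 int), or the reader sees a corrupt stream tag.
public class MarkerRoundTrip {

    static final class Marker {
        final long markedTime;
        final long operatorIdLower;  // OperatorID modeled as two longs
        final long operatorIdUpper;
        final int subtaskIndex;

        Marker(long markedTime, long lower, long upper, int subtaskIndex) {
            this.markedTime = markedTime;
            this.operatorIdLower = lower;
            this.operatorIdUpper = upper;
            this.subtaskIndex = subtaskIndex;
        }

        // Single write method shared by serialize and copy: 3 longs + 1 int.
        void write(DataOutput out) throws IOException {
            out.writeLong(markedTime);
            out.writeLong(operatorIdLower);
            out.writeLong(operatorIdUpper);
            out.writeInt(subtaskIndex);
        }

        static Marker read(DataInput in) throws IOException {
            return new Marker(in.readLong(), in.readLong(), in.readLong(), in.readInt());
        }
    }

    // copy() expressed as write-then-read, so it can never disagree with the
    // serialized layout (the reported bug: a copy path writing a long + 2 ints).
    static Marker copy(Marker m) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        m.write(new DataOutputStream(buf));
        return Marker.read(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
    }

    public static void main(String[] args) throws IOException {
        Marker c = copy(new Marker(42L, 1L, 2L, 7));
        if (c.markedTime != 42L || c.subtaskIndex != 7) {
            throw new AssertionError("copy must round-trip all fields");
        }
        System.out.println("round-trip ok");
    }
}
```

Routing copy through the single write/read pair is one way to keep the two paths from drifting apart, which is the failure mode the unit test in the PR exercises.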
[jira] [Commented] (FLINK-10201) The batchTestUtil was mistakenly used in some stream sql tests
[ https://issues.apache.org/jira/browse/FLINK-10201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591328#comment-16591328 ] ASF GitHub Bot commented on FLINK-10201: xccui commented on issue #6605: [FLINK-10201] [table] [test] The batchTestUtil was mistakenly used in some stream sql tests URL: https://github.com/apache/flink/pull/6605#issuecomment-415692162 Thanks for your review, @fhueske. I'll merge this. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > The batchTestUtil was mistakenly used in some stream sql tests > -- > > Key: FLINK-10201 > URL: https://issues.apache.org/jira/browse/FLINK-10201 > Project: Flink > Issue Type: Test > Components: Table API SQL >Reporter: Xingcan Cui >Assignee: Xingcan Cui >Priority: Minor > Labels: pull-request-available > > The {{batchTestUtil}} was mistakenly used in stream sql tests > {{SetOperatorsTest.testValuesWithCast()}} and > {{CorrelateTest.testLeftOuterJoinAsSubQuery()}}. That should be fixed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zentol commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records.
zentol commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records. URL: https://github.com/apache/flink/pull/6610#discussion_r212558104 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java ## @@ -67,31 +67,40 @@ public int getSubtaskIndex() { // @Override - public boolean equals(Object o) { - if (this == o) { + public boolean equals(Object obj) { Review comment: was the existing implementation wrong? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9626) Possible resource leak in FileSystem
[ https://issues.apache.org/jira/browse/FLINK-9626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591323#comment-16591323 ] Piotr Nowojski commented on FLINK-9626: --- I have reported this as a candidate to explain resource leak reported by a user, however in the meantime we have found much more likely cause of it: https://issues.apache.org/jira/browse/HADOOP-15658 so I'm decreasing priority of this issue. > Possible resource leak in FileSystem > > > Key: FLINK-9626 > URL: https://issues.apache.org/jira/browse/FLINK-9626 > Project: Flink > Issue Type: Bug > Components: FileSystem >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Priority: Major > > There is a potential resource leak in > org.apache.flink.core.fs.FileSystem#getUnguardedFileSystem. > Inside it there is a code: > > {code:java} > // this "default" initialization makes sure that the FileSystem class works > // even when not configured with an explicit Flink configuration, like on > // JobManager or TaskManager setup > if (FS_FACTORIES.isEmpty()) { >initialize(new Configuration()); > } > {code} > which is executed on each cache miss. However this initialize method is also > doing > > > {code:java} > CACHE.clear(); > {code} > without closing file systems in CACHE (this could be problematic for > HadoopFileSystem which is a wrapper around org.apache.hadoop.fs.FileSystem > which is closable). > Now if for example we are constantly accessing two different file systems > (file systems are differentiated by combination of [schema and > authority|https://en.wikipedia.org/wiki/Uniform_Resource_Identifier#Generic_syntax] > part from the file system's URI) initialized from FALLBACK_FACTORY, each > time we call getUnguardedFileSystem for one of them, that call will clear > from CACHE entry for the other one. Thus we will constantly be creating new > FileSystems without closing them. > Solution could be to either not clear the CACHE or make sure that FileSystems > are properly closed. 
> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9626) Possible resource leak in FileSystem
[ https://issues.apache.org/jira/browse/FLINK-9626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-9626: -- Priority: Major (was: Critical) > Possible resource leak in FileSystem > > > Key: FLINK-9626 > URL: https://issues.apache.org/jira/browse/FLINK-9626 > Project: Flink > Issue Type: Bug > Components: FileSystem >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Priority: Major > > There is a potential resource leak in > org.apache.flink.core.fs.FileSystem#getUnguardedFileSystem. > Inside it there is a code: > > {code:java} > // this "default" initialization makes sure that the FileSystem class works > // even when not configured with an explicit Flink configuration, like on > // JobManager or TaskManager setup > if (FS_FACTORIES.isEmpty()) { >initialize(new Configuration()); > } > {code} > which is executed on each cache miss. However this initialize method is also > doing > > > {code:java} > CACHE.clear(); > {code} > without closing file systems in CACHE (this could be problematic for > HadoopFileSystem which is a wrapper around org.apache.hadoop.fs.FileSystem > which is closable). > Now if for example we are constantly accessing two different file systems > (file systems are differentiated by combination of [schema and > authority|https://en.wikipedia.org/wiki/Uniform_Resource_Identifier#Generic_syntax] > part from the file system's URI) initialized from FALLBACK_FACTORY, each > time we call getUnguardedFileSystem for one of them, that call will clear > from CACHE entry for the other one. Thus we will constantly be creating new > FileSystems without closing them. > Solution could be to either not clear the CACHE or make sure that FileSystems > are properly closed. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
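The second fix direction the ticket suggests (close the cached file systems before clearing) can be sketched as below. FsCache and its fields are hypothetical names for illustration, not Flink's actual CACHE/FS_FACTORIES code.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: before evicting a cache of file systems, close every Closeable
// entry so wrapped resources (e.g. a Hadoop FileSystem behind a Flink
// wrapper) are released instead of leaked.
public class FsCache {
    private static final Map<String, Closeable> CACHE = new ConcurrentHashMap<>();

    static void put(String key, Closeable fs) {
        CACHE.put(key, fs);
    }

    // Instead of a bare CACHE.clear():
    static void closeAndClear() {
        for (Closeable fs : CACHE.values()) {
            try {
                fs.close();
            } catch (IOException e) {
                // best effort: keep closing the remaining entries
            }
        }
        CACHE.clear();
    }

    public static void main(String[] args) {
        final boolean[] closed = {false};
        put("s3://bucket-a", () -> closed[0] = true);  // Closeable as a lambda
        closeAndClear();
        if (!closed[0]) {
            throw new AssertionError("cached file system must be closed before eviction");
        }
        System.out.println("closed=" + closed[0]);
    }
}
```

The alternative the ticket mentions, not clearing the cache at all on re-initialization, avoids the problem without any close logic.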
[jira] [Assigned] (FLINK-10206) Add hbase stream connector
[ https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang reassigned FLINK-10206: --- Assignee: Shimin Yang > Add hbase stream connector > -- > > Key: FLINK-10206 > URL: https://issues.apache.org/jira/browse/FLINK-10206 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Igloo >Assignee: Shimin Yang >Priority: Critical > Fix For: 1.6.1 > > Original Estimate: 336h > Remaining Estimate: 336h > > Now, there is a hbase connector for batch operation. > > In some cases, we need to save Streaming result into hbase. Just like > cassandra streaming sink implementations. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591292#comment-16591292 ] ASF GitHub Bot commented on FLINK-9061: --- StephanEwen commented on issue #6604: [FLINK-9061] Optionally add entropy to checkpoint paths better S3 scalability URL: https://github.com/apache/flink/pull/6604#issuecomment-415683885 Test failures fixed - needed to adjust the mocking in some other tests. Classic ;-) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > add entropy to s3 path for better scalability > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Jamie Grier >Assignee: Indrajit Roychoudhury >Priority: Critical > Labels: pull-request-available > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. 
> > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
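The hash-prefix rewrite proposed in the ticket can be sketched as follows. addEntropy is a hypothetical helper for illustration; the option names and hashing used by the actual PR may differ.

```java
// Sketch of the entropy idea: prefix the object key with a short hash of
// the original path so checkpoint writes spread across S3 key partitions
// instead of piling onto one common prefix.
public class EntropyPath {

    static String addEntropy(String path) {
        // e.g. s3://bucket/flink/checkpoints -> s3://bucket/<hash>/flink/checkpoints
        int slash = path.indexOf('/', "s3://".length());
        String bucket = path.substring(0, slash);      // "s3://bucket"
        String key = path.substring(slash + 1);        // "flink/checkpoints"
        String hash = Integer.toHexString(key.hashCode() & 0xffff);
        return bucket + "/" + hash + "/" + key;
    }

    public static void main(String[] args) {
        String p = addEntropy("s3://bucket/flink/checkpoints");
        if (!p.startsWith("s3://bucket/") || !p.endsWith("/flink/checkpoints")) {
            throw new AssertionError("rewrite must keep bucket and key suffix");
        }
        System.out.println(p);
    }
}
```

Because the hash is a pure function of the original path, the same logical checkpoint directory always maps to the same rewritten location, which keeps restores straightforward.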
[GitHub] StephanEwen commented on issue #6604: [FLINK-9061] Optionally add entropy to checkpoint paths better S3 scalability
StephanEwen commented on issue #6604: [FLINK-9061] Optionally add entropy to checkpoint paths better S3 scalability URL: https://github.com/apache/flink/pull/6604#issuecomment-415683885 Test failures fixed - needed to adjust the mocking in some other tests. Classic ;-) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10206) Add hbase stream connector
[ https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591288#comment-16591288 ] buptljy commented on FLINK-10206: - [~igloo1986] I think we should have a HBaseBatchTableSink first? From what I see in org.apache.flink.addons.hbase.example.HBaseWriteExample, it's not a proper way to sink hbase data. > Add hbase stream connector > -- > > Key: FLINK-10206 > URL: https://issues.apache.org/jira/browse/FLINK-10206 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Igloo >Priority: Critical > Fix For: 1.6.1 > > Original Estimate: 336h > Remaining Estimate: 336h > > Now, there is a hbase connector for batch operation. > > In some cases, we need to save Streaming result into hbase. Just like > cassandra streaming sink implementations. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r212542167 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.Row; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A subclass of {@link ParquetInputFormat} to read from Parquet files and convert to {@link Row}. + * It is mainly used to integrate with table API and batch SQL. + */ +public class ParquetRowInputFormat extends ParquetInputFormat implements ResultTypeQueryable { Review comment: This PR is already quite large. A `ParquetTableSource` should be added later, IMO. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591254#comment-16591254 ] ASF GitHub Bot commented on FLINK-7243: --- fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r212542167 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.Row; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A subclass of {@link ParquetInputFormat} to read from Parquet files and convert to {@link Row}. + * It is mainly used to integrate with table API and batch SQL. 
+ */ +public class ParquetRowInputFormat extends ParquetInputFormat implements ResultTypeQueryable { Review comment: This PR is already quite large. A `ParquetTableSource` should be added later, IMO. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ParquetInputFormat > -- > > Key: FLINK-7243 > URL: https://issues.apache.org/jira/browse/FLINK-7243 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: godfrey he >Assignee: Zhenqiu Huang >Priority: Major > Labels: pull-request-available > > Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask
[ https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591251#comment-16591251 ] Fabian Hueske commented on FLINK-10205: --- Most jobs are implemented in a way that the split assignment does not affect the semantics of the job. I don't think we should restrict input split assignment to make the small set of jobs with non-deterministic logic behave deterministically. Also, there are a few more sources of non-determinism (order, partitioning) that would need to be removed. The orchestration overhead to make all operations behave deterministically is too high. You can implement a custom [InputSplitAssigner|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java] if you want to have deterministic input split assignment. I would close this issue as "Won't Fix". > Batch Job: InputSplit Fault tolerant for DataSourceTask > --- > > Key: FLINK-10205 > URL: https://issues.apache.org/jira/browse/FLINK-10205 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: JIN SUN >Priority: Major > Original Estimate: 168h > Remaining Estimate: 168h > > Today DataSource Task pull InputSplits from JobManager to achieve better > performance, however, when a DataSourceTask failed and rerun, it will not get > the same splits as its previous version. this will introduce inconsistent > result or even data corruption. > Furthermore, if there are two executions run at the same time (in batch > scenario), this two executions should process same splits. > we need to fix the issue to make the inputs of a DataSourceTask > deterministic. The propose is save all splits into ExecutionVertex and > DataSourceTask will pull split from there. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
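The deterministic assignment the comment points to can be illustrated standalone. This sketch does not implement Flink's actual InputSplitAssigner interface; it only shows the property being asked for: assignment as a pure function of the task index rather than of pull order.

```java
import java.util.ArrayList;
import java.util.List;

// Deterministic split assignment sketch: each subtask always receives the
// same splits, so a re-run of subtask i re-reads identical input, unlike
// pull-based assignment where order of requests decides the mapping.
public class DeterministicAssigner {

    static List<Integer> splitsForTask(int numSplits, int parallelism, int taskIndex) {
        List<Integer> assigned = new ArrayList<>();
        for (int s = 0; s < numSplits; s++) {
            if (s % parallelism == taskIndex) {  // fixed mapping, no pull order involved
                assigned.add(s);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // The assignment depends only on (numSplits, parallelism, taskIndex),
        // so repeating the call after a failure yields the same splits.
        List<Integer> first = splitsForTask(10, 3, 1);
        List<Integer> rerun = splitsForTask(10, 3, 1);
        if (!first.equals(rerun)) {
            throw new AssertionError("assignment must be repeatable");
        }
        System.out.println(first);  // [1, 4, 7]
    }
}
```

A real implementation would return InputSplit objects and honor locality hints, but the repeatability property shown here is the part that matters for failover.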
[jira] [Updated] (FLINK-10206) Add hbase stream connector
[ https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igloo updated FLINK-10206: -- Description: Now, there is a hbase connector for batch operation. In some cases, we need to save Streaming result into hbase. Just like cassandra streaming sink implementations. was: Now, there is a connector for batch operation. In some cases, we need to save Streaming result into hbase. Just like cassandra streaming sink implementations. > Add hbase stream connector > -- > > Key: FLINK-10206 > URL: https://issues.apache.org/jira/browse/FLINK-10206 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Igloo >Priority: Critical > Fix For: 1.6.1 > > Original Estimate: 336h > Remaining Estimate: 336h > > Now, there is a hbase connector for batch operation. > > In some cases, we need to save Streaming result into hbase. Just like > cassandra streaming sink implementations. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10206) Add hbase stream connector
[ https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591237#comment-16591237 ] Igloo commented on FLINK-10206: --- is that necessary? I would like to do it. > Add hbase stream connector > -- > > Key: FLINK-10206 > URL: https://issues.apache.org/jira/browse/FLINK-10206 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Igloo >Priority: Critical > Fix For: 1.6.1 > > Original Estimate: 336h > Remaining Estimate: 336h > > Now, there is a connector for batch operation. In some cases, we need to > save Streaming result into hbase. Just like cassandra streaming sink > implementations. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10206) Add hbase stream connector
Igloo created FLINK-10206: - Summary: Add hbase stream connector Key: FLINK-10206 URL: https://issues.apache.org/jira/browse/FLINK-10206 Project: Flink Issue Type: Improvement Components: Streaming Connectors Affects Versions: 1.6.0 Reporter: Igloo Fix For: 1.6.1 Now, there is a connector for batch operation. In some cases, we need to save Streaming result into hbase. Just like cassandra streaming sink implementations. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask
[ https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591218#comment-16591218 ] vinoyang commented on FLINK-10205: -- [~isunjin] Can you assign this issue to yourself? If not, you may need to apply for the Contributor permission first. > Batch Job: InputSplit Fault tolerant for DataSourceTask > --- > > Key: FLINK-10205 > URL: https://issues.apache.org/jira/browse/FLINK-10205 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: JIN SUN >Priority: Major > Original Estimate: 168h > Remaining Estimate: 168h > > Today DataSource Task pull InputSplits from JobManager to achieve better > performance, however, when a DataSourceTask failed and rerun, it will not get > the same splits as its previous version. this will introduce inconsistent > result or even data corruption. > Furthermore, if there are two executions run at the same time (in batch > scenario), this two executions should process same splits. > we need to fix the issue to make the inputs of a DataSourceTask > deterministic. The propose is save all splits into ExecutionVertex and > DataSourceTask will pull split from there. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] asfgit closed pull request #6609: [hotfix][docs] missing breaking metrics page
asfgit closed pull request #6609: [hotfix][docs] missing breaking metrics page URL: https://github.com/apache/flink/pull/6609 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index a93853c14dc..7d88a36393c 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -1486,6 +1486,7 @@ Thus, in order to infer the metric identifier: bytesRequestedPerFetch stream, shardId The bytes requested (2 Mbps / loopFrequencyHz) in a single call to getRecords. + Gauge This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591208#comment-16591208 ] ASF GitHub Bot commented on FLINK-7243: --- docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r212531176 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java ## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.io.CheckpointableInputFormat; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.parquet.utils.ParquetRecordReader; +import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter; +import org.apache.flink.formats.parquet.utils.RowReadSupport; +import org.apache.flink.metrics.Counter; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.schema.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * The base InputFormat class to read from Parquet files. + * For specific return types the {@link #convert(Row)} method need to be implemented. + * + * Using {@link ParquetRecordReader} to Read files instead of {@link org.apache.flink.core.fs.FSDataInputStream}, + * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors. + * + * @param The type of record to read. + */ +public abstract class ParquetInputFormat extends FileInputFormat implements Review comment: Should we support filter pushdown? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ParquetInputFormat > -- > > Key: FLINK-7243 > URL: https://issues.apache.org/jira/browse/FLINK-7243 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: godfrey he >Assignee: Zhenqiu Huang >Priority: Major > Labels: pull-request-available > > Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-7243: -- Labels: pull-request-available (was: ) > Add ParquetInputFormat > -- > > Key: FLINK-7243 > URL: https://issues.apache.org/jira/browse/FLINK-7243 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: godfrey he >Assignee: Zhenqiu Huang >Priority: Major > Labels: pull-request-available > > Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591206#comment-16591206 ]

ASF GitHub Bot commented on FLINK-7243:
---------------------------------------

docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212530605

## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
## @@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Schema converter converts Parquet schema to and from Flink internal types.
+ */
+public class ParquetSchemaConverter {
+	public static final String MAP_KEY = "key";
+	public static final String MAP_VALUE = "value";
+	public static final String LIST_ELEMENT = "array";
+	public static final String MESSAGE_ROOT = "root";
+	private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
+
+	public static TypeInformation<?> fromParquetType(MessageType type) {
+		return convertFields(type.getFields());
+	}
+
+	public static MessageType toParquetType(TypeInformation<?> typeInformation) {
+		return (MessageType) convertField(null, typeInformation, Type.Repetition.OPTIONAL);
+	}
+
+	private static TypeInformation<?> convertFields(List<Type> parquetFields) {
+		List<TypeInformation<?>> types = new ArrayList<>();
+		List<String> names = new ArrayList<>();
+		for (Type field : parquetFields) {
+			TypeInformation<?> subType = convertField(field);
+			if (subType != null) {
+				types.add(subType);
+				names.add(field.getName());
+			}
+		}
+
+		return new RowTypeInfo(types.toArray(new TypeInformation<?>[types.size()]),
+			names.toArray(new String[names.size()]));
+	}
+
+	private static TypeInformation<?> convertField(final Type fieldType) {
+		TypeInformation<?> typeInfo = null;
+		if (fieldType.isPrimitive()) {
+			PrimitiveType primitiveType = fieldType.asPrimitiveType();

Review comment:
   How about supporting more types: SqlTimeTypeInfo, Decimal, etc.?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Add ParquetInputFormat
> ----------------------
>
>                 Key: FLINK-7243
>                 URL: https://issues.apache.org/jira/browse/FLINK-7243
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: godfrey he
>            Assignee: Zhenqiu Huang
>            Priority: Major
>              Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from an Apache Parquet file.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
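The reviewer's question about broader type coverage amounts to extending the primitive-type mapping inside `convertField`. A minimal standalone sketch of such a mapping follows; it uses plain strings to stand in for Flink's `TypeInformation` constants and Parquet's `PrimitiveTypeName` values, so neither library is assumed on the classpath, and it is not the PR's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: maps Parquet primitive type names to Flink type names.
 * String stand-ins are used in place of the real TypeInformation objects.
 */
public class PrimitiveTypeMapping {
	private static final Map<String, String> MAPPING = new HashMap<>();

	static {
		MAPPING.put("INT32", "BasicTypeInfo.INT_TYPE_INFO");
		MAPPING.put("INT64", "BasicTypeInfo.LONG_TYPE_INFO");
		MAPPING.put("BOOLEAN", "BasicTypeInfo.BOOLEAN_TYPE_INFO");
		MAPPING.put("FLOAT", "BasicTypeInfo.FLOAT_TYPE_INFO");
		MAPPING.put("DOUBLE", "BasicTypeInfo.DOUBLE_TYPE_INFO");
		MAPPING.put("BINARY", "BasicTypeInfo.STRING_TYPE_INFO");
		// The cases raised in the review (timestamps, decimals) would also need
		// the logical/original type, not just the physical type: for example
		// INT96 or INT64 + TIMESTAMP_MILLIS -> SqlTimeTypeInfo.TIMESTAMP, and
		// FIXED_LEN_BYTE_ARRAY + DECIMAL -> a BigDecimal type.
	}

	public static String toFlinkType(String parquetPrimitive) {
		String flinkType = MAPPING.get(parquetPrimitive);
		if (flinkType == null) {
			throw new IllegalArgumentException("Unsupported type: " + parquetPrimitive);
		}
		return flinkType;
	}

	public static void main(String[] args) {
		System.out.println(toFlinkType("INT64"));
	}
}
```

The point of the lookup-table shape is that adding a type is a one-line change, whereas the logical-type cases mentioned in the review need a second dispatch on `OriginalType`.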
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591207#comment-16591207 ]

ASF GitHub Bot commented on FLINK-7243:
---------------------------------------

docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212530857

## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
## @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A subclass of {@link ParquetInputFormat} to read from Parquet files and convert to {@link Row}.
+ * It is mainly used to integrate with table API and batch SQL.
+ */
+public class ParquetRowInputFormat extends ParquetInputFormat<Row> implements ResultTypeQueryable<Row> {

Review comment:
   Offer a TableSource for SQL/Table API?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Add ParquetInputFormat
> ----------------------
>
>                 Key: FLINK-7243
>                 URL: https://issues.apache.org/jira/browse/FLINK-7243
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: godfrey he
>            Assignee: Zhenqiu Huang
>            Priority: Major
>              Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from an Apache Parquet file.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
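The TableSource the reviewer suggests would essentially wrap the input format and expose its row type to the planner. A minimal standalone sketch of that delegation shape follows; the interfaces here are hypothetical stand-ins, not Flink's actual `BatchTableSource` API, and nothing from the PR is assumed:

```java
/**
 * Hypothetical stand-in for an input format that knows its field names,
 * playing the role ParquetRowInputFormat would play.
 */
interface RowInputFormat {
	String[] fieldNames();
}

class ParquetRowInputFormatStub implements RowInputFormat {
	public String[] fieldNames() {
		return new String[] {"id", "name"};
	}
}

/**
 * Sketch of the wrapper: a table source delegates schema questions to the
 * underlying input format. In Flink's real API this would be getReturnType()
 * and getDataSet(env) on a BatchTableSource implementation.
 */
public class ParquetTableSourceSketch {
	private final RowInputFormat inputFormat;

	public ParquetTableSourceSketch(RowInputFormat inputFormat) {
		this.inputFormat = inputFormat;
	}

	public String[] getFieldNames() {
		return inputFormat.fieldNames();
	}

	public static void main(String[] args) {
		ParquetTableSourceSketch source =
			new ParquetTableSourceSketch(new ParquetRowInputFormatStub());
		System.out.println(String.join(",", source.getFieldNames()));
	}
}
```

The design point is that the table source adds no schema logic of its own: the `RowTypeInfo` the input format derives from the Parquet footer is the single source of truth.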
[GitHub] docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212531176

## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
## @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements

Review comment:
   Should we support filter pushdown?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
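The filter-pushdown question is worth unpacking: Parquet row groups carry min/max statistics, so a pushed-down predicate can skip whole row groups before any column data is read. A standalone sketch of that pruning decision follows; it uses plain Java with no Parquet APIs (the real mechanism would go through Parquet's `FilterPredicate`/`FilterApi` and the footer statistics), so all names here are illustrative:

```java
/**
 * Hypothetical sketch of row-group pruning, the mechanism filter pushdown
 * exploits in Parquet. Each row group records min/max per column; a predicate
 * like "value > threshold" can rule out a group without reading it.
 */
public class RowGroupPruningSketch {
	/** Stand-in for the per-column statistics stored in a Parquet footer. */
	static class RowGroupStats {
		final long min;
		final long max;

		RowGroupStats(long min, long max) {
			this.min = min;
			this.max = max;
		}
	}

	/** Returns true if the row group may contain rows with value > threshold. */
	static boolean mayMatchGreaterThan(RowGroupStats stats, long threshold) {
		// If even the maximum value is not above the threshold,
		// no row in this group can match, so the group is skipped.
		return stats.max > threshold;
	}

	public static void main(String[] args) {
		RowGroupStats g1 = new RowGroupStats(0, 99);
		RowGroupStats g2 = new RowGroupStats(100, 199);
		// For the predicate value > 150, only the second group survives pruning.
		System.out.println(mayMatchGreaterThan(g1, 150)); // false
		System.out.println(mayMatchGreaterThan(g2, 150)); // true
	}
}
```

Note the check is necessarily conservative: statistics can only prove a group has no matches, never that every row matches, so surviving groups still need per-row evaluation.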
[jira] [Commented] (FLINK-10202) Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment
[ https://issues.apache.org/jira/browse/FLINK-10202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591188#comment-16591188 ]

buptljy commented on FLINK-10202:
---------------------------------

[~StephanEwen] Here is my code. I think the checkpoint directory in CheckpointCoordinator is still empty even if you set the directory in FsStateBackend.

{code:java}
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(15000)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.setStateBackend(new FsStateBackend(checkpointDir))
{code}

1. If I don't set state.checkpoint.dir, it will throw the exception above.
2. If I remove the third line, it will be okay.

> Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-10202
>                 URL: https://issues.apache.org/jira/browse/FLINK-10202
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: buptljy
>            Priority: Major
>
> Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we
> run a flink job locally, and we're not able to set state.checkpoint.dir for
> the background wrapped cluster, which will cause
> {code:java}
> throw new IllegalStateException("CheckpointConfig says to persist periodic " +
>     "checkpoints, but no checkpoint directory has been configured. You can " +
>     "configure configure one via key '" + ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
> {code}
> I wonder if we could provide a public method in *StreamExecutionEnvironment*
> so that developers can use it to set state.checkpoint.dir for the job.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
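The behavior the reporter describes is a resolution-order gap: the externalized-checkpoint path is read from the global configuration key, and the directory already given to `FsStateBackend` is not consulted. A standalone sketch of the fallback logic being asked for follows; it uses a plain `Map` as a stand-in for Flink's `Configuration` and is an illustration of the requested behavior, not Flink's actual resolution code:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: resolve the externalized-checkpoint directory, falling
 * back to the state backend's own path when the global key is unset.
 */
public class CheckpointDirResolution {
	static String resolveCheckpointDir(Map<String, String> config, String backendDir) {
		// First preference: the cluster-wide setting from flink-conf.yaml.
		String configured = config.get("state.checkpoints.dir");
		if (configured != null) {
			return configured;
		}
		// Fallback the reporter asks for: reuse the FsStateBackend path, so a
		// locally started job does not need access to the cluster config file.
		if (backendDir != null) {
			return backendDir;
		}
		throw new IllegalStateException(
			"CheckpointConfig says to persist periodic checkpoints, "
			+ "but no checkpoint directory has been configured.");
	}

	public static void main(String[] args) {
		Map<String, String> conf = new HashMap<>();
		// Global key unset, but the state backend path is known:
		System.out.println(resolveCheckpointDir(conf, "file:///tmp/checkpoints"));
	}
}
```

With this ordering, step 1 in the comment above (no global key set) would resolve via the backend path instead of throwing, while step 2's behavior (key set, no backend) is unchanged.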