[GitHub] [flink] wangyang0918 commented on issue #8624: [FLINK-10932]Initial flink-kubernetes module with empty implementation
wangyang0918 commented on issue #8624: [FLINK-10932]Initial flink-kubernetes module with empty implementation URL: https://github.com/apache/flink/pull/8624#issuecomment-502544267 Hi @tianchen92, I think you should remove the `dependencyManagement` and follow these steps. 1. Add `exclusions` for `jackson-core`, `jackson-annotations`, and `jackson-databind` in the `kubernetes-client` dependency. 2. Add direct dependencies, with specific versions, for the excluded artifacts. 3. Configure the shade plugin. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
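The three steps could look roughly like the following `pom.xml` fragment. This is only a sketch: the `io.fabric8` coordinates, the shade-plugin relocation pattern, and the shaded package prefix are illustrative assumptions, and version numbers are left out because the comment does not name them.

```xml
<!-- Step 1: exclude the transitive jackson artifacts from kubernetes-client. -->
<dependency>
  <groupId>io.fabric8</groupId>
  <artifactId>kubernetes-client</artifactId>
  <exclusions>
    <exclusion>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
    </exclusion>
    <exclusion>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
    </exclusion>
    <exclusion>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<!-- Step 2: pin the excluded artifacts as direct dependencies
     (repeat for jackson-core and jackson-annotations). -->
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
</dependency>

<!-- Step 3: relocate the pinned jackson classes with the shade plugin so
     they cannot clash with other jackson versions on the classpath. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <relocations>
      <relocation>
        <pattern>com.fasterxml.jackson</pattern>
        <shadedPattern>org.apache.flink.kubernetes.shaded.com.fasterxml.jackson</shadedPattern>
      </relocation>
    </relocations>
  </configuration>
</plugin>
```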
[GitHub] [flink] gaoyunhaii commented on a change in pull request #8704: [FLINK-12812][runtime] Set resource profiles for task slots
gaoyunhaii commented on a change in pull request #8704: [FLINK-12812][runtime] Set resource profiles for task slots URL: https://github.com/apache/flink/pull/8704#discussion_r294143254 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ## @@ -522,4 +534,68 @@ public void testOnContainerCompleted() throws Exception { }); }}; } + + /** +* Test that RM and TM calculate same slot resource profile. Review comment: Tests
[GitHub] [flink] gaoyunhaii commented on a change in pull request #8704: [FLINK-12812][runtime] Set resource profiles for task slots
gaoyunhaii commented on a change in pull request #8704: [FLINK-12812][runtime] Set resource profiles for task slots URL: https://github.com/apache/flink/pull/8704#discussion_r294143299 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java ## @@ -163,4 +163,11 @@ boolean updatePartitionInfo( ExecutionAttemptID consumerID, PartitionInfo partitionInfo) throws IOException, InterruptedException; + + /** +* Get total memory size for network buffers in bytes. Review comment: Gets
[jira] [Created] (FLINK-12867) Add insert overwrite grammar as HIVE dialect
Danny Chan created FLINK-12867: -- Summary: Add insert overwrite grammar as HIVE dialect Key: FLINK-12867 URL: https://issues.apache.org/jira/browse/FLINK-12867 Project: Flink Issue Type: Sub-task Components: Table SQL / API Affects Versions: 1.9.0 Reporter: Danny Chan Assignee: Danny Chan Fix For: 1.9.0 Support grammar like: {code:java} insert overwrite tbl1 partition(a=1) select a from tbl2;{code} The overwrite can take effect on the whole table or on a single partition. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] wuchong commented on issue #8738: [FLINK-12845][sql-client] Execute multiple statements in command line…
wuchong commented on issue #8738: [FLINK-12845][sql-client] Execute multiple statements in command line… URL: https://github.com/apache/flink/pull/8738#issuecomment-502543453 Hi @docete, could you add some tests to verify the new feature? We should always add tests when introducing new features.
[jira] [Commented] (FLINK-12863) Race condition between slot offerings and AllocatedSlotReport
[ https://issues.apache.org/jira/browse/FLINK-12863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865333#comment-16865333 ] Yun Gao commented on FLINK-12863: - Hi all, I have created a JIRA issue for the state inconsistency between RM and TM: [FLINK-12865 | https://issues.apache.org/jira/browse/FLINK-12865]. I agree that these two problems share similarities. For the inconsistency between RM and TM, we previously solved it with the version method, but so far I have not found a case in which the fencing token is not available. I think more thought is required to compare the two methods. > Race condition between slot offerings and AllocatedSlotReport > - > > Key: FLINK-12863 > URL: https://issues.apache.org/jira/browse/FLINK-12863 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.9.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.7.3, 1.9.0, 1.8.1 > > > With FLINK-11059 we introduced the {{AllocatedSlotReport}} which is used by > the {{TaskExecutor}} to synchronize its internal view on slot allocations > with the view of the {{JobMaster}}. It seems that there is a race condition > between offering slots and receiving the report because the > {{AllocatedSlotReport}} is sent by the {{HeartbeatManagerSenderImpl}} from a > separate thread. > Due to that it can happen that we generate an {{AllocatedSlotReport}} just > before getting new slots offered. Since the report is sent from a different > thread, it can then happen that the response to the slot offerings is sent > earlier than the {{AllocatedSlotReport}}. Consequently, we might receive an > outdated slot report on the {{TaskExecutor}} causing active slots to be > released. > In order to solve the problem I propose to add a fencing token to the > {{AllocatedSlotReport}} which is being updated whenever we offer new slots to > the {{JobMaster}}. 
When we receive the {{AllocatedSlotReport}} on the > {{TaskExecutor}} we compare the current slot report fencing token with the > received one and only process the report if they are equal. Otherwise we wait > for the next heartbeat to send us an up to date {{AllocatedSlotReport}}.
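The fencing-token proposal above can be sketched as follows. All class and method names here are hypothetical illustrations, not the actual Flink implementation: the JobMaster bumps a token on every slot offer and attaches it to subsequent reports, and the TaskExecutor drops any report whose token does not match the latest offer it has seen.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the fencing-token idea; all names are illustrative.
class SlotReportFencing {

    // JobMaster side: bumped whenever new slots are offered, and attached
    // to every AllocatedSlotReport sent afterwards.
    private final AtomicLong fencingToken = new AtomicLong(0);

    long onSlotsOffered() {
        return fencingToken.incrementAndGet();
    }

    long currentToken() {
        return fencingToken.get();
    }

    // TaskExecutor side: only process a report carrying the token of the
    // latest slot offer; otherwise wait for the next heartbeat.
    static boolean shouldProcess(long reportToken, long latestOfferToken) {
        return reportToken == latestOfferToken;
    }
}
```

A report generated just before a new offer then carries a stale token and is simply ignored, instead of releasing active slots.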
[jira] [Updated] (FLINK-12865) State inconsistency between RM and TM on the slot status
[ https://issues.apache.org/jira/browse/FLINK-12865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Gao updated FLINK-12865: Description: There may be state inconsistency between the TM and the RM due to race conditions and message loss: # When the TM sends a heartbeat, it retrieves the SlotReport in the main thread but sends the heartbeat in another thread. There may be cases where the slot on the TM is initially FREE and the SlotReport reads the FREE state; then the RM requests the slot and marks it as allocated, and the SlotReport finally, wrongly, overrides the allocated status on the RM side. # When the RM requests a slot, the TM receives the request but the acknowledgement message gets lost. The RM will then think this slot is free. Both problems may cause the RM to mark an ALLOCATED slot as FREE. Currently this may cause additional retries until the state is synchronized after the next heartbeat, and in the future it may cause inaccurate resource statistics for fine-grained resource management. was: There may be state inconsistency between the TM and the RM due to race conditions and message loss: # When the TM sends a heartbeat, it retrieves the SlotReport in the main thread but sends the heartbeat in another thread. There may be cases where the slot on the TM is initially FREE and the SlotReport reads the FREE state; then the RM requests the slot and marks it as allocated, and the SlotReport finally, wrongly, overrides the allocated status on the RM side. # When the RM requests a slot, the TM receives the request but the acknowledgement message gets lost. The RM will then think this slot is free and assign it to another request. 
> State inconsistency between RM and TM on the slot status > > > Key: FLINK-12865 > URL: https://issues.apache.org/jira/browse/FLINK-12865 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Reporter: Yun Gao >Assignee: Yun Gao >Priority: Major > > There may be state inconsistency between TM and RM due to race condition and > message loss: > # When TM sends heartbeat, it retrieve SlotReport in the main thread, but > sends the heartbeat in another thread. There may be cases that the slot on TM > is FREE initially and SlotReport read the FREE state, then RM requests slot > and mark the slot as allocated, and the SlotReport finally override the > allocated status at the RM side wrongly. > # When RM requests slot, TM received the requests but the acknowledge > message get lot. Then RM will think this slot is free. > Both the problems may cause RM marks an ALLOCATED slot as FREE. This may > currently cause additional retries till the state is synchronized after the > next heartbeat, and for the inaccurate resource statistics for the > fine-grained resource management in the future.
[jira] [Commented] (FLINK-12866) Connectors module failed on Travis checking
[ https://issues.apache.org/jira/browse/FLINK-12866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865328#comment-16865328 ] TisonKun commented on FLINK-12866: -- From its corresponding compile stage ([https://travis-ci.org/apache/flink/jobs/546546643]) I can see that the cached dir was created {{Creating cache build directory /home/travis/flink_cache/37821/flink}} and that the clean stage wasn't executed. Could any other command have accidentally deleted the dir? cc [~Zentol] > Connectors module failed on Travis checking > --- > > Key: FLINK-12866 > URL: https://issues.apache.org/jira/browse/FLINK-12866 > Project: Flink > Issue Type: Test > Components: Tests >Affects Versions: 1.9.0 >Reporter: Biao Liu >Priority: Minor > > Here is the failure information. > Cached flink dir /home/travis/flink_cache/37821/flink does not exist. Exiting > build. > The command "./tools/travis_controller.sh connectors" exited with 1. > Full log is here, https://travis-ci.org/apache/flink/jobs/546546647
[GitHub] [flink] ifndef-SleePy edited a comment on issue #8721: [FLINK-12823][datastream] PartitionTransformation supports DataExchan…
ifndef-SleePy edited a comment on issue #8721: [FLINK-12823][datastream] PartitionTransformation supports DataExchan… URL: https://github.com/apache/flink/pull/8721#issuecomment-502540027 The Python module is fine, but the connectors module failed on Travis this time. It seems to be a Travis-related issue. I have filed a JIRA issue for it: https://issues.apache.org/jira/browse/FLINK-12866 I will trigger the checks again by closing and reopening; hope it works.
[GitHub] [flink] ifndef-SleePy opened a new pull request #8721: [FLINK-12823][datastream] PartitionTransformation supports DataExchan…
ifndef-SleePy opened a new pull request #8721: [FLINK-12823][datastream] PartitionTransformation supports DataExchan… URL: https://github.com/apache/flink/pull/8721 …geMode property ## What is the purpose of the change * Since `StreamTransformation` would also support the batch runner in 1.9, the result partition type of `StreamTransformation` should not be hard-coded to `PIPELINED_BOUNDED` * We need to provide a way for upper-level APIs built on `StreamTransformation` to configure the result partition type of an edge ## Brief change log * Expose a property `DataExchangeMode` of `PartitionTransformation` as an internal API of `StreamTransformation` * Pass the `DataExchangeMode` to `StreamEdge` * `StreamingJobGraphGenerator` chooses the appropriate result partition type based on the `DataExchangeMode` of `StreamEdge` ## Verifying this change * Add a unit test in `StreamingJobGraphGeneratorTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
[GitHub] [flink] ifndef-SleePy closed pull request #8721: [FLINK-12823][datastream] PartitionTransformation supports DataExchan…
ifndef-SleePy closed pull request #8721: [FLINK-12823][datastream] PartitionTransformation supports DataExchan… URL: https://github.com/apache/flink/pull/8721
[GitHub] [flink] wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema
wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema URL: https://github.com/apache/flink/pull/8736#discussion_r294139723 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala ## @@ -19,20 +19,24 @@ package org.apache.flink.table.plan.schema import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.flink.table.{JHashSet, JSet} import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.stats.FlinkStatistic import org.apache.flink.table.sources.{TableSource, TableSourceUtil} +import java.util + /** * Abstract class which define the interfaces required to convert a [[TableSource]] to * a Calcite Table */ class TableSourceTable[T]( val tableSource: TableSource[T], -val isStreaming: Boolean, -val statistic: FlinkStatistic) Review comment: 1. Yes, maybe we need something like `setTableStats`, but this is out of the scope of this issue. 2. If the TableSource is changed, shouldn't we always create a new `TableSourceTable` and call `getTableStats()` again? How do we know the stats are not changed?
[jira] [Created] (FLINK-12866) Connectors module failed on Travis checking
Biao Liu created FLINK-12866: Summary: Connectors module failed on Travis checking Key: FLINK-12866 URL: https://issues.apache.org/jira/browse/FLINK-12866 Project: Flink Issue Type: Test Components: Tests Affects Versions: 1.9.0 Reporter: Biao Liu Here is the failure information. Cached flink dir /home/travis/flink_cache/37821/flink does not exist. Exiting build. The command "./tools/travis_controller.sh connectors" exited with 1. Full log is here, https://travis-ci.org/apache/flink/jobs/546546647
[GitHub] [flink] JingsongLi commented on a change in pull request #8689: [FLINK-12802][table-runtime-blink] Reducing the Code of BinaryString
JingsongLi commented on a change in pull request #8689: [FLINK-12802][table-runtime-blink] Reducing the Code of BinaryString URL: https://github.com/apache/flink/pull/8689#discussion_r294132999 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java ## @@ -97,32 +105,12 @@ public static BinaryString blankString(int length) { return fromBytes(spaces); } - /** -* Returns the number of bytes for a code point with the first byte as `b`. -* @param b The first byte of a code point -*/ - private static int numBytesForFirstByte(final byte b) { - if (b >= 0) { - // 1 byte, 7 bits: 0xxxxxxx - return 1; - } else if ((b >> 5) == -2 && (b & 0x1e) != 0) { - // 2 bytes, 11 bits: 110xxxxx 10xxxxxx - return 2; - } else if ((b >> 4) == -2) { - // 3 bytes, 16 bits: 1110xxxx 10xxxxxx 10xxxxxx - return 3; - } else if ((b >> 3) == -2) { - // 4 bytes, 21 bits: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx - return 4; - } else { - // throw new IllegalArgumentException(); - // Skip the first byte disallowed in UTF-8 - return 1; - } - } + // -- + // Utility open methods on BinaryString Review comment: Yes
[jira] [Commented] (FLINK-12865) State inconsistency between RM and TM on the slot status
[ https://issues.apache.org/jira/browse/FLINK-12865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865296#comment-16865296 ] Yun Gao commented on FLINK-12865: - I think adding a version to each slot may solve this problem. # Add a SYNCING status on the RM side. SYNCING means the request has been sent to the TM but the result of the request is not yet known. A slot in SYNCING status cannot be allocated to others. # The RM and the TM maintain a version for each slot, and the version starts from 0. # Whenever the RM requests a slot, it increments the version by 1 and sends the request to the TM. The TM will only perform the allocation when the RM's version > the TM's version. # The TM will also attach the version to the heartbeat, and the RM will only accept the slot status when the TM's version >= the RM's version. # If the SYNCING status lasts too long, the request will be resent. The version method is a simplified form of full vector-clock based state management. In the full vector-clock design, the version would be a vector (RM's version, TM's version). Whenever the RM modifies the slot's status (requestSlot) or the TM modifies the slot's status (freeSlot), it needs to first increment the corresponding component and send the sync messages, and a message can only be accepted when the receiver's vector version >= the message's vector version. However, since the TM only modifies a slot's status when freeing it, ignoring the TM-side component can at worst cause a freed slot to be marked as allocated; this will not cause errors, and the free status will eventually be propagated to the RM with a heartbeat message. 
> State inconsistency between RM and TM on the slot status > > > Key: FLINK-12865 > URL: https://issues.apache.org/jira/browse/FLINK-12865 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Reporter: Yun Gao >Assignee: Yun Gao >Priority: Major > > There may be state inconsistency between TM and RM due to race condition and > message loss: > # When TM sends heartbeat, it retrieve SlotReport in the main thread, but > sends the heartbeat in another thread. There may be cases that the slot on TM > is FREE initially and SlotReport read the FREE state, then RM requests slot > and mark the slot as allocated, and the SlotReport finally override the > allocated status at the RM side wrongly. > # When RM requests slot, TM received the requests but the acknowledge > message get lot. Then RM will think this slot is free and assigned it to > other request. >
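The per-slot versioning rules proposed in the comment above can be sketched as a small state machine. The class and method names are invented for illustration and this is not Flink code: the RM bumps the slot version before each request, the TM allocates only on a strictly newer version, and the RM accepts heartbeat status only if it is at least as new as its own version.

```java
// Illustrative per-slot version protocol between RM and TM.
class SlotVersionProtocol {
    private long rmVersion = 0; // maintained on the ResourceManager side
    private long tmVersion = 0; // maintained on the TaskManager side

    // RM requests the slot: increment the version and send it with the request.
    long rmRequestSlot() {
        return ++rmVersion;
    }

    // TM performs the allocation only for a strictly newer request version,
    // so a duplicated or stale request is ignored.
    boolean tmAcceptRequest(long requestVersion) {
        if (requestVersion > tmVersion) {
            tmVersion = requestVersion;
            return true;
        }
        return false;
    }

    long tmHeartbeatVersion() {
        return tmVersion;
    }

    // RM accepts the slot status from a heartbeat only if the TM has already
    // seen the RM's latest request, i.e. the report is not outdated.
    boolean rmAcceptHeartbeat(long heartbeatVersion) {
        return heartbeatVersion >= rmVersion;
    }
}
```

With this rule, a SlotReport snapshotted before the RM's request carries an older version and cannot wrongly override the allocated status.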
[jira] [Updated] (FLINK-12865) State inconsistency between RM and TM on the slot status
[ https://issues.apache.org/jira/browse/FLINK-12865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Gao updated FLINK-12865: Description: There may be state inconsistency between the TM and the RM due to race conditions and message loss: # When the TM sends a heartbeat, it retrieves the SlotReport in the main thread but sends the heartbeat in another thread. There may be cases where the slot on the TM is initially FREE and the SlotReport reads the FREE state; then the RM requests the slot and marks it as allocated, and the SlotReport finally, wrongly, overrides the allocated status on the RM side. # When the RM requests a slot, the TM receives the request but the acknowledgement message gets lost. The RM will then think this slot is free and assign it to another request. was: There may be state inconsistency between the TM and the RM due to race conditions and message loss: # When the TM sends a heartbeat, it retrieves the SlotReport in the main thread but sends the heartbeat in another thread. There may be cases where the slot on the TM is initially FREE and the SlotReport reads the FREE state; then the RM requests the slot and marks it as allocated, and the SlotReport finally, wrongly, overrides the allocated status on the RM side. # When the RM requests a slot, the TM receives the request but the acknowledgement message gets lost. The RM will then think this slot is free and assign it to another request. Adding a version to each slot may solve this problem. # Add a SYNCING status on the RM side. SYNCING means the request has been sent to the TM but the result of the request is not yet known. A slot in SYNCING status cannot be allocated to others. # The RM and the TM maintain a version for each slot, and the version starts from 0. # Whenever the RM requests a slot, it increments the version by 1 and sends the request to the TM. The TM will only perform the allocation when the RM's version > the TM's version. # The TM will also attach the version to the heartbeat, and the RM will only accept the slot status when the TM's version >= the RM's version. 
The version method is a simplified form of full vector-clock based state management. In the full vector-clock design, the version would be a vector (RM's version, TM's version). Whenever the RM modifies the slot's status (requestSlot) or the TM modifies the slot's status (freeSlot), it needs to first increment the corresponding component and send the sync messages, and a message can only be accepted when the receiver's vector version >= the message's vector version. However, since the TM only modifies a slot's status when freeing it, ignoring the TM-side component can at worst cause a freed slot to be marked as allocated; this will not cause errors, and the free status will eventually be propagated to the RM with a heartbeat message. > State inconsistency between RM and TM on the slot status > > > Key: FLINK-12865 > URL: https://issues.apache.org/jira/browse/FLINK-12865 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Reporter: Yun Gao >Assignee: Yun Gao >Priority: Major > > There may be state inconsistency between TM and RM due to race condition and > message loss: > # When TM sends heartbeat, it retrieve SlotReport in the main thread, but > sends the heartbeat in another thread. There may be cases that the slot on TM > is FREE initially and SlotReport read the FREE state, then RM requests slot > and mark the slot as allocated, and the SlotReport finally override the > allocated status at the RM side wrongly. > # When RM requests slot, TM received the requests but the acknowledge > message get lot. Then RM will think this slot is free and assigned it to > other request. >
[GitHub] [flink] sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput
sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput URL: https://github.com/apache/flink/pull/8731#discussion_r293741881 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ## @@ -225,6 +226,21 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { } } + /** +* Starting from the second operator, go forward through the operator chain to notify +* each operator that its input has ended. +* +* @throws Exception if some exception happens in the endInput function of an operator. +*/ + public void endOperatorInputs() throws Exception { + for (int i = allOperators.length - 2; i >= 0; i--) { Review comment: > I think you could simplify the code if you iterated through all operators in OperatorChain (not starting from the second one), check each one of them if there are instanceof BoundedOneInput and end them if they are. > >This would work for: > > - OneInputStreamTask - because all of them are/can be BoundedOneInput > - SourceStreamTask - because head will not be BoundedOneInput > - TwoInputStreamTask - because head will not be BoundedOneInput It works for now. However, if chaining a non-head two-input operator is allowed in the future, it will have problems. Besides, this simplification is somewhat difficult to understand. I suggest not simplifying it like this. What do you think?
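For reference, the simplification discussed in the review could look like the following standalone sketch (stand-in types, not the actual `OperatorChain` code): iterate over all operators and end the input of each one that implements `BoundedOneInput`, so head operators of source or two-input tasks are skipped naturally because they do not implement the interface.

```java
// Minimal stand-in for Flink's BoundedOneInput; simplified to not throw.
interface BoundedOneInput {
    void endInput();
}

class OperatorChainSketch {

    // Walk the whole chain from the last operator to the head and notify
    // every operator that supports it; returns how many were notified.
    static int endOperatorInputs(Object[] allOperators) {
        int ended = 0;
        for (int i = allOperators.length - 1; i >= 0; i--) {
            if (allOperators[i] instanceof BoundedOneInput) {
                ((BoundedOneInput) allOperators[i]).endInput();
                ended++;
            }
        }
        return ended;
    }
}
```

The reviewer's objection is that this relies on head operators never implementing `BoundedOneInput`, an invariant that would break if non-head two-input operators could be chained later.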
[GitHub] [flink] godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema
godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema URL: https://github.com/apache/flink/pull/8736#discussion_r294131403 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala ## @@ -19,20 +19,24 @@ package org.apache.flink.table.plan.schema import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.flink.table.{JHashSet, JSet} import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.stats.FlinkStatistic import org.apache.flink.table.sources.{TableSource, TableSourceUtil} +import java.util + /** * Abstract class which define the interfaces required to convert a [[TableSource]] to * a Calcite Table */ class TableSourceTable[T]( val tableSource: TableSource[T], -val isStreaming: Boolean, -val statistic: FlinkStatistic) Review comment: 1. `TableSource` should be decoupled from `Catalog`, or a method like `setTableStats` should be added to the `TableSource` interface. 2. The TableSource may be changed, e.g. by projection push-down into the table source
[GitHub] [flink] KurtYoung commented on issue #8626: [FLINK-12742] Add insert into partition grammar as hive dialect
KurtYoung commented on issue #8626: [FLINK-12742] Add insert into partition grammar as hive dialect URL: https://github.com/apache/flink/pull/8626#issuecomment-502527618 merged with 4320d8372f93e8fa4d82da06e5e0a0ba310195a2
[GitHub] [flink] KurtYoung closed pull request #8626: [FLINK-12742] Add insert into partition grammar as hive dialect
KurtYoung closed pull request #8626: [FLINK-12742] Add insert into partition grammar as hive dialect URL: https://github.com/apache/flink/pull/8626
[jira] [Closed] (FLINK-12742) Add insert into partition grammar as hive dialect
[ https://issues.apache.org/jira/browse/FLINK-12742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-12742. -- Resolution: Implemented merged in 1.9.0: 4320d8372f93e8fa4d82da06e5e0a0ba310195a2 > Add insert into partition grammar as hive dialect > - > > Key: FLINK-12742 > URL: https://issues.apache.org/jira/browse/FLINK-12742 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.9.0 >Reporter: Danny Chan >Assignee: Danny Chan >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > >
[GitHub] [flink] klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese
klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese URL: https://github.com/apache/flink/pull/8750#discussion_r294129377

## File path: docs/concepts/runtime.zh.md ##

+每个 *task slots*代表 TaskManager 的一份固定资源子集。例如,具有三个 slots 的 TaskManager 会将其管理的内存资源分成三等份给每个 slot。 划分资源意味着 subtask 之间不会竞争资源,但是也意味着它们只拥有固定的资源。注意这里并没有 CPU 隔离,当前 slots 之间只是划分任务的内存资源。

Review comment:
```suggestion
每个 *task slots* 代表 TaskManager 的一份固定资源子集。例如,具有三个 slots 的 TaskManager 会将其管理的内存资源分成三等份给每个 slot。 划分资源意味着 subtask 之间不会竞争资源,但是也意味着它们只拥有固定的资源。注意这里并没有 CPU 隔离,当前 slots 之间只是划分任务的内存资源。
```

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese
klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese URL: https://github.com/apache/flink/pull/8750#discussion_r294121063

## File path: docs/concepts/runtime.zh.md ##

+下图的 dataflow 由五个 subtasks 执行,因此具有五个并行线程。

Review comment: Do we need to change the url of image?
[GitHub] [flink] klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese
klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese URL: https://github.com/apache/flink/pull/8750#discussion_r294120960

## File path: docs/concepts/runtime.zh.md ##

+分布式计算中,Flink 将算子(operator)的 subtask *链接(chain)*成 task。每个 task 由一个线程执行。把算子链接成 tasks 是一个很好的优化:它减少了线程间切换和缓冲的开销,并在降低延迟的同时提高了整体吞吐量。链接操作的配置详情可参考:[chaining docs](../dev/stream/operators/#task-chaining-and-resource-groups)

Review comment: Maybe `把算子链接成 tasks 是一个很好的优化:它减少了线程间切换和缓冲的开销,并在降低延迟的同时提高了整体吞吐量` can change into `把算子链接成 tasks 能够减少线程间切换和缓冲的开销,在降低延迟的同时提高了整体吞吐量`. Do we need to update
[GitHub] [flink] klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese
klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese URL: https://github.com/apache/flink/pull/8750#discussion_r294122301

## File path: docs/concepts/runtime.zh.md ##

+JobManagers 和 TaskManagers 可由多种方式启动:可以直接在机器上启动(该集群称为 [standalone cluster](../ops/deployment/cluster_setup.html)),也可以在容器或资源管理框架,如 [YARN](../ops/deployment/yarn_setup.html) 或 [Mesos](../ops/deployment/mesos.html),中启动。TaskManagers 连接到 JobManagers,通知后者自己可用,之后 TaskManagers 便可以被分配工作了。

Review comment:
```suggestion
JobManagers 和 TaskManagers 有多种启动方式:直接在机器上启动(该集群称为 [standalone cluster](../ops/deployment/cluster_setup.html)),在容器或资源管理框架,如 [YARN](../ops/deployment/yarn_setup.html) 或 [Mesos](../ops/deployment/mesos.html),中启动。TaskManagers 连接到 JobManagers,通知后者自己可用,接手分配的工作。
```
[GitHub] [flink] sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput
sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput URL: https://github.com/apache/flink/pull/8731#discussion_r294130413

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java ##

@@ -174,6 +179,18 @@ private void initializeNumRecordsIn() { } }

+	private boolean checkFinished() throws Exception {
+		boolean isFinished = input.isFinished();
+		if (isFinished) {
+			if (streamOperator instanceof BoundedOneInput) {

Review comment: If we let `xInputProcessor` see non-head operators, we can use the following structure (`OperatorChain` is still invisible to `xInputProcessor`):

```
== xInputProcessor.java ==
private boolean checkFinished() throws Exception {
	.. // invoke endInput(...) of the head operator
	.. // invoke endInput(...) of other operators
	streamTask.endNonHeadOperatorInputs();
}

== xStreamTask.java ==
public void endNonHeadOperatorInputs() throws Exception {
	operatorChain.endNonHeadOperatorInputs();
}

== OperatorChain.java ==
public void endNonHeadOperatorInputs() throws Exception {
	..
}
```
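The layering proposed in that comment can be made concrete with plain Java. Everything below is a hypothetical simplification — `BoundedInput`, `Chain`, `Task`, and `checkFinished` are illustrative stand-ins, not Flink's real `StreamInputProcessor`/`StreamTask`/`OperatorChain` classes — but it shows the proposed call path: the processor ends the head operator's input, then delegates ending the remaining inputs to the task, which forwards to the chain. It also assumes the head operator sits at the last index of the operator array, which is what makes the `allOperators.length - 2` loop start at the second operator.

```java
import java.util.ArrayList;
import java.util.List;

public class EndInputSketch {

    // Stand-in for BoundedOneInput#endInput.
    interface BoundedInput {
        void endInput();
    }

    // Stand-in for OperatorChain: the head operator sits at the LAST index,
    // so `for (i = allOperators.length - 2; i >= 0; i--)` visits every
    // non-head operator in downstream order.
    static class Chain {
        final BoundedInput[] allOperators;
        Chain(BoundedInput... ops) { allOperators = ops; }

        void endNonHeadOperatorInputs() {
            for (int i = allOperators.length - 2; i >= 0; i--) {
                allOperators[i].endInput();
            }
        }
    }

    // Stand-in for the StreamTask layer: the input processor only talks to
    // this, never to the chain directly.
    static class Task {
        final Chain chain;
        Task(Chain chain) { this.chain = chain; }
        void endNonHeadOperatorInputs() { chain.endNonHeadOperatorInputs(); }
    }

    // What the processor's checkFinished() would do once the input is drained.
    static List<String> checkFinished(Task task, List<String> log) {
        log.add("head");                 // endInput(...) of the head operator
        task.endNonHeadOperatorInputs(); // endInput(...) of the other operators
        return log;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Chain chain = new Chain(
                () -> log.add("sink"),   // index 0: last operator in the chain
                () -> log.add("map"),    // index 1
                () -> log.add("head"));  // last index: head operator
        System.out.println(checkFinished(new Task(chain), log));
    }
}
```

Running `main` logs `head`, `map`, `sink` in that order: the head operator's input ends first, then the rest of the chain in downstream order, while `OperatorChain` stays hidden behind the task.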
[GitHub] [flink] klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese
klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese URL: https://github.com/apache/flink/pull/8750#discussion_r294121434

## File path: docs/concepts/runtime.zh.md ##

+至少需要一个 JobManager。高可用部署下会有多个 JobManagers,其中一个作为 *leader*,其余处于 *standby* 状态。

Review comment:
```suggestion
每个 Job 至少会有一个 JobManager。高可用部署下会有多个 JobManagers,其中一个作为 *leader*,其余处于 *standby* 状态。
```
[GitHub] [flink] klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese
klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese URL: https://github.com/apache/flink/pull/8750#discussion_r294129009

## File path: docs/concepts/runtime.zh.md ##

+**客户端**不是运行时(runtime)和作业执行时的一部分,但它是被用来准备和发送 dataflow 到 JobManager 的。在那之后,客户端可以断开连接,也可以保持连接来接收进度报告。客户端既可以作为触发执行的 Java / Scala 程序的一部分,也可以在命令行进程中运行`./bin/flink run ...`。

Review comment:
`但它是被用来准备和发送 dataflow 到 JobManager 的` --> `但用它来准备和提交 dataflow 到 JobManager`?
`客户端可以断开连接,也可以保持连接来接收进度报告` --> `提交完成之后,客户端可以断开连接,也可以保持连接来接收进度报告`
[GitHub] [flink] klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese
klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese URL: https://github.com/apache/flink/pull/8750#discussion_r294129829

## File path: docs/concepts/runtime.zh.md ##

+每个 worker(TaskManager)都是一个 *JVM 进程*,并且可以在不同的线程中执行一个或多个 subtasks。为了控制 worker 接收 task 的数量,worker 拥有所谓的 **task slots** (至少一个)。
+每个 *task slots*代表 TaskManager 的一份固定资源子集。例如,具有三个 slots 的 TaskManager 会将其管理的内存资源分成三等份给每个 slot。 划分资源意味着 subtask 之间不会竞争资源,但是也意味着它们只拥有固定的资源。注意这里并没有 CPU 隔离,当前 slots 之间只是划分任务的内存资源。
-By adjusting the number of task slots, users can define how subtasks are isolated from each other. Having one slot per TaskManager means each task group runs in a separate JVM (which can be started in a separate container, for example). Having multiple slots means more subtasks share the same JVM. Tasks in the same JVM share TCP connections (via multiplexing) and heartbeat messages. They may also share data sets and data structures, thus reducing
[GitHub] [flink] klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese
klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese URL: https://github.com/apache/flink/pull/8750#discussion_r294129669

## File path: docs/concepts/runtime.zh.md ##

+每个 worker(TaskManager)都是一个 *JVM 进程*,并且可以在不同的线程中执行一个或多个 subtasks。为了控制 worker 接收 task 的数量,worker 拥有所谓的 **task slots** (至少一个)。
+每个 *task slots*代表 TaskManager 的一份固定资源子集。例如,具有三个 slots 的 TaskManager 会将其管理的内存资源分成三等份给每个 slot。 划分资源意味着 subtask 之间不会竞争资源,但是也意味着它们只拥有固定的资源。注意这里并没有 CPU 隔离,当前 slots 之间只是划分任务的内存资源。
-By adjusting the number of task slots, users can define how subtasks are isolated from each other. Having one slot per TaskManager means each task group runs in a separate JVM (which can be started in a separate container, for example). Having multiple slots means more subtasks share the same JVM. Tasks in the same JVM share TCP connections (via multiplexing) and heartbeat messages. They may also share data sets and data structures, thus reducing
[GitHub] [flink] klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese
klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese URL: https://github.com/apache/flink/pull/8750#discussion_r294121310

## File path: docs/concepts/runtime.zh.md ##

+## Job Managers、Task Managers、客户端(Clients)

Review comment: Do we need to translate `Job Managers` and `Task Managers` here?
[GitHub] [flink] godfreyhe commented on a change in pull request #8707: [FLINK-12815] [table-planner-blink] Supports CatalogManager in blink planner
godfreyhe commented on a change in pull request #8707: [FLINK-12815] [table-planner-blink] Supports CatalogManager in blink planner URL: https://github.com/apache/flink/pull/8707#discussion_r294130078

## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java ##

@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.operations.DataStreamQueryOperation;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.plan.schema.TableSourceTable;
+import org.apache.flink.table.plan.stats.FlinkStatistic;
+
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.Table;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static java.lang.String.format;
+
+/**
+ * A mapping between Flink catalog's database and Calcite's schema.
+ * Tables are registered as tables in the schema.
+ */
+class DatabaseCalciteSchema implements Schema {
+	private final String databaseName;
+	private final String catalogName;
+	private final Catalog catalog;
+
+	public DatabaseCalciteSchema(String databaseName, String catalogName, Catalog catalog) {
+		this.databaseName = databaseName;
+		this.catalogName = catalogName;
+		this.catalog = catalog;
+	}
+
+	@Override
+	public Table getTable(String tableName) {
+		ObjectPath tablePath = new ObjectPath(databaseName, tableName);
+
+		try {
+			if (!catalog.tableExists(tablePath)) {
+				return null;
+			}
+
+			CatalogBaseTable table = catalog.getTable(tablePath);
+
+			// TODO supports GenericCatalogView
+			if (table instanceof QueryOperationCatalogView) {
+				QueryOperationCatalogView view = (QueryOperationCatalogView) table;
+				QueryOperation operation = view.getQueryOperation();
+				if (operation instanceof DataStreamQueryOperation) {
+					List<String> qualifiedName = Arrays.asList(catalogName, databaseName, tableName);
+					((DataStreamQueryOperation) operation).setQualifiedName(qualifiedName);
+				}
+				return QueryOperationCatalogViewTable.createCalciteTable(view);
+			} else if (table instanceof ConnectorCatalogTable) {
+				ConnectorCatalogTable connectorTable = (ConnectorCatalogTable) table;
+				return connectorTable.getTableSource()
+					.map(tableSource -> new TableSourceTable<>(
+						tableSource,
+						!connectorTable.isBatch(),
+						FlinkStatistic.UNKNOWN())
+					).orElseThrow(() -> new TableException("Cannot query a sink only table."));
+			} else if (table instanceof FlinkTempCatalogTable) {
+				return ((FlinkTempCatalogTable) table).getAbstractTable();
+			} else {
+				throw new TableException("Unsupported table type: " + table);
+			}
+
[GitHub] [flink] sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput
sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput URL: https://github.com/apache/flink/pull/8731#discussion_r294128294 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ## @@ -225,6 +226,21 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { } } + /** +* Starting from the second operator, go forward through the operator chain to notify +* each operator that its input has ended. +* +* @throws Exception if some exception happens in the endInput function of an operator. +*/ + public void endOperatorInputs() throws Exception { + for (int i = allOperators.length - 2; i >= 0; i--) { Review comment: Maybe we can use the following structure? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
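The reverse iteration discussed in the review above can be sketched as follows. This is a hedged, simplified model, not the real Flink runtime code: `BoundedOneInput` mirrors Flink's interface of that name, but `EndInputSketch` and the string bookkeeping are illustrative stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the endOperatorInputs() loop quoted in the diff above.
// Only BoundedOneInput corresponds to a real Flink interface; everything else
// here is a hypothetical stand-in for illustration.
public class EndInputSketch {

    /** Operators that can be told their (single) input has ended. */
    public interface BoundedOneInput {
        void endInput();
    }

    /**
     * allOperators is ordered tail-to-head, as in OperatorChain: the head
     * operator sits at the last index. Starting at length - 2 therefore skips
     * the head (which is notified by the task itself) and visits the chain
     * from the second operator onwards.
     */
    public static List<String> endOperatorInputs(Object[] allOperators) {
        List<String> notified = new ArrayList<>();
        for (int i = allOperators.length - 2; i >= 0; i--) {
            if (allOperators[i] instanceof BoundedOneInput) {
                ((BoundedOneInput) allOperators[i]).endInput();
                notified.add("op" + i);
            }
        }
        return notified;
    }
}
```

The list returned here only makes the notification order observable for the sketch; the real method returns nothing and forwards `endInput` calls down the chain.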
[jira] [Updated] (FLINK-12406) Report BLOCKING_PERSISTENT result partition meta back to client
[ https://issues.apache.org/jira/browse/FLINK-12406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12406: --- Labels: pull-request-available (was: ) > Report BLOCKING_PERSISTENT result partition meta back to client > --- > > Key: FLINK-12406 > URL: https://issues.apache.org/jira/browse/FLINK-12406 > Project: Flink > Issue Type: Sub-task > Components: API / DataSet, Runtime / Coordination >Reporter: Ruidong Li >Assignee: Ruidong Li >Priority: Major > Labels: pull-request-available > > After each job finishes, the new {{BLOCKING_PERSISTENT}} result partitions > are generated, and the locations of these result partitions should be reported back > to the client via {{JobExecutionResult}}; they will later be used for Table > {{cache()}} and {{invalidateCache()}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #8756: [FLINK-12406] [Runtime] Report BLOCKING_PERSISTENT result partition meta back to client
flinkbot commented on issue #8756: [FLINK-12406] [Runtime] Report BLOCKING_PERSISTENT result partition meta back to client URL: https://github.com/apache/flink/pull/8756#issuecomment-502525621 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] Xpray opened a new pull request #8756: [FLINK-12406] [Runtime] Report BLOCKING_PERSISTENT result partition meta back to client
Xpray opened a new pull request #8756: [FLINK-12406] [Runtime] Report BLOCKING_PERSISTENT result partition meta back to client URL: https://github.com/apache/flink/pull/8756 ## What is the purpose of the change This PR follows FLINK-12405; it reports all result partition locations of `BLOCKING_PERSISTENT` back to `ExecutionEnvironment`, so `TableEnvironment` can access these data in a later PR [FLINK-12420] ## Brief change log - *Add a new class `IntermediateResultDescriptor`, which stores the location of a `BLOCKING_PERSISTENT` `ResultPartition`* - *Add a new method `getResultPartitionDescriptors()` in `AccessExecutionGraph`* - *Add a new field in `JobExecutionResult`, `JobResult`, `ArchivedExecutionGraph` and `ExecutionEnvironment`, which keeps a mapping from `IntermediateDataSetID` to its `ResultPartition` locations* - *When a job finishes, the metadata will flow along this path: `ExecutionGraph` -> `ArchivedExecutionGraph` -> `JobExecutionResult` -> `JobResult` -> `ExecutionEnvironment`* ## Verifying this change This change added tests and can be verified as follows: org.apache.flink.test.operators.ExecutionEnvironmentITCase#testAccessingBlockingPersistentResultPartition ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes [FLIP-36](https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink) [Implementations of FLIP-36](https://docs.google.com/document/d/1qY45m3_r2NmXujcWcjJMJ_avxvnXjGy9tDn99qIBM1Y/edit#)
[jira] [Created] (FLINK-12865) State inconsistency between RM and TM on the slot status
Yun Gao created FLINK-12865: --- Summary: State inconsistency between RM and TM on the slot status Key: FLINK-12865 URL: https://issues.apache.org/jira/browse/FLINK-12865 Project: Flink Issue Type: Bug Components: Runtime / Coordination Reporter: Yun Gao Assignee: Yun Gao There may be state inconsistency between TM and RM due to race conditions and message loss: # When the TM sends a heartbeat, it retrieves the SlotReport in the main thread but sends the heartbeat in another thread. It may happen that a slot on the TM is initially FREE and the SlotReport reads the FREE state, then the RM requests the slot and marks it as allocated, and the SlotReport finally wrongly overrides the allocated status on the RM side. # When the RM requests a slot, the TM receives the request but the acknowledge message gets lost. The RM will then think this slot is free and assign it to another request. Adding a version to each slot may solve this problem: # Add a SYNCING status on the RM side. SYNCING means the request has been sent to the TM but the result of the request is not yet known. A slot in SYNCING status cannot be allocated to others. # RM and TM maintain a version for each slot, starting from 0. # Whenever the RM requests a slot, it increases the version by 1 and sends the request to the TM. The TM will only perform the allocation when the RM's version > the TM's version. # The TM also attaches the version to the heartbeat, and the RM will only accept the slot status when the TM's version >= the RM's version. The version method is a simplified form of full vector-clock based state management. In the full vector-clock design, the version would be a vector representing (RM's version, TM's version). Whenever the RM modifies the slot's status (requestSlot) or the TM modifies the slot's status (freeSlot), it first increases the corresponding component and sends the sync messages, and a message can only be accepted when the local vector version >= the message's vector version.
However, since the TM only modifies a slot's status when freeing it, ignoring the TM-side component will at worst cause a freed slot to be marked as allocated; this causes no error, and the free status will eventually be propagated to the RM via the heartbeat message. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
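The versioning protocol proposed above can be modeled minimally as follows. This is a hypothetical sketch under the ticket's stated rules (RM bumps the version per request; TM accepts only strictly newer requests; RM accepts only reports whose version has caught up); none of these class or method names are Flink APIs.

```java
// Minimal model of the per-slot version scheme from FLINK-12865.
// All names are illustrative, not real Flink classes.
public class SlotVersionSketch {
    private long rmVersion = 0; // RM's view of the slot version
    private long tmVersion = 0; // TM's view of the slot version

    // RM side: each new slot request carries an incremented version.
    public long nextRequestVersion() {
        return ++rmVersion;
    }

    // TM side: stale or duplicate requests (version <= current) are ignored,
    // so a request whose acknowledgement was lost cannot be applied twice.
    public boolean tmAcceptsRequest(long requestVersion) {
        if (requestVersion > tmVersion) {
            tmVersion = requestVersion;
            return true;
        }
        return false;
    }

    // RM side: a heartbeat slot report is only applied once the TM's version
    // has caught up with the RM's, so a stale FREE report taken before the
    // request cannot override the newer allocation.
    public boolean rmAcceptsReport(long reportVersion) {
        return reportVersion >= rmVersion;
    }
}
```

Running the race from point 1 of the ticket through this model: the RM requests the slot (version 1), then the old heartbeat report (version 0) arrives and is rejected, while the report taken after the TM applied the request (version 1) is accepted.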
[GitHub] [flink] godfreyhe commented on a change in pull request #8707: [FLINK-12815] [table-planner-blink] Supports CatalogManager in blink planner
godfreyhe commented on a change in pull request #8707: [FLINK-12815] [table-planner-blink] Supports CatalogManager in blink planner URL: https://github.com/apache/flink/pull/8707#discussion_r294127605 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java ## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; + +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.Function; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.SchemaVersion; +import org.apache.calcite.schema.Table; + +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Bridge between the {@link CatalogManager} and the {@link Schema}. This way we can query Flink's specific catalogs + * from Calcite. 
+ * + * The mapping for {@link Catalog}s is modeled as a strict two-level reference structure for Flink in Calcite, + * the full path of objects is of format [catalog_name].[db_name].[meta-object_name]. + * + * It also supports {@link ExternalCatalog}s. An external catalog maps 1:1 to the Calcite's schema. + */ +@Internal +public class CatalogManagerCalciteSchema implements Schema { + + private final CatalogManager catalogManager; + private boolean isBatch; + Review comment: we unify `BatchTableSourceTable` and `StreamTableSourceTable` into `TableSourceTable`, and batch `TableSourceTable` and stream `TableSourceTable` have different behaviors in infering table row type. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] Myasuka commented on issue #8686: [FLINK-11869] Make buffer size in checkpoint stream factory configurable
Myasuka commented on issue #8686: [FLINK-11869] Make buffer size in checkpoint stream factory configurable URL: https://github.com/apache/flink/pull/8686#issuecomment-502523369 CC @tzulitai
[GitHub] [flink] godfreyhe commented on a change in pull request #8707: [FLINK-12815] [table-planner-blink] Supports CatalogManager in blink planner
godfreyhe commented on a change in pull request #8707: [FLINK-12815] [table-planner-blink] Supports CatalogManager in blink planner URL: https://github.com/apache/flink/pull/8707#discussion_r294126984 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; + +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.Function; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.SchemaVersion; +import org.apache.calcite.schema.Schemas; +import org.apache.calcite.schema.Table; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +/** + * A mapping between Flink's catalog and Calcite's schema. This enables to look up and access objects(tables, views, + * functions, types) in SQL queries without registering them in advance. Databases are registered as sub-schemas + * in the schema. 
+ */ +@Internal +public class CatalogCalciteSchema implements Schema { + + private final String catalogName; + private final Catalog catalog; + + public CatalogCalciteSchema(String catalogName, Catalog catalog) { + this.catalogName = catalogName; + this.catalog = catalog; + } + + /** +* Look up a sub-schema (database) by the given sub-schema name. +* +* @param schemaName name of sub-schema to look up +* @return the sub-schema with a given database name, or null +*/ + @Override + public Schema getSubSchema(String schemaName) { + Review comment: do you mean we need an interface which defines methods like: `getDatabase(String database)` and `getTable(String table)`?
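The interface shape being asked about in the review could look roughly like the following. This is purely hypothetical: explicit `getDatabase`/`getTable` lookups instead of Calcite's generic `getSubSchema`, with tables modeled as plain strings for brevity. None of these names are real Flink or Calcite APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a catalog lookup API expressed directly, rather than
// through Calcite's Schema#getSubSchema. Illustrative names only.
public class CatalogLookupSketch {

    // database name -> (table name -> table definition)
    private final Map<String, Map<String, String>> databases = new HashMap<>();

    public void addTable(String db, String table, String definition) {
        databases.computeIfAbsent(db, k -> new HashMap<>()).put(table, definition);
    }

    /** Returns the database, or null if it does not exist (mirroring getSubSchema's contract). */
    public Map<String, String> getDatabase(String databaseName) {
        return databases.get(databaseName);
    }

    /** Returns the table definition, or null if the database or the table is missing. */
    public String getTable(String databaseName, String tableName) {
        Map<String, String> db = getDatabase(databaseName);
        return db == null ? null : db.get(tableName);
    }
}
```

Returning null for missing objects (rather than throwing) matches the contract the quoted javadoc describes for `getSubSchema`.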
[GitHub] [flink] wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema
wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema URL: https://github.com/apache/flink/pull/8736#discussion_r294126584 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala ## @@ -147,13 +161,15 @@ abstract class TableTestUtil(test: TableTestBase) { * @param types field types * @param fields field names * @param statistic statistic of current table +* @param uniqueKeys unique keys of current table * @return returns the registered [[Table]]. */ def addTableSource( name: String, types: Array[TypeInformation[_]], fields: Array[String], - statistic: FlinkStatistic = FlinkStatistic.UNKNOWN): Table + statistic: TableStats, Review comment: If more information needs to be added to `FlinkStatistic`, I think it should also be included in `TableStats`. Regarding `relModifiedMonotonicity`, it is only used internally in the intermediate table source (`IntermediateRelTable`), which keeps `FlinkStatistic` as a constructor parameter.
[GitHub] [flink] wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema
wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema URL: https://github.com/apache/flink/pull/8736#discussion_r294126242 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala ## @@ -19,20 +19,24 @@ package org.apache.flink.table.plan.schema import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.flink.table.{JHashSet, JSet} import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.stats.FlinkStatistic import org.apache.flink.table.sources.{TableSource, TableSourceUtil} +import java.util + /** * Abstract class which define the interfaces required to convert a [[TableSource]] to * a Calcite Table */ class TableSourceTable[T]( val tableSource: TableSource[T], -val isStreaming: Boolean, -val statistic: FlinkStatistic) Review comment: 1. the statistics from the catalog should be restored via `TableSource#getTableStats()`. 2. I think if the TableSource is not changed, then we don't need to reconstruct a new `TableSourceTable`. We can reuse the original `TableSourceTable` and avoid calling `TableSource#getTableStats` again.
[jira] [Closed] (FLINK-12857) move FilterableTableSource into flink-table-common
[ https://issues.apache.org/jira/browse/FLINK-12857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-12857. -- Resolution: Implemented Fix Version/s: 1.9.0 merged in 1.9.0: 9f8f89923e96286275ba3fb259ec450baaa4f7c0 > move FilterableTableSource into flink-table-common > -- > > Key: FLINK-12857 > URL: https://issues.apache.org/jira/browse/FLINK-12857 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: godfrey he >Assignee: godfrey he >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > move FilterableTableSource into flink-table-common, so that flink-planner and > blink-planner can both use this interface.
[jira] [Comment Edited] (FLINK-12863) Race condition between slot offerings and AllocatedSlotReport
[ https://issues.apache.org/jira/browse/FLINK-12863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865278#comment-16865278 ] shuai.xu edited comment on FLINK-12863 at 6/17/19 3:27 AM: --- Hi [~till.rohrmann], as [~xiaogang.shi] said, we found the same race condition between RM and TM, and added a version to each slot to solve it. I think adding a fencing token to AllocatedSlotReport may have some defects. Consider how you would update the fencing token: when offering slots succeeds, or before offering slots? If when offering slots succeeds, it may happen that the JM uses the new fencing token while the TM considers the slot offering failed, so the TM may not update the token, and the JM has no chance to use the old token any more. If the TM updates the token before offering slots, it may happen that the JM doesn't receive the offering, so the JM doesn't update the token. I think using a version may be more suitable, as we can compare two versions and the bigger version is always correct. was (Author: tiemsn): Hi [~till.rohrmann], as [~xiaogang.shi] said, we found the same race condition between RM and TM, and added a version to each slot to solve it. I think adding a fencing token to AllocatedSlotReport can solve it. But how would you update the fencing token? When offering slots succeeds, or before offering slots? If when offering slots succeeds, it may happen that the JM uses the new fencing token while the TM considers the slot offering failed, so the TM may not update the token, and the JM has no chance to use the old token any more. If the TM updates the token before offering slots, it may happen that the JM doesn't receive the offering, so the JM doesn't update the token. I think using a version may be more suitable, as we can compare two versions and the bigger version is always correct.
> Race condition between slot offerings and AllocatedSlotReport > - > > Key: FLINK-12863 > URL: https://issues.apache.org/jira/browse/FLINK-12863 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.9.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.7.3, 1.9.0, 1.8.1 > > > With FLINK-11059 we introduced the {{AllocatedSlotReport}} which is used by > the {{TaskExecutor}} to synchronize its internal view on slot allocations > with the view of the {{JobMaster}}. It seems that there is a race condition > between offering slots and receiving the report because the > {{AllocatedSlotReport}} is sent by the {{HeartbeatManagerSenderImpl}} from a > separate thread. > Due to that it can happen that we generate an {{AllocatedSlotReport}} just > before getting new slots offered. Since the report is sent from a different > thread, it can then happen that the response to the slot offerings is sent > earlier than the {{AllocatedSlotReport}}. Consequently, we might receive an > outdated slot report on the {{TaskExecutor}} causing active slots to be > released. > In order to solve the problem I propose to add a fencing token to the > {{AllocatedSlotReport}} which is being updated whenever we offer new slots to > the {{JobMaster}}. When we receive the {{AllocatedSlotReport}} on the > {{TaskExecutor}} we compare the current slot report fencing token with the > received one and only process the report if they are equal. Otherwise we wait > for the next heartbeat to send us an up to date {{AllocatedSlotReport}}.
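The fencing-token check proposed in the ticket can be sketched in a few lines. This is a hedged model of the described mechanism only: the token is bumped whenever new slots are offered, and a received `AllocatedSlotReport` is processed only if its token matches the current one. The class and method names are illustrative, not the actual Flink classes.

```java
// Minimal model of the fencing-token proposal from FLINK-12863.
// Names are illustrative, not real Flink APIs.
public class SlotReportFencing {
    private long currentToken = 0;

    // Called on the TaskExecutor side whenever new slots are offered to the
    // JobMaster; any report generated before this point becomes stale.
    public void onSlotsOffered() {
        currentToken++;
    }

    // Called when an AllocatedSlotReport arrives, carrying the token it was
    // generated under. A mismatched token means the report predates the
    // latest offer, so it is dropped and we wait for the next heartbeat.
    public boolean shouldProcessReport(long reportToken) {
        return reportToken == currentToken;
    }
}
```

This plays out the race from the ticket: a report generated just before a slot offer carries the old token and is discarded instead of releasing the freshly offered slots.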
[GitHub] [flink] KurtYoung merged pull request #8748: [FLINK-12857] [table] move FilterableTableSource into flink-table-common
KurtYoung merged pull request #8748: [FLINK-12857] [table] move FilterableTableSource into flink-table-common URL: https://github.com/apache/flink/pull/8748
[GitHub] [flink] KurtYoung commented on a change in pull request #8689: [FLINK-12802][table-runtime-blink] Reducing the Code of BinaryString
KurtYoung commented on a change in pull request #8689: [FLINK-12802][table-runtime-blink] Reducing the Code of BinaryString URL: https://github.com/apache/flink/pull/8689#discussion_r294125722 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java ## @@ -97,32 +105,12 @@ public static BinaryString blankString(int length) { return fromBytes(spaces); } - /** -* Returns the number of bytes for a code point with the first byte as `b`. -* @param b The first byte of a code point -*/ - private static int numBytesForFirstByte(final byte b) { - if (b >= 0) { - // 1 byte, 7 bits: 0xxxxxxx - return 1; - } else if ((b >> 5) == -2 && (b & 0x1e) != 0) { - // 2 bytes, 11 bits: 110xxxxx 10xxxxxx - return 2; - } else if ((b >> 4) == -2) { - // 3 bytes, 16 bits: 1110xxxx 10xxxxxx 10xxxxxx - return 3; - } else if ((b >> 3) == -2) { - // 4 bytes, 21 bits: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx - return 4; - } else { - // throw new IllegalArgumentException(); - // Skip the first byte disallowed in UTF-8 - return 1; - } - } + // -- + // Utility open methods on BinaryString Review comment: what's `Open Interfaces`? Do you mean `Public interfaces`?
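The removed helper quoted in the diff above is self-contained enough to run standalone. The sketch below restates it with the UTF-8 lead-byte bit patterns written out in full; the logic follows the removed Flink code, including the fallback where a malformed lead byte counts as one byte so decoding can skip it.

```java
// Standalone version of the numBytesForFirstByte helper from the diff above,
// determining a UTF-8 sequence length from its first byte.
public class Utf8Width {
    public static int numBytesForFirstByte(final byte b) {
        if (b >= 0) {
            // 1 byte, 7 bits: 0xxxxxxx (ASCII)
            return 1;
        } else if ((b >> 5) == -2 && (b & 0x1e) != 0) {
            // 2 bytes, 11 bits: 110xxxxx 10xxxxxx
            // (the & 0x1e check rejects the overlong lead bytes 0xC0/0xC1)
            return 2;
        } else if ((b >> 4) == -2) {
            // 3 bytes, 16 bits: 1110xxxx 10xxxxxx 10xxxxxx
            return 3;
        } else if ((b >> 3) == -2) {
            // 4 bytes, 21 bits: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
            return 4;
        } else {
            // Skip a first byte disallowed in UTF-8 (e.g. a stray
            // continuation byte) by treating it as a 1-byte sequence.
            return 1;
        }
    }
}
```

The signed shifts work because Java sign-extends bytes: for example, a lead byte `110xxxxx` shifted right by 5 yields -2 (`...11111110`), which is exactly the pattern each branch tests for.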
[GitHub] [flink] godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema
godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema URL: https://github.com/apache/flink/pull/8736#discussion_r294124528 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala ## @@ -19,20 +19,24 @@ package org.apache.flink.table.plan.schema import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.flink.table.{JHashSet, JSet} import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.stats.FlinkStatistic import org.apache.flink.table.sources.{TableSource, TableSourceUtil} +import java.util + /** * Abstract class which define the interfaces required to convert a [[TableSource]] to * a Calcite Table */ class TableSourceTable[T]( val tableSource: TableSource[T], -val isStreaming: Boolean, -val statistic: FlinkStatistic) Review comment: Another scenario: rules (like `PushProjectIntoTableSourceScanRule`) do not change statistics, so the new TableSource created by the rule could reuse the original TableSource and avoid calling the `TableSource#getTableStats` method, which is costly. So the `def copy(statistic: FlinkStatistic): FlinkTable` method defined in `FlinkTable` should not be deleted either.
[jira] [Commented] (FLINK-12863) Race condition between slot offerings and AllocatedSlotReport
[ https://issues.apache.org/jira/browse/FLINK-12863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865278#comment-16865278 ] shuai.xu commented on FLINK-12863: -- Hi [~till.rohrmann], as [~xiaogang.shi] said, we found the same race condition between RM and TM, and we added a version to each slot to solve it. I think adding a fencing token to AllocatedSlotReport can solve it. But how would you update the fencing token: when offering slots succeeds, or before offering slots? If it is updated when offering slots succeeds, it may happen that the JM uses the new fencing token while the TM considers the slot offering failed, so the TM may not update the token, and the JM has no chance to use the old token any more. If the TM updates the token before offering slots, it may happen that the JM doesn't receive the offering, so the JM doesn't update the token. I think using a version may be more suitable, as we can compare two versions, and the bigger version is always the correct one. > Race condition between slot offerings and AllocatedSlotReport > - > > Key: FLINK-12863 > URL: https://issues.apache.org/jira/browse/FLINK-12863 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.9.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.7.3, 1.9.0, 1.8.1 > > > With FLINK-11059 we introduced the {{AllocatedSlotReport}} which is used by > the {{TaskExecutor}} to synchronize its internal view on slot allocations > with the view of the {{JobMaster}}. It seems that there is a race condition > between offering slots and receiving the report because the > {{AllocatedSlotReport}} is sent by the {{HeartbeatManagerSenderImpl}} from a > separate thread. > Due to that it can happen that we generate an {{AllocatedSlotReport}} just > before getting new slots offered. Since the report is sent from a different > thread, it can then happen that the response to the slot offerings is sent > earlier than the {{AllocatedSlotReport}}. 
Consequently, we might receive an > outdated slot report on the {{TaskExecutor}} causing active slots to be > released. > In order to solve the problem I propose to add a fencing token to the > {{AllocatedSlotReport}} which is being updated whenever we offer new slots to > the {{JobMaster}}. When we receive the {{AllocatedSlotReport}} on the > {{TaskExecutor}} we compare the current slot report fencing token with the > received one and only process the report if they are equal. Otherwise we wait > for the next heartbeat to send us an up to date {{AllocatedSlotReport}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
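The version-based alternative discussed above boils down to a monotonic counter and a comparison. A minimal sketch with hypothetical names (neither the issue nor the comments prescribe this exact API): the offering side bumps the version with every slot offer, and a report is only applied when it carries the latest version, so a stale report that arrives out of order is detected by a simple comparison.

```java
// Sketch: version-based reconciliation between slot offers and slot reports.
public class SlotVersionSketch {
    private long latestOfferVersion;

    // Called whenever new slots are offered; the returned version travels
    // with the offer and with every subsequent report.
    public long recordSlotOffer() {
        return ++latestOfferVersion;
    }

    // A report tagged with an older version reflects a stale view and is
    // skipped; "the bigger version is always the correct one".
    public boolean shouldApplyReport(long reportVersion) {
        return reportVersion >= latestOfferVersion;
    }
}
```

The fencing-token proposal in the issue description is the same check with equality instead of ordering; the version variant additionally lets either side tell which of two states is newer.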
[GitHub] [flink] godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema
godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema URL: https://github.com/apache/flink/pull/8736#discussion_r294120943 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala ## @@ -147,13 +161,15 @@ abstract class TableTestUtil(test: TableTestBase) { * @param types field types * @param fields field names * @param statistic statistic of current table +* @param uniqueKeys unique keys of current table * @return returns the registered [[Table]]. */ def addTableSource( name: String, types: Array[TypeInformation[_]], fields: Array[String], - statistic: FlinkStatistic = FlinkStatistic.UNKNOWN): Table + statistic: TableStats, Review comment: We may add more info to `FlinkStatistic` in the future; use `FlinkStatistic` instead of individual fields to make sure this method and the related test cases stay stable. `relModifiedMonotonicity` is also a member of `FlinkStatistic` and is not defined in this method.
[GitHub] [flink] wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema
wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema URL: https://github.com/apache/flink/pull/8736#discussion_r294125156 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java ## @@ -343,13 +412,66 @@ public Builder field(String name, TypeInformation typeInfo) { return field(name, fromLegacyInfoToDataType(typeInfo)); } + /** +* Add a primary key with the given field names. +* There can only be one PRIMARY KEY for a given table +* See the {@link TableSchema} class javadoc for more definition about primary key. +*/ + public Builder primaryKey(String... fields) { + Preconditions.checkArgument( + fields != null && fields.length > 0, + "The primary key fields shouldn't be null or empty."); + Preconditions.checkArgument( + primaryKey == null, + "A primary key " + primaryKey + + " have been defined, can not define another primary key " + + Arrays.toString(fields)); + for (String field : fields) { + if (!fieldNames.contains(field)) { + throw new IllegalArgumentException("The field '" + field + + "' is not existed in the schema."); + } + } + primaryKey = Arrays.asList(fields); + return this; + } + + /** +* Add an unique key with the given field names. +* There can be more than one UNIQUE KEY for a given table. +* See the {@link TableSchema} class javadoc for more definition about unique key. +*/ + public Builder uniqueKey(String... fields) { + Preconditions.checkArgument( + fields != null && fields.length > 0, + "The unique key fields shouldn't be null or empty."); + for (String field : fields) { + if (!fieldNames.contains(field)) { + throw new IllegalArgumentException("The field '" + field + + "' is not existed in the schema."); + } + } + if (uniqueKeys == null) { + uniqueKeys = new ArrayList<>(); + } + uniqueKeys.add(Arrays.asList(fields)); Review comment: Sure, I will add that.
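The builder validation being reviewed (a single primary key, multiple de-duplicated unique keys, every key field must already exist) can be sketched in a self-contained form. The class below is an illustrative reimplementation with hypothetical names, not the actual `TableSchema.Builder` from the PR, and it incorporates the de-duplication the reviewer asked for.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Sketch of primary/unique key validation in a schema builder.
public class SchemaKeysSketch {
    private final Set<String> fieldNames = new LinkedHashSet<>();
    private List<String> primaryKey;
    private final List<List<String>> uniqueKeys = new ArrayList<>();

    SchemaKeysSketch field(String name) {
        fieldNames.add(name);
        return this;
    }

    // At most one primary key per table.
    SchemaKeysSketch primaryKey(String... fields) {
        if (primaryKey != null) {
            throw new IllegalStateException(
                "A primary key " + primaryKey + " has already been defined.");
        }
        checkFieldsExist(fields);
        primaryKey = Arrays.asList(fields);
        return this;
    }

    // Any number of unique keys, de-duplicated as the review suggests.
    SchemaKeysSketch uniqueKey(String... fields) {
        checkFieldsExist(fields);
        List<String> key = Arrays.asList(fields);
        if (!uniqueKeys.contains(key)) {
            uniqueKeys.add(key);
        }
        return this;
    }

    List<List<String>> uniqueKeys() {
        return uniqueKeys;
    }

    private void checkFieldsExist(String[] fields) {
        if (fields == null || fields.length == 0) {
            throw new IllegalArgumentException("Key fields shouldn't be null or empty.");
        }
        for (String f : fields) {
            if (!fieldNames.contains(f)) {
                throw new IllegalArgumentException(
                    "The field '" + f + "' does not exist in the schema.");
            }
        }
    }
}
```

Note that the existence check implies fields must be declared before keys, which is exactly the build-order question raised later in the thread.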
[GitHub] [flink] wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema
wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema URL: https://github.com/apache/flink/pull/8736#discussion_r294125122 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java ## @@ -343,13 +412,66 @@ public Builder field(String name, TypeInformation typeInfo) { return field(name, fromLegacyInfoToDataType(typeInfo)); } + /** +* Add a primary key with the given field names. +* There can only be one PRIMARY KEY for a given table +* See the {@link TableSchema} class javadoc for more definition about primary key. +*/ + public Builder primaryKey(String... fields) { + Preconditions.checkArgument( + fields != null && fields.length > 0, + "The primary key fields shouldn't be null or empty."); + Preconditions.checkArgument( + primaryKey == null, + "A primary key " + primaryKey + + " have been defined, can not define another primary key " + + Arrays.toString(fields)); + for (String field : fields) { + if (!fieldNames.contains(field)) { + throw new IllegalArgumentException("The field '" + field + + "' is not existed in the schema."); + } + } Review comment: Yes. I think so.
[GitHub] [flink] wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema
wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema URL: https://github.com/apache/flink/pull/8736#discussion_r294125081 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java ## @@ -43,7 +44,28 @@ import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; /** - * A table schema that represents a table's structure with field names and data types. + * A table schema that represents a table's structure with field names and data types and some + * constraint information (e.g. primary key, unique key). + * + * Concepts about primary key and unique key: Review comment: The difference between primary key and unique key is that there is only one primary key, while there can be more than one unique key. And a primary key doesn't need to be declared in the unique key list again. I will add this to the class Javadoc.
[jira] [Updated] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Gao updated FLINK-12852: Component/s: Runtime / Network > Deadlock occurs when requiring exclusive buffer for RemoteInputChannel > -- > > Key: FLINK-12852 > URL: https://issues.apache.org/jira/browse/FLINK-12852 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Yun Gao >Assignee: Yun Gao >Priority: Major > > When running tests with an upstream vertex and downstream vertex, deadlock > occurs when submitting the job: > {code:java} > "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 > nid=0x38845 waiting on condition [0x7f2cbe9fe000] > java.lang.Thread.State: TIMED_WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00073ed6b6f0> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) > at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54) > at > org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312) > - locked <0x00073fbc81f0> (a java.lang.Object) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220) > at > org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836) > at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:598) > at java.lang.Thread.run(Thread.java:834) > {code} > This is because the required and max sizes of the local buffer pool are not > the same, so there may be over-allocation, and when assignExclusiveSegments > runs there is no available memory left. > > The detail of the scenario is as follows: the parallelisms of the upstream > vertex and the downstream vertex are 1000 and 500 respectively. There are 200 TMs, > and each TM has 10696 buffers in total and has 10 slots. For a TM that runs > 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with > local buffer pool \{required = 500, max = 2 * 500 + 8 = 1008}; they produce > data quickly and each occupies about 990 buffers. Then the downstream task > starts and tries to assign exclusive buffers for 1500 - 9 = 1491 > InputChannels. It requires 2981 buffers but only 1786 are left. Since not all > downstream tasks can start, the job is eventually blocked, no buffer can > be released, and the deadlock occurs. > > Although increasing the network memory solves the problem, the > deadlock may not be acceptable. Fine-grained resource management > [Flink-12761|https://issues.apache.org/jira/browse/FLINK-12761] can solve > this problem, but AFAIK in 1.9 it will not include the network memory in > the ResourceProfile. I think the possible solutions currently are one of: > # Make required and max equal for the local buffer pool. > # Add a maximum number of retries for allocating exclusive buffers. When the > maximum number of retries is exceeded, the task fails and throws an exception that tells > users to increase the network memory. > I think the second one may be better.
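A back-of-envelope check of the figures in the description above, assuming the default of 2 exclusive buffers per remote input channel (this yields 2982, essentially the ~2981 demanded in the report, against 1786 remaining):

```java
// Rough arithmetic behind the reported deadlock: the downstream task's
// exclusive-buffer demand vs. the buffers left after the upstream tasks
// filled their local pools.
public class BufferArithmetic {

    // Exclusive buffers demanded by the downstream task's remote channels.
    static int demanded(int remoteChannels, int exclusivePerChannel) {
        return remoteChannels * exclusivePerChannel;
    }

    // Buffers remaining on the TM after the upstream tasks took theirs.
    static int remaining(int totalPerTm, int upstreamTasks, int heldPerTask) {
        return totalPerTm - upstreamTasks * heldPerTask;
    }

    public static void main(String[] args) {
        int remoteChannels = 1500 - 9;  // 1491 remote input channels
        int d = demanded(remoteChannels, 2);
        int r = remaining(10696, 9, 990);
        // demanded far exceeds remaining, so the blocking requestMemorySegments
        // call in the stack trace above can never be satisfied.
        System.out.println("demanded=" + d + " remaining=" + r);
    }
}
```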
[GitHub] [flink] WeiZhong94 commented on issue #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API
WeiZhong94 commented on issue #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API URL: https://github.com/apache/flink/pull/8681#issuecomment-502518727 @dianfu Thanks for your review again! I have addressed your comments.
[jira] [Commented] (FLINK-12863) Race condition between slot offerings and AllocatedSlotReport
[ https://issues.apache.org/jira/browse/FLINK-12863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865269#comment-16865269 ] Xiaogang Shi commented on FLINK-12863: -- Btw, I want to note that the race condition may not necessarily be caused by {{HeartbeatManagerSenderImpl}} sending heartbeats in a separate thread. Sending heartbeats from the main thread can solve the problem in the JM, but not the one in the RM. Even when the RM sends heartbeat requests in the main thread, right after a slot request, the heartbeat responses may be handled first by the RM. It's because the RM uses ask to send both heartbeat and slot requests. Temporary {{PromiseActor}}s will be created to receive responses from the TM. Since there is no guarantee on the execution order of actors, the {{PromiseActor}} which receives its response first may be executed later. > Race condition between slot offerings and AllocatedSlotReport > - > > Key: FLINK-12863 > URL: https://issues.apache.org/jira/browse/FLINK-12863 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.9.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.7.3, 1.9.0, 1.8.1 > > > With FLINK-11059 we introduced the {{AllocatedSlotReport}} which is used by > the {{TaskExecutor}} to synchronize its internal view on slot allocations > with the view of the {{JobMaster}}. It seems that there is a race condition > between offering slots and receiving the report because the > {{AllocatedSlotReport}} is sent by the {{HeartbeatManagerSenderImpl}} from a > separate thread. > Due to that it can happen that we generate an {{AllocatedSlotReport}} just > before getting new slots offered. Since the report is sent from a different > thread, it can then happen that the response to the slot offerings is sent > earlier than the {{AllocatedSlotReport}}. Consequently, we might receive an > outdated slot report on the {{TaskExecutor}} causing active slots to be > released. 
> In order to solve the problem I propose to add a fencing token to the > {{AllocatedSlotReport}} which is being updated whenever we offer new slots to > the {{JobMaster}}. When we receive the {{AllocatedSlotReport}} on the > {{TaskExecutor}} we compare the current slot report fencing token with the > received one and only process the report if they are equal. Otherwise we wait > for the next heartbeat to send us an up to date {{AllocatedSlotReport}}.
[GitHub] [flink] godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema
godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema URL: https://github.com/apache/flink/pull/8736#discussion_r294118121 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java ## @@ -343,13 +412,66 @@ public Builder field(String name, TypeInformation typeInfo) { return field(name, fromLegacyInfoToDataType(typeInfo)); } + /** +* Add a primary key with the given field names. +* There can only be one PRIMARY KEY for a given table +* See the {@link TableSchema} class javadoc for more definition about primary key. +*/ + public Builder primaryKey(String... fields) { + Preconditions.checkArgument( + fields != null && fields.length > 0, + "The primary key fields shouldn't be null or empty."); + Preconditions.checkArgument( + primaryKey == null, + "A primary key " + primaryKey + + " have been defined, can not define another primary key " + + Arrays.toString(fields)); + for (String field : fields) { + if (!fieldNames.contains(field)) { + throw new IllegalArgumentException("The field '" + field + + "' is not existed in the schema."); + } + } Review comment: Does that mean we must build the fieldNames first?
[GitHub] [flink] godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema
godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema URL: https://github.com/apache/flink/pull/8736#discussion_r294117111 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java ## @@ -43,7 +44,28 @@ import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; /** - * A table schema that represents a table's structure with field names and data types. + * A table schema that represents a table's structure with field names and data types and some + * constraint information (e.g. primary key, unique key). + * + * Concepts about primary key and unique key: Review comment: Do we distinguish primary key from unique key? In the current javadoc, they have no difference.
[GitHub] [flink] godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema
godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema URL: https://github.com/apache/flink/pull/8736#discussion_r294121385 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/MetadataTestUtil.scala ## @@ -121,6 +123,22 @@ object MetadataTestUtil { getDataStreamTable(schema, new FlinkStatistic(tableStats, uniqueKeys)) } + private def createStudentTableSource(): TableSourceTable[BaseRow] = { +val schema = TableSchema.builder() + .field("id", DataTypes.BIGINT()) + .field("name", DataTypes.STRING()) + .field("score", DataTypes.DOUBLE()) + .field("age", DataTypes.INT()) + .field("height", DataTypes.DOUBLE()) + .field("sex", DataTypes.STRING()) + .field("class", DataTypes.INT()) + .uniqueKey("id") + .build() +val tableSource = new TestTableSource(true, schema) +new TableSourceTable[BaseRow](tableSource, false) + } + + Review comment: delete redundant blank line
[GitHub] [flink] godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema
godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema URL: https://github.com/apache/flink/pull/8736#discussion_r294118265 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java ## @@ -343,13 +412,66 @@ public Builder field(String name, TypeInformation typeInfo) { return field(name, fromLegacyInfoToDataType(typeInfo)); } + /** +* Add a primary key with the given field names. +* There can only be one PRIMARY KEY for a given table +* See the {@link TableSchema} class javadoc for more definition about primary key. +*/ + public Builder primaryKey(String... fields) { + Preconditions.checkArgument( + fields != null && fields.length > 0, + "The primary key fields shouldn't be null or empty."); + Preconditions.checkArgument( + primaryKey == null, + "A primary key " + primaryKey + + " have been defined, can not define another primary key " + + Arrays.toString(fields)); + for (String field : fields) { + if (!fieldNames.contains(field)) { + throw new IllegalArgumentException("The field '" + field + + "' is not existed in the schema."); + } + } + primaryKey = Arrays.asList(fields); + return this; + } + + /** +* Add an unique key with the given field names. +* There can be more than one UNIQUE KEY for a given table. +* See the {@link TableSchema} class javadoc for more definition about unique key. +*/ + public Builder uniqueKey(String... fields) { + Preconditions.checkArgument( + fields != null && fields.length > 0, + "The unique key fields shouldn't be null or empty."); + for (String field : fields) { + if (!fieldNames.contains(field)) { + throw new IllegalArgumentException("The field '" + field + + "' is not existed in the schema."); + } + } + if (uniqueKeys == null) { + uniqueKeys = new ArrayList<>(); + } + uniqueKeys.add(Arrays.asList(fields)); Review comment: Should we de-duplicate the unique keys?
[GitHub] [flink] godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema
godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema URL: https://github.com/apache/flink/pull/8736#discussion_r294119747 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala ## @@ -19,20 +19,24 @@ package org.apache.flink.table.plan.schema import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.flink.table.{JHashSet, JSet} import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.stats.FlinkStatistic import org.apache.flink.table.sources.{TableSource, TableSourceUtil} +import java.util + /** * Abstract class which define the interfaces required to convert a [[TableSource]] to * a Calcite Table */ class TableSourceTable[T]( val tableSource: TableSource[T], -val isStreaming: Boolean, -val statistic: FlinkStatistic) Review comment: If the `statistic` is deleted, how do we store the statistic from the catalog?
[jira] [Commented] (FLINK-12653) Keyed state backend fails to restore during rescaling
[ https://issues.apache.org/jira/browse/FLINK-12653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865268#comment-16865268 ] Tzu-Li (Gordon) Tai commented on FLINK-12653: - Thanks for the detailed investigation [~mxm]. This does indeed look like a bug. The fact that this works with RocksDB, but not with the heap backend also makes sense now, since heap tries to eagerly deserialize all state on restore. One thing that we have been considering recently while designing FLIP-43 (Savepoint connector / State Processing API), is to merge all registered state info (state name, state type information / serializer snapshot, owning operator, etc.) and write it as part of the savepoint metadata file. The information should be merged such that there is a global view of all registered states across all subtasks of a given operator. With this, upon rescaling, all restored subtasks should receive complete meta data via the {{KeyedStateHandle}} class. Do you think this will fix this problem, and in general is a reasonable approach? > Keyed state backend fails to restore during rescaling > - > > Key: FLINK-12653 > URL: https://issues.apache.org/jira/browse/FLINK-12653 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.6.4, 1.7.2, 1.8.0 > Environment: Beam 2.12.0 or any other Beam version > Flink >= 1.6 > Heap/Filesystem state backend (RocksDB works fine) >Reporter: Maximilian Michels >Priority: Critical > > The Flink Runner includes a test which verifies checkpoints/savepoints work > correctly with Beam on Flink. When adding additional tests for > scaleup/scaledown [1], I came across a bug with restoring the keyed state > backend. After a fair amount of debugging Beam code and checking any > potential issues with serializers, I think this could be a Flink issue. > Steps to reproduce: > 1. {{git clone https://github.com/mxm/beam}} > 2. {{cd beam && git checkout savepoint-problem}} > 3. 
{{./gradlew :runners:flink:1.6:test --tests > "**.FlinkSavepointTest.testSavepointRestoreLegacy"}} > Error: > {noformat} > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for DoFnOperator_76375152c4a81d5df72cf49e32c4ecb9_(4/4) from > any of the 1 provided restore options. > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:279) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:133) > ... 
5 more > Caused by: java.lang.RuntimeException: Invalid namespace string: '' > at > org.apache.beam.runners.core.StateNamespaces.fromString(StateNamespaces.java:245) > at > org.apache.beam.runners.core.TimerInternals$TimerDataCoder.decode(TimerInternals.java:246) > at > org.apache.beam.runners.core.TimerInternals$TimerDataCoder.decode(TimerInternals.java:221) > at > org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:92) > at > org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:169) > at > org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45) > at > org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:513) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:474) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:431) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:370) >
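The merging idea Gordon describes above can be illustrated with a small, hypothetical sketch (this is NOT Flink's actual API): collect the state names registered by each subtask into one global view that would be written to the savepoint metadata file, so that after rescaling every restored subtask sees the complete set of registered states, including states (e.g. timers) it never held itself.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical sketch (not Flink's actual API) of merging per-subtask
 * registered state metadata into one global view, as proposed for the
 * savepoint metadata file in the comment above.
 */
public class MergedStateMetadata {

    /** Merges the per-subtask registered state names into a single global view. */
    public static Set<String> mergeRegisteredStates(List<Set<String>> perSubtaskStates) {
        Set<String> global = new TreeSet<>();
        for (Set<String> subtaskStates : perSubtaskStates) {
            global.addAll(subtaskStates);
        }
        return global;
    }

    public static void main(String[] args) {
        // Before rescaling: subtask 0 registered a timer state, subtask 1 did not.
        List<Set<String>> perSubtask = Arrays.asList(
                new HashSet<>(Arrays.asList("timers", "counts")),
                new HashSet<>(Collections.singletonList("counts")));
        // The merged view is what every restored subtask would receive, so the
        // heap backend can eagerly deserialize all states without hitting
        // metadata that only some original subtasks knew about.
        System.out.println(mergeRegisteredStates(perSubtask)); // prints [counts, timers]
    }
}
```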
[jira] [Updated] (FLINK-12176) Unify JobGraph creation in CliFrontend
[ https://issues.apache.org/jira/browse/FLINK-12176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TisonKun updated FLINK-12176: - Attachment: patch.diff > Unify JobGraph creation in CliFrontend > -- > > Key: FLINK-12176 > URL: https://issues.apache.org/jira/browse/FLINK-12176 > Project: Flink > Issue Type: Improvement > Components: Command Line Client >Affects Versions: 1.9.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Attachments: patch.diff > > > Currently, we create a {{JobGraph}} by the following process: > * if the cluster starts in job mode, we create the {{JobGraph}} with > {{PackagedProgramUtils#createJobGraph}} and deploy a job cluster; > * if the cluster starts in session mode, we create the {{JobGraph}} and submit it > via {{CliFrontend#executeProgram}}, which is internally the same as above but > uses {{ContextEnvironment}} instead of {{OptimizerPlanEnvironment}}. > {{ContextEnvironment}} not only creates the job graph but also submits it. > The processes of {{JobGraph}} creation in job mode and session mode are thus > similar, which means we can unify them by always creating the {{JobGraph}} with > {{PackagedProgramUtils#createJobGraph}} and then, > * in job mode, deploying a job cluster with the {{JobGraph}} > * in session mode, submitting the {{JobGraph}} to the session cluster > From a higher view, this gives a common view of job submission in both > job and session mode and opens opportunities to refactor legacy client code. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12176) Unify JobGraph creation in CliFrontend
[ https://issues.apache.org/jira/browse/FLINK-12176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865259#comment-16865259 ] TisonKun commented on FLINK-12176: -- [~till.rohrmann] After a closer look at {{ExecutionEnvironments}}, I wonder what exactly you meant when you said "{{OptimizerPlanEnvironment}} does not support eager execution calls". In order to see the error, I applied a patch to switch from {{ContextEnvironment}} to {{OptimizerPlanEnvironment}}, but none of the tests reported an error (from which I had expected to learn what's wrong with this change). [^patch.diff] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
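The unification proposed in FLINK-12176 — create the {{JobGraph}} the same way in both modes and only branch on the final step — can be sketched as follows. This is an illustrative, hypothetical structure; the names mirror Flink's ({{PackagedProgramUtils#createJobGraph}}) but it is not actual Flink code.

```java
import java.util.Objects;

/**
 * Hypothetical sketch of the unified submission flow proposed in FLINK-12176:
 * JobGraph creation is shared, and only deployment vs. submission differs.
 */
public class UnifiedSubmission {

    enum ClusterMode { JOB, SESSION }

    /** Stand-in for Flink's JobGraph. */
    static final class JobGraph {
        final String jobName;
        JobGraph(String jobName) { this.jobName = Objects.requireNonNull(jobName); }
    }

    /** In both modes the JobGraph is created the same way first... */
    static JobGraph createJobGraph(String program) {
        return new JobGraph(program); // stands in for PackagedProgramUtils#createJobGraph
    }

    /** ...and only the final step depends on the cluster mode. */
    static String submit(String program, ClusterMode mode) {
        JobGraph jobGraph = createJobGraph(program);
        switch (mode) {
            case JOB:
                return "deployed job cluster for " + jobGraph.jobName;
            case SESSION:
                return "submitted " + jobGraph.jobName + " to session cluster";
            default:
                throw new IllegalStateException("unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        System.out.println(submit("WordCount", ClusterMode.JOB));
        System.out.println(submit("WordCount", ClusterMode.SESSION));
    }
}
```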
[jira] [Assigned] (FLINK-12273) The default value of CheckpointRetentionPolicy should be RETAIN_ON_FAILURE, otherwise it cannot be recovered according to checkpoint after failure.
[ https://issues.apache.org/jira/browse/FLINK-12273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-12273: Assignee: vinoyang > The default value of CheckpointRetentionPolicy should be RETAIN_ON_FAILURE, > otherwise it cannot be recovered according to checkpoint after failure. > --- > > Key: FLINK-12273 > URL: https://issues.apache.org/jira/browse/FLINK-12273 > Project: Flink > Issue Type: Wish > Components: Runtime / Checkpointing >Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0 >Reporter: Mr.Nineteen >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > Fix For: 1.7.3, 1.9.0, 2.0.0, 1.6.5, 1.8.2 > > Time Spent: 20m > Remaining Estimate: 0h > > The default value of CheckpointRetentionPolicy should be RETAIN_ON_FAILURE, > otherwise it cannot be recovered according to checkpoint after failure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] wuchong commented on issue #8712: [FLINK-12819][sql-client] Reuse TableEnvironment between different SQ…
wuchong commented on issue #8712: [FLINK-12819][sql-client] Reuse TableEnvironment between different SQ… URL: https://github.com/apache/flink/pull/8712#issuecomment-502513427 Sorry @docete , you are right, transformations will be cleared after execution. I will review it again. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-12122: Fix Version/s: (was: 1.8.1) 1.8.2 > Spread out tasks evenly across all available registered TaskManagers > > > Key: FLINK-12122 > URL: https://issues.apache.org/jira/browse/FLINK-12122 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.6.4, 1.7.2, 1.8.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Fix For: 1.7.3, 1.9.0, 1.8.2 > > Attachments: image-2019-05-21-12-28-29-538.png, > image-2019-05-21-13-02-50-251.png > > > With Flip-6, we changed the default behaviour of how slots are assigned to > {{TaskManagers}}. Instead of evenly spreading them out over all registered > {{TaskManagers}}, we randomly pick slots from {{TaskManagers}} with a > tendency to first fill up a TM before using another one. This is a regression > wrt the pre-Flip-6 code. > I suggest changing the behaviour so that we try to evenly distribute slots > across all available {{TaskManagers}} by considering how many of their slots > are already allocated. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865244#comment-16865244 ] sunjincheng commented on FLINK-12122: - Due to the release of 1.8.1, I changed the fix version from 1.8.1 to 1.8.2. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
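The even-spreading strategy FLINK-12122 describes — consider how many of each TaskManager's slots are already allocated instead of filling one TM up first — can be sketched as a least-loaded selection. This is a hypothetical illustration, not the actual SlotManager/SlotPool code.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/**
 * Hypothetical sketch of even slot spreading: always allocate from the
 * TaskManager with the lowest fraction of allocated slots.
 */
public class EvenSlotSpreading {

    static final class TaskManager {
        final String id;
        final int totalSlots;
        int allocatedSlots;
        TaskManager(String id, int totalSlots) { this.id = id; this.totalSlots = totalSlots; }
        double utilization() { return (double) allocatedSlots / totalSlots; }
    }

    /** Picks the TM with the lowest allocation ratio that still has a free slot. */
    static Optional<TaskManager> pickLeastLoaded(List<TaskManager> tms) {
        return tms.stream()
                .filter(tm -> tm.allocatedSlots < tm.totalSlots)
                .min(Comparator.comparingDouble(TaskManager::utilization));
    }

    public static void main(String[] args) {
        List<TaskManager> tms = Arrays.asList(
                new TaskManager("tm-1", 4), new TaskManager("tm-2", 4));
        // Allocate four slots; they alternate between the TMs instead of
        // filling tm-1 completely before touching tm-2.
        StringBuilder order = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            TaskManager tm = pickLeastLoaded(tms).get();
            tm.allocatedSlots++;
            order.append(tm.id).append(' ');
        }
        System.out.println(order.toString().trim()); // prints tm-1 tm-2 tm-1 tm-2
    }
}
```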
[jira] [Updated] (FLINK-8513) Add documentation for connecting to non-AWS S3 endpoints
[ https://issues.apache.org/jira/browse/FLINK-8513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-8513: --- Fix Version/s: (was: 1.8.1) 1.8.2 > Add documentation for connecting to non-AWS S3 endpoints > > > Key: FLINK-8513 > URL: https://issues.apache.org/jira/browse/FLINK-8513 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem, Documentation >Reporter: chris snow >Assignee: Seth Wiesman >Priority: Trivial > Fix For: 1.9.0, 1.8.2 > > > It would be useful if the documentation provided information on connecting to > non-AWS S3 endpoints when using presto. For example: > > > You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's > {{flink-conf.yaml}}: > {code:java} > s3.access-key: your-access-key > s3.secret-key: your-secret-key{code} > If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object > Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 > endpoint in Flink's {{flink-conf.yaml}}: > {code:java} > s3.endpoint: your-endpoint-hostname{code} > > > Source: > [https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8513) Add documentation for connecting to non-AWS S3 endpoints
[ https://issues.apache.org/jira/browse/FLINK-8513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865243#comment-16865243 ] sunjincheng commented on FLINK-8513: Due to the release of 1.8.1, I changed the fix version from 1.8.1 to 1.8.2. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12273) The default value of CheckpointRetentionPolicy should be RETAIN_ON_FAILURE, otherwise it cannot be recovered according to checkpoint after failure.
[ https://issues.apache.org/jira/browse/FLINK-12273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-12273: Fix Version/s: (was: 1.8.1) 1.8.2 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12273) The default value of CheckpointRetentionPolicy should be RETAIN_ON_FAILURE, otherwise it cannot be recovered according to checkpoint after failure.
[ https://issues.apache.org/jira/browse/FLINK-12273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865242#comment-16865242 ] sunjincheng commented on FLINK-12273: - Due to the release of 1.8.1, I changed the fix version from 1.8.1 to 1.8.2. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12578) Use secure URLs for Maven repositories
[ https://issues.apache.org/jira/browse/FLINK-12578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-12578: Fix Version/s: (was: 1.8.1) 1.8.2 > Use secure URLs for Maven repositories > -- > > Key: FLINK-12578 > URL: https://issues.apache.org/jira/browse/FLINK-12578 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.6.4, 1.7.2, 1.8.0, 1.9.0 >Reporter: Jungtaek Lim >Assignee: Jungtaek Lim >Priority: Major > Labels: pull-request-available > Fix For: 1.7.3, 1.9.0, 1.8.2 > > Time Spent: 20m > Remaining Estimate: 0h > > Currently, some of the repository URLs in the Maven pom.xml files use the http > scheme. Ideally they should use the https scheme. > Below is the list of repositories that use the http scheme in pom files for now: > * Confluent > * HWX > * MapR -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12578) Use secure URLs for Maven repositories
[ https://issues.apache.org/jira/browse/FLINK-12578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865241#comment-16865241 ] sunjincheng commented on FLINK-12578: - Due to the release of 1.8.1, I changed the fix version from 1.8.1 to 1.8.2. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11958) flink on windows yarn deploy failed
[ https://issues.apache.org/jira/browse/FLINK-11958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865240#comment-16865240 ] sunjincheng commented on FLINK-11958: - Due to the release of 1.8.1, I changed the fix version from 1.8.1 to 1.8.2. > flink on windows yarn deploy failed > --- > > Key: FLINK-11958 > URL: https://issues.apache.org/jira/browse/FLINK-11958 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.7.2 >Reporter: Matrix42 >Assignee: Matrix42 >Priority: Major > Labels: pull-request-available > Fix For: 1.7.3, 1.8.2 > > Time Spent: 20m > Remaining Estimate: 0h > > Flink Version : 1.7.2 > Hadoop Version: 2.7.5 > Yarn log: > Application application_1551710861615_0002 failed 1 times due to AM Container > for appattempt_1551710861615_0002_01 exited with exitCode: 1 > For more detailed output, check application tracking > page:http://DESKTOP-919H80J:8088/cluster/app/application_1551710861615_0002Then, > click on links to logs of each attempt. > Diagnostics: Exception from container-launch. 
> Container id: container_1551710861615_0002_01_01 > Exit code: 1 > Stack trace: ExitCodeException exitCode=1: > at org.apache.hadoop.util.Shell.runCommand(Shell.java:585) > at org.apache.hadoop.util.Shell.run(Shell.java:482) > at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:776) > at > org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212) > at > org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302) > at > org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Shell output: 移动了 1 个文件。 > Container exited with a non-zero exit code 1 > Failing this attempt. Failing the application. > > jobmanager.err: > '$JAVA_HOME' 不是内部或外部命令,也不是可运行的程序或批处理文件。 > english: (Not internal or external commands, nor runnable programs or batch > files) > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12863) Race condition between slot offerings and AllocatedSlotReport
[ https://issues.apache.org/jira/browse/FLINK-12863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865239#comment-16865239 ] Xiaogang Shi commented on FLINK-12863: -- I think a similar problem happens in the heartbeats between the RM and the TM. When an RM receives a slot request from a JM, it will find an available slot, mark it as pending, and send a slot request to the TM. In the case where the slot request follows a heartbeat request, the RM will receive the heartbeat response first and will remove the pending slot. The RM may then reuse the slot when it receives a new slot request from the JM, leading to duplicated slot allocation. A solution proposed by [~yungao.gy] is to use version numbers. Each slot is equipped with a version number, which is increased once a new pending request is generated. These version numbers are then attached to the heartbeats sent to the TM. Once a heartbeat response is received, we do not remove those pending slot requests whose version numbers are greater than the versions carried by the heartbeat. I think the solution can also work here. What do you think? [~yungao.gy] [~till.rohrmann] > Race condition between slot offerings and AllocatedSlotReport > - > > Key: FLINK-12863 > URL: https://issues.apache.org/jira/browse/FLINK-12863 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.9.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.7.3, 1.9.0, 1.8.1 > > > With FLINK-11059 we introduced the {{AllocatedSlotReport}} which is used by > the {{TaskExecutor}} to synchronize its internal view on slot allocations > with the view of the {{JobMaster}}. It seems that there is a race condition > between offering slots and receiving the report because the > {{AllocatedSlotReport}} is sent by the {{HeartbeatManagerSenderImpl}} from a > separate thread. > Due to that it can happen that we generate an {{AllocatedSlotReport}} just > before getting new slots offered. 
Since the report is sent from a different > thread, it can then happen that the response to the slot offerings is sent > earlier than the {{AllocatedSlotReport}}. Consequently, we might receive an > outdated slot report on the {{TaskExecutor}} causing active slots to be > released. > In order to solve the problem I propose to add a fencing token to the > {{AllocatedSlotReport}} which is being updated whenever we offer new slots to > the {{JobMaster}}. When we receive the {{AllocatedSlotReport}} on the > {{TaskExecutor}} we compare the current slot report fencing token with the > received one and only process the report if they are equal. Otherwise we wait > for the next heartbeat to send us an up to date {{AllocatedSlotReport}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
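The fencing-token fix proposed in the issue description can be sketched minimally as follows: the token is bumped on every slot offer, and a received {{AllocatedSlotReport}} is only processed when its token matches the current one, so a report generated just before a racing slot offer is simply dropped. These are hypothetical types, not the actual {{TaskExecutor}}/{{JobMaster}} implementation.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical sketch of fencing AllocatedSlotReports: stale reports
 * (generated before the latest slot offer) are detected by token mismatch
 * and ignored until the next heartbeat carries a fresh report.
 */
public class SlotReportFencing {

    static final class AllocatedSlotReport {
        final long fencingToken;
        AllocatedSlotReport(long fencingToken) { this.fencingToken = fencingToken; }
    }

    private final AtomicLong currentToken = new AtomicLong();

    /** Bumped whenever new slots are offered to the JobMaster. */
    long onSlotsOffered() {
        return currentToken.incrementAndGet();
    }

    /** TaskExecutor-side check: only process reports with an up-to-date token. */
    boolean shouldProcess(AllocatedSlotReport report) {
        return report.fencingToken == currentToken.get();
    }

    public static void main(String[] args) {
        SlotReportFencing fencing = new SlotReportFencing();
        // A report is generated with the current token...
        AllocatedSlotReport report = new AllocatedSlotReport(fencing.currentToken.get());
        // ...then a slot offer races in before the report is delivered...
        fencing.onSlotsOffered();
        // ...so the now-outdated report is ignored rather than releasing active slots.
        System.out.println(fencing.shouldProcess(report)); // prints false
    }
}
```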
[GitHub] [flink] JingsongLi commented on a change in pull request #8689: [FLINK-12802][table-runtime-blink] Reducing the Code of BinaryString
JingsongLi commented on a change in pull request #8689: [FLINK-12802][table-runtime-blink] Reducing the Code of BinaryString URL: https://github.com/apache/flink/pull/8689#discussion_r294118537 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java ## @@ -97,32 +105,12 @@ public static BinaryString blankString(int length) { return fromBytes(spaces); } - /** -* Returns the number of bytes for a code point with the first byte as `b`. -* @param b The first byte of a code point -*/ - private static int numBytesForFirstByte(final byte b) { - if (b >= 0) { - // 1 byte, 7 bits: 0xxx - return 1; - } else if ((b >> 5) == -2 && (b & 0x1e) != 0) { - // 2 bytes, 11 bits: 110x 10xx - return 2; - } else if ((b >> 4) == -2) { - // 3 bytes, 16 bits: 1110 10xx 10xx - return 3; - } else if ((b >> 3) == -2) { - // 4 bytes, 21 bits: 0xxx 10xx 10xx 10xx - return 4; - } else { - // throw new IllegalArgumentException(); - // Skip the first byte disallowed in UTF-8 - return 1; - } - } + // -- + // Utility open methods on BinaryString Review comment: I mean `Open Interfaces`... This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11958) flink on windows yarn deploy failed
[ https://issues.apache.org/jira/browse/FLINK-11958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-11958: Fix Version/s: (was: 1.8.1) 1.8.2 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] JingsongLi commented on a change in pull request #8689: [FLINK-12802][table-runtime-blink] Reducing the Code of BinaryString
JingsongLi commented on a change in pull request #8689: [FLINK-12802][table-runtime-blink] Reducing the Code of BinaryString URL: https://github.com/apache/flink/pull/8689#discussion_r294118385 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java ## @@ -654,531 +498,211 @@ private BinaryString trimSlow() { } /** -* Walk each character of current string from both ends, remove the character if it -* is in trim string. -* Return the new substring which both ends trim characters have been removed. +* Returns the index within this string of the first occurrence of the +* specified substring, starting at the specified index. * -* @param trimStr the trim string -* @return A subString which both ends trim characters have been removed. +* @param str the substring to search for. +* @param fromIndex the index from which to start the search. +* @return the index of the first occurrence of the specified substring, +* starting at the specified index, +* or {@code -1} if there is no such occurrence. 
*/ - public BinaryString trim(BinaryString trimStr) { - if (trimStr == null) { - return null; - } - return trimLeft(trimStr).trimRight(trimStr); - } - - public BinaryString trimLeft() { + public int indexOf(BinaryString str, int fromIndex) { ensureMaterialized(); + str.ensureMaterialized(); + if (str.sizeInBytes == 0) { + return 0; + } if (inFirstSegment()) { - int s = 0; - // skip all of the space (0x20) in the left side - while (s < this.sizeInBytes && getByteOneSegment(s) == 0x20) { - s++; - } - if (s == this.sizeInBytes) { - // empty string - return EMPTY_UTF8; - } else { - return copyBinaryStringInOneSeg(s, this.sizeInBytes - 1); + // position in byte + int byteIdx = 0; + // position is char + int charIdx = 0; + while (byteIdx < sizeInBytes && charIdx < fromIndex) { + byteIdx += numBytesForFirstByte(getByteOneSegment(byteIdx)); + charIdx++; } + do { + if (byteIdx + str.sizeInBytes > sizeInBytes) { + return -1; + } + if (SegmentsUtil.equals(segments, offset + byteIdx, + str.segments, str.offset, str.sizeInBytes)) { + return charIdx; + } + byteIdx += numBytesForFirstByte(getByteOneSegment(byteIdx)); + charIdx++; + } while (byteIdx < sizeInBytes); + + return -1; } else { - return trimLeftSlow(); + return indexOfMultiSegs(str, fromIndex); } } - private BinaryString trimLeftSlow() { - int s = 0; + private int indexOfMultiSegs(BinaryString str, int fromIndex) { + // position in byte + int byteIdx = 0; + // position is char + int charIdx = 0; int segSize = segments[0].size(); - SegmentAndOffset front = firstSegmentAndOffset(segSize); - // skip all of the space (0x20) in the left side - while (s < this.sizeInBytes && front.value() == 0x20) { - s++; - front.nextByte(segSize); - } - if (s == this.sizeInBytes) { - // empty string - return EMPTY_UTF8; - } else { - return copyBinaryString(s, this.sizeInBytes - 1); + SegmentAndOffset index = firstSegmentAndOffset(segSize); + while (byteIdx < sizeInBytes && charIdx < fromIndex) { + int charBytes = 
numBytesForFirstByte(index.value()); + byteIdx += charBytes; + charIdx++; + index.skipBytes(charBytes, segSize); } + do { + if (byteIdx + str.sizeInBytes > sizeInBytes) { + return -1; +
[jira] [Commented] (FLINK-12539) StreamingFileSink: Make the class extendable to customize for different usecases
[ https://issues.apache.org/jira/browse/FLINK-12539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865238#comment-16865238 ] sunjincheng commented on FLINK-12539: - I do not want to revert the commit on the 1.8 branches, but, to avoid confusion in the 1.8.1 release notes, I marked it as a new improvement. > StreamingFileSink: Make the class extendable to customize for different > usecases > > > Key: FLINK-12539 > URL: https://issues.apache.org/jira/browse/FLINK-12539 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Reporter: Kailash Hassan Dayanand >Assignee: Kailash Hassan Dayanand >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0, 1.8.1 > > Time Spent: 40m > Remaining Estimate: 0h > > Currently the StreamingFileSink has a Builder pattern and the actual > constructor of StreamingFileSink is private. This makes it hard to extend the > class to build on top of it and customize the sink (example: adding new > metrics). Proposing to make the constructor protected, as well as the > Builder interface. > > Discussion is here: > [http://mail-archives.apache.org/mod_mbox/flink-dev/201905.mbox/%3CCAC27z=phl8+gw-ugmjkxbriseky9zimi2crpqvlpcnyupt8...@mail.gmail.com%3E] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] JingsongLi commented on a change in pull request #8689: [FLINK-12802][table-runtime-blink] Reducing the Code of BinaryString
JingsongLi commented on a change in pull request #8689: [FLINK-12802][table-runtime-blink] Reducing the Code of BinaryString URL: https://github.com/apache/flink/pull/8689#discussion_r294118181 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java ## @@ -1242,599 +763,32 @@ private void skipBytes(int n, int segSize) { } } - private byte value() { + byte value() { return this.segment.get(this.offset); } } /** -* Parses this BinaryString to Long. -* -* Note that, in this method we accumulate the result in negative format, and convert it to -* positive format at the end, if this string is not started with '-'. This is because min value -* is bigger than max value in digits, e.g. Long.MAX_VALUE is '9223372036854775807' and -* Long.MIN_VALUE is '-9223372036854775808'. -* -* This code is mostly copied from LazyLong.parseLong in Hive. -* @return Long value if the parsing was successful else null. -*/ - public Long toLong() { - ensureMaterialized(); - if (sizeInBytes == 0) { - return null; - } - int size = segments[0].size(); - SegmentAndOffset segmentAndOffset = startSegmentAndOffset(size); - int totalOffset = 0; - - byte b = segmentAndOffset.value(); - final boolean negative = b == '-'; - if (negative || b == '+') { - segmentAndOffset.nextByte(size); - totalOffset++; - if (sizeInBytes == 1) { - return null; - } - } - - long result = 0; - final byte separator = '.'; - final int radix = 10; - final long stopValue = Long.MIN_VALUE / radix; - while (totalOffset < this.sizeInBytes) { - b = segmentAndOffset.value(); - totalOffset++; - segmentAndOffset.nextByte(size); - if (b == separator) { - // We allow decimals and will return a truncated integral in that case. - // Therefore we won't throw an exception here (checking the fractional - // part happens below.) 
- break; - } - - int digit; - if (b >= '0' && b <= '9') { - digit = b - '0'; - } else { - return null; - } - - // We are going to process the new digit and accumulate the result. However, before - // doing this, if the result is already smaller than the - // stopValue(Long.MIN_VALUE / radix), then result * 10 will definitely be smaller - // than minValue, and we can stop. - if (result < stopValue) { - return null; - } - - result = result * radix - digit; - // Since the previous result is less than or equal to - // stopValue(Long.MIN_VALUE / radix), we can just use `result > 0` to check overflow. - // If result overflows, we should stop. - if (result > 0) { - return null; - } - } - - // This is the case when we've encountered a decimal separator. The fractional - // part will not change the number, but we will verify that the fractional part - // is well formed. - while (totalOffset < sizeInBytes) { - byte currentByte = segmentAndOffset.value(); - if (currentByte < '0' || currentByte > '9') { - return null; - } - totalOffset++; - segmentAndOffset.nextByte(size); - } - - if (!negative) { - result = -result; - if (result < 0) { - return null; - } - } - return result; - } - - /** -* Parses this BinaryString to Int. -* -* Note that, in this method we accumulate the result in negative format, and convert it to -* positive format at the end, if this string is not started with '-'. This is because min value -* is bigger
[GitHub] [flink] ifndef-SleePy edited a comment on issue #8721: [FLINK-12823][datastream] PartitionTransformation supports DataExchan…
ifndef-SleePy edited a comment on issue #8721: [FLINK-12823][datastream] PartitionTransformation supports DataExchan… URL: https://github.com/apache/flink/pull/8721#issuecomment-502511761 Travis failed; there is an import error in the python module. I don't think it is caused by this PR. I will try to rebase onto master and trigger the Travis check again.
[jira] [Updated] (FLINK-12539) StreamingFileSink: Make the class extendable to customize for different usecases
[ https://issues.apache.org/jira/browse/FLINK-12539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-12539: Issue Type: Improvement (was: New Feature)
[GitHub] [flink] JingsongLi commented on issue #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF
JingsongLi commented on issue #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF URL: https://github.com/apache/flink/pull/8700#issuecomment-502511195 +1 to merge
[GitHub] [flink] JingsongLi commented on a change in pull request #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF
JingsongLi commented on a change in pull request #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF URL: https://github.com/apache/flink/pull/8700#discussion_r294117338 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java ## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.functions.hive.conversion; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.hive.util.HiveTypeUtil; +import org.apache.flink.table.functions.hive.FlinkHiveUDFException; +import org.apache.flink.table.types.DataType; + +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaBooleanObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaByteObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaDoubleObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaFloatObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaIntObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaLongObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaShortObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; + + +/** + * Util for any ObjectInspector related inspection and conversion of Hive data to/from Flink data. + * + * Hive ObjectInspector is a group of flexible APIs to inspect value in different data representation, + * and developers can extend those API as needed, so technically, object inspector supports arbitrary data type in java. + */ +@Internal +public class HiveInspectors { + + /** +* Get conversion for converting Flink object to Hive object from an ObjectInspector. +*/ + public static HiveObjectConversion getConversion(ObjectInspector inspector) { + if (inspector instanceof PrimitiveObjectInspector) { + if (inspector instanceof JavaBooleanObjectInspector) { + if (((JavaBooleanObjectInspector) inspector).preferWritable()) { + return o -> new BooleanWritable((Boolean) o); + } else { + return IdentityConversion.INSTANCE; + } + } else if (inspector instanceof
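The `preferWritable()` branch in `getConversion` above can be illustrated with a small, dependency-free sketch. Names like `ObjectConversion` and `BooleanWritableStandIn` are illustrative stand-ins, not the actual Flink/Hadoop types: the dispatcher returns an identity conversion when the inspector works with plain Java objects, and a wrapping conversion when it prefers Writable containers.

```java
import java.util.function.Function;

// Hypothetical stand-in for org.apache.hadoop.io.BooleanWritable,
// used here only to keep the sketch self-contained.
class BooleanWritableStandIn {
    final boolean value;
    BooleanWritableStandIn(boolean value) { this.value = value; }
}

public class ConversionDispatch {

    /** Stand-in for Flink's HiveObjectConversion functional interface. */
    interface ObjectConversion extends Function<Object, Object> {}

    /** Shared identity conversion, analogous to IdentityConversion.INSTANCE. */
    static final ObjectConversion IDENTITY = o -> o;

    /**
     * Mirrors the boolean branch of getConversion(): wrap the value into a
     * Writable when the inspector prefers Writables, otherwise pass it through.
     */
    static ObjectConversion booleanConversion(boolean preferWritable) {
        return preferWritable
                ? o -> new BooleanWritableStandIn((Boolean) o)
                : IDENTITY;
    }
}
```

The real class repeats this pattern per primitive inspector type (byte, short, int, long, float, double, string), which is why the conversions are returned as lambdas rather than performed eagerly.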
[jira] [Updated] (FLINK-12539) StreamingFileSink: Make the class extendable to customize for different usecases
[ https://issues.apache.org/jira/browse/FLINK-12539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-12539: Issue Type: New Feature (was: Improvement)
[jira] [Comment Edited] (FLINK-12539) StreamingFileSink: Make the class extendable to customize for different usecases
[ https://issues.apache.org/jira/browse/FLINK-12539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865237#comment-16865237 ] sunjincheng edited comment on FLINK-12539 at 6/17/19 2:06 AM: -- I think this Jira is not a New Feature (BTW, new features should not be merged into a bugfix branch). The change only adjusts class visibility, so I think it is an Improvement, and I have changed the type to Improvement. was (Author: sunjincheng121): I think this Jira is not a New Feature (BTW, new features should not be merged into a bugfix branch). The change only adjusts class visibility, so I think it is an Improvement.
[GitHub] [flink] docete commented on issue #8712: [FLINK-12819][sql-client] Reuse TableEnvironment between different SQ…
docete commented on issue #8712: [FLINK-12819][sql-client] Reuse TableEnvironment between different SQ… URL: https://github.com/apache/flink/pull/8712#issuecomment-502509891 Hi @wuchong, I checked the Flink planner and the Blink planner; both clear their transformations after execution. For the Flink planner, transformations are cleared in the execution() function. For the Blink planner, transformations are cached in the TableEnvironment and cleared after generating the StreamGraph.
[jira] [Commented] (FLINK-12539) StreamingFileSink: Make the class extendable to customize for different usecases
[ https://issues.apache.org/jira/browse/FLINK-12539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865237#comment-16865237 ] sunjincheng commented on FLINK-12539: - I think this Jira is not a New Feature (BTW, new features should not be merged into a bugfix branch). The change only adjusts class visibility, so I think it is an Improvement.
[jira] [Updated] (FLINK-12539) StreamingFileSink: Make the class extendable to customize for different usecases
[ https://issues.apache.org/jira/browse/FLINK-12539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-12539: Issue Type: Improvement (was: New Feature)
[jira] [Updated] (FLINK-12539) StreamingFileSink: Make the class extendable to customize for different usecases
[ https://issues.apache.org/jira/browse/FLINK-12539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-12539: Issue Type: New Feature (was: Improvement)
[jira] [Updated] (FLINK-12539) StreamingFileSink: Make the class extendable to customize for different usecases
[ https://issues.apache.org/jira/browse/FLINK-12539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-12539: Issue Type: Improvement (was: New Feature)
[GitHub] [flink] wuchong commented on issue #8748: [FLINK-12857] [table] move FilterableTableSource into flink-table-common
wuchong commented on issue #8748: [FLINK-12857] [table] move FilterableTableSource into flink-table-common URL: https://github.com/apache/flink/pull/8748#issuecomment-502508222 Thanks @godfreyhe, looks good to me. +1 to merge.