[GitHub] [flink] zhijiangW commented on issue #8181: [FLINK-12199][network] Refactor IOMetrics to not distinguish between local/remote in/out bytes
zhijiangW commented on issue #8181: [FLINK-12199][network] Refactor IOMetrics to not distinguish between local/remote in/out bytes URL: https://github.com/apache/flink/pull/8181#issuecomment-484364445 @zentol @azagrebin The code has been updated to address the above comments. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12208) Support translation from StreamExecSort / TemporalSort / SortLimit/ Limit to StreamTransformation.
[ https://issues.apache.org/jira/browse/FLINK-12208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Zhang updated FLINK-12208: --- Description: Support translation from StreamExecSort / TemporalSort / SortLimit / Limit to StreamTransformation. 1. StreamExecSort handles queries that require sorting on non-time fields; it caches all input elements, sorts the cached elements when the bounded stream is finished, and then outputs the sorted elements one by one. *Notes:* it is only used for testing with bounded sources for now. If a query is converted to this node in a production environment, an exception will be thrown. 2. StreamExecTemporalSort handles queries that require sorting on EventTime/ProcTime fields. 3. StreamExecLimit handles queries that only take `limit` elements starting from an offset. *Notes:* StreamExecLimit currently only supports taking a limited number of elements, that is, `fetch` must be present in the query. 4. StreamExecSortLimit handles queries that not only require sorting but also take only `limit` elements starting from an offset. *Notes:* StreamExecSortLimit currently only supports taking a limited number of elements, that is, `fetch` must be present in the query. was: Support translation from StreamExecSort / TemporalSort / SortLimit / Limit to StreamTransformation. 1. StreamExecSort handles queries that require sorting on non-time fields. *Notes:* it is only used for testing with bounded sources for now. If a query is converted to this node in a production environment, an exception will be thrown. 2. StreamExecTemporalSort handles queries that require sorting on EventTime/ProcTime fields. 3. StreamExecLimit handles queries that only take `limit` elements starting from an offset. *Notes:* StreamExecLimit currently only supports taking a limited number of elements, that is, `fetch` must be present in the query. 4. StreamExecSortLimit handles queries that not only require sorting but also take only `limit` elements starting from an offset. 
*Notes:* StreamExecSortLimit currently only supports taking a limited number of elements, that is, `fetch` must be present in the query. > Support translation from StreamExecSort / TemporalSort / SortLimit / Limit to > StreamTransformation. > -- > > Key: FLINK-12208 > URL: https://issues.apache.org/jira/browse/FLINK-12208 > Project: Flink > Issue Type: Task > Components: Table SQL / Runtime >Reporter: Jing Zhang >Priority: Major > > Support translation from StreamExecSort / TemporalSort / SortLimit / Limit to > StreamTransformation. > 1. StreamExecSort handles queries that require sorting on non-time fields; it > caches all input elements, sorts the cached elements when the bounded stream > is finished, and then outputs the sorted elements one by one. > *Notes:* it is only used for testing with bounded sources for now. If a query is > converted to this node in a production environment, an exception will be thrown. > 2. StreamExecTemporalSort handles queries that require sorting on EventTime/ > ProcTime fields. > 3. StreamExecLimit handles queries that only take `limit` elements > starting from an offset. > *Notes:* StreamExecLimit currently only supports taking a limited number of > elements, that is, `fetch` must be present in the query. > 4. StreamExecSortLimit handles queries that not only require sorting but also > take only `limit` elements starting from an offset. > *Notes:* StreamExecSortLimit currently only supports taking a limited number of > elements, that is, `fetch` must be present in the query. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
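The offset/fetch semantics that StreamExecLimit and StreamExecSortLimit implement can be sketched as follows. This is an illustrative Python model, not Flink code; the function name is hypothetical, and the mandatory-`fetch` check mirrors the restriction described above.

```python
def sort_limit(rows, key, offset=0, fetch=None):
    """Sketch of SortLimit over a bounded input: sort all rows, skip
    `offset` of them, and emit at most `fetch` of the rest.

    As the issue notes, `fetch` must be present; a query without it
    is currently rejected.
    """
    if fetch is None:
        raise ValueError("`fetch` must be present in the query")
    return sorted(rows, key=key)[offset:offset + fetch]
```

For example, `sort_limit([3, 1, 2], key=lambda x: x, offset=1, fetch=1)` sorts to `[1, 2, 3]` and then emits `[2]`.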
[GitHub] [flink] flinkbot commented on issue #8204: [hotfix][docs] fix typo in best_practices doc
flinkbot commented on issue #8204: [hotfix][docs] fix typo in best_practices doc URL: https://github.com/apache/flink/pull/8204#issuecomment-484362418 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zzchun opened a new pull request #8204: [hotfix][docs] fix typo in best_practices doc
zzchun opened a new pull request #8204: [hotfix][docs] fix typo in best_practices doc URL: https://github.com/apache/flink/pull/8204 ## What is the purpose of the change Fix typo in best_practices.md ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12208) Support translation from StreamExecSort / TemporalSort / SortLimit/ Limit to StreamTransformation.
[ https://issues.apache.org/jira/browse/FLINK-12208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Zhang updated FLINK-12208: --- Description: Support translation from StreamExecSort / TemporalSort / SortLimit / Limit to StreamTransformation. 1. StreamExecSort handles queries that require sorting on non-time fields. *Notes:* it is only used for testing with bounded sources for now. If a query is converted to this node in a production environment, an exception will be thrown. 2. StreamExecTemporalSort handles queries that require sorting on EventTime/ProcTime fields. 3. StreamExecLimit handles queries that only take `limit` elements starting from an offset. *Notes:* StreamExecLimit currently only supports taking a limited number of elements, that is, `fetch` must be present in the query. 4. StreamExecSortLimit handles queries that not only require sorting but also take only `limit` elements starting from an offset. *Notes:* StreamExecSortLimit currently only supports taking a limited number of elements, that is, `fetch` must be present in the query. was: Introduce Sort / TemporalSort / SortLimit / Limit operators for blink streaming runtime 1. StreamExecSort used to > Support translation from StreamExecSort / TemporalSort / SortLimit/ Limit to > StreamTransformation. > -- > > Key: FLINK-12208 > URL: https://issues.apache.org/jira/browse/FLINK-12208 > Project: Flink > Issue Type: Task > Components: Table SQL / Runtime >Reporter: Jing Zhang >Priority: Major > > Support translation from StreamExecSort / TemporalSort / SortLimit / Limit to > StreamTransformation. > 1. StreamExecSort handles queries that require sorting on non-time fields. > *Notes:* it is only used for testing with bounded sources for now. If a query is > converted to this node in a production environment, an exception will be thrown. > 2. StreamExecTemporalSort handles queries that require sorting on EventTime/ > ProcTime fields. > 3. StreamExecLimit handles queries that only take `limit` elements > starting from an offset. 
> *Notes:* StreamExecLimit currently only supports taking a limited number of elements, > that is, `fetch` must be present in the query. > 4. StreamExecSortLimit handles queries that not only require sorting but also > take only `limit` elements starting from an offset. > *Notes:* StreamExecSortLimit currently only supports taking a limited number of > elements, that is, `fetch` must be present in the query. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12208) Support translation from StreamExecSort / TemporalSort / SortLimit/ Limit to StreamTransformation.
[ https://issues.apache.org/jira/browse/FLINK-12208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Zhang updated FLINK-12208: --- Description: Introduce Sort / TemporalSort / SortLimit/ Limit operators for blink streaming runtime 1. StreamExecSort used to was: Introduce Sort / TemporalSort / SortLimit/ Limit operators for blink streaming runtime > Support translation from StreamExecSort / TemporalSort / SortLimit/ Limit to > StreamTransformation. > -- > > Key: FLINK-12208 > URL: https://issues.apache.org/jira/browse/FLINK-12208 > Project: Flink > Issue Type: Task > Components: Table SQL / Runtime >Reporter: Jing Zhang >Priority: Major > > Introduce Sort / TemporalSort / SortLimit/ Limit operators for blink > streaming runtime > 1. StreamExecSort used to -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12208) Support translation from StreamExecSort / TemporalSort / SortLimit/ Limit to StreamTransformation.
[ https://issues.apache.org/jira/browse/FLINK-12208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Zhang updated FLINK-12208: --- Summary: Support translation from StreamExecSort / TemporalSort / SortLimit/ Limit to StreamTransformation. (was: Introduce Sort / TemporalSort / SortLimit/ Limit operators for blink streaming runtime ) > Support translation from StreamExecSort / TemporalSort / SortLimit/ Limit to > StreamTransformation. > -- > > Key: FLINK-12208 > URL: https://issues.apache.org/jira/browse/FLINK-12208 > Project: Flink > Issue Type: Task > Components: Table SQL / Runtime >Reporter: Jing Zhang >Priority: Major > > Introduce Sort / TemporalSort / SortLimit/ Limit operators for blink > streaming runtime -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] beyond1920 opened a new pull request #8203: [Flink-12208][table-planner-blink] Support translation from StreamExecSort / TemporalSort / SortLimit/ Limit to StreamTransformation.
beyond1920 opened a new pull request #8203: [Flink-12208][table-planner-blink] Support translation from StreamExecSort / TemporalSort / SortLimit/ Limit to StreamTransformation. URL: https://github.com/apache/flink/pull/8203 ## What is the purpose of the change Introduce Sort / TemporalSort / SortLimit / Limit operators for the blink streaming runtime. ## Brief change log * Introduce StreamSortOperator to handle sorting on non-time fields. * Introduce OnlyRowTimeSortOperator/RowTimeSortOperator/ProcTimeSortOperator to handle sorting on time fields. * StreamExecSort / StreamExecTemporalSort / StreamExecSortLimit / StreamExecLimit implement StreamExecNode. ## Verifying this change UT && IT ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (JavaDocs) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #8203: [Flink-12208][table-planner-blink] Support translation from StreamExecSort / TemporalSort / SortLimit/ Limit to StreamTransformation.
flinkbot commented on issue #8203: [Flink-12208][table-planner-blink] Support translation from StreamExecSort / TemporalSort / SortLimit/ Limit to StreamTransformation. URL: https://github.com/apache/flink/pull/8203#issuecomment-484360424 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-12245) Transient slot allocation failure on job recovery
Hwanju Kim created FLINK-12245: -- Summary: Transient slot allocation failure on job recovery Key: FLINK-12245 URL: https://issues.apache.org/jira/browse/FLINK-12245 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.6.3 Environment: Flink 1.6.2 with Kubernetes Reporter: Hwanju Kim In 1.6.2, we have experienced transient slot allocation failures on job recovery, especially when a task manager (TM) is unavailable, leading to heartbeat failure. By transient, I mean recovery fails once with a slot allocation timeout (by default 5 min) and the next recovering restart succeeds. I found that each _Execution_ remembers its previous allocations and, for the sake of local state recovery, prefers its last previous allocation among the resolved slot candidates. If the previous allocation belongs to an unavailable TM, the candidates do not contain it, forcing the slot provider to request a new slot from the resource manager, which then finds a new TM and its available slots. So far this is expected and fine, but any subsequent execution that also belonged to the unavailable TM and has the first task as a predecessor fails on its unavailable previous allocation as well. It too requests another new slot, since it never finds the gone previous allocation among the candidates. However, this behavior may generate more slot requests than there are slots available. For example, if two pipelined tasks shared one slot in one TM, which then crashed and was replaced with a new TM, two new slot requests are generated by the tasks. Since two slot requests cannot be fulfilled by a one-slot TM, the job hits the slot allocation timeout and restarts. {code:java} org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate all requires slots within timeout of 30 ms. 
Slots required: 2, slots allocated: 1 {code} At the next round of recovery, since the second execution failed to allocate a new slot, its last previous allocation is _null_, so it falls back to the locality-based allocation strategy, which can find the slot allocated for the first task, and thus succeeds. Although recovery eventually succeeds, it increases downtime by the slot allocation timeout. The reason for this behavior is that _PreviousAllocationSchedulingStrategy.findMatchWithLocality()_ immediately returns _null_ if the previous allocation is non-empty and is not contained in the candidate list. I thought that if the previous allocation is not among the candidates, it could fall back to _LocationPreferenceSchedulingStrategy.findMatchWithLocality()_ rather than returning _null_. By doing so, it can avoid requesting more slots than are available. Requesting more slots could be fine in an environment where the resource manager can reactively spawn more TMs (like Yarn/Mesos), although it could spawn more than needed, but a StandaloneResourceManager with statically provisioned resources cannot help but fail to allocate the requested slots. Having looked at the mainline branch and 1.8.0 (although I have not attempted to reproduce this issue with mainline), the related code has changed to what I expected (falling back to the locality-based strategy if the previous allocation is not among the candidates): _PreviousAllocationSlotSelectionStrategy.selectBestSlotForProfile()_. That led me to read the group-aware scheduling work ([https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit#heading=h.k15nfgsa5bnk]). In addition, I checked that in 1.6.2 the _matchPreviousLocationNotAvailable_ test expects the problematic behavior I described. So I started wondering whether the behavior of the previous-allocation strategy outside mainline is by design or not. 
I have a fix similar to the mainline and have verified that it resolves the problem, but I am raising this issue to gather context around the behavior and to discuss possible side effects of the fix. I understand the current vertex-by-vertex scheduling could become inefficient by letting an execution that belonged to the unavailable slot steal another task's previous slot, but a slot allocation failure seems worse to me. I searched existing issues for this slot allocation failure and could not find the same issue, hence this one. Please feel free to deduplicate it if needed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
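The proposed fallback can be sketched as follows. This is an illustrative Python model of the slot-selection decision, not Flink code; the function signature and the representation of candidates (allocation id mapped to a TM location) are hypothetical simplifications.

```python
def select_slot(previous_allocation_id, candidates, preferred_locations):
    """Pick a slot for an execution on recovery.

    candidates: dict mapping allocation id -> TM location of a free slot.
    Returns the chosen allocation id, or None if a fresh slot request is
    unavoidable.
    """
    # Prefer the exact slot used before the failure (local state recovery).
    if previous_allocation_id in candidates:
        return previous_allocation_id
    # Proposed fix: if the previous allocation is gone (its TM died), fall
    # back to locality-based matching instead of returning None outright,
    # which in 1.6.x forces an extra slot request and can exceed the slots
    # a statically provisioned cluster actually has.
    for alloc_id, location in candidates.items():
        if location in preferred_locations:
            return alloc_id
    return None  # only now does a new slot request go to the RM
```

Under this model, an execution whose previous TM is gone still reuses one of the surviving slots when a preferred location matches, rather than always triggering a second request.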
[jira] [Commented] (FLINK-9904) Allow users to control MaxDirectMemorySize
[ https://issues.apache.org/jira/browse/FLINK-9904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16820720#comment-16820720 ] Ji Liu commented on FLINK-9904: --- Hi [~hroongta], are you still working on this issue? If not, I would like to provide a fix. > Allow users to control MaxDirectMemorySize > -- > > Key: FLINK-9904 > URL: https://issues.apache.org/jira/browse/FLINK-9904 > Project: Flink > Issue Type: Improvement > Components: Deployment / Scripts >Affects Versions: 1.4.2, 1.5.1, 1.7.2, 1.8.0, 1.9.0 >Reporter: Himanshu Roongta >Priority: Minor > > For people who use the docker image and run Flink in pods, there is currently no > way to update > {{MaxDirectMemorySize}} > (well, one can create a custom version of > [taskmanager.sh|https://github.com/apache/flink/blob/master/flink-dist/src/main/flink-bin/bin/taskmanager.sh]) > > As a result, it starts with a value of 8388607T. If the param > {{taskmanager.memory.preallocate}} is set to false (the default), cleanup > will only occur when the MaxDirectMemorySize limit is hit and a full GC cycle > kicks in. However, pods, especially in Kubernetes, will get killed > because pods do not run at such a high value. (In our case we run 8 GB per pod.) > > The fix would be to allow it to be configured via {{flink-conf}}. We can still > have a default of 8388607T to avoid a breaking change. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
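As a hedged sketch of the kind of workaround available before a dedicated option exists (not an official recommendation, and assuming the standard `env.java.opts.taskmanager` configuration key is honored by the deployment scripts in use), the JVM flag can be injected via `flink-conf.yaml`:

```yaml
# flink-conf.yaml -- illustrative only: cap the TaskManager's direct
# memory explicitly instead of inheriting the 8388607T default.
env.java.opts.taskmanager: "-XX:MaxDirectMemorySize=8g"
```

The `-XX:MaxDirectMemorySize` flag itself is a standard HotSpot option; whether passing it this way interacts cleanly with Flink's own memory sizing should be verified for the version in question.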
[GitHub] [flink] flinkbot commented on issue #8202: [FLINK-12133] [table-runtime-blink] Support unbounded aggregate in streaming table runtime
flinkbot commented on issue #8202: [FLINK-12133] [table-runtime-blink] Support unbounded aggregate in streaming table runtime URL: https://github.com/apache/flink/pull/8202#issuecomment-484351023 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12133) Support unbounded aggregate in streaming table runtime
[ https://issues.apache.org/jira/browse/FLINK-12133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12133: --- Labels: pull-request-available (was: ) > Support unbounded aggregate in streaming table runtime > -- > > Key: FLINK-12133 > URL: https://issues.apache.org/jira/browse/FLINK-12133 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Runtime >Reporter: Jark Wu >Assignee: Jark Wu >Priority: Major > Labels: pull-request-available > > This ticket aims to support unbounded aggregates in the streaming runtime. > This should include: > 1. GroupAggFunction: a function that supports unbounded aggregates without > optimizations > 2. MiniBatchGroupAggFunction: a function that supports unbounded aggregates with > the minibatch optimization > 3. MiniBatchLocalGroupAggFunction & MiniBatchGlobalGroupAggFunction: > functions that support unbounded aggregates with the local combine optimization -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] wuchong opened a new pull request #8202: [FLINK-12133] [table-runtime-blink] Support unbounded aggregate in streaming table runtime
wuchong opened a new pull request #8202: [FLINK-12133] [table-runtime-blink] Support unbounded aggregate in streaming table runtime URL: https://github.com/apache/flink/pull/8202 ## What is the purpose of the change This PR aims to support unbounded aggregates in the blink streaming runtime. Includes: 1. GroupAggFunction: a function that supports unbounded aggregates without minibatch optimizations 2. MiniBatchGroupAggFunction: a function that supports unbounded aggregates with the minibatch optimization 3. MiniBatchLocalGroupAggFunction & MiniBatchGlobalGroupAggFunction: functions that support unbounded aggregates with the local combine optimization 4. MiniBatchIncrementalAggFunction: an agg function with the incremental optimization. ## Brief change log - Add `GroupAggFunction`, `MiniBatchGroupAggFunction`, `MiniBatchLocalGroupAggFunction`, `MiniBatchIncrementalAggFunction`. - Move `TableConfig`, `MapView`, `ListView` to the `flink-table-runtime-blink` module so that runtime classes can access them. They should eventually live in the api module, so this is a temporary change. - Implement Expand and Union for the runtime. - Introduce `StateDataViewStore` for `AggsHandleFunction` to access state DataViews instead of using `ExecutionContext` in it. - Introduce `SINGLE_VALUE` and `COLLECT` agg calls. - Rewrite `MaxWithRetract`, `MinWithRetract`, `ConcatWithRetract`, `ConcatWsWithRetract`. - Fix several bugs in code generation, serialization, and other places. ## Verifying this change - Add `AggregateITCase` with 6 modes to cover unbounded aggregates. - Add `SplitAggregateITCase` with 4 modes to cover the distinct-aggregate split optimization. 
## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
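The mini-batch idea behind functions like MiniBatchGroupAggFunction can be sketched as follows. This is an illustrative Python model, not the Flink implementation; the class and method names are hypothetical, and a simple per-key sum stands in for the generated accumulator.

```python
class MiniBatchGroupAgg:
    """Sum-per-key aggregate that folds a whole buffered batch into state
    at once, reducing state accesses from one per record to one per key
    per batch -- the core of the mini-batch optimization."""

    def __init__(self):
        self.state = {}   # key -> accumulator (here: a running sum)
        self.buffer = []  # records waiting for the next batch flush

    def add(self, key, value):
        self.buffer.append((key, value))

    def flush(self):
        # Pre-aggregate the batch locally, then touch state once per key.
        local = {}
        for key, value in self.buffer:
            local[key] = local.get(key, 0) + value
        for key, partial in local.items():
            self.state[key] = self.state.get(key, 0) + partial
        self.buffer.clear()
        return dict(self.state)
```

The local/global variants mentioned in the PR split the same idea across operators: the local side produces the per-batch partials and the global side merges them into keyed state.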
[GitHub] [flink] JingsongLi commented on issue #8184: [FLINK-12211][table-planner-blink] Add more it cases to blink batch
JingsongLi commented on issue #8184: [FLINK-12211][table-planner-blink] Add more it cases to blink batch URL: https://github.com/apache/flink/pull/8184#issuecomment-484346031 > > #8165 make CalcITCase.testTimestampCompareWithDateString fail. Some bug in #8165 . > > The bug is in codegen, not in #8165. codegen needs to handle `Calc(select=[j], where=[<(j, CAST(_UTF-16LE'2017-11-11'))])` correctly. In blink, some advanced rules will convert `CAST(_UTF-16LE'2017-11-11'):TIMESTAMP(3) NOT NULL)` to a timestamp value directly; however, those rules have not been ported to flink yet. This is because our codegen does not support casting a varchar such as 'yyyy-MM-dd' to timestamp (it returns null), but Calcite supports it. So if the plan uses Calcite's method to reduce the cast expression, the case passes; if the plan does not reduce the cast, the case fails. I added a TODO to the case; let's support this later. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
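The failure mode described above can be sketched in Python: if the runtime cast of the date string yields NULL (because the codegen does not support that cast), the `<` predicate is never TRUE and the filter drops every row. This is an illustrative model only; the function names are hypothetical and do not correspond to the actual generated code.

```python
def codegen_cast_to_timestamp(value):
    # Model of the described codegen limitation: varchar -> timestamp is
    # unsupported, so the cast evaluates to NULL (None here) at runtime.
    return None

def filter_less_than(rows, date_literal):
    bound = codegen_cast_to_timestamp(date_literal)
    # SQL three-valued logic: comparing against NULL is never TRUE, so no
    # row survives the predicate and the IT case's expected rows go missing.
    return [row for row in rows if bound is not None and row < bound]
```

If, by contrast, the planner constant-reduces the cast (as Calcite does), `bound` is a real timestamp before execution and the comparison behaves normally, which is why the test passes only on the reduced plan.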
[jira] [Updated] (FLINK-12214) Add JobListener(hook) in flink job lifecycle
[ https://issues.apache.org/jira/browse/FLINK-12214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-12214: --- Summary: Add JobListener(hook) in flink job lifecycle (was: Add JobListener (hook) in flink job lifecycle) > Add JobListener(hook) in flink job lifecycle > > > Key: FLINK-12214 > URL: https://issues.apache.org/jira/browse/FLINK-12214 > Project: Flink > Issue Type: New Feature > Components: Runtime / Coordination >Reporter: Jeff Zhang >Assignee: Jeff Zhang >Priority: Major > Labels: pull-request-available > Attachments: image-2019-04-17-00-11-00-020.png, > image-2019-04-17-00-13-15-999.png, image-2019-04-17-00-15-16-750.png > > Time Spent: 20m > Remaining Estimate: 0h > > Third-party libraries would like to add hooks when submitting jobs, so it > would be nice to have a JobListener in Flink. > More details about the motivation: > The background is that I am working on integrating Flink with the Apache Zeppelin > notebook. In a Zeppelin notebook, we allow users to run Flink Scala code > or SQL. For each piece of code, I'd like to associate it with its > corresponding Flink job, so that users can link to the Flink job UI from the Zeppelin > notebook and also cancel the job. Here's one screenshot of my POC; in > this screenshot you can see the FLINK JOB link, which links to the Flink > job UI. That's why I'd like to add a hook to Flink's job submission, so that I > can link each piece of Flink code to a paragraph (the following screenshot), > which is a Zeppelin concept: a container that hosts this piece of code. > Here's some sample code for reference about how I use JobListener in Zeppelin. > [https://github.com/zjffdu/zeppelin/blob/blink_poc/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala#L251] > !image-2019-04-17-00-15-16-750.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12214) Add JobListener (hook) in flink job lifecycle
[ https://issues.apache.org/jira/browse/FLINK-12214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-12214: --- Summary: Add JobListener (hook) in flink job lifecycle (was: Add JobListener (hook) to in job lifecycle) > Add JobListener (hook) in flink job lifecycle > - > > Key: FLINK-12214 > URL: https://issues.apache.org/jira/browse/FLINK-12214 > Project: Flink > Issue Type: New Feature > Components: Runtime / Coordination >Reporter: Jeff Zhang >Assignee: Jeff Zhang >Priority: Major > Labels: pull-request-available > Attachments: image-2019-04-17-00-11-00-020.png, > image-2019-04-17-00-13-15-999.png, image-2019-04-17-00-15-16-750.png > > Time Spent: 20m > Remaining Estimate: 0h > > Third-party libraries would like to add hooks when submitting jobs, so it > would be nice to have a JobListener in Flink. > More details about the motivation: > The background is that I am working on integrating Flink with the Apache Zeppelin > notebook. In a Zeppelin notebook, we allow users to run Flink Scala code > or SQL. For each piece of code, I'd like to associate it with its > corresponding Flink job, so that users can link to the Flink job UI from the Zeppelin > notebook and also cancel the job. Here's one screenshot of my POC; in > this screenshot you can see the FLINK JOB link, which links to the Flink > job UI. That's why I'd like to add a hook to Flink's job submission, so that I > can link each piece of Flink code to a paragraph (the following screenshot), > which is a Zeppelin concept: a container that hosts this piece of code. > Here's some sample code for reference about how I use JobListener in Zeppelin. > [https://github.com/zjffdu/zeppelin/blob/blink_poc/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala#L251] > !image-2019-04-17-00-15-16-750.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12214) Add JobListener (hook) to in job lifecycle
[ https://issues.apache.org/jira/browse/FLINK-12214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-12214: --- Summary: Add JobListener (hook) to in job lifecycle (was: Add JobListener to capture job submission process)
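The hook being requested above can be pictured as a small callback interface that the execution environment invokes around job submission, so that a host like Zeppelin can capture the job id and link it to a paragraph. A minimal sketch (the interface, method, and class names here are hypothetical illustrations, not the API that was eventually merged into Flink):

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a job-lifecycle listener hook, as motivated in FLINK-12214. */
public class JobListenerSketch {

    /** Callback interface a third party (e.g. a notebook integration) could register. */
    public interface JobListener {
        void onJobSubmitted(String jobId);
        void onJobCanceled(String jobId);
    }

    private final List<JobListener> listeners = new ArrayList<>();

    public void register(JobListener listener) {
        listeners.add(listener);
    }

    /** The execution environment would invoke this when a job is submitted. */
    public void submitJob(String jobId) {
        for (JobListener listener : listeners) {
            listener.onJobSubmitted(jobId);
        }
    }

    public static void main(String[] args) {
        JobListenerSketch env = new JobListenerSketch();
        List<String> seen = new ArrayList<>();
        env.register(new JobListener() {
            @Override public void onJobSubmitted(String jobId) { seen.add(jobId); }
            @Override public void onJobCanceled(String jobId) { }
        });
        // The Zeppelin use case: capture the id so the notebook can link to the job UI.
        env.submitJob("job-1");
        System.out.println(seen);
    }
}
```

The key design point in the discussion is that the listener is purely observational: the environment drives the lifecycle, and registered hooks only receive notifications.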
[jira] [Created] (FLINK-12244) SynchronousCheckpointITCase hang up
vinoyang created FLINK-12244: Summary: SynchronousCheckpointITCase hang up Key: FLINK-12244 URL: https://issues.apache.org/jira/browse/FLINK-12244 Project: Flink Issue Type: Test Components: Tests Reporter: vinoyang Log details: [https://api.travis-ci.org/v3/job/521241815/log.txt]
[GitHub] [flink] hequn8128 commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces
hequn8128 commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces URL: https://github.com/apache/flink/pull/8050#discussion_r276503761 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java ## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.ExternalCatalog; +import org.apache.flink.table.catalog.ExternalCatalogTable; +import org.apache.flink.table.descriptors.BatchTableDescriptor; +import org.apache.flink.table.descriptors.Descriptor; +import org.apache.flink.table.descriptors.StreamTableDescriptor; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; + +import java.lang.reflect.Method; +import java.util.Map; + +/** + * Utility for dealing with {@link TableFactory} using the {@link TableFactoryService}. + */ +public class TableFactoryUtil { + + /** +* Returns an external catalog. 
+*/ + public static ExternalCatalog findAndCreateExternalCatalog(Descriptor descriptor) { + Map properties = descriptor.toProperties(); + return TableFactoryService + .find(ExternalCatalogFactory.class, properties) + .createExternalCatalog(properties); + } + + /** +* Returns a table source matching the descriptor. +*/ + public static TableSource findAndCreateTableSource(Descriptor descriptor) { + Map properties = descriptor.toProperties(); + + TableSource tableSource; + try { + if (descriptor instanceof BatchTableDescriptor || isBatchExternalCatalogTable(descriptor)) { + + Object object = TableFactoryService.find( + Class.forName("org.apache.flink.table.factories.BatchTableSourceFactory"), + properties); + Method method = object.getClass().getDeclaredMethod("createBatchTableSource", Map.class); + + tableSource = (TableSource) method.invoke(object, properties); + } else if (descriptor instanceof StreamTableDescriptor || isStreamExternalCatalogTable(descriptor)) { + + Object object = TableFactoryService.find( + Class.forName("org.apache.flink.table.factories.StreamTableSourceFactory"), + properties); + Method method = object.getClass().getDeclaredMethod("createStreamTableSource", Map.class); + + tableSource = (TableSource) method.invoke(object, properties); + } else { + throw new TableException( + String.format( + "Unsupported table descriptor: %s", + descriptor.getClass().getName()) + ); + } + } catch (Throwable t) { + throw new TableException("findAndCreateTableSource failed.", t); + } + + return tableSource; + } + + /** +* Returns a table sink matching the descriptor. +*/ + public static TableSink findAndCreateTableSink(Descriptor descriptor) { + Map properties = descriptor.toProperties(); + + TableSink tableSink; + try { + if (descriptor instanceof BatchTableDescriptor || isBatchExternalCatalogTable(descriptor)) { + Object object = TableFactoryService.find( + Class.forName("org.apache.flink.table.factories.BatchTableSinkFactory"), Review comment: Good idea! 
This is a nice solution.
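The reflective lookup in the diff above (Class.forName plus getDeclaredMethod plus invoke, so that TableFactoryUtil carries no compile-time dependency on the concrete batch/stream factory classes) reduces to the following pattern. The classes and method names below are illustrative stand-ins; the real Flink code also obtains the factory instance from TableFactoryService rather than instantiating it directly:

```java
import java.lang.reflect.Method;

/** Toy illustration of the reflective factory lookup pattern discussed above. */
public class ReflectiveFactoryDemo {

    /** Stands in for a factory class that the caller only knows by name. */
    public static class GreetingFactory {
        public String createGreeting(String name) {
            return "hello " + name;
        }
    }

    /** Load the factory by its fully qualified name and invoke its creation method reflectively. */
    public static String createViaReflection(String factoryClassName, String arg) throws Exception {
        // In TableFactoryUtil the instance comes from TableFactoryService.find(...);
        // here we simply instantiate it to keep the sketch self-contained.
        Object factory = Class.forName(factoryClassName)
                .getDeclaredConstructor()
                .newInstance();
        Method method = factory.getClass().getDeclaredMethod("createGreeting", String.class);
        return (String) method.invoke(factory, arg);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(createViaReflection(GreetingFactory.class.getName(), "flink"));
    }
}
```

The trade-off under review: reflection avoids a module dependency cycle, at the cost of moving a compile-time error (missing class or method) to runtime, which is why the original code wraps everything in a try/catch and rethrows a TableException.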
[GitHub] [flink] sunhaibotb commented on issue #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface
sunhaibotb commented on issue #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface URL: https://github.com/apache/flink/pull/8124#issuecomment-484327998 > Please also squash/split commits so that we have three separate commits: > Interfaces introduction > Runtime code > Benchmarks Does `Interfaces introduction` refer to the `Input` interface? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-11474) Add ReadableCatalog, ReadableWritableCatalog, and other related interfaces
[ https://issues.apache.org/jira/browse/FLINK-11474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-11474. --- Resolution: Fixed Fix Version/s: 1.9.0 Fixed in master: 92ae67d788050f2e2d457692bc0c638bc142a265 > Add ReadableCatalog, ReadableWritableCatalog, and other related interfaces > -- > > Key: FLINK-11474 > URL: https://issues.apache.org/jira/browse/FLINK-11474 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Xuefu Zhang >Assignee: Xuefu Zhang >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 40m > Remaining Estimate: 0h > > Also deprecate ReadableCatalog, ReadableWritableCatalog, and other related > existing classes.
[GitHub] [flink] asfgit closed pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
asfgit closed pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other … URL: https://github.com/apache/flink/pull/8007
[GitHub] [flink] godfreyhe commented on a change in pull request #8176: [FLINK-12192] [table-planner-blink] Add support for generating optimized logical plan for grouping sets and distinct aggregate
godfreyhe commented on a change in pull request #8176: [FLINK-12192] [table-planner-blink] Add support for generating optimized logical plan for grouping sets and distinct aggregate URL: https://github.com/apache/flink/pull/8176#discussion_r276493412 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala ## @@ -104,10 +105,19 @@ object FlinkRelOptUtil { * * @return Whether any of the aggregates are accurate DISTINCT */ - def containsAccurateDistinctCall(aggCalls: Seq[AggregateCall]): Boolean = { + def containsAccurateDistinctCall(aggCalls: util.List[AggregateCall]): Boolean = { Review comment: done
[jira] [Assigned] (FLINK-11476) Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
[ https://issues.apache.org/jira/browse/FLINK-11476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reassigned FLINK-11476: Assignee: Bowen Li (was: Xuefu Zhang) > Create CatalogManager to manage multiple catalogs and encapsulate Calcite > schema > > > Key: FLINK-11476 > URL: https://issues.apache.org/jira/browse/FLINK-11476 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Xuefu Zhang >Assignee: Bowen Li >Priority: Major > > Flink allows more than one registered catalog. The {{CatalogManager}} class > is the holding class that manages and encapsulates the catalogs and their > interrelations with Calcite.
[GitHub] [flink] sunhaibotb commented on a change in pull request #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface
sunhaibotb commented on a change in pull request #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface URL: https://github.com/apache/flink/pull/8124#discussion_r276490132 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java ## @@ -0,0 +1,433 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.streaming.api.operators.InputSelectable; +import org.apache.flink.streaming.api.operators.InputSelection; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Input reader for {@link org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask} + * in the case that the operator is InputSelectable. 
+ * + * @param <IN1> The type of the records that arrive on the first input + * @param <IN2> The type of the records that arrive on the second input + */ +@Internal +public class StreamTwoInputSelectableProcessor<IN1, IN2> { + + private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputSelectableProcessor.class); + + private volatile boolean continuousProcessing = true; Review comment: > Just to be clear, this StreamTwoInputSelectableProcessor#processInput() should be split into smaller methods before merging this PR regardless of the outer/inner loop discussion. I agree, and we can do the splitting after the performance problem has been solved, which reduces the interference with performance tuning.
[jira] [Commented] (FLINK-11614) Translate the "Configuring Dependencies" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16820627#comment-16820627 ] YangFei commented on FLINK-11614: - Hi Jark Wu, I have finished the issue and it is waiting for review. Thanks! > Translate the "Configuring Dependencies" page into Chinese > -- > > Key: FLINK-11614 > URL: https://issues.apache.org/jira/browse/FLINK-11614 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: YangFei >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/dependencies.html > The markdown file is located in flink/docs/dev/projectsetup/dependencies.zh.md > The markdown file will be created once FLINK-11529 is merged.
[GitHub] [flink] sunhaibotb commented on a change in pull request #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface
sunhaibotb commented on a change in pull request #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface URL: https://github.com/apache/flink/pull/8124#discussion_r276487477 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamTaskNonSelectableInputThroughputBenchmark.java ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io.benchmark; + +import org.apache.flink.runtime.io.network.partition.consumer.IterableInputChannel; +import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor; +import org.apache.flink.streaming.runtime.io.benchmark.StreamTaskInputBenchmarkEnvironment.ProcessorAndChannels; + +import java.io.IOException; +import java.util.List; + +/** + * Task-input (non-selectable) throughput benchmarks executed by the external + * flink-benchmarks project (https://github.com/dataArtisans/flink-benchmarks).
+ */ +public class StreamTaskNonSelectableInputThroughputBenchmark extends StreamTaskInputThroughputBenchmarkBase { + + public void setUp(int numInputGates, int numChannelsPerGate, long numRecordsPerChannel) throws Exception { + setUp( + numInputGates, numInputGates, + numChannelsPerGate, numChannelsPerGate, + numRecordsPerChannel, numRecordsPerChannel, + new SummingLongStreamOperator()); + } + + @Override + protected AbstractTaskInputProcessorThread createProcessorThread( + int numInputGates1, + int numInputGates2, + int numChannels1PerGate, + int numChannels2PerGate, + long numRecords1PerChannel, + long numRecords2PerChannel, + long inputValue1, + long inputValue2, + SummingLongStreamOperator streamOperator) throws IOException { + + ProcessorAndChannels, IterableInputChannel> processorAndChannels = + environment.createTwoInputProcessor( + numInputGates1, + numInputGates2, + numChannels1PerGate, + numChannels2PerGate, + numRecords1PerChannel, + numRecords2PerChannel, + 1, + 2, + streamOperator); + + return new StreamTwoInputProcessorThread( + processorAndChannels.processor(), + processorAndChannels.channels(), + streamOperator); + } + + // + // Utilities + // + + private static class StreamTwoInputProcessorThread extends AbstractTaskInputProcessorThread { + + private final StreamTwoInputProcessor inputProcessor; + + private final SummingLongStreamOperator streamOperator; + + private volatile boolean continuousProcessing = true; Review comment: > I guess that if we drop the outer loop that we were discussing in other comment, there won't be a need for this volatile continuousProcessing flag and we can safely benchmark processors without adding extra overhead, right? Yes, that's right.
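The volatile `continuousProcessing` flag being debated here is the standard stop-flag idiom: a processing loop reruns while the flag is set, and another thread clears it to stop the loop. Without `volatile`, the writer thread's update might never become visible to the loop. A generic self-contained sketch of the idiom (not the benchmark code itself):

```java
/** Minimal demonstration of the volatile stop-flag idiom used in the benchmark thread. */
public class StopFlagDemo {
    private volatile boolean continuousProcessing = true;
    private long iterations;

    /** The worker loop; each iteration stands in for processing one record. */
    public void run() {
        while (continuousProcessing) {
            iterations++;
        }
    }

    /** Called from another thread; the volatile write is guaranteed visible to the loop. */
    public void stop() {
        continuousProcessing = false;
    }

    public long iterations() {
        return iterations;
    }

    public static void main(String[] args) throws InterruptedException {
        StopFlagDemo worker = new StopFlagDemo();
        Thread t = new Thread(worker::run);
        t.start();
        Thread.sleep(10);  // let the loop spin briefly
        worker.stop();
        t.join();          // join also makes the final iteration count visible here
        System.out.println("stopped after " + worker.iterations() + " iterations");
    }
}
```

The review point is that the flag (and its per-iteration volatile read) only exists to support an outer restart loop; dropping that loop removes the flag and its measurement overhead.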
[GitHub] [flink] sunjincheng121 commented on issue #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
sunjincheng121 commented on issue #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other … URL: https://github.com/apache/flink/pull/8007#issuecomment-484305219 Merging...
[GitHub] [flink] yanghua commented on a change in pull request #8199: [FLINK-11955] Modify build to move filesystems from lib to plugins folder
yanghua commented on a change in pull request #8199: [FLINK-11955] Modify build to move filesystems from lib to plugins folder URL: https://github.com/apache/flink/pull/8199#discussion_r276472293 ## File path: flink-dist/pom.xml ## @@ -140,19 +140,54 @@ under the License. + org.apache.flink flink-hadoop-fs ${project.version} + provided Review comment: @zentol Maybe I did not describe my thought clearly. I just want to say that this PR is only auxiliary work; its final state will be decided by other work: the design, backward compatibility, and so on. I think the whole architecture is still in the exploration and testing stage. I know that `flink-hadoop-fs` and `flink-mapr-fs` have been packaged into `flink-dist` before, and I know this PR is not in a merge-ready state. I just opened it; when the prior work is done, we will discuss how to deal with these two filesystems. There are two possibilities: * follow the plugin architecture totally; then there may be other work to be done, as you said * follow the plugin architecture partially; then these two modules should be packaged into `dist` like before, but the others will be shipped into the `plugin` dir
[GitHub] [flink] an0 commented on a change in pull request #8106: [FLINK-12092] [docs]Clarify when `onTimer(...)` is called
an0 commented on a change in pull request #8106: [FLINK-12092] [docs]Clarify when `onTimer(...)` is called URL: https://github.com/apache/flink/pull/8106#discussion_r276471748 ## File path: docs/dev/stream/operators/process_function.md ## @@ -44,8 +44,8 @@ For fault-tolerant state, the `ProcessFunction` gives access to Flink's [keyed s The timers allow applications to react to changes in processing time and in [event time]({{ site.baseurl }}/dev/event_time.html). Every call to the function `processElement(...)` gets a `Context` object which gives access to the element's event time timestamp, and to the *TimerService*. The `TimerService` can be used to register callbacks for future -event-/processing-time instants. When a timer's particular time is reached, the `onTimer(...)` method is -called. During that call, all states are again scoped to the key with which the timer was created, allowing +event-/processing-time instants. The `onTimer(...)` method is +called when such an event-time is first caught up by a watermark or such a processing-time is reached. During that call, all states are again scoped to the key with which the timer was created, allowing Review comment: @tzulitai Is it mergeable?
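The corrected wording in the diff above says that an event-time timer fires once the first watermark with a timestamp at or past the timer's time arrives. That firing rule can be illustrated with a tiny stand-alone model; this is only a toy simulation of the rule the doc change describes, not Flink's actual TimerService:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Minimal model of event-time timers: a timer fires once a watermark catches up to it. */
public class TimerModel {
    private final TreeSet<Long> timers = new TreeSet<>();
    private final List<Long> fired = new ArrayList<>();

    public void registerEventTimeTimer(long time) {
        timers.add(time);
    }

    /** Advancing the watermark fires every pending timer whose timestamp is <= the watermark. */
    public void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst()); // corresponds to the onTimer(...) callback
        }
    }

    public List<Long> fired() {
        return fired;
    }

    public static void main(String[] args) {
        TimerModel m = new TimerModel();
        m.registerEventTimeTimer(10);
        m.registerEventTimeTimer(25);
        m.advanceWatermark(9);  // nothing fires: no watermark has caught up yet
        m.advanceWatermark(12); // the timer at 10 is "caught up by" this watermark and fires
        System.out.println(m.fired());
    }
}
```

Note the asymmetry the doc change captures: the timer at 10 fires at watermark 12, not at exactly 10, because event time only advances when a watermark arrives.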
[jira] [Updated] (FLINK-12238) Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module
[ https://issues.apache.org/jira/browse/FLINK-12238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-12238: - Summary: Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module (was: Support database related operations in GenericHiveMetastoreCatalog) > Support database related operations in GenericHiveMetastoreCatalog and setup > flink-connector-hive module > > > Key: FLINK-12238 > URL: https://issues.apache.org/jira/browse/FLINK-12238 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.9.0 > > > Support database related operations in GenericHiveMetastoreCatalog, which > implements ReadableWritableCatalog API
[jira] [Commented] (FLINK-12070) Make blocking result partitions consumable multiple times
[ https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16820414#comment-16820414 ] Stephan Ewen commented on FLINK-12070: -- [~Ryantaocer] Can you explain this comment a bit more: "multiple reads: the ByteBuffer object returned by the FileChannel.map() has an internal position that corresponds to the sliding window of physical memory within the range of mapping. In that sense, it cannot be shared among simultaneous reads like speculative executions, which have different reading positions resulting in frequent page faults in the OS. However, in other cases, the ByteBuffer can be reset to the beginning for subsequent reads." What position are you referring to here? Could you share a pointer to the code? My understanding was that which pages are in physical memory should be the kernel's decision; it should not be handled in the ByteBuffer. > Make blocking result partitions consumable multiple times > - > > Key: FLINK-12070 > URL: https://issues.apache.org/jira/browse/FLINK-12070 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Till Rohrmann >Assignee: BoWang >Priority: Major > > In order to avoid writing produced results multiple times for multiple > consumers and in order to speed up batch recoveries, we should make > blocking result partitions consumable multiple times. At the moment a > blocking result partition will be released once the consumers have processed > all data. Instead the result partition should be released once the next > blocking result has been produced and all consumers of a blocking result > partition have terminated. Moreover, blocking results should not hold on slot > resources like network buffers or memory as it is currently the case with > {{SpillableSubpartitions}}.
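The "internal position" in question can be seen directly in the NIO API: the MappedByteBuffer returned by FileChannel.map() carries a single read position, so simultaneous readers must not share the same buffer object, but duplicate() yields views with independent positions over the same mapping. A small stand-alone sketch of that distinction, unrelated to Flink's actual partition code:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Shows that duplicate() gives independent read positions over one file mapping. */
public class MappedBufferDemo {
    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("mapped", ".bin");
        Files.write(file, new byte[] {1, 2, 3, 4});
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            MappedByteBuffer shared = ch.map(FileChannel.MapMode.READ_ONLY, 0, 4);

            // Two "readers" on independent views; each duplicate has its own position
            // but both are backed by the same mapped region.
            ByteBuffer readerA = shared.duplicate();
            ByteBuffer readerB = shared.duplicate();

            readerA.get(); // advances only readerA's position
            System.out.println(readerA.position() + " " + readerB.position());
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

Which pages of the mapping are resident in physical memory is indeed the kernel's decision, as the comment says; the buffer position only determines where the next relative read happens, and divergent access patterns across readers are what can drive page faults.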
[GitHub] [flink] zentol commented on a change in pull request #8199: [FLINK-11955] Modify build to move filesystems from lib to plugins folder
zentol commented on a change in pull request #8199: [FLINK-11955] Modify build to move filesystems from lib to plugins folder URL: https://github.com/apache/flink/pull/8199#discussion_r276358266 ## File path: flink-dist/pom.xml ## @@ -140,19 +140,54 @@ under the License. + org.apache.flink flink-hadoop-fs ${project.version} + provided Review comment: That is, unless all plugins are in /plugins by default and https://issues.apache.org/jira/browse/FLINK-12143 aims to ship everything in that directory by default. But I'd have my doubts about such an approach as well, and would prefer plugins to be located under /opt/plugins by default.
[jira] [Created] (FLINK-12242) Support partition related operations in GenericHiveMetastoreCatalog
Bowen Li created FLINK-12242: Summary: Support partition related operations in GenericHiveMetastoreCatalog Key: FLINK-12242 URL: https://issues.apache.org/jira/browse/FLINK-12242 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0 Support partition related operations in GenericHiveMetastoreCatalog, which implements ReadableWritableCatalog API
[jira] [Created] (FLINK-12243) Support table stats related operations in GenericHiveMetastoreCatalog
Bowen Li created FLINK-12243: Summary: Support table stats related operations in GenericHiveMetastoreCatalog Key: FLINK-12243 URL: https://issues.apache.org/jira/browse/FLINK-12243 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0 Support table stats related operations in GenericHiveMetastoreCatalog, which implements ReadableWritableCatalog API
[jira] [Created] (FLINK-12241) Support function related operations in GenericHiveMetastoreCatalog
Bowen Li created FLINK-12241: Summary: Support function related operations in GenericHiveMetastoreCatalog Key: FLINK-12241 URL: https://issues.apache.org/jira/browse/FLINK-12241 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0 Support function related operations in GenericHiveMetastoreCatalog, which implements ReadableWritableCatalog API
[jira] [Updated] (FLINK-12239) Support table related operations in GenericHiveMetastoreCatalog
[ https://issues.apache.org/jira/browse/FLINK-12239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-12239: - Description: Support table related operations in GenericHiveMetastoreCatalog, which implements ReadableWritableCatalog API was: Support database related operations in GenericHiveMetastoreCatalog, which implements ReadableWritableCatalog API > Support table related operations in GenericHiveMetastoreCatalog > --- > > Key: FLINK-12239 > URL: https://issues.apache.org/jira/browse/FLINK-12239 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.9.0 > > > Support table related operations in GenericHiveMetastoreCatalog, which > implements ReadableWritableCatalog API
[jira] [Created] (FLINK-12239) Support table related operations in GenericHiveMetastoreCatalog
Bowen Li created FLINK-12239: Summary: Support table related operations in GenericHiveMetastoreCatalog Key: FLINK-12239 URL: https://issues.apache.org/jira/browse/FLINK-12239 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0 Support database related operations in GenericHiveMetastoreCatalog, which implements ReadableWritableCatalog API
[jira] [Created] (FLINK-12240) Support view related operations in GenericHiveMetastoreCatalog
Bowen Li created FLINK-12240: Summary: Support view related operations in GenericHiveMetastoreCatalog Key: FLINK-12240 URL: https://issues.apache.org/jira/browse/FLINK-12240 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0 Support view related operations in GenericHiveMetastoreCatalog, which implements ReadableWritableCatalog API
[jira] [Created] (FLINK-12238) Support database related operations in GenericHiveMetastoreCatalog
Bowen Li created FLINK-12238: Summary: Support database related operations in GenericHiveMetastoreCatalog Key: FLINK-12238 URL: https://issues.apache.org/jira/browse/FLINK-12238 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0 Support database related operations in GenericHiveMetastoreCatalog, which implements ReadableWritableCatalog API
[jira] [Updated] (FLINK-11482) Implement GenericHiveMetastoreCatalog
[ https://issues.apache.org/jira/browse/FLINK-11482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-11482: - Issue Type: New Feature (was: Sub-task) Parent: (was: FLINK-11275) > Implement GenericHiveMetastoreCatalog > - > > Key: FLINK-11482 > URL: https://issues.apache.org/jira/browse/FLINK-11482 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Ecosystem >Reporter: Xuefu Zhang >Assignee: Bowen Li >Priority: Major > > {{GenericHiveMetastoreCatalog}} is a special implementation of > {{ReadableWritableCatalog}} interface to store tables/views/functions defined > in Flink to Hive metastore. With respect to the objects stored, > {{GenericHiveMetastoreCatalog}} is similar to {{GenericInMemoryCatalog}}, but > the storage used is a Hive metastore instead of memory. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12237) Support table stats related operations in HiveCatalog
Bowen Li created FLINK-12237: Summary: Support table stats related operations in HiveCatalog Key: FLINK-12237 URL: https://issues.apache.org/jira/browse/FLINK-12237 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0 Support table stats related operations in HiveCatalog, which implements ReadableWritableCatalog API
[jira] [Created] (FLINK-12236) Support function related operations in HiveCatalog
Bowen Li created FLINK-12236: Summary: Support function related operations in HiveCatalog Key: FLINK-12236 URL: https://issues.apache.org/jira/browse/FLINK-12236 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0 Support function related operations in HiveCatalog, which implements ReadableWritableCatalog API
[jira] [Created] (FLINK-12235) Support partition related operations in HiveCatalog
Bowen Li created FLINK-12235: Summary: Support partition related operations in HiveCatalog Key: FLINK-12235 URL: https://issues.apache.org/jira/browse/FLINK-12235 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0 Support partition related operations in HiveCatalog, which implements ReadableWritableCatalog API
[jira] [Created] (FLINK-12234) Support view related operations in HiveCatalog
Bowen Li created FLINK-12234: Summary: Support view related operations in HiveCatalog Key: FLINK-12234 URL: https://issues.apache.org/jira/browse/FLINK-12234 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0 Support view related operations in HiveCatalog, which implements ReadableWritableCatalog API
[jira] [Created] (FLINK-12233) Support table related operations in HiveCatalog
Bowen Li created FLINK-12233: Summary: Support table related operations in HiveCatalog Key: FLINK-12233 URL: https://issues.apache.org/jira/browse/FLINK-12233 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0 Support table related operations in HiveCatalog, which implements ReadableWritableCatalog API
[jira] [Updated] (FLINK-12232) Support database related operations in HiveCatalog
[ https://issues.apache.org/jira/browse/FLINK-12232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-12232: - Description: Support database related operations in HiveCatalog, which implements ReadableWritableCatalog API > Support database related operations in HiveCatalog > -- > > Key: FLINK-12232 > URL: https://issues.apache.org/jira/browse/FLINK-12232 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.9.0 > > > Support database related operations in HiveCatalog, which implements > ReadableWritableCatalog API -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12232) Support database related operations in HiveCatalog
Bowen Li created FLINK-12232: Summary: Support database related operations in HiveCatalog Key: FLINK-12232 URL: https://issues.apache.org/jira/browse/FLINK-12232 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0
[jira] [Updated] (FLINK-11479) Implement HiveCatalog
[ https://issues.apache.org/jira/browse/FLINK-11479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-11479: - Issue Type: New Feature (was: Sub-task) Parent: (was: FLINK-11275) > Implement HiveCatalog > - > > Key: FLINK-11479 > URL: https://issues.apache.org/jira/browse/FLINK-11479 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Ecosystem >Reporter: Xuefu Zhang >Assignee: Bowen Li >Priority: Major > > {{HiveCatalog}} is an implementation of {{ReadableWritableCatalog}} interface > for meta objects managed by Hive Metastore. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] zentol commented on a change in pull request #8199: [FLINK-11955] Modify build to move filesystems from lib to plugins folder
zentol commented on a change in pull request #8199: [FLINK-11955] Modify build to move filesystems from lib to plugins folder URL: https://github.com/apache/flink/pull/8199#discussion_r276358266 ## File path: flink-dist/pom.xml ## @@ -140,19 +140,54 @@ under the License. + org.apache.flink flink-hadoop-fs ${project.version} + provided Review comment: That is, unless all plugins are in /plugins by default and https://issues.apache.org/jira/browse/FLINK-12143 aims to ship everything in that directory by default. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol commented on a change in pull request #8199: [FLINK-11955] Modify build to move filesystems from lib to plugins folder
zentol commented on a change in pull request #8199: [FLINK-11955] Modify build to move filesystems from lib to plugins folder URL: https://github.com/apache/flink/pull/8199#discussion_r276357181 ## File path: flink-dist/pom.xml ## @@ -140,19 +140,54 @@ under the License. + org.apache.flink flink-hadoop-fs ${project.version} + provided Review comment: I don't believe the merge order to be relevant here. None of the other subtasks are about re-introducing the hadoop/mapr compatibility code into flink-dist or having them on the classpath by default; as such my point still stands. Conceptually I'm in favor of having these in the plugin directory as an opt-in feature, but nevertheless, we may break plenty of setups.
[jira] [Assigned] (FLINK-992) Create CollectionDataSets by reading (client) local files.
[ https://issues.apache.org/jira/browse/FLINK-992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neelesh Srinivas Salian reassigned FLINK-992: - Assignee: (was: Neelesh Srinivas Salian) > Create CollectionDataSets by reading (client) local files. > -- > > Key: FLINK-992 > URL: https://issues.apache.org/jira/browse/FLINK-992 > Project: Flink > Issue Type: New Feature > Components: API / DataSet, API / Python >Reporter: Fabian Hueske >Priority: Minor > Labels: starter > > {{CollectionDataSets}} are a nice way to feed data into programs. > We could add support to read a client-local file at program construction time > using a FileInputFormat, put its data into a CollectionDataSet, and ship its > data together with the program. > This would remove the need to upload small files into DFS which are used > together with some large input (stored in DFS). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-1890) Add note to docs that ReadFields annotations are currently not evaluated
[ https://issues.apache.org/jira/browse/FLINK-1890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neelesh Srinivas Salian reassigned FLINK-1890: -- Assignee: (was: Neelesh Srinivas Salian) > Add note to docs that ReadFields annotations are currently not evaluated > > > Key: FLINK-1890 > URL: https://issues.apache.org/jira/browse/FLINK-1890 > Project: Flink > Issue Type: Wish > Components: API / DataSet >Reporter: Stefan Bunk >Priority: Minor > Labels: starter > > In the Scala API, you have the option to declare forwarded fields via the > {{withForwardedFields}} method. > It would be nice to have sth. similar for read fields, as otherwise one needs > to create a class, which I personally try to avoid for readability. > Maybe grouping all annotations in one function and have a first parameter > indicating the type of annotation is also an option, if you plan on adding > more annotations and want to keep the interface smaller. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-5595) Add links to sub-sections in the left-hand navigation bar
[ https://issues.apache.org/jira/browse/FLINK-5595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neelesh Srinivas Salian reassigned FLINK-5595: -- Assignee: (was: Neelesh Srinivas Salian) > Add links to sub-sections in the left-hand navigation bar > - > > Key: FLINK-5595 > URL: https://issues.apache.org/jira/browse/FLINK-5595 > Project: Flink > Issue Type: Improvement > Components: Project Website >Reporter: Mike Winters >Priority: Minor > Labels: newbie, website > Original Estimate: 2h > Remaining Estimate: 2h > > Some pages on the Flink project site (such as > http://flink.apache.org/introduction.html) include a table of contents at the > top. The sections from the ToC are not exposed in the left-hand nav when the > page is active, but this could be a useful addition, especially for longer, > content-heavy pages. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-12191) Flink SVGs on "Material" page broken, render incorrectly on Firefox
[ https://issues.apache.org/jira/browse/FLINK-12191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Patrick Lucas closed FLINK-12191. - Resolution: Fixed > Flink SVGs on "Material" page broken, render incorrectly on Firefox > --- > > Key: FLINK-12191 > URL: https://issues.apache.org/jira/browse/FLINK-12191 > Project: Flink > Issue Type: Bug > Components: Project Website >Reporter: Patrick Lucas >Assignee: Patrick Lucas >Priority: Major > Labels: pull-request-available > Attachments: Screen Shot 2019-04-15 at 09.48.15.png > > Time Spent: 20m > Remaining Estimate: 0h > > Like FLINK-11043, the Flink SVGs on the [Material page of the Flink > website|https://flink.apache.org/material.html] are invalid and do not render > correctly on Firefox. > I'm not sure if there is an additional source-of-truth for these images, or > if these hosted on the website are canonical, but I can fix them nonetheless. > I also noticed that one of the squirrels in both {{color_black.svg}} and > {{color_white.svg}} is missing the eye gradient, which can also be easily > fixed. > !Screen Shot 2019-04-15 at 09.48.15.png|thumbnail! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] zhijiangW commented on a change in pull request #8191: [FLINK-12213][network] Pass TaskManagerMetricGroup into constructor of NetworkEnvironment
zhijiangW commented on a change in pull request #8191: [FLINK-12213][network] Pass TaskManagerMetricGroup into constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8191#discussion_r276321299 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ## @@ -63,11 +70,19 @@ private boolean isShutdown; - public NetworkEnvironment(NetworkEnvironmentConfiguration config, TaskEventPublisher taskEventPublisher) { + public NetworkEnvironment( + NetworkEnvironmentConfiguration config, + TaskEventPublisher taskEventPublisher, + TaskManagerMetricGroup taskManagerMetricGroup) { Review comment: Sure, we actually use the methods of `ComponentMetricGroup` here.
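The diff above passes the metric group into the component's constructor instead of wiring it up after construction. A minimal, self-contained sketch of this constructor-injection pattern follows; the class and metric names here are illustrative stand-ins, not Flink's actual API.

```java
import java.util.HashMap;
import java.util.Map;

public class Main {

    // Hypothetical stand-in for a metric group; only the registration
    // behavior needed for the sketch is modeled here.
    static class MetricGroup {
        private final Map<String, Long> counters = new HashMap<>();
        void counter(String name, long value) { counters.put(name, value); }
        long get(String name) { return counters.getOrDefault(name, 0L); }
    }

    // As in the PR, the component receives the group through its
    // constructor rather than looking it up later.
    static class NetworkEnvironment {
        private final MetricGroup metricGroup;

        NetworkEnvironment(MetricGroup metricGroup) {
            this.metricGroup = metricGroup; // injected dependency
        }

        void start() {
            // Metrics can be registered during startup because the group
            // is guaranteed to exist before the component does.
            metricGroup.counter("numBuffersTotal", 1024);
        }
    }

    public static void main(String[] args) {
        MetricGroup group = new MetricGroup();
        new NetworkEnvironment(group).start();
        System.out.println(group.get("numBuffersTotal")); // prints 1024
    }
}
```

The benefit of this shape is testability: a test can hand in its own group and assert on what the component registered, without reaching into the component's internals.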
[GitHub] [flink] dawidwys commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces
dawidwys commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces URL: https://github.com/apache/flink/pull/8050#discussion_r276315716 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala ## @@ -393,7 +395,7 @@ class TableImpl( override def window(overWindows: OverWindow*): OverWindowedTable = { -if (tableEnv.isInstanceOf[BatchTableEnvironment]) { +if (tableEnv.isInstanceOf[BatchTableEnvImpl]) { Review comment: Sorry, you are right, ignore my comment.
[GitHub] [flink] hequn8128 commented on issue #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces
hequn8128 commented on issue #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces URL: https://github.com/apache/flink/pull/8050#issuecomment-484150446 @dawidwys Hi, thank you very much for the review. I have addressed all your comments, except for the one about changing the interface to TableSource/Sink for TableFactories. I also agree with you that it would be nice if we could remove the reflections. However, my main concern is the compatibility problem: the interface is widely used by users. Maybe we can get some opinions from @twalthr. What do you think? Thank you again for the nice review. Best, Hequn
[GitHub] [flink] dawidwys commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces
dawidwys commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces URL: https://github.com/apache/flink/pull/8050#discussion_r276312002 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java ## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.ExternalCatalog; +import org.apache.flink.table.catalog.ExternalCatalogTable; +import org.apache.flink.table.descriptors.BatchTableDescriptor; +import org.apache.flink.table.descriptors.Descriptor; +import org.apache.flink.table.descriptors.StreamTableDescriptor; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; + +import java.lang.reflect.Method; +import java.util.Map; + +/** + * Utility for dealing with {@link TableFactory} using the {@link TableFactoryService}. + */ +public class TableFactoryUtil { + + /** +* Returns an external catalog. 
+*/ + public static ExternalCatalog findAndCreateExternalCatalog(Descriptor descriptor) { + Map properties = descriptor.toProperties(); + return TableFactoryService + .find(ExternalCatalogFactory.class, properties) + .createExternalCatalog(properties); + } + + /** +* Returns a table source matching the descriptor. +*/ + public static TableSource findAndCreateTableSource(Descriptor descriptor) { + Map properties = descriptor.toProperties(); + + TableSource tableSource; + try { + if (descriptor instanceof BatchTableDescriptor || isBatchExternalCatalogTable(descriptor)) { + + Object object = TableFactoryService.find( + Class.forName("org.apache.flink.table.factories.BatchTableSourceFactory"), + properties); + Method method = object.getClass().getDeclaredMethod("createBatchTableSource", Map.class); + + tableSource = (TableSource) method.invoke(object, properties); + } else if (descriptor instanceof StreamTableDescriptor || isStreamExternalCatalogTable(descriptor)) { + + Object object = TableFactoryService.find( + Class.forName("org.apache.flink.table.factories.StreamTableSourceFactory"), + properties); + Method method = object.getClass().getDeclaredMethod("createStreamTableSource", Map.class); + + tableSource = (TableSource) method.invoke(object, properties); + } else { + throw new TableException( + String.format( + "Unsupported table descriptor: %s", + descriptor.getClass().getName()) + ); + } + } catch (Throwable t) { + throw new TableException("findAndCreateTableSource failed.", t); + } + + return tableSource; + } + + /** +* Returns a table sink matching the descriptor. 
+*/ + public static TableSink findAndCreateTableSink(Descriptor descriptor) { + Map properties = descriptor.toProperties(); + + TableSink tableSink; + try { + if (descriptor instanceof BatchTableDescriptor || isBatchExternalCatalogTable(descriptor)) { + Object object = TableFactoryService.find( + Class.forName("org.apache.flink.table.factories.BatchTableSinkFactory"), Review comment: I am not saying we should change the signature of
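The reflective lookup in `TableFactoryUtil` above (resolve the factory class by name, find its create method, invoke it) is what the discussion calls "the reflections". A minimal, self-contained sketch of that pattern, assuming a hypothetical `StreamSourceFactory` in place of Flink's real factory classes:

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

public class Main {

    // Stand-in for a factory that would normally live in a separate module;
    // the class and method names are illustrative, not Flink's API.
    public static class StreamSourceFactory {
        public String createStreamTableSource(Map<String, String> properties) {
            return "source:" + properties.getOrDefault("connector", "unknown");
        }
    }

    // Resolve the factory class by its binary name and invoke its create
    // method reflectively, so this code needs no compile-time dependency
    // on the factory module.
    static Object findAndCreate(String factoryClassName, Map<String, String> properties) {
        try {
            Object factory = Class.forName(factoryClassName)
                .getDeclaredConstructor()
                .newInstance();
            Method create = factory.getClass()
                .getDeclaredMethod("createStreamTableSource", Map.class);
            return create.invoke(factory, properties);
        } catch (ReflectiveOperationException e) {
            // Mirrors the exception wrapping in TableFactoryUtil.
            throw new RuntimeException("findAndCreateTableSource failed.", e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("connector", "kafka");
        // Nested classes use '$' in their binary name.
        System.out.println(findAndCreate("Main$StreamSourceFactory", properties));
        // prints source:kafka
    }
}
```

The trade-off the reviewers weigh is exactly what the sketch shows: reflection breaks the compile-time dependency cycle, but errors surface only at runtime and the method name becomes an unchecked string.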
[GitHub] [flink] azagrebin removed a comment on issue #8201: [FLINK-10941] Keep slots which contain unconsumed result partitions
azagrebin removed a comment on issue #8201: [FLINK-10941] Keep slots which contain unconsumed result partitions URL: https://github.com/apache/flink/pull/8201#issuecomment-484144918 @flinkbot attention @pnowojski
[GitHub] [flink] azagrebin commented on issue #8201: [FLINK-10941] Keep slots which contain unconsumed result partitions
azagrebin commented on issue #8201: [FLINK-10941] Keep slots which contain unconsumed result partitions URL: https://github.com/apache/flink/pull/8201#issuecomment-484145389 @flinkbot attention @pnowojski
[GitHub] [flink] flinkbot edited a comment on issue #8201: [FLINK-10941] Keep slots which contain unconsumed result partitions
flinkbot edited a comment on issue #8201: [FLINK-10941] Keep slots which contain unconsumed result partitions URL: https://github.com/apache/flink/pull/8201#issuecomment-484145099 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❗ 3. Needs [attention] from: - Needs attention by @pnowojski [committer] * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] flinkbot commented on issue #8201: Flink 10941 1.7
flinkbot commented on issue #8201: Flink 10941 1.7 URL: https://github.com/apache/flink/pull/8201#issuecomment-484145099 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] azagrebin commented on issue #8201: Flink 10941 1.7
azagrebin commented on issue #8201: Flink 10941 1.7 URL: https://github.com/apache/flink/pull/8201#issuecomment-484144918 @flinkbot attention @pnowojski
[GitHub] [flink] azagrebin opened a new pull request #8201: Flink 10941 1.7
azagrebin opened a new pull request #8201: Flink 10941 1.7 URL: https://github.com/apache/flink/pull/8201 1.7 backport of #7938
[jira] [Closed] (FLINK-12212) Clarify documentation about async checkpointing
[ https://issues.apache.org/jira/browse/FLINK-12212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-12212. -- Resolution: Fixed Fix Version/s: 1.9.0 Merged in: master: 046d752c6e > Clarify documentation about async checkpointing > --- > > Key: FLINK-12212 > URL: https://issues.apache.org/jira/browse/FLINK-12212 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Konstantin Knauf >Assignee: Konstantin Knauf >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > The current wording in > https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/large_state_tuning.html#make-state-checkpointing-asynchronous-where-possible > implies that operator state is not checkpointed asynchronously, while it > actually is. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] pnowojski commented on a change in pull request #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface
pnowojski commented on a change in pull request #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface URL: https://github.com/apache/flink/pull/8124#discussion_r276291942 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java ## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.streaming.api.operators.InputSelectable; +import org.apache.flink.streaming.api.operators.InputSelection; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Input reader for {@link org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask} + * in the case that the operator is InputSelectable. 
+ * + * @param <IN1> The type of the records that arrive on the first input + * @param <IN2> The type of the records that arrive on the second input + */ +@Internal +public class StreamTwoInputSelectableProcessor<IN1, IN2> { + + private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputSelectableProcessor.class); + + private volatile boolean continuousProcessing = true; + + private final NetworkInput input1; + private final NetworkInput input2; + + private final Object lock; + + private final TwoInputStreamOperator streamOperator; + + private final InputSelectable inputSelector; + + private final AuxiliaryHandler auxiliaryHandler; + + private final CompletableFuture[] listenFutures; + + private final boolean[] isFinished; + + private InputSelection inputSelection; + + private AtomicInteger availableInputsMask = new AtomicInteger(); Review comment: **(hot looping without synchronisation part 2)** I would drop `AtomicInteger availableInputsMask` and replace it with a plain `int availableInputsMask` which is updated based on `BufferOrEvent#moreAvailable()` mentioned above. As long as all of the selected inputs are available, there is no need to synchronise. If some selected input is not available, that is the point where we could use `AtomicInteger`, but for the `TwoInputStream` (rather than the general N-input) case it would be more or less equally efficient to check `listenFutures[x].getNow(-1)`. This is also a single volatile access under the hood, with the same performance penalty as `availableInputsMask.get()`. But again, keep in mind that while hot looping you cannot touch `listenFutures`, as they are `volatile` and thus additional synchronisation points. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at:
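The probe-via-future idea in the review above can be sketched as follows. This is a hypothetical, self-contained illustration (the names are not the PR's actual fields): `CompletableFuture#getNow(fallback)` returns the completed value or the fallback without blocking, costing a single volatile read, which is why the reviewer considers it comparable to `AtomicInteger#get()`.

```java
import java.util.concurrent.CompletableFuture;

/**
 * Sketch: probe an input's "listen" future directly instead of maintaining
 * a shared AtomicInteger availability mask. Illustrative names only.
 */
public class GetNowProbe {

    /** Returns the input index the future completed with, or -1 if still pending. */
    static int probe(CompletableFuture<Integer> listenFuture) {
        // getNow is non-blocking: one volatile read under the hood
        return listenFuture.getNow(-1);
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> listenFuture = new CompletableFuture<>();
        System.out.println(probe(listenFuture)); // pending: prints -1
        listenFuture.complete(1);                // input 1 signals availability
        System.out.println(probe(listenFuture)); // available: prints 1
    }
}
```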
[GitHub] [flink] pnowojski commented on a change in pull request #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface
pnowojski commented on a change in pull request #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface URL: https://github.com/apache/flink/pull/8124#discussion_r276288402 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ## @@ -184,6 +184,9 @@ /** A timer to retrigger local partition requests. Only initialized if actually needed. */ private Timer retriggerLocalRequestTimer; + /** Flag indicating whether there is available data. */ + private volatile boolean moreDataAvailable = false; Review comment: (**hot looping without synchronisation part 1**) You should use `BufferOrEvent#moreAvailable()` to get the next buffer and its availability in the same call. If there is more data available, just keep reading from the gate without any extra synchronisation/callbacks. There is no need for `volatile boolean moreAvailable`. We should always use either this `BufferOrEvent#moreAvailable()` or rely on listeners/futures to inform us that the availability has changed.
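The "part 1" suggestion above can be illustrated with a minimal sketch: the availability hint travels with each polled buffer, so the hot loop never re-reads a shared volatile flag. All types below (`Gate`, `Polled`) are illustrative stand-ins, not Flink's actual classes.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Sketch: drain a gate using the per-poll "moreAvailable" hint. */
public class HotLoopSketch {

    /** Stand-in for BufferOrEvent: payload plus the availability hint it arrived with. */
    static final class Polled {
        final int payload;
        final boolean moreAvailable;
        Polled(int payload, boolean moreAvailable) {
            this.payload = payload;
            this.moreAvailable = moreAvailable;
        }
    }

    /** Stand-in for the input gate; a plain queue here. */
    static final class Gate {
        private final Queue<Integer> buffers = new ArrayDeque<>();
        Gate(int... values) { for (int v : values) buffers.add(v); }
        boolean isEmpty() { return buffers.isEmpty(); }
        /** Mirrors BufferOrEvent#moreAvailable(): buffer and availability in one call. */
        Polled poll() {
            int v = buffers.remove();
            return new Polled(v, !buffers.isEmpty());
        }
    }

    /** Hot loop: no extra volatile reads while data keeps flowing. */
    static int drain(Gate gate) {
        int sum = 0;
        boolean moreAvailable = !gate.isEmpty();
        while (moreAvailable) {
            Polled p = gate.poll();
            sum += p.payload;
            moreAvailable = p.moreAvailable; // availability came with the buffer
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(drain(new Gate(1, 2, 3, 4))); // prints 10
    }
}
```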
[jira] [Commented] (FLINK-12219) Yarn application can't stop when flink job failed in per-job yarn cluster mode
[ https://issues.apache.org/jira/browse/FLINK-12219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16820180#comment-16820180 ] Till Rohrmann commented on FLINK-12219: --- In order to better understand the problem I would need a bit more context [~lamber-ken]. How did you deploy the Flink job (job mode detached or attached)? How did the job fail? What exception is being thrown in {{FsJobArchivist}} to prevent the cluster from shutting down? > Yarn application can't stop when flink job failed in per-job yarn cluster mode > - > > Key: FLINK-12219 > URL: https://issues.apache.org/jira/browse/FLINK-12219 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN, Runtime / REST >Affects Versions: 1.6.3, 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Fix For: 1.9.0 > > Attachments: fix-bug.patch, image-2019-04-17-15-00-40-687.png, > image-2019-04-17-15-02-49-513.png > > > fix-bug, should catch `Exception`, not only `IOException` > > {{Flink Job Failed}} > !image-2019-04-17-15-00-40-687.png! > > but, yarn application can't stop > !image-2019-04-17-15-02-49-513.png! > > >
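The reported fix direction ("should catch `Exception`, not only `IOException`") can be sketched in isolation. The method and types below are illustrative, not Flink's actual code: the point is that an archiving failure of any exception type must not prevent the cluster from shutting down.

```java
/**
 * Sketch of the proposed fix direction: an archiving failure of any kind
 * should not block cluster shutdown. Illustrative code, not Flink's actual
 * FsJobArchivist handling.
 */
public class ShutdownSketch {

    /** Runs the archiver, swallowing any failure; shutdown always proceeds. */
    static boolean archiveThenShutdown(Runnable archiver) {
        try {
            archiver.run();
        } catch (Exception e) {
            // Catching only IOException would let a RuntimeException escape
            // here and leave the YARN application running; log and continue.
        }
        return true; // cluster shutdown proceeds regardless
    }

    public static void main(String[] args) {
        System.out.println(archiveThenShutdown(() -> {
            throw new RuntimeException("archive failed");
        })); // prints true
    }
}
```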
[GitHub] [flink] lfrancke commented on issue #4566: [FLINK-7477] [FLINK-7480] Various improvements to Flink scripts
lfrancke commented on issue #4566: [FLINK-7477] [FLINK-7480] Various improvements to Flink scripts URL: https://github.com/apache/flink/pull/4566#issuecomment-484133483 I just stumbled across this as well. The commit @ruankd referred to was indeed not committed.
[GitHub] [flink] hequn8128 commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces
hequn8128 commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces URL: https://github.com/apache/flink/pull/8050#discussion_r276288794 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java ## @@ -89,6 +89,11 @@ */ TableOperation getTableOperation(); + /** +* Returns the {@link TableEnvironment} of this table. +*/ + TableEnvironment getTableEnvironment(); Review comment: The conversions in `TableConversions.scala` need the `TableEnvironment`. I will put `TableConversions` in the planner module for now and remove `getTableEnvironment()` from `Table`.
[GitHub] [flink] azagrebin commented on a change in pull request #8191: [FLINK-12213][network] Pass TaskManagerMetricGroup into constructor of NetworkEnvironment
azagrebin commented on a change in pull request #8191: [FLINK-12213][network] Pass TaskManagerMetricGroup into constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8191#discussion_r276286307 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ## @@ -63,11 +70,19 @@ private boolean isShutdown; - public NetworkEnvironment(NetworkEnvironmentConfiguration config, TaskEventPublisher taskEventPublisher) { + public NetworkEnvironment( + NetworkEnvironmentConfiguration config, + TaskEventPublisher taskEventPublisher, + TaskManagerMetricGroup taskManagerMetricGroup) { Review comment: btw, do we need `TaskManagerMetricGroup`? Could it be `ComponentMetricGroup parentMetricGroup` to reduce the dependency?
[GitHub] [flink] azagrebin commented on a change in pull request #8191: [FLINK-12213][network] Pass TaskManagerMetricGroup into constructor of NetworkEnvironment
azagrebin commented on a change in pull request #8191: [FLINK-12213][network] Pass TaskManagerMetricGroup into constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8191#discussion_r276285846 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ## @@ -63,11 +70,19 @@ private boolean isShutdown; - public NetworkEnvironment(NetworkEnvironmentConfiguration config, TaskEventPublisher taskEventPublisher) { + public NetworkEnvironment( + NetworkEnvironmentConfiguration config, + TaskEventPublisher taskEventPublisher, + TaskManagerMetricGroup taskManagerMetricGroup) { this.config = checkNotNull(config); this.networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize()); + checkNotNull(taskManagerMetricGroup); Review comment: ``` private void registerNetworkMetrics(ComponentMetricGroup parentMetricGroup) { checkNotNull(parentMetricGroup); MetricGroup networkGroup = parentMetricGroup.addGroup(METRIC_GROUP_NETWORK); networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_TOTAL_MEMORY_SEGMENT, networkBufferPool::getTotalNumberOfMemorySegments); networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT, networkBufferPool::getNumberOfAvailableMemorySegments); } ```
[GitHub] [flink] zhijiangW commented on issue #8191: [FLINK-12213][network] Pass TaskManagerMetricGroup into constructor of NetworkEnvironment
zhijiangW commented on issue #8191: [FLINK-12213][network] Pass TaskManagerMetricGroup into constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8191#issuecomment-484125605 @azagrebin thanks for the efficient review! I agree with the point about internal testing, which justifies retaining the getter method in `NetworkEnvironment`.
[GitHub] [flink] zhijiangW commented on a change in pull request #8191: [FLINK-12213][network] Pass TaskManagerMetricGroup into constructor of NetworkEnvironment
zhijiangW commented on a change in pull request #8191: [FLINK-12213][network] Pass TaskManagerMetricGroup into constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8191#discussion_r276283610 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ## @@ -63,11 +70,19 @@ private boolean isShutdown; - public NetworkEnvironment(NetworkEnvironmentConfiguration config, TaskEventPublisher taskEventPublisher) { + public NetworkEnvironment( + NetworkEnvironmentConfiguration config, + TaskEventPublisher taskEventPublisher, + TaskManagerMetricGroup taskManagerMetricGroup) { this.config = checkNotNull(config); this.networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize()); + checkNotNull(taskManagerMetricGroup); Review comment: Sorry, what should be moved into a private method?
[GitHub] [flink] zhijiangW commented on a change in pull request #8191: [FLINK-12213][network] Pass TaskManagerMetricGroup into constructor of NetworkEnvironment
zhijiangW commented on a change in pull request #8191: [FLINK-12213][network] Pass TaskManagerMetricGroup into constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8191#discussion_r276282242 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ## @@ -63,11 +70,19 @@ private boolean isShutdown; - public NetworkEnvironment(NetworkEnvironmentConfiguration config, TaskEventPublisher taskEventPublisher) { + public NetworkEnvironment( Review comment: Yes, I also considered this idea before, but I was not sure when to introduce this factory. I could try to do it in this PR.
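The factory idea under discussion could look roughly like the sketch below. The stub types (`Config`, `BufferPool`) are illustrative only; Flink's real `NetworkEnvironment` wiring is far more involved. The design point is that the constructor stays trivial while derived construction-time wiring (pool creation, metric registration) lives in one static entry point.

```java
/** Stub config; Flink's NetworkEnvironmentConfiguration carries much more. */
class Config {
    final int numNetworkBuffers;
    Config(int numNetworkBuffers) { this.numNetworkBuffers = numNetworkBuffers; }
}

/** Stub buffer pool standing in for NetworkBufferPool. */
class BufferPool {
    final int totalSegments;
    BufferPool(int totalSegments) { this.totalSegments = totalSegments; }
}

/** Sketch of the factory pattern being proposed, not Flink's actual API. */
public class NetworkEnvironmentSketch {
    final BufferPool bufferPool;

    // Constructor is trivial and side-effect free.
    private NetworkEnvironmentSketch(BufferPool bufferPool) {
        this.bufferPool = bufferPool;
    }

    /** All derived wiring happens in the factory, not the constructor. */
    static NetworkEnvironmentSketch fromConfiguration(Config config) {
        BufferPool pool = new BufferPool(config.numNetworkBuffers);
        // metric-group registration would also happen here
        return new NetworkEnvironmentSketch(pool);
    }

    public static void main(String[] args) {
        System.out.println(
            fromConfiguration(new Config(1024)).bufferPool.totalSegments); // prints 1024
    }
}
```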
[GitHub] [flink] hequn8128 commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces
hequn8128 commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces URL: https://github.com/apache/flink/pull/8050#discussion_r276281643 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/RegistrableDescriptor.scala ## @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -/** - * A trait for descriptors that allow to register table source and/or sinks. - */ -trait RegistrableDescriptor extends Descriptor { Review comment: I will move this change into the right commit.
[GitHub] [flink] zhijiangW commented on a change in pull request #8191: [FLINK-12213][network] Pass TaskManagerMetricGroup into constructor of NetworkEnvironment
zhijiangW commented on a change in pull request #8191: [FLINK-12213][network] Pass TaskManagerMetricGroup into constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8191#discussion_r276280152 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ## @@ -208,6 +219,8 @@ public void shutDown() throws FlinkException { taskEventDispatcher.clearAll(); + taskManagerMetricGroup.close(); Review comment: That is true. We decided not to make `TaskManagerMetricGroup` a component of `TaskManagerServices`, so we keep the previous behavior in `TaskExecutor`.
[GitHub] [flink] zhijiangW commented on a change in pull request #8191: [FLINK-12213][network] Pass TaskManagerMetricGroup into constructor of NetworkEnvironment
zhijiangW commented on a change in pull request #8191: [FLINK-12213][network] Pass TaskManagerMetricGroup into constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8191#discussion_r276279559 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ## @@ -357,15 +357,16 @@ public static TaskExecutor startTaskManager( TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration( taskManagerServicesConfiguration, resourceID, + remoteAddress, + metricRegistry, Review comment: Yes, it also makes sense to create the `TaskManagerMetricGroup` before `TaskManagerServices`. I just thought it should be regarded as a component of `TaskManagerServices`, like `TaskManagerLocation`. Nevertheless, from the aspect of minimal change, I am willing to keep it separate as you suggested. That is a good idea to extract the creation of `NetworkEnvironment` completely. I would do that in a separate commit.
[GitHub] [flink] hequn8128 commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces
hequn8128 commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces URL: https://github.com/apache/flink/pull/8050#discussion_r276279651 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala ## @@ -393,7 +395,7 @@ class TableImpl( override def window(overWindows: OverWindow*): OverWindowedTable = { -if (tableEnv.isInstanceOf[BatchTableEnvironment]) { +if (tableEnv.isInstanceOf[BatchTableEnvImpl]) { Review comment: What do you mean? The `BatchTableEnvImpl` here is `api.BatchTableEnvImpl`. Both `scala.BatchTableEnvImpl` and `java.BatchTableEnvImpl` extend it.
[GitHub] [flink] StefanRRichter merged pull request #8185: [FLINK-12212] [docs] clarify that operator state is checkpointed asynchronously
StefanRRichter merged pull request #8185: [FLINK-12212] [docs] clarify that operator state is checkpointed asynchronously URL: https://github.com/apache/flink/pull/8185
[jira] [Closed] (FLINK-12119) Add OWASP Dependency Check
[ https://issues.apache.org/jira/browse/FLINK-12119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-12119. Resolution: Fixed > Add OWASP Dependency Check > -- > > Key: FLINK-12119 > URL: https://issues.apache.org/jira/browse/FLINK-12119 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Konstantin Knauf >Assignee: Konstantin Knauf >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > In order to obtain some visibility on the current known security > vulnerabilities in Flink's dependencies, it would be useful to include the > OWASP dependency check plugin [1] in our Maven build. > By including it in flink-parent, we can get a summary of all dependencies of > all child projects by running > {{mvn clean org.owasp:dependency-check-maven:5.0.0-M2:aggregate}} > We should probably exclude some modules from the dependency-check. These > could be: > * flink-docs > * flink-fs-tests > * flink-yarn-tests > * flink-contrib > Anything else? What about flink-python/flink-streaming-python? > In addition I propose to exclude all dependencies in the *system* or > *provided* scope. > At least initially, the build would never fail because of vulnerabilities. > [1] > [https://jeremylong.github.io/DependencyCheck/dependency-check-maven/index.html]
[jira] [Commented] (FLINK-12119) Add OWASP Dependency Check
[ https://issues.apache.org/jira/browse/FLINK-12119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16820141#comment-16820141 ] Chesnay Schepler commented on FLINK-12119: -- Weekly cron build set up on {{cron-master-dependency_check}}. The report is printed to stdout; found vulnerabilities will not fail the build. At this time this primarily serves to make this information readily available.
[GitHub] [flink] hequn8128 commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces
hequn8128 commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces URL: https://github.com/apache/flink/pull/8050#discussion_r276274740 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java ## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.java; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.table.api.BatchQueryConfig; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.descriptors.BatchTableDescriptor; +import org.apache.flink.table.descriptors.ConnectorDescriptor; +import org.apache.flink.table.functions.AggregateFunction; +import org.apache.flink.table.functions.TableFunction; + +import java.lang.reflect.Constructor; + +/** + * The {@link TableEnvironment} for a Java batch {@link ExecutionEnvironment} that works + * with {@link DataSet}s. + * + * A TableEnvironment can be used to: + * + * convert a {@link DataSet} to a {@link Table} + * register a {@link DataSet} in the {@link TableEnvironment}'s catalog + * register a {@link Table} in the {@link TableEnvironment}'s catalog + * scan a registered table to obtain a {@link Table} + * specify a SQL query on registered tables to obtain a {@link Table} + * convert a {@link Table} into a {@link DataSet} + * explain the AST and execution plan of a {@link Table} + * + */ +@PublicEvolving +public interface BatchTableEnvironment extends TableEnvironment { + + /** +* Registers a {@link TableFunction} under a unique name in the TableEnvironment's catalog. +* Registered functions can be referenced in Table API and SQL queries. +* +* @param name The name under which the function is registered. +* @param tableFunction The TableFunction to register. +* @param <T> The type of the output row. +*/ +<T> void registerFunction(String name, TableFunction<T> tableFunction); + + /** +* Registers an {@link AggregateFunction} under a unique name in the TableEnvironment's catalog.
+* Registered functions can be referenced in Table API and SQL queries. +* +* @param name The name under which the function is registered. +* @param aggregateFunction The AggregateFunction to register. +* @param The type of the output value. +* @param The type of aggregate accumulator. +*/ +void registerFunction(String name, AggregateFunction aggregateFunction); + + /** +* Converts the given {@link DataSet} into a {@link Table}. +* +* The field names of the {@link Table} are automatically derived from the type of the +* {@link DataSet}. +* +* @param dataSet The {@link DataSet} to be converted. +* @tparam T The type of the {@link DataSet}. +* @return The converted {@link Table}. +*/ +Table fromDataSet(DataSet dataSet); + + /** +* Converts the given {@link DataSet} into a {@link Table} with specified field names. +* +* Example: +* +* {{{ +* DataSet> set = ... +* Table tab = tableEnv.fromDataSet(set, "a, b"); +* }}} +* +* @param dataSet The {@link DataSet} to be converted. +* @param fields The field names of the resulting {@link Table}. +* @tparam T The type of the {@link DataSet}. +* @return The converted {@link Table}. +*/ +Table fromDataSet(DataSet dataSet, String fields); + + /** +* Registers the given {@link DataSet} as table in the +* {@link TableEnvironment}'s catalog. +* Registered tables can be referenced in SQL queries. +* +* The field names of
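The Javadoc above says that `fromDataSet` derives the `Table`'s field names automatically from the `DataSet`'s type. Flink does this through its `TypeInformation` machinery; purely as an illustration of the idea, here is a self-contained, hypothetical sketch that derives column names from a POJO's declared fields (the `FieldNameDeriver` and `Order` names are invented for this example and are not Flink API):

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustration only: derive column names from a POJO's declared fields,
// similar in spirit to how fromDataSet(DataSet) derives Table field names.
// Flink itself uses its TypeInformation machinery, not plain reflection.
public class FieldNameDeriver {

    // Hypothetical element type of a DataSet<Order>.
    public static class Order {
        public long orderId;
        public String product;
        public int amount;
    }

    public static List<String> deriveFieldNames(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            names.add(field.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        List<String> names = deriveFieldNames(Order.class);
        Collections.sort(names); // getDeclaredFields() does not guarantee an order
        System.out.println(names); // prints: [amount, orderId, product]
    }
}
```

With the explicit-name overload, as in `fromDataSet(set, "a, b")`, derived names would simply be overridden by the caller's list.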
[GitHub] [flink] knaufk commented on issue #8185: [FLINK-12212] [docs] clarify that operator state is checkpointed asynchronously
knaufk commented on issue #8185: [FLINK-12212] [docs] clarify that operator state is checkpointed asynchronously URL: https://github.com/apache/flink/pull/8185#issuecomment-484117180 @StefanRRichter Thanks for the review. I have addressed both comments. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #8200: [FLINK-11614] [chinese-translation,Documentation] Translate the "Conf…
flinkbot commented on issue #8200: [FLINK-11614] [chinese-translation,Documentation] Translate the "Conf… URL: https://github.com/apache/flink/pull/8200#issuecomment-484114111 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands: The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] hequn8128 commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces
hequn8128 commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces URL: https://github.com/apache/flink/pull/8050#discussion_r276269770 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java ## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.ExternalCatalog; +import org.apache.flink.table.catalog.ExternalCatalogTable; +import org.apache.flink.table.descriptors.BatchTableDescriptor; +import org.apache.flink.table.descriptors.Descriptor; +import org.apache.flink.table.descriptors.StreamTableDescriptor; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; + +import java.lang.reflect.Method; +import java.util.Map; + +/** + * Utility for dealing with {@link TableFactory} using the {@link TableFactoryService}. + */ +public class TableFactoryUtil { + + /** +* Returns an external catalog. 
+	 */
+	public static ExternalCatalog findAndCreateExternalCatalog(Descriptor descriptor) {
+		Map<String, String> properties = descriptor.toProperties();
+		return TableFactoryService
+			.find(ExternalCatalogFactory.class, properties)
+			.createExternalCatalog(properties);
+	}
+
+	/**
+	 * Returns a table source matching the descriptor.
+	 */
+	public static TableSource<?> findAndCreateTableSource(Descriptor descriptor) {
+		Map<String, String> properties = descriptor.toProperties();
+
+		TableSource<?> tableSource;
+		try {
+			if (descriptor instanceof BatchTableDescriptor || isBatchExternalCatalogTable(descriptor)) {
+				Object object = TableFactoryService.find(
+					Class.forName("org.apache.flink.table.factories.BatchTableSourceFactory"),
+					properties);
+				Method method = object.getClass().getDeclaredMethod("createBatchTableSource", Map.class);
+				tableSource = (TableSource<?>) method.invoke(object, properties);
+			} else if (descriptor instanceof StreamTableDescriptor || isStreamExternalCatalogTable(descriptor)) {
+				Object object = TableFactoryService.find(
+					Class.forName("org.apache.flink.table.factories.StreamTableSourceFactory"),
+					properties);
+				Method method = object.getClass().getDeclaredMethod("createStreamTableSource", Map.class);
+				tableSource = (TableSource<?>) method.invoke(object, properties);
+			} else {
+				throw new TableException(
+					String.format(
+						"Unsupported table descriptor: %s",
+						descriptor.getClass().getName())
+				);
+			}
+		} catch (Throwable t) {
+			throw new TableException("findAndCreateTableSource failed.", t);
+		}
+
+		return tableSource;
+	}
+
+	/**
+	 * Returns a table sink matching the descriptor.
+	 */
+	public static TableSink<?> findAndCreateTableSink(Descriptor descriptor) {
+		Map<String, String> properties = descriptor.toProperties();
+
+		TableSink<?> tableSink;
+		try {
+			if (descriptor instanceof BatchTableDescriptor || isBatchExternalCatalogTable(descriptor)) {
+				Object object = TableFactoryService.find(
+					Class.forName("org.apache.flink.table.factories.BatchTableSinkFactory"),

Review comment: This would be good if we change it to TableSink/Source. However, I'm
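The `TableFactoryUtil` code above resolves the batch/stream factory classes with `Class.forName` plus `getDeclaredMethod`/`invoke`, so this module needs no compile-time dependency on them. A minimal, self-contained sketch of that reflective-dispatch pattern follows; the `ReflectiveDispatch` class and `createViaReflection` method are invented for this example, and the demo target is `java.lang.StringBuilder` rather than a Flink factory:

```java
import java.lang.reflect.Method;

// Sketch of the pattern used in findAndCreateTableSource/Sink: look up a class
// by name at runtime, instantiate it, find a method, and invoke it reflectively,
// wrapping any failure in a single exception type (as TableFactoryUtil wraps
// everything in TableException).
public class ReflectiveDispatch {

    public static Object createViaReflection(String className, String methodName, String arg) {
        try {
            Class<?> clazz = Class.forName(className);
            Object instance = clazz.getDeclaredConstructor().newInstance();
            Method method = clazz.getDeclaredMethod(methodName, String.class);
            return method.invoke(instance, arg);
        } catch (Throwable t) {
            // mirror TableFactoryUtil: one wrapper exception for all reflection failures
            throw new RuntimeException("reflective factory lookup failed", t);
        }
    }

    public static void main(String[] args) {
        // StringBuilder stands in for a factory class resolved by name.
        Object result = createViaReflection("java.lang.StringBuilder", "append", "hello");
        System.out.println(result); // prints: hello
    }
}
```

The design trade-off being discussed in the review applies here too: reflection defers the dependency to runtime, but any misspelled class or method name only fails when the code path is actually exercised.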
[GitHub] [flink] GJL commented on a change in pull request #7978: [FLINK-11910] [Yarn] add customizable yarn application type
GJL commented on a change in pull request #7978: [FLINK-11910] [Yarn] add customizable yarn application type URL: https://github.com/apache/flink/pull/7978#discussion_r276267053 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ## @@ -1285,6 +1287,13 @@ public void setName(String name) { customName = name; } + public void setApplicationType(String type) { + if (type == null) { Review comment: Let's use `org.apache.flink.util.Preconditions`: `this.applicationType = Preconditions.checkNotNull(applicationType, "applicationType must not be null");` `setName` should also be refactored in a separate commit. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
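GJL's suggestion replaces the explicit null check with Flink's `Preconditions.checkNotNull`. The JDK's `java.util.Objects.requireNonNull` behaves the same way (throws `NullPointerException` with the given message for null, returns the value otherwise), so the refactoring can be sketched without Flink on the classpath; the `DescriptorSketch` class below is invented for this example:

```java
import java.util.Objects;

// Stand-in for AbstractYarnClusterDescriptor, showing the suggested setter shape.
// Objects.requireNonNull is the JDK analogue of Flink's Preconditions.checkNotNull:
// it throws NullPointerException with the message when given null, else returns the value.
public class DescriptorSketch {

    private String applicationType;

    public void setApplicationType(String type) {
        // one line replaces the explicit `if (type == null) { throw ... }` branch
        this.applicationType = Objects.requireNonNull(type, "applicationType must not be null");
    }

    public String getApplicationType() {
        return applicationType;
    }

    public static void main(String[] args) {
        DescriptorSketch d = new DescriptorSketch();
        d.setApplicationType("Apache Flink");
        System.out.println(d.getApplicationType()); // prints: Apache Flink
    }
}
```

The same one-liner shape would apply to `setName`, which the reviewer suggests refactoring in a separate commit.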
[GitHub] [flink] GJL commented on a change in pull request #7978: [FLINK-11910] [Yarn] add customizable yarn application type
GJL commented on a change in pull request #7978: [FLINK-11910] [Yarn] add customizable yarn application type URL: https://github.com/apache/flink/pull/7978#discussion_r276268002 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ## @@ -201,6 +202,7 @@ public FlinkYarnSessionCli( .build(); streaming = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode"); name = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN"); + applicationType = new Option(shortPrefix + "at", longPrefix + "applicationType", true, "Set a custom application type for the application on YARN"); Review comment: `yarn_setup.md` should be updated as well, I think. Specifically this part:

```
{% highlight bash %}
Usage:
   Optional
     -D <arg>                        Dynamic properties
     -d,--detached                   Start detached
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -nm,--name <arg>                Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for HA mode
{% endhighlight %}
```

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] Leeviiii opened a new pull request #8200: [FLINK-11614] [chinese-translation,Documentation] Translate the "Conf…
Leeviiii opened a new pull request #8200: [FLINK-11614] [chinese-translation,Documentation] Translate the "Conf… URL: https://github.com/apache/flink/pull/8200 …iguring Dependencies" page into Chinese

## What is the purpose of the change

This pull request completes the Chinese translation of the "Configuring Dependencies" page of the official documentation.

## Brief change log

*Translate the "Configuring Dependencies" page into Chinese*

## Verifying this change

*This change is to add a new translated document.*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11614) Translate the "Configuring Dependencies" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11614: --- Labels: pull-request-available (was: ) > Translate the "Configuring Dependencies" page into Chinese > -- > > Key: FLINK-11614 > URL: https://issues.apache.org/jira/browse/FLINK-11614 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: YangFei >Priority: Major > Labels: pull-request-available > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/dependencies.html > The markdown file is located in flink/docs/dev/projectsetup/dependencies.zh.md > The markdown file will be created once FLINK-11529 is merged. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] yanghua commented on a change in pull request #8199: [FLINK-11955] Modify build to move filesystems from lib to plugins folder
yanghua commented on a change in pull request #8199: [FLINK-11955] Modify build to move filesystems from lib to plugins folder URL: https://github.com/apache/flink/pull/8199#discussion_r276267262 ## File path: flink-dist/pom.xml ## @@ -140,19 +140,54 @@ under the License. + org.apache.flink flink-hadoop-fs ${project.version} + provided Review comment: um... Actually, IMO, the merge order of this PR should follow the order of the subtasks under the umbrella issue for the plugin architecture (FLINK-11952). We should make sure the plugins work fine with the whole plugin architecture. There is still an open prior issue that is being implemented by @pnowojski. cc @StefanRRichter This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-12119) Add OWASP Dependency Check
[ https://issues.apache.org/jira/browse/FLINK-12119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16820131#comment-16820131 ] Chesnay Schepler commented on FLINK-12119: -- The plugin itself was set up on master in a5058388d463da5ab54127fa58ab1c62115b137e. It can be run from the Flink root via {{mvn org.owasp:dependency-check-maven:aggregate}}. A report is both printed to stdout and written into the root /target directory.

> Add OWASP Dependency Check
> --
>
> Key: FLINK-12119
> URL: https://issues.apache.org/jira/browse/FLINK-12119
> Project: Flink
> Issue Type: Improvement
> Components: Build System
> Reporter: Konstantin Knauf
> Assignee: Konstantin Knauf
> Priority: Major
> Labels: pull-request-available
> Time Spent: 20m
> Remaining Estimate: 0h
>
> In order to obtain some visibility into the currently known security
> vulnerabilities in Flink's dependencies, it would be useful to include the
> OWASP dependency check plugin [1] in our Maven build.
> By including it in flink-parent, we can get a summary of all dependencies of
> all child projects by running
> {{mvn clean org.owasp:dependency-check-maven:5.0.0-M2:aggregate}}
> We should probably exclude some modules from the dependency check. These
> could be:
> * flink-docs
> * flink-fs-tests
> * flink-yarn-tests
> * flink-contrib
> Anything else? What about flink-python/flink-streaming-python?
> In addition I propose to exclude all dependencies in the *system* or
> *provided* scope.
> At least initially, the build would never fail because of vulnerabilities.
> [1] [https://jeremylong.github.io/DependencyCheck/dependency-check-maven/index.html]
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12119) Add OWASP Dependency Check
[ https://issues.apache.org/jira/browse/FLINK-12119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-12119: -- Fix Version/s: 1.9.0

> Add OWASP Dependency Check
> --
>
> Key: FLINK-12119
> URL: https://issues.apache.org/jira/browse/FLINK-12119
> Project: Flink
> Issue Type: Improvement
> Components: Build System
> Reporter: Konstantin Knauf
> Assignee: Konstantin Knauf
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> In order to obtain some visibility into the currently known security
> vulnerabilities in Flink's dependencies, it would be useful to include the
> OWASP dependency check plugin [1] in our Maven build.
> By including it in flink-parent, we can get a summary of all dependencies of
> all child projects by running
> {{mvn clean org.owasp:dependency-check-maven:5.0.0-M2:aggregate}}
> We should probably exclude some modules from the dependency check. These
> could be:
> * flink-docs
> * flink-fs-tests
> * flink-yarn-tests
> * flink-contrib
> Anything else? What about flink-python/flink-streaming-python?
> In addition I propose to exclude all dependencies in the *system* or
> *provided* scope.
> At least initially, the build would never fail because of vulnerabilities.
> [1] [https://jeremylong.github.io/DependencyCheck/dependency-check-maven/index.html]
-- This message was sent by Atlassian JIRA (v7.6.3#76005)