[jira] [Created] (FLINK-22678) Fix Loading Changelog Statebackend with configs set in job-level and cluster-level separately

2021-05-16 Thread Yuan Mei (Jira)
Yuan Mei created FLINK-22678:


 Summary: Fix Loading Changelog Statebackend with configs set in 
job-level and cluster-level separately
 Key: FLINK-22678
 URL: https://issues.apache.org/jira/browse/FLINK-22678
 Project: Flink
  Issue Type: Bug
Reporter: Yuan Mei
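
For context, a hedged sketch of the two configuration paths the fix concerns, 
assuming the {{state.backend.changelog.enabled}} option and the job-level 
{{enableChangelogStateBackend}} API introduced by the changelog work:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Cluster level: set in flink-conf.yaml
//   state.backend.changelog.enabled: true

// Job level (assumed API); the job-level setting should take precedence:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableChangelogStateBackend(true);
{code}

The bug concerns how these two levels are merged when the changelog state 
backend is loaded.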








[GitHub] [flink] flinkbot commented on pull request #15926: [FLINK-22655]When using -i option to initialize SQL Client session It should be possible to annotate the script with --

2021-05-16 Thread GitBox


flinkbot commented on pull request #15926:
URL: https://github.com/apache/flink/pull/15926#issuecomment-842001231


   
   ## CI report:
   
   * b5cb2bf5a3fae247b64d47106552618594090c48 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Commented] (FLINK-22667) Fix the wrong file system in java doc and documentation

2021-05-16 Thread Yishuang Lu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345882#comment-17345882
 ] 

Yishuang Lu commented on FLINK-22667:
-

Sure, thanks!

> Fix the wrong file system in java doc and documentation
> ---
>
> Key: FLINK-22667
> URL: https://issues.apache.org/jira/browse/FLINK-22667
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Yishuang Lu
>Priority: Major
>
> The file system URL is missing a `/`:
> [https://github.com/apache/flink/blob/master/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md]
> {code:java}
> env.getCheckpointConfig().setCheckpointStorage("hdfs://my/checkpoint/dir")
> {code}
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java#L76]
> {code:java}
>  *
> env.getCheckpointConfig().setCheckpointStorage("hdfs://checkpoints");{code}
>  
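
For reference, the corrected form uses an empty authority so the path is taken 
as absolute (the host-and-port below is a placeholder, not from the issue):

{code:java}
// Correct: empty authority, absolute path on the default HDFS
env.getCheckpointConfig().setCheckpointStorage("hdfs:///my/checkpoint/dir");

// Or with an explicit namenode authority:
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:8020/my/checkpoint/dir");
{code}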





[GitHub] [flink] flinkbot commented on pull request #15926: [FLINK-22655]When using -i option to initialize SQL Client session It should be possible to annotate the script with --

2021-05-16 Thread GitBox


flinkbot commented on pull request #15926:
URL: https://github.com/apache/flink/pull/15926#issuecomment-841990082


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit b5cb2bf5a3fae247b64d47106552618594090c48 (Mon May 17 
05:11:23 UTC 2021)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   






[jira] [Commented] (FLINK-22655) When using -i option to initialize SQL Client session It should be possible to annotate the script with --

2021-05-16 Thread JasonLee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345875#comment-17345875
 ] 

JasonLee commented on FLINK-22655:
--

I have submitted a PR: [https://github.com/apache/flink/pull/15926]. Please 
check it. Thank you.

> When using -i  option to initialize SQL Client session It should be 
> possible to annotate the script with --
> -
>
> Key: FLINK-22655
> URL: https://issues.apache.org/jira/browse/FLINK-22655
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client
>Affects Versions: 1.13.0
>Reporter: JasonLee
>Assignee: Shengkai Fang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>






[jira] [Updated] (FLINK-22655) When using -i option to initialize SQL Client session It should be possible to annotate the script with --

2021-05-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-22655:
---
Labels: pull-request-available  (was: )

> When using -i  option to initialize SQL Client session It should be 
> possible to annotate the script with --
> -
>
> Key: FLINK-22655
> URL: https://issues.apache.org/jira/browse/FLINK-22655
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client
>Affects Versions: 1.13.0
>Reporter: JasonLee
>Assignee: Shengkai Fang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>






[GitHub] [flink] JasonLeeCoding opened a new pull request #15926: [FLINK-22655]When using -i option to initialize SQL Client session It should be possible to annotate the script with --

2021-05-16 Thread GitBox


JasonLeeCoding opened a new pull request #15926:
URL: https://github.com/apache/flink/pull/15926


   
   ## What is the purpose of the change
   
   *This change makes it possible to use '--' comments in the SQL Client 
initialization script.*
   
   
   ## Brief change log
   
   *add replaceAll in CliStatementSplitter#splitContent*
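
   A minimal sketch of the idea (a hypothetical helper, not the actual patch): 
mask `--` line comments before the splitter breaks the script into statements.

```java
// Hypothetical sketch: (?m) makes '$' match at each line end, so every
// comment runs from "--" to its own line break and is removed.
private static String stripLineComments(String script) {
    return script.replaceAll("(?m)--.*$", "");
}
```

   Note that a plain regex would also strip `--` occurring inside string 
literals, so the real implementation likely needs to be more careful than this.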
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   *Modify the original test file CliStatementSplitterTest.java*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   






[jira] [Commented] (FLINK-22655) When using -i option to initialize SQL Client session It should be possible to annotate the script with --

2021-05-16 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345866#comment-17345866
 ] 

Shengkai Fang commented on FLINK-22655:
---

Thanks for your help. Just ping me when you finish.

> When using -i  option to initialize SQL Client session It should be 
> possible to annotate the script with --
> -
>
> Key: FLINK-22655
> URL: https://issues.apache.org/jira/browse/FLINK-22655
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client
>Affects Versions: 1.13.0
>Reporter: JasonLee
>Assignee: Shengkai Fang
>Priority: Major
> Fix For: 1.14.0
>
>






[GitHub] [flink] flinkbot edited a comment on pull request #15922: [FLINK-22650][python][table-planner-blink] Support StreamExecPythonCorrelate json serialization/deserialization

2021-05-16 Thread GitBox


flinkbot edited a comment on pull request #15922:
URL: https://github.com/apache/flink/pull/15922#issuecomment-841617014


   
   ## CI report:
   
   * 01cc8a69d63b42251c294c2460b8152cd0d06a11 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17981)
 
   * 9fcb8001b9956441051aa1800cdc2df052f7a2c7 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18007)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #15925: [FLINK-21464][sql-client] Support ADD JAR in SQL Client

2021-05-16 Thread GitBox


flinkbot edited a comment on pull request #15925:
URL: https://github.com/apache/flink/pull/15925#issuecomment-841952583


   
   ## CI report:
   
   * 8119d476fa59a7aa35f09c84e285226ff9b3e238 UNKNOWN
   * 9f545673d53bf59cccdab10021641ece2cf21650 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18002)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #14830: [FLINK-20487][table-planner-blink] Remove restriction on StreamPhysicalGroupWindowAggregate which only supports insert-only input nod

2021-05-16 Thread GitBox


flinkbot edited a comment on pull request #14830:
URL: https://github.com/apache/flink/pull/14830#issuecomment-770878871


   
   ## CI report:
   
   * c39b7a4ecde12aca916b2228ffe734598749fa9d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16721)
 
   * cffb2fc0ae51d8f60eae2eb4c19144291a87f87d Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17998)
 
   * ec4a374ea2544b0e07f98ca52e184265e14bb875 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18006)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Issue Comment Deleted] (FLINK-9665) PrometheusReporter does not properly unregister metrics

2021-05-16 Thread Guokuai Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-9665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guokuai Huang updated FLINK-9665:
-
Comment: was deleted

(was: I encountered a problem, which may be a bug introduced by this change. 
https://issues.apache.org/jira/browse/FLINK-22664)

> PrometheusReporter does not properly unregister metrics
> ---
>
> Key: FLINK-9665
> URL: https://issues.apache.org/jira/browse/FLINK-9665
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.4.2, 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Jelmer Kuperus
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.4.3, 1.5.1, 1.6.0
>
>
> The {{PrometheusReporter}} groups metrics with the same logical scope in a 
> single {{Collector}} which are periodically polled by Prometheus.
> New metrics are added to an existing collector, and a reference count is 
> maintained so we can eventually cleanup the {{Collector}} itself.
> For removed metrics we decrease the reference count but do not remove the 
> metrics that were added. As a result the collector will continue to expose 
> metrics as long as at least 1 metric exists with the same logical scope.
> If the collector is a {{io.prometheus.client.Gauge}} we can use the 
> {{#remove()}} method. For histograms we will have to modify our 
> {{HistogramSummaryProxy}} class to allow removing individual histograms.
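
A hedged sketch of the child removal the description asks for, using the 
Prometheus simpleclient {{Gauge}} API (metric and label names are illustrative):

{code:java}
import io.prometheus.client.Gauge;

public class RemoveChildExample {
    public static void main(String[] args) {
        Gauge gauge = Gauge.build()
                .name("flink_task_metric")
                .help("Example task metric")
                .labelNames("host", "task_id")
                .register();

        gauge.labels("tm-1", "task-0").set(42); // registering creates a child for these label values
        gauge.remove("tm-1", "task-0");         // cleanup must pass exactly the same label values
    }
}
{code}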





[jira] [Closed] (FLINK-22664) Task metrics are not properly unregistered during region failover

2021-05-16 Thread Guokuai Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guokuai Huang closed FLINK-22664.
-
Resolution: Not A Problem

> Task metrics are not properly unregistered during region failover
> -
>
> Key: FLINK-22664
> URL: https://issues.apache.org/jira/browse/FLINK-22664
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.11.0, 1.12.0
>Reporter: Guokuai Huang
>Priority: Major
> Attachments: Screen Shot 2021-05-14 at 2.51.04 PM.png, Screen Shot 
> 2021-05-14 at 5.40.22 PM.png
>
>
> In the current implementation of AbstractPrometheusReporter, metrics with the 
> same scopedMetricName share the same metric Collector. At the same time, a 
> HashMap named collectorsWithCountByMetricName is maintained to record the 
> reference counter of each Collector. Only when the reference counter of a 
> Collector becomes 0 will it be unregistered. 
> Suppose we have a Flink job with a single chained operator, and *execution 
> failover-strategy is set to region.*
>  !Screen Shot 2021-05-14 at 2.51.04 PM.png!
>  The following figure compares the number of metrics after region failover 
> when this job runs on 2 TaskManagers with 1 slot/TM versus 1 TaskManager with 
> 2 slots/TM.
> Each inflection point on the graph represents a region failover. *For a 
> TaskManager with multiple tasks (slots), the number of metrics increases after 
> region failover.*
> This is a case I deliberately constructed to illustrate the problem. A 
> TaskManager only needs to restart part of its tasks during each region 
> failover; that is to say, *the reference counter of a task's metric Collector 
> will never become 0, so the metric Collector will not be unregistered.*
> This problem has put a lot of pressure on our Prometheus; please see if 
> there is a good solution.
> !Screen Shot 2021-05-14 at 5.40.22 PM.png!
>  





[jira] [Commented] (FLINK-22664) Task metrics are not properly unregistered during region failover

2021-05-16 Thread Guokuai Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345860#comment-17345860
 ] 

Guokuai Huang commented on FLINK-22664:
---

Sorry, this problem is caused by our own in-house modifications. To make it 
easier to manage the metrics of Flink jobs running on YARN, we modified 
AbstractPrometheusReporter and added the YARN application_id as a metric 
dimension. When removing a metric, this dimension was not added, so the 
removal did not succeed.
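
In other words (an illustration only, not Flink code): if an extra label is 
added on registration, the removal has to pass the same label values, e.g.

{code:java}
// gauge, host, taskId and applicationId are placeholders for this sketch.
gauge.labels(host, taskId, applicationId).set(value); // child registered with 3 label values
gauge.remove(host, taskId);                           // no child matches, nothing is removed
gauge.remove(host, taskId, applicationId);            // matches, child is removed
{code}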

> Task metrics are not properly unregistered during region failover
> -
>
> Key: FLINK-22664
> URL: https://issues.apache.org/jira/browse/FLINK-22664
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.11.0, 1.12.0
>Reporter: Guokuai Huang
>Priority: Major
> Attachments: Screen Shot 2021-05-14 at 2.51.04 PM.png, Screen Shot 
> 2021-05-14 at 5.40.22 PM.png
>
>
> In the current implementation of AbstractPrometheusReporter, metrics with the 
> same scopedMetricName share the same metric Collector. At the same time, a 
> HashMap named collectorsWithCountByMetricName is maintained to record the 
> reference counter of each Collector. Only when the reference counter of a 
> Collector becomes 0 will it be unregistered. 
> Suppose we have a Flink job with a single chained operator, and *execution 
> failover-strategy is set to region.*
>  !Screen Shot 2021-05-14 at 2.51.04 PM.png!
>  The following figure compares the number of metrics after region failover 
> when this job runs on 2 TaskManagers with 1 slot/TM versus 1 TaskManager with 
> 2 slots/TM.
> Each inflection point on the graph represents a region failover. *For a 
> TaskManager with multiple tasks (slots), the number of metrics increases after 
> region failover.*
> This is a case I deliberately constructed to illustrate the problem. A 
> TaskManager only needs to restart part of its tasks during each region 
> failover; that is to say, *the reference counter of a task's metric Collector 
> will never become 0, so the metric Collector will not be unregistered.*
> This problem has put a lot of pressure on our Prometheus; please see if 
> there is a good solution.
> !Screen Shot 2021-05-14 at 5.40.22 PM.png!
>  





[jira] [Comment Edited] (FLINK-21232) Introduce pluggable Hadoop delegation token providers

2021-05-16 Thread Junfan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345857#comment-17345857
 ] 

Junfan Zhang edited comment on FLINK-21232 at 5/17/21, 3:54 AM:


Hi [~jackwangcs] [~lirui]
Nice improvement! Thanks for your work, [~jackwangcs].
Is there any update on it?
I would be happy to apply this patch to our production environment for testing.


was (Author: zuston):
Nice improvement!  Thanks for your work. [~jackwangcs] 
And any update on it? [~lirui]
I am happy to apply this patch to our production environment for testing.

> Introduce pluggable Hadoop delegation token providers
> -
>
> Key: FLINK-21232
> URL: https://issues.apache.org/jira/browse/FLINK-21232
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common, Deployment / YARN
>Reporter: jackwangcs
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
>
> Introduce a pluggable delegation token provider via SPI. 
> A delegation provider could be placed in connector-related code and is more 
> extensible than the reflection-based way of obtaining DTs.
> Email discussion threads:
> [https://lists.apache.org/thread.html/rbedb6e769358a10c6426c4c42b3b51cdbed48a3b6537e4ebde912bc0%40%3Cdev.flink.apache.org%3E]
>  
> [https://lists.apache.org/thread.html/r20d4be431ff2f6faff94129b5321a047fcbb0c71c8e092504cd91183%40%3Cdev.flink.apache.org%3E]
>  
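
A hypothetical shape for such an SPI (names and signatures are illustrative, 
not taken from the proposal):

{code:java}
import org.apache.hadoop.security.Credentials;

/** Discovered via ServiceLoader, so each connector can ship its own provider. */
public interface HadoopDelegationTokenProvider {
    /** Name of the service this provider obtains tokens for, e.g. "hbase". */
    String serviceName();

    /** Whether tokens are needed in the current deployment (e.g. Kerberos enabled). */
    boolean delegationTokensRequired();

    /** Obtain delegation tokens and add them to the given credentials. */
    void obtainDelegationTokens(Credentials credentials) throws Exception;
}
{code}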





[jira] [Commented] (FLINK-21232) Introduce pluggable Hadoop delegation token providers

2021-05-16 Thread Junfan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345857#comment-17345857
 ] 

Junfan Zhang commented on FLINK-21232:
--

Nice improvement! Thanks for your work, [~jackwangcs].
Any update on it? [~lirui]
I would be happy to apply this patch to our production environment for testing.

> Introduce pluggable Hadoop delegation token providers
> -
>
> Key: FLINK-21232
> URL: https://issues.apache.org/jira/browse/FLINK-21232
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common, Deployment / YARN
>Reporter: jackwangcs
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
>
> Introduce a pluggable delegation token provider via SPI. 
> A delegation provider could be placed in connector-related code and is more 
> extensible than the reflection-based way of obtaining DTs.
> Email discussion threads:
> [https://lists.apache.org/thread.html/rbedb6e769358a10c6426c4c42b3b51cdbed48a3b6537e4ebde912bc0%40%3Cdev.flink.apache.org%3E]
>  
> [https://lists.apache.org/thread.html/r20d4be431ff2f6faff94129b5321a047fcbb0c71c8e092504cd91183%40%3Cdev.flink.apache.org%3E]
>  





[GitHub] [flink] hehuiyuan commented on pull request #15712: [FLINK-22400][hive connect]fix NPE problem when convert flink object for Map

2021-05-16 Thread GitBox


hehuiyuan commented on pull request #15712:
URL: https://github.com/apache/flink/pull/15712#issuecomment-841961124


   @lirui-apache 






[GitHub] [flink] hehuiyuan removed a comment on pull request #15712: [FLINK-22400][hive connect]fix NPE problem when convert flink object for Map

2021-05-16 Thread GitBox


hehuiyuan removed a comment on pull request #15712:
URL: https://github.com/apache/flink/pull/15712#issuecomment-840226220


   @lirui-apache is it ok?






[GitHub] [flink] flinkbot edited a comment on pull request #15925: [FLINK-21464][sql-client] Support ADD JAR in SQL Client

2021-05-16 Thread GitBox


flinkbot edited a comment on pull request #15925:
URL: https://github.com/apache/flink/pull/15925#issuecomment-841952583


   
   ## CI report:
   
   * 8119d476fa59a7aa35f09c84e285226ff9b3e238 UNKNOWN
   * 9f545673d53bf59cccdab10021641ece2cf21650 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #15922: [FLINK-22650][python][table-planner-blink] Support StreamExecPythonCorrelate json serialization/deserialization

2021-05-16 Thread GitBox


flinkbot edited a comment on pull request #15922:
URL: https://github.com/apache/flink/pull/15922#issuecomment-841617014


   
   ## CI report:
   
   * 01cc8a69d63b42251c294c2460b8152cd0d06a11 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17981)
 
   * 9fcb8001b9956441051aa1800cdc2df052f7a2c7 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #15913: [FLINK-22649][python][table-planner-blink] Support StreamExecPythonCalc json serialization/deserialization

2021-05-16 Thread GitBox


flinkbot edited a comment on pull request #15913:
URL: https://github.com/apache/flink/pull/15913#issuecomment-840516258


   
   ## CI report:
   
   * 0dea984584314c7e6c57d4bdcbbdd1276de7dff9 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17948)
 
   * ca0b0af0a7b0592c1e75da97251fd24a2a0348db Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18001)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #14830: [FLINK-20487][table-planner-blink] Remove restriction on StreamPhysicalGroupWindowAggregate which only supports insert-only input nod

2021-05-16 Thread GitBox


flinkbot edited a comment on pull request #14830:
URL: https://github.com/apache/flink/pull/14830#issuecomment-770878871


   
   ## CI report:
   
   * c39b7a4ecde12aca916b2228ffe734598749fa9d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16721)
 
   * cffb2fc0ae51d8f60eae2eb4c19144291a87f87d Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17998)
 
   * ec4a374ea2544b0e07f98ca52e184265e14bb875 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] SteNicholas edited a comment on pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-16 Thread GitBox


SteNicholas edited a comment on pull request #15924:
URL: https://github.com/apache/flink/pull/15924#issuecomment-841957117


   @tweise , thanks for creating the pull request for the `HybridSource` basic 
implementation. IMO, the `SwitchableSource` and `SwitchableSplitEnumerator` 
mentioned in FLIP-150 could be introduced here to control the switching 
behavior more appropriately and to add switchable sources more conveniently. 
What's more, should `HybridSourceITCase` verify the switching between 
FileSource and KafkaSource?
   cc @StephanEwen @becketqin 
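
   For reference, a hypothetical shape of those interfaces (an illustration 
only, not the FLIP text):

```java
// The enumerator of a finishing source would export an end state (e.g. end
// offsets or timestamps) that seeds the start position of the next source.
interface SwitchableSplitEnumerator<SplitT, CheckpointT, StartStateT, EndStateT> {
    void setStartState(StartStateT startState); // called before this source starts reading
    EndStateT getEndState();                    // captured once this source is exhausted
}
```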






[jira] [Commented] (FLINK-22655) When using -i option to initialize SQL Client session It should be possible to annotate the script with --

2021-05-16 Thread JasonLee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345854#comment-17345854
 ] 

JasonLee commented on FLINK-22655:
--

I think you are right; I am very happy to do this.

> When using -i  option to initialize SQL Client session It should be 
> possible to annotate the script with --
> -
>
> Key: FLINK-22655
> URL: https://issues.apache.org/jira/browse/FLINK-22655
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client
>Affects Versions: 1.13.0
>Reporter: JasonLee
>Assignee: Shengkai Fang
>Priority: Major
> Fix For: 1.14.0
>
>






[GitHub] [flink] SteNicholas commented on pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-16 Thread GitBox


SteNicholas commented on pull request #15924:
URL: https://github.com/apache/flink/pull/15924#issuecomment-841957117


   @tweise , thanks for creating the pull request for the `HybridSource` basic 
implementation. IMO, the `SwitchableSource` and `SwitchableSplitEnumerator` 
mentioned in FLIP-150 could be introduced here to control the switching 
behavior more appropriately.






[jira] [Updated] (FLINK-22672) Some enhancements for pluggable shuffle service framework

2021-05-16 Thread Jin Xing (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jin Xing updated FLINK-22672:
-
Description: 
"Pluggable shuffle service" in Flink provides an architecture which are unified 
for both streaming and batch jobs, allowing user to customize the process of 
data transfer between shuffle stages according to scenarios.

There are already a number of implementations of "remote shuffle service" on 
Spark like [1][2][3]. Remote shuffle enables to shuffle data from/to a remote 
cluster and achieves benefits like :
 # The lifecycle of computing resource can be decoupled with shuffle data, once 
computing task is finished, idle computing nodes can be released with its 
completed shuffle data accormadated on remote shuffle cluster.
 # There is no need to reserve disk capacity for shuffle on computing nodes. 
Remote shuffle cluster serves shuffling request with better scaling ability and 
alleviates the local disk pressure on computing nodes when data skew.

Based "pluggable shuffle service", we build our own "remote shuffle service" on 
Flink –- Lattice, which targets to provide functionalities and improve 
performance for batch processing jobs. Basically it works as below:
 # Lattice cluster works as an independent service for shuffling request;
 # LatticeShuffleMaster extends ShuffleMaster, works inside JM and talks with 
remote Lattice cluster for shuffle resouce application and shuffle data 
lifecycle management;
 # LatticeShuffleEnvironmente extends ShuffleEnvironment, works inside TM and 
provides an environment for shuffling data from/to remote Lattice cluster;

During the process of building Lattice we find some potential enhancements on 
"pluggable shuffle service". I will enumerate and create some sub JIRAs under 
this umbrella

 

[1] 
[https://www.alibabacloud.com/blog/emr-remote-shuffle-service-a-powerful-elastic-tool-of-serverless-spark_597728]

[2] [https://bestoreo.github.io/post/cosco/cosco/]

[3] [https://github.com/uber/RemoteShuffleService]

  was:
"Pluggable shuffle service" in Flink provides an architecture which are unified 
for both streaming and batch jobs, allowing user to customize the process of 
data transfer between shuffle stages according to scenarios.

There are already a number of implementations of "remote shuffle service" on 
Spark like [1][2][3]. Remote shuffle enables to shuffle data from/to a remote 
cluster and achieves benefits like :
 # The lifecycle of computing resource can be decoupled with shuffle data, once 
computing task is finished, idle computing nodes can be released with its 
completed shuffle data accormadated on remote shuffle cluster.
 # There is no need to reserve disk capacity for shuffle on computing nodes. 
Remote shuffle cluster serves shuffling request with better scaling ability and 
alleviates the local disk pressure on computing nodes when data skew.

Based "pluggable shuffle service", we build our own "remote shuffle service" on 
Flink -- Lattice, which targets to provide functionalities and improve 
performance for batch processing jobs. Basically it works as below:
 # Lattice cluster works as an independent service for shuffling request;
 # LatticeShuffleMaster extends ShuffleMaster, works inside JM and talks with 
remote Lattice cluster for shuffle resouce application and shuffle data 
lifecycle management;
 # LatticeShuffleEnvironmente extends ShuffleEnvironment, works inside TM and 
provides an environment for shuffling data from/to remote Lattice cluster;

During the process of building Lattice we find some potential enhancements on 
"pluggable shuffle service". I will enumerate and create some sub JIRAs under 
this umbrella

 

[1] 
[https://www.alibabacloud.com/blog/emr-remote-shuffle-service-a-powerful-elastic-tool-of-serverless-spark_597728]

[2] [https://bestoreo.github.io/post/cosco/cosco/]

[3] [https://github.com/uber/RemoteShuffleService]
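
For orientation, the extension points mentioned above come from Flink's shuffle 
SPI (selected via the shuffle-service-factory.class option); a custom factory 
skeleton might look like this (a hedged sketch, the Lattice* types are 
illustrative stubs):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleServiceFactory;

public class LatticeShuffleServiceFactory
        implements ShuffleServiceFactory<LatticeShuffleDescriptor, LatticeResultPartition, LatticeInputGate> {

    @Override
    public ShuffleMaster<LatticeShuffleDescriptor> createShuffleMaster(Configuration conf) {
        return new LatticeShuffleMaster(conf); // JM side: talks to the remote Lattice cluster
    }

    @Override
    public ShuffleEnvironment<LatticeResultPartition, LatticeInputGate> createShuffleEnvironment(
            ShuffleEnvironmentContext context) {
        return new LatticeShuffleEnvironment(context); // TM side: reads/writes remote shuffle data
    }
}
{code}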


> Some enhancements for pluggable shuffle service framework
> -
>
> Key: FLINK-22672
> URL: https://issues.apache.org/jira/browse/FLINK-22672
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Jin Xing
>Priority: Major
>
> "Pluggable shuffle service" in Flink provides an architecture which are 
> unified for both streaming and batch jobs, allowing user to customize the 
> process of data transfer between shuffle stages according to scenarios.
> There are already a number of implementations of "remote shuffle service" on 
> Spark like [1][2][3]. Remote shuffle enables to shuffle data from/to a remote 
> cluster and achieves benefits like :
>  # The lifecycle of computing resource can be decoupled with shuffle data, 
> once computing task is finished, idle computing nodes can be released with 
> its completed shuffle 

[jira] [Commented] (FLINK-19973) 【Flink-Deployment】YARN CLI Parameter doesn't work when set `execution.target: yarn-per-job` config

2021-05-16 Thread liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345851#comment-17345851
 ] 

liu commented on FLINK-19973:
-

When using flink-sql-client, how do I set {{execution.target}} to deploy a job 
in yarn-application mode? Or does flink-sql-client not support yarn-application 
mode?

> 【Flink-Deployment】YARN CLI Parameter doesn't work when set `execution.target: 
> yarn-per-job` config
> --
>
> Key: FLINK-19973
> URL: https://issues.apache.org/jira/browse/FLINK-19973
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client, Deployment / YARN
>Affects Versions: 1.12.0
>Reporter: zhisheng
>Assignee: Kostas Kloudas
>Priority: Major
> Attachments: image-2020-11-04-20-58-49-738.png, 
> image-2020-11-04-21-00-06-180.png
>
>
> When I use flink-sql-client to deploy a job to YARN (per-job mode), I set 
> `execution.target: yarn-per-job` in flink-conf.yaml and the job deploys to YARN.
>  
> When I deploy a jar job to YARN, the command is `./bin/flink run -m 
> yarn-cluster -ynm flink-1.12-test  -ytm 3g -yjm 3g 
> examples/streaming/StateMachineExample.jar`; it deploys fine, but the 
> `-ynm`, `-ytm 3g` and `-yjm 3g` options don't work. 
>  
> !image-2020-11-04-20-58-49-738.png|width=912,height=235!
>  
>  
> When I remove the config `execution.target: yarn-per-job`, it works well.
>  
> !image-2020-11-04-21-00-06-180.png|width=1047,height=150!





[jira] [Updated] (FLINK-22675) Add an interface method ShuffleMaster#close

2021-05-16 Thread Jin Xing (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jin Xing updated FLINK-22675:
-
Description: When extending a remote shuffle service based on the 'pluggable 
shuffle service', the ShuffleMaster talks with the remote cluster over network 
connections. This Jira proposes to add an interface method, ShuffleMaster#close, 
which can be implemented to do cleanup work and will be called when the Flink 
application is closed.  (was: When extending remote shuffle 
service based on 'pluggable shuffle service', 
ShuffleMaster talks with remote cluster by network connection. This Jira 
proposes to add an interface method – ShuffleMaster#close, which can be 
extended and do cleanup work and will be called when Flink application is 
closed.)

> Add an interface method ShuffleMaster#close
> ---
>
> Key: FLINK-22675
> URL: https://issues.apache.org/jira/browse/FLINK-22675
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Jin Xing
>Priority: Major
>
> When extending a remote shuffle service based on the 'pluggable shuffle 
> service', the ShuffleMaster talks with the remote cluster over network 
> connections. This Jira proposes to add an interface method, 
> ShuffleMaster#close, which can be implemented to do cleanup work and will be 
> called when the Flink application is closed.





[jira] [Updated] (FLINK-22677) Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real asynchronous fashion

2021-05-16 Thread Jin Xing (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jin Xing updated FLINK-22677:
-
Description: The current scheduler enforces a synchronous registration even 
though the API of ShuffleMaster#registerPartitionWithProducer returns a 
CompletableFuture. In the remote shuffle service scenario, communication 
between the ShuffleMaster and the remote cluster tends to be expensive. A 
synchronous registration risks blocking the main thread and might cause 
negative side effects like heartbeat timeouts. Additionally, expensive 
synchronous invocations of the remote cluster could bottleneck the throughput 
of shuffle resource allocation, especially for batch jobs with complicated 
DAGs.  (was: Current 
scheduler enforces a synchronous registration though the API of 
ShuffleMaster#registerPartitionWithProducer returns a CompletableFuture. In 
scenario of remote shuffle service, the talk between ShuffleMaster and remote 
cluster tends to be expensive. A synchronous registration risks to block main 
thread potentially and might cause negative side effects like heartbeat timeout.

Additionally, expensive synchronous invokes to remote could bottleneck the 
throughput for applying shuffle resource, especially for batch jobs with 
complicated DAGs;)

> Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real 
> asynchronous fashion
> --
>
> Key: FLINK-22677
> URL: https://issues.apache.org/jira/browse/FLINK-22677
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Jin Xing
>Priority: Major
>
> The current scheduler enforces a synchronous registration even though the API 
> of ShuffleMaster#registerPartitionWithProducer returns a CompletableFuture. 
> In the remote shuffle service scenario, communication between the 
> ShuffleMaster and the remote cluster tends to be expensive. A synchronous 
> registration risks blocking the main thread and might cause negative side 
> effects like heartbeat timeouts. Additionally, expensive synchronous 
> invocations of the remote cluster could bottleneck the throughput of shuffle 
> resource allocation, especially for batch jobs with complicated DAGs.





[GitHub] [flink] SteNicholas commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-16 Thread GitBox


SteNicholas commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r633194823



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/** Hybrid source reader that delegates to the actual current source reader. */

Review comment:
   The comment on `HybridSourceReader` is confusing because there is no 
concept of a current source reader.

##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+/** Hybrid source that switches underlying sources based on configurable 
source chain. */
+public class HybridSource implements Source {
+
+private final SourceChain sourceChain;
+
+public HybridSource(SourceChain sourceChain) {

Review comment:
   IMO, `SourceChain` shouldn't be constructed on the user side; it could be 
built inside `HybridSource`. The constructor of `HybridSource` could take an 
initial source or a list of sources.
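
   For illustration, a hypothetical fluent construction along those lines (not 
the API in this PR):

```java
// Hypothetical builder style: start from an initial source and append the
// sources to switch to, instead of assembling a SourceChain by hand.
HybridSource<String> source =
        HybridSource.<String>builder(fileSource)  // first source in the chain
                .addSource(kafkaSource)           // switched to when the previous one finishes
                .build();
```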

##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * 

[jira] [Updated] (FLINK-20716) Pluggable shuffle service public interface IndexedInputGate mixed with Netty shuffle service implementation.

2021-05-16 Thread Jin Xing (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jin Xing updated FLINK-20716:
-
Description: In the current "pluggable shuffle service", IndexedInputGate 
defines the reading for a single intermediate result (doc of InputGate) and 
InputChannel defines a single reading channel in an IndexedInputGate. An 
IndexedInputGate consists of a number of InputChannels. The current fields of 
InputChannel are mainly descriptions or metrics for a single channel. But we 
found SingleInputGate mixed in. From our understanding, SingleInputGate is a 
specific implementation of the "Netty shuffle service" and should not be 
exposed by the "pluggable shuffle service".  (was: IndexedInputGate is provided 
by the pluggable shuffle service as a public interface, intended to allow users 
to extend/customize the shuffle-read behavior. Currently its method 
IndexedInputGate#getChannel returns InputChannel, which from my understanding 
binds to the Netty shuffle service implementation. Should we keep 
IndexedInputGate independent?)

> Pluggable shuffle service public interface IndexedInputGate mixed with Netty 
> shuffle service implementation.
> 
>
> Key: FLINK-20716
> URL: https://issues.apache.org/jira/browse/FLINK-20716
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Jin Xing
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> In current "pluggable shuffle service", IndexedInputGate defines the reading 
> for a single intermediate result (doc of InputGate) and InputChannel defines 
> a single reading channel in IndexedInputGate. IndexedInputGate consists of a 
> number of InputChanels. Current fields of InputChannel are mainly some 
> descriptions or metrics for a single channel. But we found SingleInputGate is 
> mixed in. From our understanding, SingleInputGate is a specific 
> implementation in "Netty shuffle service" and should not be exposed by 
> "pluggable shuffle service".





[jira] [Updated] (FLINK-20716) Pluggable shuffle service public interface IndexedInputGate mixed with Netty shuffle service implementation.

2021-05-16 Thread Jin Xing (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jin Xing updated FLINK-20716:
-
Parent: FLINK-22672
Issue Type: Sub-task  (was: Bug)

> Pluggable shuffle service public interface IndexedInputGate mixed with Netty 
> shuffle service implementation.
> 
>
> Key: FLINK-20716
> URL: https://issues.apache.org/jira/browse/FLINK-20716
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Jin Xing
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> IndexedInputGate is provided by the pluggable shuffle service as a public 
> interface, intended to allow users to extend/customize the shuffle-read 
> behavior. Currently its method IndexedInputGate#getChannel returns 
> InputChannel, which from my understanding binds to the Netty shuffle service 
> implementation. Should we keep IndexedInputGate independent?





[jira] [Created] (FLINK-22677) Scheduler should invoke ShuffleMaster#registerPartitionWithProducer by a real asynchronous fashion

2021-05-16 Thread Jin Xing (Jira)
Jin Xing created FLINK-22677:


 Summary: Scheduler should invoke 
ShuffleMaster#registerPartitionWithProducer by a real asynchronous fashion
 Key: FLINK-22677
 URL: https://issues.apache.org/jira/browse/FLINK-22677
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Jin Xing


The current scheduler enforces a synchronous registration even though the API 
of ShuffleMaster#registerPartitionWithProducer returns a CompletableFuture. In 
the remote shuffle service scenario, communication between the ShuffleMaster 
and the remote cluster tends to be expensive. A synchronous registration risks 
blocking the main thread and might cause negative side effects like heartbeat 
timeouts.

Additionally, expensive synchronous invocations of the remote cluster could 
bottleneck the throughput of shuffle resource allocation, especially for batch 
jobs with complicated DAGs.
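
A minimal sketch of the difference (illustrative, not the scheduler's actual 
code; assignPartition is a hypothetical callback):

{code:java}
// Synchronous style: join() blocks the JM main thread until the remote cluster answers.
ShuffleDescriptor descriptor =
        shuffleMaster.registerPartitionWithProducer(partitionDescriptor, producerDescriptor).join();

// Asynchronous style this issue argues for: continue on completion, never block.
shuffleMaster
        .registerPartitionWithProducer(partitionDescriptor, producerDescriptor)
        .thenAccept(desc -> assignPartition(desc));
{code}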





[jira] [Closed] (FLINK-22592) numBuffersInLocal is always zero when using unaligned checkpoints

2021-05-16 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song closed FLINK-22592.

Fix Version/s: 1.12.5
   1.13.1
   1.14.0
   Resolution: Fixed

Fixed via
- master (1.14): 6c6256130583a246309d1585029ad30d243941b3
- release-1.13: 9b2f97d07dfa39c58572655f0a30d76ad9156546
- release-1.12: 60093e4c51fd0e566e44c8ec7585a67c9376e164


> numBuffersInLocal is always zero when using unaligned checkpoints
> -
>
> Key: FLINK-22592
> URL: https://issues.apache.org/jira/browse/FLINK-22592
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.11.3, 1.13.0, 1.12.3
>Reporter: Piotr Nowojski
>Assignee: Nicholas Jiang
>Priority: Major
>  Labels: beginner-friendly, pull-request-available
> Fix For: 1.14.0, 1.13.1, 1.12.5
>
>
> This is because {{LocalRecoveredInputChannel#toInputChannelInternal}} is 
> passing the wrong parameter to {{LocalInputChannel}}'s constructor 
> ({{numBytesIn}} twice):
> {code:java}
> protected InputChannel toInputChannelInternal() {
> return new LocalInputChannel(
> inputGate,
> getChannelIndex(),
> partitionId,
> partitionManager,
> taskEventPublisher,
> initialBackoff,
> maxBackoff,
> numBytesIn,
> numBytesIn,
> channelStateWriter);
> }
> {code}
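
Presumably the fix is simply to pass the buffer counter in the second slot, 
along these lines (a sketch, assuming a numBuffersIn counter next to numBytesIn):

{code:java}
        // ...
        initialBackoff,
        maxBackoff,
        numBytesIn,
        numBuffersIn,   // was numBytesIn, which left numBuffersInLocal stuck at zero
        channelStateWriter);
{code}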





[GitHub] [flink] flinkbot commented on pull request #15925: [FLINK-21464][sql-client] Support ADD JAR in SQL Client

2021-05-16 Thread GitBox


flinkbot commented on pull request #15925:
URL: https://github.com/apache/flink/pull/15925#issuecomment-841952583


   
   ## CI report:
   
   * 8119d476fa59a7aa35f09c84e285226ff9b3e238 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-16 Thread GitBox


flinkbot edited a comment on pull request #15924:
URL: https://github.com/apache/flink/pull/15924#issuecomment-841943851


   
   ## CI report:
   
   * 6aaf5dfeabcc9b470a06ff23be9a11ffeda61f3c Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17999)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Created] (FLINK-22676) The partition tracker should support remote shuffle properly

2021-05-16 Thread Jin Xing (Jira)
Jin Xing created FLINK-22676:


 Summary: The partition tracker should support remote shuffle 
properly
 Key: FLINK-22676
 URL: https://issues.apache.org/jira/browse/FLINK-22676
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Jin Xing


In current Flink, a data partition is bound to the ResourceID of its TM in 
Execution#startTrackingPartitions, and the partition tracker stops tracking the 
corresponding partitions when a TM disconnects 
(JobMaster#disconnectTaskManager), i.e. the lifecycle of shuffle data is bound 
to the computing resource (TM). This works fine for the internal shuffle 
service, but not for a remote shuffle service. Since shuffle data is 
accommodated remotely, the lifecycle of a completed partition can be decoupled 
from the TM: a TM can safely be released when no computing task runs on it, and 
further shuffle read requests can be directed to the remote shuffle cluster. In 
addition, when a TM is lost, its completed data partitions on the remote 
shuffle cluster do not need to be reproduced.
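
The shuffle SPI already lets a descriptor declare whether it occupies local TM 
resources; a hedged sketch of the tracker-side distinction (the tracker-side 
names are illustrative):

{code:java}
// If the descriptor stores no local resources on the producing TM (remote
// shuffle), the partition's lifecycle need not be tied to that TM's ResourceID.
if (shuffleDescriptor.storesLocalResourcesOn().isPresent()) {
    partitionTracker.startTrackingPartition(producerResourceId, partitionInfo);
} else {
    // Track against the remote shuffle service instead of the TM (sketch only).
}
{code}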





[GitHub] [flink] flinkbot edited a comment on pull request #15913: [FLINK-22649][python][table-planner-blink] Support StreamExecPythonCalc json serialization/deserialization

2021-05-16 Thread GitBox


flinkbot edited a comment on pull request #15913:
URL: https://github.com/apache/flink/pull/15913#issuecomment-840516258


   
   ## CI report:
   
   * 0dea984584314c7e6c57d4bdcbbdd1276de7dff9 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17948)
 
   * ca0b0af0a7b0592c1e75da97251fd24a2a0348db UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] xintongsong closed pull request #15915: [FLINK-22592][runtime] numBuffersInLocal is always zero when using unaligned checkpoints

2021-05-16 Thread GitBox


xintongsong closed pull request #15915:
URL: https://github.com/apache/flink/pull/15915


   






[jira] [Created] (FLINK-22675) Add an interface method ShuffleMaster#close

2021-05-16 Thread Jin Xing (Jira)
Jin Xing created FLINK-22675:


 Summary: Add an interface method ShuffleMaster#close
 Key: FLINK-22675
 URL: https://issues.apache.org/jira/browse/FLINK-22675
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Jin Xing


When extending a remote shuffle service based on the 'pluggable shuffle 
service', the ShuffleMaster talks with the remote cluster over network 
connections. This Jira proposes to add an interface method, ShuffleMaster#close, 
which can be implemented to do cleanup work and will be called when the Flink 
application is closed.
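
What the addition might look like (a hedged sketch; the actual signature is up 
to the proposal):

{code:java}
public interface ShuffleMaster<T extends ShuffleDescriptor> extends AutoCloseable {

    /** Called when the Flink application shuts down; release remote connections here. */
    @Override
    default void close() throws Exception {
        // no-op by default, so existing implementations keep compiling
    }

    // ... existing methods such as registerPartitionWithProducer(...)
}
{code}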





[jira] [Updated] (FLINK-22673) Add document about add jar related commands

2021-05-16 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-22673:

Component/s: Table SQL / Client

> Add document about add jar related commands
> ---
>
> Key: FLINK-22673
> URL: https://issues.apache.org/jira/browse/FLINK-22673
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table SQL / Client
>Reporter: Shengkai Fang
>Priority: Major
> Fix For: 1.14.0
>
>
> Including {{ADD JAR}}, {{SHOW JAR}}, {{REMOVE JAR}}. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22592) numBuffersInLocal is always zero when using unaligned checkpoints

2021-05-16 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song reassigned FLINK-22592:


Assignee: Nicholas Jiang

> numBuffersInLocal is always zero when using unaligned checkpoints
> -
>
> Key: FLINK-22592
> URL: https://issues.apache.org/jira/browse/FLINK-22592
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.11.3, 1.13.0, 1.12.3
>Reporter: Piotr Nowojski
>Assignee: Nicholas Jiang
>Priority: Major
>  Labels: beginner-friendly, pull-request-available
>
> This is because {{LocalRecoveredInputChannel#toInputChannelInternal}} is 
> passing wrong parameter to {{LocalInputChannel}}'s constructor (twice 
> {{numBytesIn}}):
> {code:java}
> protected InputChannel toInputChannelInternal() {
> return new LocalInputChannel(
> inputGate,
> getChannelIndex(),
> partitionId,
> partitionManager,
> taskEventPublisher,
> initialBackoff,
> maxBackoff,
> numBytesIn,
> numBytesIn,
> channelStateWriter);
> }
> {code}
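
The fix is presumably to pass the buffer counter as the second metric argument 
(a sketch; the field is assumed to be named {{numBuffersIn}}):

{code:java}
protected InputChannel toInputChannelInternal() {
    return new LocalInputChannel(
            inputGate,
            getChannelIndex(),
            partitionId,
            partitionManager,
            taskEventPublisher,
            initialBackoff,
            maxBackoff,
            numBytesIn,
            numBuffersIn, // was: numBytesIn passed a second time
            channelStateWriter);
}
{code}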



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #14830: [FLINK-20487][table-planner-blink] Remove restriction on StreamPhysicalGroupWindowAggregate which only supports insert-only input nod

2021-05-16 Thread GitBox


flinkbot edited a comment on pull request #14830:
URL: https://github.com/apache/flink/pull/14830#issuecomment-770878871


   
   ## CI report:
   
   * c39b7a4ecde12aca916b2228ffe734598749fa9d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16721)
 
   * cffb2fc0ae51d8f60eae2eb4c19144291a87f87d Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17998)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-22674) Provide JobID when apply shuffle resource by ShuffleMaster

2021-05-16 Thread Jin Xing (Jira)
Jin Xing created FLINK-22674:


 Summary: Provide JobID when apply shuffle resource by ShuffleMaster
 Key: FLINK-22674
 URL: https://issues.apache.org/jira/browse/FLINK-22674
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Jin Xing


In the current Flink 'pluggable shuffle service' framework, only 
PartitionDescriptor and ProducerDescriptor are passed as parameters to 
ShuffleMaster#registerPartitionWithProducer.

But when extending a remote shuffle service based on the 'pluggable shuffle 
service', the JobID is also needed when applying for shuffle resources from the 
remote cluster. It can be used as an identifier linking shuffle resources to 
the corresponding job (see the signature sketch below):
 # The remote shuffle cluster can isolate jobs from each other or apply 
capacity control on shuffle resources per job;
 # The remote shuffle cluster can use the JobID to clean up shuffle data when a 
job is lost, thus avoiding file leaks;
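
A minimal sketch of the proposed change (the "current" signature is simplified 
from the ShuffleMaster interface; the JobID parameter is the proposal, not 
existing API):

{code:java}
// current (simplified):
CompletableFuture<T> registerPartitionWithProducer(
        PartitionDescriptor partitionDescriptor,
        ProducerDescriptor producerDescriptor);

// proposed: pass the owning job's ID so the remote cluster can scope
// isolation, capacity control and cleanup to that job
CompletableFuture<T> registerPartitionWithProducer(
        JobID jobId,
        PartitionDescriptor partitionDescriptor,
        ProducerDescriptor producerDescriptor);
{code}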



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22673) Add document about add jar related commands

2021-05-16 Thread Shengkai Fang (Jira)
Shengkai Fang created FLINK-22673:
-

 Summary: Add document about add jar related commands
 Key: FLINK-22673
 URL: https://issues.apache.org/jira/browse/FLINK-22673
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Shengkai Fang
 Fix For: 1.14.0


Including {{ADD JAR}}, {{SHOW JAR}}, {{REMOVE JAR}}. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot commented on pull request #15925: [FLINK-21464][sql-client] Support ADD JAR in SQL Client

2021-05-16 Thread GitBox


flinkbot commented on pull request #15925:
URL: https://github.com/apache/flink/pull/15925#issuecomment-841949701


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 8119d476fa59a7aa35f09c84e285226ff9b3e238 (Mon May 17 
03:09:24 UTC 2021)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-22669) Support date format in a path when using filesystem as source

2021-05-16 Thread Rui Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345844#comment-17345844
 ] 

Rui Li commented on FLINK-22669:


Hi [~wangqinghuan], would [dynamic table 
options|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/hints/#dynamic-table-options]
 satisfy the requirement here?
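
For illustration, with dynamic table options enabled 
({{table.dynamic-table-options.enabled}}), a scheduler could substitute the 
date and override the path per run, e.g.:

{code:sql}
SELECT * FROM fs_table /*+ OPTIONS('path'='ftp:/tmp/2021-05-17/data') */;
{code}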

> Support date format in a path when using filesystem as source
> --
>
> Key: FLINK-22669
> URL: https://issues.apache.org/jira/browse/FLINK-22669
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Ecosystem
>Reporter: wangqinghuan
>Priority: Minor
>
> Do we have suggestions to support date format in a file path when using 
> filesystem as source?
> For example:
> {{CREATE TABLE fs_table (}}
> {{...}}
> {{)}}{{ WITH (}}
> 'connector'='filesystem',
>  'path'='ftp:/tmp/\{yyyy-MM-dd}/data',
> {{'format'='parquet'}}
> {{);}}
> the \{yyyy-MM-dd} expression in the path will be replaced with the actual 
> system time.
> *Why support this?*
> Date expression support is useful in scheduled jobs, such as a daily job that 
> reads the current day's data from FTP, where a new file path is created from 
> the date expression every day.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] fsk119 commented on pull request #15558: [FLINK-22064][sql-client] Support ADD JAR command in sql client

2021-05-16 Thread GitBox


fsk119 commented on pull request #15558:
URL: https://github.com/apache/flink/pull/15558#issuecomment-841947244


   CC #15925


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-21464) Support ADD JAR command in sql client

2021-05-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-21464:
---
Labels: pull-request-available stale-assigned  (was: stale-assigned)

> Support ADD JAR command in sql client
> -
>
> Key: FLINK-21464
> URL: https://issues.apache.org/jira/browse/FLINK-21464
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Client
>Affects Versions: 1.13.0
>Reporter: Shengkai Fang
>Assignee: xiangtao
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.14.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22658) Remove Deprecated util class TableConnectorUtil

2021-05-16 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-22658.
---
Fix Version/s: 1.14.0
   Resolution: Fixed

Fixed in master: 5ff7f731d32f33f8f5020bd7a6a4d8f5aa8b7818

> Remove Deprecated util class TableConnectorUtil
> ---
>
> Key: FLINK-22658
> URL: https://issues.apache.org/jira/browse/FLINK-22658
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API, Table SQL / Runtime
>Reporter: Xianghu Wang
>Assignee: Xianghu Wang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> Replace `TableConnectorUtil` with `TableConnectorUtils`



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] wuchong merged pull request #15914: [FLINK-22658][table-common] Remove Deprecated util class TableConnectorUtil

2021-05-16 Thread GitBox


wuchong merged pull request #15914:
URL: https://github.com/apache/flink/pull/15914


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] fsk119 opened a new pull request #15925: [FLINK-21464][sql-client] Support ADD JAR in SQL Client

2021-05-16 Thread GitBox


fsk119 opened a new pull request #15925:
URL: https://github.com/apache/flink/pull/15925


   
   
   ## What is the purpose of the change
   
   *Support ADD JAR in SQL Client.*
   
   
   ## Brief change log
   
 - *Add grammer in calcite parser*
 - *`SessionContext` supports to create a new classloader and adds the 
added URL into the configuration*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Add end to end test in CliClientTest (load the function in added jar, 
RESET should reset the classloader also)*
 - *Test `SessionContext` to add the legal/illegal path.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / 
don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-21301) Decouple window aggregate allow lateness with state ttl configuration

2021-05-16 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345841#comment-17345841
 ] 

Jark Wu commented on FLINK-21301:
-

Hi [~qingru zhang], I thought about this again. Your requirement sounds like 
fine-grained state TTL, where different operators can have different TTLs. 
Maybe it can be addressed by FLINK-17173? 

> Decouple window aggregate allow lateness with state ttl configuration
> -
>
> Key: FLINK-21301
> URL: https://issues.apache.org/jira/browse/FLINK-21301
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Andy
>Priority: Major
>  Labels: auto-unassigned
> Fix For: 1.14.0
>
>
> Currently, the state retention time config also affects the state cleaning 
> behavior of Window Aggregate, which is unexpected for most users.
> E.g. in the following example, a user would set `MinIdleStateRetentionTime` to 
> 1 day to clean state in `deduplicate`. However, it also affects the cleaning 
> behavior of the window aggregate: 2021-01-04 data would be cleaned at 
> 2021-01-06 instead of 2021-01-05. 
> {code:sql}
> SELECT
>  DATE_FORMAT(tumble_end(ROWTIME ,interval '1' DAY),'yyyy-MM-dd') as stat_time,
>  count(1) first_phone_num
> FROM (
>  SELECT 
>  ROWTIME,
>  user_id,
>  row_number() over(partition by user_id, pdate order by ROWTIME ) as rn
>  FROM source_kafka_biz_shuidi_sdb_crm_call_record 
> ) cal 
> where rn =1
> group by tumble(ROWTIME,interval '1' DAY);{code}
> It's better to decouple the window aggregate's allowed lateness from 
> `MinIdleStateRetentionTime`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22672) Some enhancements for pluggable shuffle service framework

2021-05-16 Thread Jin Xing (Jira)
Jin Xing created FLINK-22672:


 Summary: Some enhancements for pluggable shuffle service framework
 Key: FLINK-22672
 URL: https://issues.apache.org/jira/browse/FLINK-22672
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Jin Xing


"Pluggable shuffle service" in Flink provides an architecture which are unified 
for both streaming and batch jobs, allowing user to customize the process of 
data transfer between shuffle stages according to scenarios.

There are already a number of implementations of "remote shuffle service" on 
Spark like [1][2][3]. Remote shuffle enables to shuffle data from/to a remote 
cluster and achieves benefits like :
 # The lifecycle of computing resource can be decoupled with shuffle data, once 
computing task is finished, idle computing nodes can be released with its 
completed shuffle data accormadated on remote shuffle cluster.
 # There is no need to reserve disk capacity for shuffle on computing nodes. 
Remote shuffle cluster serves shuffling request with better scaling ability and 
alleviates the local disk pressure on computing nodes when data skew.

Based "pluggable shuffle service", we build our own "remote shuffle service" on 
Flink -- Lattice, which targets to provide functionalities and improve 
performance for batch processing jobs. Basically it works as below:
 # Lattice cluster works as an independent service for shuffling request;
 # LatticeShuffleMaster extends ShuffleMaster, works inside JM and talks with 
remote Lattice cluster for shuffle resouce application and shuffle data 
lifecycle management;
 # LatticeShuffleEnvironmente extends ShuffleEnvironment, works inside TM and 
provides an environment for shuffling data from/to remote Lattice cluster;
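
As a sketch, such a custom implementation is plugged in through the shuffle 
service factory option in flink-conf.yaml (the factory class name below is 
hypothetical):

{code}
shuffle-service-factory.class: com.example.lattice.LatticeShuffleServiceFactory
{code}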

While building Lattice we found some potential enhancements to the "pluggable 
shuffle service". I will enumerate them and create sub-JIRAs under this 
umbrella.

 

[1] 
[https://www.alibabacloud.com/blog/emr-remote-shuffle-service-a-powerful-elastic-tool-of-serverless-spark_597728]

[2] [https://bestoreo.github.io/post/cosco/cosco/]

[3] [https://github.com/uber/RemoteShuffleService]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22540) Remove YAML environment file support in SQL Client

2021-05-16 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345840#comment-17345840
 ] 

Jark Wu commented on FLINK-22540:
-

[~fate], it is recommended to make the constructor arguments parameters of 
the {{eval}} function and to do the one-time initialization when the eval 
method is first called. 
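
A minimal sketch of that pattern (class, field and parameter names are made up 
for illustration):

{code:java}
import org.apache.flink.table.functions.ScalarFunction;

public class PrefixFunction extends ScalarFunction {

    private transient StringBuilder buffer; // state formerly set up in a constructor

    public String eval(String value, String prefix) {
        if (buffer == null) {
            buffer = new StringBuilder(); // one-time initialization on first call
        }
        buffer.setLength(0);
        return buffer.append(prefix).append(value).toString();
    }
}
{code}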

> Remove YAML environment file support in SQL Client
> --
>
> Key: FLINK-22540
> URL: https://issues.apache.org/jira/browse/FLINK-22540
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Client
>Reporter: Jark Wu
>Assignee: Shengkai Fang
>Priority: Critical
> Fix For: 1.14.0
>
>
> As discussed in 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-163%3A+SQL+Client+Improvements,
>  
> YAML environment file is deprecated in 1.13 version and should be removed in 
> 1.14 version. Users are recommended to use SQL script to initialize session.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot commented on pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-16 Thread GitBox


flinkbot commented on pull request #15924:
URL: https://github.com/apache/flink/pull/15924#issuecomment-841943851


   
   ## CI report:
   
   * 6aaf5dfeabcc9b470a06ff23be9a11ffeda61f3c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] HuangXingBo commented on pull request #15913: [FLINK-22649][python][table-planner-blink] Support StreamExecPythonCalc json serialization/deserialization

2021-05-16 Thread GitBox


HuangXingBo commented on pull request #15913:
URL: https://github.com/apache/flink/pull/15913#issuecomment-841943741


   @godfreyhe Thanks a lot for the review. I have addressed the comments in the 
latest commit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] godfreyhe commented on a change in pull request #14830: [FLINK-20487][table-planner-blink] Remove restriction on StreamPhysicalGroupWindowAggregate which only supports insert-only inp

2021-05-16 Thread GitBox


godfreyhe commented on a change in pull request #14830:
URL: https://github.com/apache/flink/pull/14830#discussion_r633190805



##
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
##
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.planner.runtime.stream.sql
 
+import java.math.BigDecimal

Review comment:
   nit: reorder the imports




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #14830: [FLINK-20487][table-planner-blink] Remove restriction on StreamPhysicalGroupWindowAggregate which only supports insert-only input nod

2021-05-16 Thread GitBox


flinkbot edited a comment on pull request #14830:
URL: https://github.com/apache/flink/pull/14830#issuecomment-770878871


   
   ## CI report:
   
   * c39b7a4ecde12aca916b2228ffe734598749fa9d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16721)
 
   * cffb2fc0ae51d8f60eae2eb4c19144291a87f87d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] tweise commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-16 Thread GitBox


tweise commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r633190010



##
File path: 
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
##
@@ -118,16 +121,37 @@ private void testBoundedTextFileSource(FailoverType 
failoverType) throws Excepti
 // default
 writeHiddenJunkFiles(testDir);
 
-final FileSource<String> source =
+final FileSource<String> fileSource =
+FileSource.forRecordStreamFormat(new TextLineFormat(), 
Path.fromLocalFile(testDir))
+.build();
+// directory doesn't matter; splits are supplied by converter
+final FileSource<String> fileSource2 =
 FileSource.forRecordStreamFormat(new TextLineFormat(), 
Path.fromLocalFile(testDir))
 .build();
 
+HybridSource.SourceChain>

Review comment:
   This will be reverted - now in HybridSourceITCase




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-17845) Can't remove a table connector property with ALTER TABLE

2021-05-16 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345837#comment-17345837
 ] 

Jark Wu commented on FLINK-17845:
-

Hi [~qingyue], I think just removing the keys is fine, because the defaults 
will then be used. This is also what the RESET command does in the SQL CLI [1]. 

[1]: 
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/reset/
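
For example, in the SQL CLI (key quoting follows the linked master docs; older 
versions also accept unquoted keys):

{code:sql}
RESET 'table.planner'; -- drop the session override and fall back to the default
RESET;                 -- reset all session properties
{code}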

> Can't remove a table connector property with ALTER TABLE
> 
>
> Key: FLINK-17845
> URL: https://issues.apache.org/jira/browse/FLINK-17845
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Fabian Hueske
>Priority: Major
>  Labels: stale-major
>
> It is not possible to remove an existing table property from a table.
> Looking at the [source 
> code|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java#L295]
>  this seems to be the intended semantics, but it seems counter-intuitive to 
> me.
> If I create a table with the following statement:
> {code}
> CREATE TABLE `testTable` (
>   id INT
> )
> WITH (
> 'connector.type' = 'kafka',
> 'connector.version' = 'universal',
> 'connector.topicX' = 'test',  -- Woops, I made a typo here
> [...]
> )
> {code}
> The statement will be successfully executed. However, the table cannot be 
> used due to the typo.
> Fixing the typo with the following DDL is not possible:
> {code}
> ALTER TABLE `testTable` SET (
> 'connector.type' = 'kafka',
> 'connector.version' = 'universal',
> 'connector.topic' = 'test',  -- Fixing the typo
> )
> {code}
> because the key {{connector.topicX}} is not removed.
> Right now it seems that the only way to fix a table with an invalid key is to 
> DROP and CREATE it. I think that this use case should be supported by ALTER 
> TABLE.
> I would even argue that the expected behavior is that previous properties are 
> removed and replaced by the new properties.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] tweise commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-16 Thread GitBox


tweise commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r633189734



##
File path: 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
##
@@ -39,6 +42,46 @@
 /** IT case for the {@link Source} with a coordinator. */
 public class CoordinatedSourceITCase extends AbstractTestBase {
 
+/** Test with start position dynamically derived from previous enumerator 
state. */
+@Test
+public void testHybridSourceWithDynamicStartPosition() throws Exception {

Review comment:
   This will be moved to HybridSourceITCase




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-22654) SqlCreateTable toString()/unparse() lose CONSTRAINTS and watermarks

2021-05-16 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-22654:
---
Fix Version/s: 1.13.1

> SqlCreateTable  toString()/unparse() lose CONSTRAINTS  and watermarks
> -
>
> Key: FLINK-22654
> URL: https://issues.apache.org/jira/browse/FLINK-22654
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: youxianq
>Assignee: Terry Wang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.1
>
>
> Creating a SqlCreateTable using a LIKE clause and then calling toString() or 
> unparse() will lose the watermark 
> if there are no columns. 
> {code:java}
> public static SqlParser getSqlParser(String sql) {
> SourceStringReader sqlReader = new SourceStringReader(sql);
> return SqlParser.create(sqlReader,
> SqlParser.configBuilder()
> .setParserFactory(FlinkSqlParserImpl.FACTORY)
> .setLex(Lex.JAVA)
> .setIdentifierMaxLength(256)
> .setConformance(FlinkSqlConformance.DEFAULT)
> .build());
> }
> public static void main(String[] args) throws Exception {
> SqlParser sqlParser = getSqlParser("" +
> "create TEMPORARY table t_order_course (\n" +
> "   WATERMARK FOR last_update_time AS last_update_time - INTERVAL 
> '5' SECOND\n" +
> ") with (\n" +
> "  'scan.startup.mode' = 'specific-offsets',\n" +
> "  'scan.startup.specific-offsets' = 
> 'partition:0,offset:1169129'\n" +
> ") like cdc.`qq_data(sh-backend-tst:3306)`.t_order_course (\n" +
> "   OVERWRITING  WATERMARKS\n" +
> "   OVERWRITING OPTIONS\n" +
> "   EXCLUDING CONSTRAINTS\n" +
> " \n" +
> ")");
> SqlNode sqlNode = sqlParser.parseStmt();
> System.out.println(sqlNode.toString());
> }
> {code}
> output:
> CREATE TEMPORARY TABLE `t_order_course` WITH ( 'scan.startup.mode' = 
> 'specific-offsets', 'scan.startup.specific-offsets' = 
> 'partition:0,offset:1169129' ) LIKE 
> `cdc`.`qq_data(sh-backend-tst:3306)`.`t_order_course` ( OVERWRITING 
> WATERMARKS OVERWRITING OPTIONS EXCLUDING CONSTRAINTS )



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-22654) SqlCreateTable toString()/unparse() lose CONSTRAINTS and watermarks

2021-05-16 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345833#comment-17345833
 ] 

godfrey he edited comment on FLINK-22654 at 5/17/21, 2:40 AM:
--

Fixed in 1.14.0: cb4b4d37870fba22af122d1c31e49ce7f3ed096c
Fixed in 1.13.1: 6f4a49ce7c79a8f820c22bd797c6db13f5d0d177


was (Author: godfreyhe):
Fixed in 1.14.0: cb4b4d37870fba22af122d1c31e49ce7f3ed096c

> SqlCreateTable  toString()/unparse() lose CONSTRAINTS  and watermarks
> -
>
> Key: FLINK-22654
> URL: https://issues.apache.org/jira/browse/FLINK-22654
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: youxianq
>Assignee: Terry Wang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> Creating a SqlCreateTable using a LIKE clause and then calling toString() or 
> unparse() will lose the watermark 
> if there are no columns. 
> {code:java}
> public static SqlParser getSqlParser(String sql) {
> SourceStringReader sqlReader = new SourceStringReader(sql);
> return SqlParser.create(sqlReader,
> SqlParser.configBuilder()
> .setParserFactory(FlinkSqlParserImpl.FACTORY)
> .setLex(Lex.JAVA)
> .setIdentifierMaxLength(256)
> .setConformance(FlinkSqlConformance.DEFAULT)
> .build());
> }
> public static void main(String[] args) throws Exception {
> SqlParser sqlParser = getSqlParser("" +
> "create TEMPORARY table t_order_course (\n" +
> "   WATERMARK FOR last_update_time AS last_update_time - INTERVAL 
> '5' SECOND\n" +
> ") with (\n" +
> "  'scan.startup.mode' = 'specific-offsets',\n" +
> "  'scan.startup.specific-offsets' = 
> 'partition:0,offset:1169129'\n" +
> ") like cdc.`qq_data(sh-backend-tst:3306)`.t_order_course (\n" +
> "   OVERWRITING  WATERMARKS\n" +
> "   OVERWRITING OPTIONS\n" +
> "   EXCLUDING CONSTRAINTS\n" +
> " \n" +
> ")");
> SqlNode sqlNode = sqlParser.parseStmt();
> System.out.println(sqlNode.toString());
> }
> {code}
> output:
> CREATE TEMPORARY TABLE `t_order_course` WITH ( 'scan.startup.mode' = 
> 'specific-offsets', 'scan.startup.specific-offsets' = 
> 'partition:0,offset:1169129' ) LIKE 
> `cdc`.`qq_data(sh-backend-tst:3306)`.`t_order_course` ( OVERWRITING 
> WATERMARKS OVERWRITING OPTIONS EXCLUDING CONSTRAINTS )



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot commented on pull request #15924: [FLINK-22670][FLIP-150] Hybrid source baseline

2021-05-16 Thread GitBox


flinkbot commented on pull request #15924:
URL: https://github.com/apache/flink/pull/15924#issuecomment-841938388


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 6aaf5dfeabcc9b470a06ff23be9a11ffeda61f3c (Mon May 17 
02:34:11 UTC 2021)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-22655) When using -i option to initialize SQL Client session It should be possible to annotate the script with --

2021-05-16 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345834#comment-17345834
 ] 

Shengkai Fang commented on FLINK-22655:
---

Thanks for your hint. The solution doesn't fix the problem when the comment is 
on the same line, e.g. {{SET key = value; -- COMMENT}}. 

I think a better solution is to use {{MASK}} to clean the lines.


{code:java}
public static List<String> splitContent(String content) {
    List<String> statements = new ArrayList<>();
    List<String> buffer = new ArrayList<>();

for (String line : content.split("\n")) {
if (isEndOfStatement(line)) {
buffer.add(line);
statements.add(
buffer.stream()
.map(statementLine -> 
statementLine.replaceAll(MASK, ""))
.collect(Collectors.joining("\n")));
buffer.clear();
} else {
buffer.add(line);
}
}
if (!buffer.isEmpty()) {
statements.add(String.join("\n", buffer));
}
return statements;
}
{code}
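
For context, {{MASK}} would be a comment-matching regex; a possible pattern (an 
assumption, since it is not shown above) is:

{code:java}
// Strips a trailing "-- ..." comment from a line. Note: a real implementation
// must avoid matching "--" inside quoted string literals.
private static final String MASK = "--.*";
{code}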

Are you willing to help fix this? 

> When using -i  option to initialize SQL Client session It should be 
> possible to annotate the script with --
> -
>
> Key: FLINK-22655
> URL: https://issues.apache.org/jira/browse/FLINK-22655
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client
>Affects Versions: 1.13.0
>Reporter: JasonLee
>Assignee: Shengkai Fang
>Priority: Major
> Fix For: 1.14.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22654) SqlCreateTable toString()/unparse() lose CONSTRAINTS and watermarks

2021-05-16 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he closed FLINK-22654.
--
Resolution: Fixed

Fixed in 1.14.0: cb4b4d37870fba22af122d1c31e49ce7f3ed096c

> SqlCreateTable  toString()/unparse() lose CONSTRAINTS  and watermarks
> -
>
> Key: FLINK-22654
> URL: https://issues.apache.org/jira/browse/FLINK-22654
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: youxianq
>Assignee: Terry Wang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> Creating a SqlCreateTable using a LIKE clause and then calling toString() or 
> unparse() will lose the watermark 
> if there are no columns. 
> {code:java}
> public static SqlParser getSqlParser(String sql) {
> SourceStringReader sqlReader = new SourceStringReader(sql);
> return SqlParser.create(sqlReader,
> SqlParser.configBuilder()
> .setParserFactory(FlinkSqlParserImpl.FACTORY)
> .setLex(Lex.JAVA)
> .setIdentifierMaxLength(256)
> .setConformance(FlinkSqlConformance.DEFAULT)
> .build());
> }
> public static void main(String[] args) throws Exception {
> SqlParser sqlParser = getSqlParser("" +
> "create TEMPORARY table t_order_course (\n" +
> "   WATERMARK FOR last_update_time AS last_update_time - INTERVAL 
> '5' SECOND\n" +
> ") with (\n" +
> "  'scan.startup.mode' = 'specific-offsets',\n" +
> "  'scan.startup.specific-offsets' = 
> 'partition:0,offset:1169129'\n" +
> ") like cdc.`qq_data(sh-backend-tst:3306)`.t_order_course (\n" +
> "   OVERWRITING  WATERMARKS\n" +
> "   OVERWRITING OPTIONS\n" +
> "   EXCLUDING CONSTRAINTS\n" +
> " \n" +
> ")");
> SqlNode sqlNode = sqlParser.parseStmt();
> System.out.println(sqlNode.toString());
> }
> {code}
> output:
> CREATE TEMPORARY TABLE `t_order_course` WITH ( 'scan.startup.mode' = 
> 'specific-offsets', 'scan.startup.specific-offsets' = 
> 'partition:0,offset:1169129' ) LIKE 
> `cdc`.`qq_data(sh-backend-tst:3306)`.`t_order_course` ( OVERWRITING 
> WATERMARKS OVERWRITING OPTIONS EXCLUDING CONSTRAINTS )



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22654) SqlCreateTable toString()/unparse() lose CONSTRAINTS and watermarks

2021-05-16 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-22654:
---
Fix Version/s: 1.14.0

> SqlCreateTable  toString()/unparse() lose CONSTRAINTS  and watermarks
> -
>
> Key: FLINK-22654
> URL: https://issues.apache.org/jira/browse/FLINK-22654
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: youxianq
>Assignee: Terry Wang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> Creating a SqlCreateTable using a LIKE clause and then calling toString() or 
> unparse() will lose the watermark 
> if there are no columns. 
> {code:java}
> public static SqlParser getSqlParser(String sql) {
> SourceStringReader sqlReader = new SourceStringReader(sql);
> return SqlParser.create(sqlReader,
> SqlParser.configBuilder()
> .setParserFactory(FlinkSqlParserImpl.FACTORY)
> .setLex(Lex.JAVA)
> .setIdentifierMaxLength(256)
> .setConformance(FlinkSqlConformance.DEFAULT)
> .build());
> }
> public static void main(String[] args) throws Exception {
> SqlParser sqlParser = getSqlParser("" +
> "create TEMPORARY table t_order_course (\n" +
> "   WATERMARK FOR last_update_time AS last_update_time - INTERVAL 
> '5' SECOND\n" +
> ") with (\n" +
> "  'scan.startup.mode' = 'specific-offsets',\n" +
> "  'scan.startup.specific-offsets' = 
> 'partition:0,offset:1169129'\n" +
> ") like cdc.`qq_data(sh-backend-tst:3306)`.t_order_course (\n" +
> "   OVERWRITING  WATERMARKS\n" +
> "   OVERWRITING OPTIONS\n" +
> "   EXCLUDING CONSTRAINTS\n" +
> " \n" +
> ")");
> SqlNode sqlNode = sqlParser.parseStmt();
> System.out.println(sqlNode.toString());
> }
> {code}
> output:
> CREATE TEMPORARY TABLE `t_order_course` WITH ( 'scan.startup.mode' = 
> 'specific-offsets', 'scan.startup.specific-offsets' = 
> 'partition:0,offset:1169129' ) LIKE 
> `cdc`.`qq_data(sh-backend-tst:3306)`.`t_order_course` ( OVERWRITING 
> WATERMARKS OVERWRITING OPTIONS EXCLUDING CONSTRAINTS )



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] godfreyhe merged pull request #15918: [FLINK-22654][table] fix SqlCreateTable toString()/unparse() lose CONSTRAINTS and watermarks

2021-05-16 Thread GitBox


godfreyhe merged pull request #15918:
URL: https://github.com/apache/flink/pull/15918


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-22670) Initial HybridSource DataStream Implementation

2021-05-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-22670:
---
Labels: pull-request-available  (was: )

> Initial HybridSource DataStream Implementation
> --
>
> Key: FLINK-22670
> URL: https://issues.apache.org/jira/browse/FLINK-22670
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
>
> Implementation based on FLIP-27 interfaces that supports fixed start position 
> and runtime position conversion for an upfront defined source sequence. Can 
> work with any FLIP-27 source.
> Discussion: 
> [https://lists.apache.org/thread.html/r94057d19f0df2a211695820375502d60cddeeab5ad27057c1ca988d6%40%3Cdev.flink.apache.org%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] tweise opened a new pull request #15924: [FLINK-22670][FLIP-150] Hybrid source baseline

2021-05-16 Thread GitBox


tweise opened a new pull request #15924:
URL: https://github.com/apache/flink/pull/15924


   ## What is the purpose of the change
   
   Initial implementation for hybrid source with support for fixed start 
position and runtime position conversion for an upfront defined source 
sequence. Can work with any FLIP-27 source.
   
   Discussion: 
https://lists.apache.org/thread.html/r94057d19f0df2a211695820375502d60cddeeab5ad27057c1ca988d6%40%3Cdev.flink.apache.org%3E
   
   Based on the previously shared prototype. Integration test with 
`MockBaseSource`. Unit tests to be added. Tested with `FileSource` and 
`KafkaSource` on Flink 1.12 cluster. This combination would be a candidate for 
future e2e test addition.
   
   The design overall follows the one outlined in 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
 except that no special interfaces are required for switching the underlying 
sources.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / 
don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)
 - User documentation will be added as follow-up once baseline is 
established
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] godfreyhe commented on a change in pull request #15913: [FLINK-22649][python][table-planner-blink] Support StreamExecPythonCalc json serialization/deserialization

2021-05-16 Thread GitBox


godfreyhe commented on a change in pull request #15913:
URL: https://github.com/apache/flink/pull/15913#discussion_r633181871



##
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest.java
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/** Test json serialization/deserialization for calc. */
+public class PythonCalcJsonPlanTest extends TableTestBase {
+
+private StreamTableTestUtil util;
+private TableEnvironment tEnv;
+
+@Before
+public void setup() {
+util = streamTestUtil(TableConfig.getDefault());
+tEnv = util.getTableEnv();
+
+String srcTableDdl =
+"CREATE TABLE MyTable (\n"
++ "  a bigint,\n"
++ "  b int not null,\n"
++ "  c varchar,\n"
++ "  d timestamp(3)\n"
++ ") with (\n"
++ "  'connector' = 'values',\n"
++ "  'bounded' = 'false')";
+tEnv.executeSql(srcTableDdl);
+}
+
+@Test
+public void testPythonCalc() {
+tEnv.createTemporaryFunction("pyFunc", new 
PythonScalarFunction("pyFunc"));
+String sinkTableDdl =
+"CREATE TABLE MySink (\n"
++ "  a bigint,\n"
++ "  b int\n"
++ ") with (\n"
++ "  'connector' = 'values',\n"
++ "  'table-sink-class' = 'DEFAULT')";
+tEnv.executeSql(sinkTableDdl);
+util.verifyJsonPlan("insert into MySink select a, pyFunc(b, b) from 
MyTable");
+}

Review comment:
   it's better to add a test with filter
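
   For instance, a variant along these lines (a sketch derived from the test above):

```java
@Test
public void testPythonCalcWithCondition() {
    tEnv.createTemporaryFunction("pyFunc", new PythonScalarFunction("pyFunc"));
    String sinkTableDdl =
            "CREATE TABLE MySink (\n"
                    + "  a bigint,\n"
                    + "  b int\n"
                    + ") with (\n"
                    + "  'connector' = 'values',\n"
                    + "  'table-sink-class' = 'DEFAULT')";
    tEnv.executeSql(sinkTableDdl);
    util.verifyJsonPlan(
            "insert into MySink select a, pyFunc(b, b) from MyTable where b > 0");
}
```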




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] 95chenjz commented on pull request #15866: [FLINK-22562][docs-zh]Translate recent updates to backpressure and ch…

2021-05-16 Thread GitBox


95chenjz commented on pull request #15866:
URL: https://github.com/apache/flink/pull/15866#issuecomment-841935204


   cc @wuchong @xccui @klion26, looking forward to your feedback, thanks! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-22562) Translate recent updates to backpressure and checkpointing docs

2021-05-16 Thread Jianzhang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345830#comment-17345830
 ] 

Jianzhang Chen commented on FLINK-22562:


Please review: https://github.com/apache/flink/pull/15866

> Translate recent updates to backpressure and checkpointing docs
> ---
>
> Key: FLINK-22562
> URL: https://issues.apache.org/jira/browse/FLINK-22562
> Project: Flink
>  Issue Type: Bug
>  Components: chinese-translation
>Affects Versions: 1.14.0, 1.13.1
>Reporter: Piotr Nowojski
>Assignee: Jianzhang Chen
>Priority: Major
>  Labels: pull-request-available
>
> https://issues.apache.org/jira/browse/FLINK-22253
> https://github.com/apache/flink/pull/15811



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-22540) Remove YAML environment file support in SQL Client

2021-05-16 Thread liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345827#comment-17345827
 ] 

liu edited comment on FLINK-22540 at 5/17/21, 2:21 AM:
---

For user-defined functions, I want to know how to create a function with 
constructor parameters in sql-client. With a YAML environment file, I can 
achieve it with the 'constructor' keyword. But in sql-client, I cannot find a 
way to achieve it without the YAML environment file. Thanks.

 

 


was (Author: fate):
For user-define-function,I want to know how to create function with parameters 
in sql-client. If use  YAML environment file ,I can achieve it with keyword 
'constructor'. But  in sql-client, I cannot find the way to acheive it  without 
YAML environment file.

 

 

> Remove YAML environment file support in SQL Client
> --
>
> Key: FLINK-22540
> URL: https://issues.apache.org/jira/browse/FLINK-22540
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Client
>Reporter: Jark Wu
>Assignee: Shengkai Fang
>Priority: Critical
> Fix For: 1.14.0
>
>
> As discussed in 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-163%3A+SQL+Client+Improvements,
>  
> YAML environment file is deprecated in 1.13 version and should be removed in 
> 1.14 version. Users are recommended to use SQL script to initialize session.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22669) Support date format in a path when using filesystem as source

2021-05-16 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-22669:
-
Component/s: Table SQL / Ecosystem

> Support date format in a path when using filesystem as source
> --
>
> Key: FLINK-22669
> URL: https://issues.apache.org/jira/browse/FLINK-22669
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Ecosystem
>Reporter: wangqinghuan
>Priority: Minor
>
> Do we have suggestions to support date format in a file path when using 
> filesystem as source?
> For example:
> {{CREATE TABLE fs_table (}}
> {{...}}
> {{)}}{{ WITH (}}
> 'connector'='filesystem',
>  'path'='ftp:/tmp/\{yyyy-MM-dd}/data',
> {{'format'='parquet'}}
> {{);}}
> the \{yyyy-MM-dd} expression in the path will be replaced with the actual 
> system time.
> *Why support this?*
> Date expression support is useful in scheduled jobs, such as a daily job that 
> reads the current day's data from FTP, where a new file path is created from 
> the date expression every day.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22540) Remove YAML environment file support in SQL Client

2021-05-16 Thread liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345827#comment-17345827
 ] 

liu commented on FLINK-22540:
-

For user-defined functions, I want to know how to create a function with 
constructor parameters in sql-client. With a YAML environment file, I can 
achieve it with the 'constructor' keyword. But in sql-client, I cannot find a 
way to achieve it without the YAML environment file.

 

 

> Remove YAML environment file support in SQL Client
> --
>
> Key: FLINK-22540
> URL: https://issues.apache.org/jira/browse/FLINK-22540
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Client
>Reporter: Jark Wu
>Assignee: Shengkai Fang
>Priority: Critical
> Fix For: 1.14.0
>
>
> As discussed in 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-163%3A+SQL+Client+Improvements,
>  
> YAML environment file is deprecated in 1.13 version and should be removed in 
> 1.14 version. Users are recommended to use SQL script to initialize session.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22669) Support date format in a path when using filesystem as source

2021-05-16 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345826#comment-17345826
 ] 

Jingsong Lee commented on FLINK-22669:
--

CC: [~lirui]

> Support date format in a path when using filesystem as source
> --
>
> Key: FLINK-22669
> URL: https://issues.apache.org/jira/browse/FLINK-22669
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Ecosystem
>Reporter: wangqinghuan
>Priority: Minor
>
> Do we have suggestions to support date format in a file path when using 
> filesystem as source?
> For example:
> {{CREATE TABLE fs_table (}}
> {{...}}
> {{)}}{{ WITH (}}
> 'connector'='filesystem',
>  'path'='ftp:/tmp/\{yyyy-MM-dd}/data',
> {{'format'='parquet'}}
> {{);}}
> the \{yyyy-MM-dd} expression in the path will be replaced with the actual 
> system time.
> *Why support this?*
> Date expression support is useful in scheduled jobs, such as a daily job that 
> reads the current day's data from FTP, where a new file path is created from 
> the date expression every day.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] KarmaGYZ edited a comment on pull request #15812: remove slotpoolImpl

2021-05-16 Thread GitBox


KarmaGYZ edited a comment on pull request #15812:
URL: https://github.com/apache/flink/pull/15812#issuecomment-841927457


   Hi, @lqjack . We prefer to use `rebase` instead of `merge` here, which gives 
us a linear history.
   
   Also, it seems the removal of `ResourceManagerGateway.requestSlot` has not 
been included in the latest commit?
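
   For reference, a typical rebase flow looks like this (standard git commands; 
the remote and branch names are assumptions):

```
git fetch origin
git rebase origin/master        # replay your commits on top of the latest master
git push --force-with-lease     # update the PR branch with the rebased history
```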


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-22667) Fix the wrong file system ln java doc and documentation

2021-05-16 Thread Yao Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345824#comment-17345824
 ] 

Yao Zhang commented on FLINK-22667:
---

Please change the first segment of your commit message to ‘[FLINK-22667]’ so that 
it can be linked to this Jira issue automatically.

> Fix the wrong file system ln java doc and documentation
> ---
>
> Key: FLINK-22667
> URL: https://issues.apache.org/jira/browse/FLINK-22667
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Yishuang Lu
>Priority: Major
>
> The file system url is missng a `\`:
> [https://github.com/apache/flink/blob/master/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md]
> {code:java}
> env.getCheckpointConfig().setCheckpointStorage("hdfs://my/checkpoint/dir")
> {code}
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java#L76]
> {code:java}
>  *
> env.getCheckpointConfig().setCheckpointStorage("hdfs://checkpoints");{code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] godfreyhe commented on a change in pull request #15913: [FLINK-22649][python][table-planner-blink] Support StreamExecPythonCalc json serialization/deserialization

2021-05-16 Thread GitBox


godfreyhe commented on a change in pull request #15913:
URL: https://github.com/apache/flink/pull/15913#discussion_r633181871



##
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest.java
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/** Test json serialization/deserialization for calc. */
+public class PythonCalcJsonPlanTest extends TableTestBase {
+
+private StreamTableTestUtil util;
+private TableEnvironment tEnv;
+
+@Before
+public void setup() {
+util = streamTestUtil(TableConfig.getDefault());
+tEnv = util.getTableEnv();
+
+String srcTableDdl =
+"CREATE TABLE MyTable (\n"
++ "  a bigint,\n"
++ "  b int not null,\n"
++ "  c varchar,\n"
++ "  d timestamp(3)\n"
++ ") with (\n"
++ "  'connector' = 'values',\n"
++ "  'bounded' = 'false')";
+tEnv.executeSql(srcTableDdl);
+}
+
+@Test
+public void testPythonCalc() {
+tEnv.createTemporaryFunction("pyFunc", new 
PythonScalarFunction("pyFunc"));
+String sinkTableDdl =
+"CREATE TABLE MySink (\n"
++ "  a bigint,\n"
++ "  b int\n"
++ ") with (\n"
++ "  'connector' = 'values',\n"
++ "  'table-sink-class' = 'DEFAULT')";
+tEnv.executeSql(sinkTableDdl);
+util.verifyJsonPlan("insert into MySink select a, pyFunc(b, b) from 
MyTable");
+}

Review comment:
   it's better to add a test with filter

##
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCalc.scala
##
@@ -49,8 +51,10 @@ class BatchPhysicalPythonCalc(
   }
 
   override def translateToExecNode(): ExecNode[_] = {
+val projection = calcProgram.getProjectList.map(calcProgram.expandLocalRef)

Review comment:
   nit: it's better to check whether the condition in `calcProgram` is empty, see the sketch below
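
   A minimal sketch of what I mean (assuming the python calc node must not 
   carry a condition at this point):
   ```scala
   override def translateToExecNode(): ExecNode[_] = {
     // the condition should already have been split into a separate calc node
     require(
       calcProgram.getCondition == null,
       "BatchPhysicalPythonCalc does not expect a condition")
     val projection = calcProgram.getProjectList.map(calcProgram.expandLocalRef)
     // ... build the exec node from `projection` as before
   }
   ```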




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (FLINK-19481) Add support for a flink native GCS FileSystem

2021-05-16 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345823#comment-17345823
 ] 

Xintong Song edited comment on FLINK-19481 at 5/17/21, 2:08 AM:


Hi [~jgrier], thanks for your input.

I have noticed your earlier comment. However, that comment was made before 
Galen's PR, and I think things are a bit different with this PR now.
- IIUC, what you've described are the benefits of a native implementation 
*compared to the current status*, where Flink does not provide any specific 
support for GS and users have to deal with the Hadoop dependencies and Flink's 
FS abstractions by themselves. 
- What I'm trying to understand are the benefits *compared to the status once 
Galen's PR is merged*. The PR provides an out-of-the-box Hadoop-based GS FS 
implementation, so that users no longer need to deal with the dependencies and 
abstractions. In that case, would it still be beneficial to build this 
implementation, internally, directly on top of the GCS native SDK rather than 
leveraging the existing Hadoop stack provided by the Google storage connector?


was (Author: xintongsong):
Hi [~jgrier], thanks for your input.

I have noticed your earlier comment. However, that comment was made before 
Galen's PR, and I think things are a bit different with this PR now.
- IIUC, what you've described are the benefits of a native implementation 
*compared to the current status*, where Flink does not provide any specific 
support for GS and users have to deal with the Hadoop dependencies and Flink's 
FS abstractions by themselves. 
- What I'm trying to understand are the benefits *compared to the status once 
Galen's PR is merged*. The PR provides an out-of-the-box GS FS implementation, 
so that users no longer need to deal with the dependencies and abstractions. In 
that case, would it still be beneficial to build this implementation, 
internally, directly on top of the GCS native SDK rather than leveraging the 
existing Hadoop stack provided by the Google storage connector?

> Add support for a flink native GCS FileSystem
> -
>
> Key: FLINK-19481
> URL: https://issues.apache.org/jira/browse/FLINK-19481
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, FileSystems
>Affects Versions: 1.12.0
>Reporter: Ben Augarten
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Currently, GCS is supported, but only by using the Hadoop connector [1].
>  
> The objective of this improvement is to add support for checkpointing to 
> Google Cloud Storage with the Flink File System.
>  
> This would allow the `gs://` scheme to be used for savepointing and 
> checkpointing. Long term, it would be nice if we could use the GCS FileSystem 
> as a source and sink in Flink jobs as well. 
>  
> Long term, I hope that implementing a Flink-native GCS FileSystem will 
> simplify usage of GCS, because the Hadoop FileSystem ends up bringing in many 
> unshaded dependencies.
>  
> [1] 
> [https://github.com/GoogleCloudDataproc/hadoop-connectors]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19481) Add support for a flink native GCS FileSystem

2021-05-16 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345823#comment-17345823
 ] 

Xintong Song commented on FLINK-19481:
--

Hi [~jgrier], thanks for your input.

I have noticed your earlier comment. However, that comment was made before 
Galen's PR, and I think things are a bit different with this PR now.
- IIUC, what you've described are the benefits of a native implementation 
*compared to the current status*, where Flink does not provide any specific 
support for GS and users have to deal with the Hadoop dependencies and Flink's 
FS abstractions by themselves. 
- What I'm trying to understand are the benefits *compared to the status once 
Galen's PR is merged*. The PR provides an out-of-the-box GS FS implementation, 
so that users no longer need to deal with the dependencies and abstractions. In 
that case, would it still be beneficial to build this implementation, 
internally, directly on top of the GCS native SDK rather than leveraging the 
existing Hadoop stack provided by the Google storage connector?

> Add support for a flink native GCS FileSystem
> -
>
> Key: FLINK-19481
> URL: https://issues.apache.org/jira/browse/FLINK-19481
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, FileSystems
>Affects Versions: 1.12.0
>Reporter: Ben Augarten
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Currently, GCS is supported, but only by using the Hadoop connector [1].
>  
> The objective of this improvement is to add support for checkpointing to 
> Google Cloud Storage with the Flink File System.
>  
> This would allow the `gs://` scheme to be used for savepointing and 
> checkpointing. Long term, it would be nice if we could use the GCS FileSystem 
> as a source and sink in Flink jobs as well. 
>  
> Long term, I hope that implementing a Flink-native GCS FileSystem will 
> simplify usage of GCS, because the Hadoop FileSystem ends up bringing in many 
> unshaded dependencies.
>  
> [1] 
> [https://github.com/GoogleCloudDataproc/hadoop-connectors]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] KarmaGYZ commented on pull request #15812: remove slotpoolImpl

2021-05-16 Thread GitBox


KarmaGYZ commented on pull request #15812:
URL: https://github.com/apache/flink/pull/15812#issuecomment-841927457


   Hi @lqjack, we prefer to use `rebase` instead of `merge` here, which gives 
us a linear history. For example:
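
   Assuming `upstream` points at apache/flink and `my-branch` is the PR branch:
   ```
   git fetch upstream
   git rebase upstream/master my-branch
   git push --force-with-lease origin my-branch
   ```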


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15304: [FLINK-20731][connector] Introduce Pulsar Source

2021-05-16 Thread GitBox


flinkbot edited a comment on pull request #15304:
URL: https://github.com/apache/flink/pull/15304#issuecomment-803430086


   
   ## CI report:
   
   * d91de8832416553f9a72571f9209567c24adad01 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17995)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support

2021-05-16 Thread GitBox


flinkbot edited a comment on pull request #15599:
URL: https://github.com/apache/flink/pull/15599#issuecomment-818947931


   
   ## CI report:
   
   * 2dc286c697fff7ad6adb68c2a0e6de40d1c20bfe Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17994)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15304: [FLINK-20731][connector] Introduce Pulsar Source

2021-05-16 Thread GitBox


flinkbot edited a comment on pull request #15304:
URL: https://github.com/apache/flink/pull/15304#issuecomment-803430086


   
   ## CI report:
   
   * d3660d4aad02129ce663ffcc96ca966b39703b21 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15112)
 
   * d91de8832416553f9a72571f9209567c24adad01 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17995)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15304: [FLINK-20731][connector] Introduce Pulsar Source

2021-05-16 Thread GitBox


flinkbot edited a comment on pull request #15304:
URL: https://github.com/apache/flink/pull/15304#issuecomment-803430086


   
   ## CI report:
   
   * d3660d4aad02129ce663ffcc96ca966b39703b21 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15112)
 
   * d91de8832416553f9a72571f9209567c24adad01 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] syhily commented on a change in pull request #15304: [FLINK-20731][connector] Introduce Pulsar Source

2021-05-16 Thread GitBox


syhily commented on a change in pull request #15304:
URL: https://github.com/apache/flink/pull/15304#discussion_r633138743



##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/StopCondition.java
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.Map;
+
+import static 
org.apache.flink.connector.pulsar.source.StopCondition.StopResult.DONT_STOP;
+import static 
org.apache.flink.connector.pulsar.source.StopCondition.StopResult.STOP_AFTER;
+import static 
org.apache.flink.connector.pulsar.source.StopCondition.StopResult.STOP_BEFORE;
+
+/** An interface to control when to stop. */
+public interface StopCondition extends Serializable {
+    StopResult shouldStop(AbstractPartition partition, Message<?> message);
+
+    /** Enum for stop condition. */
+    enum StopResult {
+        STOP_BEFORE,
+        STOP_AFTER,
+        DONT_STOP;
+    }
+
+    Comparator<MessageId> NON_BATCH_COMPARATOR =
+            new Comparator<MessageId>() {
+                final Comparator<MessageIdImpl> implComparator =
+                        Comparator.comparingLong(MessageIdImpl::getLedgerId)
+                                .thenComparingLong(MessageIdImpl::getEntryId)
+                                .thenComparingInt(MessageIdImpl::getPartitionIndex);
+
+                @Override
+                public int compare(MessageId o1, MessageId o2) {
+                    return implComparator.compare((MessageIdImpl) o1, (MessageIdImpl) o2);
+                }
+            };
+
+    default void init(AbstractPartition partition, Consumer<?> consumer)
+            throws PulsarClientException {}
+
+    static StopCondition stopAtMessageId(MessageId id) {
+        return (partition, message) -> hitMessageId(message, id) ? STOP_BEFORE : DONT_STOP;
+    }
+
+    static boolean hitMessageId(Message<?> message, MessageId id) {
+        return NON_BATCH_COMPARATOR.compare(message.getMessageId(), id) >= 0;
+    }
+
+    static StopCondition stopAfterMessageId(MessageId id) {
+        return (partition, message) -> hitMessageId(message, id) ? STOP_AFTER : DONT_STOP;

Review comment:
   I think you are right; I will remove this enum class. A possible shape is 
sketched below.
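
   A simplified shape could look like this (a sketch only, keeping the existing 
   `NON_BATCH_COMPARATOR`; the inclusive/exclusive distinction moves into the 
   two factory methods instead of the enum):
   ```java
   public interface StopCondition extends Serializable {
       // true if the given message (and everything after it) must not be emitted
       boolean shouldStop(AbstractPartition partition, Message<?> message);

       static StopCondition stopAtMessageId(MessageId id) {
           // exclusive: stop as soon as we reach `id`
           return (partition, message) ->
                   NON_BATCH_COMPARATOR.compare(message.getMessageId(), id) >= 0;
       }

       static StopCondition stopAfterMessageId(MessageId id) {
           // inclusive: emit `id` itself, stop on the next message
           return (partition, message) ->
                   NON_BATCH_COMPARATOR.compare(message.getMessageId(), id) > 0;
       }
   }
   ```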




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] syhily commented on a change in pull request #15304: [FLINK-20731][connector] Introduce Pulsar Source

2021-05-16 Thread GitBox


syhily commented on a change in pull request #15304:
URL: https://github.com/apache/flink/pull/15304#discussion_r632034934



##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
##
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.source.util.PulsarAdminUtils;
+
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The builder class for {@link PulsarSource} to make it easier for the users 
to construct a {@link
+ * PulsarSource}.
+ */
+@PublicEvolving
+public class PulsarSourceBuilder {
+private static final Logger LOG = 
LoggerFactory.getLogger(PulsarSourceBuilder.class);
+// The subscriber specifies the partitions to subscribe to.
+private PulsarSubscriber subscriber;
+// Users can specify the starting / stopping offset initializer.
+private StartOffsetInitializer startOffsetInitializer = 
StartOffsetInitializer.earliest();
+private StopCondition stopCondition = StopCondition.never();
+// Boundedness
+private Boundedness boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+private MessageDeserializer messageDeserializer;
+private SplitSchedulingStrategy splitSchedulingStrategy;
+// The configurations.
+private Configuration configuration = new Configuration();
+
+private ClientConfigurationData clientConfigurationData = new 
ClientConfigurationData();
+private ConsumerConfigurationData consumerConfigurationData =
+new ConsumerConfigurationData<>();
+
+PulsarSourceBuilder() {
+
consumerConfigurationData.setSubscriptionMode(SubscriptionMode.NonDurable);
+
consumerConfigurationData.setSubscriptionType(SubscriptionType.Exclusive);
+consumerConfigurationData.setSubscriptionName("flink-" + 
UUID.randomUUID());
+}
+
+public PulsarSourceBuilder setTopics(
+SplitDivisionStrategy splitDivisionStrategy, String... topics) {
+TreeSet<String> topicNames = Sets.newTreeSet();
+List<String> collect = Arrays.stream(topics).collect(Collectors.toList());
+for (String topic : collect) {
+topicNames.add(topic);
+}
+consumerConfigurationData.setTopicNames(topicNames);
+return setSubscriber(
+PulsarSubscriber.getTopicListSubscriber(splitDivisionStrategy, 
topics));

Review comment:
   Yeah, I think this is an abstraction leak. The Pulsar connector uses a 
`TopicListSubscriber` like the Kafka connector, but putting the topics into the 
consumer configuration merely relies on the Pulsar client's own consuming 
ability. This should be changed; see the sketch below. Thanks for your detailed 
review.
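
   A sketch of the reworked method (assuming the subscriber alone owns the 
   topic list):
   ```java
   public PulsarSourceBuilder setTopics(
           SplitDivisionStrategy splitDivisionStrategy, String... topics) {
       // do not mirror the topics into consumerConfigurationData anymore;
       // the subscriber is the single source of truth for what gets consumed
       return setSubscriber(
               PulsarSubscriber.getTopicListSubscriber(splitDivisionStrategy, topics));
   }
   ```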

##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/util/CachedPulsarClient.java
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, 

[GitHub] [flink] flinkbot edited a comment on pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support

2021-05-16 Thread GitBox


flinkbot edited a comment on pull request #15599:
URL: https://github.com/apache/flink/pull/15599#issuecomment-818947931


   
   ## CI report:
   
   * df17222b8b4e1cea268da7589e8cb808535d7dd8 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17990)
 
   * 2dc286c697fff7ad6adb68c2a0e6de40d1c20bfe Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17994)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support

2021-05-16 Thread GitBox


flinkbot edited a comment on pull request #15599:
URL: https://github.com/apache/flink/pull/15599#issuecomment-818947931


   
   ## CI report:
   
   * df17222b8b4e1cea268da7589e8cb808535d7dd8 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17990)
 
   * 2dc286c697fff7ad6adb68c2a0e6de40d1c20bfe UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] galenwarren commented on pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support

2021-05-16 Thread GitBox


galenwarren commented on pull request #15599:
URL: https://github.com/apache/flink/pull/15599#issuecomment-841868124


   Hi @xintongsong, I've rebased and pushed commits to address some of your 
feedback. I've noted the relevant commits in some open conversations above. 
More commits coming soon for the rest of the feedback ...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] galenwarren commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support

2021-05-16 Thread GitBox


galenwarren commented on a change in pull request #15599:
URL: https://github.com/apache/flink/pull/15599#discussion_r633131032



##
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link 
org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+private static final String SCHEME = "gs";
+
+private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", 
HADOOP_CONFIG_PREFIX};
+
+private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+private static final String FLINK_SHADING_PREFIX = "";
+
+public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+ConfigOptions.key("gs.writer.temporary.bucket.name")
+.stringType()
+
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+.withDescription(
+"This option sets the bucket name used by the 
recoverable writer to store temporary files. "
++ "If empty, temporary files are stored in 
the same bucket as the final file being written.");
+
+public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+ConfigOptions.key("gs.writer.temporary.object.prefix")
+.stringType()
+
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+.withDescription(
+"This option sets the prefix used by the 
recoverable writer when writing temporary files. This prefix is applied to the "
++ "final object name to form the base name 
for temporary files.");

Review comment:
   Config for temporary-object prefix removed in 
[2dc286c](https://github.com/apache/flink/pull/15599/commits/2dc286c697fff7ad6adb68c2a0e6de40d1c20bfe).
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] galenwarren commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support

2021-05-16 Thread GitBox


galenwarren commented on a change in pull request #15599:
URL: https://github.com/apache/flink/pull/15599#discussion_r633130895



##
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link 
org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+private static final String SCHEME = "gs";
+
+private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", 
HADOOP_CONFIG_PREFIX};
+
+private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+private static final String FLINK_SHADING_PREFIX = "";
+
+public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+ConfigOptions.key("gs.writer.temporary.bucket.name")
+.stringType()
+
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+.withDescription(
+"This option sets the bucket name used by the 
recoverable writer to store temporary files. "
++ "If empty, temporary files are stored in 
the same bucket as the final file being written.");
+
+public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+ConfigOptions.key("gs.writer.temporary.object.prefix")
+.stringType()
+
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+.withDescription(
+"This option sets the prefix used by the 
recoverable writer when writing temporary files. This prefix is applied to the "
++ "final object name to form the base name 
for temporary files.");
+
+public static final ConfigOption<String> WRITER_CONTENT_TYPE =
+ConfigOptions.key("gs.writer.content.type")
+.stringType()
+
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CONTENT_TYPE)
+.withDescription(
+"This option sets the content type applied to 
files written by the recoverable writer.");
+
+public static final ConfigOption<Integer> WRITER_CHUNK_SIZE =
+ConfigOptions.key("gs.writer.chunk.size")
+.intType()
+
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CHUNK_SIZE)
+.withDescription(
+"This option sets the chunk size for writes by the 
recoverable writer. This value is passed through to the underlying "
++ "Google WriteChannel; if zero, the 
default WriteChannel value is used.");

Review comment:
   Done in 
[2dc286c](https://github.com/apache/flink/pull/15599/commits/2dc286c697fff7ad6adb68c2a0e6de40d1c20bfe).
   

##
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * 

[GitHub] [flink] galenwarren commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support

2021-05-16 Thread GitBox


galenwarren commented on a change in pull request #15599:
URL: https://github.com/apache/flink/pull/15599#discussion_r633130858



##
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link 
org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+private static final String SCHEME = "gs";
+
+private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", 
HADOOP_CONFIG_PREFIX};
+
+private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+private static final String FLINK_SHADING_PREFIX = "";
+
+public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+ConfigOptions.key("gs.writer.temporary.bucket.name")
+.stringType()
+
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+.withDescription(
+"This option sets the bucket name used by the 
recoverable writer to store temporary files. "
++ "If empty, temporary files are stored in 
the same bucket as the final file being written.");
+
+public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+ConfigOptions.key("gs.writer.temporary.object.prefix")
+.stringType()
+
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+.withDescription(
+"This option sets the prefix used by the 
recoverable writer when writing temporary files. This prefix is applied to the "
++ "final object name to form the base name 
for temporary files.");
+
+public static final ConfigOption<String> WRITER_CONTENT_TYPE =
+ConfigOptions.key("gs.writer.content.type")
+.stringType()
+
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CONTENT_TYPE)
+.withDescription(
+"This option sets the content type applied to 
files written by the recoverable writer.");
+
+public static final ConfigOption<Integer> WRITER_CHUNK_SIZE =
+ConfigOptions.key("gs.writer.chunk.size")
+.intType()
+
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CHUNK_SIZE)

Review comment:
   Done in 
[2dc286c](https://github.com/apache/flink/pull/15599/commits/2dc286c697fff7ad6adb68c2a0e6de40d1c20bfe).
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] galenwarren commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support

2021-05-16 Thread GitBox


galenwarren commented on a change in pull request #15599:
URL: https://github.com/apache/flink/pull/15599#discussion_r633130840



##
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link 
org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+private static final String SCHEME = "gs";
+
+private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", 
HADOOP_CONFIG_PREFIX};
+
+private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+private static final String FLINK_SHADING_PREFIX = "";
+
+public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+ConfigOptions.key("gs.writer.temporary.bucket.name")
+.stringType()
+
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+.withDescription(
+"This option sets the bucket name used by the 
recoverable writer to store temporary files. "
++ "If empty, temporary files are stored in 
the same bucket as the final file being written.");
+
+public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+ConfigOptions.key("gs.writer.temporary.object.prefix")
+.stringType()
+
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+.withDescription(
+"This option sets the prefix used by the 
recoverable writer when writing temporary files. This prefix is applied to the "
++ "final object name to form the base name 
for temporary files.");
+
+public static final ConfigOption<String> WRITER_CONTENT_TYPE =
+ConfigOptions.key("gs.writer.content.type")
+.stringType()
+
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CONTENT_TYPE)
+.withDescription(
+"This option sets the content type applied to 
files written by the recoverable writer.");

Review comment:
   Removed in 
[2dc286c](https://github.com/apache/flink/pull/15599/commits/2dc286c697fff7ad6adb68c2a0e6de40d1c20bfe).
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] galenwarren commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support

2021-05-16 Thread GitBox


galenwarren commented on a change in pull request #15599:
URL: https://github.com/apache/flink/pull/15599#discussion_r633130814



##
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link 
org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+private static final String SCHEME = "gs";
+
+private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", 
HADOOP_CONFIG_PREFIX};
+
+private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+private static final String FLINK_SHADING_PREFIX = "";
+
+public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+ConfigOptions.key("gs.writer.temporary.bucket.name")
+.stringType()
+
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)

Review comment:
   Done in 
[2dc286c](https://github.com/apache/flink/pull/15599/commits/2dc286c697fff7ad6adb68c2a0e6de40d1c20bfe).
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support

2021-05-16 Thread GitBox


flinkbot edited a comment on pull request #15599:
URL: https://github.com/apache/flink/pull/15599#issuecomment-818947931


   
   ## CI report:
   
   * df17222b8b4e1cea268da7589e8cb808535d7dd8 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17990)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-21700) Allow to disable fetching Hadoop delegation token on Yarn

2021-05-16 Thread Matthias (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345774#comment-17345774
 ] 

Matthias commented on FLINK-21700:
--

master: 
[3a1b2e1|https://github.com/apache/flink/commit/3a1b2e142debb35f28407730288188e3944747c6]

> Allow to disable fetching Hadoop delegation token on Yarn
> -
>
> Key: FLINK-21700
> URL: https://issues.apache.org/jira/browse/FLINK-21700
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / YARN
>Reporter: Junfan Zhang
>Assignee: Junfan Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> h3. Why
> I want to support the Flink action on Oozie. 
>  As we know, Oozie obtains the HDFS/HBase delegation tokens before starting 
> the Flink submitter CLI.
>  Spark already supports disabling delegation token fetching on the Spark 
> client; see the [related Spark 
> doc|https://spark.apache.org/docs/latest/running-on-yarn.html#launching-your-application-with-apache-oozie].
>  
> So I think Flink should also allow disabling Hadoop delegation token fetching 
> on Yarn.
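> With this in place, the fetching can be switched off in the Flink 
> configuration, e.g. (a sketch, assuming the option key introduced by the 
> linked pull request):
> {code}
> yarn.security.kerberos.fetch.delegation-token: false
> {code}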



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

