[jira] [Commented] (FLINK-17188) Failed to download conda when running python tests

2020-04-18 Thread Robert Metzger (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086769#comment-17086769
 ] 

Robert Metzger commented on FLINK-17188:


Thanks a lot

> Failed to download conda when running python tests
> --
>
> Key: FLINK-17188
> URL: https://issues.apache.org/jira/browse/FLINK-17188
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Build System / Azure Pipelines
>Reporter: Dawid Wysakowicz
>Priority: Major
>  Labels: test-stability
> Fix For: 1.11.0
>
>
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=7549=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=14487301-07d2-5d56-5690-6dfab9ffd4d9
> This pipeline failed to download conda.
> If this issue starts appearing more often, we should come up with some 
> solution for those kinds of problems.
> {code}
> CondaHTTPError: HTTP 000 CONNECTION FAILED for url 
> 
> Elapsed: -
> An HTTP error occurred when trying to retrieve this URL.
> HTTP errors are often intermittent, and a simple retry will get you on your 
> way.
> conda install sphinx failed please try to exec the script again.  
>   if failed many times, you can try to exec in the form of sudo 
> ./lint-python.sh -f
> PYTHON exited with EXIT CODE: 1.
> {code}
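
As the quoted log itself notes, these HTTP 000 failures are usually transient, so a 
bounded retry with backoff around the download step is the generic mitigation. A 
minimal sketch of the pattern (illustration only; the real script, lint-python.sh, 
is a shell script, and the helper below is hypothetical):

{code:java}
import java.util.concurrent.Callable;

/** Minimal sketch of bounded retry-with-backoff for transient download failures. */
public final class Retry {

    /** Runs the action up to maxAttempts times, doubling the delay after each failure. */
    public static <T> T withBackoff(Callable<T> action, int maxAttempts, long initialDelayMs)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e; // remember the failure and retry unless attempts are exhausted
                if (attempt < maxAttempts) {
                    Thread.sleep(delay);
                    delay *= 2;
                }
            }
        }
        throw last;
    }
}
{code}

A caller would wrap only the flaky step, e.g. {{Retry.withBackoff(() -> download(condaUrl), 5, 1000)}}.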



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-17188) Failed to download conda when running python tests

2020-04-18 Thread Robert Metzger (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086769#comment-17086769
 ] 

Robert Metzger edited comment on FLINK-17188 at 4/19/20, 5:58 AM:
--

Thanks a lot. I assigned you to the ticket.


was (Author: rmetzger):
Thanks a lot

> Failed to download conda when running python tests
> --
>
> Key: FLINK-17188
> URL: https://issues.apache.org/jira/browse/FLINK-17188
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Build System / Azure Pipelines
>Reporter: Dawid Wysakowicz
>Assignee: Huang Xingbo
>Priority: Major
>  Labels: test-stability
> Fix For: 1.11.0
>
>
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=7549=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=14487301-07d2-5d56-5690-6dfab9ffd4d9
> This pipeline failed to download conda.
> If this issue starts appearing more often, we should come up with some 
> solution for those kinds of problems.
> {code}
> CondaHTTPError: HTTP 000 CONNECTION FAILED for url 
> 
> Elapsed: -
> An HTTP error occurred when trying to retrieve this URL.
> HTTP errors are often intermittent, and a simple retry will get you on your 
> way.
> conda install sphinx failed please try to exec the script again.  
>   if failed many times, you can try to exec in the form of sudo 
> ./lint-python.sh -f
> PYTHON exited with EXIT CODE: 1.
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-17188) Failed to download conda when running python tests

2020-04-18 Thread Robert Metzger (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger reassigned FLINK-17188:
--

Assignee: Huang Xingbo

> Failed to download conda when running python tests
> --
>
> Key: FLINK-17188
> URL: https://issues.apache.org/jira/browse/FLINK-17188
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Build System / Azure Pipelines
>Reporter: Dawid Wysakowicz
>Assignee: Huang Xingbo
>Priority: Major
>  Labels: test-stability
> Fix For: 1.11.0
>
>
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=7549=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=14487301-07d2-5d56-5690-6dfab9ffd4d9
> This pipeline failed to download conda.
> If this issue starts appearing more often, we should come up with some 
> solution for those kinds of problems.
> {code}
> CondaHTTPError: HTTP 000 CONNECTION FAILED for url 
> 
> Elapsed: -
> An HTTP error occurred when trying to retrieve this URL.
> HTTP errors are often intermittent, and a simple retry will get you on your 
> way.
> conda install sphinx failed please try to exec the script again.  
>   if failed many times, you can try to exec in the form of sudo 
> ./lint-python.sh -f
> PYTHON exited with EXIT CODE: 1.
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15641) Support to start init container

2020-04-18 Thread ouyangwulin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086760#comment-17086760
 ] 

ouyangwulin edited comment on FLINK-15641 at 4/19/20, 5:38 AM:
---

[~fly_in_gis], it is in progress. I will implement something in the doc before 
sharing it.


was (Author: ouyangwuli):
[~fly_in_gis], it is in progress.

> Support to start init container
> ---
>
> Key: FLINK-15641
> URL: https://issues.apache.org/jira/browse/FLINK-15641
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: Yang Wang
>Priority: Major
>
> >> Why do we need an init container?
> The init container could be used to prepare the user jars and dependencies. 
> Then we could always set the user image to the official Flink image, for both 
> standalone per-job on K8s and native K8s per-job. When the JobManager and 
> TaskManager containers launch, the user jars will already exist there. I 
> think many users are running standalone per-job clusters in production this 
> way.
> The init container only works for K8s clusters.
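
For illustration, a minimal fabric8 sketch of such a pod: the init container 
downloads the user jar into an emptyDir volume that the main container, running the 
unmodified official Flink image, mounts as well. The image, jar URL and paths are 
assumptions for the example, not Flink's actual implementation:

{code:java}
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;

public class InitContainerExample {

    public static Pod podWithInitContainer() {
        return new PodBuilder()
                .withNewMetadata().withName("flink-jobmanager").endMetadata()
                .withNewSpec()
                    // scratch volume shared between the init and the main container
                    .addNewVolume().withName("user-jars").withNewEmptyDir().endEmptyDir().endVolume()
                    // init container fetches the user jar before Flink starts
                    .addNewInitContainer()
                        .withName("fetch-user-jars")
                        .withImage("busybox")
                        .withCommand("wget", "-O", "/opt/flink/usrlib/job.jar",
                                "http://repo.example.com/job.jar")
                        .addNewVolumeMount().withName("user-jars")
                                .withMountPath("/opt/flink/usrlib").endVolumeMount()
                    .endInitContainer()
                    // main container runs the unmodified official Flink image
                    .addNewContainer()
                        .withName("flink-main")
                        .withImage("flink:1.11")
                        .addNewVolumeMount().withName("user-jars")
                                .withMountPath("/opt/flink/usrlib").endVolumeMount()
                    .endContainer()
                .endSpec()
                .build();
    }
}
{code}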



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15641) Support to start init container

2020-04-18 Thread ouyangwulin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086760#comment-17086760
 ] 

ouyangwulin commented on FLINK-15641:
-

[~fly_in_gis], it is in progress.

> Support to start init container
> ---
>
> Key: FLINK-15641
> URL: https://issues.apache.org/jira/browse/FLINK-15641
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: Yang Wang
>Priority: Major
>
> >> Why do we need an init container?
> The init container could be used to prepare the user jars and dependencies. 
> Then we could always set the user image to the official Flink image, for both 
> standalone per-job on K8s and native K8s per-job. When the JobManager and 
> TaskManager containers launch, the user jars will already exist there. I 
> think many users are running standalone per-job clusters in production this 
> way.
> The init container only works for K8s clusters.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zhengcanbin edited a comment on issue #11708: [FLINK-15816][k8s] Limit the value of kubernetes.cluster-id to have no more than 45 characters

2020-04-18 Thread GitBox
zhengcanbin edited a comment on issue #11708: [FLINK-15816][k8s] Limit the 
value of kubernetes.cluster-id to have no more than 45 characters
URL: https://github.com/apache/flink/pull/11708#issuecomment-616018548
 
 
   Thanks for your review @tillrohrmann! I have addressed your comments and 
look forward to your double-check.
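
For illustration, the limit the PR title describes boils down to a precondition on 
the configured value. A hypothetical sketch (the constant, the messages, and the 
rationale that 45 leaves room for the suffixes Flink appends within Kubernetes' 
resource-name limits are assumptions, not the PR's actual code):

{code:java}
/** Sketch: validate the cluster-id length up front instead of failing later inside K8s. */
final class ClusterIdValidator {

    // assumption: 45 keeps "<cluster-id>-<suffix>" within the K8s name length limits
    static final int MAXIMUM_CHARACTERS_OF_CLUSTER_ID = 45;

    static String validate(String clusterId) {
        if (clusterId == null || clusterId.isEmpty()) {
            throw new IllegalArgumentException("kubernetes.cluster-id must not be empty.");
        }
        if (clusterId.length() > MAXIMUM_CHARACTERS_OF_CLUSTER_ID) {
            throw new IllegalArgumentException(
                    "kubernetes.cluster-id must have no more than "
                            + MAXIMUM_CHARACTERS_OF_CLUSTER_ID
                            + " characters, but got: " + clusterId);
        }
        return clusterId;
    }
}
{code}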


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11805: [FLINK-17068][python][tests] Ensure the permission of scripts set correctly before executing the Python tests

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11805: [FLINK-17068][python][tests] Ensure 
the permission of scripts set correctly before executing the Python tests
URL: https://github.com/apache/flink/pull/11805#issuecomment-616030530
 
 
   
   ## CI report:
   
   * 2670def9536876df7899cf712e329b0c6f70f692 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160880714) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7720)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #11805: [FLINK-17068][python][tests] Ensure the permission of scripts set correctly before executing the Python tests

2020-04-18 Thread GitBox
flinkbot commented on issue #11805: [FLINK-17068][python][tests] Ensure the 
permission of scripts set correctly before executing the Python tests
URL: https://github.com/apache/flink/pull/11805#issuecomment-616030530
 
 
   
   ## CI report:
   
   * 2670def9536876df7899cf712e329b0c6f70f692 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-17068) ERROR at teardown of TableConfigTests.test_get_set_decimal_context

2020-04-18 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086740#comment-17086740
 ] 

Dian Fu edited comment on FLINK-17068 at 4/19/20, 4:37 AM:
---

Just to make it clear: this issue occurs because the permissions of the Python 
scripts are not correct. I can reproduce the same error message after changing 
the permission of `pyflink-gateway-server.sh` to the wrong value `444` in my 
local environment. I have submitted a PR which ensures the permissions of the 
Python scripts are correct before executing the Python tests.

Regarding the `No space left on device` error, it's not related to this problem 
and it currently only occurs in the private azure pipeline AFAIK. I have 
created a ticket FLINK-17220 to track it.


was (Author: dian.fu):
Just to make it clear: this issue occurs because the permissions of the Python 
scripts are not correct. I can reproduce the same error message after changing 
the permission of `pyflink-gateway-server.sh` to the wrong value `444`. I have 
submitted a PR which ensures the permissions of the Python scripts are correct 
before executing the Python tests.

Regarding the `No space left on device` error, it's not related to this problem 
and it currently only occurs in the private azure pipeline. I have created a 
ticket FLINK-17220 to track it.
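
For reference, the reproduction step can also be expressed with plain 
{{java.nio.file}} (a sketch only; the script path is an assumption, and the actual 
fix simply restores the permissions from the shell script):

{code:java}
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermissions;

public class PermissionRepro {
    public static void main(String[] args) throws Exception {
        // assumed location of the gateway launcher script
        Path script = Paths.get("flink-python/pyflink/bin/pyflink-gateway-server.sh");

        // drop the executable bits (the equivalent of `chmod 444`) to reproduce the failure
        Files.setPosixFilePermissions(script, PosixFilePermissions.fromString("r--r--r--"));

        // ... running the Python tests now fails to launch the gateway ...

        // restore sane permissions (the equivalent of `chmod 755`)
        Files.setPosixFilePermissions(script, PosixFilePermissions.fromString("rwxr-xr-x"));
    }
}
{code}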

> ERROR at teardown of TableConfigTests.test_get_set_decimal_context
> --
>
> Key: FLINK-17068
> URL: https://issues.apache.org/jira/browse/FLINK-17068
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Tests
>Affects Versions: 1.11.0
>Reporter: Robert Metzger
>Assignee: Dian Fu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> CI run: 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=7243=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=14487301-07d2-5d56-5690-6dfab9ffd4d9
> {code}
> 2020-04-09T00:34:15.9084299Z  ERRORS 
> 
> 2020-04-09T00:34:15.9085728Z __ ERROR at teardown of 
> TableConfigTests.test_get_set_decimal_context __
> 2020-04-09T00:34:15.9086216Z 
> 2020-04-09T00:34:15.9086725Z self = <contextlib._GeneratorContextManager 
> object at 0x7f8d978989b0>
> 2020-04-09T00:34:15.9087144Z 
> 2020-04-09T00:34:15.9087457Z def __enter__(self):
> 2020-04-09T00:34:15.9087787Z try:
> 2020-04-09T00:34:15.9091929Z >   return next(self.gen)
> 2020-04-09T00:34:15.9092634Z E   OSError: [Errno 9] Bad file 
> descriptor
> 2020-04-09T00:34:15.9092863Z 
> 2020-04-09T00:34:15.9093134Z 
> dev/.conda/envs/3.5/lib/python3.5/contextlib.py:59: OSError
> 2020-04-09T00:34:15.9093548Z __ ERROR at setup of 
> TableConfigTests.test_get_set_idle_state_retention_time ___
> 2020-04-09T00:34:15.9093803Z 
> 2020-04-09T00:34:15.9094082Z self = <contextlib._GeneratorContextManager 
> object at 0x7f8d9c3f3da0>
> 2020-04-09T00:34:15.9094313Z 
> 2020-04-09T00:34:15.9094502Z def __enter__(self):
> 2020-04-09T00:34:15.9094862Z try:
> 2020-04-09T00:34:15.9095088Z >   return next(self.gen)
> 2020-04-09T00:34:15.9095707Z E   OSError: [Errno 9] Bad file 
> descriptor
> 2020-04-09T00:34:15.9095913Z 
> 2020-04-09T00:34:15.9096203Z 
> dev/.conda/envs/3.5/lib/python3.5/contextlib.py:59: OSError
> 2020-04-09T00:34:15.9096818Z _ ERROR at teardown of 
> TableConfigTests.test_get_set_idle_state_retention_time _
> 2020-04-09T00:34:15.9100686Z 
> 2020-04-09T00:34:15.9101687Z self = <contextlib._GeneratorContextManager 
> object at 0x7f8d978d83c8>
> 2020-04-09T00:34:15.9102005Z 
> 2020-04-09T00:34:15.9102193Z def __enter__(self):
> 2020-04-09T00:34:15.9102415Z try:
> 2020-04-09T00:34:15.9102741Z >   return next(self.gen)
> 2020-04-09T00:34:15.9103144Z E   OSError: [Errno 9] Bad file 
> descriptor
> 2020-04-09T00:34:15.9103367Z 
> 2020-04-09T00:34:15.9103786Z 
> dev/.conda/envs/3.5/lib/python3.5/contextlib.py:59: OSError
> 2020-04-09T00:34:15.9104185Z  ERROR at setup of 
> TableConfigTests.test_get_set_local_timezone 
> 2020-04-09T00:34:15.9104999Z 
> 2020-04-09T00:34:15.9105287Z self = <contextlib._GeneratorContextManager 
> object at 0x7f8d979345f8>
> 2020-04-09T00:34:15.9105531Z 
> 2020-04-09T00:34:15.9105707Z def __enter__(self):
> 2020-04-09T00:34:15.9105924Z try:
> 2020-04-09T00:34:15.9106138Z >   return next(self.gen)
> 2020-04-09T00:34:15.9106555Z E   OSError: [Errno 9] Bad file 
> descriptor
> 2020-04-09T00:34:15.9106858Z 
> 2020-04-09T00:34:15.9107159Z 
> dev/.conda/envs/3.5/lib/python3.5/contextlib.py:59: OSError
> 2020-04-09T00:34:15.9107675Z __ ERROR at teardown of 
> TableConfigTests.test_get_set_local_timezone ___
> 2020-04-09T00:34:15.9107983Z 
> 2020-04-09T00:34:15.9108350Z self = <contextlib._GeneratorContextManager 
> object at 

[jira] [Commented] (FLINK-17068) ERROR at teardown of TableConfigTests.test_get_set_decimal_context

2020-04-18 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086740#comment-17086740
 ] 

Dian Fu commented on FLINK-17068:
-

Just to make it clear: this issue occurs because the permissions of the Python 
scripts are not correct. I can reproduce the same error message after changing 
the permission of `pyflink-gateway-server.sh` to the wrong value `444`. I have 
submitted a PR which ensures the permissions of the Python scripts are correct 
before executing the Python tests.

Regarding the `No space left on device` error, it's not related to this problem 
and it currently only occurs in the private azure pipeline. I have created a 
ticket FLINK-17220 to track it.

> ERROR at teardown of TableConfigTests.test_get_set_decimal_context
> --
>
> Key: FLINK-17068
> URL: https://issues.apache.org/jira/browse/FLINK-17068
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Tests
>Affects Versions: 1.11.0
>Reporter: Robert Metzger
>Assignee: Dian Fu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> CI run: 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=7243=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=14487301-07d2-5d56-5690-6dfab9ffd4d9
> {code}
> 2020-04-09T00:34:15.9084299Z  ERRORS 
> 
> 2020-04-09T00:34:15.9085728Z __ ERROR at teardown of 
> TableConfigTests.test_get_set_decimal_context __
> 2020-04-09T00:34:15.9086216Z 
> 2020-04-09T00:34:15.9086725Z self = <contextlib._GeneratorContextManager 
> object at 0x7f8d978989b0>
> 2020-04-09T00:34:15.9087144Z 
> 2020-04-09T00:34:15.9087457Z def __enter__(self):
> 2020-04-09T00:34:15.9087787Z try:
> 2020-04-09T00:34:15.9091929Z >   return next(self.gen)
> 2020-04-09T00:34:15.9092634Z E   OSError: [Errno 9] Bad file 
> descriptor
> 2020-04-09T00:34:15.9092863Z 
> 2020-04-09T00:34:15.9093134Z 
> dev/.conda/envs/3.5/lib/python3.5/contextlib.py:59: OSError
> 2020-04-09T00:34:15.9093548Z __ ERROR at setup of 
> TableConfigTests.test_get_set_idle_state_retention_time ___
> 2020-04-09T00:34:15.9093803Z 
> 2020-04-09T00:34:15.9094082Z self = <contextlib._GeneratorContextManager 
> object at 0x7f8d9c3f3da0>
> 2020-04-09T00:34:15.9094313Z 
> 2020-04-09T00:34:15.9094502Z def __enter__(self):
> 2020-04-09T00:34:15.9094862Z try:
> 2020-04-09T00:34:15.9095088Z >   return next(self.gen)
> 2020-04-09T00:34:15.9095707Z E   OSError: [Errno 9] Bad file 
> descriptor
> 2020-04-09T00:34:15.9095913Z 
> 2020-04-09T00:34:15.9096203Z 
> dev/.conda/envs/3.5/lib/python3.5/contextlib.py:59: OSError
> 2020-04-09T00:34:15.9096818Z _ ERROR at teardown of 
> TableConfigTests.test_get_set_idle_state_retention_time _
> 2020-04-09T00:34:15.9100686Z 
> 2020-04-09T00:34:15.9101687Z self = <contextlib._GeneratorContextManager 
> object at 0x7f8d978d83c8>
> 2020-04-09T00:34:15.9102005Z 
> 2020-04-09T00:34:15.9102193Z def __enter__(self):
> 2020-04-09T00:34:15.9102415Z try:
> 2020-04-09T00:34:15.9102741Z >   return next(self.gen)
> 2020-04-09T00:34:15.9103144Z E   OSError: [Errno 9] Bad file 
> descriptor
> 2020-04-09T00:34:15.9103367Z 
> 2020-04-09T00:34:15.9103786Z 
> dev/.conda/envs/3.5/lib/python3.5/contextlib.py:59: OSError
> 2020-04-09T00:34:15.9104185Z  ERROR at setup of 
> TableConfigTests.test_get_set_local_timezone 
> 2020-04-09T00:34:15.9104999Z 
> 2020-04-09T00:34:15.9105287Z self = <contextlib._GeneratorContextManager 
> object at 0x7f8d979345f8>
> 2020-04-09T00:34:15.9105531Z 
> 2020-04-09T00:34:15.9105707Z def __enter__(self):
> 2020-04-09T00:34:15.9105924Z try:
> 2020-04-09T00:34:15.9106138Z >   return next(self.gen)
> 2020-04-09T00:34:15.9106555Z E   OSError: [Errno 9] Bad file 
> descriptor
> 2020-04-09T00:34:15.9106858Z 
> 2020-04-09T00:34:15.9107159Z 
> dev/.conda/envs/3.5/lib/python3.5/contextlib.py:59: OSError
> 2020-04-09T00:34:15.9107675Z __ ERROR at teardown of 
> TableConfigTests.test_get_set_local_timezone ___
> 2020-04-09T00:34:15.9107983Z 
> 2020-04-09T00:34:15.9108350Z self = <contextlib._GeneratorContextManager 
> object at 0x7f8d981f8240>
> 2020-04-09T00:34:15.9108699Z 
> 2020-04-09T00:34:15.9108983Z def __enter__(self):
> 2020-04-09T00:34:15.9109311Z try:
> 2020-04-09T00:34:15.9109566Z >   return next(self.gen)
> 2020-04-09T00:34:15.9109872Z E   OSError: [Errno 9] Bad file 
> descriptor
> 2020-04-09T00:34:15.9110082Z 
> 2020-04-09T00:34:15.9110349Z 
> dev/.conda/envs/3.5/lib/python3.5/contextlib.py:59: OSError
> 2020-04-09T00:34:15.9111098Z __ ERROR at setup of 
> TableConfigTests.test_get_set_max_generated_code_length ___
> 2020-04-09T00:34:15.9111479Z 
> 2020-04-09T00:34:15.9111740Z self = <contextlib._GeneratorContextManager 
> object at 0x7f8d9c3380f0>
> 

[GitHub] [flink] flinkbot commented on issue #11805: [FLINK-17068][python][tests] Ensure the permission of scripts set correctly before executing the Python tests

2020-04-18 Thread GitBox
flinkbot commented on issue #11805: [FLINK-17068][python][tests] Ensure the 
permission of scripts set correctly before executing the Python tests
URL: https://github.com/apache/flink/pull/11805#issuecomment-616028131
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 2670def9536876df7899cf712e329b0c6f70f692 (Sun Apr 19 
04:32:11 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-17068) ERROR at teardown of TableConfigTests.test_get_set_decimal_context

2020-04-18 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-17068:
---

Assignee: Dian Fu

> ERROR at teardown of TableConfigTests.test_get_set_decimal_context
> --
>
> Key: FLINK-17068
> URL: https://issues.apache.org/jira/browse/FLINK-17068
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Tests
>Affects Versions: 1.11.0
>Reporter: Robert Metzger
>Assignee: Dian Fu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> CI run: 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=7243=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=14487301-07d2-5d56-5690-6dfab9ffd4d9
> {code}
> 2020-04-09T00:34:15.9084299Z  ERRORS 
> 
> 2020-04-09T00:34:15.9085728Z __ ERROR at teardown of 
> TableConfigTests.test_get_set_decimal_context __
> 2020-04-09T00:34:15.9086216Z 
> 2020-04-09T00:34:15.9086725Z self = <contextlib._GeneratorContextManager 
> object at 0x7f8d978989b0>
> 2020-04-09T00:34:15.9087144Z 
> 2020-04-09T00:34:15.9087457Z def __enter__(self):
> 2020-04-09T00:34:15.9087787Z try:
> 2020-04-09T00:34:15.9091929Z >   return next(self.gen)
> 2020-04-09T00:34:15.9092634Z E   OSError: [Errno 9] Bad file 
> descriptor
> 2020-04-09T00:34:15.9092863Z 
> 2020-04-09T00:34:15.9093134Z 
> dev/.conda/envs/3.5/lib/python3.5/contextlib.py:59: OSError
> 2020-04-09T00:34:15.9093548Z __ ERROR at setup of 
> TableConfigTests.test_get_set_idle_state_retention_time ___
> 2020-04-09T00:34:15.9093803Z 
> 2020-04-09T00:34:15.9094082Z self = <contextlib._GeneratorContextManager 
> object at 0x7f8d9c3f3da0>
> 2020-04-09T00:34:15.9094313Z 
> 2020-04-09T00:34:15.9094502Z def __enter__(self):
> 2020-04-09T00:34:15.9094862Z try:
> 2020-04-09T00:34:15.9095088Z >   return next(self.gen)
> 2020-04-09T00:34:15.9095707Z E   OSError: [Errno 9] Bad file 
> descriptor
> 2020-04-09T00:34:15.9095913Z 
> 2020-04-09T00:34:15.9096203Z 
> dev/.conda/envs/3.5/lib/python3.5/contextlib.py:59: OSError
> 2020-04-09T00:34:15.9096818Z _ ERROR at teardown of 
> TableConfigTests.test_get_set_idle_state_retention_time _
> 2020-04-09T00:34:15.9100686Z 
> 2020-04-09T00:34:15.9101687Z self = <contextlib._GeneratorContextManager 
> object at 0x7f8d978d83c8>
> 2020-04-09T00:34:15.9102005Z 
> 2020-04-09T00:34:15.9102193Z def __enter__(self):
> 2020-04-09T00:34:15.9102415Z try:
> 2020-04-09T00:34:15.9102741Z >   return next(self.gen)
> 2020-04-09T00:34:15.9103144Z E   OSError: [Errno 9] Bad file 
> descriptor
> 2020-04-09T00:34:15.9103367Z 
> 2020-04-09T00:34:15.9103786Z 
> dev/.conda/envs/3.5/lib/python3.5/contextlib.py:59: OSError
> 2020-04-09T00:34:15.9104185Z  ERROR at setup of 
> TableConfigTests.test_get_set_local_timezone 
> 2020-04-09T00:34:15.9104999Z 
> 2020-04-09T00:34:15.9105287Z self = <contextlib._GeneratorContextManager 
> object at 0x7f8d979345f8>
> 2020-04-09T00:34:15.9105531Z 
> 2020-04-09T00:34:15.9105707Z def __enter__(self):
> 2020-04-09T00:34:15.9105924Z try:
> 2020-04-09T00:34:15.9106138Z >   return next(self.gen)
> 2020-04-09T00:34:15.9106555Z E   OSError: [Errno 9] Bad file 
> descriptor
> 2020-04-09T00:34:15.9106858Z 
> 2020-04-09T00:34:15.9107159Z 
> dev/.conda/envs/3.5/lib/python3.5/contextlib.py:59: OSError
> 2020-04-09T00:34:15.9107675Z __ ERROR at teardown of 
> TableConfigTests.test_get_set_local_timezone ___
> 2020-04-09T00:34:15.9107983Z 
> 2020-04-09T00:34:15.9108350Z self = <contextlib._GeneratorContextManager 
> object at 0x7f8d981f8240>
> 2020-04-09T00:34:15.9108699Z 
> 2020-04-09T00:34:15.9108983Z def __enter__(self):
> 2020-04-09T00:34:15.9109311Z try:
> 2020-04-09T00:34:15.9109566Z >   return next(self.gen)
> 2020-04-09T00:34:15.9109872Z E   OSError: [Errno 9] Bad file 
> descriptor
> 2020-04-09T00:34:15.9110082Z 
> 2020-04-09T00:34:15.9110349Z 
> dev/.conda/envs/3.5/lib/python3.5/contextlib.py:59: OSError
> 2020-04-09T00:34:15.9111098Z __ ERROR at setup of 
> TableConfigTests.test_get_set_max_generated_code_length ___
> 2020-04-09T00:34:15.9111479Z 
> 2020-04-09T00:34:15.9111740Z self = <contextlib._GeneratorContextManager 
> object at 0x7f8d9c3380f0>
> 2020-04-09T00:34:15.9112010Z 
> 2020-04-09T00:34:15.9112297Z def __enter__(self):
> 2020-04-09T00:34:15.9112571Z try:
> 2020-04-09T00:34:15.9112803Z >   return next(self.gen)
> 2020-04-09T00:34:15.9113114Z E   OSError: [Errno 9] Bad file 
> descriptor
> 2020-04-09T00:34:15.9113353Z 
> 2020-04-09T00:34:15.9113737Z 
> dev/.conda/envs/3.5/lib/python3.5/contextlib.py:59: OSError
> 2020-04-09T00:34:15.9114282Z _ ERROR at teardown of 
> TableConfigTests.test_get_set_max_generated_code_length _
> 2020-04-09T00:34:15.9114652Z 
> 

[jira] [Updated] (FLINK-17068) ERROR at teardown of TableConfigTests.test_get_set_decimal_context

2020-04-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-17068:
---
Labels: pull-request-available test-stability  (was: test-stability)

> ERROR at teardown of TableConfigTests.test_get_set_decimal_context
> --
>
> Key: FLINK-17068
> URL: https://issues.apache.org/jira/browse/FLINK-17068
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Tests
>Affects Versions: 1.11.0
>Reporter: Robert Metzger
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
>
> CI run: 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=7243=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=14487301-07d2-5d56-5690-6dfab9ffd4d9
> {code}
> 2020-04-09T00:34:15.9084299Z  ERRORS 
> 
> 2020-04-09T00:34:15.9085728Z __ ERROR at teardown of 
> TableConfigTests.test_get_set_decimal_context __
> 2020-04-09T00:34:15.9086216Z 
> 2020-04-09T00:34:15.9086725Z self = <contextlib._GeneratorContextManager 
> object at 0x7f8d978989b0>
> 2020-04-09T00:34:15.9087144Z 
> 2020-04-09T00:34:15.9087457Z def __enter__(self):
> 2020-04-09T00:34:15.9087787Z try:
> 2020-04-09T00:34:15.9091929Z >   return next(self.gen)
> 2020-04-09T00:34:15.9092634Z E   OSError: [Errno 9] Bad file 
> descriptor
> 2020-04-09T00:34:15.9092863Z 
> 2020-04-09T00:34:15.9093134Z 
> dev/.conda/envs/3.5/lib/python3.5/contextlib.py:59: OSError
> 2020-04-09T00:34:15.9093548Z __ ERROR at setup of 
> TableConfigTests.test_get_set_idle_state_retention_time ___
> 2020-04-09T00:34:15.9093803Z 
> 2020-04-09T00:34:15.9094082Z self = <contextlib._GeneratorContextManager 
> object at 0x7f8d9c3f3da0>
> 2020-04-09T00:34:15.9094313Z 
> 2020-04-09T00:34:15.9094502Z def __enter__(self):
> 2020-04-09T00:34:15.9094862Z try:
> 2020-04-09T00:34:15.9095088Z >   return next(self.gen)
> 2020-04-09T00:34:15.9095707Z E   OSError: [Errno 9] Bad file 
> descriptor
> 2020-04-09T00:34:15.9095913Z 
> 2020-04-09T00:34:15.9096203Z 
> dev/.conda/envs/3.5/lib/python3.5/contextlib.py:59: OSError
> 2020-04-09T00:34:15.9096818Z _ ERROR at teardown of 
> TableConfigTests.test_get_set_idle_state_retention_time _
> 2020-04-09T00:34:15.9100686Z 
> 2020-04-09T00:34:15.9101687Z self = <contextlib._GeneratorContextManager 
> object at 0x7f8d978d83c8>
> 2020-04-09T00:34:15.9102005Z 
> 2020-04-09T00:34:15.9102193Z def __enter__(self):
> 2020-04-09T00:34:15.9102415Z try:
> 2020-04-09T00:34:15.9102741Z >   return next(self.gen)
> 2020-04-09T00:34:15.9103144Z E   OSError: [Errno 9] Bad file 
> descriptor
> 2020-04-09T00:34:15.9103367Z 
> 2020-04-09T00:34:15.9103786Z 
> dev/.conda/envs/3.5/lib/python3.5/contextlib.py:59: OSError
> 2020-04-09T00:34:15.9104185Z  ERROR at setup of 
> TableConfigTests.test_get_set_local_timezone 
> 2020-04-09T00:34:15.9104999Z 
> 2020-04-09T00:34:15.9105287Z self = <contextlib._GeneratorContextManager 
> object at 0x7f8d979345f8>
> 2020-04-09T00:34:15.9105531Z 
> 2020-04-09T00:34:15.9105707Z def __enter__(self):
> 2020-04-09T00:34:15.9105924Z try:
> 2020-04-09T00:34:15.9106138Z >   return next(self.gen)
> 2020-04-09T00:34:15.9106555Z E   OSError: [Errno 9] Bad file 
> descriptor
> 2020-04-09T00:34:15.9106858Z 
> 2020-04-09T00:34:15.9107159Z 
> dev/.conda/envs/3.5/lib/python3.5/contextlib.py:59: OSError
> 2020-04-09T00:34:15.9107675Z __ ERROR at teardown of 
> TableConfigTests.test_get_set_local_timezone ___
> 2020-04-09T00:34:15.9107983Z 
> 2020-04-09T00:34:15.9108350Z self = <contextlib._GeneratorContextManager 
> object at 0x7f8d981f8240>
> 2020-04-09T00:34:15.9108699Z 
> 2020-04-09T00:34:15.9108983Z def __enter__(self):
> 2020-04-09T00:34:15.9109311Z try:
> 2020-04-09T00:34:15.9109566Z >   return next(self.gen)
> 2020-04-09T00:34:15.9109872Z E   OSError: [Errno 9] Bad file 
> descriptor
> 2020-04-09T00:34:15.9110082Z 
> 2020-04-09T00:34:15.9110349Z 
> dev/.conda/envs/3.5/lib/python3.5/contextlib.py:59: OSError
> 2020-04-09T00:34:15.9111098Z __ ERROR at setup of 
> TableConfigTests.test_get_set_max_generated_code_length ___
> 2020-04-09T00:34:15.9111479Z 
> 2020-04-09T00:34:15.9111740Z self = <contextlib._GeneratorContextManager 
> object at 0x7f8d9c3380f0>
> 2020-04-09T00:34:15.9112010Z 
> 2020-04-09T00:34:15.9112297Z def __enter__(self):
> 2020-04-09T00:34:15.9112571Z try:
> 2020-04-09T00:34:15.9112803Z >   return next(self.gen)
> 2020-04-09T00:34:15.9113114Z E   OSError: [Errno 9] Bad file 
> descriptor
> 2020-04-09T00:34:15.9113353Z 
> 2020-04-09T00:34:15.9113737Z 
> dev/.conda/envs/3.5/lib/python3.5/contextlib.py:59: OSError
> 2020-04-09T00:34:15.9114282Z _ ERROR at teardown of 
> TableConfigTests.test_get_set_max_generated_code_length _
> 2020-04-09T00:34:15.9114652Z 
> 2020-04-09T00:34:15.9114929Z 

[GitHub] [flink] dianfu opened a new pull request #11805: [FLINK-17068][python][tests] Ensure the permission of scripts set correctly before executing the Python tests

2020-04-18 Thread GitBox
dianfu opened a new pull request #11805: [FLINK-17068][python][tests] Ensure 
the permission of scripts set correctly before executing the Python tests
URL: https://github.com/apache/flink/pull/11805
 
 
   
   ## What is the purpose of the change
   
   *This pull request adds the executable permission for the Python scripts 
before executing the Python tests.*
   
   ## Brief change log
   
 - *Ensure the permissions of the scripts are set correctly in `dev/lint-python.sh`*
   
   ## Verifying this change
   
   Verified manually in my local environment.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-17234) Show more error messages in taskmanager's log

2020-04-18 Thread Kurt Young (Jira)
Kurt Young created FLINK-17234:
--

 Summary: Show more error messages in taskmanager's log
 Key: FLINK-17234
 URL: https://issues.apache.org/jira/browse/FLINK-17234
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Runtime 
/ Task
Reporter: Kurt Young
 Fix For: 1.11.0


I created a csv table in the sql client and tried to view the table content by 
executing a simple SELECT * query.

First I got the error message printed in the sql cli:
{code:java}
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: can't switch state from terminal state READING 
to CLOSED{code}
And then I opened the TM's log to find more information about what went wrong. 
The only information I got from the log file is similar to the sql cli output:
{code:java}
2020-04-19 11:50:28,630 WARN  org.apache.flink.runtime.taskmanager.Task         
           [] - CsvTableSource(read fields: t, author) -> 
SourceConversion(table=[default_catalog.default_database.contri, source: 
[CsvTableSource(read fields: t, author)]], fields=[t, author]) -> 
SinkConversionToRow -> Sink: Unnamed (1/1) (ed397f0f69e8f48b320c568f91a5976e) 
switched from RUNNING to FAILED.2020-04-19 11:50:28,630 WARN  
org.apache.flink.runtime.taskmanager.Task                    [] - 
CsvTableSource(read fields: t, author) -> 
SourceConversion(table=[default_catalog.default_database.contri, source: 
[CsvTableSource(read fields: t, author)]], fields=[t, author]) -> 
SinkConversionToRow -> Sink: Unnamed (1/1) (ed397f0f69e8f48b320c568f91a5976e) 
switched from RUNNING to FAILED.java.lang.IllegalStateException: can't switch 
state from terminal state READING to CLOSED at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:217) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.switchState(ContinuousFileReaderOperator.java:366)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:213)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.run(StreamTaskActionExecutor.java:42)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:276)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:205)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:196)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:490)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:718) 
[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:542) 
[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]{code}
 

Finally, it turns out I specified the wrong column type for the csv table, and 
something must have gone wrong when reading the data file, but I couldn't get 
any useful information from the log file.

I'm not sure yet whether the root cause is that the error message is never 
thrown by the csv parser or that it gets swallowed by the mailbox model.
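
Whatever the root cause turns out to be, the symptom suggests the first failure is 
dropped when the later state transition fails. A hypothetical sketch of the general 
remedy, chaining the original failure as the cause of the state-transition 
exception (not the actual Flink code):

{code:java}
/** Hypothetical sketch: remember the first failure and chain it into later state errors. */
class StateSwitcher {
    private volatile String state = "READING";
    private volatile Throwable firstFailure;

    /** Called by the reader when e.g. the csv parser fails. */
    void recordFailure(Throwable t) {
        if (firstFailure == null) {
            firstFailure = t;
        }
    }

    /** Fails with the original error attached instead of a bare IllegalStateException. */
    void switchState(String newState) {
        if (!isTransitionAllowed(state, newState)) {
            IllegalStateException e = new IllegalStateException(
                    "can't switch state from terminal state " + state + " to " + newState);
            if (firstFailure != null) {
                e.initCause(firstFailure); // the log would now show the real root cause
            }
            throw e;
        }
        state = newState;
    }

    private boolean isTransitionAllowed(String from, String to) {
        return !("READING".equals(from) && "CLOSED".equals(to)); // placeholder rule
    }
}
{code}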



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17234) Show more error messages in taskmanager's log

2020-04-18 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-17234:
---
Description: 
I created a csv table in the sql client and tried to view the table content by 
executing a simple SELECT * query.

First I got the error message printed in the sql cli:
{code:java}
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: can't switch state from terminal state READING 
to CLOSED{code}
And then I opened the TM's log to find more information about what went wrong. 
The only information I got from the log file is similar to the sql cli output:
{code:java}
2020-04-19 11:50:28,630 WARN  org.apache.flink.runtime.taskmanager.Task         
           [] - CsvTableSource(read fields: t, author) -> 
SourceConversion(table=[default_catalog.default_database.contri, source: 
[CsvTableSource(read fields: t, author)]], fields=[t, author]) -> 
SinkConversionToRow -> Sink: Unnamed (1/1) (ed397f0f69e8f48b320c568f91a5976e) 
switched from RUNNING to FAILED.2020-04-19 11:50:28,630 WARN  
org.apache.flink.runtime.taskmanager.Task                    [] - 
CsvTableSource(read fields: t, author) -> 
SourceConversion(table=[default_catalog.default_database.contri, source: 
[CsvTableSource(read fields: t, author)]], fields=[t, author]) -> 
SinkConversionToRow -> Sink: Unnamed (1/1) (ed397f0f69e8f48b320c568f91a5976e) 
switched from RUNNING to FAILED.java.lang.IllegalStateException: can't switch 
state from terminal state READING to CLOSED at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:217) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.switchState(ContinuousFileReaderOperator.java:366)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:213)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.run(StreamTaskActionExecutor.java:42)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:276)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:205)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:196)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:490)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:718) 
[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:542) 
[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]{code}
 

Finally, it turns out I specified the wrong column type for the csv table, and 
something must have gone wrong when reading the data file, but I couldn't get 
any useful information from the log file.

I'm not sure yet whether the root cause is that the error message is never 
thrown by the csv parser or that it gets swallowed by the mailbox model.

  was:
I created a csv table in sql client and trying to view the table content by 
executing a simple SELECT * query. 

First I got the error message printed in sql cli:
{code:java}
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: can't switch state from terminal state READING 
to CLOSED{code}
And then I open the TM's log to fin more information about what went wrong. The 
only information I got from log file is similar with sql cli:
{code:java}
2020-04-19 11:50:28,630 WARN  org.apache.flink.runtime.taskmanager.Task         
           [] - CsvTableSource(read fields: t, author) -> 
SourceConversion(table=[default_catalog.default_database.contri, source: 
[CsvTableSource(read fields: t, author)]], fields=[t, author]) -> 
SinkConversionToRow -> Sink: Unnamed (1/1) (ed397f0f69e8f48b320c568f91a5976e) 
switched from RUNNING to FAILED.2020-04-19 11:50:28,630 WARN  
org.apache.flink.runtime.taskmanager.Task                    [] - 
CsvTableSource(read fields: t, author) -> 
SourceConversion(table=[default_catalog.default_database.contri, source: 
[CsvTableSource(read fields: t, author)]], fields=[t, author]) -> 
SinkConversionToRow -> Sink: Unnamed (1/1) (ed397f0f69e8f48b320c568f91a5976e) 
switched from RUNNING to 

[jira] [Commented] (FLINK-17196) Improve the implementation of Fabric8FlinkKubeClient#getRestEndpoint

2020-04-18 Thread Canbin Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086732#comment-17086732
 ] 

Canbin Zheng commented on FLINK-17196:
--

Thanks, [~fly_in_gis]! They are already in the implementation plan.
{quote}When the creation of load balancer timeouts, we need to show up some 
logs for users to set the service exposed type to {{NodePort}} or check why the 
LB could not be allocated.
{quote}
When we try to retrieve the Endpoint, there is a possibility that the LB or 
NodePort Service is not ready, which leads to a timeout; in that case, we 
should throw an Exception.
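
A rough sketch of that behavior, with hypothetical names (the real method is 
{{Fabric8FlinkKubeClient#getRestEndpoint}}): poll until a deadline and then throw, 
rather than silently falling back:

{code:java}
import java.util.Optional;

/** Sketch only: fail loudly when the LB/NodePort endpoint is not ready in time. */
class RestEndpointResolver {

    static final class Endpoint {
        final String address;
        final int port;

        Endpoint(String address, int port) {
            this.address = address;
            this.port = port;
        }
    }

    Endpoint getRestEndpoint(String clusterId, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            Optional<Endpoint> endpoint = tryResolve(clusterId); // LB ingress or NodePort lookup
            if (endpoint.isPresent()) {
                return endpoint.get();
            }
            Thread.sleep(500); // poll until the service is ready or the deadline passes
        }
        throw new IllegalStateException(
                "Rest endpoint of cluster " + clusterId + " was not ready within " + timeoutMs
                        + " ms; consider the NodePort exposed type, or check why the"
                        + " LoadBalancer could not be allocated.");
    }

    Optional<Endpoint> tryResolve(String clusterId) {
        return Optional.empty(); // placeholder: would query the K8s service status here
    }
}
{code}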

> Improve the implementation of Fabric8FlinkKubeClient#getRestEndpoint
> 
>
> Key: FLINK-17196
> URL: https://issues.apache.org/jira/browse/FLINK-17196
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Affects Versions: 1.10.0, 1.10.1
>Reporter: Canbin Zheng
>Priority: Major
> Fix For: 1.11.0
>
>
> Currently there are some bugs in the 
> {{Fabric8FlinkKubeClient#getRestEndpoint}} and several implicit 
> fallback/toleration behaviors when retrieving the Endpoint.
> This ticket proposes to fix the bugs and improve the implementation by 
> deprecating some implicit fallback/toleration behaviors.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #11786: [FLINK-17208][network] Reduce redundant data available notifications of PipelinedSubpartition

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11786: [FLINK-17208][network] Reduce 
redundant data available notifications of PipelinedSubpartition
URL: https://github.com/apache/flink/pull/11786#issuecomment-615087816
 
 
   
   ## CI report:
   
   * 66188949ff1646c298d32df7fe45c6745f771bc1 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160850008) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7714)
 
   * ab6e2e33d46724a5694a66b13bba1d8a6fdbbdc1 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160877666) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7719)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhengcanbin commented on issue #11708: [FLINK-15816][k8s] Limit the value of kubernetes.cluster-id to have no more than 45 characters

2020-04-18 Thread GitBox
zhengcanbin commented on issue #11708: [FLINK-15816][k8s] Limit the value of 
kubernetes.cluster-id to have no more than 45 characters
URL: https://github.com/apache/flink/pull/11708#issuecomment-616018548
 
 
   Thanks for your comments @tillrohrmann! I have addressed your comments and 
look forward to your double-check.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11786: [FLINK-17208][network] Reduce redundant data available notifications of PipelinedSubpartition

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11786: [FLINK-17208][network] Reduce 
redundant data available notifications of PipelinedSubpartition
URL: https://github.com/apache/flink/pull/11786#issuecomment-615087816
 
 
   
   ## CI report:
   
   * 66188949ff1646c298d32df7fe45c6745f771bc1 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160850008) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7714)
 
   * ab6e2e33d46724a5694a66b13bba1d8a6fdbbdc1 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-17226) Remove Prometheus relocations

2020-04-18 Thread molsion (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086729#comment-17086729
 ] 

molsion commented on FLINK-17226:
-

[~chesnay] I don't quite understand the question; can you clarify it? Thanks.

> Remove Prometheus relocations
> -
>
> Key: FLINK-17226
> URL: https://issues.apache.org/jira/browse/FLINK-17226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics
>Reporter: Chesnay Schepler
>Priority: Major
> Fix For: 1.11.0
>
>
> Now that we load the Prometheus reporters as plugins we should remove the 
> shade-plugin configuration/relocations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #11770: [FLINK-17014][runtime] Implement PipelinedRegionSchedulingStrategy

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11770: [FLINK-17014][runtime] Implement 
PipelinedRegionSchedulingStrategy
URL: https://github.com/apache/flink/pull/11770#issuecomment-614523163
 
 
   
   ## CI report:
   
   * d8568c4f9d43b9cd0b9cb47e6f8a9ddd8a57ec1c Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160873750) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7718)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on issue #11700: [FLINK-16366] [table] Introduce executeSql method in TableEnvironment and support many statements

2020-04-18 Thread GitBox
KurtYoung commented on issue #11700: [FLINK-16366] [table] Introduce executeSql 
method in TableEnvironment and support many statements
URL: https://github.com/apache/flink/pull/11700#issuecomment-616013579
 
 
   There is an e2e test failure: "Dependency shading of table modules test"


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-17229) Add a flink-sql-client-sdk module

2020-04-18 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086724#comment-17086724
 ] 

godfrey he commented on FLINK-17229:


[~molsion] if you want to submit jobs and access the cluster programmatically, 
you can use TableEnvironment and JobClient after 
[FLIP-84|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878]
 is finished. If you want to submit jobs and access the cluster without 
programming (e.g. from a shell), or do not want to depend on the flink jar, you 
can use flink-sql-gateway.
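
For context, the programmatic path looks roughly like this, sketched against the 
API proposed in FLIP-84 (table and statement names are placeholders):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class SubmitSqlJob {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // executeSql submits the statement and returns a TableResult
        TableResult result = tEnv.executeSql(
                "INSERT INTO sink_table SELECT * FROM source_table");

        // the JobClient gives programmatic access to the submitted job
        result.getJobClient().ifPresent(client ->
                System.out.println("Submitted job " + client.getJobID()));
    }
}
{code}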

> Add a flink-sql-client-sdk module 
> --
>
> Key: FLINK-17229
> URL: https://issues.apache.org/jira/browse/FLINK-17229
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API, Table SQL / Client
>Reporter: molsion
>Priority: Major
>
> Can we add a flink-sql-client-sdk module that can be imported like JDBC (an 
> SDK)? If we do it, our users can use flink-sql-client-sdk to submit SQL jobs. 
> It would be more friendly than flink-sql-gateway.
> I have created and implemented a flink-sql-client-sdk project and I want to 
> contribute it to the community.
>  
> [flink-sql-gateway : 
> https://github.com/ververica/flink-sql-gateway|https://github.com/ververica/flink-sql-gateway]
> [flink-sql-client-sdk : 
> https://github.com/molsionmo/flink-sql-client|https://github.com/molsionmo/flink-sql-client]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17177) Handle ERROR event correctly in KubernetesResourceManager#onError

2020-04-18 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086719#comment-17086719
 ] 

Yang Wang commented on FLINK-17177:
---

[~felixzheng], thanks a lot. Let's try to figure out when the {{Error 
WatchEvent}} will be sent from the K8s ApiServer.
 * If it happens during a resource spec check (e.g. resource version too old, 
format check failed), then the current handling logic is right: remove the pod 
and create a new one.
 * If it happens because of some K8s internal error, creating a new watcher 
would not solve the problem. Maybe we need to throw a fatal error and fail the 
current jobmanager attempt.
 * Some other case ...

Moreover, I am afraid that if there are some HTTP-layer errors, the 
{{WatchConnectionManager}} can handle them and retry internally, i.e. by 
creating a new {{WebSocket}}. Just like on YARN, where the IPC client of 
{{AMRMClient}} handles the retry logic if there is a network problem.
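
A hypothetical sketch of that case distinction (the real handler is 
{{KubernetesResourceManager#onError}}; the classification itself is exactly what 
still needs to be figured out):

{code:java}
/** Sketch only: dispatch on the kind of ERROR watch event instead of always removing the pod. */
class ErrorEventHandler {

    enum ErrorKind { RESOURCE_SPEC, HTTP_LAYER, UNKNOWN }

    void onError(ErrorKind kind, String detail) {
        switch (kind) {
            case RESOURCE_SPEC:
                // e.g. resource version too old, format check failed:
                // the pod itself is the problem, so remove it and request a replacement
                removePodAndRequestNew(detail);
                break;
            case HTTP_LAYER:
                // the watcher itself may be broken; a fresh watcher alone may not help,
                // so escalate and let the current jobmanager attempt fail over
                throw new IllegalStateException("Fatal watch failure: " + detail);
            default:
                System.err.println("Unhandled ERROR watch event: " + detail);
        }
    }

    void removePodAndRequestNew(String podName) {
        // placeholder: would call the K8s client here
    }
}
{code}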

> Handle ERROR event correctly in KubernetesResourceManager#onError
> -
>
> Key: FLINK-17177
> URL: https://issues.apache.org/jira/browse/FLINK-17177
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.10.0, 1.10.1
>Reporter: Canbin Zheng
>Priority: Major
> Fix For: 1.11.0
>
>
> Currently, once we receive an *ERROR* event that is sent from the K8s API 
> server via the K8s {{Watcher}}, then {{KubernetesResourceManager#onError}} 
> will handle it by calling the 
> {{KubernetesResourceManager#removePodIfTerminated}}. This may be incorrect 
> since the *ERROR* event may indicate an exception in the HTTP layer, which 
> means the previously created {{Watcher}} may no longer be available and we'd 
> better re-create the {{Watcher}} immediately.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #11770: [FLINK-17014][runtime] Implement PipelinedRegionSchedulingStrategy

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11770: [FLINK-17014][runtime] Implement 
PipelinedRegionSchedulingStrategy
URL: https://github.com/apache/flink/pull/11770#issuecomment-614523163
 
 
   
   ## CI report:
   
   * d8568c4f9d43b9cd0b9cb47e6f8a9ddd8a57ec1c Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160873750) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7718)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-17229) Add a flink-sql-client-sdk module

2020-04-18 Thread molsion (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086716#comment-17086716
 ] 

molsion commented on FLINK-17229:
-

[~godfreyhe], thank you for your answer, but is flink-sql-gateway necessary? 
Why does the SDK not access the cluster directly instead of going through the 
gateway?

> Add a flink-sql-client-sdk module 
> --
>
> Key: FLINK-17229
> URL: https://issues.apache.org/jira/browse/FLINK-17229
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API, Table SQL / Client
>Reporter: molsion
>Priority: Major
>
> Can we add a flink-sql-client-sdk module that can be imported like JDBC (an 
> SDK)? If we do it, our users can use flink-sql-client-sdk to submit SQL jobs. 
> It would be more friendly than flink-sql-gateway.
> I have created and implemented a flink-sql-client-sdk project and I want to 
> contribute it to the community.
>  
> [flink-sql-gateway : 
> https://github.com/ververica/flink-sql-gateway|https://github.com/ververica/flink-sql-gateway]
> [flink-sql-client-sdk : 
> https://github.com/molsionmo/flink-sql-client|https://github.com/molsionmo/flink-sql-client]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #11770: [FLINK-17014][runtime] Implement PipelinedRegionSchedulingStrategy

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11770: [FLINK-17014][runtime] Implement 
PipelinedRegionSchedulingStrategy
URL: https://github.com/apache/flink/pull/11770#issuecomment-614523163
 
 
   
   ## CI report:
   
   * 395043eb6bbd4b1dc2dffa1a463ddf7a36401132 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160680515) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7630)
 
   * d8568c4f9d43b9cd0b9cb47e6f8a9ddd8a57ec1c Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160873750) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7718)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11770: [FLINK-17014][runtime] Implement PipelinedRegionSchedulingStrategy

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11770: [FLINK-17014][runtime] Implement 
PipelinedRegionSchedulingStrategy
URL: https://github.com/apache/flink/pull/11770#issuecomment-614523163
 
 
   
   ## CI report:
   
   * 395043eb6bbd4b1dc2dffa1a463ddf7a36401132 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160680515) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7630)
 
   * d8568c4f9d43b9cd0b9cb47e6f8a9ddd8a57ec1c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot edited a comment on issue #11804: [FLINK-16473][doc][jdbc] add documentation for JDBCCatalog and PostgresCatalog

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11804: [FLINK-16473][doc][jdbc] add 
documentation for JDBCCatalog and PostgresCatalog
URL: https://github.com/apache/flink/pull/11804#issuecomment-615960634
 
 
   
   ## CI report:
   
   * 42e0607b996d640983cddede30eb1281944c1c64 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160865763) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7716)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-04-18 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086706#comment-17086706
 ] 

Zhu Zhu commented on FLINK-17075:
-

Thanks for the explanation! I think the heartbeat approach could work well as a 
safety net.
Agreed that it's best to have both approaches. With the heartbeat-payload 
safety net, we only need a finite number of state update retries, which is 
simpler than retrying indefinitely.

> Add task status reconciliation between TM and JM
> 
>
> Key: FLINK-17075
> URL: https://issues.apache.org/jira/browse/FLINK-17075
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Priority: Critical
> Fix For: 1.11.0
>
>
> In order to harden the TM and JM communication, I suggest letting the 
> {{TaskExecutor}} send its task statuses back to the {{JobMaster}} as part of 
> the heartbeat payload (similar to FLINK-11059). This would allow the two 
> components to reconcile their states in case a status update message is lost, 
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E
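A minimal sketch of the reconciliation idea, with hypothetical names rather than Flink's actual classes:

{code:java}
import java.util.Map;

// Hypothetical types for illustration; not Flink's actual API.
enum TaskState { RUNNING, FINISHED, FAILED, CANCELED }

class TaskStatusReconciler {
    // The JobMaster's view of task states, keyed by execution attempt id.
    private final Map<String, TaskState> jmView;

    TaskStatusReconciler(Map<String, TaskState> jmView) {
        this.jmView = jmView;
    }

    // Called when a TaskExecutor heartbeat arrives carrying its task statuses as payload.
    void reconcile(Map<String, TaskState> tmReported) {
        tmReported.forEach((attemptId, tmState) -> {
            TaskState jmState = jmView.get(attemptId);
            if (jmState != null && jmState != tmState) {
                // A status update message was lost; take the TM-reported state as ground truth.
                jmView.put(attemptId, tmState);
            }
        });
    }
}
{code}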





[GitHub] [flink] flinkbot edited a comment on issue #11804: [FLINK-16473][doc][jdbc] add documentation for JDBCCatalog and PostgresCatalog

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11804: [FLINK-16473][doc][jdbc] add 
documentation for JDBCCatalog and PostgresCatalog
URL: https://github.com/apache/flink/pull/11804#issuecomment-615960634
 
 
   
   ## CI report:
   
   * 42e0607b996d640983cddede30eb1281944c1c64 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160865763) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7716)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot edited a comment on issue #11804: [FLINK-16473][doc][jdbc] add documentation for JDBCCatalog and PostgresCatalog

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11804: [FLINK-16473][doc][jdbc] add 
documentation for JDBCCatalog and PostgresCatalog
URL: https://github.com/apache/flink/pull/11804#issuecomment-615960634
 
 
   
   ## CI report:
   
   * 42e0607b996d640983cddede30eb1281944c1c64 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160865763) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7716)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot commented on issue #11804: [FLINK-16473][doc][jdbc] add documentation for JDBCCatalog and PostgresCatalog

2020-04-18 Thread GitBox
flinkbot commented on issue #11804: [FLINK-16473][doc][jdbc] add documentation 
for JDBCCatalog and PostgresCatalog
URL: https://github.com/apache/flink/pull/11804#issuecomment-615960634
 
 
   
   ## CI report:
   
   * 42e0607b996d640983cddede30eb1281944c1c64 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot commented on issue #11804: [FLINK-16473][doc][jdbc] add documentation for JDBCCatalog and PostgresCatalog

2020-04-18 Thread GitBox
flinkbot commented on issue #11804: [FLINK-16473][doc][jdbc] add documentation 
for JDBCCatalog and PostgresCatalog
URL: https://github.com/apache/flink/pull/11804#issuecomment-615957826
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 42e0607b996d640983cddede30eb1281944c1c64 (Sat Apr 18 
21:58:14 UTC 2020)
   
✅ no warnings
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] bowenli86 opened a new pull request #11804: [FLINK-16473][doc][jdbc] add documentation for JDBCCatalog and PostgresCatalog

2020-04-18 Thread GitBox
bowenli86 opened a new pull request #11804: [FLINK-16473][doc][jdbc] add 
documentation for JDBCCatalog and PostgresCatalog
URL: https://github.com/apache/flink/pull/11804
 
 
   ## What is the purpose of the change
   
   add documentation for JDBCCatalog and PostgresCatalog
   
   ## Brief change log
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (docs)
   




[jira] [Updated] (FLINK-16473) add documentation for JDBCCatalog and PostgresCatalog

2020-04-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-16473:
---
Labels: pull-request-available  (was: )

> add documentation for JDBCCatalog and PostgresCatalog
> -
>
> Key: FLINK-16473
> URL: https://issues.apache.org/jira/browse/FLINK-16473
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC, Documentation
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>






[jira] [Updated] (FLINK-16473) add documentation for JDBCCatalog and PostgresCatalog

2020-04-18 Thread Bowen Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-16473:
-
Summary: add documentation for JDBCCatalog and PostgresCatalog  (was: add 
documentation for PostgresJDBCCatalog)

> add documentation for JDBCCatalog and PostgresCatalog
> -
>
> Key: FLINK-16473
> URL: https://issues.apache.org/jira/browse/FLINK-16473
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC, Documentation
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.11.0
>
>






[jira] [Commented] (FLINK-17173) Supports query hint to config "IdleStateRetentionTime" per query in SQL

2020-04-18 Thread Jiahui Jiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086655#comment-17086655
 ] 

Jiahui Jiang commented on FLINK-17173:
--

[~danny0405] I'd love to contribute the feature if that's okay! :D 
 But there are some implementation details that I think are worth discussing before 
I submit the PR.
 1. In order for the parser to extract the hints and make them available in 
TableImpl, we basically need to bring back QueryConfig and have it be part of 
Parser#parse's response. I know QueryConfig was [JUST 
removed|https://github.com/apache/flink/pull/11481/files] from the external 
API, but is it acceptable if I add it back as an internal API? Or is there a 
better approach?
 2. To have this 'QueryConfig' available in TableEnvironmentImpl, I can
 (1) either change the signature of Parser.parse() to return a ParserResponse, 
which contains the current List<Operation> plus the additional config,
 (2) or add queryConfig as an additional field of PlannerQueryOperation. 
 I think option 2 is cleaner and would be a less involved change. I just want to 
make sure this won't cause unexpected impact!

Thank you :)
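To make option 1 concrete, a rough sketch of what the changed parse result could look like (illustrative only, not an actual Flink API):

{code:java}
import java.util.List;

// Hypothetical result type for option 1. The type parameters stand in for
// org.apache.flink.table.operations.Operation and the re-introduced QueryConfig.
class ParserResponse<OperationT, QueryConfigT> {
    private final List<OperationT> operations;  // what Parser#parse returns today
    private final QueryConfigT queryConfig;     // extracted from the /*+ ... */ hints

    ParserResponse(List<OperationT> operations, QueryConfigT queryConfig) {
        this.operations = operations;
        this.queryConfig = queryConfig;
    }

    List<OperationT> getOperations() {
        return operations;
    }

    QueryConfigT getQueryConfig() {
        return queryConfig;
    }
}
{code}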

> Supports query hint to config "IdleStateRetentionTime" per query in SQL
> ---
>
> Key: FLINK-17173
> URL: https://issues.apache.org/jira/browse/FLINK-17173
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.11.0
>Reporter: Danny Chen
>Priority: Major
>
> The motivation for this (copied from the user mailing list, [~qzhzm173227]):
> In some of our users' use cases, they have a couple of complex join 
> queries where the key domains keep evolving - we definitely want some sort of 
> state retention for those queries; but there are others where the key domain 
> doesn't evolve over time, yet there isn't really a guarantee on the 
> maximum gap between 2 records of the same key appearing in the stream, so we 
> don't want to accidentally invalidate the state for those keys in these 
> streams.
> Because queries with different requirements can coexist in the 
> pipeline, I think we have to configure `IDLE_STATE_RETENTION_TIME` per operator.
> Just wondering, has a similar requirement not come up much for SQL users 
> before? (being able to set table / query configuration inside SQL queries)
> We are also a little concerned because, now that 
> 'toRetractStream(Table, Class, QueryConfig)' is deprecated, relying on the 
> fact that TableConfig is read during toDataStream feels like relying on an 
> implementation detail that just happens to work, and there is no guarantee 
> that it will keep working in future versions...
> Demo syntax:
> {code:sql}
> CREATE TABLE `/output` AS
> SELECT /*+ IDLE_STATE_RETENTION_TIME(minTime ='5m', maxTime ='11m') */ *
> FROM `/input1` a
> INNER JOIN `/input2` b
> ON a.column_name = b.column_name;
> {code}
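For comparison, the closest thing available today is the job-wide setting on TableConfig, which applies to the whole pipeline rather than per query - exactly the limitation the proposed hint would lift:

{code:java}
import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IdleStateRetentionExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Global setting: every stateful operator in the pipeline gets the
        // same min/max idle state retention.
        tEnv.getConfig().setIdleStateRetentionTime(Time.minutes(5), Time.minutes(11));
    }
}
{code}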





[GitHub] [flink] StephanEwen commented on a change in pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

2020-04-18 Thread GitBox
StephanEwen commented on a change in pull request #11554: 
[FLINK-15101][connector/common] Add SourceCoordinator implementation.
URL: https://github.com/apache/flink/pull/11554#discussion_r410746156
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
 ##
 @@ -0,0 +1,259 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.function.BiConsumer;
+
+/**
+ * A context class for the {@link OperatorCoordinator}. Compared with {@link SplitEnumeratorContext} this class
+ * allows interaction with state and sending {@link OperatorEvent} to the SourceOperator while
+ * {@link SplitEnumeratorContext} only allows sending {@link SourceEvent}.
+ *
+ * <p>The context serves a few purposes:
+ * <ul>
+ *     <li>Information provider - The context provides necessary information to the enumerator for it to
+ *     know the status of the source readers and their split assignments. This information
+ *     allows the split enumerator to do the coordination.</li>
+ *     <li>Action taker - The context also provides a few actions that the enumerator can take to carry
+ *     out the coordination. So far there are two actions: 1) assign splits to the source readers,
+ *     and 2) send a custom {@link SourceEvent SourceEvent} to the source readers.</li>
+ *     <li>Thread model enforcement - The context ensures that all the manipulations of the coordinator state
+ *     are handled by the same thread.</li>
+ * </ul>
+ *
+ * @param <SplitT> the type of the splits.
+ */
+@Internal
+public class SourceCoordinatorContext<SplitT extends SourceSplit> implements SplitEnumeratorContext<SplitT> {
+	private final ExecutorService coordinatorExecutor;
+	private final ExecutorNotifier notifier;
+	private final OperatorCoordinator.Context operatorCoordinatorContext;
+	private final ConcurrentMap<Integer, ReaderInfo> registeredReaders;
+	private final SplitAssignmentTracker<SplitT> assignmentTracker;
+	private final String coordinatorThreadName;
+
+	public SourceCoordinatorContext(
+			ExecutorService coordinatorExecutor,
+			String coordinatorThreadName,
+			int numWorkerThreads,
+			OperatorCoordinator.Context operatorCoordinatorContext) {
+		this(coordinatorExecutor, coordinatorThreadName, numWorkerThreads, operatorCoordinatorContext,
+			new SplitAssignmentTracker<>());
+	}
+
+	// Package private method for unit test.
+	SourceCoordinatorContext(
+			ExecutorService coordinatorExecutor,
+			String coordinatorThreadName,
+  

[GitHub] [flink] StephanEwen commented on a change in pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

2020-04-18 Thread GitBox
StephanEwen commented on a change in pull request #11554: 
[FLINK-15101][connector/common] Add SourceCoordinator implementation.
URL: https://github.com/apache/flink/pull/11554#discussion_r410746366
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java
 ##
 @@ -0,0 +1,43 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.event;
+
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+
+/**
+ * An {@link OperatorEvent} that registers a {@link 
org.apache.flink.api.connector.source.SourceReader SourceReader}
+ * to the SourceCoordinator.
+ */
+public class ReaderRegistrationEvent implements OperatorEvent {
 
 Review comment:
   Same as above: adding `toString()` is helpful.




[GitHub] [flink] StephanEwen commented on a change in pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

2020-04-18 Thread GitBox
StephanEwen commented on a change in pull request #11554: 
[FLINK-15101][connector/common] Add SourceCoordinator implementation.
URL: https://github.com/apache/flink/pull/11554#discussion_r410746363
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/event/AddSplitEvent.java
 ##
 @@ -0,0 +1,40 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.event;
+
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+
+import java.util.List;
+
+/**
+ * A source event that adds splits to a source reader.
+ *
+ * @param <SplitT> the type of splits.
+ */
+public class AddSplitEvent<SplitT> implements OperatorEvent {
 
 Review comment:
Nit: Adding `toString()` to such classes is super helpful when debugging 
(it shows the contents in the debugger without needing to navigate into the object).
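A sketch of the suggested override, assuming the event keeps its splits in a field named `splits` (the actual field name in the PR may differ):

```java
import java.util.List;

// Illustrative stand-in for the PR's AddSplitEvent; the OperatorEvent
// interface is omitted to keep the sketch self-contained.
public class AddSplitEvent<SplitT> {
    private final List<SplitT> splits;

    public AddSplitEvent(List<SplitT> splits) {
        this.splits = splits;
    }

    @Override
    public String toString() {
        return "AddSplitEvent[splits=" + splits + "]";
    }
}
```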




[GitHub] [flink] StephanEwen commented on a change in pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

2020-04-18 Thread GitBox
StephanEwen commented on a change in pull request #11554: 
[FLINK-15101][connector/common] Add SourceCoordinator implementation.
URL: https://github.com/apache/flink/pull/11554#discussion_r410746390
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/event/SourceEventWrapper.java
 ##
 @@ -0,0 +1,40 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.event;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+
+/**
+ * A wrapper operator event that contains a custom defined operator event.
+ */
+public class SourceEventWrapper implements OperatorEvent {
 
 Review comment:
   Nit: `toString()`




[GitHub] [flink] StephanEwen commented on a change in pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

2020-04-18 Thread GitBox
StephanEwen commented on a change in pull request #11554: 
[FLINK-15101][connector/common] Add SourceCoordinator implementation.
URL: https://github.com/apache/flink/pull/11554#discussion_r410746103
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java
 ##
 @@ -0,0 +1,202 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * A class that is responsible for tracking the past split assignments made by
+ * {@link SplitEnumerator}.
+ */
+@Internal
+public class SplitAssignmentTracker<SplitT extends SourceSplit> {
+	// All the split assignments since the last successful checkpoint.
+	// Maintaining this allows the subtasks to fail over independently.
+	// The mapping is [CheckpointId -> [SubtaskId -> LinkedHashSet[SourceSplits]]].
+	private final SortedMap<Long, Map<Integer, LinkedHashSet<SplitT>>> assignmentsByCheckpointId;
+	// The split assignments since the last checkpoint attempt.
+	// The mapping is [SubtaskId -> LinkedHashSet[SourceSplits]].
+	private Map<Integer, LinkedHashSet<SplitT>> uncheckpointedAssignments;
+
+	public SplitAssignmentTracker() {
+		this.assignmentsByCheckpointId = new TreeMap<>();
+		this.uncheckpointedAssignments = new HashMap<>();
+	}
+
+	/**
+	 * Take a snapshot of the uncheckpointed split assignments.
+	 *
+	 * @param checkpointId the id of the ongoing checkpoint
+	 */
+	public void snapshotState(
+			long checkpointId,
+			SimpleVersionedSerializer<SplitT> splitSerializer,
+			ObjectOutput out) throws Exception {
+		// Write the split serializer version.
+		out.writeInt(splitSerializer.getVersion());
 
 Review comment:
   Java Serialization, see high-level comment.
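For reference, a version-prefixed encoding that avoids Java serialization could look like the sketch below, with a simplified serializer interface standing in for Flink's `SimpleVersionedSerializer`:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

// Simplified stand-in for org.apache.flink.core.io.SimpleVersionedSerializer.
interface VersionedSerializer<T> {
    int getVersion();
    byte[] serialize(T obj) throws IOException;
}

final class SplitSnapshotWriter {
    static <T> byte[] write(List<T> splits, VersionedSerializer<T> serializer) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeInt(serializer.getVersion()); // version prefix, as in the PR
        out.writeInt(splits.size());
        for (T split : splits) {
            byte[] bytes = serializer.serialize(split);
            out.writeInt(bytes.length); // length-prefix each split
            out.write(bytes);
        }
        out.flush();
        return baos.toByteArray();
    }
}
```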




[GitHub] [flink] StephanEwen commented on a change in pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

2020-04-18 Thread GitBox
StephanEwen commented on a change in pull request #11554: 
[FLINK-15101][connector/common] Add SourceCoordinator implementation.
URL: https://github.com/apache/flink/pull/11554#discussion_r410746237
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
 ##

[GitHub] [flink] StephanEwen commented on a change in pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

2020-04-18 Thread GitBox
StephanEwen commented on a change in pull request #11554: 
[FLINK-15101][connector/common] Add SourceCoordinator implementation.
URL: https://github.com/apache/flink/pull/11554#discussion_r410746233
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
 ##
 @@ -0,0 +1,73 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.BiConsumer;
+
+/**
+ * The provider of {@link SourceCoordinator}.
+ */
+public class SourceCoordinatorProvider<SplitT extends SourceSplit>
+		implements OperatorCoordinator.Provider {
+	private final OperatorID operatorID;
+	private final Source<?, SplitT, ?> source;
+	private final int numWorkerThreads;
+
+	/**
+	 * Construct the {@link SourceCoordinatorProvider}.
+	 *
+	 * @param operatorID the ID of the operator this coordinator corresponds to.
+	 * @param source the Source that will be used for this coordinator.
+	 * @param numWorkerThreads the number of threads that should be provided to the SplitEnumerator
+	 * for doing async calls. See
+	 * {@link org.apache.flink.api.connector.source.SplitEnumeratorContext#callAsync(Callable, BiConsumer)
+	 * SplitEnumeratorContext.callAsync()}.
+	 */
+	public SourceCoordinatorProvider(
+			OperatorID operatorID,
+			Source<?, SplitT, ?> source,
+			int numWorkerThreads) {
+		this.operatorID = operatorID;
+		this.source = source;
+		this.numWorkerThreads = numWorkerThreads;
+	}
+
+   @Override
+   public OperatorID getOperatorId() {
+   return operatorID;
+   }
+
+   @Override
+   public OperatorCoordinator create(OperatorCoordinator.Context context) {
+   final String coordinatorThreadName = "SourceCoordinator-" + 
operatorID;
+   ExecutorService coordinatorExecutor = 
Executors.newSingleThreadExecutor(
 
 Review comment:
Executors should always have an `UncaughtExceptionHandler`. I would suggest 
trying to reuse one of the existing thread factories, for example 
`DispatcherThreadFactory` for single threads and `ExecutorThreadFactory` for 
pools of threads.
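A generic sketch of the idea, with a hand-rolled factory standing in for Flink's existing ones (the real change would reuse `DispatcherThreadFactory` or `ExecutorThreadFactory` instead):

```java
import java.util.concurrent.ThreadFactory;

// Hand-rolled for illustration; reusing an existing Flink thread factory is preferred.
final class CoordinatorThreadFactory implements ThreadFactory {
    private final String threadName;

    CoordinatorThreadFactory(String threadName) {
        this.threadName = threadName;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread t = new Thread(runnable, threadName);
        // Without a handler, exceptions escaping the coordinator thread die silently.
        t.setUncaughtExceptionHandler((thread, error) ->
                System.err.println("Uncaught exception in " + thread.getName() + ": " + error));
        return t;
    }
}

// Usage (hypothetical):
// ExecutorService coordinatorExecutor =
//     Executors.newSingleThreadExecutor(new CoordinatorThreadFactory("SourceCoordinator-" + operatorID));
```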




[GitHub] [flink] StephanEwen commented on a change in pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

2020-04-18 Thread GitBox
StephanEwen commented on a change in pull request #11554: 
[FLINK-15101][connector/common] Add SourceCoordinator implementation.
URL: https://github.com/apache/flink/pull/11554#discussion_r410745362
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 ##
 @@ -0,0 +1,230 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The default implementation of the {@link OperatorCoordinator} for the {@link Source}.
+ *
+ * <p>The SourceCoordinator provides an event-loop style thread model to interact with
+ * the Flink runtime. The coordinator ensures that all the state manipulations are made by its event-loop
+ * thread. It also helps keep track of the necessary split assignment history per subtask to simplify the
+ * {@link SplitEnumerator} implementation.
+ *
+ * <p>The coordinator maintains a {@link org.apache.flink.api.connector.source.SplitEnumeratorContext
+ * SplitEnumeratorContext} and shares it with the enumerator. When the coordinator receives an action
+ * request from the Flink runtime, it sets up the context and calls the corresponding method of the
+ * SplitEnumerator to take actions.
+ */
+@Internal
+public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements OperatorCoordinator {
+	private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinator.class);
+	/** A single-thread executor to handle all the changes to the coordinator. */
+	private final ExecutorService coordinatorExecutor;
+	/** The Source that is associated with this SourceCoordinator. */
+	private final Source<?, SplitT, EnumChkT> source;
+	/** The serializer that handles the serde of the SplitEnumerator checkpoints. */
+	private final SimpleVersionedSerializer<EnumChkT> enumCheckpointSerializer;
+	/** The serializer for the SourceSplit of the associated Source. */
+	private final SimpleVersionedSerializer<SplitT> splitSerializer;
+	/** The context containing the states of the coordinator. */
+	private final SourceCoordinatorContext<SplitT> context;
+	/** The split enumerator created from the associated Source. */
+	private SplitEnumerator<SplitT, EnumChkT> enumerator;
+	/** A flag marking whether the coordinator has started. */
+	private boolean started;
+
+	public SourceCoordinator(
+			ExecutorService coordinatorExecutor,
+			Source<?, SplitT, EnumChkT> source,
+			SourceCoordinatorContext<SplitT> context) {
+		this.coordinatorExecutor = coordinatorExecutor;
+		this.source = source;
+		this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
+		this.splitSerializer = source.getSplitSerializer();
+		this.context = context;
+		this.enumerator = source.createEnumerator(context);
+		this.started = false;
+	}
+
+	@Override
+	public void start() throws Exception {
+		LOG.info("Starting split enumerator.");
+		enumerator.start();
+	

[GitHub] [flink] StephanEwen commented on a change in pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

2020-04-18 Thread GitBox
StephanEwen commented on a change in pull request #11554: 
[FLINK-15101][connector/common] Add SourceCoordinator implementation.
URL: https://github.com/apache/flink/pull/11554#discussion_r410745364
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 ##

[GitHub] [flink] StephanEwen commented on a change in pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

2020-04-18 Thread GitBox
StephanEwen commented on a change in pull request #11554: 
[FLINK-15101][connector/common] Add SourceCoordinator implementation.
URL: https://github.com/apache/flink/pull/11554#discussion_r410745342
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 ##

[GitHub] [flink] StephanEwen commented on a change in pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

2020-04-18 Thread GitBox
StephanEwen commented on a change in pull request #11554: 
[FLINK-15101][connector/common] Add SourceCoordinator implementation.
URL: https://github.com/apache/flink/pull/11554#discussion_r410744838
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 ##
 @@ -0,0 +1,230 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The default implementation of the {@link OperatorCoordinator} for the {@link Source}.
+ *
+ * <p>The SourceCoordinator provides an event-loop-style thread model to interact with
+ * the Flink runtime. The coordinator ensures that all the state manipulations are made by its
+ * event loop thread. It also helps keep track of the necessary split assignment history per
+ * subtask to simplify the {@link SplitEnumerator} implementation.
+ *
+ * <p>The coordinator maintains a {@link org.apache.flink.api.connector.source.SplitEnumeratorContext
+ * SplitEnumeratorContext} and shares it with the enumerator. When the coordinator receives an
+ * action request from the Flink runtime, it sets up the context and calls the corresponding
+ * method of the SplitEnumerator to take actions.
+ */
+@Internal
+public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements OperatorCoordinator {
+    private static final Logger LOG = LoggerFactory.getLogger(OperatorCoordinator.class);
+    /** A single-thread executor to handle all the changes to the coordinator. */
+    private final ExecutorService coordinatorExecutor;
+    /** The Source that is associated with this SourceCoordinator. */
+    private final Source<?, SplitT, EnumChkT> source;
+    /** The serializer that handles the serde of the SplitEnumerator checkpoints. */
+    private final SimpleVersionedSerializer<EnumChkT> enumCheckpointSerializer;
+    /** The serializer for the SourceSplit of the associated Source. */
+    private final SimpleVersionedSerializer<SplitT> splitSerializer;
+    /** The context containing the states of the coordinator. */
+    private final SourceCoordinatorContext<SplitT> context;
+    /** The split enumerator created from the associated Source. */
+    private SplitEnumerator<SplitT, EnumChkT> enumerator;
+    /** A flag marking whether the coordinator has started. */
+    private boolean started;
+
+    public SourceCoordinator(
+            ExecutorService coordinatorExecutor,
+            Source<?, SplitT, EnumChkT> source,
+            SourceCoordinatorContext<SplitT> context) {
+        this.coordinatorExecutor = coordinatorExecutor;
+        this.source = source;
+        this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
+        this.splitSerializer = source.getSplitSerializer();
+        this.context = context;
+        this.enumerator = source.createEnumerator(context);
+        this.started = false;
+    }
+
+    @Override
+    public void start() throws Exception {
+        LOG.info("Starting split enumerator.");
+        enumerator.start();
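
The event-loop threading model described in the class javadoc above amounts to funneling every state mutation through a single-threaded executor, so coordinator fields need no extra synchronization. Below is a minimal, self-contained sketch of that pattern; the class and method names are hypothetical illustrations, not code from the PR.

{code}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch of the coordinator's event-loop threading model: all mutations of
// coordinator state are enqueued onto one executor thread, which serializes them.
public class EventLoopCoordinator {
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    private int registeredReaders; // only ever touched from the event loop thread

    // Callers on arbitrary runtime threads enqueue work instead of mutating state directly.
    public void handleReaderRegistration(int subtaskId) {
        eventLoop.execute(() -> {
            registeredReaders++; // safe: the single-threaded executor serializes all mutations
            System.out.println("Subtask " + subtaskId + " registered; readers so far: " + registeredReaders);
        });
    }

    public void shutdown() {
        eventLoop.shutdown();
    }
}
{code}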

[GitHub] [flink] StephanEwen commented on a change in pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

2020-04-18 Thread GitBox
StephanEwen commented on a change in pull request #11554: 
[FLINK-15101][connector/common] Add SourceCoordinator implementation.
URL: https://github.com/apache/flink/pull/11554#discussion_r410735535
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 ##
 @@ -0,0 +1,230 @@
 [... quotes the same SourceCoordinator.java excerpt reproduced above ...]
 
 Review comment:
   Could we somehow get the name of 

[GitHub] [flink] StephanEwen commented on a change in pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

2020-04-18 Thread GitBox
StephanEwen commented on a change in pull request #11554: 
[FLINK-15101][connector/common] Add SourceCoordinator implementation.
URL: https://github.com/apache/flink/pull/11554#discussion_r410737305
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 ##
 @@ -0,0 +1,230 @@
 [... quotes the same SourceCoordinator.java excerpt reproduced above ...]

[GitHub] [flink] StephanEwen commented on a change in pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

2020-04-18 Thread GitBox
StephanEwen commented on a change in pull request #11554: 
[FLINK-15101][connector/common] Add SourceCoordinator implementation.
URL: https://github.com/apache/flink/pull/11554#discussion_r410745424
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 ##
 @@ -0,0 +1,230 @@
 [... quotes the same SourceCoordinator.java excerpt reproduced above ...]

[GitHub] [flink] StephanEwen commented on a change in pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

2020-04-18 Thread GitBox
StephanEwen commented on a change in pull request #11554: 
[FLINK-15101][connector/common] Add SourceCoordinator implementation.
URL: https://github.com/apache/flink/pull/11554#discussion_r410737187
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 ##
 @@ -0,0 +1,230 @@
 [... quotes the same SourceCoordinator.java excerpt reproduced above ...]

[GitHub] [flink] StephanEwen commented on a change in pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

2020-04-18 Thread GitBox
StephanEwen commented on a change in pull request #11554: 
[FLINK-15101][connector/common] Add SourceCoordinator implementation.
URL: https://github.com/apache/flink/pull/11554#discussion_r410736100
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 ##
 @@ -0,0 +1,230 @@
 [... quotes the same SourceCoordinator.java excerpt reproduced above ...]

[GitHub] [flink] StephanEwen commented on a change in pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

2020-04-18 Thread GitBox
StephanEwen commented on a change in pull request #11554: 
[FLINK-15101][connector/common] Add SourceCoordinator implementation.
URL: https://github.com/apache/flink/pull/11554#discussion_r410744704
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 ##
 @@ -0,0 +1,230 @@
 [... quotes the same SourceCoordinator.java excerpt reproduced above ...]

[GitHub] [flink] flinkbot edited a comment on issue #11786: [FLINK-17208][network] Reduce redundant data available notifications of PipelinedSubpartition

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11786: [FLINK-17208][network] Reduce 
redundant data available notifications of PipelinedSubpartition
URL: https://github.com/apache/flink/pull/11786#issuecomment-615087816
 
 
   
   ## CI report:
   
   * 66188949ff1646c298d32df7fe45c6745f771bc1 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160850008) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7714)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-16155) Translate "Operator/Process Function" into Chinese

2020-04-18 Thread Xu Bai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086580#comment-17086580
 ] 

Xu Bai commented on FLINK-16155:


Hi [~jark], I'm very happy to communicate with you.

This PR hasn't been updated for a while. I'd like to take the time to finish it.

> Translate "Operator/Process Function" into Chinese
> --
>
> Key: FLINK-16155
> URL: https://issues.apache.org/jira/browse/FLINK-16155
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The page is located at _"docs/dev/stream/operators/process_function.zh.md"_



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #11786: [FLINK-17208][network] Reduce redundant data available notifications of PipelinedSubpartition

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11786: [FLINK-17208][network] Reduce 
redundant data available notifications of PipelinedSubpartition
URL: https://github.com/apache/flink/pull/11786#issuecomment-615087816
 
 
   
   ## CI report:
   
   * 66188949ff1646c298d32df7fe45c6745f771bc1 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160850008) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7714)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11786: [FLINK-17208][network] Reduce redundant data available notifications of PipelinedSubpartition

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11786: [FLINK-17208][network] Reduce 
redundant data available notifications of PipelinedSubpartition
URL: https://github.com/apache/flink/pull/11786#issuecomment-615087816
 
 
   
   ## CI report:
   
   * d34a27ea803c0203027ec4cc5fb751bb30d51bad Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160839179) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7706)
 
   * 66188949ff1646c298d32df7fe45c6745f771bc1 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160850008) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7714)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11786: [FLINK-17208][network] Reduce redundant data available notifications of PipelinedSubpartition

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11786: [FLINK-17208][network] Reduce 
redundant data available notifications of PipelinedSubpartition
URL: https://github.com/apache/flink/pull/11786#issuecomment-615087816
 
 
   
   ## CI report:
   
   * d34a27ea803c0203027ec4cc5fb751bb30d51bad Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160839179) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7706)
 
   * 66188949ff1646c298d32df7fe45c6745f771bc1 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11700: [FLINK-16366] [table] Introduce executeSql method in TableEnvironment and support many statements

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11700: [FLINK-16366] [table] Introduce 
executeSql method in TableEnvironment and support many statements
URL: https://github.com/apache/flink/pull/11700#issuecomment-612000686
 
 
   
   ## CI report:
   
   * f52f4a10b99fa573ffeb63f16491d735cc157a4b UNKNOWN
   * 0ac7518a59d0205c3fe65d8bf636f6a8229663bb Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160843171) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7711)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11707: [FLINK-15648][k8s] Introduce CPU limit support for JM/TM Container

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11707: [FLINK-15648][k8s] Introduce CPU 
limit support for JM/TM Container
URL: https://github.com/apache/flink/pull/11707#issuecomment-612573622
 
 
   
   ## CI report:
   
   * 4f0e56971c9ef4876abab1352f7e0468ef712a92 UNKNOWN
   * 40701c83ad09fa7e5d7c678015ed9ca7aec41a84 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160841002) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7709)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11700: [FLINK-16366] [table] Introduce executeSql method in TableEnvironment and support many statements

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11700: [FLINK-16366] [table] Introduce 
executeSql method in TableEnvironment and support many statements
URL: https://github.com/apache/flink/pull/11700#issuecomment-612000686
 
 
   
   ## CI report:
   
   * f52f4a10b99fa573ffeb63f16491d735cc157a4b UNKNOWN
   * 0ac7518a59d0205c3fe65d8bf636f6a8229663bb Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160843171) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7711)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11708: [FLINK-15816][k8s] Limit the value of kubernetes.cluster-id to have no more than 45 characters

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11708: [FLINK-15816][k8s] Limit the value 
of kubernetes.cluster-id to have no more than 45 characters
URL: https://github.com/apache/flink/pull/11708#issuecomment-612587571
 
 
   
   ## CI report:
   
   * 77fea16c342424d8b0ce3950d5738bfc43dcb3cc UNKNOWN
   * 288ddcae06c47b8286524f8a62786f14b0887c28 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160836060) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7703)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11803: [hotfix][config][docs] Deprecate taskmanager.numberOfTaskSlots in mesos_task_manager_configuration.html

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11803: [hotfix][config][docs] Deprecate 
taskmanager.numberOfTaskSlots in mesos_task_manager_configuration.html
URL: https://github.com/apache/flink/pull/11803#issuecomment-615861324
 
 
   
   ## CI report:
   
   * a8101f9f9ca854333bacf980e33d14115088eac7 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160836074) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7704)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] aljoscha commented on issue #11696: [FLINK-16658][FLINK-16660] Introduce the ApplicationDispatcherBootstrap and wire it to StandaloneJobEntrypoint

2020-04-18 Thread GitBox
aljoscha commented on issue #11696: [FLINK-16658][FLINK-16660] Introduce the 
ApplicationDispatcherBootstrap and wire it to StandaloneJobEntrypoint
URL: https://github.com/apache/flink/pull/11696#issuecomment-615894182
 
 
   @flinkbot run azure


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11797: [FLINK-17169][table-blink] Refactor BaseRow to use RowKind instead of byte header

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11797: [FLINK-17169][table-blink] Refactor 
BaseRow to use RowKind instead of byte header
URL: https://github.com/apache/flink/pull/11797#issuecomment-615294694
 
 
   
   ## CI report:
   
   * 85f40e3041783b1dbda1eb3b812f23e77936f7b3 UNKNOWN
   * 213179008a58c7aec2817c009cc11a5d744529e7 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160839191) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7707)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-16600) Respect the rest.bind-port for the Kubernetes setup

2020-04-18 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen closed FLINK-16600.
-
Resolution: Fixed

master (1.11) via 7a7aaec9745461da9590c61f6ba75be64388d397

> Respect the rest.bind-port for the Kubernetes setup
> ---
>
> Key: FLINK-16600
> URL: https://issues.apache.org/jira/browse/FLINK-16600
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.10.0
>Reporter: Canbin Zheng
>Assignee: Canbin Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Our current logic only takes care of {{RestOptions.PORT}} but not 
> {{RestOptions.BIND_PORT}}, which is a bug.
> For example, when one sets {{RestOptions.BIND_PORT}} to a value other
> than {{RestOptions.PORT}}, jobs cannot be submitted to an existing
> session cluster deployed via kubernetes-session.sh.
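
The fix boils down to resolving the effective bind port with a fallback. A minimal sketch of that resolution logic, using plain strings and hypothetical config keys and accessors rather than Flink's actual RestOptions API:

{code}
import java.util.Map;

// Sketch: prefer the dedicated bind-port setting; fall back to rest.port only
// when no bind port is configured. Keys and the default value are assumptions.
public class RestPortResolver {
    static String effectiveBindPort(Map<String, String> config) {
        String bindPort = config.get("rest.bind-port"); // may be a single port or a port range
        if (bindPort != null && !bindPort.isEmpty()) {
            return bindPort;
        }
        return config.getOrDefault("rest.port", "8081"); // fall back to the external port
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of("rest.port", "8081", "rest.bind-port", "9091");
        // Prints 9091: the bind port wins whenever both options are set.
        System.out.println(effectiveBindPort(conf));
    }
}
{code}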



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] TisonKun closed pull request #11705: [FLINK-16600][k8s] Fix not respecting the RestOptions.BIND_PORT for the Kubernetes setup

2020-04-18 Thread GitBox
TisonKun closed pull request #11705: [FLINK-16600][k8s] Fix not respecting the 
RestOptions.BIND_PORT for the Kubernetes setup
URL: https://github.com/apache/flink/pull/11705
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-17189) Table with processing time attribute can not be read from Hive catalog

2020-04-18 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-17189:

Fix Version/s: (was: 1.10.1)
   1.10.2

> Table with processing time attribute can not be read from Hive catalog
> --
>
> Key: FLINK-17189
> URL: https://issues.apache.org/jira/browse/FLINK-17189
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Ecosystem, Table SQL / Planner
>Reporter: Timo Walther
>Priority: Major
> Fix For: 1.11.0, 1.10.2
>
>
> DDL:
> {code}
> CREATE TABLE PROD_LINEITEM (
>   L_ORDERKEY   INTEGER,
>   L_PARTKEYINTEGER,
>   L_SUPPKEYINTEGER,
>   L_LINENUMBER INTEGER,
>   L_QUANTITY   DOUBLE,
>   L_EXTENDEDPRICE  DOUBLE,
>   L_DISCOUNT   DOUBLE,
>   L_TAXDOUBLE,
>   L_CURRENCY   STRING,
>   L_RETURNFLAG STRING,
>   L_LINESTATUS STRING,
>   L_ORDERTIME  TIMESTAMP(3),
>   L_SHIPINSTRUCT   STRING,
>   L_SHIPMODE   STRING,
>   L_COMMENTSTRING,
>   WATERMARK FOR L_ORDERTIME AS L_ORDERTIME - INTERVAL '5' MINUTE,
>   L_PROCTIME   AS PROCTIME()
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'Lineitem',
>   'connector.properties.zookeeper.connect' = 'not-needed',
>   'connector.properties.bootstrap.servers' = 'kafka:9092',
>   'connector.startup-mode' = 'earliest-offset',
>   'format.type' = 'csv',
>   'format.field-delimiter' = '|'
> );
> {code}
> Query:
> {code}
> SELECT * FROM prod_lineitem;
> {code}
> Result:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AssertionError: Conversion to relational algebra failed to preserve 
> datatypes:
> validated type:
> RecordType(INTEGER L_ORDERKEY, INTEGER L_PARTKEY, INTEGER L_SUPPKEY, INTEGER 
> L_LINENUMBER, DOUBLE L_QUANTITY, DOUBLE L_EXTENDEDPRICE, DOUBLE L_DISCOUNT, 
> DOUBLE L_TAX, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_CURRENCY, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_RETURNFLAG, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_LINESTATUS, TIME 
> ATTRIBUTE(ROWTIME) L_ORDERTIME, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> L_SHIPINSTRUCT, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_SHIPMODE, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_COMMENT, TIMESTAMP(3) NOT NULL 
> L_PROCTIME) NOT NULL
> converted type:
> RecordType(INTEGER L_ORDERKEY, INTEGER L_PARTKEY, INTEGER L_SUPPKEY, INTEGER 
> L_LINENUMBER, DOUBLE L_QUANTITY, DOUBLE L_EXTENDEDPRICE, DOUBLE L_DISCOUNT, 
> DOUBLE L_TAX, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_CURRENCY, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_RETURNFLAG, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_LINESTATUS, TIME 
> ATTRIBUTE(ROWTIME) L_ORDERTIME, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> L_SHIPINSTRUCT, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_SHIPMODE, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_COMMENT, TIME 
> ATTRIBUTE(PROCTIME) NOT NULL L_PROCTIME) NOT NULL
> rel:
> LogicalProject(L_ORDERKEY=[$0], L_PARTKEY=[$1], L_SUPPKEY=[$2], 
> L_LINENUMBER=[$3], L_QUANTITY=[$4], L_EXTENDEDPRICE=[$5], L_DISCOUNT=[$6], 
> L_TAX=[$7], L_CURRENCY=[$8], L_RETURNFLAG=[$9], L_LINESTATUS=[$10], 
> L_ORDERTIME=[$11], L_SHIPINSTRUCT=[$12], L_SHIPMODE=[$13], L_COMMENT=[$14], 
> L_PROCTIME=[$15])
>   LogicalWatermarkAssigner(rowtime=[L_ORDERTIME], watermark=[-($11, 
> 30:INTERVAL MINUTE)])
> LogicalProject(L_ORDERKEY=[$0], L_PARTKEY=[$1], L_SUPPKEY=[$2], 
> L_LINENUMBER=[$3], L_QUANTITY=[$4], L_EXTENDEDPRICE=[$5], L_DISCOUNT=[$6], 
> L_TAX=[$7], L_CURRENCY=[$8], L_RETURNFLAG=[$9], L_LINESTATUS=[$10], 
> L_ORDERTIME=[$11], L_SHIPINSTRUCT=[$12], L_SHIPMODE=[$13], L_COMMENT=[$14], 
> L_PROCTIME=[PROCTIME()])
>   LogicalTableScan(table=[[hcat, default, prod_lineitem, source: 
> [KafkaTableSource(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, 
> L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, L_CURRENCY, L_RETURNFLAG, L_LINESTATUS, 
> L_ORDERTIME, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT)]]])
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17189) Table with processing time attribute can not be read from Hive catalog

2020-04-18 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-17189:

Fix Version/s: 1.11.0
   1.10.1

> Table with processing time attribute can not be read from Hive catalog
> --
>
> Key: FLINK-17189
> URL: https://issues.apache.org/jira/browse/FLINK-17189
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Ecosystem, Table SQL / Planner
>Reporter: Timo Walther
>Priority: Major
> Fix For: 1.10.1, 1.11.0
>
>
> (DDL, query, and the resulting AssertionError are identical to the previous FLINK-17189 message above.)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16819) Got KryoException while using UDAF in flink1.9

2020-04-18 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086501#comment-17086501
 ] 

Jark Wu commented on FLINK-16819:
-

Glad to hear that too [~neighborhood]! 

> Got KryoException while using UDAF in flink1.9
> --
>
> Key: FLINK-16819
> URL: https://issues.apache.org/jira/browse/FLINK-16819
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System, Table SQL / Planner
>Affects Versions: 1.9.1
> Environment: Flink1.9.1
> Apache hadoop 2.7.2
>Reporter: Xingxing Di
>Priority: Major
> Fix For: 1.9.2
>
>
> Recently, we have been trying to upgrade online *sql jobs* from flink1.7 to
> flink1.9. Most jobs work fine, but some jobs got KryoExceptions.
> We found that UDAFs trigger this exception; btw, we are using the blink
> planner.
> *Here is the full stack traces:*
>  2020-03-27 11:46:55
>  com.esotericsoftware.kryo.KryoException: 
> java.lang.IndexOutOfBoundsException: Index: 104, Size: 2
>  Serialization trace:
>  seed (java.util.Random)
>  gen (com.tdunning.math.stats.AVLTreeDigest)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>  at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>  at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>  at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>  at 
> org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:536)
>  at 
> org.apache.flink.table.dataformat.BinaryGeneric.getJavaObjectFromBinaryGeneric(BinaryGeneric.java:86)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:628)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:633)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:320)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1293)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1257)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:302)
>  at GroupAggsHandler$71.setAccumulators(Unknown Source)
>  at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:151)
>  at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
>  at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.lang.IndexOutOfBoundsException: Index: 104, Size: 2
>  at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>  at java.util.ArrayList.get(ArrayList.java:433)
>  at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>  at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>  ... 26 more
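For context, a minimal sketch of the kind of UDAF that hits this path: the accumulator is a type Flink cannot analyze as a POJO (here t-digest's AVLTreeDigest, matching the serialization trace above), so it falls back to the generic Kryo serializer. The class and field names below are illustrative, not taken from the reporter's job:

{code:java}
import com.tdunning.math.stats.AVLTreeDigest;
import org.apache.flink.table.functions.AggregateFunction;

// Illustrative UDAF: the accumulator is a generic (non-POJO) type, so the
// blink planner round-trips it through KryoSerializer between state and the
// code-generated GroupAggsHandler, which is where the
// IndexOutOfBoundsException above surfaces on 1.9.1.
public class Percentile99 extends AggregateFunction<Double, AVLTreeDigest> {

    @Override
    public AVLTreeDigest createAccumulator() {
        return new AVLTreeDigest(100.0);
    }

    // Invoked via reflection for each input row.
    public void accumulate(AVLTreeDigest acc, Double value) {
        if (value != null) {
            acc.add(value);
        }
    }

    @Override
    public Double getValue(AVLTreeDigest acc) {
        return acc.quantile(0.99);
    }
}
{code}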



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-16379) Introduce fromValues in TableEnvironment

2020-04-18 Thread Dawid Wysakowicz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz closed FLINK-16379.

Resolution: Fixed

Implemented in 6b3e037b7073c06412210cc82843924e8a3c9eee

> Introduce fromValues in TableEnvironment
> 
>
> Key: FLINK-16379
> URL: https://issues.apache.org/jira/browse/FLINK-16379
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Introduce a fromValues method to TableEnvironment, similar to the {{VALUES}} 
> clause in SQL.
> The suggested API could look like:
> {code}
>   /**
>* Creates a Table from a given row constructing expressions.
>*
>* Examples:
>*
>* You can use {@link Expressions#row(Object, Object...)} to create 
> composite rows:
>* {@code
>*  tEnv.fromValues(
>*  row(1, "ABC"),
>*  row(2L, "ABCDE")
>*  )
>* }
>* will produce a Table with a schema as follows:
>* {@code
>*  root
>*  |-- f0: BIGINT NOT NULL
>*  |-- f1: VARCHAR(5) NOT NULL
>* }
>*
>* ROWs that are a result of e.g. a function call are not flattened
>* {@code
>*  public class RowFunction extends ScalarFunction {
>*  @DataTypeHint("ROW<f0 BIGINT, f1 VARCHAR(5)>")
>*  Row eval();
>*  }
>*
>*  tEnv.fromValues(
>*  call(new RowFunction()),
>*  call(new RowFunction())
>*  )
>* }
>* will produce a Table with a schema as follows:
>* {@code
>*  root
>*  |-- f0: ROW<`f0` BIGINT, `f1` VARCHAR(5)>
>* }
>*
>* The row constructor can be dropped to create a table with a 
> single column:
>*
>* {@code
>*  tEnv.fromValues(
>*  1,
>*  2L,
>*  3
>*  )
>* }
>* will produce a Table with a schema as follows:
>* {@code
>*  root
>*  |-- f0: BIGINT NOT NULL
>* }
>*
>* @param expressions Expressions for constructing rows of the VALUES 
> table.
>*/
>   Table fromValues(Expression... expressions);
> {code}
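A minimal usage sketch of the proposed API (assuming a {{TableEnvironment}} instance named {{tEnv}} and the statically imported {{row}} from {{Expressions}}, as in the Javadoc above):

{code:java}
import static org.apache.flink.table.api.Expressions.row;

import org.apache.flink.table.api.Table;

// Column 0 resolves to the common type BIGINT NOT NULL, column 1 to
// VARCHAR(5) NOT NULL, per the schema derivation described above.
Table inline = tEnv.fromValues(
    row(1, "ABC"),
    row(2L, "ABCDE"));
inline.printSchema();
{code}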



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] dawidwys closed pull request #11290: [FLINK-16379][table] Introduce fromValues in TableEnvironment

2020-04-18 Thread GitBox
dawidwys closed pull request #11290: [FLINK-16379][table] Introduce fromValues 
in TableEnvironment
URL: https://github.com/apache/flink/pull/11290
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11700: [FLINK-16366] [table] Introduce executeSql method in TableEnvironment and support many statements

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11700: [FLINK-16366] [table] Introduce 
executeSql method in TableEnvironment and support many statements
URL: https://github.com/apache/flink/pull/11700#issuecomment-612000686
 
 
   
   ## CI report:
   
   * f52f4a10b99fa573ffeb63f16491d735cc157a4b UNKNOWN
   * 32354a866dcb787e5118ce59b908e4947c6ecc77 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160362240) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7510)
 
   * 0ac7518a59d0205c3fe65d8bf636f6a8229663bb Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160843171) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7711)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11707: [FLINK-15648][k8s] Introduce CPU limit support for JM/TM Container

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11707: [FLINK-15648][k8s] Introduce CPU 
limit support for JM/TM Container
URL: https://github.com/apache/flink/pull/11707#issuecomment-612573622
 
 
   
   ## CI report:
   
   * 4f0e56971c9ef4876abab1352f7e0468ef712a92 UNKNOWN
   * 40701c83ad09fa7e5d7c678015ed9ca7aec41a84 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160841002) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7709)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-17196) Improve the implementation of Fabric8FlinkKubeClient#getRestEndpoint

2020-04-18 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086485#comment-17086485
 ] 

Yang Wang commented on FLINK-17196:
---

Just one quick reminder: if we completely separate the service exposed type and 
do not have any fallback, we should make sure that it can be used easily on both 
managed and unmanaged K8s clusters.

Since K8s may need some time to create the load balancer, the Flink client may 
take longer for the submission. When the creation of the load balancer times 
out, we should log a hint telling users to set the service exposed type to 
{{NodePort}} or to check why the LB could not be allocated.
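A minimal sketch of the knob involved (assuming the {{kubernetes.rest-service.exposed.type}} option keeps its current string form; this is illustrative, not the final design):

{code:java}
import org.apache.flink.configuration.Configuration;

// Pin the rest service to NodePort up front when the target cluster
// (e.g. an unmanaged K8s setup) cannot allocate a LoadBalancer in time.
Configuration flinkConfig = new Configuration();
flinkConfig.setString("kubernetes.rest-service.exposed.type", "NodePort");
{code}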

> Improve the implementation of Fabric8FlinkKubeClient#getRestEndpoint
> 
>
> Key: FLINK-17196
> URL: https://issues.apache.org/jira/browse/FLINK-17196
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Affects Versions: 1.10.0, 1.10.1
>Reporter: Canbin Zheng
>Priority: Major
> Fix For: 1.11.0
>
>
> Currently there are some bugs in 
> {{Fabric8FlinkKubeClient#getRestEndpoint}} and several implicit 
> fallback/toleration behaviors when retrieving the Endpoint.
> This ticket proposes to fix the bugs and improve the implementation by 
> deprecating some of the implicit fallback/toleration behaviors.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #11786: [FLINK-17208][network] Reduce redundant data available notifications of PipelinedSubpartition

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11786: [FLINK-17208][network] Reduce 
redundant data available notifications of PipelinedSubpartition
URL: https://github.com/apache/flink/pull/11786#issuecomment-615087816
 
 
   
   ## CI report:
   
   * d34a27ea803c0203027ec4cc5fb751bb30d51bad Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160839179) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7706)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-16819) Got KryoException while using UDAF in flink1.9

2020-04-18 Thread Lsw_aka_laplace (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086483#comment-17086483
 ] 

Lsw_aka_laplace edited comment on FLINK-16819 at 4/18/20, 3:05 PM:
---

[~jark] The jobs I mentioned above have been running correctly for a 
few days after upgrading to 1.9.2. Your advice helped us fix this problem.

Salute!


was (Author: neighborhood):
[~jark] The jobs I mentioned above have been running correctly for a 
few days after upgrading to 1.9.2. Your advance did help us to fix this problem

Salute!

> Got KryoException while using UDAF in flink1.9
> --
>
> Key: FLINK-16819
> URL: https://issues.apache.org/jira/browse/FLINK-16819
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System, Table SQL / Planner
>Affects Versions: 1.9.1
> Environment: Flink1.9.1
> Apache hadoop 2.7.2
>Reporter: Xingxing Di
>Priority: Major
> Fix For: 1.9.2
>
>
> Recently, we have been upgrading online *sql jobs* from flink1.7 to 
> flink1.9. Most jobs work fine, but some jobs got KryoExceptions. 
> We found that UDAFs trigger this exception; btw, we are using the blink 
> planner.
> *Here is the full stack traces:*
>  2020-03-27 11:46:55
>  com.esotericsoftware.kryo.KryoException: 
> java.lang.IndexOutOfBoundsException: Index: 104, Size: 2
>  Serialization trace:
>  seed (java.util.Random)
>  gen (com.tdunning.math.stats.AVLTreeDigest)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>  at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>  at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>  at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>  at 
> org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:536)
>  at 
> org.apache.flink.table.dataformat.BinaryGeneric.getJavaObjectFromBinaryGeneric(BinaryGeneric.java:86)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:628)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:633)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:320)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1293)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1257)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:302)
>  at GroupAggsHandler$71.setAccumulators(Unknown Source)
>  at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:151)
>  at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
>  at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.lang.IndexOutOfBoundsException: Index: 104, Size: 2
>  at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>  at java.util.ArrayList.get(ArrayList.java:433)
>  at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>  at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>  ... 26 

[GitHub] [flink] flinkbot edited a comment on issue #11797: [FLINK-17169][table-blink] Refactor BaseRow to use RowKind instead of byte header

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11797: [FLINK-17169][table-blink] Refactor 
BaseRow to use RowKind instead of byte header
URL: https://github.com/apache/flink/pull/11797#issuecomment-615294694
 
 
   
   ## CI report:
   
   * 85f40e3041783b1dbda1eb3b812f23e77936f7b3 UNKNOWN
   * 213179008a58c7aec2817c009cc11a5d744529e7 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160839191) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7707)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11700: [FLINK-16366] [table] Introduce executeSql method in TableEnvironment and support many statements

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11700: [FLINK-16366] [table] Introduce 
executeSql method in TableEnvironment and support many statements
URL: https://github.com/apache/flink/pull/11700#issuecomment-612000686
 
 
   
   ## CI report:
   
   * f52f4a10b99fa573ffeb63f16491d735cc157a4b UNKNOWN
   * 32354a866dcb787e5118ce59b908e4947c6ecc77 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160362240) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7510)
 
   * 0ac7518a59d0205c3fe65d8bf636f6a8229663bb UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-16819) Got KryoException while using UDAF in flink1.9

2020-04-18 Thread Lsw_aka_laplace (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086483#comment-17086483
 ] 

Lsw_aka_laplace commented on FLINK-16819:
-

[~jark] The jobs I mentioned above have been running correctly for a 
few days after upgrading to 1.9.2. Your advice helped us fix this problem.

Salute!

> Got KryoException while using UDAF in flink1.9
> --
>
> Key: FLINK-16819
> URL: https://issues.apache.org/jira/browse/FLINK-16819
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System, Table SQL / Planner
>Affects Versions: 1.9.1
> Environment: Flink1.9.1
> Apache hadoop 2.7.2
>Reporter: Xingxing Di
>Priority: Major
> Fix For: 1.9.2
>
>
> Recently, we have been upgrading online *sql jobs* from flink1.7 to 
> flink1.9. Most jobs work fine, but some jobs got KryoExceptions. 
> We found that UDAFs trigger this exception; btw, we are using the blink 
> planner.
> *Here is the full stack traces:*
>  2020-03-27 11:46:55
>  com.esotericsoftware.kryo.KryoException: 
> java.lang.IndexOutOfBoundsException: Index: 104, Size: 2
>  Serialization trace:
>  seed (java.util.Random)
>  gen (com.tdunning.math.stats.AVLTreeDigest)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>  at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>  at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>  at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>  at 
> org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:536)
>  at 
> org.apache.flink.table.dataformat.BinaryGeneric.getJavaObjectFromBinaryGeneric(BinaryGeneric.java:86)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:628)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:633)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:320)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1293)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1257)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:302)
>  at GroupAggsHandler$71.setAccumulators(Unknown Source)
>  at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:151)
>  at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
>  at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.lang.IndexOutOfBoundsException: Index: 104, Size: 2
>  at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>  at java.util.ArrayList.get(ArrayList.java:433)
>  at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>  at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>  ... 26 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] wangyang0918 commented on issue #11705: [FLINK-16600][k8s] Fix not respecting the RestOptions.BIND_PORT for the Kubernetes setup

2020-04-18 Thread GitBox
wangyang0918 commented on issue #11705: [FLINK-16600][k8s] Fix not respecting 
the RestOptions.BIND_PORT for the Kubernetes setup
URL: https://github.com/apache/flink/pull/11705#issuecomment-615885320
 
 
   @TisonKun @zhengcanbin I have checked the test and changes again. If we set 
`rest.bind-port` explicitly, the test 
`testBuildAccompanyingKubernetesResources` should fail. So updating the test 
seems reasonable.
   
   Now I give it a pass, +1 for merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-17229) Add a flink-sql-client-sdk module

2020-04-18 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086477#comment-17086477
 ] 

godfrey he edited comment on FLINK-17229 at 4/18/20, 3:00 PM:
--

Hi [~molsion], there is a project named {{flink-jdbc-driver}} [1] under 
ververica, and flink-sql-gateway is under development towards its first version. 
You can try them out; any feedback and contributions are welcome.

[1] https://github.com/ververica/flink-jdbc-driver


was (Author: godfreyhe):
Hi [~molsion], there is a project named {{flink-jdbc-driver}} [1] under 
ververica. You can try it out; any feedback and contributions are welcome.

[1] https://github.com/ververica/flink-jdbc-driver

> Add a flink-sql-client-sdk module 
> --
>
> Key: FLINK-17229
> URL: https://issues.apache.org/jira/browse/FLINK-17229
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API, Table SQL / Client
>Reporter: molsion
>Priority: Major
>
> Can we add a flink-sql-client-sdk module that can be imported like a JDBC 
> SDK? If we do it, our users can use flink-sql-client-sdk to submit SQL jobs. It 
> will be more friendly than flink-sql-gateway.
> I have created and implemented a flink-sql-client-sdk project and I want to 
> contribute it to the community.
>  
> [flink-sql-gateway : 
> https://github.com/ververica/flink-sql-gateway|https://github.com/ververica/flink-sql-gateway]
> [flink-sql-client-sdk : 
> https://github.com/molsionmo/flink-sql-client|https://github.com/molsionmo/flink-sql-client]
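For comparison, submitting SQL through the flink-jdbc-driver mentioned above is plain JDBC. A minimal sketch follows; the connection URL is an assumption here, so check the flink-jdbc-driver README for the exact URL format and gateway address:

{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class JdbcSubmitSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical URL pointing at a running flink-sql-gateway.
        try (Connection conn =
                 DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT 1")) {
            while (rs.next()) {
                System.out.println(rs.getInt(1));
            }
        }
    }
}
{code}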



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] godfreyhe commented on a change in pull request #11727: [FLINK-17106][table] Support create and drop view in Flink SQL

2020-04-18 Thread GitBox
godfreyhe commented on a change in pull request #11727: [FLINK-17106][table] 
Support create and drop view in Flink SQL
URL: https://github.com/apache/flink/pull/11727#discussion_r410708396
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
 ##
 @@ -456,6 +468,59 @@ private Operation convertAlterDatabase(SqlAlterDatabase 
sqlAlterDatabase) {
return new AlterDatabaseOperation(catalogName, databaseName, 
catalogDatabase);
}
 
+   /** Convert CREATE VIEW statement. */
+   private Operation convertCreateView(SqlCreateView sqlCreateView) {
+   final SqlNode query = sqlCreateView.getQuery();
+   final SqlNodeList fieldList = sqlCreateView.getFieldList();
+
+   SqlNode validateQuery = flinkPlanner.validate(query);
+   PlannerQueryOperation operation = 
toQueryOperation(flinkPlanner, validateQuery);
+   TableSchema schema = operation.getTableSchema();
+
+   if (!fieldList.getList().isEmpty()) {
 
 Review comment:
   I think it's better to add some comments or an example here; I raise this 
doubt because most people are familiar with "create view ..." without column aliases. 
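   For instance, a hedged illustration of the aliased form this branch handles, via the DDL support this PR adds (the {{sqlUpdate}} entry point matches the 1.10 API; the view body and names are made up):

{code:java}
// The explicit column list renames the query's output columns:
// my_view exposes (renamed_a, renamed_b) backed by (a, b) of src.
tEnv.sqlUpdate("CREATE VIEW my_view (renamed_a, renamed_b) AS SELECT a, b FROM src");
{code}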


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #11727: [FLINK-17106][table] Support create and drop view in Flink SQL

2020-04-18 Thread GitBox
godfreyhe commented on a change in pull request #11727: [FLINK-17106][table] 
Support create and drop view in Flink SQL
URL: https://github.com/apache/flink/pull/11727#discussion_r410708396
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
 ##
 @@ -456,6 +468,59 @@ private Operation convertAlterDatabase(SqlAlterDatabase 
sqlAlterDatabase) {
return new AlterDatabaseOperation(catalogName, databaseName, 
catalogDatabase);
}
 
+   /** Convert CREATE VIEW statement. */
+   private Operation convertCreateView(SqlCreateView sqlCreateView) {
+   final SqlNode query = sqlCreateView.getQuery();
+   final SqlNodeList fieldList = sqlCreateView.getFieldList();
+
+   SqlNode validateQuery = flinkPlanner.validate(query);
+   PlannerQueryOperation operation = 
toQueryOperation(flinkPlanner, validateQuery);
+   TableSchema schema = operation.getTableSchema();
+
+   if (!fieldList.getList().isEmpty()) {
 
 Review comment:
   I think it's better to add some comments or an example here; I raise this 
doubt because most people are familiar with "create view ..." without column aliases. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11786: [FLINK-17208][network] Reduce redundant data available notifications of PipelinedSubpartition

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11786: [FLINK-17208][network] Reduce 
redundant data available notifications of PipelinedSubpartition
URL: https://github.com/apache/flink/pull/11786#issuecomment-615087816
 
 
   
   ## CI report:
   
   * d34a27ea803c0203027ec4cc5fb751bb30d51bad Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160839179) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7706)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11803: [hotfix][config][docs] Deprecate taskmanager.numberOfTaskSlots in mesos_task_manager_configuration.html

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11803: [hotfix][config][docs] Deprecate 
taskmanager.numberOfTaskSlots in mesos_task_manager_configuration.html
URL: https://github.com/apache/flink/pull/11803#issuecomment-615861324
 
 
   
   ## CI report:
   
   * a8101f9f9ca854333bacf980e33d14115088eac7 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160836074) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7704)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11707: [FLINK-15648][k8s] Introduce CPU limit support for JM/TM Container

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11707: [FLINK-15648][k8s] Introduce CPU 
limit support for JM/TM Container
URL: https://github.com/apache/flink/pull/11707#issuecomment-612573622
 
 
   
   ## CI report:
   
   * 4f0e56971c9ef4876abab1352f7e0468ef712a92 UNKNOWN
   * ae7d6fac92d028c50b0b027da50aecac66818e19 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160006145) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7378)
 
   * 40701c83ad09fa7e5d7c678015ed9ca7aec41a84 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160841002) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7709)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11397: [FLINK-16217] [sql-client] catch all exceptions to avoid SQL client crashed

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11397: [FLINK-16217] [sql-client] catch all 
exceptions to avoid SQL client crashed
URL: https://github.com/apache/flink/pull/11397#issuecomment-598575354
 
 
   
   ## CI report:
   
   * 14bde242e3aca096cbfc4e36782d055f8c01c119 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160840112) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7708)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-17229) Add a flink-sql-client-sdk module

2020-04-18 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086477#comment-17086477
 ] 

godfrey he commented on FLINK-17229:


Hi [~molsion], there is a project named {{flink-jdbc-driver}} [1] under 
ververica. You can try it out; any feedback and contributions are welcome.

[1] https://github.com/ververica/flink-jdbc-driver

> Add a flink-sql-client-sdk module 
> --
>
> Key: FLINK-17229
> URL: https://issues.apache.org/jira/browse/FLINK-17229
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API, Table SQL / Client
>Reporter: molsion
>Priority: Major
>
> Can we add a flink-sql-client-sdk module that can be imported like a JDBC 
> SDK? If we do it, our users can use flink-sql-client-sdk to submit SQL jobs. It 
> will be more friendly than flink-sql-gateway.
> I have created and implemented a flink-sql-client-sdk project and I want to 
> contribute it to the community.
>  
> [flink-sql-gateway : 
> https://github.com/ververica/flink-sql-gateway|https://github.com/ververica/flink-sql-gateway]
> [flink-sql-client-sdk : 
> https://github.com/molsionmo/flink-sql-client|https://github.com/molsionmo/flink-sql-client]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-15400) elasticsearch sink support dynamic index.

2020-04-18 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu resolved FLINK-15400.
-
Resolution: Fixed

Resolved in master (1.11.0): c1a21c6d3853fb6feb9c91a59952bc0095d5b29f

> elasticsearch sink support dynamic index.
> -
>
> Key: FLINK-15400
> URL: https://issues.apache.org/jira/browse/FLINK-15400
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / ElasticSearch, Table SQL / Ecosystem
>Affects Versions: 1.11.0
>Reporter: ouyangwulin
>Assignee: Leonard Xu
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> From 
> user...@flink.apache.org([https://lists.apache.org/thread.html/ac4e0c068baeb3b070f0213a2e1314e6552b226b8132a4c49d667ecd%40%3Cuser-zh.flink.apache.org%3E]),
>   Because ES 6/7 do not support TTL, users need to clean up indices by 
> timestamp, so dynamic index support is a useful feature. Add the property 
> 'dynamicIndex' as a switch to enable dynamic indexing, the property 
> 'indexField' to name the time field to extract, and the property 
> 'indexInterval' to choose the cycle mode.
>  
> ||Property||Description||Default||Required||
> |dynamicIndex|dynamic index or not|false (true/false)|no|
> |indexField|field to extract the index value from|none|required when dynamicIndex 
> is true; only the types "timestamp", "date" and "long" are supported|
> |indexInterval|cycle mode|d|required when dynamicIndex is true; the optional 
> values are:
>  d: day
>  m: month
>  w: week|
>  
> After discussion, the final design looks as following :
> {code:java}
> CREATE TABLE es_sink_table (
>   log_source varchar,
>   log_content varchar,
>   log_level bigint,
>   log_ts timestamp
> ) WITH (
> 'connector.type' = 'elasticsearch',
> 'connector.version' = '6',
> 'connector.index'='my-log-{log_ts|yyyy-MM-dd}',
> # elasticsearch index name; Flink supports creating the index from a field
> # at runtime dynamically. The index value comes from the dynamicIndex
> # pattern: when the field type is varchar, e.g. 'my-log-{log_source}'; the
> # dynamicIndex pattern supports formatting and parsing dates via Java
> # SimpleDateFormat when the field type is timestamp/date,
> # e.g. 'my-log-{log_ts|yyyy-MM-dd}'.
> 'connector.index-alias'='my-log',
> # index alias name; the alias maps to all indices
> # created from 'connector.index'.
> …
> )
> {code}
>  
>  
>  
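To make the pattern concrete, a small sketch of how the date-based suffix resolves (assuming the part after the '|' is a plain Java SimpleDateFormat pattern, as the description states):

{code:java}
import java.sql.Timestamp;
import java.text.SimpleDateFormat;

SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");
String index = "my-log-" + fmt.format(Timestamp.valueOf("2020-04-18 12:00:00"));
// index is "my-log-2020-04-18"; the 'my-log' alias spans all daily indices.
{code}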



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] wuchong merged pull request #11466: [FLINK-15400][connectors / elasticsearch] elasticsearch table sink support dynamic index.

2020-04-18 Thread GitBox
wuchong merged pull request #11466:  [FLINK-15400][connectors / elasticsearch] 
elasticsearch table sink support dynamic index.
URL: https://github.com/apache/flink/pull/11466
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on issue #11466: [FLINK-15400][connectors / elasticsearch] elasticsearch table sink support dynamic index.

2020-04-18 Thread GitBox
wuchong commented on issue #11466:  [FLINK-15400][connectors / elasticsearch] 
elasticsearch table sink support dynamic index.
URL: https://github.com/apache/flink/pull/11466#issuecomment-615882465
 
 
   Merging... 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on issue #11700: [FLINK-16366] [table] Introduce executeSql method in TableEnvironment and support many statements

2020-04-18 Thread GitBox
godfreyhe commented on issue #11700: [FLINK-16366] [table] Introduce executeSql 
method in TableEnvironment and support many statements
URL: https://github.com/apache/flink/pull/11700#issuecomment-615882267
 
 
   I will merge the last three commits into the first one once it's approved


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-16819) Got KryoException while using UDAF in flink1.9

2020-04-18 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086473#comment-17086473
 ] 

Jark Wu commented on FLINK-16819:
-

Glad to hear that [~dixingx...@yeah.net]!

> Got KryoException while using UDAF in flink1.9
> --
>
> Key: FLINK-16819
> URL: https://issues.apache.org/jira/browse/FLINK-16819
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System, Table SQL / Planner
>Affects Versions: 1.9.1
> Environment: Flink1.9.1
> Apache hadoop 2.7.2
>Reporter: Xingxing Di
>Priority: Major
> Fix For: 1.9.2
>
>
> Recently, we have been upgrading online *sql jobs* from flink1.7 to 
> flink1.9. Most jobs work fine, but some jobs got KryoExceptions. 
> We found that UDAFs trigger this exception; btw, we are using the blink 
> planner.
> *Here is the full stack traces:*
>  2020-03-27 11:46:55
>  com.esotericsoftware.kryo.KryoException: 
> java.lang.IndexOutOfBoundsException: Index: 104, Size: 2
>  Serialization trace:
>  seed (java.util.Random)
>  gen (com.tdunning.math.stats.AVLTreeDigest)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>  at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>  at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>  at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>  at 
> org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:536)
>  at 
> org.apache.flink.table.dataformat.BinaryGeneric.getJavaObjectFromBinaryGeneric(BinaryGeneric.java:86)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:628)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:633)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:320)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1293)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1257)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:302)
>  at GroupAggsHandler$71.setAccumulators(Unknown Source)
>  at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:151)
>  at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
>  at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.lang.IndexOutOfBoundsException: Index: 104, Size: 2
>  at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>  at java.util.ArrayList.get(ArrayList.java:433)
>  at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>  at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>  ... 26 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #11707: [FLINK-15648][k8s] Introduce CPU limit support for JM/TM Container

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11707: [FLINK-15648][k8s] Introduce CPU 
limit support for JM/TM Container
URL: https://github.com/apache/flink/pull/11707#issuecomment-612573622
 
 
   
   ## CI report:
   
   * 4f0e56971c9ef4876abab1352f7e0468ef712a92 UNKNOWN
   * ae7d6fac92d028c50b0b027da50aecac66818e19 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160006145) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7378)
 
   * 40701c83ad09fa7e5d7c678015ed9ca7aec41a84 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11397: [FLINK-16217] [sql-client] catch all exceptions to avoid SQL client crashed

2020-04-18 Thread GitBox
flinkbot edited a comment on issue #11397: [FLINK-16217] [sql-client] catch all 
exceptions to avoid SQL client crashed
URL: https://github.com/apache/flink/pull/11397#issuecomment-598575354
 
 
   
   ## CI report:
   
   * cb47aa5215844a964177a827e609f536810bf13d Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/153174868) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6285)
 
   * 14bde242e3aca096cbfc4e36782d055f8c01c119 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160840112) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7708)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-16819) Got KryoException while using UDAF in flink1.9

2020-04-18 Thread Xingxing Di (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingxing Di closed FLINK-16819.
---
Resolution: Fixed

The problem was resolved by upgrading to 1.9.2.

> Got KryoException while using UDAF in flink1.9
> --
>
> Key: FLINK-16819
> URL: https://issues.apache.org/jira/browse/FLINK-16819
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System, Table SQL / Planner
>Affects Versions: 1.9.1
> Environment: Flink1.9.1
> Apache hadoop 2.7.2
>Reporter: Xingxing Di
>Priority: Major
> Fix For: 1.9.2
>
>
> Recently, we have been upgrading online *sql jobs* from flink1.7 to 
> flink1.9. Most jobs work fine, but some jobs got KryoExceptions. 
> We found that UDAFs trigger this exception; btw, we are using the blink 
> planner.
> *Here is the full stack traces:*
>  2020-03-27 11:46:55
>  com.esotericsoftware.kryo.KryoException: 
> java.lang.IndexOutOfBoundsException: Index: 104, Size: 2
>  Serialization trace:
>  seed (java.util.Random)
>  gen (com.tdunning.math.stats.AVLTreeDigest)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>  at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>  at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>  at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>  at 
> org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:536)
>  at 
> org.apache.flink.table.dataformat.BinaryGeneric.getJavaObjectFromBinaryGeneric(BinaryGeneric.java:86)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:628)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:633)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:320)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1293)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1257)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:302)
>  at GroupAggsHandler$71.setAccumulators(Unknown Source)
>  at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:151)
>  at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
>  at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.lang.IndexOutOfBoundsException: Index: 104, Size: 2
>  at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>  at java.util.ArrayList.get(ArrayList.java:433)
>  at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>  at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>  ... 26 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

