[GitHub] [flink-statefun] tzulitai opened a new pull request #139: [FLINK-19107] Add checkpointing and recovery options to template flink-conf.yaml

2020-09-01 Thread GitBox


tzulitai opened a new pull request #139:
URL: https://github.com/apache/flink-statefun/pull/139


   A corresponding change for the template flink-conf.yaml used in distributed 
Docker images can be found at 
https://github.com/apache/flink-statefun-docker/pull/6.
   
   This also changes the flink-conf.yaml used in e2e tests to verify that the 
template does indeed work out of the box for users.
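
   For reference, a hedged sketch of the kind of checkpointing and recovery keys 
such a template could expose (these are standard Flink option names; the exact 
keys and values chosen in this PR may differ):

   ```yaml
   # Enable periodic checkpointing (off unless an interval is configured)
   execution.checkpointing.interval: 10s
   execution.checkpointing.mode: EXACTLY_ONCE

   # State backend and checkpoint/savepoint locations
   state.backend: rocksdb
   state.checkpoints.dir: file:///checkpoint-dir
   state.savepoints.dir: file:///savepoint-dir

   # Recovery behavior
   restart-strategy: fixed-delay
   restart-strategy.fixed-delay.attempts: 2147483647
   restart-strategy.fixed-delay.delay: 1s
   ```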



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-19093) "Elasticsearch (v6.3.1) sink end-to-end test" failed with "SubtaskCheckpointCoordinatorImpl was closed without closing asyncCheckpointRunnable 1"

2020-09-01 Thread Robert Metzger (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-19093:
---
Priority: Critical  (was: Major)

> "Elasticsearch (v6.3.1) sink end-to-end test" failed with 
> "SubtaskCheckpointCoordinatorImpl was closed without closing 
> asyncCheckpointRunnable 1"
> -
>
> Key: FLINK-19093
> URL: https://issues.apache.org/jira/browse/FLINK-19093
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Assignee: Roman Khachatryan
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=5986&view=logs&j=91bf6583-3fb2-592f-e4d4-d79d79c3230a&t=3425d8ba-5f03-540a-c64b-51b8481bf7d6
> {code}
> 2020-08-29T22:20:02.3500263Z 2020-08-29 22:20:00,851 INFO  org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Source: Sequence Source -> Flat Map -> Sink: Unnamed (1/1) - asynchronous part of checkpoint 1 could not be completed.
> 2020-08-29T22:20:02.3501112Z java.lang.IllegalStateException: SubtaskCheckpointCoordinatorImpl was closed without closing asyncCheckpointRunnable 1
> 2020-08-29T22:20:02.3502049Z  at org.apache.flink.util.Preconditions.checkState(Preconditions.java:217) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> 2020-08-29T22:20:02.3503280Z  at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.registerAsyncCheckpointRunnable(SubtaskCheckpointCoordinatorImpl.java:371) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> 2020-08-29T22:20:02.3504647Z  at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.lambda$registerConsumer$2(SubtaskCheckpointCoordinatorImpl.java:479) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> 2020-08-29T22:20:02.3505882Z  at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:95) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> 2020-08-29T22:20:02.3506614Z  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_265]
> 2020-08-29T22:20:02.3507203Z  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_265]
> 2020-08-29T22:20:02.3507685Z  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
> 2020-08-29T22:20:02.3509577Z 2020-08-29 22:20:00,927 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1., taskHeapMemory=384.000mb (402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb (536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId: ca890bc4df19c66146370647d07bf510, jobId: 3522a3e4940d4b3cefc6dc1f22123f4b).
> 2020-08-29T22:20:02.3511425Z 2020-08-29 22:20:00,939 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 3522a3e4940d4b3cefc6dc1f22123f4b from job leader monitoring.
> 2020-08-29T22:20:02.3512499Z 2020-08-29 22:20:00,939 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Close JobManager connection for job 3522a3e4940d4b3cefc6dc1f22123f4b.
> 2020-08-29T22:20:02.3513174Z Checking for non-empty .out files...
> 2020-08-29T22:20:02.3513706Z No non-empty .out files.
> 2020-08-29T22:20:02.3513878Z 
> 2020-08-29T22:20:02.3514679Z [FAIL] 'Elasticsearch (v6.3.1) sink end-to-end test' failed after 0 minutes and 37 seconds! Test exited with exit code 0 but the logs contained errors, exceptions or non-empty .out files
> 2020-08-29T22:20:02.3515138Z 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18651) implicitly cast the time attribute to regular TIMESTAMP type in regular join

2020-09-01 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188185#comment-17188185
 ] 

Timo Walther commented on FLINK-18651:
--

A proper solution is described in FLINK-10211. I don't know if we want to close 
this issue as a duplicate. A solution is to move the time indicator converter 
between the logical and physical optimization phases. I had a branch where I 
started this effort for the legacy planner (it was working for almost all 
operators) but was interrupted by the Blink merge.

> implicitly cast the time attribute to regular TIMESTAMP type in regular join
> 
>
> Key: FLINK-18651
> URL: https://issues.apache.org/jira/browse/FLINK-18651
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: godfrey he
>Priority: Major
>
> Currently, a regular join does not accept a rowtime attribute field as input and 
> requires users to manually cast the time attribute to a regular timestamp. This 
> is because the time attribute will be out of order after a regular join, so we 
> can't do a window aggregate based on it afterwards.
> We can improve this: the planner can implicitly cast the time attribute to the 
> regular TIMESTAMP type and throw an exception if an operator after the join 
> (such as a window aggregate) depends on the time attribute.
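> For illustration, the manual workaround today looks roughly like this (table and 
> column names here are hypothetical):
> {code:sql}
> SELECT o.id, p.name, CAST(o.rowtime AS TIMESTAMP(3)) AS order_time
> FROM Orders AS o
> JOIN Products AS p ON o.product_id = p.id
> {code}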



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19107) Add basic checkpoint and recovery config keys to template flink-conf.yaml

2020-09-01 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-19107:
---
Labels: pull-request-available  (was: )

> Add basic checkpoint and recovery config keys to template flink-conf.yaml
> -
>
> Key: FLINK-19107
> URL: https://issues.apache.org/jira/browse/FLINK-19107
> Project: Flink
>  Issue Type: Task
>  Components: Stateful Functions
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: pull-request-available
> Fix For: statefun-2.2.0
>
>
> How to enable checkpointing in Stateful Functions seems to be a recurring 
> question.
> Adding the relevant configuration keys to the template flink-conf.yaml in 
> StateFun's Docker images could help with this. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-10211) Time indicators are not correctly materialized for LogicalJoin

2020-09-01 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-10211:
-
Component/s: Table SQL / Planner

> Time indicators are not correctly materialized for LogicalJoin
> --
>
> Key: FLINK-10211
> URL: https://issues.apache.org/jira/browse/FLINK-10211
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API, Table SQL / Planner
>Affects Versions: 1.6.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> Currently 
> {{org.apache.flink.table.calcite.RelTimeIndicatorConverter#visit(LogicalJoin)}} 
> correctly handles only windowed joins. The output of non-windowed joins 
> shouldn't contain any time indicators.
> Update:
> The root cause of this issue is the early phase in which 
> {{RelTimeIndicatorConverter}} is called. Due to a lack of information (the join 
> condition might not have been pushed into the join node yet), we cannot 
> differentiate between a window and a non-window join. Thus, we cannot perform 
> the time indicator materialization in a more fine-grained way. A solution would 
> be to perform the materialization later, after the logical optimization and 
> before the physical translation; this would also make sense from a semantic 
> perspective, because time indicators are more of a physical characteristic.
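> For clarity, a hedged sketch of the two cases (hypothetical tables A and B, each 
> with a rowtime attribute):
> {code:sql}
> -- Windowed join: the condition bounds the time attributes, so the
> -- join can preserve time indicators.
> SELECT * FROM A, B
> WHERE A.id = B.id
>   AND A.rowtime BETWEEN B.rowtime - INTERVAL '5' MINUTE AND B.rowtime;
> 
> -- Regular join: no time bounds, so the output must not carry time indicators.
> SELECT * FROM A, B WHERE A.id = B.id;
> {code}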



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] dawidwys merged pull request #13287: [FLINK-13857] Remove deprecated ExecutionConfig#get/setCodeAnalysisMode

2020-09-01 Thread GitBox


dawidwys merged pull request #13287:
URL: https://github.com/apache/flink/pull/13287


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-13857) Remove remaining UdfAnalyzer configurations

2020-09-01 Thread Dawid Wysakowicz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz closed FLINK-13857.

  Assignee: Dawid Wysakowicz
Resolution: Fixed

Implemented in 08c04af79072518854025d695431b67c510a3dfb

> Remove remaining UdfAnalyzer configurations
> ---
>
> Key: FLINK-13857
> URL: https://issues.apache.org/jira/browse/FLINK-13857
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Configuration
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> The UdfAnalyzer code was dropped in the 1.9 release. A few configuration 
> classes/options were marked as deprecated as part of this effort. Given that 
> they take no effect at all and were deprecated in the 1.9 release, I suggest 
> dropping them in the 1.10 release.
> This also does not break binary compatibility, as all the classes were marked 
> with PublicEvolving from the very beginning.
> I suggest dropping:
> * CodeAnalysisMode
> * ExecutionConfig#get/setCodeAnalysisMode
> * SkipCodeAnalysis
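> For context, a minimal sketch of the (already no-op) API being dropped, as it 
> looked before removal:
> {code:java}
> import org.apache.flink.api.common.CodeAnalysisMode;
> import org.apache.flink.api.common.ExecutionConfig;
> 
> ExecutionConfig config = new ExecutionConfig();
> config.setCodeAnalysisMode(CodeAnalysisMode.HINT); // deprecated since 1.9, has no effect
> CodeAnalysisMode mode = config.getCodeAnalysisMode(); // removed together with the enum
> {code}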



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zhijiangW commented on a change in pull request #13286: [FLINK-19093][task] Fix isActive check in AsyncCheckpointRunnable

2020-09-01 Thread GitBox


zhijiangW commented on a change in pull request #13286:
URL: https://github.com/apache/flink/pull/13286#discussion_r480904057



##
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##
@@ -366,11 +366,10 @@ private void registerAsyncCheckpointRunnable(long checkpointId, AsyncCheckpointR
         synchronized (lock) {
             if (closed) {
                 LOG.debug("Cannot register Closeable, this subtaskCheckpointCoordinator is already closed. Closing argument.");
-                final boolean running = asyncCheckpointRunnable.isRunning();
                 closeQuietly(asyncCheckpointRunnable);
                 checkState(
-                    !running,
-                    "SubtaskCheckpointCoordinatorImpl was closed without closing asyncCheckpointRunnable %s",
+                    !checkpoints.containsKey(checkpointId),
+                    "SubtaskCheckpointCoordinator was closed without releasing asyncCheckpointRunnable, %s",

Review comment:
   nit: I guess a message like `...releasing asyncCheckpointRunnable, 12` is not 
very readable. Maybe change it to `...releasing asyncCheckpointRunnable for 
checkpoint 12`? (Taking checkpointId = 12 as an example.)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-19097) Support add_jars() for Python StreamExecutionEnvironment

2020-09-01 Thread Shuiqiang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuiqiang Chen updated FLINK-19097:
---
Description: Add an add_jars() interface to the Python StreamExecutionEnvironment 
to enable users to specify jar dependencies in their Python DataStream jobs.  
(was: Add add_jar() interface in Python StreamExecutionEnvironment to enable 
users to specify jar dependencies in their Python DataStream Job.)

> Support add_jars() for Python StreamExecutionEnvironment
> 
>
> Key: FLINK-19097
> URL: https://issues.apache.org/jira/browse/FLINK-19097
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Shuiqiang Chen
>Priority: Major
> Fix For: 1.12.0
>
>
> Add an add_jars() interface to the Python StreamExecutionEnvironment to enable 
> users to specify jar dependencies in their Python DataStream jobs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19097) Support add_jars() for Python StreamExecutionEnvironment

2020-09-01 Thread Shuiqiang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuiqiang Chen updated FLINK-19097:
---
Summary: Support add_jars() for Python StreamExecutionEnvironment  (was: 
Support add_jar() for Python StreamExecutionEnvironment)

> Support add_jars() for Python StreamExecutionEnvironment
> 
>
> Key: FLINK-19097
> URL: https://issues.apache.org/jira/browse/FLINK-19097
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Shuiqiang Chen
>Priority: Major
> Fix For: 1.12.0
>
>
> Add add_jar() interface in Python StreamExecutionEnvironment to enable users 
> to specify jar dependencies in their Python DataStream Job.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #13288: [FLINK-19084] Remove deprecated methods from ExecutionConfig

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13288:
URL: https://github.com/apache/flink/pull/13288#issuecomment-683810782


   
   ## CI report:
   
   * 40a28918bfb1a79f700fd35a916999d83d36acb8 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6020)
 
   * 58877ec6fc95356dd829aab87a6d96e920f29bc8 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-statefun] igalshilman commented on a change in pull request #137: [FLINK-19102] [core, sdk] Make StateBinder a per-FunctionType entity

2020-09-01 Thread GitBox


igalshilman commented on a change in pull request #137:
URL: https://github.com/apache/flink-statefun/pull/137#discussion_r480276203



##
File path: 
statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedAppendingBuffer.java
##
@@ -160,6 +160,13 @@ public void clear() {
 accessor.clear();
   }
 
+  @Override

Review comment:
   I think it would be somewhat unintuitive for users not to see the actual 
value. What do you think?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-19097) Support add_jars() for Python StreamExecutionEnvironment

2020-09-01 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-19097:
---
Labels: pull-request-available  (was: )

> Support add_jars() for Python StreamExecutionEnvironment
> 
>
> Key: FLINK-19097
> URL: https://issues.apache.org/jira/browse/FLINK-19097
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Shuiqiang Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> Add an add_jars() interface to the Python StreamExecutionEnvironment to enable 
> users to specify jar dependencies in their Python DataStream jobs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] shuiqiangchen opened a new pull request #13292: [FLINK-19097][python] Support add_jars() for Python StreamExecutionEnvironment.

2020-09-01 Thread GitBox


shuiqiangchen opened a new pull request #13292:
URL: https://github.com/apache/flink/pull/13292


   
   
   ## What is the purpose of the change
   
   Add an add_jars() interface to the Python StreamExecutionEnvironment to enable 
users to specify jar dependencies in their Python DataStream jobs.
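
   For readers, a minimal usage sketch of the new interface (the jar paths are 
hypothetical, given as local file URLs; the exact signature is whatever this PR 
settles on):

   ```python
   from pyflink.datastream import StreamExecutionEnvironment

   env = StreamExecutionEnvironment.get_execution_environment()
   # Register jar dependencies needed by the job at runtime.
   env.add_jars("file:///path/to/my-connector.jar", "file:///path/to/udf-deps.jar")
   ```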
   
   ## Brief change log
   
   - Added StreamExecutionEnvironment.add_jars() interface.
   
   ## Verifying this change
   
   This pull request has been tested by test_add_jars() in 
test_stream_execution_environment.py.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? ( not documented)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13292: [FLINK-19097][python] Support add_jars() for Python StreamExecutionEnvironment.

2020-09-01 Thread GitBox


flinkbot commented on pull request #13292:
URL: https://github.com/apache/flink/pull/13292#issuecomment-684508363


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 81162c9ed4c9b178e093393cf88e2d16709553de (Tue Sep 01 
07:18:20 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-19097).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-19108) Stop expanding the identifiers with scope aliased by the system with 'EXPR$' prefix

2020-09-01 Thread Danny Chen (Jira)
Danny Chen created FLINK-19108:
--

 Summary: Stop expanding the identifiers with scope aliased by the 
system with 'EXPR$' prefix
 Key: FLINK-19108
 URL: https://issues.apache.org/jira/browse/FLINK-19108
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.12.0, 1.11.2
Reporter: Danny Chen
 Fix For: 1.12.0, 1.11.2


For query

{code:sql}
create view tmp_view as
select * from (
  select f0,
  row_number() over (partition by f0 order by f0 desc) as rowNum
  from source) -- the query would be aliased as "EXPR$1"
  where rowNum = 1
{code}

when validation, the inner would have query alias by the system with prefix 
"EXPR$1", when in the `Expander`, we replace the id in the inner query all with 
this prefix which is wrong because we do not add the alias to the inner query 
anymore.

To solve the problem, skip the expanding of id with "EXPR$" just like how 
{{SqlUtil#deriveAliasFromOrdinal}} added it.
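
For reference, the "EXPR$" prefix in question comes from Calcite; a minimal 
sketch of the relevant behavior:

{code:java}
import org.apache.calcite.sql.SqlUtil;

// Calcite derives system aliases for unnamed expressions and sub-queries
// as "EXPR$" + ordinal; identifiers carrying this prefix are what the
// Expander should skip.
String alias = SqlUtil.deriveAliasFromOrdinal(1); // "EXPR$1"
{code}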



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19108) Stop expanding the identifiers with scope aliased by the system with 'EXPR$' prefix

2020-09-01 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-19108:
---
Description: 
For query

{code:sql}
create view tmp_view as
select * from (
  select f0,
  row_number() over (partition by f0 order by f0 desc) as rowNum
  from source) -- the query would be aliased as "EXPR$1"
  where rowNum = 1
{code}

During validation, the inner query is assigned a system alias with the prefix 
"EXPR$1". In the `Expander`, we then replace all identifiers in the inner query 
with this prefix, which is wrong because we no longer add the alias to the inner 
query.

To solve the problem, skip expanding identifiers with the "EXPR$" prefix, the 
same prefix that {{SqlUtil#deriveAliasFromOrdinal}} adds.

  was:
For query

{code:sql}
create view tmp_view as
select * from (
  select f0,
  row_number() over (partition by f0 order by f0 desc) as rowNum
  from source) -- the query would be aliased as "EXPR$1"
  where rowNum = 1
{code}

when validation, the inner would have query alias by the system with prefix 
"EXPR$1", when in the `Expander`, we replace the id in the inner query all with 
this prefix which is wrong because we do not add the alias to the inner query 
anymore.

To solve the problem, skip the expanding of id with "EXPR$" just like how 
{{SqlUtil#deriveAliasFromOrdinal}} added it.


> Stop expanding the identifiers with scope aliased by the system with 'EXPR$' 
> prefix
> ---
>
> Key: FLINK-19108
> URL: https://issues.apache.org/jira/browse/FLINK-19108
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0, 1.11.2
>Reporter: Danny Chen
>Priority: Major
> Fix For: 1.12.0, 1.11.2
>
>
> For query
> {code:sql}
> create view tmp_view as
> select * from (
>   select f0,
>   row_number() over (partition by f0 order by f0 desc) as rowNum
>   from source) -- the query would be aliased as "EXPR$1"
>   where rowNum = 1
> {code}
> During validation, the inner query is assigned a system alias with the prefix 
> "EXPR$1". In the `Expander`, we then replace all identifiers in the inner query 
> with this prefix, which is wrong because we no longer add the alias to the 
> inner query.
> To solve the problem, skip expanding identifiers with the "EXPR$" prefix, the 
> same prefix that {{SqlUtil#deriveAliasFromOrdinal}} adds.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zhijiangW commented on a change in pull request #13286: [FLINK-19093][task] Fix isActive check in AsyncCheckpointRunnable

2020-09-01 Thread GitBox


zhijiangW commented on a change in pull request #13286:
URL: https://github.com/apache/flink/pull/13286#discussion_r480911091



##
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##
@@ -366,11 +366,10 @@ private void registerAsyncCheckpointRunnable(long checkpointId, AsyncCheckpointR
         synchronized (lock) {
             if (closed) {
                 LOG.debug("Cannot register Closeable, this subtaskCheckpointCoordinator is already closed. Closing argument.");
-                final boolean running = asyncCheckpointRunnable.isRunning();
                 closeQuietly(asyncCheckpointRunnable);
                 checkState(
-                    !running,

Review comment:
   In the past we might encounter an unnecessary exception when 
`#registerAsyncCheckpointRunnable` and `AsyncCheckpointRunnable#close` executed 
concurrently. Now we only close the runnable quietly without throwing any 
exception in the `closed` case.
   
   I agree with this fix, but do we have any existing ITCase covering/verifying 
this change?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-19108) Stop expanding the identifiers with scope aliased by the system with 'EXPR$' prefix

2020-09-01 Thread Danny Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated FLINK-19108:
---
Description: 
For query

{code:sql}
create view tmp_view as
select * from (
  select f0,
  row_number() over (partition by f0 order by f0 desc) as rowNum
  from source) -- the query would be aliased as "EXPR$1"
  where rowNum = 1
{code}

During validation, the inner query is assigned a system alias with the prefix 
"EXPR$1". In the `Expander`, we then replace all identifiers in the inner query 
with this prefix, which is wrong because we no longer add the alias to the inner 
query.

To solve the problem, skip expanding identifiers with the "EXPR$" prefix, the 
same prefix that {{SqlUtil#deriveAliasFromOrdinal}} adds.

This was introduced by FLINK-18750.

  was:
For query

{code:sql}
create view tmp_view as
select * from (
  select f0,
  row_number() over (partition by f0 order by f0 desc) as rowNum
  from source) -- the query would be aliased as "EXPR$1"
  where rowNum = 1
{code}

During validation, the inner query is assigned a system alias with the prefix 
"EXPR$1". In the `Expander`, we then replace all identifiers in the inner query 
with this prefix, which is wrong because we no longer add the alias to the inner 
query.

To solve the problem, skip expanding identifiers with the "EXPR$" prefix, the 
same prefix that {{SqlUtil#deriveAliasFromOrdinal}} adds.


> Stop expanding the identifiers with scope aliased by the system with 'EXPR$' 
> prefix
> ---
>
> Key: FLINK-19108
> URL: https://issues.apache.org/jira/browse/FLINK-19108
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0, 1.11.2
>Reporter: Danny Chen
>Priority: Major
> Fix For: 1.12.0, 1.11.2
>
>
> For query
> {code:sql}
> create view tmp_view as
> select * from (
>   select f0,
>   row_number() over (partition by f0 order by f0 desc) as rowNum
>   from source) -- the query would be aliased as "EXPR$1"
>   where rowNum = 1
> {code}
> During validation, the inner query is assigned a system alias with the prefix 
> "EXPR$1". In the `Expander`, we then replace all identifiers in the inner query 
> with this prefix, which is wrong because we no longer add the alias to the 
> inner query.
> To solve the problem, skip expanding identifiers with the "EXPR$" prefix, the 
> same prefix that {{SqlUtil#deriveAliasFromOrdinal}} adds.
> This was introduced by FLINK-18750.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] danny0405 commented on a change in pull request #13291: [FLINK-18988][table] Continuous query with LATERAL and LIMIT produces…

2020-09-01 Thread GitBox


danny0405 commented on a change in pull request #13291:
URL: https://github.com/apache/flink/pull/13291#discussion_r480913964



##
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder
+import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankType}
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelCollations
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.{Aggregate, Correlate, Filter, JoinRelType, Project, Sort}
+import org.apache.calcite.rex.{RexCall, RexCorrelVariable, RexFieldAccess, RexInputRef, RexLiteral, RexNode}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.util.ImmutableBitSet
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+ * Planner rule that rewrites sort correlation to a Rank.
+ * Typically, the following plan
+ *
+ * {{{
+ *   LogicalProject(state=[$0], name=[$1])
+ *   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}])
+ *      :- LogicalAggregate(group=[{0}])
+ *      :  +- LogicalProject(state=[$1])
+ *      :     +- LogicalTableScan(table=[[default_catalog, default_database, cities]])
+ *      +- LogicalSort(sort0=[$1], dir0=[DESC-nulls-last], fetch=[3])
+ *         +- LogicalProject(name=[$0], pop=[$2])
+ *            +- LogicalFilter(condition=[=($1, $cor0.state)])
+ *               +- LogicalTableScan(table=[[default_catalog, default_database, cities]])
+ * }}}
+ *
+ * would be transformed to
+ *
+ * {{{
+ *   LogicalProject(state=[$0], name=[$1])
+ *   +- LogicalProject(state=[$1], name=[$0], pop=[$2])
+ *      +- LogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], partitionBy=[$1], orderBy=[$2 DESC], select=[name=$0, state=$1, pop=$2])
+ *         +- LogicalTableScan(table=[[default_catalog, default_database, cities]])
+ * }}}
+ *
+ * To match the Correlate, the LHS needs to be a global Aggregate on a scan, and
+ * the RHS should be a Sort with an equality Filter predicate whose keys are the
+ * same as the LHS grouping keys.
+ *
+ * This rule can only be used in HepPlanner.
+ */
+class CorrelateSortToRankRule extends RelOptRule(

Review comment:
   Thanks for the reminder. I saw that most of the rules were implemented as 
Scala code at contribution time; do you mean we should prefer Java rules in the 
future?
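
   For readers, a hedged sketch of the kind of LATERAL + LIMIT query the rule 
above targets, reconstructed from the plans in the scaladoc (table and column 
names taken from those plans):

   ```sql
   -- Top-3 cities by population per state.
   SELECT state, name FROM
     (SELECT DISTINCT state FROM cities) AS states,
     LATERAL (
       SELECT name, pop FROM cities
       WHERE state = states.state
       ORDER BY pop DESC LIMIT 3
     );
   ```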





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] morsapaes commented on pull request #13203: [FLINK-18984][python][docs] Add tutorial documentation for Python DataStream API

2020-09-01 Thread GitBox


morsapaes commented on pull request #13203:
URL: https://github.com/apache/flink/pull/13203#issuecomment-684512433


   LGTM, @hequn8128, thanks!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] danny0405 opened a new pull request #13293: [FLINK-19108][table] Stop expanding the identifiers with scope aliase…

2020-09-01 Thread GitBox


danny0405 opened a new pull request #13293:
URL: https://github.com/apache/flink/pull/13293


   …d by the system with 'EXPR$' prefix
   
   ## What is the purpose of the change
   
   For query
   
   ```sql
   create view tmp_view as
   select * from (
 select f0,
 row_number() over (partition by f0 order by f0 desc) as rowNum
 from source) -- the query would be aliased as "EXPR$1"
 where rowNum = 1
   ```
   During validation, the inner query is assigned a system alias with the prefix 
"EXPR$1". In the `Expander`, we then replace all identifiers in the inner query 
with this prefix, which is wrong because we no longer add the alias to the inner 
query.
   
   To solve the problem, skip expanding identifiers with the "EXPR$" prefix, the 
same prefix that SqlUtil#deriveAliasFromOrdinal adds.
   
   This was introduced by FLINK-18750.
   
   ## Brief change log
   
   - Add fix in `Expander` and add tests
   
   
   ## Verifying this change
   
   Added UT.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not documented
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-19108) Stop expanding the identifiers with scope aliased by the system with 'EXPR$' prefix

2020-09-01 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-19108:
---
Labels: pull-request-available  (was: )

> Stop expanding the identifiers with scope aliased by the system with 'EXPR$' 
> prefix
> ---
>
> Key: FLINK-19108
> URL: https://issues.apache.org/jira/browse/FLINK-19108
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0, 1.11.2
>Reporter: Danny Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2
>
>
> For query
> {code:sql}
> create view tmp_view as
> select * from (
>   select f0,
>   row_number() over (partition by f0 order by f0 desc) as rowNum
>   from source) -- the query would be aliased as "EXPR$1"
>   where rowNum = 1
> {code}
> During validation, the inner query is assigned a system alias with the prefix 
> "EXPR$1". In the `Expander`, we then replace all identifiers in the inner query 
> with this prefix, which is wrong because we no longer add the alias to the 
> inner query.
> To solve the problem, skip expanding identifiers with the "EXPR$" prefix, the 
> same prefix that {{SqlUtil#deriveAliasFromOrdinal}} adds.
> This was introduced by FLINK-18750.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #12056: [FLINK-17502] [flink-connector-rabbitmq] RMQSource refactor

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #12056:
URL: https://github.com/apache/flink/pull/12056#issuecomment-626167900


   
   ## CI report:
   
   * c42ef1f62396948ece865a3b8628bea229d5ffb9 UNKNOWN
   * fb66f2e5d16daeabafaa62eab9493112997b9f74 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4786)
 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4787)
 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4788)
 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4789)
 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4088)
 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4790)
 
   * b58ab30b48a7405840656ebc26ce55fad7473497 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6040)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-19109) Split Reader eats chained periodic watermarks

2020-09-01 Thread David Anderson (Jira)
David Anderson created FLINK-19109:
--

 Summary: Split Reader eats chained periodic watermarks
 Key: FLINK-19109
 URL: https://issues.apache.org/jira/browse/FLINK-19109
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.11.1, 1.10.2, 1.11.0, 1.10.1, 1.10.0
Reporter: David Anderson


Attempting to generate watermarks chained to the Split Reader / 
ContinuousFileReaderOperator, as in


{{SingleOutputStreamOperator results = env
.readTextFile(...)
.map(...)
.assignTimestampsAndWatermarks(bounded)
.keyBy(...)
.process(...);
}}

leads to the Watermarks failing to be produced. Breaking the chain, via 
{{disableOperatorChaining()}} or a {{rebalance}}, works around the bug. Using 
punctuated watermarks also avoids the issue.

Looking at this in the debugger reveals that the timer service is being 
prematurely quiesced.

In many respects this is FLINK-7666 brought back to life.

The problem is not present in 1.9.3.

There's a minimal reproducible example in 
https://github.com/alpinegizmo/flink-question-001/tree/bug.






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] rkhachatryan commented on a change in pull request #13286: [FLINK-19093][task] Fix isActive check in AsyncCheckpointRunnable

2020-09-01 Thread GitBox


rkhachatryan commented on a change in pull request #13286:
URL: https://github.com/apache/flink/pull/13286#discussion_r480921088



##
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##
@@ -366,11 +366,10 @@ private void registerAsyncCheckpointRunnable(long checkpointId, AsyncCheckpointR
         synchronized (lock) {
             if (closed) {
                 LOG.debug("Cannot register Closeable, this subtaskCheckpointCoordinator is already closed. Closing argument.");
-                final boolean running = asyncCheckpointRunnable.isRunning();
                 closeQuietly(asyncCheckpointRunnable);
                 checkState(
-                    !running,

Review comment:
   I think there are no ITCases covering this, but some existing end-to-end 
tests indirectly cover this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] tzulitai commented on a change in pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

2020-09-01 Thread GitBox


tzulitai commented on a change in pull request #12944:
URL: https://github.com/apache/flink/pull/12944#discussion_r480921657



##
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
##
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import org.junit.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+
+import java.net.URI;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleArn;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleSessionName;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.webIdentityTokenFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AwsV2Util}.
+ */
+public class AwsV2UtilTest {
+
+   @Test
+   public void testGetCredentialsProviderEnvironmentVariables() {
+      Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "ENV_VAR");
+
+      AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+      assertTrue(credentialsProvider instanceof EnvironmentVariableCredentialsProvider);
+   }
+
+   @Test
+   public void testGetCredentialsProviderSystemProperties() {
+      Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
+
+      AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+      assertTrue(credentialsProvider instanceof SystemPropertyCredentialsProvider);
+   }
+
+   @Test
+   public void testGetCredentialsProviderWebIdentityTokenFileCredentialsProvider() {
+      Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+
+      AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+      assertTrue(credentialsProvider instanceof WebIdentityTokenFileCredentialsProvider);
+   }
+
+   @Test
+   public void t

[GitHub] [flink] rkhachatryan commented on pull request #13286: [FLINK-19093][task] Fix isActive check in AsyncCheckpointRunnable

2020-09-01 Thread GitBox


rkhachatryan commented on pull request #13286:
URL: https://github.com/apache/flink/pull/13286#issuecomment-684516123


   Thanks for reviewing @zhijiangW, I've updated the PR (the error message).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-19109) Split Reader eats chained periodic watermarks

2020-09-01 Thread David Anderson (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Anderson updated FLINK-19109:
---
Description: 
Attempting to generate watermarks chained to the Split Reader / 
ContinuousFileReaderOperator, as in
{code:java}
SingleOutputStreamOperator results = env
 .readTextFile(...)
 .map(...)
 .assignTimestampsAndWatermarks(bounded)
 .keyBy(...)
 .process(...);{code}
leads to the Watermarks failing to be produced. Breaking the chain, via 
{{disableOperatorChaining()}} or a {{rebalance}}, works around the bug. Using 
punctuated watermarks also avoids the issue.

Looking at this in the debugger reveals that the timer service is being 
prematurely quiesced.

In many respects this is FLINK-7666 brought back to life.

The problem is not present in 1.9.3.

There's a minimal reproducible example in 
[https://github.com/alpinegizmo/flink-question-001/tree/bug].

  was:
Attempting to generate watermarks chained to the Split Reader / 
ContinuousFileReaderOperator, as in


{{SingleOutputStreamOperator results = env
.readTextFile(...)
.map(...)
.assignTimestampsAndWatermarks(bounded)
.keyBy(...)
.process(...);
}}

leads to the Watermarks failing to be produced. Breaking the chain, via 
{{disableOperatorChaining()}} or a {{rebalance}}, works around the bug. Using 
punctuated watermarks also avoids the issue.

Looking at this in the debugger reveals that the timer service is being 
prematurely quiesced.

In many respects this is FLINK-7666 brought back to life.

The problem is not present in 1.9.3.

There's a minimal reproducible example in 
https://github.com/alpinegizmo/flink-question-001/tree/bug.





> Split Reader eats chained periodic watermarks
> -
>
> Key: FLINK-19109
> URL: https://issues.apache.org/jira/browse/FLINK-19109
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.10.0, 1.10.1, 1.11.0, 1.10.2, 1.11.1
>Reporter: David Anderson
>Priority: Critical
>
> Attempting to generate watermarks chained to the Split Reader / 
> ContinuousFileReaderOperator, as in
> {code:java}
> SingleOutputStreamOperator results = env
>  .readTextFile(...)
>  .map(...)
>  .assignTimestampsAndWatermarks(bounded)
>  .keyBy(...)
>  .process(...);{code}
> leads to the Watermarks failing to be produced. Breaking the chain, via 
> {{disableOperatorChaining()}} or a {{rebalance}}, works around the bug. Using 
> punctuated watermarks also avoids the issue.
> Looking at this in the debugger reveals that the timer service is being 
> prematurely quiesced.
> In many respects this is FLINK-7666 brought back to life.
> The problem is not present in 1.9.3.
> There's a minimal reproducible example in 
> [https://github.com/alpinegizmo/flink-question-001/tree/bug].
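> A minimal sketch of the chain-breaking workaround, under assumed types 
> ({{Event}}, {{ParseEvent}}, {{bounded}}, etc. are placeholders):
> {code:java}
> SingleOutputStreamOperator<Event> results = env
>   .readTextFile(path)
>   .map(new ParseEvent())
>   .rebalance() // breaks the chain to the ContinuousFileReaderOperator
>   .assignTimestampsAndWatermarks(bounded)
>   .keyBy(e -> e.getKey())
>   .process(new MyProcessFunction());{code}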



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #13003: [FLINK-18737][docs]translate jdbc connector

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13003:
URL: https://github.com/apache/flink/pull/13003#issuecomment-664844064


   
   ## CI report:
   
   * 565d353b41557312917ef867210bb731dd972fe7 UNKNOWN
   * 7892181dbf2f1c3146cec80956cf88f2dff39957 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6033)
 
   * 148cef0724df6b33f6bae78619f42a3dce360ccd UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-19109) Split Reader eats chained periodic watermarks

2020-09-01 Thread David Anderson (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Anderson updated FLINK-19109:
---
Description: 
Attempting to generate watermarks chained to the Split Reader / 
ContinuousFileReaderOperator, as in
{code:java}
SingleOutputStreamOperator results = env
  .readTextFile(...)
  .map(...)
  .assignTimestampsAndWatermarks(bounded)
  .keyBy(...)
  .process(...);{code}
leads to the Watermarks failing to be produced. Breaking the chain, via 
{{disableOperatorChaining()}} or a {{rebalance}}, works around the bug. Using 
punctuated watermarks also avoids the issue.

Looking at this in the debugger reveals that the timer service is being 
prematurely quiesced.

In many respects this is FLINK-7666 brought back to life.

The problem is not present in 1.9.3.

There's a minimal reproducible example in 
[https://github.com/alpinegizmo/flink-question-001/tree/bug].

  was:
Attempting to generate watermarks chained to the Split Reader / 
ContinuousFileReaderOperator, as in
{code:java}
SingleOutputStreamOperator results = env
 .readTextFile(...)
 .map(...)
 .assignTimestampsAndWatermarks(bounded)
 .keyBy(...)
 .process(...);{code}
leads to the Watermarks failing to be produced. Breaking the chain, via 
{{disableOperatorChaining()}} or a {{rebalance}}, works around the bug. Using 
punctuated watermarks also avoids the issue.

Looking at this in the debugger reveals that the timer service is being 
prematurely quiesced.

In many respects this is FLINK-7666 brought back to life.

The problem is not present in 1.9.3.

There's a minimal reproducible example in 
[https://github.com/alpinegizmo/flink-question-001/tree/bug].


> Split Reader eats chained periodic watermarks
> -
>
> Key: FLINK-19109
> URL: https://issues.apache.org/jira/browse/FLINK-19109
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.10.0, 1.10.1, 1.11.0, 1.10.2, 1.11.1
>Reporter: David Anderson
>Priority: Critical
>
> Attempting to generate watermarks chained to the Split Reader / 
> ContinuousFileReaderOperator, as in
> {code:java}
> SingleOutputStreamOperator results = env
>   .readTextFile(...)
>   .map(...)
>   .assignTimestampsAndWatermarks(bounded)
>   .keyBy(...)
>   .process(...);{code}
> leads to the Watermarks failing to be produced. Breaking the chain, via 
> {{disableOperatorChaining()}} or a {{rebalance}}, works around the bug. Using 
> punctuated watermarks also avoids the issue.
> Looking at this in the debugger reveals that the timer service is being 
> prematurely quiesced.
> In many respects this is FLINK-7666 brought back to life.
> The problem is not present in 1.9.3.
> There's a minimal reproducible example in 
> [https://github.com/alpinegizmo/flink-question-001/tree/bug].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #13288: [FLINK-19084] Remove deprecated methods from ExecutionConfig

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13288:
URL: https://github.com/apache/flink/pull/13288#issuecomment-683810782


   
   ## CI report:
   
   * 40a28918bfb1a79f700fd35a916999d83d36acb8 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6020)
 
   * 58877ec6fc95356dd829aab87a6d96e920f29bc8 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6043)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13292: [FLINK-19097][python] Support add_jars() for Python StreamExecutionEnvironment.

2020-09-01 Thread GitBox


flinkbot commented on pull request #13292:
URL: https://github.com/apache/flink/pull/13292#issuecomment-684516974


   
   ## CI report:
   
   * 81162c9ed4c9b178e093393cf88e2d16709553de UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13293: [FLINK-19108][table] Stop expanding the identifiers with scope aliase…

2020-09-01 Thread GitBox


flinkbot commented on pull request #13293:
URL: https://github.com/apache/flink/pull/13293#issuecomment-684516967


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit b0524b7153e6a3b04a1dd84c4a98f84a06837f75 (Tue Sep 01 
07:34:37 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-19108).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-statefun] tzulitai closed pull request #136: [FLINK-19096] [sdk] Rework PersistedStateRegistry registration methods

2020-09-01 Thread GitBox


tzulitai closed pull request #136:
URL: https://github.com/apache/flink-statefun/pull/136


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-statefun] tzulitai closed pull request #137: [FLINK-19102] [core, sdk] Make StateBinder a per-FunctionType entity

2020-09-01 Thread GitBox


tzulitai closed pull request #137:
URL: https://github.com/apache/flink-statefun/pull/137


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13003: [FLINK-18737][docs]translate jdbc connector

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13003:
URL: https://github.com/apache/flink/pull/13003#issuecomment-664844064


   
   ## CI report:
   
   * 565d353b41557312917ef867210bb731dd972fe7 UNKNOWN
   * 7892181dbf2f1c3146cec80956cf88f2dff39957 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6033)
 
   * 148cef0724df6b33f6bae78619f42a3dce360ccd Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6042)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13209: [FLINK-18832][datastream] Add compatible check for blocking partition with buffer timeout

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13209:
URL: https://github.com/apache/flink/pull/13209#issuecomment-677744672


   
   ## CI report:
   
   * a343c2c3bf36c97dca7045c65eccbcccfbbef5bf UNKNOWN
   * 895da41424f7b688b26c469b61e3d024b0e325ed Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6032)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13273: [FLINK-18801][docs][python] Add a "10 minutes to Table API" document under the "Python API" -> "User Guide" -> "Table API" section.

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13273:
URL: https://github.com/apache/flink/pull/13273#issuecomment-682353427


   
   ## CI report:
   
   * 7307a02f70f288ff9857bb74bd1a49e60fe5a497 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6038)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13286: [FLINK-19093][task] Fix isActive check in AsyncCheckpointRunnable

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13286:
URL: https://github.com/apache/flink/pull/13286#issuecomment-683777071


   
   ## CI report:
   
   * 358ebf0c0aaffb505bc95a97d6133183ec749d7a Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6016)
 
   * 9578eb8c57698bec3490503073833a93a810847a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13293: [FLINK-19108][table] Stop expanding the identifiers with scope aliase…

2020-09-01 Thread GitBox


flinkbot commented on pull request #13293:
URL: https://github.com/apache/flink/pull/13293#issuecomment-684526091


   
   ## CI report:
   
   * b0524b7153e6a3b04a1dd84c4a98f84a06837f75 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13292: [FLINK-19097][python] Support add_jars() for Python StreamExecutionEnvironment.

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13292:
URL: https://github.com/apache/flink/pull/13292#issuecomment-684516974


   
   ## CI report:
   
   * 81162c9ed4c9b178e093393cf88e2d16709553de Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6044)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-statefun] tzulitai commented on a change in pull request #138: [FLINK-19106] [core] Add more timeout configs for remote functions

2020-09-01 Thread GitBox


tzulitai commented on a change in pull request #138:
URL: https://github.com/apache/flink-statefun/pull/138#discussion_r480937709



##
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
##
@@ -111,14 +138,36 @@ public Builder withMaxRequestDuration(Duration duration) {
   return this;
 }
 
+public Builder withConnectTimeoutDuration(Duration duration) {
+  this.connectTimeout = Objects.requireNonNull(duration);
+  return this;
+}
+
+public Builder withReadTimeoutDuration(Duration duration) {
+  this.readTimeout = Objects.requireNonNull(duration);
+  return this;
+}
+
+public Builder withWriteTimeoutDuration(Duration duration) {
+  this.writeTimeout = Objects.requireNonNull(duration);
+  return this;
+}
+
 public Builder withMaxNumBatchRequests(int maxNumBatchRequests) {
   this.maxNumBatchRequests = maxNumBatchRequests;
   return this;
 }
 
 public HttpFunctionSpec build() {
   return new HttpFunctionSpec(
-  functionType, endpoint, states, maxRequestDuration, 
maxNumBatchRequests);
+  functionType,

Review comment:
   0 duration would mean that no timeout is imposed. Should that really be 
an invalid configuration?
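
   For context, a minimal sketch of how the new timeout setters in this hunk 
compose (the builder entry point and its arguments are assumed, since they are 
not shown in the diff; `Duration` is `java.time.Duration`):

```java
// Hedged sketch, not code from the PR. The builder factory
// `HttpFunctionSpec.builder(functionType, endpoint)` is an assumption.
HttpFunctionSpec spec =
    HttpFunctionSpec.builder(functionType, endpoint)
        .withMaxRequestDuration(Duration.ofMinutes(1))
        .withConnectTimeoutDuration(Duration.ofSeconds(10))
        .withReadTimeoutDuration(Duration.ofSeconds(10))
        .withWriteTimeoutDuration(Duration.ofSeconds(10))
        .withMaxNumBatchRequests(1000)
        .build();
// Under the reading in the comment above, Duration.ZERO would mean
// "impose no timeout" (the convention OkHttp-style clients use),
// rather than an invalid configuration.
```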





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-statefun] tzulitai commented on a change in pull request #138: [FLINK-19106] [core] Add more timeout configs for remote functions

2020-09-01 Thread GitBox


tzulitai commented on a change in pull request #138:
URL: https://github.com/apache/flink-statefun/pull/138#discussion_r480937709



##
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
##
@@ -111,14 +138,36 @@ public Builder withMaxRequestDuration(Duration duration) {
   return this;
 }
 
+public Builder withConnectTimeoutDuration(Duration duration) {
+  this.connectTimeout = Objects.requireNonNull(duration);
+  return this;
+}
+
+public Builder withReadTimeoutDuration(Duration duration) {
+  this.readTimeout = Objects.requireNonNull(duration);
+  return this;
+}
+
+public Builder withWriteTimeoutDuration(Duration duration) {
+  this.writeTimeout = Objects.requireNonNull(duration);
+  return this;
+}
+
 public Builder withMaxNumBatchRequests(int maxNumBatchRequests) {
   this.maxNumBatchRequests = maxNumBatchRequests;
   return this;
 }
 
 public HttpFunctionSpec build() {
   return new HttpFunctionSpec(
-  functionType, endpoint, states, maxRequestDuration, 
maxNumBatchRequests);
+  functionType,

Review comment:
   0 duration would mean that the user wants no timeout to be applied. 
Should that really be an invalid configuration?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-benchmarks] pnowojski commented on a change in pull request #3: [FLINK-18905] Provide basic benchmarks for MultipleInputStreamOperator

2020-09-01 Thread GitBox


pnowojski commented on a change in pull request #3:
URL: https://github.com/apache/flink-benchmarks/pull/3#discussion_r480893656



##
File path: src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.benchmark.functions.LongSource;
+import org.apache.flink.benchmark.functions.QueuingLongSource;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.operators.AbstractInput;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import 
org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class MultipleInputBenchmark extends BenchmarkBase {
+
+   public static final int RECORDS_PER_INVOCATION = 
TwoInputBenchmark.RECORDS_PER_INVOCATION;
+   public static final int ONE_IDLE_RECORDS_PER_INVOCATION = 
TwoInputBenchmark.ONE_IDLE_RECORDS_PER_INVOCATION;
+   public static final long CHECKPOINT_INTERVAL_MS = 
TwoInputBenchmark.CHECKPOINT_INTERVAL_MS;
+
+   public static void main(String[] args)
+   throws RunnerException {
+   Options options = new OptionsBuilder()
+   .verbosity(VerboseMode.NORMAL)
+   .include(".*" + 
MultipleInputBenchmark.class.getSimpleName() + ".*")
+   .build();
+
+   new Runner(options).run();
+   }
+
+   @Benchmark
+   @OperationsPerInvocation(RECORDS_PER_INVOCATION)
+   public void multiInputMapSink(FlinkEnvironmentContext context) throws 
Exception {
+
+   StreamExecutionEnvironment env = context.env;
+
+   env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
+   env.setParallelism(1);
+   env.setRestartStrategy(RestartStrategies.noRestart());
+
+   // Setting buffer timeout to 1 is an attempt to improve 
twoInputMapSink benchmark stability.
+   // Without 1ms buffer timeout, some JVM forks are much slower 
than others, making results
+   // unstable and unreliable.
+   env.setBufferTimeout(1);
+
+   long numRecordsPerInput = RECORDS_PER_INVOCATION / 2;
+   DataStreamSource<Long> source1 = env.addSource(new 
LongSource(numRecordsPerInput));
+   DataStreamSource<Long> source2 = env.addSource(new 
LongSource(numRecordsPerInput));
+   connectAndDiscard(env, source1, source2);
+
+   env.execute();
+   }
+
+   @Benchmark
+   @OperationsPerInvocation(ONE_IDLE_RECORDS_PER_INVOCATION)
+   public void multiInputOneIdleMapSink(FlinkEnvironmentContext context) 
throws Exception {
+
+   StreamExecutionEnvironment env = context.env;
+   env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
+   env.setParallelism(1);
+
+   QueuingLongSource.reset();
+   DataStreamSource<Long> source1 = env

[GitHub] [flink-benchmarks] pnowojski merged pull request #3: [FLINK-18905] Provide basic benchmarks for MultipleInputStreamOperator

2020-09-01 Thread GitBox


pnowojski merged pull request #3:
URL: https://github.com/apache/flink-benchmarks/pull/3


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] liuyufei9527 commented on pull request #13280: [FLINK-18070][table-planner-blink] Don't materialize time attribute in SubGraphOptimize

2020-09-01 Thread GitBox


liuyufei9527 commented on pull request #13280:
URL: https://github.com/apache/flink/pull/13280#issuecomment-684531845


   @flinkbot run azure



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] tillrohrmann commented on a change in pull request #13181: [FLINK-18957] Implement logical request bulk tracking in SlotSharingExecutionSlotAllocator

2020-09-01 Thread GitBox


tillrohrmann commented on a change in pull request #13181:
URL: https://github.com/apache/flink/pull/13181#discussion_r480001862



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImpl.java
##
@@ -169,44 +146,12 @@ public void cancelSlotRequest(SlotRequestId 
slotRequestId, Throwable cause) {
}
}
 
-   private void schedulePendingRequestBulkTimeoutCheck(
-   final PhysicalSlotRequestBulk slotRequestBulk,
-   final Time timeout) {
-
-   componentMainThreadExecutor.schedule(() -> {
-   final PhysicalSlotRequestBulkChecker.TimeoutCheckResult 
result =
-   
slotRequestBulkChecker.checkPhysicalSlotRequestBulkTimeout(slotRequestBulk, 
timeout);
-
-   switch (result) {
-   case PENDING:
-   //re-schedule the timeout check
-   
schedulePendingRequestBulkTimeoutCheck(slotRequestBulk, timeout);
-   break;
-   case TIMEOUT:
-   timeoutSlotRequestBulk(slotRequestBulk);
-   break;
-   default: // no action to take
-   }
-   }, timeout.getSize(), timeout.getUnit());
-   }
-
-   private void timeoutSlotRequestBulk(final PhysicalSlotRequestBulk 
slotRequestBulk) {
-   final Exception cause = new TimeoutException("Slot request bulk 
is not fulfillable!");
-   // pending requests must be canceled first otherwise they might 
be fulfilled by
-   // allocated slots released from this bulk
-   for (SlotRequestId slotRequestId : 
slotRequestBulk.getPendingRequests().keySet()) {
-   cancelSlotRequest(slotRequestId, cause);
-   }
-   for (SlotRequestId slotRequestId : 
slotRequestBulk.getFulfilledRequests().keySet()) {
-   cancelSlotRequest(slotRequestId, cause);
-   }
-   }
-
-   private Set<SlotInfo> getAllSlotInfos() {
-   return Stream
-   .concat(
-   
slotPool.getAvailableSlotsInformation().stream(),
-   
slotPool.getAllocatedSlotsInformation().stream())
-   .collect(Collectors.toSet());
+   private PhysicalSlotRequestBulkImpl createPhysicalSlotRequestBulk(final 
Collection<PhysicalSlotRequest> physicalSlotRequests) {
+   final PhysicalSlotRequestBulkImpl slotRequestBulk = new 
PhysicalSlotRequestBulkImpl(physicalSlotRequests
+   .stream()
+   .collect(Collectors.toMap(
+   PhysicalSlotRequest::getSlotRequestId,
+   r -> 
r.getSlotProfile().getPhysicalSlotResourceProfile())), this::cancelSlotRequest);

Review comment:
   nit: the formatting is a bit off here
   
   ```suggestion
final PhysicalSlotRequestBulkImpl slotRequestBulk = new 
PhysicalSlotRequestBulkImpl(
physicalSlotRequests
.stream()
.collect(Collectors.toMap(
PhysicalSlotRequest::getSlotRequestId,
r -> 
r.getSlotProfile().getPhysicalSlotResourceProfile())), 
this::cancelSlotRequest);
   ```

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImpl.java
##
@@ -49,50 +43,33 @@
 
private static final Logger LOG = 
LoggerFactory.getLogger(BulkSlotProviderImpl.class);
 
-   private ComponentMainThreadExecutor componentMainThreadExecutor;
-
private final SlotSelectionStrategy slotSelectionStrategy;
 
private final SlotPool slotPool;
 
private final PhysicalSlotRequestBulkChecker slotRequestBulkChecker;
 
-   BulkSlotProviderImpl(final SlotSelectionStrategy slotSelectionStrategy, 
final SlotPool slotPool) {
+   BulkSlotProviderImpl(
+   final SlotSelectionStrategy slotSelectionStrategy,
+   final SlotPool slotPool,
+   final PhysicalSlotRequestBulkChecker 
slotRequestBulkChecker) {
this.slotSelectionStrategy = 
checkNotNull(slotSelectionStrategy);
this.slotPool = checkNotNull(slotPool);
-
-   this.slotRequestBulkChecker = new 
PhysicalSlotRequestBulkChecker(
-   this::getAllSlotInfos,
-   SystemClock.getInstance());
-
-   this.componentMainThreadExecutor = new 
ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
-   "Scheduler is not initialized with proper main thre

[GitHub] [flink] flinkbot edited a comment on pull request #13284: [FLINK-17016][runtime] Change blink planner batch jobs to run with pipelined region scheduling

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13284:
URL: https://github.com/apache/flink/pull/13284#issuecomment-683683000


   
   ## CI report:
   
   * 6e68f6bd327d805261acdc9005a9cfc099f595ae Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6035)
 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6011)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13286: [FLINK-19093][task] Fix isActive check in AsyncCheckpointRunnable

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13286:
URL: https://github.com/apache/flink/pull/13286#issuecomment-683777071


   
   ## CI report:
   
   * 358ebf0c0aaffb505bc95a97d6133183ec749d7a Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6016)
 
   * 9578eb8c57698bec3490503073833a93a810847a Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6045)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13280: [FLINK-18070][table-planner-blink] Don't materialize time attribute in SubGraphOptimize

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13280:
URL: https://github.com/apache/flink/pull/13280#issuecomment-683394013


   
   ## CI report:
   
   * 559294f51238f10a9107d50074ed33c41801ae8b Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5996)
 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6047)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13293: [FLINK-19108][table] Stop expanding the identifiers with scope aliase…

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13293:
URL: https://github.com/apache/flink/pull/13293#issuecomment-684526091


   
   ## CI report:
   
   * b0524b7153e6a3b04a1dd84c4a98f84a06837f75 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6046)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wuchong commented on pull request #13294: [FLINK-19002][canal][json] Support to only read changelogs of specific database and table for canal-json format

2020-09-01 Thread GitBox


wuchong commented on pull request #13294:
URL: https://github.com/apache/flink/pull/13294#issuecomment-684560175


   cc @leonardBang 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wuchong opened a new pull request #13294: [FLINK-19002][canal][json] Support to only read changelogs of specific database and table for canal-json format

2020-09-01 Thread GitBox


wuchong opened a new pull request #13294:
URL: https://github.com/apache/flink/pull/13294


   
   
   
   ## What is the purpose of the change
   
   Usually, users use Canal to synchronize binlog data from various MySQL 
databases and tables into a single Kafka topic. However, canal-json currently 
can't support this case, because it requires that the Canal data in the topic 
be in the same data format.
   
   This issue proposes to introduce two new options, "canal-json.database" and 
"canal-json.table", to filter out the specific data.
   
   ## Brief change log
   
   - Added `canal-json.database` and `canal-json.table` option to 
`CanalJsonFormatFactory`.
   - Drop records if they don't match the specific `database` and `table` in 
`CanalJsonDeserializationSchema`. 
   
   ## Verifying this change
   
   - Added a test data which captures changes from 2 tables, and deserialize it 
using canal-json with specific database and table.
   - Added tests for factory to verify new introduced options. 
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
 - The serializers: (**yes** / no / don't know)
 - The runtime per-record code paths (performance sensitive): (**yes** / no 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / 
don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] shuiqiangchen opened a new pull request #13295: [FLINK-18598][python][docs] Add instructions for asynchronous execute in PyFlink doc.

2020-09-01 Thread GitBox


shuiqiangchen opened a new pull request #13295:
URL: https://github.com/apache/flink/pull/13295


   
   
   ## What is the purpose of the change
   
   Add instructions for asynchronous execute in PyFlink doc
   
   ## Brief change log
   
   - Added brief instructions for asynchronous execution to the PyFlink FAQ 
page (faq.md); a short sketch of the behavior follows.
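
   As a rough illustration of what asynchronous execution refers to (shown here 
with the Java API for brevity; PyFlink's `execute_async` counterpart behaves 
the same way, and the job name below is made up):

```java
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AsyncExecuteSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();
        // executeAsync() submits the job and returns a JobClient immediately,
        // instead of blocking until the job finishes as execute() does.
        JobClient client = env.executeAsync("async-demo");
        System.out.println("Submitted job " + client.getJobID());
    }
}
```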
   
   ## Verifying this change
   
   This pull request is a documentation enhancement without test case coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-19002) Support to only read changelogs of specific database and table for canal-json format

2020-09-01 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-19002:
---
Labels: pull-request-available  (was: )

> Support to only read changelogs of specific database and table for canal-json 
> format
> 
>
> Key: FLINK-19002
> URL: https://issues.apache.org/jira/browse/FLINK-19002
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> Usually, users use Canal to synchronize binlog data from various MySQL 
> databases and tables into a single Kafka topic. However, canal-json 
> currently can't support this case, because it requires that the Canal data 
> in the topic be in the same data format. 
> This issue proposes to introduce two new options, "canal-json.database" and 
> "canal-json.table", to filter out the specific data. It would be great to 
> support a table list or table pattern in case all the tables have the same 
> schema. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18598) Add instructions for asynchronous execute in PyFlink doc

2020-09-01 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-18598:
---
Labels: pull-request-available  (was: )

> Add instructions for asynchronous execute in PyFlink doc
> 
>
> Key: FLINK-18598
> URL: https://issues.apache.org/jira/browse/FLINK-18598
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Documentation
>Affects Versions: 1.11.0
>Reporter: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
>
> Add instructions for asynchronous execute in PyFlink doc



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #13280: [FLINK-18070][table-planner-blink] Don't materialize time attribute in SubGraphOptimize

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13280:
URL: https://github.com/apache/flink/pull/13280#issuecomment-683394013


   
   ## CI report:
   
   * 559294f51238f10a9107d50074ed33c41801ae8b Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6047)
 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5996)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13278: [FLINK-19091][python] Introduce expression DSL for Python Table API

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13278:
URL: https://github.com/apache/flink/pull/13278#issuecomment-683294857


   
   ## CI report:
   
   * ca0216692b0b058bd5cbe6a1a6b3f345feba3def UNKNOWN
   * 23c61fffdf54d684e8346d7509846170f7151053 Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6036)
 
   * 6d6ed885eee02649d76f67b0720084df6eb703ca Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6039)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13295: [FLINK-18598][python][docs] Add instructions for asynchronous execute in PyFlink doc.

2020-09-01 Thread GitBox


flinkbot commented on pull request #13295:
URL: https://github.com/apache/flink/pull/13295#issuecomment-684564557


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 9f2e570c493ec1ca97e2c2ef1e73a6e227117016 (Tue Sep 01 
08:27:05 UTC 2020)
   
   **Warnings:**
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-18598).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13294: [FLINK-19002][canal][json] Support to only read changelogs of specific database and table for canal-json format

2020-09-01 Thread GitBox


flinkbot commented on pull request #13294:
URL: https://github.com/apache/flink/pull/13294#issuecomment-684564615


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit d9ce2f0083bbf6e00e4782c3ab2f7dd686d07e55 (Tue Sep 01 
08:27:07 UTC 2020)
   
✅ no warnings
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] caozhen1937 opened a new pull request #13296: [FLINK-18774][format][debezium] Support debezium-avro format

2020-09-01 Thread GitBox


caozhen1937 opened a new pull request #13296:
URL: https://github.com/apache/flink/pull/13296


   
   
   ## What is the purpose of the change
   
Support debezium-avro format
   
   ## Brief change log
   
   - add a DeserializationSchema for deserializing byte[]
   - add a SerializationSchema for serializing data to byte[]
   - add a format factory for the DeserializationSchema & SerializationSchema
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   - Added Java tests for deserializing byte[] from a table source & 
serializing data to byte[] into a table sink
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] caozhen1937 commented on pull request #13296: [FLINK-18774][format][debezium] Support debezium-avro format

2020-09-01 Thread GitBox


caozhen1937 commented on pull request #13296:
URL: https://github.com/apache/flink/pull/13296#issuecomment-684584441


   CC @wuchong 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-18774) Support debezium-avro format

2020-09-01 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-18774:
---
Labels: pull-request-available  (was: )

> Support debezium-avro format 
> -
>
> Key: FLINK-18774
> URL: https://issues.apache.org/jira/browse/FLINK-18774
> Project: Flink
>  Issue Type: Sub-task
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Jark Wu
>Assignee: CaoZhen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> Debezium+Avro+Confluent Schema Registry is a popular pattern in the industry. 
> It would be great if we could support this. This depends on the implementation 
> of the {{avro-confluent}} format (FLINK-16048). 
> The format name is up for discussion. I would propose to use 
> {{debezium-avro-confluent}} to make it explicit, as we may support Apicurio 
> Registry in the future. 
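
For illustration only, a hedged sketch of how such a format might eventually be 
declared (the format name follows the proposal above and is still under 
discussion; the schema-registry option name and all table details are 
assumptions):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DebeziumAvroConfluentSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // Both the format name and its schema-registry option are
        // hypothetical here, pending the naming discussion in FLINK-18774.
        tEnv.executeSql(
                "CREATE TABLE products (\n"
                + "  id BIGINT,\n"
                + "  name STRING\n"
                + ") WITH (\n"
                + "  'connector' = 'kafka',\n"
                + "  'topic' = 'products-binlog',\n"
                + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                + "  'format' = 'debezium-avro-confluent',\n"
                + "  'debezium-avro-confluent.schema-registry.url' = "
                + "'http://localhost:8081'\n"
                + ")");
    }
}
```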



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] qinjunjerry commented on pull request #12747: [FLINK-17327] Fix Kafka Producer Resource Leaks (backport to Flink 1.10)

2020-09-01 Thread GitBox


qinjunjerry commented on pull request #12747:
URL: https://github.com/apache/flink/pull/12747#issuecomment-684585282


   Anything blocking here? 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13294: [FLINK-19002][canal][json] Support to only read changelogs of specific database and table for canal-json format

2020-09-01 Thread GitBox


flinkbot commented on pull request #13294:
URL: https://github.com/apache/flink/pull/13294#issuecomment-684586137


   
   ## CI report:
   
   * d9ce2f0083bbf6e00e4782c3ab2f7dd686d07e55 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13295: [FLINK-18598][python][docs] Add instructions for asynchronous execute in PyFlink doc.

2020-09-01 Thread GitBox


flinkbot commented on pull request #13295:
URL: https://github.com/apache/flink/pull/13295#issuecomment-684586420


   
   ## CI report:
   
   * 9f2e570c493ec1ca97e2c2ef1e73a6e227117016 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13296: [FLINK-18774][format][debezium] Support debezium-avro format

2020-09-01 Thread GitBox


flinkbot commented on pull request #13296:
URL: https://github.com/apache/flink/pull/13296#issuecomment-684586142


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 83b63305bf9953e98355465c51a584fc3ec813e8 (Tue Sep 01 
08:42:02 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] twalthr commented on a change in pull request #13291: [FLINK-18988][table] Continuous query with LATERAL and LIMIT produces…

2020-09-01 Thread GitBox


twalthr commented on a change in pull request #13291:
URL: https://github.com/apache/flink/pull/13291#discussion_r480977253



##
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder
+import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, 
RankType}
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelCollations
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.{Aggregate, Correlate, Filter, JoinRelType, 
Project, Sort}
+import org.apache.calcite.rex.{RexCall, RexCorrelVariable, RexFieldAccess, 
RexInputRef, RexLiteral, RexNode}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.util.ImmutableBitSet
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+ * Planner rule that rewrites sort correlation to a Rank.
+ * Typically, the following plan
+ *
+ * {{{
+ *   LogicalProject(state=[$0], name=[$1])
+ *   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
+ *  :- LogicalAggregate(group=[{0}])
+ *  :  +- LogicalProject(state=[$1])
+ *  : +- LogicalTableScan(table=[[default_catalog, default_database, 
cities]])
+ *  +- LogicalSort(sort0=[$1], dir0=[DESC-nulls-last], fetch=[3])
+ * +- LogicalProject(name=[$0], pop=[$2])
+ *+- LogicalFilter(condition=[=($1, $cor0.state)])
+ *   +- LogicalTableScan(table=[[default_catalog, 
default_database, cities]])
+ * }}}
+ *
+ * would be transformed to
+ *
+ * {{{
+ *   LogicalProject(state=[$0], name=[$1])
+ *+- LogicalProject(state=[$1], name=[$0], pop=[$2])
+ *   +- LogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, 
rankEnd=3], partitionBy=[$1], orderBy=[$2 DESC], select=[name=$0, state=$1, 
pop=$2])
+ *  +- LogicalTableScan(table=[[default_catalog, default_database, 
cities]])
+ * }}}
+ *
+ * To match the Correlate, the LHS needs to be a global Aggregate on a 
scan, and the RHS should
+ * be a Sort with an equal Filter predicate whose keys are the same as the LHS 
grouping keys.
+ *
+ * This rule can only be used in HepPlanner.
+ */
+class CorrelateSortToRankRule extends RelOptRule(

Review comment:
   See 
[FLIP-32](https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions)
 Appendix: Porting Guidelines. 
   ```
   A new planner rule or node that only depends on Calcite and runtime classes 
should be implemented in Java.
   ```
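
   As context for the rule itself: the plan in the scaladoc corresponds to a 
Top-N-per-group query written with LATERAL and LIMIT. A hedged sketch of such a 
query (the `cities(name, state, pop)` table is taken from the scaladoc; the SQL 
is illustrative, and `tEnv` is assumed to be a streaming TableEnvironment):

```java
// FLINK-18988 concerns continuous queries of this shape; rewritten to a
// Rank by this rule, the query becomes a standard Top-N per group.
tEnv.executeSql(
        "SELECT state, name FROM\n"
        + "  (SELECT DISTINCT state FROM cities) states,\n"
        + "  LATERAL (\n"
        + "    SELECT name, pop FROM cities\n"
        + "    WHERE state = states.state\n"
        + "    ORDER BY pop DESC LIMIT 3\n"
        + "  )");
```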





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] danny0405 commented on a change in pull request #13291: [FLINK-18988][table] Continuous query with LATERAL and LIMIT produces…

2020-09-01 Thread GitBox


danny0405 commented on a change in pull request #13291:
URL: https://github.com/apache/flink/pull/13291#discussion_r480978916



##
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder
+import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, 
RankType}
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelCollations
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.{Aggregate, Correlate, Filter, JoinRelType, 
Project, Sort}
+import org.apache.calcite.rex.{RexCall, RexCorrelVariable, RexFieldAccess, 
RexInputRef, RexLiteral, RexNode}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.util.ImmutableBitSet
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+ * Planner rule that rewrites sort correlation to a Rank.
+ * Typically, the following plan
+ *
+ * {{{
+ *   LogicalProject(state=[$0], name=[$1])
+ *   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
+ *  :- LogicalAggregate(group=[{0}])
+ *  :  +- LogicalProject(state=[$1])
+ *  : +- LogicalTableScan(table=[[default_catalog, default_database, 
cities]])
+ *  +- LogicalSort(sort0=[$1], dir0=[DESC-nulls-last], fetch=[3])
+ * +- LogicalProject(name=[$0], pop=[$2])
+ *+- LogicalFilter(condition=[=($1, $cor0.state)])
+ *   +- LogicalTableScan(table=[[default_catalog, 
default_database, cities]])
+ * }}}
+ *
+ * would be transformed to
+ *
+ * {{{
+ *   LogicalProject(state=[$0], name=[$1])
+ *+- LogicalProject(state=[$1], name=[$0], pop=[$2])
+ *   +- LogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, 
rankEnd=3], partitionBy=[$1], orderBy=[$2 DESC], select=[name=$0, state=$1, 
pop=$2])
+ *  +- LogicalTableScan(table=[[default_catalog, default_database, 
cities]])
+ * }}}
+ *
+ * To match the Correlate, the LHS needs to be a global Aggregate on a 
scan, and the RHS should
+ * be a Sort with an equal Filter predicate whose keys are the same as the LHS 
grouping keys.
+ *
+ * This rule can only be used in HepPlanner.
+ */
+class CorrelateSortToRankRule extends RelOptRule(

Review comment:
   I see, thanks for sharing ~





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] twalthr commented on a change in pull request #13291: [FLINK-18988][table] Continuous query with LATERAL and LIMIT produces…

2020-09-01 Thread GitBox


twalthr commented on a change in pull request #13291:
URL: https://github.com/apache/flink/pull/13291#discussion_r480979229



##
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder
+import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, 
RankType}
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelCollations
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.{Aggregate, Correlate, Filter, JoinRelType, 
Project, Sort}
+import org.apache.calcite.rex.{RexCall, RexCorrelVariable, RexFieldAccess, 
RexInputRef, RexLiteral, RexNode}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.util.ImmutableBitSet
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+ * Planner rule that rewrites sort correlation to a Rank.
+ * Typically, the following plan
+ *
+ * {{{
+ *   LogicalProject(state=[$0], name=[$1])
+ *   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
+ *  :- LogicalAggregate(group=[{0}])
+ *  :  +- LogicalProject(state=[$1])
+ *  : +- LogicalTableScan(table=[[default_catalog, default_database, 
cities]])
+ *  +- LogicalSort(sort0=[$1], dir0=[DESC-nulls-last], fetch=[3])
+ * +- LogicalProject(name=[$0], pop=[$2])
+ *+- LogicalFilter(condition=[=($1, $cor0.state)])
+ *   +- LogicalTableScan(table=[[default_catalog, 
default_database, cities]])
+ * }}}
+ *
+ * would be transformed to
+ *
+ * {{{
+ *   LogicalProject(state=[$0], name=[$1])
+ *+- LogicalProject(state=[$1], name=[$0], pop=[$2])
+ *   +- LogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, 
rankEnd=3], partitionBy=[$1], orderBy=[$2 DESC], select=[name=$0, state=$1, 
pop=$2])
+ *  +- LogicalTableScan(table=[[default_catalog, default_database, 
cities]])
+ * }}}
+ *
+ * To match the Correlate, the LHS needs to be a global Aggregate on a scan, and the RHS
+ * should be a Sort with an equality Filter predicate whose keys are the same as the LHS
+ * grouping keys.
+ *
+ * This rule can only be used in HepPlanner.
+ */
+class CorrelateSortToRankRule extends RelOptRule(

Review comment:
   We rework so many classes all the time; hopefully the Scala code will be
gone at some point.









[jira] [Commented] (FLINK-18959) Fail to archiveExecutionGraph because job is not finished when dispatcher close

2020-09-01 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188277#comment-17188277
 ] 

Aljoscha Krettek commented on FLINK-18959:
--

Agreed on everything. But that's a new Jira issue, right? For this one, we 
should just change the CANCEL path to go through all the normal steps.

> Fail to archiveExecutionGraph because job is not finished when dispatcher 
> close
> ---
>
> Key: FLINK-18959
> URL: https://issues.apache.org/jira/browse/FLINK-18959
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.12.0, 1.11.1
>Reporter: Liu
>Assignee: Liu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2, 1.10.3
>
> Attachments: flink-debug-log
>
>
> When a job is cancelled, we expect to see it in Flink's history server. But I
> cannot see my job after it is cancelled.
> After digging into the problem, I found that the function
> archiveExecutionGraph is not executed. Below is the brief log:
> {panel:title=log}
> 2020-08-14 15:10:06,406 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph 
> [flink-akka.actor.default-dispatcher- 15] - Job EtlAndWindow 
> (6f784d4cc5bae88a332d254b21660372) switched from state RUNNING to CANCELLING.
> 2020-08-14 15:10:06,415 DEBUG 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-3] - Shutting down per-job cluster 
> because the job was canceled.
> 2020-08-14 15:10:06,629 INFO 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-3] - Stopping dispatcher 
> akka.tcp://flink@bjfk-c9865.yz02:38663/user/dispatcher.
> 2020-08-14 15:10:06,629 INFO 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-3] - Stopping all currently running jobs 
> of dispatcher akka.tcp://flink@bjfk-c9865.yz02:38663/user/dispatcher.
> 2020-08-14 15:10:06,631 INFO org.apache.flink.runtime.jobmaster.JobMaster 
> [flink-akka.actor.default-dispatcher-29] - Stopping the JobMaster for job 
> EtlAndWindow(6f784d4cc5bae88a332d254b21660372).
> 2020-08-14 15:10:06,632 DEBUG org.apache.flink.runtime.jobmaster.JobMaster 
> [flink-akka.actor.default-dispatcher-29] - Disconnect TaskExecutor 
> container_e144_1590060720089_2161_01_06 because: Stopping JobMaster for 
> job EtlAndWindow(6f784d4cc5bae88a332d254b21660372).
> 2020-08-14 15:10:06,646 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph 
> [flink-akka.actor.default-dispatcher-29] - Job EtlAndWindow 
> (6f784d4cc5bae88a332d254b21660372) switched from state CANCELLING to CANCELED.
> 2020-08-14 15:10:06,664 DEBUG 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-4] - There is a newer JobManagerRunner 
> for the job 6f784d4cc5bae88a332d254b21660372.
> {panel}
> From the log, we can see that the job is not finished when the dispatcher
> closes. The process is as follows:
>  * The cancel command is received and sent to all tasks asynchronously.
>  * In the MiniDispatcher, the per-job cluster begins shutting down.
>  * The dispatcher is stopped and the job is removed.
>  * The job is cancelled and the callback is executed in the method
> startJobManagerRunner.
>  * Because the job was removed earlier, currentJobManagerRunner is null,
> which does not equal the original jobManagerRunner. In this case,
> archivedExecutionGraph will not be uploaded.
> In normal cases, I find that the job is cancelled first and then the
> dispatcher is stopped, so archivedExecutionGraph succeeds. But I think the
> order is not constrained and it is hard to know which comes first.
> Above is what I suspect. If so, then we should fix it.
>  





[GitHub] [flink] flinkbot edited a comment on pull request #13295: [FLINK-18598][python][docs] Add instructions for asynchronous execute in PyFlink doc.

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13295:
URL: https://github.com/apache/flink/pull/13295#issuecomment-684586420


   
   ## CI report:
   
   * 9f2e570c493ec1ca97e2c2ef1e73a6e227117016 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6049)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot edited a comment on pull request #13294: [FLINK-19002][canal][json] Support to only read changelogs of specific database and table for canal-json format

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13294:
URL: https://github.com/apache/flink/pull/13294#issuecomment-684586137


   
   ## CI report:
   
   * d9ce2f0083bbf6e00e4782c3ab2f7dd686d07e55 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6048)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot commented on pull request #13296: [FLINK-18774][format][debezium] Support debezium-avro format

2020-09-01 Thread GitBox


flinkbot commented on pull request #13296:
URL: https://github.com/apache/flink/pull/13296#issuecomment-684611193


   
   ## CI report:
   
   * 83b63305bf9953e98355465c51a584fc3ec813e8 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[jira] [Commented] (FLINK-14986) Support to get detailed Kubernetes cluster description

2020-09-01 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188280#comment-17188280
 ] 

Yang Wang commented on FLINK-14986:
---

[~ouyangwuli] Thanks for working on this. Ping me when you have finished the PR
and I can help with the review.

> Support to get detailed Kubernetes cluster description
> --
>
> Key: FLINK-14986
> URL: https://issues.apache.org/jira/browse/FLINK-14986
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Reporter: Yang Wang
>Assignee: ouyangwulin
>Priority: Major
>
> Currently Flink supports getting the YARN cluster description via
> `YarnClusterDescriptor#getClusterDescription`. We should support the same
> behavior for Kubernetes clusters.
> Usually the cluster description contains the "total resources, available 
> resources, etc."
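
For illustration only, a rough sketch of what such a description could
aggregate on Kubernetes with the fabric8 client that the native integration
already uses; the class name, method, and output format below are assumptions,
not the eventual Flink API:

```java
import io.fabric8.kubernetes.api.model.Node;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;

import java.util.List;

// Hypothetical helper: sums allocatable CPU/memory over all nodes, roughly
// what YarnClusterDescriptor#getClusterDescription reports for YARN.
public class KubernetesClusterDescriptionSketch {
    public static String describe() {
        try (KubernetesClient client = new DefaultKubernetesClient()) {
            List<Node> nodes = client.nodes().list().getItems();
            double totalCpuCores = 0;
            double totalMemoryBytes = 0;
            for (Node node : nodes) {
                totalCpuCores += Quantity.getAmountInBytes(
                        node.getStatus().getAllocatable().get("cpu")).doubleValue();
                totalMemoryBytes += Quantity.getAmountInBytes(
                        node.getStatus().getAllocatable().get("memory")).doubleValue();
            }
            return "nodes=" + nodes.size()
                    + ", allocatableCpuCores=" + totalCpuCores
                    + ", allocatableMemoryBytes=" + totalMemoryBytes;
        }
    }
}
```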





[GitHub] [flink] danny0405 commented on a change in pull request #13291: [FLINK-18988][table] Continuous query with LATERAL and LIMIT produces…

2020-09-01 Thread GitBox


danny0405 commented on a change in pull request #13291:
URL: https://github.com/apache/flink/pull/13291#discussion_r480983677



##
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder
+import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, 
RankType}
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelCollations
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.{Aggregate, Correlate, Filter, JoinRelType, 
Project, Sort}
+import org.apache.calcite.rex.{RexCall, RexCorrelVariable, RexFieldAccess, 
RexInputRef, RexLiteral, RexNode}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.util.ImmutableBitSet
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+ * Planner rule that rewrites sort correlation to a Rank.
+ * Typically, the following plan
+ *
+ * {{{
+ *   LogicalProject(state=[$0], name=[$1])
+ *   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
+ *  :- LogicalAggregate(group=[{0}])
+ *  :  +- LogicalProject(state=[$1])
+ *  : +- LogicalTableScan(table=[[default_catalog, default_database, 
cities]])
+ *  +- LogicalSort(sort0=[$1], dir0=[DESC-nulls-last], fetch=[3])
+ * +- LogicalProject(name=[$0], pop=[$2])
+ *+- LogicalFilter(condition=[=($1, $cor0.state)])
+ *   +- LogicalTableScan(table=[[default_catalog, 
default_database, cities]])
+ * }}}
+ *
+ * would be transformed to
+ *
+ * {{{
+ *   LogicalProject(state=[$0], name=[$1])
+ *+- LogicalProject(state=[$1], name=[$0], pop=[$2])
+ *   +- LogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, 
rankEnd=3], partitionBy=[$1], orderBy=[$2 DESC], select=[name=$0, state=$1, 
pop=$2])
+ *  +- LogicalTableScan(table=[[default_catalog, default_database, 
cities]])
+ * }}}
+ *
+ * To match the Correlate, the LHS needs to be a global Aggregate on a scan, and the RHS
+ * should be a Sort with an equality Filter predicate whose keys are the same as the LHS
+ * grouping keys.
+ *
+ * This rule can only be used in HepPlanner.
+ */
+class CorrelateSortToRankRule extends RelOptRule(

Review comment:
   Sure, it would be exciting if all the code could switch to Java.









[jira] [Updated] (FLINK-12884) Implement HighAvailabilityService based on native k8s APIs

2020-09-01 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-12884:
--
Parent: FLINK-17709
Issue Type: Sub-task  (was: New Feature)

> Implement HighAvailabilityService based on native k8s APIs
> --
>
> Key: FLINK-12884
> URL: https://issues.apache.org/jira/browse/FLINK-12884
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: MalcolmSanders
>Assignee: MalcolmSanders
>Priority: Major
>
> Currently Flink only supports HighAvailabilityService using ZooKeeper. As a
> result, it requires a ZooKeeper cluster to be deployed on the k8s cluster if our
> customers need high availability for Flink. If we support
> HighAvailabilityService based on native k8s APIs, it will save the effort of
> ZooKeeper deployment as well as the resources used by the ZooKeeper cluster. It
> might be especially helpful for customers who run small-scale k8s clusters, so
> that the Flink HighAvailabilityService does not cause too much overhead on k8s
> clusters.
> Previously [FLINK-11105|https://issues.apache.org/jira/browse/FLINK-11105]
> proposed a HighAvailabilityService using etcd. As [~NathanHowell]
> suggested in FLINK-11105, since k8s doesn't expose its own etcd cluster by
> design (see [Securing etcd
> clusters|https://kubernetes.io/docs/tasks/administer-cluster/configure-upgrade-etcd/#securing-etcd-clusters]),
>  it also requires the deployment of an etcd cluster if Flink uses etcd to
> achieve HA.
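
For background, a minimal sketch of the Kubernetes leader-election primitive
(fabric8's ConfigMap lock) that a native HA service could build on; the
namespace, names, and timeouts are illustrative, and this is not Flink's
actual implementation:

```java
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfig;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfigBuilder;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector;
import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock;

import java.time.Duration;
import java.util.UUID;

public class K8sLeaderElectionSketch {
    public static void main(String[] args) {
        NamespacedKubernetesClient client = new DefaultKubernetesClient();
        // The ConfigMap acts as the lock; its annotations record the current leader.
        LeaderElectionConfig config = new LeaderElectionConfigBuilder()
                .withName("flink-ha-leader")
                .withLock(new ConfigMapLock("default", "flink-ha-leader", UUID.randomUUID().toString()))
                .withLeaseDuration(Duration.ofSeconds(15))
                .withRenewDeadline(Duration.ofSeconds(10))
                .withRetryPeriod(Duration.ofSeconds(2))
                .withLeaderCallbacks(new LeaderCallbacks(
                        () -> System.out.println("became leader"),
                        () -> System.out.println("lost leadership"),
                        newLeader -> System.out.println("new leader: " + newLeader)))
                .build();
        new LeaderElector<>(client, config).run(); // blocks while participating in the election
    }
}
```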





[GitHub] [flink] rmetzger commented on pull request #13217: [FLINK-16866] Make job submission non-blocking

2020-09-01 Thread GitBox


rmetzger commented on pull request #13217:
URL: https://github.com/apache/flink/pull/13217#issuecomment-684635708


   Thanks a lot for this extensive review. I believe I have addressed all 
comments. I'm looking forward to more feedback.







[GitHub] [flink] flinkbot edited a comment on pull request #13296: [FLINK-18774][format][debezium] Support debezium-avro format

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13296:
URL: https://github.com/apache/flink/pull/13296#issuecomment-684611193


   
   ## CI report:
   
   * 83b63305bf9953e98355465c51a584fc3ec813e8 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6050)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot edited a comment on pull request #13291: [FLINK-18988][table] Continuous query with LATERAL and LIMIT produces…

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13291:
URL: https://github.com/apache/flink/pull/13291#issuecomment-684180034


   
   ## CI report:
   
   * 8406cd55c09be8ad125f574be5e58a91ad717f38 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6034)
 
   * 3dff1294d9c3e3b595353075fab383714d97f63f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-09-01 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188291#comment-17188291
 ] 

Till Rohrmann commented on FLINK-17075:
---

[~chesnay] should we go ahead with merging this feature into Flink {{1.10.3}}?

> Add task status reconciliation between TM and JM
> 
>
> Key: FLINK-17075
> URL: https://issues.apache.org/jira/browse/FLINK-17075
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2, 1.10.3
>
>
> In order to harden the TM and JM communication, I suggest letting the
> {{TaskExecutor}} send the task statuses back to the {{JobMaster}} as part of
> the heartbeat payload (similar to FLINK-11059). This would allow reconciling
> the states of both components in case a status update message was lost,
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E
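
A schematic of the reconciliation idea in plain Java; all types below are
hypothetical stand-ins, not Flink's actual heartbeat classes. The TaskExecutor
ships its view of the task states with each heartbeat, and the JobMaster
re-applies any transition it missed:

```java
import java.util.HashMap;
import java.util.Map;

public class HeartbeatReconciliationSketch {
    enum TaskState { RUNNING, FINISHED, FAILED, CANCELED }

    // Hypothetical heartbeat payload: execution attempt id -> state on the TM.
    static final class Payload {
        final Map<String, TaskState> taskStates;
        Payload(Map<String, TaskState> taskStates) { this.taskStates = taskStates; }
    }

    // JobMaster-side view of the same executions.
    static final class JobMasterView {
        private final Map<String, TaskState> knownStates = new HashMap<>();

        void reconcile(Payload heartbeat) {
            heartbeat.taskStates.forEach((attempt, tmState) -> {
                if (knownStates.get(attempt) != tmState) {
                    // A status update RPC was lost: adopt the TM's state.
                    knownStates.put(attempt, tmState);
                }
            });
        }
    }
}
```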





[GitHub] [flink] azagrebin commented on a change in pull request #13284: [FLINK-17016][runtime] Change blink planner batch jobs to run with pipelined region scheduling

2020-09-01 Thread GitBox


azagrebin commented on a change in pull request #13284:
URL: https://github.com/apache/flink/pull/13284#discussion_r480166064



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
##
@@ -300,6 +305,32 @@ public int getNumberOfVertices() {
return this.taskVertices.size();
}
 
+   public Set<SlotSharingGroup> getSlotSharingGroups() {
+   final Set<SlotSharingGroup> slotSharingGroups = new HashSet<>();
+   for (JobVertex vertex : getVertices()) {
+   final SlotSharingGroup slotSharingGroup = 
vertex.getSlotSharingGroup();
+   checkNotNull(slotSharingGroup);
+
+   slotSharingGroups.add(slotSharingGroup);
+   }
+   return Collections.unmodifiableSet(slotSharingGroups);
+   }
+
+   public Set<CoLocationGroupDesc> getCoLocationGroupDescriptors() {
+   final Set<CoLocationGroup> coLocationGroups = new HashSet<>();
+   for (JobVertex vertex : getVertices()) {
+   CoLocationGroup coLocationGroup = 
vertex.getCoLocationGroup();
+   if (coLocationGroup != null) {
+   coLocationGroups.add(coLocationGroup);
+   }
+   }
+   final Set<CoLocationGroupDesc> coLocationGroupDescs = coLocationGroups
+   .stream()
+   .map(CoLocationGroupDesc::from)
+   .collect(Collectors.toSet());

Review comment:
   ```suggestion
final Set<CoLocationGroupDesc> coLocationGroupDescs = new HashSet<>();
for (JobVertex vertex : getVertices()) {
CoLocationGroup coLocationGroup = vertex.getCoLocationGroup();
if (coLocationGroup != null) {
CoLocationGroupDesc coLocationGroupDesc = CoLocationGroupDesc.from(coLocationGroup);
coLocationGroupDescs.add(coLocationGroupDesc);
}
}
   ```

##
File path: 
flink-tests/src/test/java/org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase.java
##
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.scheduling;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * IT case for pipelined region scheduling.
+ */
+public class PipelinedRegionSchedulingITCase extends TestLogger {
+
+   @Test
+   public void testSuccessWithSlotsNoFewerThanTheMaxRegionRequired() 
throws Exception {
+   final JobResult jobResult = executeSchedulingTest(2);
+   assertThat(jobResult.getSerializedThrowable().isPresent(), 
is(false));
+   }
+
+   @Test
+   public void testFailsOnInsufficientSlots() throws Exception {
+   final JobResult jobResult = executeSchedulingTest(1);
+   assertThat(jobResult.getSerializedThrowable().isPresent(), 
is(true));

[jira] [Commented] (FLINK-18959) Fail to archiveExecutionGraph because job is not finished when dispatcher close

2020-09-01 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188303#comment-17188303
 ] 

Till Rohrmann commented on FLINK-18959:
---

Yes, exactly. The scope of this ticket is to fix the current regression, which
means that cancelling a job should still trigger its archiving.

> Fail to archiveExecutionGraph because job is not finished when dispatcher 
> close
> ---
>
> Key: FLINK-18959
> URL: https://issues.apache.org/jira/browse/FLINK-18959
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.12.0, 1.11.1
>Reporter: Liu
>Assignee: Liu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2, 1.10.3
>
> Attachments: flink-debug-log
>
>
> When a job is cancelled, we expect to see it in Flink's history server. But I
> cannot see my job after it is cancelled.
> After digging into the problem, I found that the function
> archiveExecutionGraph is not executed. Below is the brief log:
> {panel:title=log}
> 2020-08-14 15:10:06,406 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph 
> [flink-akka.actor.default-dispatcher- 15] - Job EtlAndWindow 
> (6f784d4cc5bae88a332d254b21660372) switched from state RUNNING to CANCELLING.
> 2020-08-14 15:10:06,415 DEBUG 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-3] - Shutting down per-job cluster 
> because the job was canceled.
> 2020-08-14 15:10:06,629 INFO 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-3] - Stopping dispatcher 
> akka.tcp://flink@bjfk-c9865.yz02:38663/user/dispatcher.
> 2020-08-14 15:10:06,629 INFO 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-3] - Stopping all currently running jobs 
> of dispatcher akka.tcp://flink@bjfk-c9865.yz02:38663/user/dispatcher.
> 2020-08-14 15:10:06,631 INFO org.apache.flink.runtime.jobmaster.JobMaster 
> [flink-akka.actor.default-dispatcher-29] - Stopping the JobMaster for job 
> EtlAndWindow(6f784d4cc5bae88a332d254b21660372).
> 2020-08-14 15:10:06,632 DEBUG org.apache.flink.runtime.jobmaster.JobMaster 
> [flink-akka.actor.default-dispatcher-29] - Disconnect TaskExecutor 
> container_e144_1590060720089_2161_01_06 because: Stopping JobMaster for 
> job EtlAndWindow(6f784d4cc5bae88a332d254b21660372).
> 2020-08-14 15:10:06,646 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph 
> [flink-akka.actor.default-dispatcher-29] - Job EtlAndWindow 
> (6f784d4cc5bae88a332d254b21660372) switched from state CANCELLING to CANCELED.
> 2020-08-14 15:10:06,664 DEBUG 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-4] - There is a newer JobManagerRunner 
> for the job 6f784d4cc5bae88a332d254b21660372.
> {panel}
> From the log, we can see that the job is not finished when the dispatcher
> closes. The process is as follows:
>  * The cancel command is received and sent to all tasks asynchronously.
>  * In the MiniDispatcher, the per-job cluster begins shutting down.
>  * The dispatcher is stopped and the job is removed.
>  * The job is cancelled and the callback is executed in the method
> startJobManagerRunner.
>  * Because the job was removed earlier, currentJobManagerRunner is null,
> which does not equal the original jobManagerRunner. In this case,
> archivedExecutionGraph will not be uploaded.
> In normal cases, I find that the job is cancelled first and then the
> dispatcher is stopped, so archivedExecutionGraph succeeds. But I think the
> order is not constrained and it is hard to know which comes first.
> Above is what I suspect. If so, then we should fix it.
>  





[jira] [Closed] (FLINK-17304) Kafka two streams cannot use Flink SQL to query inner join

2020-09-01 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-17304.
---
Resolution: Invalid

Close since no response. 

> Kafka two streams cannot use Flink SQL to query inner join
> --
>
> Key: FLINK-17304
> URL: https://issues.apache.org/jira/browse/FLINK-17304
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Table SQL / API
>Affects Versions: 1.9.0
> Environment: flink.version=1.9.0
> scala.binary.version=2.11
>Reporter: xingyuan cheng
>Priority: Major
>
> In my work, I found that when subscribing to datastreams from two different
> Kafka topics, the operators of each stream execute as expected, but in the
> end the inner join I query through Flink SQL does not return the expected
> results. What do I need to do to make it work?
> TestStreamSQL.java
> ```
> public class TestStreamSQL {
>     private static Logger log = LoggerFactory.getLogger(BinlogStreamSQL.class);
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(1000);
>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>         env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
>         env.getCheckpointConfig().setCheckpointTimeout(6);
>         env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>         env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>         // env.setStateBackend(new FsStateBackend("hdfs://ido001:8020/user/lwj/flink/checkpoint"));
>
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>         StreamQueryConfig queryConfig = new StreamQueryConfig();
>         queryConfig.withIdleStateRetentionTime(Time.days(10), Time.days(30));
>
>         Properties properties = new Properties();
>         properties.setProperty("bootstrap.servers", "ido001.gzcb.com:9092,ido002.gzcb.com:9092,ido003.gzcb.com:9092");
>         properties.setProperty("group.id", "flink");
>         String topic_1 = "bps-16-r3p3";
>         String topic_2 = "bps-16-r3p4";
>
>         DataStreamSource<String> topic1 = env.addSource(new FlinkKafkaConsumer010<>(topic_1, new SimpleStringSchema(), properties));
>         SingleOutputStreamOperator<Tuple3<String, String, String>> kafkaSource1 = topic1.filter(new FilterFunction<String>() {
>             @Override
>             public boolean filter(String value) throws Exception {
>                 try {
>                     BinLogBean binLogBean = JSONObject.parseObject(value, BinLogBean.class);
>                     return "app_case".equals(binLogBean.getTableName());
>                 } catch (Exception e) {
>                     log.error("JSON conversion failed, str={}", value, e);
>                     return false;
>                 }
>             }
>         }).map(new MapFunction<String, Tuple3<String, String, String>>() {
>             @Override
>             public Tuple3<String, String, String> map(String s) throws Exception {
>                 BinLogBean binLogBean = JSONObject.parseObject(s, BinLogBean.class);
>                 String case_id = BinLogUtil.getValueByField(binLogBean, "case_id");
>                 String close_time = BinLogUtil.getValueByField(binLogBean, "close_time");
>                 String approve_result = BinLogUtil.getValueByField(binLogBean, "approve_result");
>                 return new Tuple3<>(case_id, close_time, approve_result);
>             }
>         });
>         tEnv.registerDataStream("app_case", kafkaSource1, "case_id, close_time, approve_result");
>
>         DataStreamSource<String> topic2 = env.addSource(new FlinkKafkaConsumer010<>(topic_2, new SimpleStringSchema(), properties));
>         SingleOutputStreamOperator<Tuple2<String, String>> kafkaSource2 = topic2.filter(new FilterFunction<String>() {
>             @Override
>             public boolean filter(String value) throws Exception {
>                 try {
>                     BinLogBean binLogBean = JSONObject.parseObject(value, BinLogBean.class);
>                     return "cm_customer".equals(binLogBean.getTableName());
>                 } catch (Exception e) {
>                     log.error("JSON conversion failed, str={}", value, e);
>                     return false;
>                 }
>             }
>         }).map(new MapFunction<String, Tuple2<String, String>>() {
>             @Override
>             public Tuple2<String, String> map(String s) throws Exception {
>                 BinLogBean binLogBean = JSONObject.parseObject(s, BinLogBean.class);
>                 String case_id = BinLogUtil.getValueByField(binLogBean, "case_id");
>                 String idtfno = BinLogUtil.getValueByField(binLogBean, "idtfno");
>                 return new Tuple2<>(case_id, idtfno);
>             }
>         });
>         tEnv.registerDataStream("cm_customer", kafkaSource2, "case_id, idtfno");
>
>         Table result = tEnv.sqlQuery("select a.*,b.idtfno " +
>                 "from app_case a left join cm_customer b on a.case_id = b.case_id " +
>                 "where a.close_time not in('')");
>         tEnv.toRetractStream(result, Row.class, queryConfig).filter(new FilterFunction<Tuple2<Boolean, Row>>() {
>             @Override
>             public boolean filter(Tuple2<Boolean, Row> booleanRowTuple2) throws Exception {
>                 return booleanRowTuple2.f0;
>             }
>         }).print();
>
>         env.execute();
>     }
> }
> ```
>  
> BinLogBean.java
> ```
> public class BinLogBean implements Serializable{
>  private Stri

[GitHub] [flink] flinkbot edited a comment on pull request #13217: [FLINK-16866] Make job submission non-blocking

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13217:
URL: https://github.com/apache/flink/pull/13217#issuecomment-678285884


   
   ## CI report:
   
   * 3655fcea1966bfbcb85c86d6a159c354f20d6cc7 UNKNOWN
   * 41769d3c4ebae61f502e16398207d477f59451f9 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5875)
 
   * 08f8c0a7fbfecf7a6eba88653140439f3673c2cc UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] flinkbot edited a comment on pull request #13296: [FLINK-18774][format][debezium] Support debezium-avro format

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13296:
URL: https://github.com/apache/flink/pull/13296#issuecomment-684611193


   
   ## CI report:
   
   * 83b63305bf9953e98355465c51a584fc3ec813e8 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6050)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] pnowojski commented on a change in pull request #13044: [FLINK-18641][checkpointing] Fix CheckpointCoordinator to work with ExternallyInducedSource

2020-09-01 Thread GitBox


pnowojski commented on a change in pull request #13044:
URL: https://github.com/apache/flink/pull/13044#discussion_r480989742



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##
@@ -530,15 +530,22 @@ private void 
startTriggeringCheckpoint(CheckpointTriggerRequest request) {

request.getOnCompletionFuture()),
timer);
 
-   final CompletableFuture<Void> masterStatesComplete = pendingCheckpointCompletableFuture
-   .thenCompose(this::snapshotMasterState);
-
    final CompletableFuture<Void> coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
    .thenComposeAsync((pendingCheckpoint) ->
    OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
    coordinatorsToCheckpoint, pendingCheckpoint, timer),
    timer);
 
+   // We have to take the snapshot of the master hooks after the coordinator checkpoints have completed.
+   // This is to ensure the tasks are checkpointed after the OperatorCoordinators in case
+   // ExternallyInducedSource is used.
+   final CompletableFuture<Void> masterStatesComplete = coordinatorCheckpointsComplete
+   .thenComposeAsync(ignored -> {
+   PendingCheckpoint checkpoint =
+   FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);

Review comment:
   > the behavior is guaranteed by CompletableFuture, the assertion here
would essentially be verifying CompletableFuture,
   
   Not exactly. It would be verifying that you chained a couple of futures and
callbacks correctly: that the callback `foo()` uses the `future1` result and
is triggered once `future2` completes, and that `future1` and `future2` are
chained (or am I still misunderstanding this code?). The Java library doesn't
guarantee you that, but your code that chains the futures does. That is
outside of `foo()`'s control, so from `foo()`'s perspective it's an external
assumption, and falls under:
   
   > ensure the interface contract with users are not broken.
   
   Where "users" are the function's callers.
   
   And as I wrote before: if something violates this assumption, even if some
unit test fails, a `checkState` is a bit easier to understand than an `NPE`.
Note that the performance overhead of one if-check doesn't matter here at all.
It is also harder for a `checkState` to become outdated and misleading over
time.
   
   If you have such strong feelings about it, put a comment, but I do not see
any drawback of replacing the comment with a `checkState` that carries the
same comment as its message.

##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##
@@ -2260,6 +2263,121 @@ public void testSavepointScheduledInUnalignedMode() 
throws Exception {
}
}
 
+   /**
+* Test that the checkpoint still behaves correctly when the task checkpoint is triggered by
+* the master hooks and finishes before the master checkpoint.
+*/
+   @Test
+   public void testTaskCheckpointTriggeredByMasterHooks() {
+   try {
+   final JobID jid = new JobID();
+
+   // create some mock Execution vertices that receive the 
checkpoint trigger messages
+   final ExecutionAttemptID attemptID1 = new 
ExecutionAttemptID();
+   final ExecutionAttemptID attemptID2 = new 
ExecutionAttemptID();
+   ExecutionVertex vertex1 = 
mockExecutionVertex(attemptID1,
+   (executionAttemptID, jobId, checkpointId, 
timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+   ExecutionVertex vertex2 = 
mockExecutionVertex(attemptID2,
+   (executionAttemptID, jobId, checkpointId, 
timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+
+   // set up the coordinator and validate the initial state
+   CheckpointCoordinator coord = 
getCheckpointCoordinator(jid, vertex1, vertex2);
+   AtomicReference<Long> checkpointIdRef = new AtomicReference<>();
+
+   OperatorID opID1 = 
OperatorID.fromJobVertexID(vertex1.getJobvertexId());
+   OperatorID opID2 = 
OperatorID.fromJobVertexID(vertex2.getJobvertexId());
+  

[GitHub] [flink] flinkbot edited a comment on pull request #13291: [FLINK-18988][table] Continuous query with LATERAL and LIMIT produces…

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13291:
URL: https://github.com/apache/flink/pull/13291#issuecomment-684180034


   
   ## CI report:
   
   * 8406cd55c09be8ad125f574be5e58a91ad717f38 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6034)
 
   * 3dff1294d9c3e3b595353075fab383714d97f63f Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6052)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] pnowojski commented on a change in pull request #13044: [FLINK-18641][checkpointing] Fix CheckpointCoordinator to work with ExternallyInducedSource

2020-09-01 Thread GitBox


pnowojski commented on a change in pull request #13044:
URL: https://github.com/apache/flink/pull/13044#discussion_r480989742



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##
@@ -530,15 +530,22 @@ private void 
startTriggeringCheckpoint(CheckpointTriggerRequest request) {

request.getOnCompletionFuture()),
timer);
 
-   final CompletableFuture<Void> masterStatesComplete = pendingCheckpointCompletableFuture
-   .thenCompose(this::snapshotMasterState);
-
    final CompletableFuture<Void> coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
    .thenComposeAsync((pendingCheckpoint) ->
    OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
    coordinatorsToCheckpoint, pendingCheckpoint, timer),
    timer);
 
+   // We have to take the snapshot of the master hooks after the coordinator checkpoints have completed.
+   // This is to ensure the tasks are checkpointed after the OperatorCoordinators in case
+   // ExternallyInducedSource is used.
+   final CompletableFuture<Void> masterStatesComplete = coordinatorCheckpointsComplete
+   .thenComposeAsync(ignored -> {
+   PendingCheckpoint checkpoint =
+   FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);

Review comment:
   > the behavior is guaranteed by CompletableFuture, the assertion here
would essentially be verifying CompletableFuture,
   
   Not exactly. It would be verifying that you chained a couple of futures and
callbacks correctly: that the callback `foo()` uses the `future1` result and
is triggered once `future2` completes, and that `future1` and `future2` are
chained (or am I still misunderstanding this code?). The Java library doesn't
guarantee you that, but your code that chains the futures does. That is
outside of `foo()`'s control, so from `foo()`'s perspective it's an external
assumption, and falls under:
   
   > ensure the interface contract with users are not broken.
   
   Where "users" are the function's callers.
   
   And as I wrote before: if something violates this assumption, even if some
unit test fails, a `checkState` is a bit easier to understand than an `NPE`.
Note that the performance overhead of one if-check doesn't matter here at all.
It is also harder for a `checkState` to become outdated and misleading over
time.
   
   But it's not a big issue, so if you have such strong feelings about it, put
a comment.









[GitHub] [flink] pnowojski commented on a change in pull request #13044: [FLINK-18641][checkpointing] Fix CheckpointCoordinator to work with ExternallyInducedSource

2020-09-01 Thread GitBox


pnowojski commented on a change in pull request #13044:
URL: https://github.com/apache/flink/pull/13044#discussion_r480989742



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##
@@ -530,15 +530,22 @@ private void 
startTriggeringCheckpoint(CheckpointTriggerRequest request) {

request.getOnCompletionFuture()),
timer);
 
-   final CompletableFuture<Void> masterStatesComplete = pendingCheckpointCompletableFuture
-   .thenCompose(this::snapshotMasterState);
-
    final CompletableFuture<Void> coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
    .thenComposeAsync((pendingCheckpoint) ->
    OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
    coordinatorsToCheckpoint, pendingCheckpoint, timer),
    timer);
 
+   // We have to take the snapshot of the master hooks after the coordinator checkpoints have completed.
+   // This is to ensure the tasks are checkpointed after the OperatorCoordinators in case
+   // ExternallyInducedSource is used.
+   final CompletableFuture<Void> masterStatesComplete = coordinatorCheckpointsComplete
+   .thenComposeAsync(ignored -> {
+   PendingCheckpoint checkpoint =
+   FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);

Review comment:
   > the behavior is guaranteed by CompletableFuture, the assertion here
would essentially be verifying CompletableFuture,
   
   Not exactly. It would be verifying that you chained a couple of futures and
callbacks correctly: that the callback `foo()` uses the `future1` result and
is triggered once `future2` completes, and that `future1` and `future2` are
chained (or am I still misunderstanding this code?). The Java library doesn't
guarantee you that, but your code that chains the futures does. That is
outside of `foo()`'s control, so from `foo()`'s perspective it's an external
assumption, and falls under:
   
   > ensure the interface contract with users are not broken.
   
   Where "users" are the function's callers.
   
   And as I wrote before: if something violates this assumption, even if some
unit test fails, a `checkState` is a bit easier to understand than an `NPE`.
Note that the performance overhead of one if-check doesn't matter here at all.
It is also harder for a `checkState` to become outdated and misleading over
time.
   
   But it's not a big issue, so if you have such strong feelings about it, put
a comment.
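
   The readability argument in miniature, as a minimal sketch: `describe` and
   its future are made up, while `Preconditions.checkState` is Flink's own
   utility:

   ```java
   import java.util.concurrent.CompletableFuture;

   import static org.apache.flink.util.Preconditions.checkState;

   public class CheckStateSketch {
       static String describe(CompletableFuture<String> future) {
           // getNow(null) yields null if the future has not completed yet,
           // roughly analogous to FutureUtils.getWithoutException above.
           String value = future.getNow(null);
           // Without this line, a violated assumption surfaces later as a bare
           // NPE; with it, the failure names the external assumption.
           checkState(value != null, "future must be complete before describe() is called");
           return value.toUpperCase();
       }
   }
   ```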









[jira] [Assigned] (FLINK-17528) Use getters instead of RowData#get() utility in JsonRowDataSerializationSchema

2020-09-01 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-17528:
---

Assignee: Jark Wu

> Use getters instead of RowData#get() utility in JsonRowDataSerializationSchema
> --
>
> Key: FLINK-17528
> URL: https://issues.apache.org/jira/browse/FLINK-17528
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>
> Currently, we are using the utilities {{RowData#get(RowData, int, LogicalType)}}
> and {{ArrayData#get(ArrayData, int, LogicalType)}} to get the field/element
> objects. However, this is not as efficient as the getters of RowData/ArrayData,
> because the utility involves boxing and unboxing and a big switch on the type.
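
To illustrate the difference, a small sketch for an INT column; the method and
loop are assumed for illustration:

```java
import org.apache.flink.table.data.RowData;

public class TypedGetterSketch {
    // Typed access: the accessor is fixed when code is generated, so there is
    // no per-record switch over the LogicalType and no boxing of the int.
    public static long sumFirstIntColumn(Iterable<RowData> rows) {
        long sum = 0;
        for (RowData row : rows) {
            if (!row.isNullAt(0)) {
                sum += row.getInt(0); // vs. the generic get(row, 0, logicalType) utility
            }
        }
        return sum;
    }
}
```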





[jira] [Assigned] (FLINK-16528) Support Limit push down for streaming sources

2020-09-01 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-16528:
---

Assignee: Jark Wu

> Support Limit push down for streaming sources
> -
>
> Key: FLINK-16528
> URL: https://issues.apache.org/jira/browse/FLINK-16528
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Ecosystem, Table SQL / Planner
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>  Labels: usability
> Fix For: 1.12.0
>
>
> Currently, a limit query {{SELECT * FROM kafka LIMIT 10}} will be translated
> into a TopN operator and will scan the full data in the source. However,
> {{LIMIT}} is a very useful feature in the SQL CLI to explore data in the source.
> It doesn't make sense that it never stops.
> We can support such a case in streaming mode (ignore the text format):
> {code}
> flink > SELECT * FROM kafka LIMIT 10;
>  kafka_key  |user_name| lang |   created_at
> +-+--+-
>  494227746231685121 | burncaniff  | en   | 2014-07-29 14:07:31.000
>  494227746214535169 | gu8tn   | ja   | 2014-07-29 14:07:31.000
>  494227746219126785 | pequitamedicen  | es   | 2014-07-29 14:07:31.000
>  494227746201931777 | josnyS  | ht   | 2014-07-29 14:07:31.000
>  494227746219110401 | Cafe510 | en   | 2014-07-29 14:07:31.000
>  494227746210332673 | Da_JuanAnd_Only | en   | 2014-07-29 14:07:31.000
>  494227746193956865 | Smile_Kidrauhl6 | pt   | 2014-07-29 14:07:31.000
>  494227750426017793 | CashforeverCD   | en   | 2014-07-29 14:07:32.000
>  494227750396653569 | FilmArsivimiz   | tr   | 2014-07-29 14:07:32.000
>  494227750388256769 | jmolas  | es   | 2014-07-29 14:07:32.000
> (10 rows)
> {code}
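
For context, with the FLIP-95 source interfaces a connector opts into this via
the `SupportsLimitPushDown` ability. A bare-bones sketch, where the class name
and the omitted reader wiring are illustrative:

```java
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;

public class LimitAwareScanSource implements ScanTableSource, SupportsLimitPushDown {
    private long limit = -1; // -1 means no limit was pushed down

    @Override
    public void applyLimit(long limit) {
        // The planner hands the LIMIT over here; the runtime reader can then
        // stop after emitting this many records instead of scanning forever.
        this.limit = limit;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        throw new UnsupportedOperationException("reader wiring omitted in this sketch");
    }

    @Override
    public DynamicTableSource copy() {
        LimitAwareScanSource copy = new LimitAwareScanSource();
        copy.limit = limit;
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "LimitAwareScanSource(limit=" + limit + ")";
    }
}
```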





[jira] [Updated] (FLINK-16528) Support Limit push down for Kafka streaming sources

2020-09-01 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-16528:

Summary: Support Limit push down for Kafka streaming sources  (was: Support 
Limit push down for streaming sources)

> Support Limit push down for Kafka streaming sources
> ---
>
> Key: FLINK-16528
> URL: https://issues.apache.org/jira/browse/FLINK-16528
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Ecosystem, Table SQL / Planner
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>  Labels: usability
> Fix For: 1.12.0
>
>
> Currently, a limit query {{SELECT * FROM kafka LIMIT 10}} will be translated
> into a TopN operator and will scan the full data in the source. However,
> {{LIMIT}} is a very useful feature in the SQL CLI to explore data in the source.
> It doesn't make sense that it never stops.
> We can support such a case in streaming mode (ignore the text format):
> {code}
> flink > SELECT * FROM kafka LIMIT 10;
>  kafka_key  |user_name| lang |   created_at
> +-+--+-
>  494227746231685121 | burncaniff  | en   | 2014-07-29 14:07:31.000
>  494227746214535169 | gu8tn   | ja   | 2014-07-29 14:07:31.000
>  494227746219126785 | pequitamedicen  | es   | 2014-07-29 14:07:31.000
>  494227746201931777 | josnyS  | ht   | 2014-07-29 14:07:31.000
>  494227746219110401 | Cafe510 | en   | 2014-07-29 14:07:31.000
>  494227746210332673 | Da_JuanAnd_Only | en   | 2014-07-29 14:07:31.000
>  494227746193956865 | Smile_Kidrauhl6 | pt   | 2014-07-29 14:07:31.000
>  494227750426017793 | CashforeverCD   | en   | 2014-07-29 14:07:32.000
>  494227750396653569 | FilmArsivimiz   | tr   | 2014-07-29 14:07:32.000
>  494227750388256769 | jmolas  | es   | 2014-07-29 14:07:32.000
> (10 rows)
> {code}





[GitHub] [flink] zhuzhurk commented on a change in pull request #13284: [FLINK-17016][runtime] Change blink planner batch jobs to run with pipelined region scheduling

2020-09-01 Thread GitBox


zhuzhurk commented on a change in pull request #13284:
URL: https://github.com/apache/flink/pull/13284#discussion_r481013002



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
##
@@ -300,6 +305,32 @@ public int getNumberOfVertices() {
return this.taskVertices.size();
}
 
+   public Set<SlotSharingGroup> getSlotSharingGroups() {
+   final Set<SlotSharingGroup> slotSharingGroups = new HashSet<>();
+   for (JobVertex vertex : getVertices()) {
+   final SlotSharingGroup slotSharingGroup = 
vertex.getSlotSharingGroup();
+   checkNotNull(slotSharingGroup);
+
+   slotSharingGroups.add(slotSharingGroup);
+   }
+   return Collections.unmodifiableSet(slotSharingGroups);
+   }
+
+   public Set<CoLocationGroupDesc> getCoLocationGroupDescriptors() {
+   final Set<CoLocationGroup> coLocationGroups = new HashSet<>();
+   for (JobVertex vertex : getVertices()) {
+   CoLocationGroup coLocationGroup = 
vertex.getCoLocationGroup();
+   if (coLocationGroup != null) {
+   coLocationGroups.add(coLocationGroup);
+   }
+   }
+   final Set<CoLocationGroupDesc> coLocationGroupDescs = coLocationGroups
+   .stream()
+   .map(CoLocationGroupDesc::from)
+   .collect(Collectors.toSet());

Review comment:
   We cannot do this because `CoLocationGroupDesc` does not override
`equals(...)`. And I'd like to avoid the equality comparison, which may
require comparing 2 lists.









[GitHub] [flink] zhuzhurk commented on a change in pull request #13284: [FLINK-17016][runtime] Change blink planner batch jobs to run with pipelined region scheduling

2020-09-01 Thread GitBox


zhuzhurk commented on a change in pull request #13284:
URL: https://github.com/apache/flink/pull/13284#discussion_r481014309



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorFactory.java
##
@@ -19,11 +19,39 @@
 
 package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
 /**
  * Interface for {@link ExecutionSlotAllocator} factories.
  */
+@FunctionalInterface
 public interface ExecutionSlotAllocatorFactory {
 
-   ExecutionSlotAllocator createInstance(PreferredLocationsRetriever 
preferredLocationsRetriever);
-
+   /**
+* Instantiates the {@link ExecutionSlotAllocator}.
+*
+* @param preferredLocationsRetriever to retrieve preferred locations 
for each execution
+* @param resourceProfileRetriever to retrieve required {@link 
ResourceProfile} for each execution
+* @param priorAllocationIdRetriever to retrieve prior {@link 
AllocationID} for each execution
+* @param topology scheduling topology of the job
+* @param logicalSlotSharingGroupSupplier to supply all slot sharing 
groups in the job
+* @param coLocationGroupSupplier to supply all co-location groups in 
the job
+* @return The instantiated slot allocator
+*/
+   ExecutionSlotAllocator createInstance(
+   PreferredLocationsRetriever preferredLocationsRetriever,
+   Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever,
+   Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever,
+   SchedulingTopology topology,
+   Supplier<Set<SlotSharingGroup>> logicalSlotSharingGroupSupplier,
+   Supplier<Set<CoLocationGroupDesc>> coLocationGroupSupplier);

Review comment:
   Making them suppliers makes it possible to invoke `getSlotSharingGroups()`
and `getCoLocationGroupDescriptors()` lazily.
   It's better not to invoke them if the allocator is not a
`SlotSharingExecutionSlotAllocator`.
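
   The laziness point in isolation, as a tiny plain-Java sketch with invented
   names:

   ```java
   import java.util.Set;
   import java.util.function.Supplier;

   public class LazySupplierSketch {
       // The expensive computation behind the supplier only runs if the chosen
       // allocator implementation actually asks for the groups.
       static void createAllocator(Supplier<Set<String>> groupsSupplier, boolean slotSharing) {
           if (slotSharing) {
               Set<String> groups = groupsSupplier.get(); // computed on demand
               System.out.println("slot sharing groups: " + groups);
           }
           // Otherwise groupsSupplier.get() is never invoked.
       }
   }
   ```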









[GitHub] [flink] flinkbot edited a comment on pull request #13141: [FLINK-18852] Fix StreamScan doesn't inherit parallelism from input in legacy planner

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13141:
URL: https://github.com/apache/flink/pull/13141#issuecomment-673492124


   
   ## CI report:
   
   * 0e9c4198095239ebee07a466a5e23de1a60809ac Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5768)
 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5581)
 
   * 2ae1223cf2a3be7c8fea0666821a217e8fa2c841 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] zhuzhurk commented on a change in pull request #13284: [FLINK-17016][runtime] Change blink planner batch jobs to run with pipelined region scheduling

2020-09-01 Thread GitBox


zhuzhurk commented on a change in pull request #13284:
URL: https://github.com/apache/flink/pull/13284#discussion_r481015386



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorFactory.java
##
@@ -19,11 +19,39 @@
 
 package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
 /**
  * Interface for {@link ExecutionSlotAllocator} factories.
  */
+@FunctionalInterface
 public interface ExecutionSlotAllocatorFactory {
 
-   ExecutionSlotAllocator createInstance(PreferredLocationsRetriever 
preferredLocationsRetriever);
-
+   /**
+* Instantiates the {@link ExecutionSlotAllocator}.
+*
+* @param preferredLocationsRetriever to retrieve preferred locations 
for each execution
+* @param resourceProfileRetriever to retrieve required {@link 
ResourceProfile} for each execution
+* @param priorAllocationIdRetriever to retrieve prior {@link 
AllocationID} for each execution
+* @param topology scheduling topology of the job
+* @param logicalSlotSharingGroupSupplier to supply all slot sharing 
groups in the job
+* @param coLocationGroupSupplier to supply all co-location groups in 
the job
+* @return The instantiated slot allocator
+*/
+   ExecutionSlotAllocator createInstance(
+   PreferredLocationsRetriever preferredLocationsRetriever,
+   Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever,
+   Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever,
+   SchedulingTopology topology,
+   Supplier<Set<SlotSharingGroup>> logicalSlotSharingGroupSupplier,
+   Supplier<Set<CoLocationGroupDesc>> coLocationGroupSupplier);

Review comment:
   Good suggestion.
   But I think it would be better to compose `PreferredLocationsRetriever`
instead of inheriting it.
   WDYT?









[GitHub] [flink] flinkbot edited a comment on pull request #13217: [FLINK-16866] Make job submission non-blocking

2020-09-01 Thread GitBox


flinkbot edited a comment on pull request #13217:
URL: https://github.com/apache/flink/pull/13217#issuecomment-678285884


   
   ## CI report:
   
   * 3655fcea1966bfbcb85c86d6a159c354f20d6cc7 UNKNOWN
   * 41769d3c4ebae61f502e16398207d477f59451f9 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5875)
 
   * 08f8c0a7fbfecf7a6eba88653140439f3673c2cc Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6051)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   






