[GitHub] [flink] rmetzger commented on a change in pull request #10976: [FLINK-13978][build system] Add experimental support for building on Azure Pipelines

2020-02-13 Thread GitBox
rmetzger commented on a change in pull request #10976: [FLINK-13978][build 
system] Add experimental support for building on Azure Pipelines
URL: https://github.com/apache/flink/pull/10976#discussion_r379291684
 
 

 ##
 File path: tools/azure-pipelines/jobs-template.yml
 ##
 @@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+parameters:
+  test_pool_definition: # defines the hardware pool for compilation and unit test execution.
+  e2e_pool_definion: # defines the hardware pool for end-to-end test execution
+  stage_name: # defines a unique identifier for all jobs in a stage (in case the jobs are added multiple times to a stage)
+  environment: # defines environment variables for downstream scripts
+
+jobs:
+- job: compile_${{parameters.stage_name}}
+  condition: not(eq(variables['MODE'], 'e2e'))
+  pool: ${{parameters.test_pool_definition}}
+  container: flink-build-container
+  timeoutInMinutes: 240
+  cancelTimeoutInMinutes: 1
+  workspace:
+    clean: all # this cleans the entire workspace directory before running a new job
+    # It is necessary because the custom build machines are reused for tests.
+    # See also https://docs.microsoft.com/en-us/azure/devops/pipelines/process/phases?view=azure-devops&tabs=yaml#workspace
+
+  steps:
+  # The cache task persists the .m2 directory between builds, so that
+  # we do not have to re-download all dependencies from maven central for
+  # each build. The hope is that downloading the cache is faster than
+  # downloading all dependencies individually.
+  # In this configuration, we use a hash over all committed (not generated) .pom files
+  # as a key for the build cache (CACHE_KEY). If we have a cache miss on the hash
+  # (usually because a pom file has changed), we'll fall back to a key without
+  # the pom files (CACHE_FALLBACK_KEY).
+  # Official documentation of the Cache task: https://docs.microsoft.com/en-us/azure/devops/pipelines/caching/?view=azure-devops
+  - task: Cache@2
+    inputs:
+      key: $(CACHE_KEY)
+      restoreKeys: $(CACHE_FALLBACK_KEY)
+      path: $(MAVEN_CACHE_FOLDER)
+    continueOnError: true # continue the build even if the cache fails.
+    displayName: Cache Maven local repo
+
+  # Compile
+  - script: STAGE=compile ${{parameters.environment}} ./tools/azure_controller.sh compile
+    displayName: Build
+
+  # upload artifacts for next stage
+  - task: PublishPipelineArtifact@1
+    inputs:
+      path: $(CACHE_FLINK_DIR)
+      artifact: FlinkCompileCacheDir-${{parameters.stage_name}}
+
+- job: test_${{parameters.stage_name}}
+  dependsOn: compile_${{parameters.stage_name}}
+  condition: not(eq(variables['MODE'], 'e2e'))
+  pool: ${{parameters.test_pool_definition}}
+  container: flink-build-container
+  timeoutInMinutes: 240
+  cancelTimeoutInMinutes: 1
+  workspace:
+    clean: all
+  strategy:
+    matrix:
+      core:
+        module: core
+      python:
+        module: python
+      libraries:
+        module: libraries
+      blink_planner:
+        module: blink_planner
+      connectors:
+        module: connectors
+      kafka_gelly:
+        module: kafka/gelly
+      tests:
+        module: tests
+      legacy_scheduler_core:
+        module: legacy_scheduler_core
+      legacy_scheduler_tests:
+        module: legacy_scheduler_tests
+      misc:
+        module: misc
+  steps:
+
+  # download artifacts
+  - task: DownloadPipelineArtifact@2
+    inputs:
+      path: $(CACHE_FLINK_DIR)
+      artifact: FlinkCompileCacheDir-${{parameters.stage_name}}
+
+  # recreate "build-target" symlink for python tests
+  - script: |
+      ls -lisah $(CACHE_FLINK_DIR)
+      ls -lisah .
+      ln -snf $(CACHE_FLINK_DIR)/flink-dist/target/flink-*-SNAPSHOT-bin/flink-*-SNAPSHOT $(CACHE_FLINK_DIR)/build-target
+    displayName: Recreate 'build-target' symlink
+  # Test
+  - script: STAGE=test ${{parameters.environment}} ./tools/azure_controller.sh $(module)
+    displayName: Test - $(module)
+
+  - task: PublishTestResults@2
+    inputs:
+      testResultsFormat: 'JUnit'
+
+
+- job: e2e_${{parameters.stage_name}}
+  condition: eq(variables['MODE'], 'e2e')
+  # We are not running this job on a container, but in a VM.
 
 Review comment:
   Running the pre commit on a 

[GitHub] [flink] flinkbot commented on issue #11089: [FLINK-16053][python] Remove redundant metrics in PyFlink

2020-02-13 Thread GitBox
flinkbot commented on issue #11089: [FLINK-16053][python] Remove redundant 
metrics in PyFlink
URL: https://github.com/apache/flink/pull/11089#issuecomment-586139901
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit b48b5777b9700ed5c2bae5dac3ead1fcad6ede9c (Fri Feb 14 
07:50:41 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
   The Bot is tracking the review progress through labels. Labels are applied 
   according to the order of the review items. For consensus, approval by a Flink 
   committer or PMC member is required.
   ## Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-16053) Remove redundant metrics in PyFlink

2020-02-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-16053:
---
Labels: pull-request-available  (was: )

> Remove redundant metrics in PyFlink
> ---
>
> Key: FLINK-16053
> URL: https://issues.apache.org/jira/browse/FLINK-16053
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
> We have recorded the metrics about how many elements it has processed in 
> Python UDF. This kind of information is not necessary as there is also this 
> kind of information in the Java operator. I have performed a simple test and 
> find that removing it could improve the performance about 5% - 10%.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] dianfu opened a new pull request #11089: [FLINK-16053][python] Remove redundant metrics in PyFlink

2020-02-13 Thread GitBox
dianfu opened a new pull request #11089: [FLINK-16053][python] Remove redundant 
metrics in PyFlink
URL: https://github.com/apache/flink/pull/11089
 
 
   ## What is the purpose of the change
   
   *We record a metric for how many elements a Python UDF has processed. This 
information is not necessary, as the Java operator already exposes the same 
information. I have performed a simple test and found that removing it improves 
performance by about 5% - 10%.*
   
   ## Brief change log
   
 - *Removes redundant metrics in PyFlink*
   
   ## Verifying this change
   
   This change is a performance improvement and has been verified in my local 
environment. 
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-16053) Remove redundant metrics in PyFlink

2020-02-13 Thread Dian Fu (Jira)
Dian Fu created FLINK-16053:
---

 Summary: Remove redundant metrics in PyFlink
 Key: FLINK-16053
 URL: https://issues.apache.org/jira/browse/FLINK-16053
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.11.0


We record a metric for how many elements a Python UDF has processed. This 
information is not necessary, as the Java operator already exposes the same 
information. I have performed a simple test and found that removing it improves 
performance by about 5% - 10%.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16052) Homebrew test failed with 1.10.0 dist package

2020-02-13 Thread Yu Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036747#comment-17036747
 ] 

Yu Li commented on FLINK-16052:
---

cc [~karmagyz] [~xintongsong] 

> Homebrew test failed with 1.10.0 dist package
> -
>
> Key: FLINK-16052
> URL: https://issues.apache.org/jira/browse/FLINK-16052
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Scripts
>Affects Versions: 1.10.0
>Reporter: Yu Li
>Priority: Major
> Fix For: 1.10.1, 1.11.0
>
>
> After updating the Homebrew formula to 1.10.0 (in {{$(brew --repository 
> homebrew/core)}} directory) with patch of this 
> [PR|https://github.com/Homebrew/homebrew-core/pull/50110], executing `brew 
> install --build-from-source Formula/apache-flink.rb` and then `brew test 
> Formula/apache-flink.rb`, we could see below error:
> {code:java}
> [ERROR] Unexpected result: Error: Could not find or load main class 
> org.apache.flink.runtime.util.BashJavaUtils
> [ERROR] The last line of the BashJavaUtils outputs is expected to be the 
> execution result, following the prefix 'BASH_JAVA_UTILS_EXEC_RESULT:'
> Picked up _JAVA_OPTIONS: 
> -Djava.io.tmpdir=/private/tmp/apache-flink-test-20200214-33361-1jotper 
> -Duser.home=/Users/jueding/Library/Caches/Homebrew/java_cache
> Error: Could not find or load main class 
> org.apache.flink.runtime.util.BashJavaUtils
> [ERROR] Could not get JVM parameters properly.
> Error: apache-flink: failed
> Failed executing:
> {code}
> After a bisect checking on {{flink-dist/src/main/flink-bin/bin}} changes, 
> confirmed the above issue is related to FLINK-15488, but we will see new 
> errors like below after reverting FLINK-15488 (and FLINK-15519):
> {code:java}
> ==> /usr/local/Cellar/apache-flink/1.10.0/libexec/bin/start-cluster.sh
> ==> /usr/local/Cellar/apache-flink/1.10.0/bin/flink run -p 1 
> /usr/local/Cellar/apache-flink/1.10.0/libexec/examples/streaming/WordCount.jar
>  --input input --output result
> Last 15 lines from 
> /Users/jueding/Library/Logs/Homebrew/apache-flink/test.02.flink:
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>   ... 4 more
> Caused by: 
> org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException:
>  Could not fulfill slot request b7f17c0928112209ae873d089123b1c6. Requested 
> resource profile (ResourceProfile{UNKNOWN}) is unfulfillable.
>   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$fulfillPendingSlotRequestWithPendingTaskManagerSlot$9(SlotManagerImpl.java:772)
>   at 
> org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52)
>   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.fulfillPendingSlotRequestWithPendingTaskManagerSlot(SlotManagerImpl.java:768)
>   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$internalRequestSlot$7(SlotManagerImpl.java:755)
>   at 
> org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52)
>   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:755)
>   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:314)
>   ... 27 more
> Error: apache-flink: failed
> Failed executing:
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16052) Homebrew test failed with 1.10.0 dist package

2020-02-13 Thread Yu Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036744#comment-17036744
 ] 

Yu Li commented on FLINK-16052:
---

Some background:

This issue was found while following up on the 1.10.0 [release 
process|https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release],
 which includes a step of opening a Homebrew PR for the new release. I thought 
it was an environment problem when testing locally, since the 1.9.1 formula 
also didn't work on my Mac, until the reviewer of the Homebrew PR observed the same 
issue.

> Homebrew test failed with 1.10.0 dist package
> -
>
> Key: FLINK-16052
> URL: https://issues.apache.org/jira/browse/FLINK-16052
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Scripts
>Affects Versions: 1.10.0
>Reporter: Yu Li
>Priority: Major
> Fix For: 1.10.1, 1.11.0
>
>
> After updating the Homebrew formula to 1.10.0 (in {{$(brew --repository 
> homebrew/core)}} directory) with patch of this 
> [PR|https://github.com/Homebrew/homebrew-core/pull/50110], executing `brew 
> install --build-from-source Formula/apache-flink.rb` and then `brew test 
> Formula/apache-flink.rb`, we could see below error:
> {code:java}
> [ERROR] Unexpected result: Error: Could not find or load main class 
> org.apache.flink.runtime.util.BashJavaUtils
> [ERROR] The last line of the BashJavaUtils outputs is expected to be the 
> execution result, following the prefix 'BASH_JAVA_UTILS_EXEC_RESULT:'
> Picked up _JAVA_OPTIONS: 
> -Djava.io.tmpdir=/private/tmp/apache-flink-test-20200214-33361-1jotper 
> -Duser.home=/Users/jueding/Library/Caches/Homebrew/java_cache
> Error: Could not find or load main class 
> org.apache.flink.runtime.util.BashJavaUtils
> [ERROR] Could not get JVM parameters properly.
> Error: apache-flink: failed
> Failed executing:
> {code}
> After a bisect checking on {{flink-dist/src/main/flink-bin/bin}} changes, 
> confirmed the above issue is related to FLINK-15488, but we will see new 
> errors like below after reverting FLINK-15488 (and FLINK-15519):
> {code:java}
> ==> /usr/local/Cellar/apache-flink/1.10.0/libexec/bin/start-cluster.sh
> ==> /usr/local/Cellar/apache-flink/1.10.0/bin/flink run -p 1 
> /usr/local/Cellar/apache-flink/1.10.0/libexec/examples/streaming/WordCount.jar
>  --input input --output result
> Last 15 lines from 
> /Users/jueding/Library/Logs/Homebrew/apache-flink/test.02.flink:
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>   ... 4 more
> Caused by: 
> org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException:
>  Could not fulfill slot request b7f17c0928112209ae873d089123b1c6. Requested 
> resource profile (ResourceProfile{UNKNOWN}) is unfulfillable.
>   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$fulfillPendingSlotRequestWithPendingTaskManagerSlot$9(SlotManagerImpl.java:772)
>   at 
> org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52)
>   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.fulfillPendingSlotRequestWithPendingTaskManagerSlot(SlotManagerImpl.java:768)
>   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$internalRequestSlot$7(SlotManagerImpl.java:755)
>   at 
> org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52)
>   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:755)
>   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:314)
>   ... 27 more
> Error: apache-flink: failed
> Failed executing:
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16052) Homebrew test failed with 1.10.0 dist package

2020-02-13 Thread Yu Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Li updated FLINK-16052:
--
Description: 
After updating the Homebrew formula to 1.10.0 (in the {{$(brew --repository 
homebrew/core)}} directory) with the patch from this 
[PR|https://github.com/Homebrew/homebrew-core/pull/50110], and executing `brew 
install --build-from-source Formula/apache-flink.rb` followed by `brew test 
Formula/apache-flink.rb`, we see the error below:
{code:java}
[ERROR] Unexpected result: Error: Could not find or load main class 
org.apache.flink.runtime.util.BashJavaUtils
[ERROR] The last line of the BashJavaUtils outputs is expected to be the 
execution result, following the prefix 'BASH_JAVA_UTILS_EXEC_RESULT:'
Picked up _JAVA_OPTIONS: 
-Djava.io.tmpdir=/private/tmp/apache-flink-test-20200214-33361-1jotper 
-Duser.home=/Users/jueding/Library/Caches/Homebrew/java_cache
Error: Could not find or load main class 
org.apache.flink.runtime.util.BashJavaUtils
[ERROR] Could not get JVM parameters properly.
Error: apache-flink: failed
Failed executing:
{code}
After bisecting the {{flink-dist/src/main/flink-bin/bin}} changes, I confirmed 
that the above issue is related to FLINK-15488, but new errors like the ones below 
appear after reverting FLINK-15488 (and FLINK-15519):
{code:java}
==> /usr/local/Cellar/apache-flink/1.10.0/libexec/bin/start-cluster.sh
==> /usr/local/Cellar/apache-flink/1.10.0/bin/flink run -p 1 
/usr/local/Cellar/apache-flink/1.10.0/libexec/examples/streaming/WordCount.jar 
--input input --output result
Last 15 lines from 
/Users/jueding/Library/Logs/Homebrew/apache-flink/test.02.flink:
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: 
org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException:
 Could not fulfill slot request b7f17c0928112209ae873d089123b1c6. Requested 
resource profile (ResourceProfile{UNKNOWN}) is unfulfillable.
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$fulfillPendingSlotRequestWithPendingTaskManagerSlot$9(SlotManagerImpl.java:772)
at 
org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.fulfillPendingSlotRequestWithPendingTaskManagerSlot(SlotManagerImpl.java:768)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$internalRequestSlot$7(SlotManagerImpl.java:755)
at 
org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:755)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:314)
... 27 more
Error: apache-flink: failed
Failed executing:
{code}

  was:
After updating the homebrew formula to 1.10.0 with patch supplied in this 
[PR|https://github.com/Homebrew/homebrew-core/pull/50110], executing `brew 
install --build-from-source Formula/apache-flink.rb` and then `brew test 
Formula/apache-flink.rb`, we could see below error:
{code}
[ERROR] Unexpected result: Error: Could not find or load main class 
org.apache.flink.runtime.util.BashJavaUtils
[ERROR] The last line of the BashJavaUtils outputs is expected to be the 
execution result, following the prefix 'BASH_JAVA_UTILS_EXEC_RESULT:'
Picked up _JAVA_OPTIONS: 
-Djava.io.tmpdir=/private/tmp/apache-flink-test-20200214-33361-1jotper 
-Duser.home=/Users/jueding/Library/Caches/Homebrew/java_cache
Error: Could not find or load main class 
org.apache.flink.runtime.util.BashJavaUtils
[ERROR] Could not get JVM parameters properly.
Error: apache-flink: failed
Failed executing:
{code}

After a bisect checking on {{flink-dist/src/main/flink-bin/bin}} changes, 
confirmed the above issue is related to FLINK-15488, but we will see new errors 
like below after reverting FLINK-15488 (and FLINK-15519):
{code}
==> /usr/local/Cellar/apache-flink/1.10.0/libexec/bin/start-cluster.sh
==> /usr/local/Cellar/apache-flink/1.10.0/bin/flink run -p 1 
/usr/local/Cellar/apache-flink/1.10.0/libexec/examples/streaming/WordCount.jar 
--input input --output result
Last 15 lines from 
/Users/jueding/Library/Logs/Homebrew/apache-flink/test.02.flink:
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 

[jira] [Created] (FLINK-16052) Homebrew test failed with 1.10.0 dist package

2020-02-13 Thread Yu Li (Jira)
Yu Li created FLINK-16052:
-

 Summary: Homebrew test failed with 1.10.0 dist package
 Key: FLINK-16052
 URL: https://issues.apache.org/jira/browse/FLINK-16052
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Scripts
Affects Versions: 1.10.0
Reporter: Yu Li
 Fix For: 1.10.1, 1.11.0


After updating the homebrew formula to 1.10.0 with patch supplied in this 
[PR|https://github.com/Homebrew/homebrew-core/pull/50110], executing `brew 
install --build-from-source Formula/apache-flink.rb` and then `brew test 
Formula/apache-flink.rb`, we could see below error:
{code}
[ERROR] Unexpected result: Error: Could not find or load main class 
org.apache.flink.runtime.util.BashJavaUtils
[ERROR] The last line of the BashJavaUtils outputs is expected to be the 
execution result, following the prefix 'BASH_JAVA_UTILS_EXEC_RESULT:'
Picked up _JAVA_OPTIONS: 
-Djava.io.tmpdir=/private/tmp/apache-flink-test-20200214-33361-1jotper 
-Duser.home=/Users/jueding/Library/Caches/Homebrew/java_cache
Error: Could not find or load main class 
org.apache.flink.runtime.util.BashJavaUtils
[ERROR] Could not get JVM parameters properly.
Error: apache-flink: failed
Failed executing:
{code}

After a bisect checking on {{flink-dist/src/main/flink-bin/bin}} changes, 
confirmed the above issue is related to FLINK-15488, but we will see new errors 
like below after reverting FLINK-15488 (and FLINK-15519):
{code}
==> /usr/local/Cellar/apache-flink/1.10.0/libexec/bin/start-cluster.sh
==> /usr/local/Cellar/apache-flink/1.10.0/bin/flink run -p 1 
/usr/local/Cellar/apache-flink/1.10.0/libexec/examples/streaming/WordCount.jar 
--input input --output result
Last 15 lines from 
/Users/jueding/Library/Logs/Homebrew/apache-flink/test.02.flink:
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: 
org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException:
 Could not fulfill slot request b7f17c0928112209ae873d089123b1c6. Requested 
resource profile (ResourceProfile{UNKNOWN}) is unfulfillable.
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$fulfillPendingSlotRequestWithPendingTaskManagerSlot$9(SlotManagerImpl.java:772)
at 
org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.fulfillPendingSlotRequestWithPendingTaskManagerSlot(SlotManagerImpl.java:768)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$internalRequestSlot$7(SlotManagerImpl.java:755)
at 
org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:755)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:314)
... 27 more
Error: apache-flink: failed
Failed executing:
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] hequn8128 commented on a change in pull request #10995: [FLINK-15847][ml] Include flink-ml-api and flink-ml-lib in opt

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #10995: [FLINK-15847][ml] 
Include flink-ml-api and flink-ml-lib in opt
URL: https://github.com/apache/flink/pull/10995#discussion_r379278971
 
 

 ##
 File path: flink-ml-parent/flink-ml-lib/pom.xml
 ##
 @@ -57,4 +57,30 @@ under the License.
1.1.2


+
+   
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   shade-flink
+   package
+   
+   shade
+   
+   
+   
+   
+   
com.github.fommil.netlib:core
 
 Review comment:
   @zentol cc


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-16047) Blink planner produces wrong aggregate results with state clean up

2020-02-13 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036729#comment-17036729
 ] 

Jark Wu commented on FLINK-16047:
-

[~twalthr] you are right. We didn't spot this problem before. Let's fix it! 

> Blink planner produces wrong aggregate results with state clean up
> --
>
> Key: FLINK-16047
> URL: https://issues.apache.org/jira/browse/FLINK-16047
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: Timo Walther
>Priority: Critical
>
> It seems that FLINK-10674 has not been ported to the Blink planner.
> Because state clean up happens in processing time, it might be the case that 
> retractions are arriving after the state has been cleaned up. Before these 
> changes, a new accumulator was created and invalid retraction messages were 
> emitted. This change drops retraction messages for which no accumulator 
> exists.
> These lines are missing in 
> {{org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction}}:
> {code}
> if (null == accumulators) {
>   // Don't create a new accumulator for a retraction message. This
>   // might happen if the retraction message is the first message for the
>   // key or after a state clean up.
>   if (!inputC.change) {
> return
>   }
>   // first accumulate message
>   firstRow = true
>   accumulators = function.createAccumulators()
> } else {
>   firstRow = false
> }
> {code}
> The bug has not been verified. I spotted it only by looking at the code.
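
A minimal, self-contained Java sketch of the guard described above; the class, helper names, and signature are assumptions for illustration only, not the actual Blink planner {{GroupAggFunction}} code:

{code:java}
// Hypothetical sketch, not the actual Blink GroupAggFunction: drop retraction
// messages that arrive after state clean-up instead of creating a fresh
// accumulator for them.
final class RetractionGuardSketch {

    interface AggsHandle {
        Object createAccumulators();
    }

    /**
     * Returns the accumulators to use for the current record, or null if the
     * record is a retraction for which no state exists and should be dropped.
     */
    static Object guard(Object accumulators, boolean isAccumulateMsg, AggsHandle function) {
        if (accumulators == null) {
            // Don't create a new accumulator for a retraction message. This might
            // happen if the retraction is the first message for the key or arrives
            // after a state clean up.
            if (!isAccumulateMsg) {
                return null;
            }
            // first accumulate message for this key
            accumulators = function.createAccumulators();
        }
        return accumulators;
    }
}
{code}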



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16047) Blink planner produces wrong aggregate results with state clean up

2020-02-13 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-16047:

Fix Version/s: 1.10.1
   1.9.3

> Blink planner produces wrong aggregate results with state clean up
> --
>
> Key: FLINK-16047
> URL: https://issues.apache.org/jira/browse/FLINK-16047
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: Timo Walther
>Priority: Critical
> Fix For: 1.9.3, 1.10.1
>
>
> It seems that FLINK-10674 has not been ported to the Blink planner.
> Because state clean up happens in processing time, it might be the case that 
> retractions are arriving after the state has been cleaned up. Before these 
> changes, a new accumulator was created and invalid retraction messages were 
> emitted. This change drops retraction messages for which no accumulator 
> exists.
> These lines are missing in 
> {{org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction}}:
> {code}
> if (null == accumulators) {
>   // Don't create a new accumulator for a retraction message. This
>   // might happen if the retraction message is the first message for the
>   // key or after a state clean up.
>   if (!inputC.change) {
> return
>   }
>   // first accumulate message
>   firstRow = true
>   accumulators = function.createAccumulators()
> } else {
>   firstRow = false
> }
> {code}
> The bug has not been verified. I spotted it only by looking at the code.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15707) Update state migration tests for Flink 1.10

2020-02-13 Thread vinoyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036726#comment-17036726
 ] 

vinoyang commented on FLINK-15707:
--

[~aljoscha] and [~trohrmann] Since Flink 1.10 has been released recently, shall 
we start this work?

> Update state migration tests for Flink 1.10
> ---
>
> Key: FLINK-15707
> URL: https://issues.apache.org/jira/browse/FLINK-15707
> Project: Flink
>  Issue Type: Task
>  Components: Tests
>Affects Versions: 1.11.0
>Reporter: vinoyang
>Priority: Major
> Fix For: 1.11.0
>
>
> Once the Flink 1.10.0 release is out, we should update existing migration 
> tests to cover restoring from 1.10.0 savepoints.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] pnowojski merged pull request #11083: [hotfix][docs] Add note SSL not enabled by default

2020-02-13 Thread GitBox
pnowojski merged pull request #11083: [hotfix][docs] Add note SSL not enabled 
by default
URL: https://github.com/apache/flink/pull/11083
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-16012) Reduce the default number of exclusive buffers from 2 to 1 on receiver side

2020-02-13 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036715#comment-17036715
 ] 

Yingjie Cao commented on FLINK-16012:
-

[~wind_ljy] I have updated the results and the previous results were removed.

> Reduce the default number of exclusive buffers from 2 to 1 on receiver side
> ---
>
> Key: FLINK-16012
> URL: https://issues.apache.org/jira/browse/FLINK-16012
> Project: Flink
>  Issue Type: Improvement
>Reporter: Zhijiang
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In order to reduce the inflight buffers for checkpoint in the case of back 
> pressure, we can reduce the number of exclusive buffers for remote input 
> channel from default 2 to 1 as the first step. Besides that, the total 
> required buffers are also reduced as a result. We can further verify the 
> performance effect via various of benchmarks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16012) Reduce the default number of exclusive buffers from 2 to 1 on receiver side

2020-02-13 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036714#comment-17036714
 ] 

Yingjie Cao commented on FLINK-16012:
-

Theoretically, reducing the number of buffers may break the data processing 
pipeline, which can influence performance. For verification, I have tested the 
change using the Flink micro benchmarks and a simple benchmark job. 
Unfortunately, regressions are seen in both tests.

For the micro benchmarks, the following are some results with regression (because 
the results are unstable, I ran each test three times):

Using 2 buffers:
{code:java}
Benchmark          (channelsFlushTimeout)  (writers)   Mode  Cnt      Score      Error   Units

networkThroughput              1000,100ms          1  thrpt   30  15972.952 ±  752.985  ops/ms
networkThroughput              1000,100ms          4  thrpt   30  27650.498 ±  713.728  ops/ms

networkThroughput              1000,100ms          1  thrpt   30  15566.705 ± 2007.335  ops/ms
networkThroughput              1000,100ms          4  thrpt   30  27769.195 ± 1632.614  ops/ms

networkThroughput              1000,100ms          1  thrpt   30  15598.175 ± 1671.515  ops/ms
networkThroughput              1000,100ms          4  thrpt   30  27499.901 ± 1035.415  ops/ms
{code}
Using 1 buffer:
{code:java}
Benchmark          (channelsFlushTimeout)  (writers)   Mode  Cnt      Score      Error   Units

networkThroughput              1000,100ms          1  thrpt   30  13116.610 ±  325.587  ops/ms
networkThroughput              1000,100ms          4  thrpt   30  22837.502 ± 1024.360  ops/ms

networkThroughput              1000,100ms          1  thrpt   30  11924.883 ± 1038.508  ops/ms
networkThroughput              1000,100ms          4  thrpt   30  22823.586 ±  892.918  ops/ms

networkThroughput              1000,100ms          1  thrpt   30  12960.345 ± 1596.465  ops/ms
networkThroughput              1000,100ms          4  thrpt   30  23028.803 ±  933.609  ops/ms
{code}
From the above results, we can see about a 20% performance regression. For the 
benchmark job, there are also regressions (about 10% - 20%) in some cases where 
the number of input channels is small, for example 2 input channels, which means 
the number of buffers that can be used is limited.
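
For anyone who wants to try the 2 -> 1 change locally, a hypothetical Java sketch of setting it programmatically; the config key name is an assumption based on the 1.10 network options and should be checked against your Flink version:

{code:java}
import org.apache.flink.configuration.Configuration;

public class BuffersPerChannelSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // assumed key for the exclusive buffers per remote input channel (default 2)
        conf.setInteger("taskmanager.network.memory.buffers-per-channel", 1);
        System.out.println(conf);
    }
}
{code}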

 

 

> Reduce the default number of exclusive buffers from 2 to 1 on receiver side
> ---
>
> Key: FLINK-16012
> URL: https://issues.apache.org/jira/browse/FLINK-16012
> Project: Flink
>  Issue Type: Improvement
>Reporter: Zhijiang
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In order to reduce the inflight buffers for checkpoint in the case of back 
> pressure, we can reduce the number of exclusive buffers for remote input 
> channel from default 2 to 1 as the first step. Besides that, the total 
> required buffers are also reduced as a result. We can further verify the 
> performance effect via various of benchmarks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16012) Reduce the default number of exclusive buffers from 2 to 1 on receiver side

2020-02-13 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-16012:

Attachment: (was: image-2020-02-14-09-13-07-967.png)

> Reduce the default number of exclusive buffers from 2 to 1 on receiver side
> ---
>
> Key: FLINK-16012
> URL: https://issues.apache.org/jira/browse/FLINK-16012
> Project: Flink
>  Issue Type: Improvement
>Reporter: Zhijiang
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In order to reduce the inflight buffers for checkpoint in the case of back 
> pressure, we can reduce the number of exclusive buffers for remote input 
> channel from default 2 to 1 as the first step. Besides that, the total 
> required buffers are also reduced as a result. We can further verify the 
> performance effect via various of benchmarks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16012) Reduce the default number of exclusive buffers from 2 to 1 on receiver side

2020-02-13 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-16012:

Attachment: (was: image-2020-02-14-07-23-16-171.png)

> Reduce the default number of exclusive buffers from 2 to 1 on receiver side
> ---
>
> Key: FLINK-16012
> URL: https://issues.apache.org/jira/browse/FLINK-16012
> Project: Flink
>  Issue Type: Improvement
>Reporter: Zhijiang
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In order to reduce the inflight buffers for checkpoint in the case of back 
> pressure, we can reduce the number of exclusive buffers for remote input 
> channel from default 2 to 1 as the first step. Besides that, the total 
> required buffers are also reduced as a result. We can further verify the 
> performance effect via various of benchmarks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16012) Reduce the default number of exclusive buffers from 2 to 1 on receiver side

2020-02-13 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-16012:

Attachment: (was: image-2020-02-13-21-54-05-026.png)

> Reduce the default number of exclusive buffers from 2 to 1 on receiver side
> ---
>
> Key: FLINK-16012
> URL: https://issues.apache.org/jira/browse/FLINK-16012
> Project: Flink
>  Issue Type: Improvement
>Reporter: Zhijiang
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In order to reduce the inflight buffers for checkpoint in the case of back 
> pressure, we can reduce the number of exclusive buffers for remote input 
> channel from default 2 to 1 as the first step. Besides that, the total 
> required buffers are also reduced as a result. We can further verify the 
> performance effect via various of benchmarks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16012) Reduce the default number of exclusive buffers from 2 to 1 on receiver side

2020-02-13 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-16012:

Attachment: (was: image-2020-02-13-23-30-17-951.png)

> Reduce the default number of exclusive buffers from 2 to 1 on receiver side
> ---
>
> Key: FLINK-16012
> URL: https://issues.apache.org/jira/browse/FLINK-16012
> Project: Flink
>  Issue Type: Improvement
>Reporter: Zhijiang
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In order to reduce the inflight buffers for checkpoint in the case of back 
> pressure, we can reduce the number of exclusive buffers for remote input 
> channel from default 2 to 1 as the first step. Besides that, the total 
> required buffers are also reduced as a result. We can further verify the 
> performance effect via various of benchmarks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16030) Add heartbeat between netty server and client to detect long connection alive

2020-02-13 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036692#comment-17036692
 ] 

Zhijiang commented on FLINK-16030:
--

I agree with [~pnowojski]'s concern. I forgot the previous issue that the netty 
thread might get stuck in IO operations for blocking partitions while reading data 
in some serious scenarios. That might delay the response to the heartbeat ping 
message and cause unnecessary failures. The current netty handlers in the Flink 
stack are unified for both pipelined & blocking partitions, so we cannot only 
consider the pipelined case.

To answer [~pnowojski]'s question above: the current heartbeat between TM/JM cannot 
cover this case well. When the server side becomes aware of the network issue 
(a local machine iptables issue), it closes the channel on its side and 
releases all the partitions. But this also happens in the normal case, e.g. when 
the client side sends a `CancelPartition|CloseRequest` message explicitly to 
close the channel, so the server side would not throw any exception to report to 
the JM. In short, the server side cannot distinguish the cases when it sees an 
inactive channel.

When the server side closes its local channel, the client side only becomes aware 
of it after two hours (based on the default kernel keep-alive mechanism), 
so the whole job gets stuck until it fails after two hours.

I guess there might be other options to work around this issue. If we can make 
the server side distinguish the different causes of inactive channels, 
it could perform different actions to notify the JM to trigger a job failure.
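
To illustrate the IdleStateHandler ping-pong idea mentioned in the quoted issue description below, here is a minimal Netty sketch. This is NOT Flink's actual network stack; the ping message type and the missed-pong threshold are assumptions for illustration only:

{code:java}
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

/**
 * Hypothetical heartbeat sketch. Install on the client pipeline together with
 * an IdleStateHandler, e.g. new IdleStateHandler(0, 30, 0, TimeUnit.SECONDS),
 * which fires WRITER_IDLE events used below to send pings.
 */
public class HeartbeatSketch extends ChannelDuplexHandler {

    private static final Object PING = new Object(); // placeholder for a real ping message
    private static final int MAX_MISSED_PONGS = 3;

    private int missedPongs = 0;

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) {
            if (++missedPongs > MAX_MISSED_PONGS) {
                // no response for several ping intervals: fail fast instead of waiting
                // for the ~2h TCP keep-alive timeout mentioned above
                ctx.close();
                return;
            }
            ctx.writeAndFlush(PING);
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        missedPongs = 0; // any inbound message (e.g. a pong) shows the connection is alive
        super.channelRead(ctx, msg);
    }
}
{code}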

> Add heartbeat between netty server and client to detect long connection alive
> -
>
> Key: FLINK-16030
> URL: https://issues.apache.org/jira/browse/FLINK-16030
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.7.2, 1.8.3, 1.9.2, 1.10.0
>Reporter: begginghard
>Priority: Major
>
> As reported on [the user mailing 
> list|https://lists.apache.org/list.html?u...@flink.apache.org:lte=1M:Encountered%20error%20while%20consuming%20partitions]
> Network can fail in many ways, sometimes pretty subtle (e.g. high ratio 
> packet loss).  
> When the long tcp connection between netty client and server is lost, the 
> server would failed to send response to the client, then shut down the 
> channel. At the same time, the netty client does not know that the connection 
> has been disconnected, so it has been waiting for two hours.
> To detect the long tcp connection alive on netty client and server, we should 
> have two ways: tcp keepalive and heartbeat.
>  
> The tcp keepalive is 2 hours by default. When the long tcp connection dead, 
> you continue to wait for 2 hours, the netty client will trigger exception and 
> enter failover recovery.
> If you want to detect quickly, netty provides IdleStateHandler which it use 
> ping-pang mechanism. If netty client sends continuously n ping message and 
> receives no one pang message, then trigger exception.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16051) Subtask id in Checkpoint UI not consistent with Subtask UI

2020-02-13 Thread Jiayi Liao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiayi Liao updated FLINK-16051:
---
Issue Type: Bug  (was: Improvement)

> Subtask id in Checkpoint UI not consistent with Subtask UI
> --
>
> Key: FLINK-16051
> URL: https://issues.apache.org/jira/browse/FLINK-16051
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend
>Affects Versions: 1.10.0
>Reporter: Jiayi Liao
>Priority: Major
> Attachments: checkpointui.png, taskui.png
>
>
> The subtask id in Subtask UI starts from 0, but in Checkpoint UI subtask id 
> starts from 1.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16051) Subtask id in Checkpoint UI not consistent with Subtask UI

2020-02-13 Thread Jiayi Liao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiayi Liao updated FLINK-16051:
---
Summary: Subtask id in Checkpoint UI not consistent with Subtask UI  (was: 
Subtask id in Checkpoint UI not consistent with TaskUI)

> Subtask id in Checkpoint UI not consistent with Subtask UI
> --
>
> Key: FLINK-16051
> URL: https://issues.apache.org/jira/browse/FLINK-16051
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.10.0
>Reporter: Jiayi Liao
>Priority: Major
> Attachments: checkpointui.png, taskui.png
>
>
> The subtask id in Subtask UI starts from 0, but in Checkpoint UI subtask id 
> starts from 1.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379242650
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonCorrelateRule.java
 ##
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecPythonCorrelate;
+import org.apache.flink.table.planner.plan.utils.PythonUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+import scala.Some;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} 
to
+ * {@link BatchExecPythonCorrelate}.
+ */
+public class BatchExecPythonCorrelateRule extends ConverterRule {
+
+   public static final RelOptRule INSTANCE = new 
BatchExecPythonCorrelateRule();
+
+   private BatchExecPythonCorrelateRule() {
+   super(FlinkLogicalCorrelate.class, FlinkConventions.LOGICAL(), 
FlinkConventions.BATCH_PHYSICAL(),
+   "BatchExecPythonCorrelateRule");
+   }
+
+   @Override
+   public boolean matches(RelOptRuleCall call) {
+   FlinkLogicalCorrelate join = call.rel(0);
+   RelNode right = ((RelSubset) join.getRight()).getOriginal();
+
+   if (right instanceof FlinkLogicalTableFunctionScan) {
+   // right node is a python table function
+   FlinkLogicalTableFunctionScan scan = 
(FlinkLogicalTableFunctionScan) right;
+   return PythonUtil.isPythonCall(scan.getCall());
+   } else if (right instanceof FlinkLogicalCalc) {
+   // a filter is pushed above the table function
+   FlinkLogicalCalc calc = (FlinkLogicalCalc) right;
+   RelSubset input = (RelSubset) calc.getInput();
+   FlinkLogicalTableFunctionScan scan = 
(FlinkLogicalTableFunctionScan) input.getOriginal();
 
 Review comment:
   We can't cast the original node to `FlinkLogicalTableFunctionScan`.
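   For illustration, a sketch of how the whole `matches` method could guard that cast, reusing only the names that already appear in the diff above (a suggestion, not the final implementation):
   
   @Override
   public boolean matches(RelOptRuleCall call) {
   FlinkLogicalCorrelate join = call.rel(0);
   RelNode right = ((RelSubset) join.getRight()).getOriginal();
   
   if (right instanceof FlinkLogicalTableFunctionScan) {
   // right node is a python table function
   return PythonUtil.isPythonCall(((FlinkLogicalTableFunctionScan) right).getCall());
   } else if (right instanceof FlinkLogicalCalc) {
   // a filter is pushed above the table function; the calc input is not
   // necessarily a FlinkLogicalTableFunctionScan, so check before casting
   RelNode original = ((RelSubset) ((FlinkLogicalCalc) right).getInput()).getOriginal();
   return original instanceof FlinkLogicalTableFunctionScan
   && PythonUtil.isPythonCall(((FlinkLogicalTableFunctionScan) original).getCall());
   }
   return false;
   }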


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379240935
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala
 ##
 @@ -18,30 +18,19 @@
 package org.apache.flink.table.planner.plan.nodes.physical.batch
 
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.runtime.operators.DamBehavior
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
CorrelateCodeGenerator}
 import org.apache.flink.table.planner.delegation.BatchPlanner
-import org.apache.flink.table.planner.functions.utils.TableSqlFunction
-import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, 
FlinkRelDistributionTraitDef, TraitUtil}
-import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
 import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
-import org.apache.flink.table.planner.plan.utils.RelExplainUtil
 
-import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelTraitSet}
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.{Correlate, JoinRelType}
-import org.apache.calcite.rel.{RelCollationTraitDef, RelDistribution, 
RelFieldCollation, RelNode, RelWriter, SingleRel}
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexProgram}
-import org.apache.calcite.sql.SqlKind
-import org.apache.calcite.util.mapping.{Mapping, MappingType, Mappings}
-
-import java.util
-
-import scala.collection.JavaConversions._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rex.{RexNode, RexProgram}
 
 /**
-  * Batch physical RelNode for [[Correlate]] (user defined table function).
+  * Batch physical RelNode for [[Correlate]] (Java user defined table 
function).
 
 Review comment:
   Java => Java/Scala ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379234575
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
 ##
 @@ -178,4 +177,15 @@ class CorrelateTest extends TableTestBase {
   .flatMap(func2('f3))
 util.verifyPlan(resultTable)
   }
+
+  @Test
+  def testCorrelatePythonTableFunction(): Unit = {
+val util = streamTestUtil()
+val sourceTable = util.addTableSource[(Int, Int, String)]("MyTable", 'a, 
'b, 'c)
+val func = new PythonTableFunction
+util.addFunction("pyFunc", func)
 
 Review comment:
   We don't need to register the function here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379235239
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecCorrelateRule.scala
 ##
 @@ -39,13 +40,13 @@ class BatchExecCorrelateRule extends ConverterRule(
 val right = join.getRight.asInstanceOf[RelSubset].getOriginal
 
 right match {
-  // right node is a table function
-  case _: FlinkLogicalTableFunctionScan => true
+  // right node is a java table function
+  case scan: FlinkLogicalTableFunctionScan => 
PythonUtil.isNonPythonCall(scan.getCall)
   // a filter is pushed above the table function
   case calc: FlinkLogicalCalc =>
-calc.getInput.asInstanceOf[RelSubset]
-.getOriginal.isInstanceOf[FlinkLogicalTableFunctionScan]
-  case _ => false
 
 Review comment:
   Please recover this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379227379
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelateBase.scala
 ##
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.nodes.CommonCorrelate
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.plan.schema.RowSchema
+
+/**
+  * Base RelNode for data stream correlate.
+  */
+abstract class DataStreamCorrelateBase(
 
 Review comment:
   Hi @twalthr It seems we can't use the implemented methods of a Scala trait 
from Java ([see 
details](https://alvinalexander.com/scala/how-to-wrap-scala-traits-used-accessed-java-classes-methods)), 
which prevents us from turning this class into a Java one. This class needs to 
extend two traits, i.e., CommonCorrelate and DataStreamRel. Considering this, 
can we keep this class as a Scala one for now?
   
   I have checked that the Rule classes have been implemented in Java in this 
PR.
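
   To illustrate the limitation with a minimal, hypothetical example (not code 
from this PR): a Scala trait that carries implemented methods compiles those 
bodies into compiler-generated helpers, so a plain Java class cannot simply 
implement the trait and inherit them the way a Scala class can mix them in.

```scala
// hypothetical sketch of the Scala-trait-from-Java limitation
trait CommonLogic {
  // a concrete (implemented) method in the trait
  def explainCommon(): String = "shared logic implemented in the trait"
}

// a Scala class just mixes the trait in and reuses the implementation
class ScalaNode extends CommonLogic

object TraitDemo extends App {
  println(new ScalaNode().explainCommon())
  // A Java class declaring `implements CommonLogic` would, depending on the
  // Scala version, have to re-implement explainCommon() or delegate to the
  // compiler-generated helper by hand, which is exactly the friction above.
}
```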
   




[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r378822971
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala
 ##
 @@ -113,6 +114,25 @@ class TableFunc3(data: String, conf: Map[String, String]) 
extends TableFunction[
   }
 }
 
+class PythonTableFunction extends TableFunction[Row] with PythonFunction {
+
+  def eval(x: Int, y: Int): Unit = {
+for (i <- 0 until y) {
+  val row = new Row(2)
+  row.setField(0, x)
+  row.setField(1, i * i)
+  collect(row)
+}
 
 Review comment:
   How about removing these lines? The code will never be called.
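
   In other words, since only the signature matters for type extraction in these 
planner tests, the body could arguably be reduced to a no-op stub (sketch):

```scala
  // the body is never executed by the planner tests; only the signature matters
  def eval(x: Int, y: Int): Unit = {}
```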




[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379244169
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonCorrelateRule.java
 ##
 @@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCorrelate;
+import org.apache.flink.table.planner.plan.utils.PythonUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+import scala.Some;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} 
to
+ * {@link StreamExecPythonCorrelate}.
+ */
+public class StreamExecPythonCorrelateRule extends ConverterRule {
+
+   public static final RelOptRule INSTANCE = new 
StreamExecPythonCorrelateRule();
+
+   private StreamExecPythonCorrelateRule() {
+   super(FlinkLogicalCorrelate.class, FlinkConventions.LOGICAL(), 
FlinkConventions.STREAM_PHYSICAL(),
+   "StreamExecPythonCorrelateRule");
+   }
+
+   // find only calc and table function
+   private boolean findTableFunction(FlinkLogicalCalc calc) {
+   RelNode child = ((RelSubset) calc.getInput()).getOriginal();
+   if (child instanceof FlinkLogicalTableFunctionScan) {
+   FlinkLogicalTableFunctionScan scan = 
(FlinkLogicalTableFunctionScan) child;
+   return PythonUtil.isPythonCall(scan.getCall());
+   } else if (child instanceof FlinkLogicalCalc) {
+   FlinkLogicalCalc childCalc = (FlinkLogicalCalc) child;
+   return findTableFunction(childCalc);
+   }
+   return false;
+   }
+
+   @Override
+   public boolean matches(RelOptRuleCall call) {
+   FlinkLogicalCorrelate correlate = call.rel(0);
+   RelNode right = ((RelSubset) 
correlate.getRight()).getOriginal();
+   if (right instanceof FlinkLogicalTableFunctionScan) {
+   // right node is a python table function
+   FlinkLogicalTableFunctionScan scan = 
(FlinkLogicalTableFunctionScan) right;
+   return PythonUtil.isPythonCall(scan.getCall());
+   } else if (right instanceof FlinkLogicalCalc) {
+   // a filter is pushed above the table function
+   return findTableFunction((FlinkLogicalCalc) right);
+   }
+   return false;
+   }
+
+   @Override
+   public RelNode convert(RelNode relNode) {
+   StreamExecPythonCorrelateFactory factory = new 
StreamExecPythonCorrelateFactory(relNode);
+   return factory.convertToCorrelate();
+   }
+
+   /**
+* The factory is responsible to creating {@link 
StreamExecPythonCorrelate}.
 
 Review comment:
   responsible to => responsible for




[jira] [Updated] (FLINK-16051) Subtask id in Checkpoint UI not consistent with TaskUI

2020-02-13 Thread Jiayi Liao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiayi Liao updated FLINK-16051:
---
Summary: Subtask id in Checkpoint UI not consistent with TaskUI  (was: 
subtask id in Checkpoint UI not consistent with TaskUI)

> Subtask id in Checkpoint UI not consistent with TaskUI
> --
>
> Key: FLINK-16051
> URL: https://issues.apache.org/jira/browse/FLINK-16051
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.10.0
>Reporter: Jiayi Liao
>Priority: Major
> Attachments: checkpointui.png, taskui.png
>
>
> The subtask id in the Subtask UI starts from 0, but in the Checkpoint UI it 
> starts from 1.





[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379241683
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala
 ##
 @@ -21,59 +21,37 @@ import org.apache.flink.api.dag.Transformation
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
CorrelateCodeGenerator}
 import org.apache.flink.table.planner.delegation.StreamPlanner
-import org.apache.flink.table.planner.functions.utils.TableSqlFunction
-import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, 
StreamExecNode}
 import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
-import org.apache.flink.table.planner.plan.utils.RelExplainUtil
 import org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.calcite.rex.{RexCall, RexNode, RexProgram}
-
-import java.util
-
-import scala.collection.JavaConversions._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rex.{RexNode, RexProgram}
 
 /**
-  * Flink RelNode which matches along with join a user defined table function.
+  * Flink RelNode which matches along with join a java user defined table 
function.
 
 Review comment:
   java => Java/Scala




[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379228490
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonCorrelateRule.java
 ##
 @@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCorrelate;
+import org.apache.flink.table.planner.plan.utils.PythonUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+import scala.Some;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} 
to
+ * {@link StreamExecPythonCorrelate}.
+ */
+public class StreamExecPythonCorrelateRule extends ConverterRule {
+
+   public static final RelOptRule INSTANCE = new 
StreamExecPythonCorrelateRule();
+
+   private StreamExecPythonCorrelateRule() {
+   super(FlinkLogicalCorrelate.class, FlinkConventions.LOGICAL(), 
FlinkConventions.STREAM_PHYSICAL(),
+   "StreamExecPythonCorrelateRule");
+   }
+
+   // find only calc and table function
+   private boolean findTableFunction(FlinkLogicalCalc calc) {
+   RelNode child = ((RelSubset) calc.getInput()).getOriginal();
+   if (child instanceof FlinkLogicalTableFunctionScan) {
+   FlinkLogicalTableFunctionScan scan = 
(FlinkLogicalTableFunctionScan) child;
+   return PythonUtil.isPythonCall(scan.getCall());
+   } else if (child instanceof FlinkLogicalCalc) {
+   FlinkLogicalCalc childCalc = (FlinkLogicalCalc) child;
+   return findTableFunction(childCalc);
+   }
+   return false;
+   }
+
+   @Override
+   public boolean matches(RelOptRuleCall call) {
+   FlinkLogicalCorrelate correlate = call.rel(0);
+   RelNode right = ((RelSubset) 
correlate.getRight()).getOriginal();
+   if (right instanceof FlinkLogicalTableFunctionScan) {
+   // right node is a python table function
 
 Review comment:
   The right node may not be a python table function.




[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379228560
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/datastream/DataStreamPythonCorrelateRule.java
 ##
 @@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream;
+
+import org.apache.flink.table.plan.nodes.FlinkConventions;
+import org.apache.flink.table.plan.nodes.datastream.DataStreamPythonCorrelate;
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalCorrelate;
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import org.apache.flink.table.plan.schema.RowSchema;
+import org.apache.flink.table.plan.util.CorrelateUtil;
+import org.apache.flink.table.plan.util.PythonUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+import scala.Some;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} 
to
+ * {@link DataStreamPythonCorrelate}.
+ */
+public class DataStreamPythonCorrelateRule extends ConverterRule {
+
+   public static final RelOptRule INSTANCE = new 
DataStreamPythonCorrelateRule();
+
+   private DataStreamPythonCorrelateRule() {
+   super(FlinkLogicalCorrelate.class, FlinkConventions.LOGICAL(), 
FlinkConventions.DATASTREAM(),
+   "DataStreamPythonCorrelateRule");
+   }
+
+   @Override
+   public boolean matches(RelOptRuleCall call) {
+   FlinkLogicalCorrelate join = call.rel(0);
+   RelNode right = ((RelSubset) join.getRight()).getOriginal();
+
+   if (right instanceof FlinkLogicalTableFunctionScan) {
+   // right node is a python table function
 
 Review comment:
   The right node may not be a python table function.




[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379228461
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonCorrelateRule.java
 ##
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecPythonCorrelate;
+import org.apache.flink.table.planner.plan.utils.PythonUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+import scala.Some;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} 
to
+ * {@link BatchExecPythonCorrelate}.
+ */
+public class BatchExecPythonCorrelateRule extends ConverterRule {
+
+   public static final RelOptRule INSTANCE = new 
BatchExecPythonCorrelateRule();
+
+   private BatchExecPythonCorrelateRule() {
+   super(FlinkLogicalCorrelate.class, FlinkConventions.LOGICAL(), 
FlinkConventions.BATCH_PHYSICAL(),
+   "BatchExecPythonCorrelateRule");
+   }
+
+   @Override
+   public boolean matches(RelOptRuleCall call) {
+   FlinkLogicalCorrelate join = call.rel(0);
+   RelNode right = ((RelSubset) join.getRight()).getOriginal();
+
+   if (right instanceof FlinkLogicalTableFunctionScan) {
+   // right node is a python table function
 
 Review comment:
   The right node may not be a python table function.




[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379230207
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonCorrelateRule.java
 ##
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecPythonCorrelate;
+import org.apache.flink.table.planner.plan.utils.PythonUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+import scala.Some;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} 
to
+ * {@link BatchExecPythonCorrelate}.
+ */
+public class BatchExecPythonCorrelateRule extends ConverterRule {
+
+   public static final RelOptRule INSTANCE = new 
BatchExecPythonCorrelateRule();
+
+   private BatchExecPythonCorrelateRule() {
+   super(FlinkLogicalCorrelate.class, FlinkConventions.LOGICAL(), 
FlinkConventions.BATCH_PHYSICAL(),
+   "BatchExecPythonCorrelateRule");
+   }
+
+   @Override
+   public boolean matches(RelOptRuleCall call) {
+   FlinkLogicalCorrelate join = call.rel(0);
+   RelNode right = ((RelSubset) join.getRight()).getOriginal();
+
+   if (right instanceof FlinkLogicalTableFunctionScan) {
+   // right node is a python table function
+   FlinkLogicalTableFunctionScan scan = 
(FlinkLogicalTableFunctionScan) right;
+   return PythonUtil.isPythonCall(scan.getCall());
+   } else if (right instanceof FlinkLogicalCalc) {
+   // a filter is pushed above the table function
+   FlinkLogicalCalc calc = (FlinkLogicalCalc) right;
+   RelSubset input = (RelSubset) calc.getInput();
+   FlinkLogicalTableFunctionScan scan = 
(FlinkLogicalTableFunctionScan) input.getOriginal();
+   return PythonUtil.isPythonCall(scan.getCall());
+   }
+   return false;
+   }
+
+   @Override
+   public RelNode convert(RelNode relNode) {
+   BatchExecPythonCorrelateFactory factory = new 
BatchExecPythonCorrelateFactory(relNode);
+   return factory.convertToCorrelate();
+   }
+
+   /**
+* The factory is responsible to creating {@link 
BatchExecPythonCorrelate}.
+*/
+   private static class BatchExecPythonCorrelateFactory {
+   private final RelNode correlateRel;
 
 Review comment:
   We can remove this member.




[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379244079
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonCorrelateRule.java
 ##
 @@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCorrelate;
+import org.apache.flink.table.planner.plan.utils.PythonUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+import scala.Some;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} 
to
+ * {@link StreamExecPythonCorrelate}.
+ */
+public class StreamExecPythonCorrelateRule extends ConverterRule {
+
+   public static final RelOptRule INSTANCE = new 
StreamExecPythonCorrelateRule();
+
+   private StreamExecPythonCorrelateRule() {
+   super(FlinkLogicalCorrelate.class, FlinkConventions.LOGICAL(), 
FlinkConventions.STREAM_PHYSICAL(),
+   "StreamExecPythonCorrelateRule");
+   }
+
+   // find only calc and table function
+   private boolean findTableFunction(FlinkLogicalCalc calc) {
+   RelNode child = ((RelSubset) calc.getInput()).getOriginal();
+   if (child instanceof FlinkLogicalTableFunctionScan) {
+   FlinkLogicalTableFunctionScan scan = 
(FlinkLogicalTableFunctionScan) child;
+   return PythonUtil.isPythonCall(scan.getCall());
+   } else if (child instanceof FlinkLogicalCalc) {
+   FlinkLogicalCalc childCalc = (FlinkLogicalCalc) child;
+   return findTableFunction(childCalc);
+   }
+   return false;
+   }
+
+   @Override
+   public boolean matches(RelOptRuleCall call) {
+   FlinkLogicalCorrelate correlate = call.rel(0);
+   RelNode right = ((RelSubset) 
correlate.getRight()).getOriginal();
+   if (right instanceof FlinkLogicalTableFunctionScan) {
+   // right node is a python table function
+   FlinkLogicalTableFunctionScan scan = 
(FlinkLogicalTableFunctionScan) right;
+   return PythonUtil.isPythonCall(scan.getCall());
+   } else if (right instanceof FlinkLogicalCalc) {
+   // a filter is pushed above the table function
+   return findTableFunction((FlinkLogicalCalc) right);
+   }
+   return false;
+   }
+
+   @Override
+   public RelNode convert(RelNode relNode) {
+   StreamExecPythonCorrelateFactory factory = new 
StreamExecPythonCorrelateFactory(relNode);
+   return factory.convertToCorrelate();
+   }
+
+   /**
+* The factory is responsible to creating {@link 
StreamExecPythonCorrelate}.
+*/
+   private static class StreamExecPythonCorrelateFactory {
+   private final RelNode correlateRel;
 
 Review comment:
   Remove this member.



[jira] [Created] (FLINK-16051) subtask id in Checkpoint UI not consistent with TaskUI

2020-02-13 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-16051:
--

 Summary: subtask id in Checkpoint UI not consistent with TaskUI
 Key: FLINK-16051
 URL: https://issues.apache.org/jira/browse/FLINK-16051
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Affects Versions: 1.10.0
Reporter: Jiayi Liao
 Attachments: checkpointui.png, taskui.png

The subtask id in the Subtask UI starts from 0, but in the Checkpoint UI it 
starts from 1.







[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r378836618
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelateBase.scala
 ##
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.nodes.CommonCorrelate
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.plan.schema.RowSchema
+
+/**
+  * Base RelNode for data stream correlate.
+  */
+abstract class DataStreamCorrelateBase(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputSchema: RowSchema,
+input: RelNode,
+scan: FlinkLogicalTableFunctionScan,
+condition: Option[RexNode],
+schema: RowSchema,
+joinSchema: RowSchema,
 
 Review comment:
   Remove this member. It has never been used.




[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379234158
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala
 ##
 @@ -113,6 +114,25 @@ class TableFunc3(data: String, conf: Map[String, String]) 
extends TableFunction[
   }
 }
 
+class PythonTableFunction extends TableFunction[Row] with PythonFunction {
 
 Review comment:
   How about renaming it to MockPythonTableFunction? That would be clearer and 
avoid any confusion.




[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379235017
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecCorrelateRule.scala
 ##
 @@ -39,13 +40,13 @@ class BatchExecCorrelateRule extends ConverterRule(
 val right = join.getRight.asInstanceOf[RelSubset].getOriginal
 
 right match {
-  // right node is a table function
-  case _: FlinkLogicalTableFunctionScan => true
+  // right node is a java table function
 
 Review comment:
   The right node may not be a java table function




[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379229634
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonCorrelateRule.java
 ##
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecPythonCorrelate;
+import org.apache.flink.table.planner.plan.utils.PythonUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+import scala.Some;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} 
to
+ * {@link BatchExecPythonCorrelate}.
+ */
+public class BatchExecPythonCorrelateRule extends ConverterRule {
+
+   public static final RelOptRule INSTANCE = new 
BatchExecPythonCorrelateRule();
+
+   private BatchExecPythonCorrelateRule() {
+   super(FlinkLogicalCorrelate.class, FlinkConventions.LOGICAL(), 
FlinkConventions.BATCH_PHYSICAL(),
+   "BatchExecPythonCorrelateRule");
+   }
+
+   @Override
+   public boolean matches(RelOptRuleCall call) {
+   FlinkLogicalCorrelate join = call.rel(0);
+   RelNode right = ((RelSubset) join.getRight()).getOriginal();
+
+   if (right instanceof FlinkLogicalTableFunctionScan) {
+   // right node is a python table function
+   FlinkLogicalTableFunctionScan scan = 
(FlinkLogicalTableFunctionScan) right;
+   return PythonUtil.isPythonCall(scan.getCall());
+   } else if (right instanceof FlinkLogicalCalc) {
+   // a filter is pushed above the table function
+   FlinkLogicalCalc calc = (FlinkLogicalCalc) right;
+   RelSubset input = (RelSubset) calc.getInput();
+   FlinkLogicalTableFunctionScan scan = 
(FlinkLogicalTableFunctionScan) input.getOriginal();
+   return PythonUtil.isPythonCall(scan.getCall());
+   }
+   return false;
+   }
+
+   @Override
+   public RelNode convert(RelNode relNode) {
+   BatchExecPythonCorrelateFactory factory = new 
BatchExecPythonCorrelateFactory(relNode);
+   return factory.convertToCorrelate();
+   }
+
+   /**
+* The factory is responsible to creating {@link 
BatchExecPythonCorrelate}.
 
 Review comment:
   responsible for




[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r378832786
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
 ##
 @@ -40,10 +40,11 @@ class DataStreamCorrelateRule
 val right = join.getRight.asInstanceOf[RelSubset].getOriginal
 
 right match {
-  // right node is a table function
-  case scan: FlinkLogicalTableFunctionScan => true
+  // right node is a java table function
+  case scan: FlinkLogicalTableFunctionScan => 
PythonUtil.isNonPythonCall(scan.getCall)
   // a filter is pushed above the table function
-  case calc: FlinkLogicalCalc if 
CorrelateUtil.getTableFunctionScan(calc).isDefined => true
+  case calc: FlinkLogicalCalc =>
+
PythonUtil.isNonPythonCall(CorrelateUtil.getTableFunctionScan(calc).get.getCall)
 
 Review comment:
   The method `getTableFunctionScan` may return None.
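
   One possible way to guard against the None case (a sketch; it keeps the 
existing Option-based API and folds a missing scan into `false`):

```scala
case calc: FlinkLogicalCalc =>
  // if no table function scan is found below the calc, the rule should not match
  CorrelateUtil.getTableFunctionScan(calc)
    .exists(scan => PythonUtil.isNonPythonCall(scan.getCall))
```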




[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r378836753
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelateBase.scala
 ##
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.nodes.CommonCorrelate
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.plan.schema.RowSchema
+
+/**
+  * Base RelNode for data stream correlate.
+  */
+abstract class DataStreamCorrelateBase(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputSchema: RowSchema,
+input: RelNode,
+scan: FlinkLogicalTableFunctionScan,
+condition: Option[RexNode],
+schema: RowSchema,
+joinSchema: RowSchema,
+joinType: JoinRelType,
+ruleDescription: String)
 
 Review comment:
   Remove this member. It has never been used.
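
   Taking this and the earlier comment on `joinSchema` together, the trimmed 
constructor might look roughly as follows (a sketch; the extends clause and the 
class body are unchanged and omitted here):

```scala
abstract class DataStreamCorrelateBase(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    inputSchema: RowSchema,
    input: RelNode,
    scan: FlinkLogicalTableFunctionScan,
    condition: Option[RexNode],
    schema: RowSchema,
    joinType: JoinRelType)
  // ... extends/with clauses and body as in the PR
```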




[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379235507
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecCorrelateRule.scala
 ##
 @@ -39,13 +40,13 @@ class BatchExecCorrelateRule extends ConverterRule(
 val right = join.getRight.asInstanceOf[RelSubset].getOriginal
 
 right match {
-  // right node is a table function
-  case _: FlinkLogicalTableFunctionScan => true
+  // right node is a java table function
+  case scan: FlinkLogicalTableFunctionScan => 
PythonUtil.isNonPythonCall(scan.getCall)
   // a filter is pushed above the table function
   case calc: FlinkLogicalCalc =>
-calc.getInput.asInstanceOf[RelSubset]
-.getOriginal.isInstanceOf[FlinkLogicalTableFunctionScan]
-  case _ => false
+val scan = calc.getInput.asInstanceOf[RelSubset]
+  .getOriginal.asInstanceOf[FlinkLogicalTableFunctionScan]
 
 Review comment:
   The original may not be a `FlinkLogicalTableFunctionScan`.
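
   A pattern match would avoid the unchecked cast and keep the old behaviour for 
other node types (a sketch):

```scala
case calc: FlinkLogicalCalc =>
  calc.getInput.asInstanceOf[RelSubset].getOriginal match {
    // only match when the calc sits on top of a non-Python table function scan
    case scan: FlinkLogicalTableFunctionScan => PythonUtil.isNonPythonCall(scan.getCall)
    case _ => false
  }
```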




[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r378830458
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
 ##
 @@ -40,10 +40,11 @@ class DataStreamCorrelateRule
 val right = join.getRight.asInstanceOf[RelSubset].getOriginal
 
 right match {
-  // right node is a table function
-  case scan: FlinkLogicalTableFunctionScan => true
+  // right node is a java table function
 
 Review comment:
   I don't think the right node must be a Java table function. The original 
comment is fine.




[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r378823281
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
 ##
 @@ -172,6 +173,25 @@ class PojoUser() {
   }
 }
 
+class PythonTableFunction extends TableFunction[Row] with PythonFunction {
+
+  def eval(x: Int, y: Int): Unit = {
+for (i <- 0 until y) {
+  val row = new Row(2)
+  row.setField(0, x)
+  row.setField(1, i * i)
+  collect(row)
 
 Review comment:
   How about removing these lines?




[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379234497
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.scala
 ##
 @@ -117,4 +116,15 @@ class CorrelateTest extends TableTestBase {
 
 util.verifyPlan(result)
   }
+
+  @Test
+  def testCorrelatePythonTableFunction(): Unit = {
+val util = batchTestUtil()
+val sourceTable = util.addTableSource[(Int, Int, String)]("MyTable", 'a, 
'b, 'c)
+val func = new PythonTableFunction
+util.addFunction("pyFunc", func)
 
 Review comment:
   We don't need to register the function here.




[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python

2020-02-13 Thread GitBox
hequn8128 commented on a change in pull request #11051: 
[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical 
Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379243229
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonCorrelateRule.java
 ##
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecPythonCorrelate;
+import org.apache.flink.table.planner.plan.utils.PythonUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+import scala.Some;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} 
to
+ * {@link BatchExecPythonCorrelate}.
+ */
+public class BatchExecPythonCorrelateRule extends ConverterRule {
+
+   public static final RelOptRule INSTANCE = new 
BatchExecPythonCorrelateRule();
+
+   private BatchExecPythonCorrelateRule() {
+   super(FlinkLogicalCorrelate.class, FlinkConventions.LOGICAL(), 
FlinkConventions.BATCH_PHYSICAL(),
+   "BatchExecPythonCorrelateRule");
+   }
+
+   @Override
+   public boolean matches(RelOptRuleCall call) {
+   FlinkLogicalCorrelate join = call.rel(0);
+   RelNode right = ((RelSubset) join.getRight()).getOriginal();
+
+   if (right instanceof FlinkLogicalTableFunctionScan) {
+   // right node is a python table function
+   FlinkLogicalTableFunctionScan scan = 
(FlinkLogicalTableFunctionScan) right;
+   return PythonUtil.isPythonCall(scan.getCall());
+   } else if (right instanceof FlinkLogicalCalc) {
+   // a filter is pushed above the table function
+   FlinkLogicalCalc calc = (FlinkLogicalCalc) right;
+   RelSubset input = (RelSubset) calc.getInput();
+   FlinkLogicalTableFunctionScan scan = 
(FlinkLogicalTableFunctionScan) input.getOriginal();
+   return PythonUtil.isPythonCall(scan.getCall());
+   }
+   return false;
+   }
+
+   @Override
+   public RelNode convert(RelNode relNode) {
+   BatchExecPythonCorrelateFactory factory = new 
BatchExecPythonCorrelateFactory(relNode);
+   return factory.convertToCorrelate();
+   }
+
+   /**
+* The factory is responsible to creating {@link 
BatchExecPythonCorrelate}.
+*/
+   private static class BatchExecPythonCorrelateFactory {
+   private final RelNode correlateRel;
+   private final FlinkLogicalCorrelate correlate;
+   private final RelTraitSet traitSet;
+   private final RelNode convInput;
+   private final RelNode right;
+
+   BatchExecPythonCorrelateFactory(RelNode rel) {
+   this.correlateRel = rel;
 
 Review comment:
   Remove this member?




[jira] [Commented] (FLINK-15981) Control the direct memory in FileChannelBoundedData.FileBufferReader

2020-02-13 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036679#comment-17036679
 ] 

Zhijiang commented on FLINK-15981:
--

> One option to try out would be to not use any memory at all, but look at the 
>zero-copy-file transfer facilities.

Exactly, I have also heard of this approach before; let me investigate how it works in 
netty and how it could be integrated with our code stack. If this option works, it might 
solve our memory concern completely.
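
For reference, a minimal sketch of what the zero-copy path could look like on the Netty side, assuming a plain (non-SSL) transport and Netty's DefaultFileRegion; this only illustrates the facility mentioned above and is not the actual Flink integration:

{code:java}
// Hedged sketch: Netty zero-copy file transfer instead of a per-reader 64 KB
// direct buffer. DefaultFileRegion lets the transport use sendfile(), so no
// user-space buffer is allocated. Only works on non-SSL transports.
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultFileRegion;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ZeroCopySubpartitionWriter {

    /** Writes [offset, offset + length) of the spilled file directly to the socket. */
    public static void transfer(ChannelHandlerContext ctx, Path spillFile, long offset, long length)
            throws IOException {
        FileChannel channel = FileChannel.open(spillFile, StandardOpenOption.READ);
        // the region releases/closes the file channel once it has been fully written or released
        ctx.writeAndFlush(new DefaultFileRegion(channel, offset, length));
    }
}
{code}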

> Control the direct memory in FileChannelBoundedData.FileBufferReader
> 
>
> Key: FLINK-15981
> URL: https://issues.apache.org/jira/browse/FLINK-15981
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.10.0
>Reporter: Jingsong Lee
>Priority: Critical
> Fix For: 1.10.1, 1.11.0
>
>
> Now, the default blocking BoundedData implementation is FileChannelBoundedData. 
> Its reader creates a new 64 KB direct buffer per subpartition.
> When the parallelism is greater than 100, users need to configure 
> "taskmanager.memory.task.off-heap.size" to avoid a direct memory OOM. It is 
> hard to configure, and it costs a lot of memory. With a parallelism of 1000, 
> we may need more than 1 GB per task manager.
> This is not suitable for scenarios with few slots and large parallelism. 
> Batch jobs could run little by little, but the memory shortage would still 
> consume a lot.
> If we provide N-input operators, things may get worse, because the number of 
> subpartitions that can be requested at the same time will grow, and we have 
> no idea how much memory that needs.
> Here are my rough thoughts:
>  * Obtain the memory from the network buffers.
>  * Provide "the maximum number of subpartitions that can be requested at the 
> same time" as a configurable limit.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15981) Control the direct memory in FileChannelBoundedData.FileBufferReader

2020-02-13 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036678#comment-17036678
 ] 

Zhijiang commented on FLINK-15981:
--

Thanks for the confirmation; in general we are on the same page.

> When a task is released, it cannot occupy any pooled resources any more.

We can stick to this rule. Although it would be reasonable to decouple task and 
partition resources, it would make things more complex from the perspective of 
the JM/RM. So I also think it is better to keep this issue within the network 
component if possible.

> So we would need buffers per TCP channel. That is often fewer than per 
> subpartition (because of multiplexing) but not always (one-slot TMs).

I also prefer the per-channel approach. In terms of the total amount of memory 
there seems to be no obvious difference between per-thread and per-channel, or 
it is hard to say across different scenarios. But it is not practical to rely on 
per-thread buffers with the current code base, because we would have to guarantee 
that the previous buffer is released before the thread loops to fetch the next 
data. I remember we discussed this issue in another story. :)  It would actually 
be much easier to rely on per-channel buffers in practice now.

> Control the direct memory in FileChannelBoundedData.FileBufferReader
> 
>
> Key: FLINK-15981
> URL: https://issues.apache.org/jira/browse/FLINK-15981
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.10.0
>Reporter: Jingsong Lee
>Priority: Critical
> Fix For: 1.10.1, 1.11.0
>
>
> Now, the default blocking BoundedData implementation is FileChannelBoundedData. 
> Its reader creates a new 64 KB direct buffer per subpartition.
> When the parallelism is greater than 100, users need to configure 
> "taskmanager.memory.task.off-heap.size" to avoid a direct memory OOM. It is 
> hard to configure, and it costs a lot of memory. With a parallelism of 1000, 
> we may need more than 1 GB per task manager.
> This is not suitable for scenarios with few slots and large parallelism. 
> Batch jobs could run little by little, but the memory shortage would still 
> consume a lot.
> If we provide N-input operators, things may get worse, because the number of 
> subpartitions that can be requested at the same time will grow, and we have 
> no idea how much memory that needs.
> Here are my rough thoughts:
>  * Obtain the memory from the network buffers.
>  * Provide "the maximum number of subpartitions that can be requested at the 
> same time" as a configurable limit.
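
To make the memory numbers in the quoted description concrete, a rough back-of-the-envelope sketch; the 64 KB buffer size comes from the description above, while the 16 slots per task manager is only an assumed example:

{code:java}
// Rough arithmetic only, not Flink code. Assumes each FileBufferReader holds one
// 64 KB direct buffer (as stated in the issue) and an assumed 16 slots per TM.
public class DirectBufferEstimate {
    public static void main(String[] args) {
        long bufferBytes = 64 * 1024;      // per FileChannelBoundedData reader
        int parallelism = 1_000;           // subpartitions read concurrently by one task
        int slotsPerTaskManager = 16;      // assumption for illustration

        long perTask = bufferBytes * parallelism;            // ~64 MB per consuming task
        long perTaskManager = perTask * slotsPerTaskManager; // ~1 GB per task manager

        System.out.printf("per task: %d MB, per task manager: %d MB%n",
                perTask >> 20, perTaskManager >> 20);
    }
}
{code}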



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14138) Show Pending Slots in Job Detail

2020-02-13 Thread lining (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining updated FLINK-14138:
---
Component/s: Runtime / REST

> Show Pending Slots in Job Detail
> 
>
> Key: FLINK-14138
> URL: https://issues.apache.org/jira/browse/FLINK-14138
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST, Runtime / Web Frontend
>Reporter: Yadong Xie
>Priority: Major
> Attachments: 屏幕快照 2019-09-20 下午12.04.00.png, 屏幕快照 2019-09-20 
> 下午12.04.05.png
>
>
> It is hard to troubleshoot when, after users submit a job, all subtasks stay 
> in the SCHEDULED status (just like the screenshot below).
> !屏幕快照 2019-09-20 下午12.04.00.png|width=494,height=258!
> The most common reason for this problem is that a vertex has requested more 
> resources than the cluster can provide. A pending-slots tab could help users 
> check which vertex or subtask is blocked.
> !屏幕快照 2019-09-20 下午12.04.05.png|width=576,height=163!
>  
> REST API needed:
> add /jobs/:jobid/pending-slots API to get pending slots data.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka

2020-02-13 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-16048:
---
Description: 
I found that the SQL Kafka connector cannot consume avro data that was serialized 
by `KafkaAvroSerializer`; it can only consume Row data with an avro schema, because 
we use `AvroRowDeserializationSchema`/`AvroRowSerializationSchema` to serialize and 
deserialize data in `AvroRowFormatFactory`.

I think we should support this because `KafkaAvroSerializer` is very common in 
Kafka.

Someone also hit the same problem on Stack Overflow [1].

[1] https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259

  was:
KafkaAvroSerializer and AvroRowSerializationSchema
I found SQL Kafka connector can not consume avro data that was serialized by 
`KafkaAvroSerializer` and only can consume Row data with avro schema because we 
use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de data in  
`AvroRowFormatFactory`. 

I think we should support this because `KafkaAvroSerializer` is very common in 
Kafka.

and someone met same question in stackoverflow[1].

[[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]


> Support read/write confluent schema registry avro data  from Kafka
> --
>
> Key: FLINK-16048
> URL: https://issues.apache.org/jira/browse/FLINK-16048
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.0
>Reporter: Leonard Xu
>Priority: Major
> Fix For: 1.11.0
>
>
> I found SQL Kafka connector can not consume avro data that was serialized by 
> `KafkaAvroSerializer` and only can consume Row data with avro schema because 
> we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de 
> data in  `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka.
> and someone met same question in stackoverflow[1].
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka

2020-02-13 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036670#comment-17036670
 ] 

Leonard Xu commented on FLINK-16048:


Compared to AvroRowSerializationSchema [2], KafkaAvroSerializer [1] writes an extra 
magic byte and a schema id field into the resulting byte array. So the example 
code [3] cannot read the record correctly and throws the following exception:
{code:java}
Caused by: java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:170)
    at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:78)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:146)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:122)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
    at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:167)
    ... 7 more
{code}

[1] confluent avro format serialize(): 
https://github.com/confluentinc/schema-registry/blob/c19da74f7c4326438dd96d022a7e3d38dd3c68af/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java#L58

[2] Flink Table avro format serialize(): 
https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java#L138

[3] https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/java/kafka2kafka_1/ConsumeConfluentAvroTest.java
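
For illustration, a minimal sketch of how the Confluent wire-format header (1 magic byte + 4-byte schema id) could be stripped before handing the payload to a plain Avro decoder. The class and method names are hypothetical; a real solution would look up the writer schema in the schema registry by id instead of discarding it:

{code:java}
// Hedged sketch, not Flink's actual API surface: strip the Confluent wire-format
// header so that the remaining bytes are a plain Avro-encoded record. This only
// works if the registered writer schema matches the schema the decoder expects.
import java.io.IOException;
import java.nio.ByteBuffer;

public final class ConfluentWireFormat {

    private static final byte MAGIC_BYTE = 0x0;

    /** Returns the raw Avro payload, or throws if the record is not in Confluent wire format. */
    public static byte[] stripHeader(byte[] confluentSerialized) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(confluentSerialized);
        if (buffer.remaining() < 5 || buffer.get() != MAGIC_BYTE) {
            throw new IOException("Record is not in Confluent schema-registry wire format");
        }
        int schemaId = buffer.getInt();   // could be used to fetch the writer schema from the registry
        byte[] avroPayload = new byte[buffer.remaining()];
        buffer.get(avroPayload);
        return avroPayload;
    }
}
{code}

The stripped payload could then be passed on to a plain Avro row deserializer, assuming the writer schema registered under the id matches the expected schema.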

 

> Support read/write confluent schema registry avro data  from Kafka
> --
>
> Key: FLINK-16048
> URL: https://issues.apache.org/jira/browse/FLINK-16048
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.0
>Reporter: Leonard Xu
>Priority: Major
> Fix For: 1.11.0
>
>
> KafkaAvroSerializer and AvroRowSerializationSchema
> I found SQL Kafka connector can not consume avro data that was serialized by 
> `KafkaAvroSerializer` and only can consume Row data with avro schema because 
> we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de 
> data in  `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka.
> and someone met same question in stackoverflow[1].
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14730) Add pending slots for job

2020-02-13 Thread lining (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining updated FLINK-14730:
---
Parent: FLINK-14138
Issue Type: Sub-task  (was: Improvement)

> Add pending slots for job
> -
>
> Key: FLINK-14730
> URL: https://issues.apache.org/jira/browse/FLINK-14730
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / REST
>Reporter: lining
>Priority: Major
>
> *Current*
> If the resources requested by the job can't be satisfied by the cluster, the 
> job will remain in the scheduling state.
> The user cannot tell which slot request is blocking the scheduler.
> *Proposal*
> We could add a REST handler to show information about pending requests in 
> SlotPoolImpl.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14730) Add pending slots for job

2020-02-13 Thread lining (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining updated FLINK-14730:
---
Component/s: (was: Runtime / Web Frontend)

> Add pending slots for job
> -
>
> Key: FLINK-14730
> URL: https://issues.apache.org/jira/browse/FLINK-14730
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: lining
>Priority: Major
>
> *Current*
> If the resources requested by the job can't be satisfied by the cluster, the 
> job will remain in the scheduling state.
> The user cannot tell which slot request is blocking the scheduler.
> *Proposal*
> We could add a REST handler to show information about pending requests in 
> SlotPoolImpl.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-13510) Show fail attempt for subtask in timelime

2020-02-13 Thread lining (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining updated FLINK-13510:
---
Component/s: (was: Runtime / Web Frontend)

> Show fail attempt for subtask in timelime
> -
>
> Key: FLINK-13510
> URL: https://issues.apache.org/jira/browse/FLINK-13510
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / REST
>Reporter: lining
>Priority: Major
>
> Currently, users can only see a subtask's current attempt in the timeline. If 
> the job fails over, the timelines of cancelled attempts can no longer be seen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-13510) Show fail attempt for subtask in timelime In Rest API

2020-02-13 Thread lining (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining updated FLINK-13510:
---
Summary: Show fail attempt for subtask in timelime In Rest API  (was: Show 
fail attempt for subtask in timelime)

> Show fail attempt for subtask in timelime In Rest API
> -
>
> Key: FLINK-13510
> URL: https://issues.apache.org/jira/browse/FLINK-13510
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / REST
>Reporter: lining
>Priority: Major
>
> Currently, users can only see a subtask's current attempt in the timeline. If 
> the job fails over, the timelines of cancelled attempts can no longer be seen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14137) Show Attempt History in Vertex SubTask In WebUI

2020-02-13 Thread lining (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining updated FLINK-14137:
---
Summary: Show Attempt History in Vertex SubTask In WebUI  (was: Show 
Attempt History in Vertex SubTask)

> Show Attempt History in Vertex SubTask In WebUI
> ---
>
> Key: FLINK-14137
> URL: https://issues.apache.org/jira/browse/FLINK-14137
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Web Frontend
>Reporter: Yadong Xie
>Priority: Major
> Attachments: 屏幕快照 2019-09-20 上午11.32.54.png, 屏幕快照 2019-09-20 
> 上午11.32.59.png
>
>
> According to the 
> [docs|https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-vertices-vertexid-subtasks-subtaskindex],
>  there may exist more than one attempt in a subtask, but there is no way to 
> get the attempt history list in the REST API, users have no way to know if 
> the subtask has failed before.
> !屏幕快照 2019-09-20 上午11.32.54.png|width=499,height=205!
> We can add the Attempt History tab under the Subtasks drawer on the job 
> vertex page, here is a demo below.
> !屏幕快照 2019-09-20 上午11.32.59.png|width=518,height=203!
> REST API needed:
> add /jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/attempts API to 
> get attempt history.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14713) Show Attempt History in Vertex SubTask In Rest Api

2020-02-13 Thread lining (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining updated FLINK-14713:
---
Component/s: (was: Runtime / Web Frontend)

> Show Attempt History in Vertex SubTask In Rest Api
> --
>
> Key: FLINK-14713
> URL: https://issues.apache.org/jira/browse/FLINK-14713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / REST
>Reporter: lining
>Priority: Major
>
> Flink jobs can recover via failover, but the user cannot see any information 
> about the job's failures. There is no information about the failed attempts.
> h3. Proposed Changes
> h4. Add SubtaskAllExecutionAttemptsDetailsHandler for failed attempts
>  * return all attempts of the subtask and their states
>  * get the prior attempts according to
> {code:java}
> final AccessExecution execution = 
> executionVertex.getCurrentExecutionAttempt();
> final int currentAttemptNum = execution.getAttemptNumber();
> if (currentAttemptNum > 0) {
>   for (int i = currentAttemptNum - 1; i >= 0; i--) {
>  final AccessExecution currentExecution = 
> executionVertex.getPriorExecutionAttempt(i);
>  if (currentExecution != null) {
> 
> allAttempts.add(SubtaskExecutionAttemptDetailsInfo.create(currentExecution, 
> metricFetcher, jobID, jobVertexID));
>  }
>   }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14713) Show Attempt History in Vertex SubTask In Rest Api

2020-02-13 Thread lining (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining updated FLINK-14713:
---
Summary: Show Attempt History in Vertex SubTask In Rest Api  (was: Show 
Attempt History in Vertex SubTask)

> Show Attempt History in Vertex SubTask In Rest Api
> --
>
> Key: FLINK-14713
> URL: https://issues.apache.org/jira/browse/FLINK-14713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / REST, Runtime / Web Frontend
>Reporter: lining
>Priority: Major
>
> Flink jobs can recover via failover, but the user cannot see any information 
> about the job's failures. There is no information about the failed attempts.
> h3. Proposed Changes
> h4. Add SubtaskAllExecutionAttemptsDetailsHandler for failed attempts
>  * return all attempts of the subtask and their states
>  * get the prior attempts according to
> {code:java}
> final AccessExecution execution = 
> executionVertex.getCurrentExecutionAttempt();
> final int currentAttemptNum = execution.getAttemptNumber();
> if (currentAttemptNum > 0) {
>   for (int i = currentAttemptNum - 1; i >= 0; i--) {
>  final AccessExecution currentExecution = 
> executionVertex.getPriorExecutionAttempt(i);
>  if (currentExecution != null) {
> 
> allAttempts.add(SubtaskExecutionAttemptDetailsInfo.create(currentExecution, 
> metricFetcher, jobID, jobVertexID));
>  }
>   }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka

2020-02-13 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036662#comment-17036662
 ] 

Jingsong Lee commented on FLINK-16048:
--

To be more accurate, this should be about confluent avro schema support; 
{{KafkaAvroSerializer}} is part of confluent.

And more precisely, it should be about exposing a {{FormatFactory}} for the 
confluent {{SerializationSchema}}.

> Support read/write confluent schema registry avro data  from Kafka
> --
>
> Key: FLINK-16048
> URL: https://issues.apache.org/jira/browse/FLINK-16048
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.0
>Reporter: Leonard Xu
>Priority: Major
> Fix For: 1.11.0
>
>
> KafkaAvroSerializer and AvroRowSerializationSchema
> I found SQL Kafka connector can not consume avro data that was serialized by 
> `KafkaAvroSerializer` and only can consume Row data with avro schema because 
> we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de 
> data in  `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka.
> and someone met same question in stackoverflow[1].
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-13510) Show fail attempt for subtask in timelime

2020-02-13 Thread lining (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining updated FLINK-13510:
---
Parent: FLINK-16050
Issue Type: Sub-task  (was: Improvement)

> Show fail attempt for subtask in timelime
> -
>
> Key: FLINK-13510
> URL: https://issues.apache.org/jira/browse/FLINK-13510
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / REST, Runtime / Web Frontend
>Reporter: lining
>Priority: Major
>
> Currently, users can only see a subtask's current attempt in the timeline. If 
> the job fails over, the timelines of cancelled attempts can no longer be seen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14143) Failed Attempt does not display in the timeline

2020-02-13 Thread lining (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining updated FLINK-14143:
---
Parent: FLINK-16050
Issue Type: Sub-task  (was: Improvement)

> Failed Attempt does not display in the timeline
> ---
>
> Key: FLINK-14143
> URL: https://issues.apache.org/jira/browse/FLINK-14143
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Web Frontend
>Reporter: Yadong Xie
>Priority: Major
> Attachments: 屏幕快照 2019-09-20 下午3.46.40.png
>
>
> There may exist more than one attempt in a subtask, but on the timeline page 
> the Web UI can only fetch and visualize the latest execution attempt of a 
> subtask; there is no way to get a failed attempt's timeline from the current 
> REST API.
> !屏幕快照 2019-09-20 下午3.46.40.png|width=453,height=207!
>  
> REST API needed:
> add failed attempt time in /jobs/:jobid/vertices/:vertexid/subtasktimes



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-10918) incremental Keyed state with RocksDB throws cannot create directory error in windows

2020-02-13 Thread Yu Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036661#comment-17036661
 ] 

Yu Li commented on FLINK-10918:
---

cc [~klion26] [~yunta]

> incremental Keyed state with RocksDB throws cannot create directory error in 
> windows
> 
>
> Key: FLINK-10918
> URL: https://issues.apache.org/jira/browse/FLINK-10918
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Queryable State, Runtime / State Backends
>Affects Versions: 1.6.2, 1.9.2, 1.10.0
> Environment: windows
> {code:java}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val rocksdb = new RocksDBStateBackend("file:\\C:\\rocksdb\\checkpoint",true)
> rocksdb.setDbStoragePath("file:\\C:\\rocksdb\\storage")
> env.setStateBackend(rocksdb)
> env.enableCheckpointing(10)
> {code}
>  
>Reporter: Amit
>Priority: Major
>  Labels: usability
> Attachments: 
> 0001-FLINK-10918-Fix-for-checkpoint-creation-on-windows-1.patch
>
>
> Facing error while enabling keyed state with RocksDBBackend with 
> checkpointing to a local windows directory
>  
> {code:java}
> Caused by: org.rocksdb.RocksDBException: Failed to create dir: 
> /c:/tmp/data/job_dbe01128760d4d5cb90809cd94c2a936_op_StreamMap_b5c8d46f3e7b141acf271f12622e752b__3_8__uuid_45c1f62b-a198-44f5-add5-7683079b03f8/chk-1.tmp:
>  Invalid argument
>     at org.rocksdb.Checkpoint.createCheckpoint(Native Method)
>     at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.takeSnapshot(RocksDBKeyedStateBackend.java:2549)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$IncrementalSnapshotStrategy.performSnapshot(RocksDBKeyedStateBackend.java:2008)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:498)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:406)
>     ... 13 more
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14713) Show Attempt History in Vertex SubTask

2020-02-13 Thread lining (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining updated FLINK-14713:
---
Parent: FLINK-16050
Issue Type: Sub-task  (was: Improvement)

> Show Attempt History in Vertex SubTask
> --
>
> Key: FLINK-14713
> URL: https://issues.apache.org/jira/browse/FLINK-14713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / REST, Runtime / Web Frontend
>Reporter: lining
>Priority: Major
>
> Flink jobs can recover via failover, but the user cannot see any information 
> about the job's failures. There is no information about the failed attempts.
> h3. Proposed Changes
> h4. Add SubtaskAllExecutionAttemptsDetailsHandler for failed attempts
>  * return all attempts of the subtask and their states
>  * get the prior attempts according to
> {code:java}
> final AccessExecution execution = 
> executionVertex.getCurrentExecutionAttempt();
> final int currentAttemptNum = execution.getAttemptNumber();
> if (currentAttemptNum > 0) {
>   for (int i = currentAttemptNum - 1; i >= 0; i--) {
>  final AccessExecution currentExecution = 
> executionVertex.getPriorExecutionAttempt(i);
>  if (currentExecution != null) {
> 
> allAttempts.add(SubtaskExecutionAttemptDetailsInfo.create(currentExecution, 
> metricFetcher, jobID, jobVertexID));
>  }
>   }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14137) Show Attempt History in Vertex SubTask

2020-02-13 Thread lining (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining updated FLINK-14137:
---
Parent: (was: FLINK-14713)
Issue Type: Task  (was: Sub-task)

> Show Attempt History in Vertex SubTask
> --
>
> Key: FLINK-14137
> URL: https://issues.apache.org/jira/browse/FLINK-14137
> Project: Flink
>  Issue Type: Task
>  Components: Runtime / Web Frontend
>Reporter: Yadong Xie
>Priority: Major
> Attachments: 屏幕快照 2019-09-20 上午11.32.54.png, 屏幕快照 2019-09-20 
> 上午11.32.59.png
>
>
> According to the 
> [docs|https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-vertices-vertexid-subtasks-subtaskindex],
>  there may exist more than one attempt in a subtask, but there is no way to 
> get the attempt history list in the REST API, users have no way to know if 
> the subtask has failed before.
> !屏幕快照 2019-09-20 上午11.32.54.png|width=499,height=205!
> We can add the Attempt History tab under the Subtasks drawer on the job 
> vertex page, here is a demo below.
> !屏幕快照 2019-09-20 上午11.32.59.png|width=518,height=203!
> REST API needed:
> add /jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/attempts API to 
> get attempt history.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14137) Show Attempt History in Vertex SubTask

2020-02-13 Thread lining (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining updated FLINK-14137:
---
Parent: FLINK-16050
Issue Type: Sub-task  (was: Task)

> Show Attempt History in Vertex SubTask
> --
>
> Key: FLINK-14137
> URL: https://issues.apache.org/jira/browse/FLINK-14137
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Web Frontend
>Reporter: Yadong Xie
>Priority: Major
> Attachments: 屏幕快照 2019-09-20 上午11.32.54.png, 屏幕快照 2019-09-20 
> 上午11.32.59.png
>
>
> According to the 
> [docs|https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-vertices-vertexid-subtasks-subtaskindex],
>  there may exist more than one attempt in a subtask, but there is no way to 
> get the attempt history list in the REST API, users have no way to know if 
> the subtask has failed before.
> !屏幕快照 2019-09-20 上午11.32.54.png|width=499,height=205!
> We can add the Attempt History tab under the Subtasks drawer on the job 
> vertex page, here is a demo below.
> !屏幕快照 2019-09-20 上午11.32.59.png|width=518,height=203!
> REST API needed:
> add /jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/attempts API to 
> get attempt history.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15966) Capture the call stack of RPC ask() calls.

2020-02-13 Thread guoqiang.zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036658#comment-17036658
 ] 

guoqiang.zhang commented on FLINK-15966:


look forward for it

> Capture the call stack of RPC ask() calls.
> --
>
> Key: FLINK-15966
> URL: https://issues.apache.org/jira/browse/FLINK-15966
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.10.1, 1.11.0
>
>
> Currently, when an RPC ask() call fails, we get a rather unhelpful exception 
> with a stack trace from akka's internal scheduler.
> Instead, we should capture the call stack during the invocation and use it to 
> give a helpful error message when the RPC call fails. This is especially 
> helpful in cases where the future (and future handlers) are passed on for 
> later asynchronous result handling, which is the common case in most 
> components (JM / TM / RM).
> The option should have a flag to turn it off, because with a lot of 
> concurrent ask calls (hundreds of thousands, during large deploy phases) the 
> captured call stacks may become a noticeable overhead.
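
For illustration, a generic sketch of the technique described above: capture the caller's stack when the ask() future is created and attach it to any later failure. The class, flag, and method names here are illustrative only, not Flink's actual RPC API:

{code:java}
// Generic sketch, not Flink's implementation: capture the caller's stack trace at
// invocation time and attach it to whatever failure completes the future later,
// so the error points at the call site rather than at akka's scheduler thread.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public final class AskCallStackCapture {

    // assumed flag; in Flink this would be a configuration option
    private static final boolean CAPTURE_ASK_CALL_STACKS = true;

    public static <T> CompletableFuture<T> withCallStack(CompletableFuture<T> askFuture) {
        if (!CAPTURE_ASK_CALL_STACKS) {
            return askFuture;
        }
        // capture the call site now; skipped entirely when the flag is off
        final Throwable callSite = new Throwable("RPC ask() was invoked here");

        return askFuture.handle((value, failure) -> {
            if (failure == null) {
                return value;
            }
            failure.addSuppressed(callSite);
            throw new CompletionException(failure);
        });
    }
}
{code}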



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16026) Travis failed due to python setup

2020-02-13 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036656#comment-17036656
 ] 

Dian Fu edited comment on FLINK-16026 at 2/14/20 2:53 AM:
--

[~chesnay] Thanks a lot for your suggestions. Appreciated!

I have investigated this issue with [~sunjincheng121] offline and want to share 
our thoughts as follows:
 - It's a good idea to limit the versions of the direct dependencies, and I 
think we have already done that in PyFlink.
 - It may not be a good idea to limit the versions of the transitive 
dependencies to a different range than the one declared by the direct dependency:
 -- We should trust the direct dependency, e.g. if apache-beam declares that it 
supports avro-python3<2, we should trust that it does support all versions of 
avro-python3 which are less than 2.
 -- We should not change the range of versions of the transitive dependencies 
declared by the direct dependency, e.g. if apache-beam declares that it 
supports avro-python3<2, it may not be a good idea to change the version limit 
to avro-python3<1.9.1 in PyFlink. This would introduce a lot of issues:
 --- We would have to upgrade the versions of the transitive dependencies 
periodically whenever new packages are released.
 --- For released packages of PyFlink, there is no way to upgrade the versions 
of the transitive dependencies any more.
 -- If there are exceptions to the above cases, we could address them case by 
case (just like this JIRA does). For example, the issue in this JIRA exists 
because the avro community released a broken package, and we could just exclude 
that version for the time being. This is a trade-off; however, I guess most 
Python projects handle this kind of issue in this way.

What are your thoughts?


was (Author: dian.fu):
[~chesnay] Thanks a lot for your suggestions. Appreciated!

I have investigated this issue with [~sunjincheng121] offline and want to share 
our thoughts as following:
 - It's a good idea to limit the versions of the direct dependencies and I 
think we have already done that in PyFlink.
 - It may be not a good idea to limit the versions of the transitive 
dependencies to a different version declared by the direct dependency:
 -- We should trust the direct dependency, e.g. if apache-beam declares that it 
support avro-python3<2, we should trust that it does support all versions of 
avro-python3 which are less than 2.
 -- We should not change the range of versions of the transitive dependencies 
declared by the direct dependency, e.g. if apache-beam declares that it 
supports avro-python3<2, it may not be a good idea to change the version limit 
to avro-python3<1.9.1 in PyFlink. This will introduce a lot of issues:
 --- We have to upgrade the versions of the transitive dependencies 
periodically if new packages have been released
 --- For released package of PyFlink, there is no way to upgrade the versions 
of the transitive dependencies any more
 -- If there are exceptions for the above cases, we could address them case by 
case. For example, regarding to the issue of this JIRA, it's because that avro 
community has released an error package. We could just disable it for the time 
being. This is a trade off, however, I guess most Python projects handle this 
kind of issues in this way.

What's your thoughts?

> Travis failed due to python setup
> -
>
> Key: FLINK-16026
> URL: https://issues.apache.org/jira/browse/FLINK-16026
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Reporter: Jingsong Lee
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.1, 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> [https://api.travis-ci.com/v3/job/286671652/log.txt]
> [https://api.travis-ci.org/v3/job/649754603/log.txt]
> [https://api.travis-ci.com/v3/job/286409130/log.txt]
> Collecting avro-python3<2.0.0,>=1.8.1; python_version >= "3.0" (from 
> apache-beam==2.19.0->apache-flink==1.11.dev0) Using cached 
> https://files.pythonhosted.org/packages/31/21/d98e2515e5ca0337d7e747e8065227ee77faf5c817bbb74391899613178a/avro-python3-1.9.2.tar.gz
>  Complete output from command python setup.py egg_info: Traceback (most 
> recent call last): File "", line 1, in  File 
> "/tmp/pip-install-d6uvsl_b/avro-python3/setup.py", line 41, in  
> import pycodestyle ModuleNotFoundError: No module named 'pycodestyle' 
>  Command "python setup.py egg_info" 
> failed with error code 1 in /tmp/pip-install-d6uvsl_b/avro-python3/ You are 
> using pip version 10.0.1, however version 20.0.2 is available. You should 
> consider upgrading via the 'pip install --upgrade pip' command.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16026) Travis failed due to python setup

2020-02-13 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036656#comment-17036656
 ] 

Dian Fu edited comment on FLINK-16026 at 2/14/20 2:52 AM:
--

[~chesnay] Thanks a lot for your suggestions. Appreciated!

I have investigated this issue with [~sunjincheng121] offline and want to share 
our thoughts as following:
 - It's a good idea to limit the versions of the direct dependencies and I 
think we have already done that in PyFlink.
 - It may be not a good idea to limit the versions of the transitive 
dependencies to a different version declared by the direct dependency:
 -- We should trust the direct dependency, e.g. if apache-beam declares that it 
support avro-python3<2, we should trust that it does support all versions of 
avro-python3 which are less than 2.
 -- We should not change the range of versions of the transitive dependencies 
declared by the direct dependency, e.g. if apache-beam declares that it 
supports avro-python3<2, it may not be a good idea to change the version limit 
to avro-python3<1.9.1 in PyFlink. This will introduce a lot of issues:
 --- We have to upgrade the versions of the transitive dependencies 
periodically if new packages have been released
 --- For released package of PyFlink, there is no way to upgrade the versions 
of the transitive dependencies any more
 -- If there are exceptions for the above cases, we could address them case by 
case. For example, regarding to the issue of this JIRA, it's because that avro 
community has released an error package. We could just disable it for the time 
being. This is a trade off, however, I guess most Python projects handle this 
kind of issues in this way.

What's your thoughts?


was (Author: dian.fu):
[~chesnay] Thanks a lot for your suggestions. Appreciated!

We have investigated this issue and want to share our thoughts as following:
- It's a good idea to limit the versions of the direct dependencies and I think 
we have already done that in PyFlink.
- It may be not a good idea to limit the versions of the transitive 
dependencies to a different version declared by the direct dependency:
-- We should trust the direct dependency, e.g. if apache-beam declares that 
it support avro-python3<2, we should trust that it does support all versions of 
avro-python3 which are less than 2.
-- We should not change the range of versions of the transitive 
dependencies declared by the direct dependency, e.g. if apache-beam declares 
that it supports avro-python3<2, it may not be a good idea to change the 
version limit to avro-python3<1.9.1 in PyFlink. This will introduce a lot of 
issues:
--- We have to upgrade the versions of the transitive dependencies 
periodically if new packages have been released
--- For released package of PyFlink, there is no way to upgrade the 
versions of the transitive dependencies any more
-- If there are exceptions for the above cases, we could address them case 
by case. For example, regarding to the issue of this JIRA, it's because that 
avro community has released an error package. We could just disable it for the 
time being. This is a trade off, however, I guess most Python projects handle 
this kind of issues in this way.

What's your thoughts?

> Travis failed due to python setup
> -
>
> Key: FLINK-16026
> URL: https://issues.apache.org/jira/browse/FLINK-16026
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Reporter: Jingsong Lee
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.1, 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> [https://api.travis-ci.com/v3/job/286671652/log.txt]
> [https://api.travis-ci.org/v3/job/649754603/log.txt]
> [https://api.travis-ci.com/v3/job/286409130/log.txt]
> Collecting avro-python3<2.0.0,>=1.8.1; python_version >= "3.0" (from 
> apache-beam==2.19.0->apache-flink==1.11.dev0) Using cached 
> https://files.pythonhosted.org/packages/31/21/d98e2515e5ca0337d7e747e8065227ee77faf5c817bbb74391899613178a/avro-python3-1.9.2.tar.gz
>  Complete output from command python setup.py egg_info: Traceback (most 
> recent call last): File "", line 1, in  File 
> "/tmp/pip-install-d6uvsl_b/avro-python3/setup.py", line 41, in  
> import pycodestyle ModuleNotFoundError: No module named 'pycodestyle' 
>  Command "python setup.py egg_info" 
> failed with error code 1 in /tmp/pip-install-d6uvsl_b/avro-python3/ You are 
> using pip version 10.0.1, however version 20.0.2 is available. You should 
> consider upgrading via the 'pip install --upgrade pip' command.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka

2020-02-13 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-16048:
---
Summary: Support read/write confluent schema registry avro data  from Kafka 
 (was: Support read confluent schema registry avro data  from Kafka)

> Support read/write confluent schema registry avro data  from Kafka
> --
>
> Key: FLINK-16048
> URL: https://issues.apache.org/jira/browse/FLINK-16048
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.0
>Reporter: Leonard Xu
>Priority: Major
> Fix For: 1.11.0
>
>
>  found SQL Kafka connector can not consume avro data that was serialized by 
> `KafkaAvroSerializer` and only can consume Row data with avro schema because 
> we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de 
> data in  `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka.
> and someone met same question in stackoverflow[1].
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16048) Support read/write confluent schema registry avro data from Kafka

2020-02-13 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-16048:
---
Description: 
KafkaAvroSerializer and AvroRowSerializationSchema
I found SQL Kafka connector can not consume avro data that was serialized by 
`KafkaAvroSerializer` and only can consume Row data with avro schema because we 
use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de data in  
`AvroRowFormatFactory`. 

I think we should support this because `KafkaAvroSerializer` is very common in 
Kafka.

and someone met same question in stackoverflow[1].

[[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]

  was:
 found SQL Kafka connector can not consume avro data that was serialized by 
`KafkaAvroSerializer` and only can consume Row data with avro schema because we 
use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de data in  
`AvroRowFormatFactory`. 

I think we should support this because `KafkaAvroSerializer` is very common in 
Kafka.

and someone met same question in stackoverflow[1].


[[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]


> Support read/write confluent schema registry avro data  from Kafka
> --
>
> Key: FLINK-16048
> URL: https://issues.apache.org/jira/browse/FLINK-16048
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.0
>Reporter: Leonard Xu
>Priority: Major
> Fix For: 1.11.0
>
>
> KafkaAvroSerializer and AvroRowSerializationSchema
> I found SQL Kafka connector can not consume avro data that was serialized by 
> `KafkaAvroSerializer` and only can consume Row data with avro schema because 
> we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de 
> data in  `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka.
> and someone met same question in stackoverflow[1].
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16026) Travis failed due to python setup

2020-02-13 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036656#comment-17036656
 ] 

Dian Fu edited comment on FLINK-16026 at 2/14/20 2:51 AM:
--

[~chesnay] Thanks a lot for your suggestions. Appreciated!

We have investigated this issue and want to share our thoughts as following:
- It's a good idea to limit the versions of the direct dependencies and I think 
we have already done that in PyFlink.
- It may be not a good idea to limit the versions of the transitive 
dependencies to a different version declared by the direct dependency:
-- We should trust the direct dependency, e.g. if apache-beam declares that 
it support avro-python3<2, we should trust that it does support all versions of 
avro-python3 which are less than 2.
-- We should not change the range of versions of the transitive 
dependencies declared by the direct dependency, e.g. if apache-beam declares 
that it supports avro-python3<2, it may not be a good idea to change the 
version limit to avro-python3<1.9.1 in PyFlink. This will introduce a lot of 
issues:
--- We have to upgrade the versions of the transitive dependencies 
periodically if new packages have been released
--- For released package of PyFlink, there is no way to upgrade the 
versions of the transitive dependencies any more
-- If there are exceptions for the above cases, we could address them case 
by case. For example, regarding to the issue of this JIRA, it's because that 
avro community has released an error package. We could just disable it for the 
time being. This is a trade off, however, I guess most Python projects handle 
this kind of issues in this way.

What's your thoughts?


was (Author: dian.fu):
[~chesnay] Thanks a lot for your suggestions. Appreciated!

We have investigated this issue and want to share our thoughts as following:
- It's a good idea to limit the versions of the direct dependencies and I think 
we have already done that in PyFlink.
- It may be not a good idea to limit the versions of the transitive 
dependencies to a different version declared by the direct dependency:
1. We should trust the direct dependency, e.g. if apache-beam declares that 
it support avro-python3<2, we should trust that it does support all versions of 
avro-python3 which are less than 2.
2. We should not change the range of versions of the transitive 
dependencies declared by the direct dependency, e.g. if apache-beam declares 
that it supports avro-python3<2, it may not be a good idea to change the 
version limit to avro-python3<1.9.1 in PyFlink. This will introduce a lot of 
issues:
1) We have to upgrade the versions of the transitive dependencies 
periodically if new packages have been released
2) For released package of PyFlink, there is no way to upgrade the 
versions of the transitive dependencies any more
3. If there are exceptions for the above cases, we could address them case 
by case. For example, regarding to the issue of this JIRA, it's because that 
avro community has released an error package. We could just disable it for the 
time being. This is a trade off, however, I guess most Python projects handle 
this kind of issues in this way.

What's your thoughts?

> Travis failed due to python setup
> -
>
> Key: FLINK-16026
> URL: https://issues.apache.org/jira/browse/FLINK-16026
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Reporter: Jingsong Lee
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.1, 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> [https://api.travis-ci.com/v3/job/286671652/log.txt]
> [https://api.travis-ci.org/v3/job/649754603/log.txt]
> [https://api.travis-ci.com/v3/job/286409130/log.txt]
> Collecting avro-python3<2.0.0,>=1.8.1; python_version >= "3.0" (from 
> apache-beam==2.19.0->apache-flink==1.11.dev0) Using cached 
> https://files.pythonhosted.org/packages/31/21/d98e2515e5ca0337d7e747e8065227ee77faf5c817bbb74391899613178a/avro-python3-1.9.2.tar.gz
>  Complete output from command python setup.py egg_info: Traceback (most 
> recent call last): File "", line 1, in  File 
> "/tmp/pip-install-d6uvsl_b/avro-python3/setup.py", line 41, in  
> import pycodestyle ModuleNotFoundError: No module named 'pycodestyle' 
>  Command "python setup.py egg_info" 
> failed with error code 1 in /tmp/pip-install-d6uvsl_b/avro-python3/ You are 
> using pip version 10.0.1, however version 20.0.2 is available. You should 
> consider upgrading via the 'pip install --upgrade pip' command.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16048) Support read confluent schema registry avro data from Kafka

2020-02-13 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-16048:
---
Summary: Support read confluent schema registry avro data  from Kafka  
(was: Support read avro data that serialized by KafkaAvroSerializer from Kafka 
in Table)

> Support read confluent schema registry avro data  from Kafka
> 
>
> Key: FLINK-16048
> URL: https://issues.apache.org/jira/browse/FLINK-16048
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.0
>Reporter: Leonard Xu
>Priority: Major
> Fix For: 1.11.0
>
>
>  found SQL Kafka connector can not consume avro data that was serialized by 
> `KafkaAvroSerializer` and only can consume Row data with avro schema because 
> we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to se/de 
> data in  `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka.
> and someone met same question in stackoverflow[1].
> [[1]https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259|https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16026) Travis failed due to python setup

2020-02-13 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036656#comment-17036656
 ] 

Dian Fu commented on FLINK-16026:
-

[~chesnay] Thanks a lot for your suggestions. Appreciated!

We have investigated this issue and want to share our thoughts as following:
- It's a good idea to limit the versions of the direct dependencies and I think 
we have already done that in PyFlink.
- It may be not a good idea to limit the versions of the transitive 
dependencies to a different version declared by the direct dependency:
1. We should trust the direct dependency, e.g. if apache-beam declares that 
it support avro-python3<2, we should trust that it does support all versions of 
avro-python3 which are less than 2.
2. We should not change the range of versions of the transitive 
dependencies declared by the direct dependency, e.g. if apache-beam declares 
that it supports avro-python3<2, it may not be a good idea to change the 
version limit to avro-python3<1.9.1 in PyFlink. This will introduce a lot of 
issues:
1) We have to upgrade the versions of the transitive dependencies 
periodically if new packages have been released
2) For released package of PyFlink, there is no way to upgrade the 
versions of the transitive dependencies any more
3. If there are exceptions for the above cases, we could address them case 
by case. For example, regarding to the issue of this JIRA, it's because that 
avro community has released an error package. We could just disable it for the 
time being. This is a trade off, however, I guess most Python projects handle 
this kind of issues in this way.

What's your thoughts?

> Travis failed due to python setup
> -
>
> Key: FLINK-16026
> URL: https://issues.apache.org/jira/browse/FLINK-16026
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Reporter: Jingsong Lee
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.1, 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> [https://api.travis-ci.com/v3/job/286671652/log.txt]
> [https://api.travis-ci.org/v3/job/649754603/log.txt]
> [https://api.travis-ci.com/v3/job/286409130/log.txt]
> Collecting avro-python3<2.0.0,>=1.8.1; python_version >= "3.0" (from 
> apache-beam==2.19.0->apache-flink==1.11.dev0) Using cached 
> https://files.pythonhosted.org/packages/31/21/d98e2515e5ca0337d7e747e8065227ee77faf5c817bbb74391899613178a/avro-python3-1.9.2.tar.gz
>  Complete output from command python setup.py egg_info: Traceback (most 
> recent call last): File "", line 1, in  File 
> "/tmp/pip-install-d6uvsl_b/avro-python3/setup.py", line 41, in  
> import pycodestyle ModuleNotFoundError: No module named 'pycodestyle' 
>  Command "python setup.py egg_info" 
> failed with error code 1 in /tmp/pip-install-d6uvsl_b/avro-python3/ You are 
> using pip version 10.0.1, however version 20.0.2 is available. You should 
> consider upgrading via the 'pip install --upgrade pip' command.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16050) Add Attempt Information

2020-02-13 Thread lining (Jira)
lining created FLINK-16050:
--

 Summary: Add Attempt Information
 Key: FLINK-16050
 URL: https://issues.apache.org/jira/browse/FLINK-16050
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / REST, Runtime / Web Frontend
Reporter: lining


According to the 
[docs|https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-vertices-vertexid-subtasks-subtaskindex],
 a subtask may have more than one execution attempt, but there is no way to get 
the attempt history via the REST API, so users have no way to know whether the 
subtask has failed before. 
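
For illustration, a minimal sketch (my own, using a hypothetical base URL and 
ids) of what the documented subtask endpoint exposes today; it describes only 
the current attempt, which is exactly the gap this issue is about:

{code:python}
import requests

# Hypothetical identifiers; replace with a real job id, vertex id and subtask index.
BASE = "http://localhost:8081"
JOB, VERTEX, SUBTASK = "<job-id>", "<vertex-id>", 0

resp = requests.get(
    f"{BASE}/jobs/{JOB}/vertices/{VERTEX}/subtasks/{SUBTASK}").json()

# The response describes a single (current) attempt; there is no endpoint
# that lists all previous attempts of this subtask.
print("current attempt:", resp.get("attempt"))
{code}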



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14127) Better BackPressure Detection in WebUI

2020-02-13 Thread lining (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining updated FLINK-14127:
---
Parent: FLINK-14712
Issue Type: Sub-task  (was: Improvement)

> Better BackPressure Detection in WebUI
> --
>
> Key: FLINK-14127
> URL: https://issues.apache.org/jira/browse/FLINK-14127
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Web Frontend
>Affects Versions: 1.10.0
>Reporter: Yadong Xie
>Priority: Major
> Fix For: 1.11.0
>
> Attachments: 屏幕快照 2019-09-19 下午6.00.05.png, 屏幕快照 2019-09-19 
> 下午6.00.57.png, 屏幕快照 2019-09-19 下午6.01.43.png
>
>
> According to the 
> [Document|https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/back_pressure.html],
>  the backpressure monitor is only triggered on request and is currently not 
> available via metrics. This means that the web UI has no way to show the 
> backpressure state of all vertices at the same time; users need to click 
> every vertex to get its backpressure state.
> !屏幕快照 2019-09-19 下午6.00.05.png|width=510,height=197!
> In Flink 1.9.0 and above, four metrics are available (outPoolUsage, 
> inPoolUsage, floatingBuffersUsage, exclusiveBuffersUsage); we can use these 
> metrics to determine whether backpressure is likely, and then use the 
> backpressure REST API to confirm it.
> Here is a table taken from 
> [https://flink.apache.org/2019/07/23/flink-network-stack-2.html]
> !屏幕快照 2019-09-19 下午6.00.57.png|width=516,height=304!
>  
> We can display the possible backpressure status on the vertex graph, so that 
> users can see the backpressure state of all vertices at a glance and locate 
> potential problems quickly.
>  
> !屏幕快照 2019-09-19 下午6.01.43.png|width=572,height=277!
>  
> REST API needed:
> add outPoolUsage, inPoolUsage, floatingBuffersUsage, exclusiveBuffersUsage 
> metrics for each vertex in the /jobs/:jobId API
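
As a rough illustration of the metric-based pre-check described above (my own 
sketch, not part of the proposal; the 0.9 threshold is arbitrary):

{code:python}
def possible_backpressure(out_pool_usage, in_pool_usage, threshold=0.9):
    """Heuristic hint based on the buffer-usage metrics listed above.

    A (nearly) full output buffer pool suggests the vertex is back-pressured
    by a downstream task, while a (nearly) full input pool suggests the vertex
    itself is slow and is propagating backpressure upstream. This is only a
    hint for the UI and should be confirmed via the backpressure REST API.
    """
    if out_pool_usage >= threshold:
        return "possibly back-pressured by downstream"
    if in_pool_usage >= threshold:
        return "possibly causing backpressure upstream"
    return "no backpressure indication"
{code}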



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16012) Reduce the default number of exclusive buffers from 2 to 1 on receiver side

2020-02-13 Thread Jiayi Liao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036647#comment-17036647
 ] 

Jiayi Liao commented on FLINK-16012:


[~kevin.cyj] Which result was measured before the change, and which one after?

> Reduce the default number of exclusive buffers from 2 to 1 on receiver side
> ---
>
> Key: FLINK-16012
> URL: https://issues.apache.org/jira/browse/FLINK-16012
> Project: Flink
>  Issue Type: Improvement
>Reporter: Zhijiang
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
> Attachments: image-2020-02-13-21-54-05-026.png, 
> image-2020-02-13-23-30-17-951.png, image-2020-02-14-07-23-16-171.png, 
> image-2020-02-14-09-13-07-967.png
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In order to reduce the number of in-flight buffers for checkpoints in the case of back 
> pressure, we can reduce the number of exclusive buffers per remote input 
> channel from the default of 2 to 1 as a first step. As a side effect, the total 
> number of required buffers is also reduced. We can further verify the 
> performance impact via various benchmarks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-15913) Add Python Table Function Runner And Operator

2020-02-13 Thread Hequn Cheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng reassigned FLINK-15913:
---

Assignee: Huang Xingbo

> Add Python Table Function Runner And Operator
> -
>
> Key: FLINK-15913
> URL: https://issues.apache.org/jira/browse/FLINK-15913
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> This Jira will include two PRs:
>  * Add Python Table Function Runner and Operator in legacy planner
>  * Add Python Table Function Runner and Operator in blink planner



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-15913) Add Python Table Function Runner And Operator

2020-02-13 Thread Hequn Cheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng closed FLINK-15913.
---
Resolution: Resolved

> Add Python Table Function Runner And Operator
> -
>
> Key: FLINK-15913
> URL: https://issues.apache.org/jira/browse/FLINK-15913
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> This Jira will include two PRs:
>  * Add Python Table Function Runner and Operator in legacy planner
>  * Add Python Table Function Runner and Operator in blink planner



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15913) Add Python Table Function Runner And Operator

2020-02-13 Thread Hequn Cheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036638#comment-17036638
 ] 

Hequn Cheng edited comment on FLINK-15913 at 2/14/20 2:10 AM:
--

Resolved in 1.11.0 via 
#11044: d36d97c84a365d1aee8101dbb1db61374348d024
#11020: fee187e592e474236104f4185629a50abad66292


was (Author: hequn8128):
Resolved in 
#11044: d36d97c84a365d1aee8101dbb1db61374348d024
#11020: fee187e592e474236104f4185629a50abad66292

> Add Python Table Function Runner And Operator
> -
>
> Key: FLINK-15913
> URL: https://issues.apache.org/jira/browse/FLINK-15913
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> This Jira will include two PRs:
>  * Add Python Table Function Runner and Operator in legacy planner
>  * Add Python Table Function Runner and Operator in blink planner



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15913) Add Python Table Function Runner And Operator

2020-02-13 Thread Hequn Cheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036638#comment-17036638
 ] 

Hequn Cheng commented on FLINK-15913:
-

Resolved in 
#11044: d36d97c84a365d1aee8101dbb1db61374348d024
#11020: fee187e592e474236104f4185629a50abad66292

> Add Python Table Function Runner And Operator
> -
>
> Key: FLINK-15913
> URL: https://issues.apache.org/jira/browse/FLINK-15913
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> This Jira will include two PRs:
>  * Add Python Table Function Runner and Operator in legacy planner
>  * Add Python Table Function Runner and Operator in blink planner



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] hequn8128 closed pull request #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner

2020-02-13 Thread GitBox
hequn8128 closed pull request #11044: [FLINK-15913][python] Add Python 
TableFunction Runner and Operator in Blink planner
URL: https://github.com/apache/flink/pull/11044
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-16048) Support read avro data that serialized by KafkaAvroSerializer from Kafka in Table

2020-02-13 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036628#comment-17036628
 ] 

Leonard Xu commented on FLINK-16048:


Hi [~dwysakowicz], you're right. I think we should support Kafka's 
{{Serializers}}, because a lot of business data in Kafka is produced with them. 
As in the linked question, I hit the same issue and I'm sure the schema was 
correct; I can post a link to demo code later.

> Support read avro data that serialized by KafkaAvroSerializer from Kafka in 
> Table
> -
>
> Key: FLINK-16048
> URL: https://issues.apache.org/jira/browse/FLINK-16048
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.0
>Reporter: Leonard Xu
>Priority: Major
> Fix For: 1.11.0
>
>
>  I found that the SQL Kafka connector cannot consume avro data that was serialized by 
> `KafkaAvroSerializer`; it can only consume Row data with an avro schema, because 
> we use `AvroRowDeserializationSchema/AvroRowSerializationSchema` to serialize/deserialize 
> data in `AvroRowFormatFactory`. 
> I think we should support this because `KafkaAvroSerializer` is very common 
> in Kafka, and someone met the same question on stackoverflow[1].
> [1] https://stackoverflow.com/questions/56452571/caused-by-org-apache-avro-avroruntimeexception-malformed-data-length-is-negat/56478259
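
For context, a rough sketch (my own illustration, not Flink code) of the wire 
format that `KafkaAvroSerializer` produces; the 5-byte header in front of the 
Avro payload is what a plain Avro deserializer trips over:

{code:python}
import struct

def split_confluent_record(record_bytes):
    """Split a Confluent-framed Kafka value into (schema_id, avro_payload).

    KafkaAvroSerializer prepends a magic byte (0x00) and a 4-byte big-endian
    schema registry id before the Avro-encoded payload. Feeding the whole
    record to a plain Avro decoder (as AvroRowDeserializationSchema does)
    misinterprets these bytes, which is where errors such as
    "Malformed data. Length is negative" come from.
    """
    if not record_bytes or record_bytes[0] != 0:
        raise ValueError("not a Confluent-framed record")
    schema_id = struct.unpack(">I", record_bytes[1:5])[0]
    return schema_id, record_bytes[5:]
{code}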



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] dianfu edited a comment on issue #11084: [FLINK-16040][python] Change local import to global import

2020-02-13 Thread GitBox
dianfu edited a comment on issue #11084: [FLINK-16040][python] Change local 
import to global import
URL: https://github.com/apache/flink/pull/11084#issuecomment-586051943
 
 
   @HuangXingBo Thanks for the PR and sharing the performance data. The 
performance gain is impressive. LGTM. Merging...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-16040) Change local import to global import

2020-02-13 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu closed FLINK-16040.
---
Resolution: Resolved

Merged to master via 3cc8e1ae3a941410111e228d0beef4058fd3ed7e

> Change local import to global import
> 
>
> Key: FLINK-16040
> URL: https://issues.apache.org/jira/browse/FLINK-16040
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> There are two reasons supporting this change:
>  # Executing an import statement costs time.
>  # PEP 8 states that "Imports are always put at the top of the file" 
> [https://www.python.org/dev/peps/pep-0008/#imports]
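
A tiny illustration of the difference (a hypothetical module, not PyFlink code): 
the global import pays the import cost once at module load time, while the 
local import re-executes the import statement on every call.

{code:python}
import json  # global import: resolved once when the module is loaded (PEP 8 style)


def process_global(value):
    return json.dumps(value)


def process_local(value):
    # local import: this statement runs on every call, adding overhead
    import json
    return json.dumps(value)
{code}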



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] dianfu merged pull request #11084: [FLINK-16040][python] Change local import to global import

2020-02-13 Thread GitBox
dianfu merged pull request #11084: [FLINK-16040][python] Change local import to 
global import
URL: https://github.com/apache/flink/pull/11084
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on issue #11084: [FLINK-16040][python] Change local import to global import

2020-02-13 Thread GitBox
dianfu commented on issue #11084: [FLINK-16040][python] Change local import to 
global import
URL: https://github.com/apache/flink/pull/11084#issuecomment-586051943
 
 
   LGTM. Merging...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-16012) Reduce the default number of exclusive buffers from 2 to 1 on receiver side

2020-02-13 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-16012:

Attachment: image-2020-02-14-09-13-07-967.png

> Reduce the default number of exclusive buffers from 2 to 1 on receiver side
> ---
>
> Key: FLINK-16012
> URL: https://issues.apache.org/jira/browse/FLINK-16012
> Project: Flink
>  Issue Type: Improvement
>Reporter: Zhijiang
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
> Attachments: image-2020-02-13-21-54-05-026.png, 
> image-2020-02-13-23-30-17-951.png, image-2020-02-14-07-23-16-171.png, 
> image-2020-02-14-09-13-07-967.png
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In order to reduce the number of in-flight buffers for checkpoints in the case of back 
> pressure, we can reduce the number of exclusive buffers per remote input 
> channel from the default of 2 to 1 as a first step. As a side effect, the total 
> number of required buffers is also reduced. We can further verify the 
> performance impact via various benchmarks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #10483: [FLINK-15099][runtime] (FLIP-27) Add Operator Coordinators and Events

2020-02-13 Thread GitBox
flinkbot edited a comment on issue #10483: [FLINK-15099][runtime] (FLIP-27) Add 
Operator Coordinators and Events
URL: https://github.com/apache/flink/pull/10483#issuecomment-562912876
 
 
   
   ## CI report:
   
   * f7d3793fc60c8700f99adf52ed760e9665f42a90 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/139976585) 
   * 774d4ca5b2af749eee6fec62bcda369bac3c19c3 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148409371) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5071)
 
   * 58eaae24455b72e6aa44b78fb5b7f084e566154f Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148819933) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5145)
 
   * d2fa14deacb7f97e21646a009ce90aa6988901e2 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148826095) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5148)
 
   * b0a384d744b8a8957e916eb6a3860f66f0c0bb4d Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148841026) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5151)
 
   * ab016b838014b66333015c2d1cf792aca6aa4d0b Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148863321) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5158)
 
   * 627d2c11934000d81604777ea8f934dc0ece1bc4 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148871984) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5162)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11086: [FLINK-16046][es] Drop Elasticsearch 2 connector

2020-02-13 Thread GitBox
flinkbot edited a comment on issue #11086: [FLINK-16046][es] Drop Elasticsearch 
2 connector
URL: https://github.com/apache/flink/pull/11086#issuecomment-585812010
 
 
   
   ## CI report:
   
   * b80c63e29a93076852da00f58a8554b0d35cfa98 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148807618) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5141)
 
   * 3b767e732ce88b1aa37887c7588eec260923a47d Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148826157) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5149)
 
   * c21293665f798293136bd3b637481037bc8614b8 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148854531) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5155)
 
   * 6b99318ade3d773fdd3bdbe12cc88a7268225531 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148872021) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5163)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-16012) Reduce the default number of exclusive buffers from 2 to 1 on receiver side

2020-02-13 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-16012:

Attachment: image-2020-02-14-07-23-16-171.png

> Reduce the default number of exclusive buffers from 2 to 1 on receiver side
> ---
>
> Key: FLINK-16012
> URL: https://issues.apache.org/jira/browse/FLINK-16012
> Project: Flink
>  Issue Type: Improvement
>Reporter: Zhijiang
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
> Attachments: image-2020-02-13-21-54-05-026.png, 
> image-2020-02-13-23-30-17-951.png, image-2020-02-14-07-23-16-171.png
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In order to reduce the number of in-flight buffers for checkpoints in the case of back 
> pressure, we can reduce the number of exclusive buffers per remote input 
> channel from the default of 2 to 1 as a first step. As a side effect, the total 
> number of required buffers is also reduced. We can further verify the 
> performance impact via various benchmarks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #11086: [FLINK-16046][es] Drop Elasticsearch 2 connector

2020-02-13 Thread GitBox
flinkbot edited a comment on issue #11086: [FLINK-16046][es] Drop Elasticsearch 
2 connector
URL: https://github.com/apache/flink/pull/11086#issuecomment-585812010
 
 
   
   ## CI report:
   
   * b80c63e29a93076852da00f58a8554b0d35cfa98 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148807618) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5141)
 
   * 3b767e732ce88b1aa37887c7588eec260923a47d Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148826157) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5149)
 
   * c21293665f798293136bd3b637481037bc8614b8 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148854531) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5155)
 
   * 6b99318ade3d773fdd3bdbe12cc88a7268225531 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148872021) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5163)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10483: [FLINK-15099][runtime] (FLIP-27) Add Operator Coordinators and Events

2020-02-13 Thread GitBox
flinkbot edited a comment on issue #10483: [FLINK-15099][runtime] (FLIP-27) Add 
Operator Coordinators and Events
URL: https://github.com/apache/flink/pull/10483#issuecomment-562912876
 
 
   
   ## CI report:
   
   * f7d3793fc60c8700f99adf52ed760e9665f42a90 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/139976585) 
   * 774d4ca5b2af749eee6fec62bcda369bac3c19c3 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148409371) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5071)
 
   * 58eaae24455b72e6aa44b78fb5b7f084e566154f Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148819933) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5145)
 
   * d2fa14deacb7f97e21646a009ce90aa6988901e2 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148826095) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5148)
 
   * b0a384d744b8a8957e916eb6a3860f66f0c0bb4d Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148841026) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5151)
 
   * ab016b838014b66333015c2d1cf792aca6aa4d0b Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148863321) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5158)
 
   * 627d2c11934000d81604777ea8f934dc0ece1bc4 Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/148871984) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5162)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11086: [FLINK-16046][es] Drop Elasticsearch 2 connector

2020-02-13 Thread GitBox
flinkbot edited a comment on issue #11086: [FLINK-16046][es] Drop Elasticsearch 
2 connector
URL: https://github.com/apache/flink/pull/11086#issuecomment-585812010
 
 
   
   ## CI report:
   
   * b80c63e29a93076852da00f58a8554b0d35cfa98 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148807618) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5141)
 
   * 3b767e732ce88b1aa37887c7588eec260923a47d Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148826157) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5149)
 
   * c21293665f798293136bd3b637481037bc8614b8 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148854531) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5155)
 
   * 6b99318ade3d773fdd3bdbe12cc88a7268225531 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10483: [FLINK-15099][runtime] (FLIP-27) Add Operator Coordinators and Events

2020-02-13 Thread GitBox
flinkbot edited a comment on issue #10483: [FLINK-15099][runtime] (FLIP-27) Add 
Operator Coordinators and Events
URL: https://github.com/apache/flink/pull/10483#issuecomment-562912876
 
 
   
   ## CI report:
   
   * f7d3793fc60c8700f99adf52ed760e9665f42a90 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/139976585) 
   * 774d4ca5b2af749eee6fec62bcda369bac3c19c3 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148409371) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5071)
 
   * 58eaae24455b72e6aa44b78fb5b7f084e566154f Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148819933) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5145)
 
   * d2fa14deacb7f97e21646a009ce90aa6988901e2 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148826095) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5148)
 
   * b0a384d744b8a8957e916eb6a3860f66f0c0bb4d Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148841026) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5151)
 
   * ab016b838014b66333015c2d1cf792aca6aa4d0b Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/148863321) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5158)
 
   * 627d2c11934000d81604777ea8f934dc0ece1bc4 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10918) incremental Keyed state with RocksDB throws cannot create directory error in windows

2020-02-13 Thread Stephan Ewen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036536#comment-17036536
 ] 

Stephan Ewen commented on FLINK-10918:
--

Here is an implementation using {{java.nio.file.Path}} rather than Flink's 
Path class:
https://github.com/StephanEwen/flink/tree/rocks_incremental_windows


> incremental Keyed state with RocksDB throws cannot create directory error in 
> windows
> 
>
> Key: FLINK-10918
> URL: https://issues.apache.org/jira/browse/FLINK-10918
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Queryable State, Runtime / State Backends
>Affects Versions: 1.6.2, 1.9.2, 1.10.0
> Environment: windows
> {code:java}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val rocksdb = new RocksDBStateBackend("file:\\C:\\rocksdb\\checkpoint",true)
> rocksdb.setDbStoragePath("file:\\C:\\rocksdb\\storage")
> env.setStateBackend(rocksdb)
> env.enableCheckpointing(10)
> {code}
>  
>Reporter: Amit
>Priority: Major
>  Labels: usability
> Attachments: 
> 0001-FLINK-10918-Fix-for-checkpoint-creation-on-windows-1.patch
>
>
> Facing error while enabling keyed state with RocksDBBackend with 
> checkpointing to a local windows directory
>  
> {code:java}
> Caused by: org.rocksdb.RocksDBException: Failed to create dir: 
> /c:/tmp/data/job_dbe01128760d4d5cb90809cd94c2a936_op_StreamMap_b5c8d46f3e7b141acf271f12622e752b__3_8__uuid_45c1f62b-a198-44f5-add5-7683079b03f8/chk-1.tmp:
>  Invalid argument
>     at org.rocksdb.Checkpoint.createCheckpoint(Native Method)
>     at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.takeSnapshot(RocksDBKeyedStateBackend.java:2549)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$IncrementalSnapshotStrategy.performSnapshot(RocksDBKeyedStateBackend.java:2008)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:498)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:406)
>     ... 13 more
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] StephanEwen commented on a change in pull request #10483: [FLINK-15099][runtime] (FLIP-27) Add Operator Coordinators and Events

2020-02-13 Thread GitBox
StephanEwen commented on a change in pull request #10483: 
[FLINK-15099][runtime] (FLIP-27) Add Operator Coordinators and Events
URL: https://github.com/apache/flink/pull/10483#discussion_r379143892
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/util/AutoContextClassLoader.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+/**
+ * Sets a context class loader in a "try-with-resources" pattern.
+ *
+ * <pre>
+ * {@code
+ * try (AutoContextClassLoader ignored = 
AutoContextClassLoader.of(classloader)) {
+ * // code that needs the context class loader
+ * }
+ * }
+ * </pre>
+ *
+ * <p>This is conceptually the same as the code below.
+
+ * <pre>
+ * {@code
+ * ClassLoader original = Thread.currentThread().getContextClassLoader();
+ * Thread.currentThread().setContextClassLoader(classloader);
+ * try {
+ * // code that needs the context class loader
+ * } finally {
+ * Thread.currentThread().setContextClassLoader(original);
+ * }
+ * }
+ * </pre>
+ */
+public final class AutoContextClassLoader implements AutoCloseable {
 
 Review comment:
   I would consolidate that into a separate commit. If it gets used beyond 
plugins, it should reside in a more "universal" package. The 
`AutoContextClassLoader` is a tad nicer, especially javadoc-wise.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10483: [FLINK-15099][runtime] (FLIP-27) Add Operator Coordinators and Events

2020-02-13 Thread GitBox
flinkbot edited a comment on issue #10483: [FLINK-15099][runtime] (FLIP-27) Add 
Operator Coordinators and Events
URL: https://github.com/apache/flink/pull/10483#issuecomment-562912876
 
 
   
   ## CI report:
   
   * f7d3793fc60c8700f99adf52ed760e9665f42a90 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/139976585) 
   * 774d4ca5b2af749eee6fec62bcda369bac3c19c3 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148409371) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5071)
 
   * 58eaae24455b72e6aa44b78fb5b7f084e566154f Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148819933) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5145)
 
   * d2fa14deacb7f97e21646a009ce90aa6988901e2 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148826095) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5148)
 
   * b0a384d744b8a8957e916eb6a3860f66f0c0bb4d Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148841026) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5151)
 
   * ab016b838014b66333015c2d1cf792aca6aa4d0b Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/148863321) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5158)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

