[GitHub] [flink] xintongsong commented on a change in pull request #8704: [FLINK-12812][runtime] Set resource profiles for task slots

2019-07-03 Thread GitBox
xintongsong commented on a change in pull request #8704: [FLINK-12812][runtime] 
Set resource profiles for task slots
URL: https://github.com/apache/flink/pull/8704#discussion_r300238299
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 ##
 @@ -284,12 +301,39 @@ else if (obj != null && obj.getClass() == ResourceProfile.class) {
 			return this.cpuCores == that.cpuCores &&
 				this.heapMemoryInMB == that.heapMemoryInMB &&
 				this.directMemoryInMB == that.directMemoryInMB &&
+				this.nativeMemoryInMB == that.nativeMemoryInMB &&
 				this.networkMemoryInMB == that.networkMemoryInMB &&
+				this.managedMemoryInMB == that.managedMemoryInMB &&
 				Objects.equals(extendedResources, that.extendedResources);
 		}
 		return false;
 	}
 
+	public boolean approximate(ResourceProfile that) {
+		if (that == null) {
+			return false;
+		}
+		if (Math.abs(this.cpuCores - that.cpuCores) > 1e-6f) {
 
 Review comment:
   I'll add comments for these explanations.




[GitHub] [flink] xintongsong commented on a change in pull request #8704: [FLINK-12812][runtime] Set resource profiles for task slots

2019-07-03 Thread GitBox
xintongsong commented on a change in pull request #8704: [FLINK-12812][runtime] 
Set resource profiles for task slots
URL: https://github.com/apache/flink/pull/8704#discussion_r300238018
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 ##
 @@ -284,12 +301,39 @@ else if (obj != null && obj.getClass() == ResourceProfile.class) {
 			return this.cpuCores == that.cpuCores &&
 				this.heapMemoryInMB == that.heapMemoryInMB &&
 				this.directMemoryInMB == that.directMemoryInMB &&
+				this.nativeMemoryInMB == that.nativeMemoryInMB &&
 				this.networkMemoryInMB == that.networkMemoryInMB &&
+				this.managedMemoryInMB == that.managedMemoryInMB &&
 				Objects.equals(extendedResources, that.extendedResources);
 		}
 		return false;
 	}
 
+	public boolean approximate(ResourceProfile that) {
+		if (that == null) {
+			return false;
+		}
+		if (Math.abs(this.cpuCores - that.cpuCores) > 1e-6f) {
 
 Review comment:
   Yes, this is about the rounding errors during the profile calculation.
   
   Calculating the profiles involves multiplying integer values by floating-point values, and rounding the floating-point product back to an integer introduces an error. E.g., the rounded results of `Total * Fraction` and `Total - Total * (1 - Fraction)` may differ, where `Total` is an integer value and `Fraction` is a floating-point value.
   
   Each such rounding can introduce an error of at most 1. Since there are two such fraction-based calculations (for managed memory and network memory), I set the maximum allowed error here to 2.
   
   This approximate matching is only used for matching a `PendingTaskManagerSlot` with a slot registered from the TM. It can be replaced with exact matching once we unify the TM resource configuration: after that, there will be no more resource calculations on the TM side. For Yarn, the RM will calculate the resource profile and pass the result to the TM to be started, so the pending slots and the actual slots should have exactly the same profiles. For standalone, there shouldn't be any pending slots.
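   
   A self-contained sketch of the effect (not Flink code; the `1e-6` epsilon and the 2 MB tolerance follow the explanation above, everything else is illustrative):

```java
public class ApproximateMatchSketch {

	/** Mirrors the kind of tolerance-based check described for ResourceProfile#approximate. */
	static boolean approximatelyEqual(double cpuA, long memA, double cpuB, long memB) {
		return Math.abs(cpuA - cpuB) <= 1e-6 && Math.abs(memA - memB) <= 2;
	}

	public static void main(String[] args) {
		long total = 5;         // illustrative total memory in MB
		double fraction = 0.3;  // illustrative managed-memory fraction

		// Two mathematically equal formulas can disagree by 1 after rounding:
		long direct = Math.round(total * fraction);                    // round(1.5) -> 2
		long complement = total - Math.round(total * (1 - fraction));  // 5 - round(3.5) -> 1

		System.out.println(direct + " vs " + complement);                      // 2 vs 1
		System.out.println(approximatelyEqual(1.0, direct, 1.0, complement));  // true
	}
}
```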




[jira] [Assigned] (FLINK-11638) Translate "Savepoints" page into Chinese

2019-07-03 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-11638:
---

Assignee: Forward Xu  (was: yelun)

> Translate "Savepoints" page into Chinese
> 
>
> Key: FLINK-11638
> URL: https://issues.apache.org/jira/browse/FLINK-11638
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Congxian Qiu(klion26)
>Assignee: Forward Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> doc is located at flink/docs/ops/state/savepoints.zh.md





[jira] [Commented] (FLINK-11638) Translate "Savepoints" page into Chinese

2019-07-03 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878324#comment-16878324
 ] 

Jark Wu commented on FLINK-11638:
-

Hi [~guanghui], [~x1q1j1] has assigned this issue to himself and created a pull 
request. Please ask the assignee before taking an issue.

> Translate "Savepoints" page into Chinese
> 
>
> Key: FLINK-11638
> URL: https://issues.apache.org/jira/browse/FLINK-11638
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Congxian Qiu(klion26)
>Assignee: yelun
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> doc is located at flink/docs/ops/state/savepoints.zh.md





[jira] [Closed] (FLINK-11638) Translate "Savepoints" page into Chinese

2019-07-03 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-11638.
---
   Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in 1.9.0: 4ad19eff713284c0f30c7dba3cd21095baf18d42

> Translate "Savepoints" page into Chinese
> 
>
> Key: FLINK-11638
> URL: https://issues.apache.org/jira/browse/FLINK-11638
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Congxian Qiu(klion26)
>Assignee: yelun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> doc is located at flink/docs/ops/state/savepoints.zh.md





[jira] [Comment Edited] (FLINK-11638) Translate "Savepoints" page into Chinese

2019-07-03 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878324#comment-16878324
 ] 

Jark Wu edited comment on FLINK-11638 at 7/4/19 5:48 AM:
-

Hi [~guanghui], [~x1q1j1] has assigned this issue to himself and created a pull 
request before. Please ask the assignee before taking an issue.


was (Author: jark):
Hi [~guanghui], [~x1q1j1] has assigned this issue to himself and created a pull 
request. Please ask the assignee before taking an issue.

> Translate "Savepoints" page into Chinese
> 
>
> Key: FLINK-11638
> URL: https://issues.apache.org/jira/browse/FLINK-11638
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Congxian Qiu(klion26)
>Assignee: yelun
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> doc is located at flink/docs/ops/state/savepoints.zh.md





[GitHub] [flink] asfgit closed pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese

2019-07-03 Thread GitBox
asfgit closed pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints 
page into Chinese
URL: https://github.com/apache/flink/pull/8300
 
 
   




[GitHub] [flink] flinkbot commented on issue #8979: [FLINK-13087][table] Add group window Aggregate operator to Table API

2019-07-03 Thread GitBox
flinkbot commented on issue #8979: [FLINK-13087][table] Add group window 
Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8979#issuecomment-508346981
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Updated] (FLINK-13087) Add group window Aggregate operator to Table API

2019-07-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-13087:
---
Labels: pull-request-available  (was: )

> Add group window Aggregate operator to Table API
> 
>
> Key: FLINK-13087
> URL: https://issues.apache.org/jira/browse/FLINK-13087
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> Add Group Window Aggregate operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>  .window(Tumble over 15.minute on 'rowtime as 'w)
>  .groupBy('w, 'a) // leave out groupBy-clause to define global aggregates
>  .agg(fun: AggregateFunction)  // output has columns 'a, 'b, 'c
>  .select('a, 'c, 'w.start)
> {code}





[GitHub] [flink] hequn8128 opened a new pull request #8979: [FLINK-13087][table] Add group window Aggregate operator to Table API

2019-07-03 Thread GitBox
hequn8128 opened a new pull request #8979: [FLINK-13087][table] Add group 
window Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8979
 
 
   
   ## What is the purpose of the change
   
   This pull request adds row-based group window Aggregate operator to Table 
API. 
   Also, there are some hotfixes in separate commits. The implementation of the 
window aggregate operator depends on these hotfixes, so I didn't create separate 
JIRAs for them.
   
   
   ## Brief change log
   
   There are 5 commits in total:
 - 
[#f5e8d1f](https://github.com/apache/flink/commit/f5e8d1f60a1de4ff69e5e713c4382327c149f33c)
 Resolve indent problem for `WindowFlatAggregateTableImpl`.
 - 
[#9395878](https://github.com/apache/flink/commit/939587853716a64c787af17be4610216f9c3b225)
 Add call support in the group by for row based aggregate. For example, support 
groupBy('a % b) expressions.
 - 
[#ee2b065](https://github.com/apache/flink/commit/ee2b06567ef11569dc29fd3c9622fd8afbc49bcc)
 Throw exceptions if there is a star in the select after a window (table) 
aggregate, because we don't know which window properties should be selected.
 - 
[#365f31d](https://github.com/apache/flink/commit/365f31da14376b74dd4a3286dbe0ae17be22d4a6)
 Validate alias length for aggregate. The number of aliases should equal the 
length of the result type.
 - 
[#deca009](https://github.com/apache/flink/commit/deca0096d4ed9b59718e0a14c8f242bdaaab55d5)
 Add group window Aggregate operator to Table API
   
   The last commit (Add group window Aggregate operator to Table API) mainly 
contains the following changes:
 - Add `aggregate` method to the `WindowGroupedTable`
 - Build `windowAggregate` QueryOperation in `OperationTreeBuilder`
 - Add tests and docs
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - Added plan tests in `AggregateStringExpressionTest`, `AggregateTest`
 - Add validation tests in `GroupWindowValidationTest`
 - Add IT tests in `GroupWindowITCase`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs)
   




[jira] [Created] (FLINK-13090) Test Hive connector with hive runner

2019-07-03 Thread Rui Li (JIRA)
Rui Li created FLINK-13090:
--

 Summary: Test Hive connector with hive runner
 Key: FLINK-13090
 URL: https://issues.apache.org/jira/browse/FLINK-13090
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive
Reporter: Rui Li
Assignee: Rui Li








[GitHub] [flink] lirui-apache commented on issue #8965: [FLINK-13068][hive] HiveTableSink should implement PartitionableTable…

2019-07-03 Thread GitBox
lirui-apache commented on issue #8965: [FLINK-13068][hive] HiveTableSink should 
implement PartitionableTable…
URL: https://github.com/apache/flink/pull/8965#issuecomment-508344669
 
 
   Thanks @xuefuz and @bowenli86 for the review. Please take another look.




[GitHub] [flink] lirui-apache commented on a change in pull request #8976: [FLINK-12277][table/hive/doc] Add documentation for catalogs

2019-07-03 Thread GitBox
lirui-apache commented on a change in pull request #8976: 
[FLINK-12277][table/hive/doc] Add documentation for catalogs
URL: https://github.com/apache/flink/pull/8976#discussion_r300233788
 
 

 ##
 File path: docs/dev/table/catalog.md
 ##
 @@ -0,0 +1,345 @@
+---
+title: "Catalog"
+is_beta: true
+nav-parent_id: tableapi
+nav-pos: 100
+---
+
+
+A catalog provides metadata, such as names, schemas, and statistics of tables, as well as information about how to access data stored in a database or table. Once a catalog is registered to a `TableEnvironment`, all meta-objects defined in the catalog can be accessed from Table API or SQL queries.
+
+
+* This will be replaced by the TOC
+{:toc}
+
+
+Catalog Interface
+-
+
+APIs are defined in the `Catalog` interface. The interface defines a set of APIs to read and write catalog meta-objects such as databases, tables, partitions, views, and functions.
+
+Users can develop their own catalogs by implementing the interface.
+
+
+Naming Structure in Catalog
+---
+
+Flink's catalogs use a strict two-level structure, that is, catalogs contain 
databases, and databases contain meta-objects. Thus, the full name of a 
meta-object is always structured as `catalogName`.`databaseName`.`objectName`.
+
+All registered catalogs are managed by a `CatalogManager` instance in `TableEnvironment`. In order to ease access to meta-objects, `CatalogManager` has the concept of a current catalog and a current database. Usually, users access a meta-object in a catalog by specifying its full name in the format `catalogName`.`databaseName`.`objectName`. By setting the current catalog and current database, users can use just the meta-object's name in their queries, which greatly simplifies the user experience. For example, a query such as
+
+```sql
+select * from mycatalog.mydb.myTable;
+```
+
+can be shortened to
+
+```sql
+select * from myTable;
+```
+
+Querying a table in a different database under the default catalog would be
+
+```
+select * from mydb2.myTable
+```
+
+`CatalogManager` always has a built-in `GenericInMemoryCatalog` named `default_catalog`, which has a built-in default database named `default_database`. They will be the current catalog and current database if no others are explicitly set. All temporary meta-objects will be registered to this catalog. Users can set the current catalog and database via `TableEnvironment.useCatalog(...)` and `TableEnvironment.useDatabase(...)` in the Table API, or `USE CATALOG ...` and `USE DATABASE ...` in Flink SQL.
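
For example, a minimal Table API sketch (assuming a `TableEnvironment` instance `tableEnv`; the catalog and database names are illustrative):

```java
// Illustrative names only.
tableEnv.useCatalog("mycatalog");
tableEnv.useDatabase("mydb");

// With the current catalog/database set, the short name below resolves to
// mycatalog.mydb.myTable.
Table result = tableEnv.sqlQuery("select * from myTable");
```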
+
+
+Catalog Types
+-
+
+## GenericInMemoryCatalog
+
+All meta-objects in this catalog are stored in memory and will be lost once the session shuts down.
+
+Its config entry value in the SQL CLI YAML file is "generic_in_memory".
+
+## HiveCatalog
+
+`HiveCatalog` can read and write both Flink and Hive meta-objects by using 
Hive Metastore as a persistent storage.
+
+Its config entry value in the SQL CLI YAML file is "hive".
+
+### Persist Flink meta-objects
+
+Previously, Flink meta-objects were only stored in memory and were scoped to a single session. That means users had to recreate all the meta-objects every time they started a new session.
+
+To solve this pain point, users can choose to use `HiveCatalog` to persist all of their Flink streaming and batch meta-objects, using Hive Metastore purely as storage. Because Hive Metastore is only used for storage in this case, Hive itself may not understand Flink's meta-objects stored in the metastore.
+
+### Integrate Flink with Hive metadata
+
+The ultimate goal for integrating Flink with Hive metadata is that:
+
+1. existing meta-objects, like tables, views, and functions, created by Hive 
or other Hive-compatible applications can be used by Flink
+
+2. meta-objects created by `HiveCatalog` can be written back to Hive metastore such that Hive and other Hive-compatible applications can consume them.
+
+## User-configured Catalog
+
+Catalogs are pluggable, and users can use their own, customized catalog 
implementations.
+
+
+HiveCatalog
+---
+
+## Supported Hive Versions
+
+`HiveCatalog` officially supports Hive 2.3.4 and 1.2.1, and depends on Hive's 
own compatibility for the other 2.x.x and 1.x.x versions.
+
+Users need to explicitly specify the Hive version as a string, either by passing it to the constructor when creating `HiveCatalog` instances directly in the Table API, or by specifying it in the YAML config file of the SQL CLI. The Hive version string will be either `2.3.4`, `1.2.1`, or your own 2.x.x/1.x.x version.
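
For example, a minimal Table API sketch (assuming a `HiveCatalog` constructor that takes the catalog name, a default database, a Hive conf directory, and the version string; all values below are illustrative):

```java
// Illustrative values only; point hiveConfDir at the directory containing hive-site.xml.
String name            = "myhive";
String defaultDatabase = "default";
String hiveConfDir     = "/opt/hive-conf";
String version         = "2.3.4";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tableEnv.registerCatalog(name, hive);
tableEnv.useCatalog(name);
```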
+
+## Dependencies
+
+In order to use `HiveCatalog`, users need to either download the following dependency jars and add them to the `/lib` dir of the Flink distribution, or add their existing Hive jars to Flink's classpath so that Flink can find them at runtime.
+
+Take Hive 2.3.4 for example:
+
+```
+// Hive dependencies
+
+- hive-metastore-2.3.4.jar
 
 

[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…

2019-07-03 Thread GitBox
danny0405 commented on a change in pull request #8844: 
[FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
URL: https://github.com/apache/flink/pull/8844#discussion_r300221923
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 ##
 @@ -245,6 +247,57 @@ public String explain(Table table) {
return planner.getCompletionHints(statement, position);
}
 
+   @Override
+   public void sql(String statement) {
+   List operations = planner.parse(statement);
+   operations.forEach(operation -> {
+   if (operation instanceof CreateTableOperation) {
+   CreateTableOperation operation1 = 
(CreateTableOperation) operation;
+   registerTable(
+   operation1.getTablePath(),
+   operation1.getCatalogTable(),
+   operation1.isIgnoreIfExists());
+   } else if (operation instanceof ModifyOperation) {
+   queryConfigProvider.setConfig(new 
StreamQueryConfig());
+   List> transformations =
+   
planner.translate(Collections.singletonList((ModifyOperation) operation));
+
+   execEnv.apply(transformations);
 
 Review comment:
   What do you mean by "translated independently" ?




[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…

2019-07-03 Thread GitBox
danny0405 commented on a change in pull request #8844: 
[FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
URL: https://github.com/apache/flink/pull/8844#discussion_r300223062
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
 ##
 @@ -304,6 +304,22 @@ public Builder() {
fieldDataTypes = new ArrayList<>();
}
 
+   /** Create a proto builder from an existing schema.
+    *
+    * Caution: This will invoke {@link #instance()} to fetch a new builder first.
+    */
+   public Builder proto(TableSchema other) {
 
 Review comment:
   Create a builder from another TableSchema.




[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…

2019-07-03 Thread GitBox
danny0405 commented on a change in pull request #8844: 
[FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
URL: https://github.com/apache/flink/pull/8844#discussion_r300221812
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
 ##
 @@ -287,6 +287,46 @@
@Deprecated
String[] getCompletionHints(String statement, int position);
 
+   /**
+* Evaluates mixed-in sql statements including DDLs and DMLs.
+* Note: Always use this interface to execute a sql query if no {@link 
Table}
+* result are expected, else use {@link #sqlQuery(String)}.
 
 Review comment:
   `sqlQuery` should return a Table object, which would make the semantics of 
the interface unclear, say: what if there is a SQL SELECT statement among the 
multiple statements? It does not make any sense to support a single SELECT 
statement but return nothing.
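   
   A hedged sketch of the point (the `sql(...)` method is the API proposed in this PR and may still change; table names and connector properties are illustrative):

```java
// DDLs and INSERTs produce no result, so a void sql(...) fits them:
tableEnv.sql("CREATE TABLE t1 (a INT) WITH ('connector.type' = 'filesystem')");
tableEnv.sql("INSERT INTO t1 SELECT a FROM t2");

// A plain SELECT produces a Table; routed through a void sql(...) there would be
// nowhere to return it, which is what sqlQuery(...) is for:
Table result = tableEnv.sqlQuery("SELECT a FROM t1");
```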




[GitHub] [flink] becketqin commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-07-03 Thread GitBox
becketqin commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r300232263
 
 

 ##
 File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java
 ##
 @@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannel;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * A sink function that outputs to PubSub.
+ *
+ * @param  type of PubSubSink messages to write
+ */
+public class PubSubSink extends RichSinkFunction implements 
CheckpointedFunction {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PubSubSink.class);
+
+   private final Credentials credentials;
+   private final SerializationSchema serializationSchema;
+   private final String projectName;
+   private final String topicName;
+   private final String hostAndPortForEmulator;
+
+   private transient Publisher publisher;
+
+   private PubSubSink(
+   Credentials credentials,
+   SerializationSchema serializationSchema,
+   String projectName,
+   String topicName,
+   String hostAndPortForEmulator) {
+   this.credentials = credentials;
+   this.serializationSchema = serializationSchema;
+   this.projectName = projectName;
+   this.topicName = topicName;
+   this.hostAndPortForEmulator = hostAndPortForEmulator;
+   }
+
+   private transient ManagedChannel managedChannel = null;
+   private transient TransportChannel channel = null;
+
+   @Override
+   public void open(Configuration configuration) throws Exception {
+   Publisher.Builder builder = Publisher
+   .newBuilder(ProjectTopicName.of(projectName, topicName))
+   
.setCredentialsProvider(FixedCredentialsProvider.create(credentials));
+
+   if (hostAndPortForEmulator != null) {
+   managedChannel = ManagedChannelBuilder
+   .forTarget(hostAndPortForEmulator)
+   .usePlaintext(true) // This is 'Ok' because 
this is ONLY used for testing.
+   .build();
+   channel = 
GrpcTransportChannel.newBuilder().setManagedChannel(managedChannel).build();
+   
builder.setChannelProvider(FixedTransportChannelProvider.create(channel))
+   
.setCredentialsProvider(NoCredentialsProvider.create());
+   }
+
+   publisher = builder.build();
+   }
+
+   @Override
+   public void close() throws Exception {
+   super.close();
+   shutdownPublisher();
+   

[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…

2019-07-03 Thread GitBox
danny0405 commented on a change in pull request #8844: 
[FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
URL: https://github.com/apache/flink/pull/8844#discussion_r300221301
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
 ##
 @@ -235,6 +236,40 @@ public String getColumnSqlString() {
return writer.toString();
}
 
+   /** Split the computed columns sql into string k-v pairs which are put 
into
+* the {@code container}.
+*
+* For example, {@code col1 as to_timestamp(col2)} would be split 
into pair:
+* (col1, to_timestamp(col2)).
+**/
+   public void getComputedColumnExpressions(List> 
container) {
 
 Review comment:
   Removed




[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…

2019-07-03 Thread GitBox
danny0405 commented on a change in pull request #8844: 
[FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
URL: https://github.com/apache/flink/pull/8844#discussion_r300222616
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 ##
 @@ -245,6 +247,57 @@ public String explain(Table table) {
return planner.getCompletionHints(statement, position);
}
 
+   @Override
+   public void sql(String statement) {
+   List operations = planner.parse(statement);
+   operations.forEach(operation -> {
+   if (operation instanceof CreateTableOperation) {
+   CreateTableOperation operation1 = 
(CreateTableOperation) operation;
+   registerTable(
+   operation1.getTablePath(),
+   operation1.getCatalogTable(),
+   operation1.isIgnoreIfExists());
+   } else if (operation instanceof ModifyOperation) {
+   queryConfigProvider.setConfig(new 
StreamQueryConfig());
+   List> transformations =
+   
planner.translate(Collections.singletonList((ModifyOperation) operation));
+
+   execEnv.apply(transformations);
+   } else {
+   throw new ValidationException(
+   "Unsupported SQL statement: sql() only 
accepts DDLs or Inserts.");
+   }
+   });
+   }
+
+   /**
+* Registers a {@link CatalogBaseTable} under a given object path. The 
{@code path} could be
+* 3 formats:
+* 
+*   `catalog.db.table`: A full table path including the catalog 
name,
+*   the database name and table name.
+*   `db.table`: database name following table name, with the 
current catalog name.
+*   `table`: Only the table name, with the current catalog name 
and database  name.
+* 
+* The registered tables then can be referenced in Sql queries.
+*
+* @param path   The path under which the table will be 
registered
+* @param catalogTable   The table to register
+* @param ignoreIfExists If true, do nothing if there is already same 
table name under
+*   the {@code path}. If false, a 
TableAlreadyExistException throws.
+*/
+   private void registerTable(String[] path, CatalogBaseTable 
catalogTable, boolean ignoreIfExists) {
 
 Review comment:
   merged into `registerTableInternal`




[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…

2019-07-03 Thread GitBox
danny0405 commented on a change in pull request #8844: 
[FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
URL: https://github.com/apache/flink/pull/8844#discussion_r300222001
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 ##
 @@ -245,6 +247,57 @@ public String explain(Table table) {
return planner.getCompletionHints(statement, position);
}
 
+   @Override
+   public void sql(String statement) {
+   List operations = planner.parse(statement);
+   operations.forEach(operation -> {
+   if (operation instanceof CreateTableOperation) {
+   CreateTableOperation operation1 = 
(CreateTableOperation) operation;
+   registerTable(
+   operation1.getTablePath(),
+   operation1.getCatalogTable(),
+   operation1.isIgnoreIfExists());
+   } else if (operation instanceof ModifyOperation) {
+   queryConfigProvider.setConfig(new 
StreamQueryConfig());
+   List> transformations =
+   
planner.translate(Collections.singletonList((ModifyOperation) operation));
+
+   execEnv.apply(transformations);
+   } else {
+   throw new ValidationException(
 
 Review comment:
   Yep, no bridge logic in this patch.




[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…

2019-07-03 Thread GitBox
danny0405 commented on a change in pull request #8844: 
[FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
URL: https://github.com/apache/flink/pull/8844#discussion_r300223365
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlExecutableStatements.java
 ##
 @@ -18,62 +18,197 @@
 
 package org.apache.flink.table.sqlexec;
 
+import org.apache.flink.sql.parser.SqlProperty;
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
-import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.calcite.FlinkTypeSystem;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.PlannerQueryOperation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.util.StringUtils;
 
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.util.ReflectUtil;
 import org.apache.calcite.util.ReflectiveVisitor;
 
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 /**
  * Mix-in tool class for {@code SqlNode} that allows DDL commands to be
  * executed directly.
  *
- * For every kind of {@link SqlNode}, there needs a method named
- * #execute(type), the 'type' argument should be the subclass
- * type for the supported {@link SqlNode}.
+ * For every kind of {@link SqlNode}, there needs to have a corresponding
+ * #execute(type) method, the 'type' argument should be the subclass
+ * of the supported {@link SqlNode}.
+ *
+ * Every #execute() should return a {@link Operation} which can be used in
+ * {@link org.apache.flink.table.delegation.Planner}.
  */
 public class SqlExecutableStatements implements ReflectiveVisitor {
-   private TableEnvironment tableEnv;
+   private FlinkPlannerImpl flinkPlanner;
 
-   private final ReflectUtil.MethodDispatcher dispatcher =
-   ReflectUtil.createMethodDispatcher(Void.class,
+   private final ReflectUtil.MethodDispatcher dispatcher =
+   ReflectUtil.createMethodDispatcher(Operation.class,
 
 Review comment:
   The visitor returns a SqlNode while I need an Operation.




[GitHub] [flink] becketqin commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-07-03 Thread GitBox
becketqin commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r300215868
 
 

 ##
 File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
 ##
 @@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeIdsForCheckpoint;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeOnCheckpoint;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.Acknowledger;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.util.Preconditions;
+
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+
+import static 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * PubSub Source, this Source will consume PubSub messages from a subscription 
and Acknowledge them on the next checkpoint.
+ * This ensures every message will get acknowledged at least once.
+ */
+public class PubSubSource extends RichSourceFunction
+   implements ResultTypeQueryable, ParallelSourceFunction, 
CheckpointListener, ListCheckpointed> {
+   public static final int NO_MAX_MESSAGES_TO_ACKNOWLEDGE_LIMIT = -1;
+   private static final Logger LOG = 
LoggerFactory.getLogger(PubSubSource.class);
+   protected final PubSubDeserializationSchema deserializationSchema;
+   protected final PubSubSubscriberFactory pubSubSubscriberFactory;
+   protected final Credentials credentials;
+   protected final int maxMessagesToAcknowledge;
+   protected final AcknowledgeOnCheckpointFactory 
acknowledgeOnCheckpointFactory;
+
+   protected transient AcknowledgeOnCheckpoint 
acknowledgeOnCheckpoint;
+   protected transient PubSubSubscriber subscriber;
+
+   protected transient volatile boolean isRunning;
+
+   PubSubSource(PubSubDeserializationSchema deserializationSchema,
+   PubSubSubscriberFactory pubSubSubscriberFactory,
+   Credentials credentials,
+   int maxMessagesToAcknowledge,
+   AcknowledgeOnCheckpointFactory 
acknowledgeOnCheckpointFactory) {
+   this.deserializationSchema = deserializationSchema;
+   this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+   this.credentials = credentials;
+   this.maxMessagesToAcknowledge = maxMessagesToAcknowledge;
+   this.acknowledgeOnCheckpointFactory = 
acknowledgeOnCheckpointFactory;
+   }
+
+   @Override
+   public void open(Configuration configuration) throws Exception {
+   

[GitHub] [flink] becketqin commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-07-03 Thread GitBox
becketqin commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r300223762
 
 

 ##
 File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java
 ##
 @@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannel;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import static 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+import static org.apache.flink.runtime.concurrent.Executors.directExecutor;
+
+/**
+ * A sink function that outputs to PubSub.
+ *
+ * @param  type of PubSubSink messages to write
+ */
+public class PubSubSink extends RichSinkFunction implements 
CheckpointedFunction {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PubSubSink.class);
+
+   private final ApiFutureCallback failureHandler;
+   private final Credentials credentials;
+   private final SerializationSchema serializationSchema;
+   private final String projectName;
+   private final String topicName;
+   private final String hostAndPortForEmulator;
+
+   private transient Publisher publisher;
+
+   private PubSubSink(
+   Credentials credentials,
+   SerializationSchema serializationSchema,
+   String projectName,
+   String topicName,
+   String hostAndPortForEmulator) {
+   this.failureHandler = new FailureHandler();
+   this.credentials = credentials;
+   this.serializationSchema = serializationSchema;
+   this.projectName = projectName;
+   this.topicName = topicName;
+   this.hostAndPortForEmulator = hostAndPortForEmulator;
+   }
+
+   private transient ManagedChannel managedChannel = null;
+   private transient TransportChannel channel = null;
+
+   @Override
+   public void open(Configuration configuration) throws Exception {
+   Publisher.Builder builder = Publisher
+   .newBuilder(ProjectTopicName.of(projectName, topicName))
+   
.setCredentialsProvider(FixedCredentialsProvider.create(credentials));
+
+   if (hostAndPortForEmulator != null) {
+   managedChannel = ManagedChannelBuilder
+   .forTarget(hostAndPortForEmulator)
+   .usePlaintext(true) // This is 'Ok' because 
this is ONLY used for testing.
+   .build();
+   channel = 
GrpcTransportChannel.newBuilder().setManagedChannel(managedChannel).build();
+   
builder.setChannelProvider(FixedTransportChannelProvider.create(channel))
+   

[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…

2019-07-03 Thread GitBox
danny0405 commented on a change in pull request #8844: 
[FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
URL: https://github.com/apache/flink/pull/8844#discussion_r300222783
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 ##
 @@ -245,6 +247,57 @@ public String explain(Table table) {
return planner.getCompletionHints(statement, position);
}
 
+   @Override
+   public void sql(String statement) {
+   List operations = planner.parse(statement);
+   operations.forEach(operation -> {
+   if (operation instanceof CreateTableOperation) {
+   CreateTableOperation operation1 = 
(CreateTableOperation) operation;
+   registerTable(
+   operation1.getTablePath(),
+   operation1.getCatalogTable(),
+   operation1.isIgnoreIfExists());
+   } else if (operation instanceof ModifyOperation) {
+   queryConfigProvider.setConfig(new 
StreamQueryConfig());
+   List> transformations =
+   
planner.translate(Collections.singletonList((ModifyOperation) operation));
+
+   execEnv.apply(transformations);
+   } else {
+   throw new ValidationException(
+   "Unsupported SQL statement: sql() only 
accepts DDLs or Inserts.");
+   }
+   });
+   }
+
+   /**
+* Registers a {@link CatalogBaseTable} under a given object path. The 
{@code path} could be
+* 3 formats:
+* 
+*   `catalog.db.table`: A full table path including the catalog 
name,
+*   the database name and table name.
+*   `db.table`: database name following table name, with the 
current catalog name.
+*   `table`: Only the table name, with the current catalog name 
and database  name.
+* 
+* The registered tables then can be referenced in Sql queries.
+*
+* @param path   The path under which the table will be 
registered
+* @param catalogTable   The table to register
+* @param ignoreIfExists If true, do nothing if there is already same 
table name under
+*   the {@code path}. If false, a 
TableAlreadyExistException throws.
+*/
+   private void registerTable(String[] path, CatalogBaseTable 
catalogTable, boolean ignoreIfExists) {
+   String[] fullName = 
catalogManager.paddedTableName(Arrays.asList(path));
+   Catalog catalog = getCatalog(fullName[0]).orElseThrow(() ->
+   new TableException("Catalog " + fullName[0] + " does 
not exist"));
+   ObjectPath objectPath = new ObjectPath(fullName[1], 
fullName[2]);
+   try {
+   catalog.createTable(objectPath, catalogTable, 
ignoreIfExists);
+   } catch (TableAlreadyExistException | DatabaseNotExistException 
e) {
+   throw new RuntimeException(e);
 
 Review comment:
   I don't want to force the invoker to try-catch the exception.




[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…

2019-07-03 Thread GitBox
danny0405 commented on a change in pull request #8844: 
[FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
URL: https://github.com/apache/flink/pull/8844#discussion_r300221845
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 ##
 @@ -245,6 +247,57 @@ public String explain(Table table) {
return planner.getCompletionHints(statement, position);
}
 
+   @Override
+   public void sql(String statement) {
+   List operations = planner.parse(statement);
+   operations.forEach(operation -> {
+   if (operation instanceof CreateTableOperation) {
+   CreateTableOperation operation1 = 
(CreateTableOperation) operation;
 
 Review comment:
   better name, thanks




[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…

2019-07-03 Thread GitBox
danny0405 commented on a change in pull request #8844: 
[FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
URL: https://github.com/apache/flink/pull/8844#discussion_r300222964
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateOperation.java
 ##
 @@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.operations.Operation;
+
+/**
+ * A {@link Operation} that describes the DDL statements, e.g. CREATE TABLE or 
CREATE FUNCTION.
+ *
+ * Different sub operations can have their special instances. For example, a
+ * create table operation will have a {@link 
org.apache.flink.table.catalog.CatalogTable} instance,
+ * while a create function operation will have a
+ * {@link org.apache.flink.table.catalog.CatalogFunction} instance.
+ */
+@Internal
+public interface CreateOperation extends Operation {
 
 Review comment:
   It is a marker interface, just like `ModifyOperation` and `QueryOperation`.




[GitHub] [flink] xintongsong commented on a change in pull request #8704: [FLINK-12812][runtime] Set resource profiles for task slots

2019-07-03 Thread GitBox
xintongsong commented on a change in pull request #8704: [FLINK-12812][runtime] 
Set resource profiles for task slots
URL: https://github.com/apache/flink/pull/8704#discussion_r300232458
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ##
 @@ -240,7 +254,8 @@ public String toString() {
", heapMemoryInMB=" + heapMemoryInMB +
", directMemoryInMB=" + directMemoryInMB +
", nativeMemoryInMB=" + nativeMemoryInMB +
-   ", stateSizeInMB=" + stateSizeInMB + extend +
+   ", stateSizeInMB=" + stateSizeInMB +
+   ", managedMemoryInMB=" + managedMemoryInMB + 
extend +
 
 Review comment:
   I don't think there is any missing space. The string `extend` either starts 
with ", " or is empty.




[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese

2019-07-03 Thread GitBox
klion26 commented on a change in pull request #8943: 
[FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
URL: https://github.com/apache/flink/pull/8943#discussion_r300227447
 
 

 ##
 File path: docs/ops/state/checkpoints.zh.md
 ##
 @@ -26,69 +26,49 @@ under the License.
 * toc
 {:toc}
 
-## Overview
+## 概述
+checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。
 
 Review comment:
   yes




[GitHub] [flink] TsReaper commented on a change in pull request #8682: [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead to blink

2019-07-03 Thread GitBox
TsReaper commented on a change in pull request #8682: 
[FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce 
conversion overhead to blink
URL: https://github.com/apache/flink/pull/8682#discussion_r300226601
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/DecimalSerializerTest.java
 ##
 @@ -22,7 +22,7 @@
 import org.apache.flink.table.dataformat.Decimal;
 
 /**
- * A test for the {@link BinaryArraySerializer}.
+ * A test for the {@link BaseArraySerializer}.
 
 Review comment:
   Should be "DecimalSerializer"?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] TsReaper commented on a change in pull request #8682: [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead to blink

2019-07-03 Thread GitBox
TsReaper commented on a change in pull request #8682: 
[FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce 
conversion overhead to blink
URL: https://github.com/apache/flink/pull/8682#discussion_r300226648
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryRowSerializerTest.java
 ##
 @@ -24,7 +24,7 @@
 import org.apache.flink.table.dataformat.BinaryString;
 
 /**
- * A test for the {@link BinaryArraySerializer}.
+ * A test for the {@link BaseArraySerializer}.
 
 Review comment:
   Should be "BinaryRowSerializer"?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] TsReaper commented on a change in pull request #8682: [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead to blink

2019-07-03 Thread GitBox
TsReaper commented on a change in pull request #8682: 
[FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce 
conversion overhead to blink
URL: https://github.com/apache/flink/pull/8682#discussion_r300226705
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryGenericSerializerTest.java
 ##
 @@ -23,7 +23,7 @@
 import org.apache.flink.table.dataformat.BinaryGeneric;
 
 /**
- * A test for the {@link BinaryArraySerializer}.
+ * A test for the {@link BaseArraySerializer}.
 
 Review comment:
   Should be "BinaryGenericSerializer"?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document

2019-07-03 Thread GitBox
wuchong commented on a change in pull request #8937: [FLINK-13040] promote 
blink table config and add to document
URL: https://github.com/apache/flink/pull/8937#discussion_r300217135
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableConfig.scala
 ##
 @@ -213,36 +226,29 @@ class TableConfig {
 *
 * @param minTime The minimum time interval for which idle state is 
retained. Set to 0 (zero) to
 *never clean-up the state.
-* @param maxTime The maximum time interval for which idle state is 
retained. May not be smaller
-*than than minTime. Set to 0 (zero) to never clean-up the 
state.
 */
-  def withIdleStateRetentionTime(minTime: Time, maxTime: Time): TableConfig = {
-if (maxTime.toMilliseconds < minTime.toMilliseconds) {
-  throw new IllegalArgumentException("maxTime may not be smaller than 
minTime.")
-}
-this.conf.setLong(TableConfigOptions.SQL_EXEC_STATE_TTL_MS, 
minTime.toMilliseconds)
-this.conf.setLong(TableConfigOptions.SQL_EXEC_STATE_TTL_MAX_MS, 
maxTime.toMilliseconds)
+  def withIdleStateRetentionTime(minTime: Time): TableConfig = {
+this.conf.setString(TableConfigOptions.SQL_EXEC_STATE_TTL,
+  String.valueOf(minTime.toMilliseconds) + " ms")
 
 Review comment:
   simplify to `minTime.toString`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document

2019-07-03 Thread GitBox
wuchong commented on a change in pull request #8937: [FLINK-13040] promote 
blink table config and add to document
URL: https://github.com/apache/flink/pull/8937#discussion_r300217863
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableConfig.scala
 ##
 @@ -200,9 +202,20 @@ class TableConfig {
 }
   }
 
+  def getMillisecondFromConfigDuration(config: ConfigOption[String]): Long = {
+val duration = Duration.create(this.conf.getString(config))
 
 Review comment:
   TableConfig will be moved to the `api-java` module before the 1.9 release, so it will not depend on Scala. I think we should implement the parser ourselves or find other alternatives.
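   For illustration, a minimal hand-rolled parser in plain Java could look like the sketch below. The accepted unit suffixes here ("ms", "s", "min", "h") are only an assumption for the example, not a decision about what Flink should support.

```java
import java.util.Locale;

// Minimal sketch of a duration-string parser that avoids the Scala Duration dependency.
public final class SimpleDurationParser {

    public static long parseToMillis(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        int i = 0;
        while (i < s.length() && Character.isDigit(s.charAt(i))) {
            i++;
        }
        long value = Long.parseLong(s.substring(0, i));
        String unit = s.substring(i).trim();
        switch (unit) {
            case "ms":  return value;
            case "s":   return value * 1_000L;
            case "min": return value * 60_000L;
            case "h":   return value * 3_600_000L;
            default: throw new IllegalArgumentException("Unknown time unit: " + unit);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseToMillis("1800000 ms")); // 1800000
        System.out.println(parseToMillis("30 min"));     // 1800000
    }
}
```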


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document

2019-07-03 Thread GitBox
wuchong commented on a change in pull request #8937: [FLINK-13040] promote 
blink table config and add to document
URL: https://github.com/apache/flink/pull/8937#discussion_r300215876
 
 

 ##
 File path: 
flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
 ##
 @@ -59,6 +59,27 @@
int position() default Integer.MAX_VALUE;
}
 
+   /**
+* Annotation used on table config to introduce additional 
table-related meta data.
+*
+* The {@link TableMeta#execMode()} argument indicates which exec 
mode the config works for,
+* for batch, streaming or both.
+*
+*/
+   @Target(ElementType.FIELD)
+   @Retention(RetentionPolicy.RUNTIME)
+   @Internal
+   public @interface TableMeta {
+   ExecMode execMode();
+   }
+
+   /**
+* Which exec mode the config works for.
+*/
+   public enum ExecMode {
+   BATCH, STREAMING, BOTH
 
 Review comment:
   What about renaming `BOTH` to `BATCH_STREAMING` to explicitly state that it works for both batch and streaming, and to align better with the generated tags? Plain `BOTH` doesn't tell you what it refers to.
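   For reference, the renamed enum would simply read as below (a sketch of the suggestion, not the final code):

```java
// Spells out that an option applies to both modes, matching the generated doc tags.
public enum ExecMode {
    BATCH, STREAMING, BATCH_STREAMING
}
```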


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on issue #8937: [FLINK-13040] promote blink table config and add to document

2019-07-03 Thread GitBox
wuchong commented on issue #8937: [FLINK-13040] promote blink table config and 
add to document
URL: https://github.com/apache/flink/pull/8937#issuecomment-508335301
 
 
   Btw, I also suggest splitting this pull request into 2 commits:
   1. add documentation annotation for table
   2. improve options in blink 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document

2019-07-03 Thread GitBox
wuchong commented on a change in pull request #8937: [FLINK-13040] promote 
blink table config and add to document
URL: https://github.com/apache/flink/pull/8937#discussion_r300215596
 
 

 ##
 File path: docs/ops/config.md
 ##
 @@ -209,6 +209,13 @@ You have to configure `jobmanager.archive.fs.dir` in 
order to archive terminated
 
 {% include generated/history_server_configuration.html %}
 
+
+### Blink Table Planner
+{% include generated/planner_config_configuration.html %}
+
+### Blink Table Runtime
 
 Review comment:
   I don't think we should place the configuration on this page. I suggest creating a new page for SQL configurations under `/dev/table/config.md`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] beyond1920 commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.

2019-07-03 Thread GitBox
beyond1920 commented on a change in pull request #8977: 
[FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner 
support add kinds of QueryOperations.
URL: https://github.com/apache/flink/pull/8977#discussion_r300226150
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/AggregateVisitors.java
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.AggregateFunctionDefinition;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.UserDefinedAggregateFunction;
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.functions.utils.AggSqlFunction;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.isFunctionOfKind;
+import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
+/**
+ * The class contains all kinds of visitors to visit Aggregate.
+ */
+public class AggregateVisitors {
+
+   private static final Map 
AGG_DEF_SQL_OPERATOR_MAPPING = new HashMap<>();
+
+   static {
+   
AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.AVG, 
FlinkSqlOperatorTable.AVG);
+   
AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.COUNT, 
FlinkSqlOperatorTable.COUNT);
+   
AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.MAX, 
FlinkSqlOperatorTable.MAX);
+   
AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.MIN, 
FlinkSqlOperatorTable.MIN);
+   
AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SUM, 
FlinkSqlOperatorTable.SUM);
+   
AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SUM0, 
FlinkSqlOperatorTable.SUM0);
+   
AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.STDDEV_POP, 
FlinkSqlOperatorTable.STDDEV_POP);
+   
AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.STDDEV_SAMP, 
FlinkSqlOperatorTable.STDDEV_SAMP);
+   
AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.VAR_POP, 
FlinkSqlOperatorTable.VAR_POP);
+   
AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.VAR_SAMP, 
FlinkSqlOperatorTable.VAR_SAMP);
+   
AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.COLLECT, 
FlinkSqlOperatorTable.COLLECT);
+   }
+
+   static class AggFunctionVisitor extends 
ExpressionDefaultVisitor {
+   private final FlinkTypeFactory typeFactory;
+
+   AggFunctionVisitor(FlinkTypeFactory typeFactory) {
+   this.typeFactory = typeFactory;
+   }
+
+   @Override
+   public SqlAggFunction visit(CallExpression call) {
+   Preconditions.checkArgument(isFunctionOfKind(call, 
AGGREGATE));
+   FunctionDefinition def = call.getFunctionDefinition();
+   if (AGG_DEF_SQL_OPERATOR_MAPPING.containsKey(def)) {
+   return AGG_DEF_SQL_OPERATOR_MAPPING.get(def);
+   }
+   if (BuiltInFunctionDefinitions.DISTINCT.equals(def)) {
+   Expression innerAgg = call.getChildren().get(0);
+   return innerAgg.accept(this);
+   }
+   AggregateFunctionDefinition aggDef = 
(AggregateFunctionDefinition) def;
+   UserDefinedAggregateFunction userDefinedAggregateFunc 

[GitHub] [flink] beyond1920 commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.

2019-07-03 Thread GitBox
beyond1920 commented on a change in pull request #8977: 
[FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner 
support add kinds of QueryOperations.
URL: https://github.com/apache/flink/pull/8977#discussion_r300226114
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/AggregateVisitors.java
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.AggregateFunctionDefinition;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.UserDefinedAggregateFunction;
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.functions.utils.AggSqlFunction;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.isFunctionOfKind;
+import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
+/**
+ * The class contains all kinds of visitors to visit Aggregate.
+ */
+public class AggregateVisitors {
 
 Review comment:
   AggregateVisitors.AggFunctionVisitor is used by RexNodeConverter.
   AggregateVisitors.AggCallVisitor is used by QueryOperationConverter.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8978: [FLINK-13089][table-planner-blink] Implement batch nested loop join and add join it cases in blink

2019-07-03 Thread GitBox
flinkbot commented on issue #8978: [FLINK-13089][table-planner-blink] Implement 
batch nested loop join and add join it cases in blink
URL: https://github.com/apache/flink/pull/8978#issuecomment-508334487
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-13089) Implement batch nested loop join in blink

2019-07-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-13089:
---
Labels: pull-request-available  (was: )

> Implement batch nested loop join in blink
> -
>
> Key: FLINK-13089
> URL: https://issues.apache.org/jira/browse/FLINK-13089
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
>
> Nested loop join has two advantages:
> 1.Nested loop join is quicker when build row size is small.
> 2.Nested loop join support all kind of joins, include non-key join.
> Plan:
> Introduce NestedLoopJoinCodeGenerator.
> Implement BatchExecNestedLoopJoin.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xintongsong commented on issue #8704: [FLINK-12812][runtime] Set resource profiles for task slots

2019-07-03 Thread GitBox
xintongsong commented on issue #8704: [FLINK-12812][runtime] Set resource 
profiles for task slots
URL: https://github.com/apache/flink/pull/8704#issuecomment-508334308
 
 
   Thanks for the review, @StephanEwen. 
   
   I would like to explain regarding your concern about the assumption of RM/TM 
having same configuration:
   
   The reason we need to calculate TM's slot resource profiles on RM side is 
that, we need to set resource profile for `PendingTaskManagerSlot` before the 
corresponding TM is started. 
   
   Currently, Flink can assign a pending slot to a slot request before the TM 
is started and registered. In this way, the subsequent slot requests will first 
consume slots on the pending TM (for multi-slot TMs) before requesting and 
launching a new one. When the TM is registered, the SlotManager matches the 
registered new slot to a `PendingTaskManagerSlot` with the same resource 
profile, and assigns the registered slot to the same slot request that the 
pending slot is assigned to (if any).
   
   Before this PR, both the pending slot on the RM side and the actual slot on the TM side have the same resource profile `ANY`, which can be matched with the method `equals`. Since this PR sets the slot resource profile on the TM side to the actual resource of the slot, we need to set the resource profile for the pending slots on the RM side in the same way. This is why I introduced calculating TM slot resource profiles on the RM side, and the approximate matching (a simplified sketch follows).
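   A hypothetical sketch of that matching step (types, method names and tolerances below are illustrative stand-ins, not the actual SlotManager code): when a TM registers a slot, the RM looks for a pending slot whose profile approximately matches and takes over its pending assignment.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Illustrative sketch of matching a registered slot against pending slots by
// (approximately) equal resource profiles; not the actual SlotManager API.
final class PendingSlotMatchingSketch {

    static final class Profile {
        final double cpuCores;
        final int memoryMB;

        Profile(double cpuCores, int memoryMB) {
            this.cpuCores = cpuCores;
            this.memoryMB = memoryMB;
        }

        // Tolerate small rounding differences between RM-side and TM-side calculations.
        boolean approximatelyMatches(Profile that, int memoryToleranceMB) {
            return Math.abs(this.cpuCores - that.cpuCores) < 1e-6
                    && Math.abs(this.memoryMB - that.memoryMB) <= memoryToleranceMB;
        }
    }

    static Optional<Profile> findMatchingPendingSlot(List<Profile> pendingSlots, Profile registered) {
        return pendingSlots.stream()
                .filter(pending -> pending.approximatelyMatches(registered, 2))
                .findFirst();
    }

    public static void main(String[] args) {
        List<Profile> pending = Arrays.asList(new Profile(1.0, 1024));
        // The TM-side calculation ended up 1 MB off; the registered slot still matches.
        System.out.println(findMatchingPendingSlot(pending, new Profile(1.0, 1023)).isPresent());
    }
}
```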
   
   The assignment of pending slots and the RM-side slot resource calculation only happen on Yarn/Mesos. In these scenarios, TMs do have the same configuration as the RM, since it is transmitted from the RM side. For a standalone cluster, there should be no pending slots because the RM cannot actively start any TM.
   
   Except for the `PendingTaskManagerSlot`, RM does use the slot resource 
profile reported from TM for matching slot request against registered slots, 
and converting requested `UNKNOWN` resource profile to a default value (as 
shown in the following PR #8846 for dynamic managed memory). Therefore, it 
should not cause problems on a standalone cluster with TMs having different 
configs.
   
   It's my bad for not making these clear in the code and comments. I'll address the rest of your comments ASAP. I especially appreciate your suggestions on encapsulation and simplifying tests. It's a good lesson for me. Thank you again.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi opened a new pull request #8978: [FLINK-13089][table-planner-blink] Implement batch nested loop join in blink

2019-07-03 Thread GitBox
JingsongLi opened a new pull request #8978: [FLINK-13089][table-planner-blink] 
Implement batch nested loop join in blink
URL: https://github.com/apache/flink/pull/8978
 
 
   
   ## What is the purpose of the change
   
   Nested loop join has two advantages:
   1. Nested loop join is quicker when the build side is small.
   2. Nested loop join supports all kinds of joins, including non-key joins (see the sketch below).
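   A self-contained sketch of the nested loop join idea, for illustration only; it is not the code produced by NestedLoopJoinCodeGenerator, and the tiny inputs are made up:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

// Naive nested loop join: the build side is scanned once per probe row, so it is cheap
// when the build side is small, and the join condition is an arbitrary predicate,
// so non-key (non-equi) joins also work.
public final class NestedLoopJoinSketch {

    static <L, R> List<String> join(List<L> probe, List<R> build, BiPredicate<L, R> condition) {
        List<String> joined = new ArrayList<>();
        for (L left : probe) {
            for (R right : build) {
                if (condition.test(left, right)) {
                    joined.add(left + "|" + right);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Integer> probe = Arrays.asList(1, 5, 10);
        List<Integer> build = Arrays.asList(3, 7);
        // Non-key join: the condition is "left < right", not an equality on a key.
        System.out.println(join(probe, build, (l, r) -> l < r));
    }
}
```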
   
   ## Brief change log
   
   Introduce NestedLoopJoinCodeGenerator.
   Implement BatchExecNestedLoopJoin.
   Add join it cases.
   
   
   ## Verifying this change
   
   it cases
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-13089) Implement batch nested loop join in blink

2019-07-03 Thread Jingsong Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-13089:
-
Description: 
Nested loop join has two advantages:

1.Nested loop join is quicker when build row size is small.

2.Nested loop join support all kind of joins, include non-key join.

Plan:

Introduce NestedLoopJoinCodeGenerator.

Implement BatchExecNestedLoopJoin.

  was:
Introduce NestedLoopJoinCodeGenerator.

Implement BatchExecNestedLoopJoin.


> Implement batch nested loop join in blink
> -
>
> Key: FLINK-13089
> URL: https://issues.apache.org/jira/browse/FLINK-13089
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>
> Nested loop join has two advantages:
> 1.Nested loop join is quicker when build row size is small.
> 2.Nested loop join support all kind of joins, include non-key join.
> Plan:
> Introduce NestedLoopJoinCodeGenerator.
> Implement BatchExecNestedLoopJoin.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-13086) add Chinese documentation for catalogs

2019-07-03 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-13086:
-
Description: the ticket for corresponding English documentation is 
FLINK-12277

> add Chinese documentation for catalogs
> --
>
> Key: FLINK-13086
> URL: https://issues.apache.org/jira/browse/FLINK-13086
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Connectors / Hive, Documentation, 
> Table SQL / API
>Reporter: Bowen Li
>Assignee: frank wang
>Priority: Major
> Fix For: 1.9.0
>
>
> the ticket for corresponding English documentation is FLINK-12277



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13089) Implement batch nested loop join in blink

2019-07-03 Thread Jingsong Lee (JIRA)
Jingsong Lee created FLINK-13089:


 Summary: Implement batch nested loop join in blink
 Key: FLINK-13089
 URL: https://issues.apache.org/jira/browse/FLINK-13089
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Reporter: Jingsong Lee
Assignee: Jingsong Lee


Introduce NestedLoopJoinCodeGenerator.

Implement BatchExecNestedLoopJoin.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.

2019-07-03 Thread GitBox
JingsongLi commented on a change in pull request #8977: 
[FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner 
support add kinds of QueryOperations.
URL: https://github.com/apache/flink/pull/8977#discussion_r300223342
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/AggregateVisitors.java
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.AggregateFunctionDefinition;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.UserDefinedAggregateFunction;
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.functions.utils.AggSqlFunction;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.isFunctionOfKind;
+import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
+/**
+ * The class contains all kinds of visitors to visit Aggregate.
+ */
+public class AggregateVisitors {
+
+   private static final Map 
AGG_DEF_SQL_OPERATOR_MAPPING = new HashMap<>();
+
+   static {
+   
AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.AVG, 
FlinkSqlOperatorTable.AVG);
+   
AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.COUNT, 
FlinkSqlOperatorTable.COUNT);
+   
AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.MAX, 
FlinkSqlOperatorTable.MAX);
+   
AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.MIN, 
FlinkSqlOperatorTable.MIN);
+   
AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SUM, 
FlinkSqlOperatorTable.SUM);
+   
AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SUM0, 
FlinkSqlOperatorTable.SUM0);
+   
AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.STDDEV_POP, 
FlinkSqlOperatorTable.STDDEV_POP);
+   
AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.STDDEV_SAMP, 
FlinkSqlOperatorTable.STDDEV_SAMP);
+   
AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.VAR_POP, 
FlinkSqlOperatorTable.VAR_POP);
+   
AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.VAR_SAMP, 
FlinkSqlOperatorTable.VAR_SAMP);
+   
AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.COLLECT, 
FlinkSqlOperatorTable.COLLECT);
+   }
+
+   static class AggFunctionVisitor extends 
ExpressionDefaultVisitor {
+   private final FlinkTypeFactory typeFactory;
+
+   AggFunctionVisitor(FlinkTypeFactory typeFactory) {
+   this.typeFactory = typeFactory;
+   }
+
+   @Override
+   public SqlAggFunction visit(CallExpression call) {
+   Preconditions.checkArgument(isFunctionOfKind(call, 
AGGREGATE));
+   FunctionDefinition def = call.getFunctionDefinition();
+   if (AGG_DEF_SQL_OPERATOR_MAPPING.containsKey(def)) {
+   return AGG_DEF_SQL_OPERATOR_MAPPING.get(def);
+   }
+   if (BuiltInFunctionDefinitions.DISTINCT.equals(def)) {
+   Expression innerAgg = call.getChildren().get(0);
+   return innerAgg.accept(this);
+   }
+   AggregateFunctionDefinition aggDef = 
(AggregateFunctionDefinition) def;
+   UserDefinedAggregateFunction userDefinedAggregateFunc 

[GitHub] [flink] banmoy commented on issue #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable

2019-07-03 Thread GitBox
banmoy commented on issue #8611: [FLINK-12693][state] Store state per key-group 
in CopyOnWriteStateTable
URL: https://github.com/apache/flink/pull/8611#issuecomment-508331445
 
 
   @StefanRRichter Thanks for the review. I have addressed the comments, and 
please help to review again.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-12926) Main thread checking in some tests fails

2019-07-03 Thread Zhu Zhu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878302#comment-16878302
 ] 

Zhu Zhu edited comment on FLINK-12926 at 7/4/19 4:12 AM:
-

Hi [~till.rohrmann], from my observation, the issue is happening though it does 
not break current tests.

Below are some cases it happens or may happen:

1. Even though the tests do not trigger actions from other thread, the 
production logic might do it, e.g. *Execution#deploy()* as shown in the 
attached picture, which happens in most of the tests mentioned above. It does 
not break tests since it is not in the critical path and the failed main thread 
checking does not cause failovers. 

!Execution#deploy.jpg|width=568,height=328!

2. The *TestingComponentMainThreadExecutorServiceAdapter* uses 
*DirectScheduledExecutorService* as the underlying ScheduledExecutorService. 
However, DirectScheduledExecutorService will schedule tasks from another 
thread. So if any *mainThreadExecutor.schedule** action is invoked in tests or in the 
production process, it may also violate the main thread checking. No test 
breaks because of it yet, but I think we have just been lucky to dodge it (or was it intentional?). 
e.g. 

    -  FixedDelayRestartStrategy. No test breaks because no test uses 
FixedDelayRestartStrategy to do failover yet.

    -  HeartbeatMonitor. No test breaks because it does not check main thread, 
HeartbeatManagerTest#testHeartbeatTimeout actually does the timeout handling in 
another pool thread.

 

I'd be OK to close this issue as no test breaks yet, as long as we are already 
aware of this. 

The manual executor way as we explored in FLINK-12876  can be a solution for 
this case.

 


was (Author: zhuzh):
Hi [~till.rohrmann], from my observation, the issue is happening though it does 
not break current tests.

Below are some cases it happens or may happen:

1. Even though the tests do not trigger actions from other thread, the 
production logic might do it, e.g. *Execution#deploy()* as shown in the 
attached picture, which happens in most of the tests mentioned above. It does 
not break tests since it is not in the critical path and the failed main thread 
checking does not cause failovers.

2. Besides, the *TestingComponentMainThreadExecutorServiceAdapter* uses 
*DirectScheduledExecutorService* as the underlying ScheduledExecutorService. 
However, DirectScheduledExecutorService will schedule tasks from another 
thread. So if any mainThreadExecutor.schedule* action is invoked in tests or 
production process, it may also violate the main thread checking. No test 
breaks for it yet. But I think we just fortunately dodged(Or intentional?). 
e.g. 

    -  FixedDelayRestartStrategy. No test breaks because no test uses 
FixedDelayRestartStrategy to do failover yet.

    -  HeartbeatMonitor. No test breaks because it does not check main thread, 
HeartbeatManagerTest#testHeartbeatTimeout actually does the timeout handling in 
another pool thread.

 

I'd be OK to close this issue as no test breaks yet, as long as we are already 
aware of this. 

The manual executor way as we explored in [FLINK-12876 
|https://issues.apache.org/jira/browse/FLINK-12876] can be a solution for this 
case.

 

!Execution#deploy.jpg|width=568,height=328!

> Main thread checking in some tests fails
> 
>
> Key: FLINK-12926
> URL: https://issues.apache.org/jira/browse/FLINK-12926
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.9.0
>Reporter: Zhu Zhu
>Priority: Major
> Attachments: Execution#deploy.jpg, mainThreadCheckFailure.log
>
>
> Currently all JM side job changing actions are expected to be taken in 
> JobMaster main thread.
> In current Flink tests, many cases tend to use the test main thread as the JM 
> main thread. This can lead to 2 issues:
> 1. TestingComponentMainThreadExecutorServiceAdapter is a direct executor, so 
> if it is invoked from any other thread, it will break the main thread 
> checking and fail the submitted action (as in the attached log 
> [^mainThreadCheckFailure.log])
> 2. The test main thread does not support other actions queued in its 
> executor, as the test will end once the current test thread action(the 
> current running test body) is done
>  
> In my observation, most cases which starts 
> ExecutionGraph.scheduleForExecution() will encounter this issue. Cases 
> include ExecutionGraphRestartTest, FailoverRegionTest, 
> ConcurrentFailoverStrategyExecutionGraphTest, GlobalModVersionTest, 
> ExecutionGraphDeploymentTest, etc.
>  
> One solution in my mind is to create a ScheduledExecutorService for those 
> tests, use it as the main thread and run the test body in this thread.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12926) Main thread checking in some tests fails

2019-07-03 Thread Zhu Zhu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878302#comment-16878302
 ] 

Zhu Zhu edited comment on FLINK-12926 at 7/4/19 4:10 AM:
-

Hi [~till.rohrmann], from my observation, the issue is happening though it does 
not break current tests.

Below are some cases it happens or may happen:

1. Even though the tests do not trigger actions from other thread, the 
production logic might do it, e.g. *Execution#deploy()* as shown in the 
attached picture, which happens in most of the tests mentioned above. It does 
not break tests since it is not in the critical path and the failed main thread 
checking does not cause failovers.

2. Besides, the *TestingComponentMainThreadExecutorServiceAdapter* uses 
*DirectScheduledExecutorService* as the underlying ScheduledExecutorService. 
However, DirectScheduledExecutorService will schedule tasks from another 
thread. So if any mainThreadExecutor.schedule* action is invoked in tests or 
production process, it may also violate the main thread checking. No test 
breaks for it yet. But I think we just fortunately dodged(Or intentional?). 
e.g. 

    -  FixedDelayRestartStrategy. No test breaks because no test uses 
FixedDelayRestartStrategy to do failover yet.

    -  HeartbeatMonitor. No test breaks because it does not check main thread, 
HeartbeatManagerTest#testHeartbeatTimeout actually does the timeout handling in 
another pool thread.

 

I'd be OK to close this issue as no test breaks yet, as long as we are already 
aware of this. 

The manual executor way as we explored in [FLINK-12876 
|https://issues.apache.org/jira/browse/FLINK-12876] can be a solution for this 
case.

 

!Execution#deploy.jpg|width=568,height=328!


was (Author: zhuzh):
Hi [~till.rohrmann], from my observation, the issue is happening though it does 
not break current tests.

Below are some cases it happens or may happen:

1. Even though the tests do not trigger actions from other thread, the 
production logic might do it, e.g. *Execution#deploy()* as in the attached 
picture, this happens but does not break tests since it is not in the critical 
path and the failed main thread checking does not cause failovers.

2. Besides, the *TestingComponentMainThreadExecutorServiceAdapter* uses 
*DirectScheduledExecutorService* as the underlying ScheduledExecutorService. 
However, DirectScheduledExecutorService will schedule tasks from another 
thread. So if any mainThreadExecutor.schedule* action is invoked in tests or 
production process, it may also violate the main thread checking. No test 
breaks for it yet. But I think we just fortunately dodged(Or intentional?). 
e.g. 

    -  FixedDelayRestartStrategy. No test breaks because no test uses 
FixedDelayRestartStrategy to do failover yet.

    -  HeartbeatMonitor. No test breaks because it does not check main thread, 
HeartbeatManagerTest#testHeartbeatTimeout actually does the timeout handling in 
another pool thread.

 

!Execution#deploy.jpg|width=568,height=328!

> Main thread checking in some tests fails
> 
>
> Key: FLINK-12926
> URL: https://issues.apache.org/jira/browse/FLINK-12926
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.9.0
>Reporter: Zhu Zhu
>Priority: Major
> Attachments: Execution#deploy.jpg, mainThreadCheckFailure.log
>
>
> Currently all JM side job changing actions are expected to be taken in 
> JobMaster main thread.
> In current Flink tests, many cases tend to use the test main thread as the JM 
> main thread. This can lead to 2 issues:
> 1. TestingComponentMainThreadExecutorServiceAdapter is a direct executor, so 
> if it is invoked from any other thread, it will break the main thread 
> checking and fail the submitted action (as in the attached log 
> [^mainThreadCheckFailure.log])
> 2. The test main thread does not support other actions queued in its 
> executor, as the test will end once the current test thread action(the 
> current running test body) is done
>  
> In my observation, most cases which starts 
> ExecutionGraph.scheduleForExecution() will encounter this issue. Cases 
> include ExecutionGraphRestartTest, FailoverRegionTest, 
> ConcurrentFailoverStrategyExecutionGraphTest, GlobalModVersionTest, 
> ExecutionGraphDeploymentTest, etc.
>  
> One solution in my mind is to create a ScheduledExecutorService for those 
> tests, use it as the main thread and run the test body in this thread.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable

2019-07-03 Thread GitBox
banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store 
state per key-group in CopyOnWriteStateTable
URL: https://github.com/apache/flink/pull/8611#discussion_r300222482
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
 ##
 @@ -39,7 +47,8 @@
  * @param  type of namespace
  * @param  type of state
  */
-public abstract class StateTable implements StateSnapshotRestore {
+public abstract class StateTable
 
 Review comment:
   I prefer to use inheritance for the following reasons (a small sketch follows below):
   1. in the follow-up spill implementation, we need some memory statistics about the `StateTable` to decide whether to build an on-heap map or an on-disk map. I think it's not convenient for `MapAndSnapshotFactory` to communicate with `StateTable`
   2. an implementation of `StateTable` may need its own methods, such as `getStateMapSnapshotArray` in `CopyOnWriteStateTable` and more custom methods in the spill implementation
   3. if we introduce `MapAndSnapshotFactory`, we would also have to change the way `StateTable` is currently created via `SnapshotStrategySynchronicityBehavior`, but I think that is not in the scope of this refactor
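   A small hypothetical sketch of the shape argued for here (class and method names are illustrative, not the real StateTable hierarchy): the abstract table owns the shared contract, while a concrete subclass exposes its own extras without routing them through a factory.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative inheritance-based layout, not the actual Flink classes.
abstract class StateTableSketch<K, N, S> {
    abstract S get(K key, N namespace);
    abstract void put(K key, N namespace, S state);
}

final class CopyOnWriteStateTableSketch<K, N, S> extends StateTableSketch<K, N, S> {
    private final Map<K, Map<N, S>> entries = new HashMap<>();

    @Override
    S get(K key, N namespace) {
        Map<N, S> byNamespace = entries.get(key);
        return byNamespace == null ? null : byNamespace.get(namespace);
    }

    @Override
    void put(K key, N namespace, S state) {
        entries.computeIfAbsent(key, k -> new HashMap<>()).put(namespace, state);
    }

    // Subclass-specific extra, analogous to getStateMapSnapshotArray(): only this
    // implementation needs it, which would be awkward to expose through a shared factory.
    int numberOfEntries() {
        return entries.values().stream().mapToInt(Map::size).sum();
    }
}
```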


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] beyond1920 commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.

2019-07-03 Thread GitBox
beyond1920 commented on a change in pull request #8977: 
[FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner 
support add kinds of QueryOperations.
URL: https://github.com/apache/flink/pull/8977#discussion_r300222311
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeExpression.java
 ##
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.table.types.DataType;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Dummy wrapper for expressions that were converted to RexNode in a different 
way.
+ */
+public class RexNodeExpression implements ResolvedExpression {
 
 Review comment:
   Of course.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] beyond1920 commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.

2019-07-03 Thread GitBox
beyond1920 commented on a change in pull request #8977: 
[FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner 
support add kinds of QueryOperations.
URL: https://github.com/apache/flink/pull/8977#discussion_r300222038
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
 ##
 @@ -79,25 +112,196 @@
private final RelBuilder relBuilder;
 
 Review comment:
   Sure. I will update it to use ResolvedExpressionVisitor after introducing Expression resolution in AggCodeGen later, which may be done in another PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] beyond1920 commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.

2019-07-03 Thread GitBox
beyond1920 commented on a change in pull request #8977: 
[FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner 
support add kinds of QueryOperations.
URL: https://github.com/apache/flink/pull/8977#discussion_r300221925
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
 ##
 @@ -79,25 +112,196 @@
private final RelBuilder relBuilder;
private final FlinkTypeFactory typeFactory;
 
+   private static final int DECIMAL_PRECISION_NEEDED_FOR_LONG = 19;
+
+   /**
+* The mapping only keeps part of FunctionDefinitions, which could be 
converted to SqlOperator in a very simple
+* way.
+*/
+   private static final Map 
SIMPLE_DEF_SQL_OPERATOR_MAPPING = new HashMap<>();
+
+   static {
+   // logic functions
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.AND, 
FlinkSqlOperatorTable.AND);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OR, 
FlinkSqlOperatorTable.OR);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT, 
FlinkSqlOperatorTable.NOT);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IF, 
FlinkSqlOperatorTable.CASE);
+
+   // comparison functions
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.EQUALS, 
FlinkSqlOperatorTable.EQUALS);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   .put(BuiltInFunctionDefinitions.GREATER_THAN, 
FlinkSqlOperatorTable.GREATER_THAN);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   
.put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, 
FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LESS_THAN, 
FlinkSqlOperatorTable.LESS_THAN);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   
.put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, 
FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT_EQUALS, 
FlinkSqlOperatorTable.NOT_EQUALS);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NULL, 
FlinkSqlOperatorTable.IS_NULL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_NULL, 
FlinkSqlOperatorTable.IS_NOT_NULL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_TRUE, 
FlinkSqlOperatorTable.IS_TRUE);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_FALSE, 
FlinkSqlOperatorTable.IS_FALSE);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_TRUE, 
FlinkSqlOperatorTable.IS_NOT_TRUE);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   .put(BuiltInFunctionDefinitions.IS_NOT_FALSE, 
FlinkSqlOperatorTable.IS_NOT_FALSE);
+
+   // string functions
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CHAR_LENGTH, 
FlinkSqlOperatorTable.CHAR_LENGTH);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.INIT_CAP, 
FlinkSqlOperatorTable.INITCAP);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LIKE, 
FlinkSqlOperatorTable.LIKE);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOWER, 
FlinkSqlOperatorTable.LOWER);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SIMILAR, 
FlinkSqlOperatorTable.SIMILAR_TO);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SUBSTRING, 
FlinkSqlOperatorTable.SUBSTRING);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.UPPER, 
FlinkSqlOperatorTable.UPPER);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.POSITION, 
FlinkSqlOperatorTable.POSITION);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OVERLAY, 
FlinkSqlOperatorTable.OVERLAY);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT, 
FlinkSqlOperatorTable.CONCAT_FUNCTION);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT_WS, 
FlinkSqlOperatorTable.CONCAT_WS);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LPAD, 
FlinkSqlOperatorTable.LPAD);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RPAD, 
FlinkSqlOperatorTable.RPAD);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   .put(BuiltInFunctionDefinitions.REGEXP_EXTRACT, 
FlinkSqlOperatorTable.REGEXP_EXTRACT);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.FROM_BASE64, 

[jira] [Commented] (FLINK-11607) Translate the "DataStream API Tutorial" page into Chinese

2019-07-03 Thread LakeShen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878304#comment-16878304
 ] 

LakeShen commented on FLINK-11607:
--

Hi Zhang Ziqiang, have you started working on this? If not, could you assign it to me? I want to translate this page, thanks.

> Translate the "DataStream API Tutorial" page into Chinese
> -
>
> Key: FLINK-11607
> URL: https://issues.apache.org/jira/browse/FLINK-11607
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Assignee: Zhang Ziqiang
>Priority: Major
>
> The page url is 
> https://ci.apache.org/projects/flink/flink-docs-master/tutorials/datastream_api.html
> The markdown file is located in flink/docs/tutorials/datastream_api.zh.md
> The markdown file will be created once FLINK-11529 is merged.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12926) Main thread checking in some tests fails

2019-07-03 Thread Zhu Zhu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878302#comment-16878302
 ] 

Zhu Zhu edited comment on FLINK-12926 at 7/4/19 3:56 AM:
-

Hi [~till.rohrmann], from my observation, the issue is happening though it does 
not break current tests.

Below are some cases it happens or may happen:

1. Even though the tests do not trigger actions from other thread, the 
production logic might do it, e.g. *Execution#deploy()* as in the attached 
picture, this happens but does not break tests since it is not in the critical 
path and the failed main thread checking does not cause failovers.

2. Besides, the *TestingComponentMainThreadExecutorServiceAdapter* uses 
*DirectScheduledExecutorService* as the underlying ScheduledExecutorService. 
However, DirectScheduledExecutorService will schedule tasks from another 
thread. So if any mainThreadExecutor.schedule* action is invoked in tests or 
production process, it may also violate the main thread checking. No test 
breaks for it yet. But I think we just fortunately dodged(Or intentional?). 
e.g. 

    -  FixedDelayRestartStrategy. No test breaks because no test uses 
FixedDelayRestartStrategy to do failover yet.

    -  HeartbeatMonitor. No test breaks because it does not check main thread, 
HeartbeatManagerTest#testHeartbeatTimeout actually does the timeout handling in 
another pool thread.

 

!Execution#deploy.jpg|width=568,height=328!


was (Author: zhuzh):
From my observation, the issue is happening though it does not break current tests.

Below are some cases it happens or may happen:

1. Even though the tests do not trigger actions from other thread, the 
production logic might do it, e.g. *Execution#deploy()* as in the attached 
picture, this happens but does not break tests since it is not in the critical 
path and the failed main thread checking does not cause failovers.

2. Besides, the *TestingComponentMainThreadExecutorServiceAdapter* uses 
*DirectScheduledExecutorService* as the underlying ScheduledExecutorService. 
However, DirectScheduledExecutorService will schedule tasks from another 
thread. So if any mainThreadExecutor.schedule* action is invoked in tests or 
production process, it may also violate the main thread checking. No test 
breaks for it yet. But I think we just fortunately dodged(Or intentional?). 
e.g. 

    -  FixedDelayRestartStrategy. No test breaks because no test uses 
FixedDelayRestartStrategy to do failover yet.

    -  HeartbeatMonitor. No test breaks because it does not check main thread, 
HeartbeatManagerTest#testHeartbeatTimeout actually does the timeout handling in 
another pool thread.

 

!Execution#deploy.jpg!

> Main thread checking in some tests fails
> 
>
> Key: FLINK-12926
> URL: https://issues.apache.org/jira/browse/FLINK-12926
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.9.0
>Reporter: Zhu Zhu
>Priority: Major
> Attachments: Execution#deploy.jpg, mainThreadCheckFailure.log
>
>
> Currently all JM side job changing actions are expected to be taken in 
> JobMaster main thread.
> In current Flink tests, many cases tend to use the test main thread as the JM 
> main thread. This can lead to 2 issues:
> 1. TestingComponentMainThreadExecutorServiceAdapter is a direct executor, so 
> if it is invoked from any other thread, it will break the main thread 
> checking and fail the submitted action (as in the attached log 
> [^mainThreadCheckFailure.log])
> 2. The test main thread does not support other actions queued in its 
> executor, as the test will end once the current test thread action(the 
> current running test body) is done
>  
> In my observation, most cases which starts 
> ExecutionGraph.scheduleForExecution() will encounter this issue. Cases 
> include ExecutionGraphRestartTest, FailoverRegionTest, 
> ConcurrentFailoverStrategyExecutionGraphTest, GlobalModVersionTest, 
> ExecutionGraphDeploymentTest, etc.
>  
> One solution in my mind is to create a ScheduledExecutorService for those 
> tests, use it as the main thread and run the test body in this thread.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12926) Main thread checking in some tests fails

2019-07-03 Thread Zhu Zhu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878302#comment-16878302
 ] 

Zhu Zhu commented on FLINK-12926:
-

From my observation, the issue is happening even though it does not break the 
current tests.

Below are some cases where it happens or may happen:

1. Even though the tests do not trigger actions from other threads, the 
production logic might do it, e.g. *Execution#deploy()* as in the attached 
picture. This happens but does not break tests since it is not in the critical 
path and the failed main thread checking does not cause failovers.

2. Besides, the *TestingComponentMainThreadExecutorServiceAdapter* uses 
*DirectScheduledExecutorService* as the underlying ScheduledExecutorService. 
However, DirectScheduledExecutorService will schedule tasks from another 
thread. So if any mainThreadExecutor.schedule* action is invoked in tests or 
in the production process, it may also violate the main thread checking. No test 
breaks because of it yet, but I think we have just fortunately dodged it (or is it 
intentional?). E.g.:

    -  FixedDelayRestartStrategy. No test breaks because no test uses 
FixedDelayRestartStrategy to do a failover yet.

    -  HeartbeatMonitor. No test breaks because it does not check the main thread; 
HeartbeatManagerTest#testHeartbeatTimeout actually does the timeout handling in 
another pool thread.

 

!Execution#deploy.jpg!

> Main thread checking in some tests fails
> 
>
> Key: FLINK-12926
> URL: https://issues.apache.org/jira/browse/FLINK-12926
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.9.0
>Reporter: Zhu Zhu
>Priority: Major
> Attachments: Execution#deploy.jpg, mainThreadCheckFailure.log
>
>
> Currently all JM-side job-changing actions are expected to be taken in the 
> JobMaster main thread.
> In current Flink tests, many cases tend to use the test main thread as the JM 
> main thread. This can lead to 2 issues:
> 1. TestingComponentMainThreadExecutorServiceAdapter is a direct executor, so 
> if it is invoked from any other thread, it will break the main thread 
> checking and fail the submitted action (as in the attached log 
> [^mainThreadCheckFailure.log])
> 2. The test main thread does not support other actions queued in its 
> executor, as the test will end once the current test thread action (the 
> currently running test body) is done
>  
> In my observation, most cases which start 
> ExecutionGraph.scheduleForExecution() will encounter this issue. Cases 
> include ExecutionGraphRestartTest, FailoverRegionTest, 
> ConcurrentFailoverStrategyExecutionGraphTest, GlobalModVersionTest, 
> ExecutionGraphDeploymentTest, etc.
>  
> One solution in my mind is to create a ScheduledExecutorService for those 
> tests, use it as the main thread and run the test body in this thread.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] tianchen92 closed pull request #8624: [FLINK-10932]Initial flink-kubernetes module with empty implementation

2019-07-03 Thread GitBox
tianchen92 closed pull request #8624: [FLINK-10932]Initial flink-kubernetes 
module with empty implementation
URL: https://github.com/apache/flink/pull/8624
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tianchen92 opened a new pull request #8624: [FLINK-10932]Initial flink-kubernetes module with empty implementation

2019-07-03 Thread GitBox
tianchen92 opened a new pull request #8624: [FLINK-10932]Initial 
flink-kubernetes module with empty implementation
URL: https://github.com/apache/flink/pull/8624
 
 
   
   ## What is the purpose of the change
   
   *Initialize the skeleton module to start native k8s integration, related to 
[FLINK-10932](https://issues.apache.org/jira/browse/FLINK-10932)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *Add flink-kubernetes module*
 - *Add Interface of Kubernetes client*
   
   
   ## Verifying this change
   
   *This change is an initial empty implementation, verified by mvn clean 
install and Travis CI.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (the document will be introduced 
in later pull requests)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] beyond1920 commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.

2019-07-03 Thread GitBox
beyond1920 commented on a change in pull request #8977: 
[FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner 
support add kinds of QueryOperations.
URL: https://github.com/apache/flink/pull/8977#discussion_r300220124
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
 ##
 @@ -79,25 +112,196 @@
private final RelBuilder relBuilder;
private final FlinkTypeFactory typeFactory;
 
+   private static final int DECIMAL_PRECISION_NEEDED_FOR_LONG = 19;
+
+   /**
+* The mapping only keeps part of FunctionDefinitions, which could be 
converted to SqlOperator in a very simple
+* way.
+*/
+   private static final Map<FunctionDefinition, SqlOperator> 
SIMPLE_DEF_SQL_OPERATOR_MAPPING = new HashMap<>();
+
+   static {
+   // logic functions
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.AND, 
FlinkSqlOperatorTable.AND);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OR, 
FlinkSqlOperatorTable.OR);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT, 
FlinkSqlOperatorTable.NOT);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IF, 
FlinkSqlOperatorTable.CASE);
+
+   // comparison functions
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.EQUALS, 
FlinkSqlOperatorTable.EQUALS);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   .put(BuiltInFunctionDefinitions.GREATER_THAN, 
FlinkSqlOperatorTable.GREATER_THAN);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   
.put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, 
FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LESS_THAN, 
FlinkSqlOperatorTable.LESS_THAN);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   
.put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, 
FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT_EQUALS, 
FlinkSqlOperatorTable.NOT_EQUALS);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NULL, 
FlinkSqlOperatorTable.IS_NULL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_NULL, 
FlinkSqlOperatorTable.IS_NOT_NULL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_TRUE, 
FlinkSqlOperatorTable.IS_TRUE);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_FALSE, 
FlinkSqlOperatorTable.IS_FALSE);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_TRUE, 
FlinkSqlOperatorTable.IS_NOT_TRUE);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   .put(BuiltInFunctionDefinitions.IS_NOT_FALSE, 
FlinkSqlOperatorTable.IS_NOT_FALSE);
+
+   // string functions
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CHAR_LENGTH, 
FlinkSqlOperatorTable.CHAR_LENGTH);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.INIT_CAP, 
FlinkSqlOperatorTable.INITCAP);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LIKE, 
FlinkSqlOperatorTable.LIKE);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOWER, 
FlinkSqlOperatorTable.LOWER);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SIMILAR, 
FlinkSqlOperatorTable.SIMILAR_TO);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SUBSTRING, 
FlinkSqlOperatorTable.SUBSTRING);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.UPPER, 
FlinkSqlOperatorTable.UPPER);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.POSITION, 
FlinkSqlOperatorTable.POSITION);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OVERLAY, 
FlinkSqlOperatorTable.OVERLAY);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT, 
FlinkSqlOperatorTable.CONCAT_FUNCTION);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT_WS, 
FlinkSqlOperatorTable.CONCAT_WS);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LPAD, 
FlinkSqlOperatorTable.LPAD);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RPAD, 
FlinkSqlOperatorTable.RPAD);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   .put(BuiltInFunctionDefinitions.REGEXP_EXTRACT, 
FlinkSqlOperatorTable.REGEXP_EXTRACT);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.FROM_BASE64, 

[GitHub] [flink] beyond1920 commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.

2019-07-03 Thread GitBox
beyond1920 commented on a change in pull request #8977: 
[FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner 
support add kinds of QueryOperations.
URL: https://github.com/apache/flink/pull/8977#discussion_r300219868
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
 ##
 @@ -79,25 +112,196 @@
private final RelBuilder relBuilder;
private final FlinkTypeFactory typeFactory;
 
+   private static final int DECIMAL_PRECISION_NEEDED_FOR_LONG = 19;
+
+   /**
+* The mapping only keeps part of FunctionDefinitions, which could be 
converted to SqlOperator in a very simple
+* way.
+*/
+   private static final Map<FunctionDefinition, SqlOperator> 
SIMPLE_DEF_SQL_OPERATOR_MAPPING = new HashMap<>();
+
+   static {
+   // logic functions
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.AND, 
FlinkSqlOperatorTable.AND);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OR, 
FlinkSqlOperatorTable.OR);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT, 
FlinkSqlOperatorTable.NOT);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IF, 
FlinkSqlOperatorTable.CASE);
+
+   // comparison functions
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.EQUALS, 
FlinkSqlOperatorTable.EQUALS);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   .put(BuiltInFunctionDefinitions.GREATER_THAN, 
FlinkSqlOperatorTable.GREATER_THAN);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   
.put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, 
FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LESS_THAN, 
FlinkSqlOperatorTable.LESS_THAN);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   
.put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, 
FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT_EQUALS, 
FlinkSqlOperatorTable.NOT_EQUALS);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NULL, 
FlinkSqlOperatorTable.IS_NULL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_NULL, 
FlinkSqlOperatorTable.IS_NOT_NULL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_TRUE, 
FlinkSqlOperatorTable.IS_TRUE);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_FALSE, 
FlinkSqlOperatorTable.IS_FALSE);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_TRUE, 
FlinkSqlOperatorTable.IS_NOT_TRUE);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   .put(BuiltInFunctionDefinitions.IS_NOT_FALSE, 
FlinkSqlOperatorTable.IS_NOT_FALSE);
+
+   // string functions
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CHAR_LENGTH, 
FlinkSqlOperatorTable.CHAR_LENGTH);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.INIT_CAP, 
FlinkSqlOperatorTable.INITCAP);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LIKE, 
FlinkSqlOperatorTable.LIKE);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOWER, 
FlinkSqlOperatorTable.LOWER);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SIMILAR, 
FlinkSqlOperatorTable.SIMILAR_TO);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SUBSTRING, 
FlinkSqlOperatorTable.SUBSTRING);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.UPPER, 
FlinkSqlOperatorTable.UPPER);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.POSITION, 
FlinkSqlOperatorTable.POSITION);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OVERLAY, 
FlinkSqlOperatorTable.OVERLAY);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT, 
FlinkSqlOperatorTable.CONCAT_FUNCTION);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT_WS, 
FlinkSqlOperatorTable.CONCAT_WS);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LPAD, 
FlinkSqlOperatorTable.LPAD);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RPAD, 
FlinkSqlOperatorTable.RPAD);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   .put(BuiltInFunctionDefinitions.REGEXP_EXTRACT, 
FlinkSqlOperatorTable.REGEXP_EXTRACT);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.FROM_BASE64, 

[GitHub] [flink] JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.

2019-07-03 Thread GitBox
JingsongLi commented on a change in pull request #8977: 
[FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner 
support add kinds of QueryOperations.
URL: https://github.com/apache/flink/pull/8977#discussion_r300213699
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
 ##
 @@ -79,25 +112,196 @@
private final RelBuilder relBuilder;
private final FlinkTypeFactory typeFactory;
 
+   private static final int DECIMAL_PRECISION_NEEDED_FOR_LONG = 19;
+
+   /**
+* The mapping only keeps part of FunctionDefinitions, which could be 
converted to SqlOperator in a very simple
+* way.
+*/
+   private static final Map<FunctionDefinition, SqlOperator> 
SIMPLE_DEF_SQL_OPERATOR_MAPPING = new HashMap<>();
+
+   static {
+   // logic functions
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.AND, 
FlinkSqlOperatorTable.AND);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OR, 
FlinkSqlOperatorTable.OR);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT, 
FlinkSqlOperatorTable.NOT);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IF, 
FlinkSqlOperatorTable.CASE);
+
+   // comparison functions
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.EQUALS, 
FlinkSqlOperatorTable.EQUALS);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   .put(BuiltInFunctionDefinitions.GREATER_THAN, 
FlinkSqlOperatorTable.GREATER_THAN);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   
.put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, 
FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LESS_THAN, 
FlinkSqlOperatorTable.LESS_THAN);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   
.put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, 
FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT_EQUALS, 
FlinkSqlOperatorTable.NOT_EQUALS);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NULL, 
FlinkSqlOperatorTable.IS_NULL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_NULL, 
FlinkSqlOperatorTable.IS_NOT_NULL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_TRUE, 
FlinkSqlOperatorTable.IS_TRUE);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_FALSE, 
FlinkSqlOperatorTable.IS_FALSE);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_TRUE, 
FlinkSqlOperatorTable.IS_NOT_TRUE);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   .put(BuiltInFunctionDefinitions.IS_NOT_FALSE, 
FlinkSqlOperatorTable.IS_NOT_FALSE);
+
+   // string functions
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CHAR_LENGTH, 
FlinkSqlOperatorTable.CHAR_LENGTH);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.INIT_CAP, 
FlinkSqlOperatorTable.INITCAP);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LIKE, 
FlinkSqlOperatorTable.LIKE);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOWER, 
FlinkSqlOperatorTable.LOWER);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SIMILAR, 
FlinkSqlOperatorTable.SIMILAR_TO);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SUBSTRING, 
FlinkSqlOperatorTable.SUBSTRING);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.UPPER, 
FlinkSqlOperatorTable.UPPER);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.POSITION, 
FlinkSqlOperatorTable.POSITION);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OVERLAY, 
FlinkSqlOperatorTable.OVERLAY);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT, 
FlinkSqlOperatorTable.CONCAT_FUNCTION);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT_WS, 
FlinkSqlOperatorTable.CONCAT_WS);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LPAD, 
FlinkSqlOperatorTable.LPAD);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RPAD, 
FlinkSqlOperatorTable.RPAD);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   .put(BuiltInFunctionDefinitions.REGEXP_EXTRACT, 
FlinkSqlOperatorTable.REGEXP_EXTRACT);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.FROM_BASE64, 

[GitHub] [flink] JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.

2019-07-03 Thread GitBox
JingsongLi commented on a change in pull request #8977: 
[FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner 
support add kinds of QueryOperations.
URL: https://github.com/apache/flink/pull/8977#discussion_r300213763
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
 ##
 @@ -79,25 +112,196 @@
private final RelBuilder relBuilder;
private final FlinkTypeFactory typeFactory;
 
+   private static final int DECIMAL_PRECISION_NEEDED_FOR_LONG = 19;
+
+   /**
+* The mapping only keeps part of FunctionDefinitions, which could be 
converted to SqlOperator in a very simple
+* way.
+*/
+   private static final Map<FunctionDefinition, SqlOperator> 
SIMPLE_DEF_SQL_OPERATOR_MAPPING = new HashMap<>();
+
+   static {
+   // logic functions
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.AND, 
FlinkSqlOperatorTable.AND);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OR, 
FlinkSqlOperatorTable.OR);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT, 
FlinkSqlOperatorTable.NOT);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IF, 
FlinkSqlOperatorTable.CASE);
+
+   // comparison functions
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.EQUALS, 
FlinkSqlOperatorTable.EQUALS);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   .put(BuiltInFunctionDefinitions.GREATER_THAN, 
FlinkSqlOperatorTable.GREATER_THAN);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   
.put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, 
FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LESS_THAN, 
FlinkSqlOperatorTable.LESS_THAN);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   
.put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, 
FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT_EQUALS, 
FlinkSqlOperatorTable.NOT_EQUALS);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NULL, 
FlinkSqlOperatorTable.IS_NULL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_NULL, 
FlinkSqlOperatorTable.IS_NOT_NULL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_TRUE, 
FlinkSqlOperatorTable.IS_TRUE);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_FALSE, 
FlinkSqlOperatorTable.IS_FALSE);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_TRUE, 
FlinkSqlOperatorTable.IS_NOT_TRUE);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   .put(BuiltInFunctionDefinitions.IS_NOT_FALSE, 
FlinkSqlOperatorTable.IS_NOT_FALSE);
+
+   // string functions
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CHAR_LENGTH, 
FlinkSqlOperatorTable.CHAR_LENGTH);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.INIT_CAP, 
FlinkSqlOperatorTable.INITCAP);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LIKE, 
FlinkSqlOperatorTable.LIKE);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOWER, 
FlinkSqlOperatorTable.LOWER);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SIMILAR, 
FlinkSqlOperatorTable.SIMILAR_TO);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SUBSTRING, 
FlinkSqlOperatorTable.SUBSTRING);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.UPPER, 
FlinkSqlOperatorTable.UPPER);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.POSITION, 
FlinkSqlOperatorTable.POSITION);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OVERLAY, 
FlinkSqlOperatorTable.OVERLAY);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT, 
FlinkSqlOperatorTable.CONCAT_FUNCTION);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT_WS, 
FlinkSqlOperatorTable.CONCAT_WS);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LPAD, 
FlinkSqlOperatorTable.LPAD);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RPAD, 
FlinkSqlOperatorTable.RPAD);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   .put(BuiltInFunctionDefinitions.REGEXP_EXTRACT, 
FlinkSqlOperatorTable.REGEXP_EXTRACT);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.FROM_BASE64, 

[GitHub] [flink] JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.

2019-07-03 Thread GitBox
JingsongLi commented on a change in pull request #8977: 
[FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner 
support add kinds of QueryOperations.
URL: https://github.com/apache/flink/pull/8977#discussion_r300215701
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
 ##
 @@ -150,7 +315,7 @@ public RelNode visit(QueryOperation other) {
FlinkStatistic statistic;
List<String> names;
if (tableSourceOperation instanceof 
RichTableSourceQueryOperation &&
-   ((RichTableSourceQueryOperation) 
tableSourceOperation).getQualifiedName() != null) {
+   ((RichTableSourceQueryOperation) 
tableSourceOperation).getQualifiedName() != null) {
 
 Review comment:
   This PR has many erroneous indentation modifications.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.

2019-07-03 Thread GitBox
JingsongLi commented on a change in pull request #8977: 
[FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner 
support add kinds of QueryOperations.
URL: https://github.com/apache/flink/pull/8977#discussion_r300215015
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
 ##
 @@ -79,25 +112,196 @@
private final RelBuilder relBuilder;
 
 Review comment:
   > TODO actually we should use {@link ResolvedExpressionVisitor} here as it 
is the output of the API
   
   You could add a clear comment explaining why it keeps `ExpressionVisitor` instead of 
`ResolvedExpressionVisitor`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.

2019-07-03 Thread GitBox
JingsongLi commented on a change in pull request #8977: 
[FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner 
support add kinds of QueryOperations.
URL: https://github.com/apache/flink/pull/8977#discussion_r300215219
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
 ##
 @@ -273,7 +720,7 @@ public RexNode visit(ValueLiteralExpression valueLiteral) {
}
 
return literal.getValueAs(clazz)
-   .orElseThrow(() -> new TableException("Unsupported 
literal class: " + clazz));
+   .orElseThrow(() -> new 
TableException("Unsupported literal class: " + clazz));
 
 Review comment:
   Why?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.

2019-07-03 Thread GitBox
JingsongLi commented on a change in pull request #8977: 
[FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner 
support add kinds of QueryOperations.
URL: https://github.com/apache/flink/pull/8977#discussion_r300215614
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeExpression.java
 ##
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.table.types.DataType;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Dummy wrapper for expressions that were converted to RexNode in a different 
way.
+ */
+public class RexNodeExpression implements ResolvedExpression {
 
 Review comment:
   Just use `RexPlannerExpression`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.

2019-07-03 Thread GitBox
JingsongLi commented on a change in pull request #8977: 
[FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner 
support add kinds of QueryOperations.
URL: https://github.com/apache/flink/pull/8977#discussion_r300213615
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
 ##
 @@ -79,25 +112,196 @@
private final RelBuilder relBuilder;
private final FlinkTypeFactory typeFactory;
 
+   private static final int DECIMAL_PRECISION_NEEDED_FOR_LONG = 19;
+
+   /**
+* The mapping only keeps part of FunctionDefinitions, which could be 
converted to SqlOperator in a very simple
+* way.
+*/
+   private static final Map<FunctionDefinition, SqlOperator> 
SIMPLE_DEF_SQL_OPERATOR_MAPPING = new HashMap<>();
+
+   static {
+   // logic functions
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.AND, 
FlinkSqlOperatorTable.AND);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OR, 
FlinkSqlOperatorTable.OR);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT, 
FlinkSqlOperatorTable.NOT);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IF, 
FlinkSqlOperatorTable.CASE);
+
+   // comparison functions
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.EQUALS, 
FlinkSqlOperatorTable.EQUALS);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   .put(BuiltInFunctionDefinitions.GREATER_THAN, 
FlinkSqlOperatorTable.GREATER_THAN);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   
.put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, 
FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LESS_THAN, 
FlinkSqlOperatorTable.LESS_THAN);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   
.put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, 
FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT_EQUALS, 
FlinkSqlOperatorTable.NOT_EQUALS);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NULL, 
FlinkSqlOperatorTable.IS_NULL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_NULL, 
FlinkSqlOperatorTable.IS_NOT_NULL);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_TRUE, 
FlinkSqlOperatorTable.IS_TRUE);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_FALSE, 
FlinkSqlOperatorTable.IS_FALSE);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_TRUE, 
FlinkSqlOperatorTable.IS_NOT_TRUE);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   .put(BuiltInFunctionDefinitions.IS_NOT_FALSE, 
FlinkSqlOperatorTable.IS_NOT_FALSE);
+
+   // string functions
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CHAR_LENGTH, 
FlinkSqlOperatorTable.CHAR_LENGTH);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.INIT_CAP, 
FlinkSqlOperatorTable.INITCAP);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LIKE, 
FlinkSqlOperatorTable.LIKE);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOWER, 
FlinkSqlOperatorTable.LOWER);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SIMILAR, 
FlinkSqlOperatorTable.SIMILAR_TO);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SUBSTRING, 
FlinkSqlOperatorTable.SUBSTRING);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.UPPER, 
FlinkSqlOperatorTable.UPPER);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.POSITION, 
FlinkSqlOperatorTable.POSITION);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OVERLAY, 
FlinkSqlOperatorTable.OVERLAY);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT, 
FlinkSqlOperatorTable.CONCAT_FUNCTION);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT_WS, 
FlinkSqlOperatorTable.CONCAT_WS);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LPAD, 
FlinkSqlOperatorTable.LPAD);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RPAD, 
FlinkSqlOperatorTable.RPAD);
+   SIMPLE_DEF_SQL_OPERATOR_MAPPING
+   .put(BuiltInFunctionDefinitions.REGEXP_EXTRACT, 
FlinkSqlOperatorTable.REGEXP_EXTRACT);
+   
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.FROM_BASE64, 

[GitHub] [flink] JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.

2019-07-03 Thread GitBox
JingsongLi commented on a change in pull request #8977: 
[FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner 
support add kinds of QueryOperations.
URL: https://github.com/apache/flink/pull/8977#discussion_r300215120
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/AggregateVisitors.java
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.AggregateFunctionDefinition;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.UserDefinedAggregateFunction;
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.functions.utils.AggSqlFunction;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.isFunctionOfKind;
+import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
+/**
+ * The class contains all kinds of visitors to visit Aggregate.
+ */
+public class AggregateVisitors {
 
 Review comment:
   Change to another name? I think this class is just for `RexNodeConverter`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ye-lun commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese

2019-07-03 Thread GitBox
ye-lun commented on a change in pull request #8943: 
[FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
URL: https://github.com/apache/flink/pull/8943#discussion_r300216656
 
 

 ##
 File path: docs/ops/state/checkpoints.zh.md
 ##
 @@ -26,69 +26,49 @@ under the License.
 * toc
 {:toc}
 
-## Overview
+## 概述
+checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。
 
-Checkpoints make state in Flink fault tolerant by allowing state and the
-corresponding stream positions to be recovered, thereby giving the application
-the same semantics as a failure-free execution.
+参考 [Checkpointing]({{ site.baseurl }}/zh/dev/stream/state/checkpointing.html) 
查看如何在 Flink 程序中开启和配置 checkpoint。
 
-See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) 
for how to enable and
-configure checkpoints for your program.
+## 保留 Checkpoint
 
-## Retained Checkpoints
-
-Checkpoints are by default not retained and are only used to resume a
-job from failures. They are deleted when a program is cancelled.
-You can, however, configure periodic checkpoints to be retained.
-Depending on the configuration these *retained* checkpoints are *not*
-automatically cleaned up when the job fails or is canceled.
-This way, you will have a checkpoint around to resume from if your job fails.
+checkpoint 在默认的情况下仅用于恢复失败的作业,并不保留,当程序取消时 checkpoint 就会被删除。当然,你可以通过配置来保留 
checkpoint,这些被保留的 checkpoint 在作业失败或取消时不会被清除。这样,你就可以使用该 checkpoint 来恢复失败的作业。
 
 {% highlight java %}
 CheckpointConfig config = env.getCheckpointConfig();
 
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 {% endhighlight %}
 
-The `ExternalizedCheckpointCleanup` mode configures what happens with 
checkpoints when you cancel the job:
-
-- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the 
checkpoint when the job is cancelled. Note that you have to manually clean up 
the checkpoint state after cancellation in this case.
-
-- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the 
checkpoint when the job is cancelled. The checkpoint state will only be 
available if the job fails.
+`ExternalizedCheckpointCleanup` 配置项定义了当作业取消时,对作业 checkpoint 的操作:
+- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的 
checkpoint。注意,这种情况下,需要手动清除该作业的 checkpoint。
 
 Review comment:
   @klion26 I agree with you on this point. I will modify these and force-push 
again because there are many commits. Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ye-lun commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese

2019-07-03 Thread GitBox
ye-lun commented on a change in pull request #8943: 
[FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
URL: https://github.com/apache/flink/pull/8943#discussion_r300218022
 
 

 ##
 File path: docs/ops/state/checkpoints.zh.md
 ##
 @@ -26,69 +26,49 @@ under the License.
 * toc
 {:toc}
 
-## Overview
+## 概述
+checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。
 
-Checkpoints make state in Flink fault tolerant by allowing state and the
-corresponding stream positions to be recovered, thereby giving the application
 
 Review comment:
   In this position, the sentence "thereby giving the application the same 
semantics as a failure-free execution" is not translated according to your 
suggestion, but it was translated in my first commit.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-07-03 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878289#comment-16878289
 ] 

zhijiang edited comment on FLINK-12852 at 7/4/19 3:27 AM:
--

Network resource matching in slots has many unsettled issues which would need to 
be discussed further in the future, so we could not make it effective in a short time.

Lazily allocating buffers on the producer side seems a feasible way atm. It could 
still retain the current core-and-maximum mechanism in the local pool. But it 
brings two other effects:
 * Longer time to ramp up to full throughput, as Stephan mentioned, especially 
for some very short jobs (finishing in several seconds); I remember there were 
such cases in Kurt's benchmark before. We change the previously concurrent 
production and consumption to a sequential way. For a short job, all the data 
might already be emitted and cached in the partition pool on the producer side 
before the consumer requests the partition.
 * We rely on another assumption: produced buffers can eventually be recycled 
once the subpartition view is established. This assumption might limit new 
features/improvements in the future. ATM we need to adjust when the partition 
request is triggered, which means RemoteInputChannel could only send a partition 
request if the corresponding task has no result partition or the partition's view 
has already been established. In the future, InputSelection might also break the 
above assumption: although the partition was requested, the operator could choose 
not to consume that partition for a long time.


was (Author: zjwang):
Network resource matching in slots has many unsettled issues which should be 
discussed further in the future, so we could not make it effective in a short time.

Lazily allocating buffers on the producer side seems a feasible way atm. It could 
still retain the current core-and-maximum mechanism in the local pool. But it 
brings two other effects:
 * Longer time to ramp up to full throughput, as Stephan mentioned, especially 
for some very short jobs (finishing in several seconds); I remember there were 
such cases in Kurt's benchmark before. We change the previously concurrent 
production and consumption to a sequential way. For a short job, all the data 
might already be emitted and cached in the partition pool on the producer side 
before the consumer requests the partition.
 * We rely on another assumption: produced buffers can eventually be recycled 
once the subpartition view is established. This assumption might limit new 
features/improvements in the future. ATM we need to adjust the action that 
triggers the partition request, which means `RemoteInputChannel` could only send a 
partition request if this task has no result partition or the partition's view 
has already been established. In the future, InputSelection might also break the 
above assumption: although the partition was requested, the operator could choose 
not to consume that partition for a long time.

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.7.2, 1.8.1, 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> 

[GitHub] [flink] XuQianJin-Stars edited a comment on issue #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese

2019-07-03 Thread GitBox
XuQianJin-Stars edited a comment on issue #8300: [FLINK-11638][docs-zh] 
Translate Savepoints page into Chinese
URL: https://github.com/apache/flink/pull/8300#issuecomment-508323987
 
 
   Hi @klion26 @wuchong, I have changed it already.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ye-lun commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese

2019-07-03 Thread GitBox
ye-lun commented on a change in pull request #8943: 
[FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
URL: https://github.com/apache/flink/pull/8943#discussion_r300216919
 
 

 ##
 File path: docs/ops/state/checkpoints.zh.md
 ##
 @@ -26,69 +26,49 @@ under the License.
 * toc
 {:toc}
 
-## Overview
+## 概述
+checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。
 
-Checkpoints make state in Flink fault tolerant by allowing state and the
-corresponding stream positions to be recovered, thereby giving the application
-the same semantics as a failure-free execution.
+参考 [Checkpointing]({{ site.baseurl }}/zh/dev/stream/state/checkpointing.html) 
查看如何在 Flink 程序中开启和配置 checkpoint。
 
-See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) 
for how to enable and
-configure checkpoints for your program.
+## 保留 Checkpoint
 
-## Retained Checkpoints
-
-Checkpoints are by default not retained and are only used to resume a
-job from failures. They are deleted when a program is cancelled.
-You can, however, configure periodic checkpoints to be retained.
-Depending on the configuration these *retained* checkpoints are *not*
-automatically cleaned up when the job fails or is canceled.
-This way, you will have a checkpoint around to resume from if your job fails.
+checkpoint 在默认的情况下仅用于恢复失败的作业,并不保留,当程序取消时 checkpoint 就会被删除。当然,你可以通过配置来保留 
checkpoint,这些被保留的 checkpoint 在作业失败或取消时不会被清除。这样,你就可以使用该 checkpoint 来恢复失败的作业。
 
 {% highlight java %}
 CheckpointConfig config = env.getCheckpointConfig();
 
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 {% endhighlight %}
 
-The `ExternalizedCheckpointCleanup` mode configures what happens with 
checkpoints when you cancel the job:
-
-- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the 
checkpoint when the job is cancelled. Note that you have to manually clean up 
the checkpoint state after cancellation in this case.
-
-- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the 
checkpoint when the job is cancelled. The checkpoint state will only be 
available if the job fails.
+`ExternalizedCheckpointCleanup` 配置项定义了当作业取消时,对作业 checkpoint 的操作:
+- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的 
checkpoint。注意,这种情况下,需要手动清除该作业的 checkpoint。
+- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**:当作业取消时,删除作业的 
checkpoint。仅当作业失败时,作业的 checkpoint 才会被保留。
 
-### Directory Structure
+### 目录结构
 
-Similarly to [savepoints](savepoints.html), a checkpoint consists
-of a meta data file and, depending on the state backend, some additional data
-files. The meta data file and data files are stored in the directory that is
-configured via `state.checkpoints.dir` in the configuration files, 
-and also can be specified for per job in the code.
+与 [savepoints](savepoints.html) 相似,checkpoint 由元数据文件、数据文件(与state backend 
相关)组成。可通过配置文件中 “state.checkpoints.dir” 
配置项来指定元数据文件和数据文件的存储路径,另外也可以在代码中针对单个作业特别指定该配置项。
 
 Review comment:
   yes, I will change the quotation
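
   (As context for the doc sentence above about specifying the checkpoint directory per 
job in the code, a minimal hedged sketch; the HDFS path is only a placeholder.)

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class PerJobCheckpointDirSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Placeholder URI; a checkpoint directory configured on the job's state backend
            // is used for this job instead of the cluster-wide state.checkpoints.dir.
            env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
            env.enableCheckpointing(10_000L); // trigger a checkpoint every 10 seconds
        }
    }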


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-07-03 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878289#comment-16878289
 ] 

zhijiang commented on FLINK-12852:
--

Network resource matching in slots has many unsettled issues which should be 
discussed further in the future, so we could not make it effective in a short time.

Lazily allocating buffers on the producer side seems a feasible way atm. It could 
still retain the current core-and-maximum mechanism in the local pool. But it 
brings two other effects:
 * Longer time to ramp up to full throughput, as Stephan mentioned, especially 
for some very short jobs (finishing in several seconds); I remember there were 
such cases in Kurt's benchmark before. We change the previously concurrent 
production and consumption to a sequential way. For a short job, all the data 
might already be emitted and cached in the partition pool on the producer side 
before the consumer requests the partition.
 * We rely on another assumption: produced buffers can eventually be recycled 
once the subpartition view is established. This assumption might limit new 
features/improvements in the future. ATM we need to adjust the action that 
triggers the partition request, which means `RemoteInputChannel` could only send a 
partition request if this task has no result partition or the partition's view 
has already been established. In the future, InputSelection might also break the 
above assumption: although the partition was requested, the operator could choose 
not to consume that partition for a long time.
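
A tiny sketch of that guard condition, purely illustrative (the types below are 
stand-ins, not Flink's actual classes):

{code:java}
// Illustrative stand-ins, not Flink's actual classes.
interface ProducerState {
    boolean hasResultPartitions();
    boolean allSubpartitionViewsEstablished();
}

final class PartitionRequestGuard {

    // A consumer task may send its partition requests only if its own outputs cannot
    // block buffer recycling: either it produces nothing, or all of its subpartition
    // views are already established so the buffers it produced can eventually be
    // recycled by the downstream consumers.
    static boolean mayRequestPartitions(ProducerState self) {
        return !self.hasResultPartitions() || self.allSubpartitionViewsEstablished();
    }
}
{code}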

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.7.2, 1.8.1, 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is because the required and max sizes of the local buffer pool are not the 
> same, so there may be over-allocation; when assignExclusiveSegments is called, 
> there is no available memory.
>  
> The details of the scenario are as follows: the parallelisms of the upstream 
> vertex and downstream vertex are 1000 and 500 respectively. There are 200 TMs 
> and each TM has 10696 buffers in total and 10 slots. For a TM that runs 
> 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with 
> a local buffer pool of {required = 500, max = 2 * 500 + 8 = 1008}; they produce 
> data quickly and each occupies about 990 buffers. Then the downstream task 
> starts and tries to assign exclusive buffers for 1500 - 9 = 1491 
> InputChannels. It requires 2981 buffers but only 1786 are left. Since not all 
> downstream tasks can start, the job is eventually blocked, no buffer can 
> be released, and the deadlock finally occurs.
>  
> I think although increasing the network memory solves the problem, the 
> deadlock may not be acceptable. Fine-grained resource management 
> Flink-12761 can solve this problem, but AFAIK in 1.9 it will not include the 
> 
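
A back-of-the-envelope check of the numbers in the quoted scenario (a sketch only; 
2 exclusive buffers per remote channel is an assumed default, which is why it 
arrives at 2982 rather than the 2981 quoted above):

{code:java}
public class BufferShortageArithmetic {
    public static void main(String[] args) {
        int buffersPerTm = 10696;               // total network buffers on the TM
        int upstreamTasksOnTm = 9;
        int buffersHeldPerUpstreamTask = 990;   // "each occupies about 990 buffers"
        int remoteInputChannels = 1491;         // 1500 - 9 local channels
        int exclusiveBuffersPerChannel = 2;     // assumed default

        int left = buffersPerTm - upstreamTasksOnTm * buffersHeldPerUpstreamTask; // 1786
        int required = remoteInputChannels * exclusiveBuffersPerChannel;          // 2982
        System.out.println("buffers left: " + left);
        System.out.println("exclusive buffers required: " + required);
        System.out.println("blocking request exceeds what is left: " + (required > left)); // true
    }
}
{code}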

[GitHub] [flink] XuQianJin-Stars commented on issue #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese

2019-07-03 Thread GitBox
XuQianJin-Stars commented on issue #8300: [FLINK-11638][docs-zh] Translate 
Savepoints page into Chinese
URL: https://github.com/apache/flink/pull/8300#issuecomment-508323987
 
 
   Hi @klion26 @wuchong, I have changed it already.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ye-lun commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese

2019-07-03 Thread GitBox
ye-lun commented on a change in pull request #8943: 
[FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
URL: https://github.com/apache/flink/pull/8943#discussion_r300216656
 
 

 ##
 File path: docs/ops/state/checkpoints.zh.md
 ##
 @@ -26,69 +26,49 @@ under the License.
 * toc
 {:toc}
 
-## Overview
+## 概述
+checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。
 
-Checkpoints make state in Flink fault tolerant by allowing state and the
-corresponding stream positions to be recovered, thereby giving the application
-the same semantics as a failure-free execution.
+参考 [Checkpointing]({{ site.baseurl }}/zh/dev/stream/state/checkpointing.html) 
查看如何在 Flink 程序中开启和配置 checkpoint。
 
-See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) 
for how to enable and
-configure checkpoints for your program.
+## 保留 Checkpoint
 
-## Retained Checkpoints
-
-Checkpoints are by default not retained and are only used to resume a
-job from failures. They are deleted when a program is cancelled.
-You can, however, configure periodic checkpoints to be retained.
-Depending on the configuration these *retained* checkpoints are *not*
-automatically cleaned up when the job fails or is canceled.
-This way, you will have a checkpoint around to resume from if your job fails.
+checkpoint 在默认的情况下仅用于恢复失败的作业,并不保留,当程序取消时 checkpoint 就会被删除。当然,你可以通过配置来保留 
checkpoint,这些被保留的 checkpoint 在作业失败或取消时不会被清除。这样,你就可以使用该 checkpoint 来恢复失败的作业。
 
 {% highlight java %}
 CheckpointConfig config = env.getCheckpointConfig();
 
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 {% endhighlight %}
 
-The `ExternalizedCheckpointCleanup` mode configures what happens with 
checkpoints when you cancel the job:
-
-- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the 
checkpoint when the job is cancelled. Note that you have to manually clean up 
the checkpoint state after cancellation in this case.
-
-- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the 
checkpoint when the job is cancelled. The checkpoint state will only be 
available if the job fails.
+`ExternalizedCheckpointCleanup` 配置项定义了当作业取消时,对作业 checkpoint 的操作:
+- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的 
checkpoint。注意,这种情况下,需要手动清除该作业的 checkpoint。
 
 Review comment:
   @klion26 I agree with you on this point. I will modify these and force push 
again because there are many commits. Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ye-lun commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese

2019-07-03 Thread GitBox
ye-lun commented on a change in pull request #8943: 
[FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
URL: https://github.com/apache/flink/pull/8943#discussion_r300216623
 
 

 ##
 File path: docs/ops/state/checkpoints.zh.md
 ##
 @@ -26,69 +26,49 @@ under the License.
 * toc
 {:toc}
 
-## Overview
+## 概述
+checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。
 
-Checkpoints make state in Flink fault tolerant by allowing state and the
-corresponding stream positions to be recovered, thereby giving the application
-the same semantics as a failure-free execution.
+参考 [Checkpointing]({{ site.baseurl }}/zh/dev/stream/state/checkpointing.html) 
查看如何在 Flink 程序中开启和配置 checkpoint。
 
-See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) 
for how to enable and
-configure checkpoints for your program.
+## 保留 Checkpoint
 
-## Retained Checkpoints
-
-Checkpoints are by default not retained and are only used to resume a
-job from failures. They are deleted when a program is cancelled.
-You can, however, configure periodic checkpoints to be retained.
-Depending on the configuration these *retained* checkpoints are *not*
-automatically cleaned up when the job fails or is canceled.
-This way, you will have a checkpoint around to resume from if your job fails.
+checkpoint 在默认的情况下仅用于恢复失败的作业,并不保留,当程序取消时 checkpoint 就会被删除。当然,你可以通过配置来保留 
checkpoint,这些被保留的 checkpoint 在作业失败或取消时不会被清除。这样,你就可以使用该 checkpoint 来恢复失败的作业。
 
 {% highlight java %}
 CheckpointConfig config = env.getCheckpointConfig();
 
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 {% endhighlight %}
 
-The `ExternalizedCheckpointCleanup` mode configures what happens with 
checkpoints when you cancel the job:
-
-- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the 
checkpoint when the job is cancelled. Note that you have to manually clean up 
the checkpoint state after cancellation in this case.
-
-- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the 
checkpoint when the job is cancelled. The checkpoint state will only be 
available if the job fails.
+`ExternalizedCheckpointCleanup` 配置项定义了当作业取消时,对作业 checkpoint 的操作:
+- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的 
checkpoint。注意,这种情况下,需要手动清除该作业的 checkpoint。
 
 Review comment:
   @klion26 I agree with you on this point. I will modify these and force push 
again because there are many commits. Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable

2019-07-03 Thread GitBox
banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store 
state per key-group in CopyOnWriteStateTable
URL: https://github.com/apache/flink/pull/8611#discussion_r300216523
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateMapSnapshot.java
 ##
 @@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * Base class for snapshots of a {@link StateMap}.
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of state
+ */
+public abstract class StateMapSnapshot<K, N, S, T extends StateMap<K, N, S>> {
+
+   /**
+* The {@link StateMap} from which this snapshot was created.
+*/
+   protected final T owningStateMap;
 
 Review comment:
   Actually, `owningStateMap` is also used in 
`NestedStateMapSnapshot#writeState`.
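For readers following along, a tiny hypothetical sketch of the pattern being discussed: the snapshot keeps a reference to the map it was created from and reads from it when writing state out (this is not the actual `NestedStateMapSnapshot` code):

```java
// Hypothetical illustration of the pattern, not the actual Flink classes.
abstract class MapSnapshotSketch<M> {

	/** The map this snapshot was created from; subclasses read from it when writing state. */
	protected final M owningMap;

	protected MapSnapshotSketch(M owningMap) {
		this.owningMap = owningMap;
	}

	/** A nested-map style snapshot would iterate owningMap here and serialize its entries. */
	abstract void writeState(java.io.DataOutput out) throws java.io.IOException;
}
```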


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung closed pull request #8854: [FLINK-12959][table-planner-blink] Use BoundedInput and InputSelectable in blink and implement hash join

2019-07-03 Thread GitBox
KurtYoung closed pull request #8854: [FLINK-12959][table-planner-blink] Use 
BoundedInput and InputSelectable in blink and implement hash join
URL: https://github.com/apache/flink/pull/8854
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12959) Use BoundedInput and InputSelectable in blink

2019-07-03 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-12959.
--
   Resolution: Implemented
Fix Version/s: 1.9.0

merged in 1.9.0:
960ae97a5ac137faa35b83675c2f087334d4d3b7
0e4d4b4869aea6ded9c3ad1255c426ca9e7dfd99

> Use BoundedInput and InputSelectable in blink
> -
>
> Key: FLINK-12959
> URL: https://issues.apache.org/jira/browse/FLINK-12959
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Now BoundedInput and InputSelectable are ready in runtime. Blink planner 
> should use it instead of invoking endInput in close.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] ye-lun commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese

2019-07-03 Thread GitBox
ye-lun commented on a change in pull request #8943: 
[FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
URL: https://github.com/apache/flink/pull/8943#discussion_r300216052
 
 

 ##
 File path: docs/ops/state/checkpoints.zh.md
 ##
 @@ -26,69 +26,49 @@ under the License.
 * toc
 {:toc}
 
-## Overview
+## 概述
+checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。
 
 Review comment:
   @klion26 do you mean that at the beginning of each paragraph it should be 
"Checkpoint" and in other positions "checkpoint"? thanks


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable

2019-07-03 Thread GitBox
banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store 
state per key-group in CopyOnWriteStateTable
URL: https://github.com/apache/flink/pull/8611#discussion_r300216045
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
 ##
 @@ -172,14 +229,110 @@ public boolean isEmpty() {
 * @return the state of the mapping with the specified key/namespace 
composite key, or {@code null}
 * if no mapping for the specified key is found.
 */
-   public abstract S get(K key, N namespace);
+   public S get(K key, N namespace) {
+   int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, 
keyContext.getNumberOfKeyGroups());
+   return get(key, keyGroup, namespace);
+   }
+
+   public Stream<K> getKeys(N namespace) {
+   return Arrays.stream(state)
+   .flatMap(stateMap -> 
StreamSupport.stream(Spliterators.spliteratorUnknownSize(stateMap.iterator(), 
0), false))
+   .filter(entry -> entry.getNamespace().equals(namespace))
+   .map(StateEntry::getKey);
+   }
+
+   public StateIncrementalVisitor<K, N, S> getStateIncrementalVisitor(int 
recommendedMaxNumberOfReturnedRecords) {
+   return new 
StateEntryIterator(recommendedMaxNumberOfReturnedRecords);
+   }
 
-   public abstract Stream<K> getKeys(N namespace);
+   // 

 
-   public abstract StateIncrementalVisitor<K, N, S> 
getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords);
+   private S get(K key, int keyGroupIndex, N namespace) {
+   checkKeyNamespacePreconditions(key, namespace);
+
+   StateMap<K, N, S> stateMap = getMapForKeyGroup(keyGroupIndex);
+
+   if (stateMap == null) {
 
 Review comment:
   This check is to pass the UT 
StateBackendTestBase#testKeyGroupSnapshotRestore. The UT will get a key from 
the backend, but the key is in a key group which does not belong to this backend.
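To make the key-group point concrete, here is a rough, simplified sketch of how a key maps to a key group and why a backend that owns only a sub-range can see a foreign key group (Flink's real KeyGroupRangeAssignment additionally murmur-hashes the key's hashCode; the owned range below is a made-up example):

```java
// Simplified illustration; Flink's real assignment murmur-hashes the key's hashCode first.
public class KeyGroupSketch {

	static int assignToKeyGroup(Object key, int maxParallelism) {
		return Math.floorMod(key.hashCode(), maxParallelism);
	}

	public static void main(String[] args) {
		int maxParallelism = 128;
		// Key-group range owned by this (hypothetical) backend instance.
		int ownedStart = 0;
		int ownedEnd = 63;

		int keyGroup = assignToKeyGroup("some-key", maxParallelism);

		if (keyGroup < ownedStart || keyGroup > ownedEnd) {
			// Corresponds to the null state map case above: the key group is not owned here.
			System.out.println("key group " + keyGroup + " is not owned by this backend");
		} else {
			System.out.println("key group " + keyGroup + " is owned by this backend");
		}
	}
}
```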


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Aitozi commented on issue #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength

2019-07-03 Thread GitBox
Aitozi commented on issue #8559: [FLINK-12576][Network, Metrics]Take 
localInputChannel into account when compute inputQueueLength
URL: https://github.com/apache/flink/pull/8559#issuecomment-508323128
 
 
   Hi @zhijiangW @pnowojski, please help review this PR again. I have 
addressed your previous comments and added a unit test case in 
`LocalInputChannelTest`. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lirui-apache commented on a change in pull request #8965: [FLINK-13068][hive] HiveTableSink should implement PartitionableTable…

2019-07-03 Thread GitBox
lirui-apache commented on a change in pull request #8965: [FLINK-13068][hive] 
HiveTableSink should implement PartitionableTable…
URL: https://github.com/apache/flink/pull/8965#discussion_r300214660
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
 ##
 @@ -161,4 +164,37 @@ private String toStagingDir(String finalDir, 
Configuration conf) throws IOExcept
fs.deleteOnExit(path);
return res;
}
+
+   @Override
+   public List<String> getPartitionFieldNames() {
+   return catalogTable.getPartitionKeys();
+   }
+
+   @Override
+   public void setStaticPartition(Map<String, String> partitions) {
+   // make it a LinkedHashMap to maintain partition column order
+   staticPartitionSpec = new LinkedHashMap<>();
+   for (String partitionCol : getPartitionFieldNames()) {
+   if (partitions.containsKey(partitionCol)) {
+   staticPartitionSpec.put(partitionCol, 
partitions.get(partitionCol));
+   }
+   }
+   }
+
+   private void validatePartitionSpec() {
+   List<String> partitionCols = getPartitionFieldNames();
+   Preconditions.checkArgument(new 
HashSet<>(partitionCols).containsAll(
 
 Review comment:
   Yeah I'll print the specific columns and reformat the code.
   We don't need to check the order here. The partition column order is defined 
by `getPartitionFieldNames()`. We can always reorder a partition spec (which is 
a map) as long as it only contains valid partition columns.
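For illustration, a minimal standalone sketch of the reordering idea, mirroring the `setStaticPartition` logic in the diff above (the field names and values are placeholders):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionSpecReorder {
	public static void main(String[] args) {
		// Order is defined by the table's partition keys (placeholder values).
		List<String> partitionFieldNames = Arrays.asList("year", "month", "day");

		// Incoming static partition spec in arbitrary order.
		Map<String, String> partitions = new HashMap<>();
		partitions.put("day", "01");
		partitions.put("year", "2019");

		// Rebuild as a LinkedHashMap so iteration follows partition-column order.
		Map<String, String> staticPartitionSpec = new LinkedHashMap<>();
		for (String col : partitionFieldNames) {
			if (partitions.containsKey(col)) {
				staticPartitionSpec.put(col, partitions.get(col));
			}
		}
		System.out.println(staticPartitionSpec); // {year=2019, day=01}
	}
}
```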


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12926) Main thread checking in some tests fails

2019-07-03 Thread Zhu Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-12926:

Attachment: Execution#deploy.jpg

> Main thread checking in some tests fails
> 
>
> Key: FLINK-12926
> URL: https://issues.apache.org/jira/browse/FLINK-12926
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.9.0
>Reporter: Zhu Zhu
>Priority: Major
> Attachments: Execution#deploy.jpg, mainThreadCheckFailure.log
>
>
> Currently all JM side job changing actions are expected to be taken in 
> JobMaster main thread.
> In current Flink tests, many cases tend to use the test main thread as the JM 
> main thread. This can lead to 2 issues:
> 1. TestingComponentMainThreadExecutorServiceAdapter is a direct executor, so 
> if it is invoked from any other thread, it will break the main thread 
> checking and fail the submitted action (as in the attached log 
> [^mainThreadCheckFailure.log])
> 2. The test main thread does not support other actions queued in its 
> executor, as the test will end once the current test thread action (the 
> currently running test body) is done
>  
> In my observation, most cases which start 
> ExecutionGraph.scheduleForExecution() will encounter this issue. Cases 
> include ExecutionGraphRestartTest, FailoverRegionTest, 
> ConcurrentFailoverStrategyExecutionGraphTest, GlobalModVersionTest, 
> ExecutionGraphDeploymentTest, etc.
>  
> One solution in my mind is to create a ScheduledExecutorService for those 
> tests, use it as the main thread and run the test body in this thread.
>  
>  
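A minimal sketch of the proposed approach, using only JDK executors (the Flink test adapter classes and the actual test body are omitted; names here are placeholders):

{code}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Sketch only: the real tests would wrap this executor in Flink's main-thread executor adapter.
public class MainThreadTestSketch {
	public static void main(String[] args) throws Exception {
		ScheduledExecutorService mainThread = Executors.newSingleThreadScheduledExecutor();
		try {
			// Run the whole "test body" inside the dedicated thread, so any action the
			// tested code submits back to the same executor also runs on that thread.
			mainThread.submit(() -> {
				// ... e.g. executionGraph.scheduleForExecution() would go here ...
				System.out.println("running on " + Thread.currentThread().getName());
			}).get();
		} finally {
			mainThread.shutdown();
		}
	}
}
{code}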



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] XuQianJin-Stars commented on issue #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese

2019-07-03 Thread GitBox
XuQianJin-Stars commented on issue #8300: [FLINK-11638][docs-zh] Translate 
Savepoints page into Chinese
URL: https://github.com/apache/flink/pull/8300#issuecomment-508320189
 
 
   well let me change it


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on issue #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese

2019-07-03 Thread GitBox
wuchong commented on issue #8300: [FLINK-11638][docs-zh] Translate Savepoints 
page into Chinese
URL: https://github.com/apache/flink/pull/8300#issuecomment-508319820
 
 
   @XuQianJin-Stars, please do not replace all `Savepoint` with `savepoint`. I 
mean only the "Savepoint" in the directory path should stay `savepoint`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] XuQianJin-Stars commented on issue #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese

2019-07-03 Thread GitBox
XuQianJin-Stars commented on issue #8300: [FLINK-11638][docs-zh] Translate 
Savepoints page into Chinese
URL: https://github.com/apache/flink/pull/8300#issuecomment-508319511
 
 
   hi @klion26 @wuchong Thank you very much, comments addressed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lirui-apache commented on a change in pull request #8976: [FLINK-12277][table/hive/doc] Add documentation for catalogs

2019-07-03 Thread GitBox
lirui-apache commented on a change in pull request #8976: 
[FLINK-12277][table/hive/doc] Add documentation for catalogs
URL: https://github.com/apache/flink/pull/8976#discussion_r300212301
 
 

 ##
 File path: docs/dev/table/catalog.md
 ##
 @@ -0,0 +1,345 @@
+---
+title: "Catalog"
+is_beta: true
+nav-parent_id: tableapi
+nav-pos: 100
+---
+
+
+A catalog can provide information about metadata, such as names, schemas, 
statistics of tables, and information about how to access data stored in a 
database or table. Once a catalog is registered to a `TableEnvironment`, all 
meta-objects defined in a catalog can be accessed from Table API or SQL queries.
+
+
+* This will be replaced by the TOC
+{:toc}
+
+
+Catalog Interface
+-
+
+APIs are defined in `Catalog` interface. The interface defines a set of APIs 
to read and write catalog meta-objects such as database, tables, partitions, 
views, and functions.
+
+Users can develop their own catalogs by implementing the interface.
+
+
+Naming Structure in Catalog
+---
+
+Flink's catalogs use a strict two-level structure, that is, catalogs contain 
databases, and databases contain meta-objects. Thus, the full name of a 
meta-object is always structured as `catalogName`.`databaseName`.`objectName`.
+
+All registered catalogs are managed by a `CatalogManager` instance in 
`TableEnvironment`. In order to ease access to meta-objects, `CatalogManager` 
has a concept of current catalog and current database. Usually how users access 
meta-objects in a catalog is to specify its full name in the format of 
`catalogName`.`databaseName`.`objectName`. By setting current catalog and 
current database, users can use just the meta-object's name in their queries. 
This greatly simplifies user experience. For example, a previous query as
+
+```sql
+select * from mycatalog.mydb.myTable;
+```
+
+can be shortened as
+
+```sql
+select * from myTable;
+```
+
+Querying a table in a different database under the default catalog would be
+
+```
+select * from mydb2.myTable
+```
+
+`CatalogManager` always has a built-in `GenericInMemoryCatalog` with name of 
`default_catalog`, which has a built-in default database named 
`default_database`. They will be the current catalog and current database if no 
other catalog and database are explicitly set. All temp meta-objects will be 
registered to this catalog. Users can set current catalog and database via 
`TableEnvironment.useCatalog(...)` and `TableEnvironment.useDatabase(...)` in 
Table API, or `USE CATALOG ...` and `USE DATABASE ...` in Flink SQL.
+
+
+Catalog Types
+-
+
+## GenericInMemoryCatalog
+
+All meta-objects in this catalog are stored in memory, and will be lost once 
the session shuts down.
+
+Its config entry value in SQL CLI yaml file is "generic_in_memory".
+
+## HiveCatalog
+
+`HiveCatalog` can read and write both Flink and Hive meta-objects by using 
Hive Metastore as a persistent storage.
+
+Its config entry value in SQL CLI yaml file is "hive".
+
+### Persist Flink meta-objects
+
+Previously, Flink meta-objects were only stored in memory on a per-session 
basis. That means users have to recreate all the meta-objects every time they 
start a new session.
+
+To solve this user pain point, users can choose the option to use 
`HiveCatalog` to persist all of users' Flink streaming and batch meta-objects 
by using Hive Metastore as a pure storage. Because Hive Metastore is only used 
for storage in this case, Hive itself may not understand Flink's meta-objects 
stored in the metastore.
+
+### Integrate Flink with Hive metadata
+
+The ultimate goal for integrating Flink with Hive metadata is that:
+
+1. existing meta-objects, like tables, views, and functions, created by Hive 
or other Hive-compatible applications can be used by Flink
+
+2. meta-objects created by `HiveCatalog` can be written back to Hive metastore 
such that Hive and other Hive-compatible applications can consume them.
+
+## User-configured Catalog
+
+Catalogs are pluggable, and users can use their own, customized catalog 
implementations.
+
+
+HiveCatalog
+---
+
+## Supported Hive Versions
+
+`HiveCatalog` officially supports Hive 2.3.4 and 1.2.1, and depends on Hive's 
own compatibility for the other 2.x.x and 1.x.x versions.
+
+Users need to explicitly specify the Hive version as string, by either passing 
it to the constructor when creating `HiveCatalog` instances directly in Table 
API or specifying it in yaml config file in SQL CLI. The Hive version string 
will be either `2.3.4`, `1.2.1`, or your own 2.x.x/1.x.x versions.
+
+## Dependencies
+
+In order to use `HiveCatalog`, users need to either download the following 
dependency jars and add them to the `/lib` dir of the Flink distribution, or 
add their existing Hive jars to Flink's classpath in order for Flink to find 
them at runtime.
+
+Take Hive 2.3.4 for example:
+
+```
+// Hive dependencies
+
+- hive-metastore-2.3.4.jar
 
 

[GitHub] [flink] lirui-apache commented on a change in pull request #8976: [FLINK-12277][table/hive/doc] Add documentation for catalogs

2019-07-03 Thread GitBox
lirui-apache commented on a change in pull request #8976: 
[FLINK-12277][table/hive/doc] Add documentation for catalogs
URL: https://github.com/apache/flink/pull/8976#discussion_r300212111
 
 

 ##
 File path: docs/dev/table/catalog.md
 ##
 @@ -0,0 +1,345 @@
+---
+title: "Catalog"
+is_beta: true
+nav-parent_id: tableapi
+nav-pos: 100
+---
+
+
+A catalog can provide information about metadata, such as names, schemas, 
statistics of tables, and information about how to access data stored in a 
database or table. Once a catalog is registered to a `TableEnvironment`, all 
meta-objects defined in a catalog can be accessed from Table API or SQL queries.
+
+
+* This will be replaced by the TOC
+{:toc}
+
+
+Catalog Interface
+-
+
+APIs are defined in `Catalog` interface. The interface defines a set of APIs 
to read and write catalog meta-objects such as database, tables, partitions, 
views, and functions.
+
+Users can develop their own catalogs by implementing the interface.
+
+
+Naming Structure in Catalog
+---
+
+Flink's catalogs use a strict two-level structure, that is, catalogs contain 
databases, and databases contain meta-objects. Thus, the full name of a 
meta-object is always structured as `catalogName`.`databaseName`.`objectName`.
+
+All registered catalogs are managed by a `CatalogManager` instance in 
`TableEnvironment`. In order to ease access to meta-objects, `CatalogManager` 
has a concept of current catalog and current database. Usually how users access 
meta-objects in a catalog is to specify its full name in the format of 
`catalogName`.`databaseName`.`objectName`. By setting current catalog and 
current database, users can use just the meta-object's name in their queries. 
This greatly simplifies user experience. For example, a previous query as
+
+```sql
+select * from mycatalog.mydb.myTable;
+```
+
+can be shortened as
+
+```sql
+select * from myTable;
+```
+
+Querying a table in a different database under the default catalog would be
+
+```
+select * from mydb2.myTable
+```
+
+`CatalogManager` always has a built-in `GenericInMemoryCatalog` with name of 
`default_catalog`, which has a built-in default database named 
`default_database`. They will be the current catalog and current database if no 
other catalog and database are explicitly set. All temp meta-objects will be 
registered to this catalog. Users can set current catalog and database via 
`TableEnvironment.useCatalog(...)` and `TableEnvironment.useDatabase(...)` in 
Table API, or `USE CATALOG ...` and `USE DATABASE ...` in Flink SQL.
+
+
+Catalog Types
+-
+
+## GenericInMemoryCatalog
+
+All meta-objects in this catalog are stored in memory, and will be lost once 
the session shuts down.
+
+Its config entry value in SQL CLI yaml file is "generic_in_memory".
+
+## HiveCatalog
+
+`HiveCatalog` can read and write both Flink and Hive meta-objects by using 
Hive Metastore as a persistent storage.
+
+Its config entry value in SQL CLI yaml file is "hive".
+
+### Persist Flink meta-objects
+
+Previously, Flink meta-objects were only stored in memory on a per-session 
basis. That means users have to recreate all the meta-objects every time they 
start a new session.
+
+To solve this user pain point, users can choose the option to use 
`HiveCatalog` to persist all of users' Flink streaming and batch meta-objects 
by using Hive Metastore as a pure storage. Because Hive Metastore is only used 
for storage in this case, Hive itself may not understand Flink's meta-objects 
stored in the metastore.
+
+### Integrate Flink with Hive metadata
+
+The ultimate goal for integrating Flink with Hive metadata is that:
+
+1. existing meta-objects, like tables, views, and functions, created by Hive 
or other Hive-compatible applications can be used by Flink
+
+2. meta-objects created by `HiveCatalog` can be written back to Hive metastore 
such that Hive and other Hive-compatible applications can consume them.
+
+## User-configured Catalog
+
+Catalogs are pluggable, and users can use their own, customized catalog 
implementations.
+
+
+HiveCatalog
+---
+
+## Supported Hive Versions
+
+`HiveCatalog` officially supports Hive 2.3.4 and 1.2.1, and depends on Hive's 
own compatibility for the other 2.x.x and 1.x.x versions.
+
+Users need to explicitly specify the Hive version as string, by either passing 
it to the constructor when creating `HiveCatalog` instances directly in Table 
API or specifying it in yaml config file in SQL CLI. The Hive version string 
will be either `2.3.4`, `1.2.1`, or your own 2.x.x/1.x.x versions.
 
 Review comment:
   Shall we mention that only 2.3.4 and 1.2.1 are supported at the moment? E.g. 
if a user is on 1.2.0, they still need to specify the version as 1.2.1, 
otherwise `HiveShimLoader` will error out complaining about an unsupported Hive 
version.
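For illustration, a hedged sketch of creating and registering a `HiveCatalog` with an explicit version string from the Table API (the constructor argument order and the surrounding `tableEnv` setup are assumptions for this example, not taken from the document; check the `HiveCatalog` javadoc before relying on it):

```java
// Assumed API shape for illustration; verify against the HiveCatalog javadoc.
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class RegisterHiveCatalogSketch {

	public static void register(TableEnvironment tableEnv) {
		String name = "myhive";                 // catalog name (placeholder)
		String defaultDatabase = "default";     // default database (placeholder)
		String hiveConfDir = "/opt/hive-conf";  // dir containing hive-site.xml (placeholder)
		String hiveVersion = "2.3.4";           // must be one of the supported version strings

		HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, hiveVersion);
		tableEnv.registerCatalog(name, hive);

		// Make it the current catalog/database so queries can use bare object names.
		tableEnv.useCatalog(name);
		tableEnv.useDatabase(defaultDatabase);
	}
}
```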


This is an automated message from the Apache Git 

[GitHub] [flink] lirui-apache commented on a change in pull request #8976: [FLINK-12277][table/hive/doc] Add documentation for catalogs

2019-07-03 Thread GitBox
lirui-apache commented on a change in pull request #8976: 
[FLINK-12277][table/hive/doc] Add documentation for catalogs
URL: https://github.com/apache/flink/pull/8976#discussion_r300212979
 
 

 ##
 File path: docs/dev/table/catalog.md
 ##
 @@ -0,0 +1,345 @@
+---
+title: "Catalog"
+is_beta: true
+nav-parent_id: tableapi
+nav-pos: 100
+---
+
+
+A catalog can provide information about metadata, such as names, schemas, 
statistics of tables, and information about how to access data stored in a 
database or table. Once a catalog is registered to a `TableEnvironment`, all 
meta-objects defined in a catalog can be accessed from Table API or SQL queries.
+
+
+* This will be replaced by the TOC
+{:toc}
+
+
+Catalog Interface
+-
+
+APIs are defined in `Catalog` interface. The interface defines a set of APIs 
to read and write catalog meta-objects such as database, tables, partitions, 
views, and functions.
+
+Users can develop their own catalogs by implementing the interface.
+
+
+Naming Structure in Catalog
+---
+
+Flink's catalogs use a strict two-level structure, that is, catalogs contain 
databases, and databases contain meta-objects. Thus, the full name of a 
meta-object is always structured as `catalogName`.`databaseName`.`objectName`.
+
+All registered catalogs are managed by a `CatalogManager` instance in 
`TableEnvironment`. In order to ease access to meta-objects, `CatalogManager` 
has a concept of current catalog and current database. Usually how users access 
meta-objects in a catalog is to specify its full name in the format of 
`catalogName`.`databaseName`.`objectName`. By setting current catalog and 
current database, users can use just the meta-object's name in their queries. 
This greatly simplifies user experience. For example, a previous query as
+
+```sql
+select * from mycatalog.mydb.myTable;
+```
+
+can be shortened as
+
+```sql
+select * from myTable;
+```
+
+Querying a table in a different database under the default catalog would be
+
+```
+select * from mydb2.myTable
+```
+
+`CatalogManager` always has a built-in `GenericInMemoryCatalog` with name of 
`default_catalog`, which has a built-in default database named 
`default_database`. They will be the current catalog and current database if no 
other catalog and database are explicitly set. All temp meta-objects will be 
registered to this catalog. Users can set current catalog and database via 
`TableEnvironment.useCatalog(...)` and `TableEnvironment.useDatabase(...)` in 
Table API, or `USE CATALOG ...` and `USE DATABASE ...` in Flink SQL.
+
+
+Catalog Types
+-
+
+## GenericInMemoryCatalog
+
+All meta-objects in this catalog are stored in memory, and will be lost once 
the session shuts down.
+
+Its config entry value in SQL CLI yaml file is "generic_in_memory".
+
+## HiveCatalog
+
+`HiveCatalog` can read and write both Flink and Hive meta-objects by using 
Hive Metastore as a persistent storage.
+
+Its config entry value in SQL CLI yaml file is "hive".
+
+### Persist Flink meta-objects
+
+Previously, Flink meta-objects were only stored in memory on a per-session 
basis. That means users have to recreate all the meta-objects every time they 
start a new session.
+
+To solve this user pain point, users can choose the option to use 
`HiveCatalog` to persist all of users' Flink streaming and batch meta-objects 
by using Hive Metastore as a pure storage. Because Hive Metastore is only used 
for storage in this case, Hive itself may not understand Flink's meta-objects 
stored in the metastore.
+
+### Integrate Flink with Hive metadata
+
+The ultimate goal for integrating Flink with Hive metadata is that:
+
+1. existing meta-objects, like tables, views, and functions, created by Hive 
or other Hive-compatible applications can be used by Flink
+
+2. meta-objects created by `HiveCatalog` can be written back to Hive metastore 
such that Hive and other Hive-compatible applications can consume them.
+
+## User-configured Catalog
+
+Catalogs are pluggable, and users can use their own, customized catalog 
implementations.
+
+
+HiveCatalog
+---
+
+## Supported Hive Versions
+
+`HiveCatalog` officially supports Hive 2.3.4 and 1.2.1, and depends on Hive's 
own compatibility for the other 2.x.x and 1.x.x versions.
+
+Users need to explicitly specify the Hive version as string, by either passing 
it to the constructor when creating `HiveCatalog` instances directly in Table 
API or specifying it in yaml config file in SQL CLI. The Hive version string 
will be either `2.3.4`, `1.2.1`, or your own 2.x.x/1.x.x versions.
+
+## Dependencies
+
+In order to use `HiveCatalog`, users need to either download the following 
dependency jars and add them to the `/lib` dir of the Flink distribution, or 
add their existing Hive jars to Flink's classpath in order for Flink to find 
them at runtime.
 
 Review comment:
   If users want to use HiveCatalog, they must have a Hive installation to 

[jira] [Closed] (FLINK-12833) Add Klaviyo to Chinese PoweredBy page

2019-07-03 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-12833.
---
   Resolution: Fixed
Fix Version/s: 1.9.0

Merged in flink-web: a89ff4efa0e0b3e9be23376e206c75a253b3ae40

> Add Klaviyo to Chinese PoweredBy page
> -
>
> Key: FLINK-12833
> URL: https://issues.apache.org/jira/browse/FLINK-12833
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Project Website
>Reporter: Fabian Hueske
>Assignee: yelun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Commit b54ecfa930653bcfecd60df3414deca5291c6cb3 added Klaviyo to the English 
> PoweredBy page.
> It should be added to the Chinese page as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese

2019-07-03 Thread GitBox
klion26 commented on a change in pull request #8943: 
[FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
URL: https://github.com/apache/flink/pull/8943#discussion_r300210930
 
 

 ##
 File path: docs/ops/state/checkpoints.zh.md
 ##
 @@ -26,69 +26,49 @@ under the License.
 * toc
 {:toc}
 
-## Overview
+## 概述
+checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。
 
-Checkpoints make state in Flink fault tolerant by allowing state and the
-corresponding stream positions to be recovered, thereby giving the application
-the same semantics as a failure-free execution.
+参考 [Checkpointing]({{ site.baseurl }}/zh/dev/stream/state/checkpointing.html) 
查看如何在 Flink 程序中开启和配置 checkpoint。
 
-See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) 
for how to enable and
-configure checkpoints for your program.
+## 保留 Checkpoint
 
-## Retained Checkpoints
-
-Checkpoints are by default not retained and are only used to resume a
-job from failures. They are deleted when a program is cancelled.
-You can, however, configure periodic checkpoints to be retained.
-Depending on the configuration these *retained* checkpoints are *not*
-automatically cleaned up when the job fails or is canceled.
-This way, you will have a checkpoint around to resume from if your job fails.
+checkpoint 在默认的情况下仅用于恢复失败的作业,并不保留,当程序取消时 checkpoint 就会被删除。当然,你可以通过配置来保留 
checkpoint,这些被保留的 checkpoint 在作业失败或取消时不会被清除。这样,你就可以使用该 checkpoint 来恢复失败的作业。
 
 {% highlight java %}
 CheckpointConfig config = env.getCheckpointConfig();
 
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 {% endhighlight %}
 
-The `ExternalizedCheckpointCleanup` mode configures what happens with 
checkpoints when you cancel the job:
-
-- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the 
checkpoint when the job is cancelled. Note that you have to manually clean up 
the checkpoint state after cancellation in this case.
-
-- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the 
checkpoint when the job is cancelled. The checkpoint state will only be 
available if the job fails.
+`ExternalizedCheckpointCleanup` 配置项定义了当作业取消时,对作业 checkpoint 的操作:
+- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的 
checkpoint。注意,这种情况下,需要手动清除该作业的 checkpoint。
 
 Review comment:
   what do you think about changing `需要手动清除该作业的 checkpoint` to `需要手动清除该作业保留的的 
checkpoint`?
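As a side note, a minimal sketch of the per-job configuration the quoted text refers to, i.e. retaining checkpoints on cancellation and choosing a checkpoint directory in code (the path and interval are placeholders, and setting the directory via the state backend constructor is just one way to do it):

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointSketch {
	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.enableCheckpointing(60_000);  // checkpoint every 60s (placeholder interval)

		// Keep checkpoints around when the job is cancelled (manual cleanup required).
		env.getCheckpointConfig().enableExternalizedCheckpoints(
			ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

		// Per-job checkpoint directory (placeholder path), instead of state.checkpoints.dir.
		env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
	}
}
```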


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese

2019-07-03 Thread GitBox
klion26 commented on a change in pull request #8943: 
[FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
URL: https://github.com/apache/flink/pull/8943#discussion_r300209240
 
 

 ##
 File path: docs/ops/state/checkpoints.zh.md
 ##
 @@ -26,69 +26,49 @@ under the License.
 * toc
 {:toc}
 
-## Overview
+## 概述
+checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。
 
 Review comment:
   maybe the first "checkpoint" should be "Checkpoint"?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese

2019-07-03 Thread GitBox
klion26 commented on a change in pull request #8943: 
[FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
URL: https://github.com/apache/flink/pull/8943#discussion_r300211627
 
 

 ##
 File path: docs/ops/state/checkpoints.zh.md
 ##
 @@ -26,69 +26,49 @@ under the License.
 * toc
 {:toc}
 
-## Overview
+## 概述
+checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。
 
-Checkpoints make state in Flink fault tolerant by allowing state and the
-corresponding stream positions to be recovered, thereby giving the application
-the same semantics as a failure-free execution.
+参考 [Checkpointing]({{ site.baseurl }}/zh/dev/stream/state/checkpointing.html) 
查看如何在 Flink 程序中开启和配置 checkpoint。
 
-See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) 
for how to enable and
-configure checkpoints for your program.
+## 保留 Checkpoint
 
-## Retained Checkpoints
-
-Checkpoints are by default not retained and are only used to resume a
-job from failures. They are deleted when a program is cancelled.
-You can, however, configure periodic checkpoints to be retained.
-Depending on the configuration these *retained* checkpoints are *not*
-automatically cleaned up when the job fails or is canceled.
-This way, you will have a checkpoint around to resume from if your job fails.
+checkpoint 在默认的情况下仅用于恢复失败的作业,并不保留,当程序取消时 checkpoint 就会被删除。当然,你可以通过配置来保留 
checkpoint,这些被保留的 checkpoint 在作业失败或取消时不会被清除。这样,你就可以使用该 checkpoint 来恢复失败的作业。
 
 {% highlight java %}
 CheckpointConfig config = env.getCheckpointConfig();
 
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 {% endhighlight %}
 
-The `ExternalizedCheckpointCleanup` mode configures what happens with 
checkpoints when you cancel the job:
-
-- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the 
checkpoint when the job is cancelled. Note that you have to manually clean up 
the checkpoint state after cancellation in this case.
-
-- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the 
checkpoint when the job is cancelled. The checkpoint state will only be 
available if the job fails.
+`ExternalizedCheckpointCleanup` 配置项定义了当作业取消时,对作业 checkpoint 的操作:
+- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的 
checkpoint。注意,这种情况下,需要手动清除该作业的 checkpoint。
+- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**:当作业取消时,删除作业的 
checkpoint。仅当作业失败时,作业的 checkpoint 才会被保留。
 
-### Directory Structure
+### 目录结构
 
-Similarly to [savepoints](savepoints.html), a checkpoint consists
-of a meta data file and, depending on the state backend, some additional data
-files. The meta data file and data files are stored in the directory that is
-configured via `state.checkpoints.dir` in the configuration files, 
-and also can be specified for per job in the code.
+与 [savepoints](savepoints.html) 相似,checkpoint 由元数据文件、数据文件(与state backend 
相关)组成。可通过配置文件中 “state.checkpoints.dir” 
配置项来指定元数据文件和数据文件的存储路径,另外也可以在代码中针对单个作业特别指定该配置项。
 
 Review comment:
   do you think we need to change `“state.checkpoints.dir”` to 
`"state.checkpoints.dir"`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese

2019-07-03 Thread GitBox
klion26 commented on a change in pull request #8943: 
[FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
URL: https://github.com/apache/flink/pull/8943#discussion_r300209292
 
 

 ##
 File path: docs/ops/state/checkpoints.zh.md
 ##
 @@ -26,69 +26,49 @@ under the License.
 * toc
 {:toc}
 
-## Overview
+## 概述
+checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。
 
-Checkpoints make state in Flink fault tolerant by allowing state and the
-corresponding stream positions to be recovered, thereby giving the application
-the same semantics as a failure-free execution.
+参考 [Checkpointing]({{ site.baseurl }}/zh/dev/stream/state/checkpointing.html) 
查看如何在 Flink 程序中开启和配置 checkpoint。
 
-See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) 
for how to enable and
-configure checkpoints for your program.
+## 保留 Checkpoint
 
-## Retained Checkpoints
-
-Checkpoints are by default not retained and are only used to resume a
-job from failures. They are deleted when a program is cancelled.
-You can, however, configure periodic checkpoints to be retained.
-Depending on the configuration these *retained* checkpoints are *not*
-automatically cleaned up when the job fails or is canceled.
-This way, you will have a checkpoint around to resume from if your job fails.
+checkpoint 在默认的情况下仅用于恢复失败的作业,并不保留,当程序取消时 checkpoint 就会被删除。当然,你可以通过配置来保留 
checkpoint,这些被保留的 checkpoint 在作业失败或取消时不会被清除。这样,你就可以使用该 checkpoint 来恢复失败的作业。
 
 Review comment:
   same as above


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese

2019-07-03 Thread GitBox
klion26 commented on a change in pull request #8943: 
[FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
URL: https://github.com/apache/flink/pull/8943#discussion_r300209240
 
 

 ##
 File path: docs/ops/state/checkpoints.zh.md
 ##
 @@ -26,69 +26,49 @@ under the License.
 * toc
 {:toc}
 
-## Overview
+## 概述
+checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。
 
 Review comment:
   maybe the first "checkpoint" should be "Checkpoint"? And in other places too.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese

2019-07-03 Thread GitBox
wuchong commented on a change in pull request #8300: [FLINK-11638][docs-zh] 
Translate Savepoints page into Chinese
URL: https://github.com/apache/flink/pull/8300#discussion_r300211203
 
 

 ##
 File path: docs/ops/state/savepoints.zh.md
 ##
 @@ -78,160 +68,162 @@ source-id   | State of StatefulSource
 mapper-id   | State of StatefulMapper
 {% endhighlight %}
 
-In the above example, the print sink is stateless and hence not part of the 
savepoint state. By default, we try to map each entry of the savepoint back to 
the new program.
+在上面的示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分。默认情况下,我们尝试将 Savepoint 
的每个条目映射回新程序。
 
-## Operations
+## 算子
 
-You can use the [command line client]({{ site.baseurl 
}}/ops/cli.html#savepoints) to *trigger savepoints*, *cancel a job with a 
savepoint*, *resume from savepoints*, and *dispose savepoints*.
+你可以使用[命令行客户端]({{site.baseurl}}/zh/ops/cli.html#Savepoint)来*触发 Savepoint*,*触发 
Savepoint 并取消作业*,*从 Savepoint* 恢复,以及*删除 Savepoint*。
 
-With Flink >= 1.2.0 it is also possible to *resume from savepoints* using the 
webui.
+从 Flink 1.2.0 开始,还可以使用 webui *从 Savepoint 恢复*。
 
-### Triggering Savepoints
+### 触发 Savepoint
 
-When triggering a savepoint, a new savepoint directory is created where the 
data as well as the meta data will be stored. The location of this directory 
can be controlled by [configuring a default target directory](#configuration) 
or by specifying a custom target directory with the trigger commands (see the 
[`:targetDirectory` argument](#trigger-a-savepoint)).
+当触发 Savepoint 时,将创建一个新的 Savepoint 
目录,其中存储数据和元数据。可以通过[配置默认目标目录](#configuration)或使用触发器命令指定自定义目标目录(参见[`:targetDirectory
 `参数](#trigger-a-savepoint)来控制该目录的位置。
 
 
-Attention: The target directory has to be a location 
accessible by both the JobManager(s) and TaskManager(s) e.g. a location on a 
distributed file-system.
+注意:目标目录必须是 JobManager(s) 和 TaskManager(s) 
都可以访问的位置,例如分布式文件系统上的位置。
 
 
-For example with a `FsStateBackend` or `RocksDBStateBackend`:
+以 `FsStateBackend`  或 `RocksDBStateBackend` 为例:
 
 {% highlight shell %}
-# Savepoint target directory
-/savepoints/
+# Savepoint 目标目录
+/Savepoint/
 
-# Savepoint directory
-/savepoints/savepoint-:shortjobid-:savepointid/
+# Savepoint 目录
 
 Review comment:
   Should the directory name stay lowercase as in the original text?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese

2019-07-03 Thread GitBox
wuchong commented on a change in pull request #8300: [FLINK-11638][docs-zh] 
Translate Savepoints page into Chinese
URL: https://github.com/apache/flink/pull/8300#discussion_r300211060
 
 

 ##
 File path: docs/ops/state/savepoints.zh.md
 ##
 @@ -78,160 +68,162 @@ source-id   | State of StatefulSource
 mapper-id   | State of StatefulMapper
 {% endhighlight %}
 
-In the above example, the print sink is stateless and hence not part of the 
savepoint state. By default, we try to map each entry of the savepoint back to 
the new program.
+在上面的示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分。默认情况下,我们尝试将 Savepoint 
的每个条目映射回新程序。
 
-## Operations
+## 算子
 
-You can use the [command line client]({{ site.baseurl 
}}/ops/cli.html#savepoints) to *trigger savepoints*, *cancel a job with a 
savepoint*, *resume from savepoints*, and *dispose savepoints*.
+你可以使用[命令行客户端]({{site.baseurl}}/zh/ops/cli.html#Savepoint)来*触发 Savepoint*,*触发 
Savepoint 并取消作业*,*从 Savepoint* 恢复,以及*删除 Savepoint*。
 
-With Flink >= 1.2.0 it is also possible to *resume from savepoints* using the 
webui.
+从 Flink 1.2.0 开始,还可以使用 webui *从 Savepoint 恢复*。
 
-### Triggering Savepoints
+### 触发 Savepoint
 
-When triggering a savepoint, a new savepoint directory is created where the 
data as well as the meta data will be stored. The location of this directory 
can be controlled by [configuring a default target directory](#configuration) 
or by specifying a custom target directory with the trigger commands (see the 
[`:targetDirectory` argument](#trigger-a-savepoint)).
+当触发 Savepoint 时,将创建一个新的 Savepoint 
目录,其中存储数据和元数据。可以通过[配置默认目标目录](#configuration)或使用触发器命令指定自定义目标目录(参见[`:targetDirectory
 `参数](#trigger-a-savepoint)来控制该目录的位置。
 
 Review comment:
   ```suggestion
   当触发 Savepoint 时,将创建一个新的 Savepoint 
目录,其中存储数据和元数据。可以通过[配置默认目标目录](#configuration)或使用触发器命令指定自定义目标目录(参见[`:targetDirectory`参数](#trigger-a-savepoint)来控制该目录的位置。
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese

2019-07-03 Thread GitBox
wuchong commented on a change in pull request #8300: [FLINK-11638][docs-zh] 
Translate Savepoints page into Chinese
URL: https://github.com/apache/flink/pull/8300#discussion_r300211127
 
 

 ##
 File path: docs/ops/state/savepoints.zh.md
 ##
 @@ -78,160 +68,162 @@ source-id   | State of StatefulSource
 mapper-id   | State of StatefulMapper
 {% endhighlight %}
 
-In the above example, the print sink is stateless and hence not part of the 
savepoint state. By default, we try to map each entry of the savepoint back to 
the new program.
+在上面的示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分。默认情况下,我们尝试将 Savepoint 
的每个条目映射回新程序。
 
-## Operations
+## 算子
 
-You can use the [command line client]({{ site.baseurl 
}}/ops/cli.html#savepoints) to *trigger savepoints*, *cancel a job with a 
savepoint*, *resume from savepoints*, and *dispose savepoints*.
+你可以使用[命令行客户端]({{site.baseurl}}/zh/ops/cli.html#Savepoint)来*触发 Savepoint*,*触发 
Savepoint 并取消作业*,*从 Savepoint* 恢复,以及*删除 Savepoint*。
 
-With Flink >= 1.2.0 it is also possible to *resume from savepoints* using the 
webui.
+从 Flink 1.2.0 开始,还可以使用 webui *从 Savepoint 恢复*。
 
-### Triggering Savepoints
+### 触发 Savepoint
 
-When triggering a savepoint, a new savepoint directory is created where the 
data as well as the meta data will be stored. The location of this directory 
can be controlled by [configuring a default target directory](#configuration) 
or by specifying a custom target directory with the trigger commands (see the 
[`:targetDirectory` argument](#trigger-a-savepoint)).
+当触发 Savepoint 时,将创建一个新的 Savepoint 
目录,其中存储数据和元数据。可以通过[配置默认目标目录](#configuration)或使用触发器命令指定自定义目标目录(参见[`:targetDirectory
 `参数](#trigger-a-savepoint)来控制该目录的位置。
 
 
-Attention: The target directory has to be a location 
accessible by both the JobManager(s) and TaskManager(s) e.g. a location on a 
distributed file-system.
+注意:目标目录必须是 JobManager(s) 和 TaskManager(s) 
都可以访问的位置,例如分布式文件系统上的位置。
 
 
-For example with a `FsStateBackend` or `RocksDBStateBackend`:
+以 `FsStateBackend`  或 `RocksDBStateBackend` 为例:
 
 {% highlight shell %}
-# Savepoint target directory
-/savepoints/
+# Savepoint 目标目录
+/Savepoint/
 
-# Savepoint directory
-/savepoints/savepoint-:shortjobid-:savepointid/
+# Savepoint 目录
+/Savepoint/savepoint-:shortjobid-:savepointid/
 
-# Savepoint file contains the checkpoint meta data
-/savepoints/savepoint-:shortjobid-:savepointid/_metadata
+# Savepoint 文件包含 Checkpoint元数据
+/Savepoint/savepoint-:shortjobid-:savepointid/_metadata
 
 Review comment:
   ```suggestion
   /savepoint/savepoint-:shortjobid-:savepointid/_metadata
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

