[GitHub] [flink] xintongsong commented on a change in pull request #8704: [FLINK-12812][runtime] Set resource profiles for task slots
xintongsong commented on a change in pull request #8704: [FLINK-12812][runtime] Set resource profiles for task slots URL: https://github.com/apache/flink/pull/8704#discussion_r300238299 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java ## @@ -284,12 +301,39 @@ else if (obj != null && obj.getClass() == ResourceProfile.class) { return this.cpuCores == that.cpuCores && this.heapMemoryInMB == that.heapMemoryInMB && this.directMemoryInMB == that.directMemoryInMB && + this.nativeMemoryInMB == that.nativeMemoryInMB && this.networkMemoryInMB == that.networkMemoryInMB && + this.managedMemoryInMB == that.managedMemoryInMB && Objects.equals(extendedResources, that.extendedResources); } return false; } + public boolean approximate(ResourceProfile that) { + if (that == null) { + return false; + } + if (Math.abs(this.cpuCores - that.cpuCores) > 1e-6f) { Review comment: I'll add comments for these explanations. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xintongsong commented on a change in pull request #8704: [FLINK-12812][runtime] Set resource profiles for task slots
xintongsong commented on a change in pull request #8704: [FLINK-12812][runtime] Set resource profiles for task slots URL: https://github.com/apache/flink/pull/8704#discussion_r300238018 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java ## @@ -284,12 +301,39 @@ else if (obj != null && obj.getClass() == ResourceProfile.class) { return this.cpuCores == that.cpuCores && this.heapMemoryInMB == that.heapMemoryInMB && this.directMemoryInMB == that.directMemoryInMB && + this.nativeMemoryInMB == that.nativeMemoryInMB && this.networkMemoryInMB == that.networkMemoryInMB && + this.managedMemoryInMB == that.managedMemoryInMB && Objects.equals(extendedResources, that.extendedResources); } return false; } + public boolean approximate(ResourceProfile that) { + if (that == null) { + return false; + } + if (Math.abs(this.cpuCores - that.cpuCores) > 1e-6f) { Review comment: Yes, this is about the rounding errors during the profile calculation. Calculating a profile involves multiplying integer values by floating-point values, and rounding the floating-point product back to an integer introduces error. E.g., the rounded results of `Total * Fraction` and `Total - Total * (1 - Fraction)` may differ, where `Total` is an integer value and `Fraction` is a floating-point value. Each such rounding can be off by at most 1. Since there are two such fraction-based calculations (for managed memory and network memory), I set the max error allowed here to 2. This approximate matching is only used for matching a `PendingTaskManagerSlot` with a slot registered from the TM. It can be replaced with exact matching once we unify the TM resource configuration, because there will then be no more resource calculations on the TM side.
For Yarn, the RM will calculate the resource profile and pass the calculation result to the TM to be started, so the pending slots and the actual slots should have exactly the same profiles. For standalone, there shouldn't be any pending slots.
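The rounding discrepancy described above can be reproduced in plain Java. The sketch below is hypothetical (it is not Flink's actual `ResourceProfile#approximate` code): `approximateMatch` and `MAX_ERROR_MB` are illustrative names for the 2 MB tolerance the review discusses.

```java
public class RoundingErrorDemo {
    // Two fraction-based calculations (managed and network memory) can each
    // be off by 1 MB after rounding, hence a tolerance of 2.
    static final int MAX_ERROR_MB = 2;

    // Hypothetical helper: accept two memory sizes as "the same" within tolerance.
    static boolean approximateMatch(long expectedMb, long actualMb) {
        return Math.abs(expectedMb - actualMb) <= MAX_ERROR_MB;
    }

    public static void main(String[] args) {
        int total = 1001;      // an integer memory budget in MB
        double fraction = 0.5; // a fraction-based configuration value

        // Two mathematically equivalent formulas round differently:
        long direct = Math.round(total * fraction);                      // round(500.5) = 501
        long viaComplement = total - Math.round(total * (1 - fraction)); // 1001 - 501 = 500

        System.out.println(direct + " vs " + viaComplement);             // 501 vs 500
        System.out.println(approximateMatch(direct, viaComplement));     // true
    }
}
```

Because each of the two fraction-based memory calculations can shift the result by at most 1 MB, a profile computed on the RM side and one computed on the TM side may legitimately differ by up to 2 MB, which is exactly the tolerance chosen in the patch.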
[jira] [Assigned] (FLINK-11638) Translate "Savepoints" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-11638: --- Assignee: Forward Xu (was: yelun) > Translate "Savepoints" page into Chinese > > > Key: FLINK-11638 > URL: https://issues.apache.org/jira/browse/FLINK-11638 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Congxian Qiu(klion26) >Assignee: Forward Xu >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > doc locates in flink/docs/ops/state/savepoints.zh.md -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11638) Translate "Savepoints" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878324#comment-16878324 ] Jark Wu commented on FLINK-11638: - Hi [~guanghui], [~x1q1j1] has assigned this issue to himself and created a pull request. Please ask the assignee before taking an issue. > Translate "Savepoints" page into Chinese > > > Key: FLINK-11638 > URL: https://issues.apache.org/jira/browse/FLINK-11638 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Congxian Qiu(klion26) >Assignee: yelun >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > doc locates in flink/docs/ops/state/savepoints.zh.md
[jira] [Closed] (FLINK-11638) Translate "Savepoints" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-11638. --- Resolution: Fixed Fix Version/s: 1.9.0 Fixed in 1.9.0: 4ad19eff713284c0f30c7dba3cd21095baf18d42 > Translate "Savepoints" page into Chinese > > > Key: FLINK-11638 > URL: https://issues.apache.org/jira/browse/FLINK-11638 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Congxian Qiu(klion26) >Assignee: yelun >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > doc locates in flink/docs/ops/state/savepoints.zh.md
[jira] [Comment Edited] (FLINK-11638) Translate "Savepoints" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878324#comment-16878324 ] Jark Wu edited comment on FLINK-11638 at 7/4/19 5:48 AM: - Hi [~guanghui], [~x1q1j1] has assigned this issue to himself and created a pull request before. Please ask the assignee before taking an issue. was (Author: jark): Hi [~guanghui], [~x1q1j1] has assign this issue to himself and created a pull request. Please ask the assignee before take issue. > Translate "Savepoints" page into Chinese > > > Key: FLINK-11638 > URL: https://issues.apache.org/jira/browse/FLINK-11638 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Congxian Qiu(klion26) >Assignee: yelun >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > doc locates in flink/docs/ops/state/savepoints.zh.md
[GitHub] [flink] asfgit closed pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
asfgit closed pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese URL: https://github.com/apache/flink/pull/8300
[GitHub] [flink] flinkbot commented on issue #8979: [FLINK-13087][table] Add group window Aggregate operator to Table API
flinkbot commented on issue #8979: [FLINK-13087][table] Add group window Aggregate operator to Table API URL: https://github.com/apache/flink/pull/8979#issuecomment-508346981 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-13087) Add group window Aggregate operator to Table API
[ https://issues.apache.org/jira/browse/FLINK-13087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-13087: --- Labels: pull-request-available (was: ) > Add group window Aggregate operator to Table API > > > Key: FLINK-13087 > URL: https://issues.apache.org/jira/browse/FLINK-13087 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > > Add Group Window Aggregate operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. > The usage: > {code:java} > val res = tab > .window(Tumble over 15.minute on 'rowtime as 'w) > .groupBy('w, 'a) // leave out groupBy-clause to define global aggregates > .agg(fun: AggregateFunction) // output has columns 'a, 'b, 'c > .select('a, 'c, 'w.start) > {code}
[GitHub] [flink] hequn8128 opened a new pull request #8979: [FLINK-13087][table] Add group window Aggregate operator to Table API
hequn8128 opened a new pull request #8979: [FLINK-13087][table] Add group window Aggregate operator to Table API URL: https://github.com/apache/flink/pull/8979 ## What is the purpose of the change This pull request adds a row-based group window Aggregate operator to the Table API. Also, there are some hotfixes in separate commits. The implementation of the window aggregate operator depends on these hotfixes, so I didn't create other jiras. ## Brief change log There are 5 commits in total: - [#f5e8d1f](https://github.com/apache/flink/commit/f5e8d1f60a1de4ff69e5e713c4382327c149f33c) Resolve indent problem for `WindowFlatAggregateTableImpl`. - [#9395878](https://github.com/apache/flink/commit/939587853716a64c787af17be4610216f9c3b225) Add call support in the group by for row-based aggregates. For example, support groupBy('a % b) expressions. - [#ee2b065](https://github.com/apache/flink/commit/ee2b06567ef11569dc29fd3c9622fd8afbc49bcc) Throw exceptions if there is a star in the select after a window (table)aggregate, because we don't know which window properties should be selected. - [#365f31d](https://github.com/apache/flink/commit/365f31da14376b74dd4a3286dbe0ae17be22d4a6) Validate the alias length for aggregates. The alias length should equal the length of the result type.
- [#deca009](https://github.com/apache/flink/commit/deca0096d4ed9b59718e0a14c8f242bdaaab55d5) Add group window Aggregate operator to Table API For the last commit: Add group window Aggregate operator to Table API, it mainly contains the following changes - Add `aggregate` method to the `WindowGroupedTable` - Build `windowAggregate` QueryOperation in `OperationTreeBuilder` - Add tests and docs ## Verifying this change This change added tests and can be verified as follows: - Added plan tests in `AggregateStringExpressionTest`, `AggregateTest` - Add validation tests in `GroupWindowValidationTest` - Add IT tests in `GroupWindowITCase` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs)
[jira] [Created] (FLINK-13090) Test Hive connector with hive runner
Rui Li created FLINK-13090: -- Summary: Test Hive connector with hive runner Key: FLINK-13090 URL: https://issues.apache.org/jira/browse/FLINK-13090 Project: Flink Issue Type: Sub-task Components: Connectors / Hive Reporter: Rui Li Assignee: Rui Li
[GitHub] [flink] lirui-apache commented on issue #8965: [FLINK-13068][hive] HiveTableSink should implement PartitionableTable…
lirui-apache commented on issue #8965: [FLINK-13068][hive] HiveTableSink should implement PartitionableTable… URL: https://github.com/apache/flink/pull/8965#issuecomment-508344669 Thanks @xuefuz and @bowenli86 for the review. Please take another look.
[GitHub] [flink] lirui-apache commented on a change in pull request #8976: [FLINK-12277][table/hive/doc] Add documentation for catalogs
lirui-apache commented on a change in pull request #8976: [FLINK-12277][table/hive/doc] Add documentation for catalogs URL: https://github.com/apache/flink/pull/8976#discussion_r300233788 ## File path: docs/dev/table/catalog.md ## @@ -0,0 +1,345 @@ +--- +title: "Catalog" +is_beta: true +nav-parent_id: tableapi +nav-pos: 100 +--- + + +A catalog can provide information about metadata, such as names, schemas, and statistics of tables, and information about how to access data stored in a database or table. Once a catalog is registered to a `TableEnvironment`, all meta-objects defined in the catalog can be accessed from Table API or SQL queries. + + +* This will be replaced by the TOC +{:toc} + + +Catalog Interface +- + +APIs are defined in the `Catalog` interface. The interface defines a set of APIs to read and write catalog meta-objects such as databases, tables, partitions, views, and functions. + +Users can develop their own catalogs by implementing the interface. + + +Naming Structure in Catalog +--- + +Flink's catalogs use a strict two-level structure: catalogs contain databases, and databases contain meta-objects. Thus, the full name of a meta-object is always structured as `catalogName`.`databaseName`.`objectName`. + +All registered catalogs are managed by a `CatalogManager` instance in `TableEnvironment`. To ease access to meta-objects, `CatalogManager` has the concepts of a current catalog and a current database. Users usually access a meta-object in a catalog by specifying its full name in the format `catalogName`.`databaseName`.`objectName`. By setting the current catalog and current database, users can refer to a meta-object by just its name in their queries. This greatly simplifies the user experience.
For example, a previous query such as + +```sql +select * from mycatalog.mydb.myTable; +``` + +can be shortened to + +```sql +select * from myTable; +``` + +Querying a table in a different database under the default catalog would be + +``` +select * from mydb2.myTable +``` + +`CatalogManager` always has a built-in `GenericInMemoryCatalog` named `default_catalog`, which has a built-in default database named `default_database`. They will be the current catalog and current database if no other catalog and database are explicitly set. All temporary meta-objects will be registered to this catalog. Users can set the current catalog and database via `TableEnvironment.useCatalog(...)` and `TableEnvironment.useDatabase(...)` in the Table API, or `USE CATALOG ...` and `USE DATABASE ...` in Flink SQL. + + +Catalog Types +- + +## GenericInMemoryCatalog + +All meta-objects in this catalog are stored in memory and will be lost once the session shuts down. + +Its config entry value in the SQL CLI yaml file is "generic_in_memory". + +## HiveCatalog + +`HiveCatalog` can read and write both Flink and Hive meta-objects by using Hive Metastore as a persistent storage. + +Its config entry value in the SQL CLI yaml file is "hive". + +### Persist Flink meta-objects + +Previously, Flink meta-objects were only stored in memory on a per-session basis. That means users had to recreate all the meta-objects every time they started a new session. + +To solve this user pain point, users can choose to use `HiveCatalog` to persist all of their Flink streaming and batch meta-objects, using Hive Metastore as a pure storage layer. Because Hive Metastore is only used for storage in this case, Hive itself may not understand Flink's meta-objects stored in the metastore. + +### Integrate Flink with Hive metadata + +The ultimate goal of integrating Flink with Hive metadata is that: + +1.
existing meta-objects, like tables, views, and functions, created by Hive or other Hive-compatible applications can be used by Flink + +2. meta-objects created by `HiveCatalog` can be written back to the Hive metastore such that Hive and other Hive-compatible applications can consume them. + +## User-configured Catalog + +Catalogs are pluggable, and users can use their own, customized catalog implementations. + + +HiveCatalog +--- + +## Supported Hive Versions + +`HiveCatalog` officially supports Hive 2.3.4 and 1.2.1, and depends on Hive's own compatibility for the other 2.x.x and 1.x.x versions. + +Users need to explicitly specify the Hive version as a string, by either passing it to the constructor when creating `HiveCatalog` instances directly in the Table API or specifying it in the yaml config file in the SQL CLI. The Hive version string will be either `2.3.4`, `1.2.1`, or your own 2.x.x/1.x.x version. + +## Dependencies + +In order to use `HiveCatalog`, users need to either download the following dependency jars and add them to the `/lib` dir of the Flink distribution, or add their existing Hive jars to Flink's classpath so that Flink can find them at runtime. + +Take Hive 2.3.4 for example: + +``` +// Hive dependencies + +- hive-metastore-2.3.4.jar
[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(… URL: https://github.com/apache/flink/pull/8844#discussion_r300221923 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java ## @@ -245,6 +247,57 @@ public String explain(Table table) { return planner.getCompletionHints(statement, position); } + @Override + public void sql(String statement) { + List<Operation> operations = planner.parse(statement); + operations.forEach(operation -> { + if (operation instanceof CreateTableOperation) { + CreateTableOperation operation1 = (CreateTableOperation) operation; + registerTable( + operation1.getTablePath(), + operation1.getCatalogTable(), + operation1.isIgnoreIfExists()); + } else if (operation instanceof ModifyOperation) { + queryConfigProvider.setConfig(new StreamQueryConfig()); + List<Transformation<?>> transformations = + planner.translate(Collections.singletonList((ModifyOperation) operation)); + + execEnv.apply(transformations); Review comment: What do you mean by "translated independently"?
[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(… URL: https://github.com/apache/flink/pull/8844#discussion_r300223062 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java ## @@ -304,6 +304,22 @@ public Builder() { fieldDataTypes = new ArrayList<>(); } + /** Create a proto builder from an existing schema. +* +* Caution: This will invoke {@link #instance()} to fetch a new builder first. +*/ + public Builder proto(TableSchema other) { Review comment: Create a builder from another TableSchema.
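The suggestion above, a builder pre-populated from an existing `TableSchema`, is an instance of the copy-builder pattern. The sketch below uses hypothetical names (`CopyBuilderDemo`, `SimpleSchema`, `builderFrom`) rather than Flink's actual classes:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CopyBuilderDemo {
    // Simplified stand-in for Flink's TableSchema; names are illustrative only.
    static class SimpleSchema {
        final List<String> fieldNames;
        SimpleSchema(List<String> fieldNames) { this.fieldNames = fieldNames; }
    }

    static class Builder {
        private final List<String> names = new ArrayList<>();
        Builder field(String name) { names.add(name); return this; }
        SimpleSchema build() { return new SimpleSchema(new ArrayList<>(names)); }
    }

    // Copy-builder: a fresh Builder pre-populated from an existing schema,
    // mirroring the reviewed Builder#proto(TableSchema) idea.
    static Builder builderFrom(SimpleSchema other) {
        Builder b = new Builder();
        other.fieldNames.forEach(b::field);
        return b;
    }

    public static void main(String[] args) {
        SimpleSchema base = new SimpleSchema(Arrays.asList("a", "b"));
        SimpleSchema extended = builderFrom(base).field("c").build();
        System.out.println(extended.fieldNames); // the base schema is untouched
    }
}
```

The copy-builder keeps the original schema immutable: extending a schema produces a new builder and a new instance instead of mutating the source.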
[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(… URL: https://github.com/apache/flink/pull/8844#discussion_r300221812 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java ## @@ -287,6 +287,46 @@ @Deprecated String[] getCompletionHints(String statement, int position); + /** +* Evaluates mixed-in sql statements including DDLs and DMLs. +* Note: Always use this interface to execute a sql query if no {@link Table} +* result are expected, else use {@link #sqlQuery(String)}. Review comment: `sqlQuery` should return a Table object, which would make the semantics of this interface unclear. Say there is a SQL select statement among the multiple statements: it does not make any sense to support a single select statement but return nothing.
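The behavior under discussion, a `sql()` method that accepts DDL and INSERT statements but has no sensible result for a bare SELECT, can be sketched with a hypothetical dispatcher (`SqlDispatcher` is illustrative, not Flink API):

```java
public class SqlDispatcher {
    enum Kind { DDL, DML }

    // Hypothetical classifier mirroring the reviewed sql() loop: CREATE TABLE
    // is registered as a catalog table, INSERT is translated for execution,
    // and anything producing a Table result (e.g. a bare SELECT) is rejected.
    static Kind dispatch(String statement) {
        String s = statement.trim().toUpperCase();
        if (s.startsWith("CREATE TABLE")) {
            return Kind.DDL; // would call registerTable(...)
        }
        if (s.startsWith("INSERT")) {
            return Kind.DML; // would be translated into transformations
        }
        throw new IllegalArgumentException(
            "Unsupported SQL statement: sql() only accepts DDLs or Inserts.");
    }

    public static void main(String[] args) {
        System.out.println(dispatch("CREATE TABLE t (a INT)"));
        System.out.println(dispatch("INSERT INTO t VALUES (1)"));
    }
}
```

Since `sql()` returns `void`, a SELECT among the statements has nowhere to deliver its result, which is exactly the semantic concern raised in the comment.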
[GitHub] [flink] becketqin commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
becketqin commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r300232263 ## File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java ## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.gcp.pubsub; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; + +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder; + +/** + * A sink function that outputs to PubSub. 
+ * + * @param <IN> type of PubSubSink messages to write + */ +public class PubSubSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSink.class); + + private final Credentials credentials; + private final SerializationSchema<IN> serializationSchema; + private final String projectName; + private final String topicName; + private final String hostAndPortForEmulator; + + private transient Publisher publisher; + + private PubSubSink( + Credentials credentials, + SerializationSchema<IN> serializationSchema, + String projectName, + String topicName, + String hostAndPortForEmulator) { + this.credentials = credentials; + this.serializationSchema = serializationSchema; + this.projectName = projectName; + this.topicName = topicName; + this.hostAndPortForEmulator = hostAndPortForEmulator; + } + + private transient ManagedChannel managedChannel = null; + private transient TransportChannel channel = null; + + @Override + public void open(Configuration configuration) throws Exception { + Publisher.Builder builder = Publisher + .newBuilder(ProjectTopicName.of(projectName, topicName)) + .setCredentialsProvider(FixedCredentialsProvider.create(credentials)); + + if (hostAndPortForEmulator != null) { + managedChannel = ManagedChannelBuilder + .forTarget(hostAndPortForEmulator) + .usePlaintext(true) // This is 'Ok' because this is ONLY used for testing. + .build(); + channel = GrpcTransportChannel.newBuilder().setManagedChannel(managedChannel).build(); + builder.setChannelProvider(FixedTransportChannelProvider.create(channel)) + .setCredentialsProvider(NoCredentialsProvider.create()); + } + + publisher = builder.build(); + } + + @Override + public void close() throws Exception { + super.close(); + shutdownPublisher(); +
[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(… URL: https://github.com/apache/flink/pull/8844#discussion_r300221301 ## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java ## @@ -235,6 +236,40 @@ public String getColumnSqlString() { return writer.toString(); } + /** Split the computed columns sql into string k-v pairs which are put into +* the {@code container}. +* +* For example, {@code col1 as to_timestamp(col2)} would be split into pair: +* (col1, to_timestamp(col2)). +**/ + public void getComputedColumnExpressions(List> container) { Review comment: Removed
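The javadoc above describes splitting `col1 as to_timestamp(col2)` into the pair (col1, to_timestamp(col2)). A hedged sketch of that split, using a hypothetical helper rather than the removed `SqlCreateTable` method:

```java
import java.util.AbstractMap;
import java.util.Map;

public class ComputedColumnSplitter {
    // Hypothetical helper: splits a computed-column expression of the form
    // "<name> as <expression>" into a (name, expression) pair, matching the
    // "as" keyword case-insensitively.
    static Map.Entry<String, String> split(String computedColumn) {
        String[] parts = computedColumn.split("(?i)\\s+as\\s+", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("Not a computed column: " + computedColumn);
        }
        return new AbstractMap.SimpleEntry<>(parts[0].trim(), parts[1].trim());
    }

    public static void main(String[] args) {
        Map.Entry<String, String> pair = split("col1 as to_timestamp(col2)");
        System.out.println(pair.getKey() + " -> " + pair.getValue()); // col1 -> to_timestamp(col2)
    }
}
```

A real parser would of course operate on the SQL AST rather than raw strings; the regex split is only meant to make the (name, expression) pairing concrete.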
[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(… URL: https://github.com/apache/flink/pull/8844#discussion_r300222616 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java ## @@ -245,6 +247,57 @@ public String explain(Table table) { return planner.getCompletionHints(statement, position); } + @Override + public void sql(String statement) { + List<Operation> operations = planner.parse(statement); + operations.forEach(operation -> { + if (operation instanceof CreateTableOperation) { + CreateTableOperation operation1 = (CreateTableOperation) operation; + registerTable( + operation1.getTablePath(), + operation1.getCatalogTable(), + operation1.isIgnoreIfExists()); + } else if (operation instanceof ModifyOperation) { + queryConfigProvider.setConfig(new StreamQueryConfig()); + List<Transformation<?>> transformations = + planner.translate(Collections.singletonList((ModifyOperation) operation)); + + execEnv.apply(transformations); + } else { + throw new ValidationException( + "Unsupported SQL statement: sql() only accepts DDLs or Inserts."); + } + }); + } + + /** +* Registers a {@link CatalogBaseTable} under a given object path. The {@code path} could be +* 3 formats: +* +* `catalog.db.table`: A full table path including the catalog name, +* the database name and table name. +* `db.table`: database name following table name, with the current catalog name. +* `table`: Only the table name, with the current catalog name and database name. +* +* The registered tables then can be referenced in Sql queries. +* +* @param path The path under which the table will be registered +* @param catalogTable The table to register +* @param ignoreIfExists If true, do nothing if there is already same table name under +* the {@code path}. If false, a TableAlreadyExistException throws.
+*/ + private void registerTable(String[] path, CatalogBaseTable catalogTable, boolean ignoreIfExists) { Review comment: merged into `registerTableInternal`
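The three path formats listed in the `registerTable` javadoc resolve against the current catalog and database. A hypothetical resolver (`ObjectPathResolver` is illustrative, not Flink API) might look like:

```java
import java.util.Arrays;

public class ObjectPathResolver {
    // Expands the three formats from the registerTable javadoc into a fully
    // qualified catalog.db.table name, using the current catalog/database.
    static String[] resolve(String[] path, String currentCatalog, String currentDatabase) {
        switch (path.length) {
            case 1: // `table`
                return new String[] {currentCatalog, currentDatabase, path[0]};
            case 2: // `db.table`
                return new String[] {currentCatalog, path[0], path[1]};
            case 3: // `catalog.db.table`
                return path;
            default:
                throw new IllegalArgumentException("Invalid path: " + Arrays.toString(path));
        }
    }

    public static void main(String[] args) {
        // With Flink's defaults, a bare table name resolves as follows:
        String[] full = resolve(new String[] {"myTable"}, "default_catalog", "default_database");
        System.out.println(String.join(".", full)); // default_catalog.default_database.myTable
    }
}
```

This is the same resolution rule the catalog documentation describes: the shorter forms simply borrow the missing components from the current catalog and current database.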
[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(… URL: https://github.com/apache/flink/pull/8844#discussion_r300222001 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java ## @@ -245,6 +247,57 @@ public String explain(Table table) { return planner.getCompletionHints(statement, position); } + @Override + public void sql(String statement) { + List operations = planner.parse(statement); + operations.forEach(operation -> { + if (operation instanceof CreateTableOperation) { + CreateTableOperation operation1 = (CreateTableOperation) operation; + registerTable( + operation1.getTablePath(), + operation1.getCatalogTable(), + operation1.isIgnoreIfExists()); + } else if (operation instanceof ModifyOperation) { + queryConfigProvider.setConfig(new StreamQueryConfig()); + List> transformations = + planner.translate(Collections.singletonList((ModifyOperation) operation)); + + execEnv.apply(transformations); + } else { + throw new ValidationException( Review comment: Yep, no bridge logic in this patch.
[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(… URL: https://github.com/apache/flink/pull/8844#discussion_r300223365 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlExecutableStatements.java ## @@ -18,62 +18,197 @@ package org.apache.flink.table.sqlexec; +import org.apache.flink.sql.parser.SqlProperty; import org.apache.flink.sql.parser.ddl.SqlCreateTable; -import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.sql.parser.ddl.SqlTableColumn; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.calcite.FlinkPlannerImpl; +import org.apache.flink.table.calcite.FlinkTypeFactory; +import org.apache.flink.table.calcite.FlinkTypeSystem; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogTableImpl; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.PlannerQueryOperation; +import org.apache.flink.table.operations.ddl.CreateTableOperation; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.util.StringUtils; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.util.ReflectUtil; import org.apache.calcite.util.ReflectiveVisitor; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + /** * Mix-in tool class for {@code SqlNode} that allows DDL commands to be * executed directly. 
* - * For every kind of {@link SqlNode}, there needs a method named - * #execute(type), the 'type' argument should be the subclass - * type for the supported {@link SqlNode}. + * For every kind of {@link SqlNode}, there needs to have a corresponding + * #execute(type) method, the 'type' argument should be the subclass + * of the supported {@link SqlNode}. + * + * Every #execute() should return a {@link Operation} which can be used in + * {@link org.apache.flink.table.delegation.Planner}. */ public class SqlExecutableStatements implements ReflectiveVisitor { - private TableEnvironment tableEnv; + private FlinkPlannerImpl flinkPlanner; - private final ReflectUtil.MethodDispatcher dispatcher = - ReflectUtil.createMethodDispatcher(Void.class, + private final ReflectUtil.MethodDispatcher dispatcher = + ReflectUtil.createMethodDispatcher(Operation.class, Review comment: The visitor returns a SqlNode, while I need an Operation here.
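The dispatcher change above swaps the declared return type from `Void.class` to `Operation.class`. A minimal sketch of what such a reflective method dispatcher does, picking an `execute(...)` overload by the argument's runtime class (a simplified stand-in for Calcite's `ReflectUtil.createMethodDispatcher`, using toy `Integer`/`String` node types instead of `SqlNode` subclasses):

```java
import java.lang.reflect.Method;

public class MiniDispatcher {

    // Two overloads playing the role of the per-SqlNode execute() methods.
    public String execute(Integer node) { return "int:" + node; }
    public String execute(String node) { return "str:" + node; }

    // Picks the execute(...) overload whose parameter type matches the runtime
    // class of the argument (walking up the hierarchy if needed), with the
    // declared return type passed in the same way Operation.class is above.
    public static <R> R dispatch(Object visitor, Class<R> returnType, Object arg) {
        for (Class<?> c = arg.getClass(); c != null; c = c.getSuperclass()) {
            try {
                Method m = visitor.getClass().getMethod("execute", c);
                return returnType.cast(m.invoke(visitor, arg));
            } catch (NoSuchMethodException ignored) {
                // no overload for this exact type; try the superclass next
            } catch (ReflectiveOperationException e) {
                throw new RuntimeException(e);
            }
        }
        throw new IllegalArgumentException("no execute() overload for " + arg.getClass());
    }
}
```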
[GitHub] [flink] becketqin commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
becketqin commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r300215868 ## File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java ## @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.gcp.pubsub; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeIdsForCheckpoint; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeOnCheckpoint; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.Acknowledger; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.util.Preconditions; + +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.ReceivedMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CancellationException; + +import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder; + +/** + * PubSub Source, this Source will consume PubSub messages from a subscription and Acknowledge them on the next 
checkpoint. + * This ensures every message will get acknowledged at least once. + */ +public class PubSubSource extends RichSourceFunction + implements ResultTypeQueryable, ParallelSourceFunction, CheckpointListener, ListCheckpointed> { + public static final int NO_MAX_MESSAGES_TO_ACKNOWLEDGE_LIMIT = -1; + private static final Logger LOG = LoggerFactory.getLogger(PubSubSource.class); + protected final PubSubDeserializationSchema deserializationSchema; + protected final PubSubSubscriberFactory pubSubSubscriberFactory; + protected final Credentials credentials; + protected final int maxMessagesToAcknowledge; + protected final AcknowledgeOnCheckpointFactory acknowledgeOnCheckpointFactory; + + protected transient AcknowledgeOnCheckpoint acknowledgeOnCheckpoint; + protected transient PubSubSubscriber subscriber; + + protected transient volatile boolean isRunning; + + PubSubSource(PubSubDeserializationSchema deserializationSchema, + PubSubSubscriberFactory pubSubSubscriberFactory, + Credentials credentials, + int maxMessagesToAcknowledge, + AcknowledgeOnCheckpointFactory acknowledgeOnCheckpointFactory) { + this.deserializationSchema = deserializationSchema; + this.pubSubSubscriberFactory = pubSubSubscriberFactory; + this.credentials = credentials; + this.maxMessagesToAcknowledge = maxMessagesToAcknowledge; + this.acknowledgeOnCheckpointFactory = acknowledgeOnCheckpointFactory; + } + + @Override + public void open(Configuration configuration) throws Exception { +
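The source described above holds received messages until they are covered by a completed checkpoint and only then acknowledges them, which is what gives the at-least-once guarantee. A minimal sketch of that bookkeeping, assuming the same snapshot/notify life cycle as Flink's `ListCheckpointed`/`CheckpointListener` (the `AckTracker` class and its method names are illustrative, not the connector's actual `AcknowledgeOnCheckpoint`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

public class AckTracker<T> {
    private final List<T> pending = new ArrayList<>();
    private final SortedMap<Long, List<T>> perCheckpoint = new TreeMap<>();

    // A message arrived: it may only be acknowledged once a checkpoint
    // covering it has completed, otherwise a failure could lose it.
    public void addAcknowledgeId(T id) {
        pending.add(id);
    }

    // snapshotState(): bind everything received since the last snapshot
    // to this checkpoint id.
    public void snapshot(long checkpointId) {
        perCheckpoint.put(checkpointId, new ArrayList<>(pending));
        pending.clear();
    }

    // notifyCheckpointComplete(): collect the ids bound to this checkpoint and
    // any earlier one (earlier checkpoints may be subsumed) and drop them.
    public List<T> checkpointComplete(long checkpointId) {
        List<T> toAcknowledge = new ArrayList<>();
        SortedMap<Long, List<T>> completed = perCheckpoint.headMap(checkpointId + 1);
        for (List<T> ids : completed.values()) {
            toAcknowledge.addAll(ids);
        }
        completed.clear(); // view-backed: also removes them from perCheckpoint
        return toAcknowledge;
    }
}
```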
[GitHub] [flink] becketqin commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
becketqin commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r300223762 ## File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java ## @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.gcp.pubsub; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; + +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder; +import static org.apache.flink.runtime.concurrent.Executors.directExecutor; + +/** + * A sink function that outputs to PubSub. 
+ * + * @param type of PubSubSink messages to write + */ +public class PubSubSink extends RichSinkFunction implements CheckpointedFunction { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSink.class); + + private final ApiFutureCallback failureHandler; + private final Credentials credentials; + private final SerializationSchema serializationSchema; + private final String projectName; + private final String topicName; + private final String hostAndPortForEmulator; + + private transient Publisher publisher; + + private PubSubSink( + Credentials credentials, + SerializationSchema serializationSchema, + String projectName, + String topicName, + String hostAndPortForEmulator) { + this.failureHandler = new FailureHandler(); + this.credentials = credentials; + this.serializationSchema = serializationSchema; + this.projectName = projectName; + this.topicName = topicName; + this.hostAndPortForEmulator = hostAndPortForEmulator; + } + + private transient ManagedChannel managedChannel = null; + private transient TransportChannel channel = null; + + @Override + public void open(Configuration configuration) throws Exception { + Publisher.Builder builder = Publisher + .newBuilder(ProjectTopicName.of(projectName, topicName)) + .setCredentialsProvider(FixedCredentialsProvider.create(credentials)); + + if (hostAndPortForEmulator != null) { + managedChannel = ManagedChannelBuilder + .forTarget(hostAndPortForEmulator) + .usePlaintext(true) // This is 'Ok' because this is ONLY used for testing. + .build(); + channel = GrpcTransportChannel.newBuilder().setManagedChannel(managedChannel).build(); + builder.setChannelProvider(FixedTransportChannelProvider.create(channel)) +
[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(… URL: https://github.com/apache/flink/pull/8844#discussion_r300222783 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java ## @@ -245,6 +247,57 @@ public String explain(Table table) { return planner.getCompletionHints(statement, position); } + @Override + public void sql(String statement) { + List operations = planner.parse(statement); + operations.forEach(operation -> { + if (operation instanceof CreateTableOperation) { + CreateTableOperation operation1 = (CreateTableOperation) operation; + registerTable( + operation1.getTablePath(), + operation1.getCatalogTable(), + operation1.isIgnoreIfExists()); + } else if (operation instanceof ModifyOperation) { + queryConfigProvider.setConfig(new StreamQueryConfig()); + List> transformations = + planner.translate(Collections.singletonList((ModifyOperation) operation)); + + execEnv.apply(transformations); + } else { + throw new ValidationException( + "Unsupported SQL statement: sql() only accepts DDLs or Inserts."); + } + }); + } + + /** +* Registers a {@link CatalogBaseTable} under a given object path. The {@code path} could be +* 3 formats: +* +* `catalog.db.table`: A full table path including the catalog name, +* the database name and table name. +* `db.table`: database name following table name, with the current catalog name. +* `table`: Only the table name, with the current catalog name and database name. +* +* The registered tables then can be referenced in Sql queries. +* +* @param path The path under which the table will be registered +* @param catalogTable The table to register +* @param ignoreIfExists If true, do nothing if there is already same table name under +* the {@code path}. If false, a TableAlreadyExistException throws. 
+*/ + private void registerTable(String[] path, CatalogBaseTable catalogTable, boolean ignoreIfExists) { + String[] fullName = catalogManager.paddedTableName(Arrays.asList(path)); + Catalog catalog = getCatalog(fullName[0]).orElseThrow(() -> + new TableException("Catalog " + fullName[0] + " does not exist")); + ObjectPath objectPath = new ObjectPath(fullName[1], fullName[2]); + try { + catalog.createTable(objectPath, catalogTable, ignoreIfExists); + } catch (TableAlreadyExistException | DatabaseNotExistException e) { + throw new RuntimeException(e); Review comment: I don't want to force the invoker to try-catch the exception.
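The comment above defends wrapping the checked catalog exceptions in a `RuntimeException` so callers of `sql()` are not forced into try/catch blocks. A minimal illustration of the trade-off (the exception and method here are toy stand-ins, not the real catalog API): the caller's signature stays clean, and the original exception remains available as the cause.

```java
public class UncheckedWrap {

    // A checked exception forces every caller into try/catch or a throws clause.
    public static class TableAlreadyExistException extends Exception {
        public TableAlreadyExistException(String msg) { super(msg); }
    }

    // Wrapping at the boundary keeps the caller's signature clean; the original
    // exception stays reachable via getCause() for diagnostics.
    public static void createTable(String name, boolean exists) {
        try {
            if (exists) {
                throw new TableAlreadyExistException("table exists: " + name);
            }
        } catch (TableAlreadyExistException e) {
            throw new RuntimeException(e);
        }
    }
}
```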
[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(… URL: https://github.com/apache/flink/pull/8844#discussion_r300221845 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java ## @@ -245,6 +247,57 @@ public String explain(Table table) { return planner.getCompletionHints(statement, position); } + @Override + public void sql(String statement) { + List operations = planner.parse(statement); + operations.forEach(operation -> { + if (operation instanceof CreateTableOperation) { + CreateTableOperation operation1 = (CreateTableOperation) operation; Review comment: better name, thanks
[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(… URL: https://github.com/apache/flink/pull/8844#discussion_r300222964 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateOperation.java ## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.ddl; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.operations.Operation; + +/** + * A {@link Operation} that describes the DDL statements, e.g. CREATE TABLE or CREATE FUNCTION. + * + * Different sub operations can have their special instances. For example, a + * create table operation will have a {@link org.apache.flink.table.catalog.CatalogTable} instance, + * while a create function operation will have a + * {@link org.apache.flink.table.catalog.CatalogFunction} instance. + */ +@Internal +public interface CreateOperation extends Operation { Review comment: It is a marker interface, just like `ModifyOperation` and `QueryOperation`.
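A marker interface carries no methods; it only tags a family of operations so call sites like the `sql()` method quoted earlier can branch with `instanceof`. A self-contained sketch of that pattern (all type names here are toy stand-ins for the real `Operation` hierarchy):

```java
public class MarkerDispatch {

    public interface Operation {}

    // Marker interfaces: no methods, just tags for a family of operations.
    public interface CreateOperation extends Operation {}
    public interface ModifyOperation extends Operation {}

    public static class CreateTableOp implements CreateOperation {}
    public static class InsertOp implements ModifyOperation {}

    // Call sites branch on the marker type, not on each concrete class.
    public static String dispatch(Operation op) {
        if (op instanceof CreateOperation) {
            return "register in catalog";
        } else if (op instanceof ModifyOperation) {
            return "translate and execute";
        }
        throw new IllegalArgumentException("Unsupported operation: " + op);
    }
}
```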
[GitHub] [flink] xintongsong commented on a change in pull request #8704: [FLINK-12812][runtime] Set resource profiles for task slots
xintongsong commented on a change in pull request #8704: [FLINK-12812][runtime] Set resource profiles for task slots URL: https://github.com/apache/flink/pull/8704#discussion_r300232458 ## File path: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ## @@ -240,7 +254,8 @@ public String toString() { ", heapMemoryInMB=" + heapMemoryInMB + ", directMemoryInMB=" + directMemoryInMB + ", nativeMemoryInMB=" + nativeMemoryInMB + - ", stateSizeInMB=" + stateSizeInMB + extend + + ", stateSizeInMB=" + stateSizeInMB + + ", managedMemoryInMB=" + managedMemoryInMB + extend + Review comment: I don't think there is any missing space. The string `extend` either starts with ", " or is empty.
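The point being made is that the `extend` string already carries its own leading separator, so no extra space is needed after the fixed fields. A small sketch of that pattern (the `format` helper is illustrative, not the actual `ResourceSpec.toString()`):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ResourceToString {

    // Each extended resource contributes ", name=value"; an empty map yields "",
    // so the fixed fields never end up with a dangling or missing separator.
    public static String format(int heapMemoryInMB, Map<String, Double> extendedResources) {
        StringBuilder extend = new StringBuilder();
        for (Map.Entry<String, Double> e : extendedResources.entrySet()) {
            extend.append(", ").append(e.getKey()).append('=').append(e.getValue());
        }
        return "ResourceSpec{heapMemoryInMB=" + heapMemoryInMB + extend + '}';
    }

    public static void main(String[] args) {
        Map<String, Double> ext = new LinkedHashMap<>();
        ext.put("GPU", 0.5);
        System.out.println(format(100, ext));
    }
}
```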
[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r300227447 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint 使 Flink 的状态具有良好的容错性，通过 checkpoint 机制，Flink 可以对作业的状态和计算位置进行恢复。 Review comment: yes
[GitHub] [flink] TsReaper commented on a change in pull request #8682: [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead to blink
TsReaper commented on a change in pull request #8682: [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead to blink URL: https://github.com/apache/flink/pull/8682#discussion_r300226601 ## File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/DecimalSerializerTest.java ## @@ -22,7 +22,7 @@ import org.apache.flink.table.dataformat.Decimal; /** - * A test for the {@link BinaryArraySerializer}. + * A test for the {@link BaseArraySerializer}. Review comment: Should be "DecimalSerializer"?
[GitHub] [flink] TsReaper commented on a change in pull request #8682: [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead to blink
TsReaper commented on a change in pull request #8682: [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead to blink URL: https://github.com/apache/flink/pull/8682#discussion_r300226648 ## File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryRowSerializerTest.java ## @@ -24,7 +24,7 @@ import org.apache.flink.table.dataformat.BinaryString; /** - * A test for the {@link BinaryArraySerializer}. + * A test for the {@link BaseArraySerializer}. Review comment: Should be "BinaryRowSerializer"?
[GitHub] [flink] TsReaper commented on a change in pull request #8682: [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead to blink
TsReaper commented on a change in pull request #8682: [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead to blink URL: https://github.com/apache/flink/pull/8682#discussion_r300226705 ## File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryGenericSerializerTest.java ## @@ -23,7 +23,7 @@ import org.apache.flink.table.dataformat.BinaryGeneric; /** - * A test for the {@link BinaryArraySerializer}. + * A test for the {@link BaseArraySerializer}. Review comment: Should be "BinaryGenericSerializer"?
[GitHub] [flink] wuchong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document
wuchong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document URL: https://github.com/apache/flink/pull/8937#discussion_r300217135 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableConfig.scala ## @@ -213,36 +226,29 @@ class TableConfig { * * @param minTime The minimum time interval for which idle state is retained. Set to 0 (zero) to *never clean-up the state. -* @param maxTime The maximum time interval for which idle state is retained. May not be smaller -*than than minTime. Set to 0 (zero) to never clean-up the state. */ - def withIdleStateRetentionTime(minTime: Time, maxTime: Time): TableConfig = { -if (maxTime.toMilliseconds < minTime.toMilliseconds) { - throw new IllegalArgumentException("maxTime may not be smaller than minTime.") -} -this.conf.setLong(TableConfigOptions.SQL_EXEC_STATE_TTL_MS, minTime.toMilliseconds) -this.conf.setLong(TableConfigOptions.SQL_EXEC_STATE_TTL_MAX_MS, maxTime.toMilliseconds) + def withIdleStateRetentionTime(minTime: Time): TableConfig = { +this.conf.setString(TableConfigOptions.SQL_EXEC_STATE_TTL, + String.valueOf(minTime.toMilliseconds) + " ms") Review comment: simplify to `minTime.toString`.
[GitHub] [flink] wuchong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document
wuchong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document URL: https://github.com/apache/flink/pull/8937#discussion_r300217863 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableConfig.scala ## @@ -200,9 +202,20 @@ class TableConfig { } } + def getMillisecondFromConfigDuration(config: ConfigOption[String]): Long = { +val duration = Duration.create(this.conf.getString(config)) Review comment: TableConfig will be moved to the `api-java` module before the 1.9 release, so it will not depend on Scala. So I think we should implement the parser ourselves or find other alternatives.
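If Scala's `Duration.create` is off the table, a hand-rolled parser is straightforward. A minimal sketch in plain Java for strings like "10 ms" or "5 min" (the accepted unit strings are assumptions for illustration, not Flink's actual configuration grammar):

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

public class DurationParser {

    private static final Map<String, Long> UNIT_TO_MILLIS = new LinkedHashMap<>();
    static {
        UNIT_TO_MILLIS.put("ms", 1L);
        UNIT_TO_MILLIS.put("s", 1_000L);
        UNIT_TO_MILLIS.put("min", 60_000L);
        UNIT_TO_MILLIS.put("h", 3_600_000L);
        UNIT_TO_MILLIS.put("d", 86_400_000L);
    }

    // Parses "<number> <unit>", e.g. "10 ms" or "5 min", into milliseconds.
    public static long parseMillis(String text) {
        String[] parts = text.trim().toLowerCase(Locale.ROOT).split("\\s+");
        if (parts.length != 2 || !UNIT_TO_MILLIS.containsKey(parts[1])) {
            throw new IllegalArgumentException("Expected '<number> <unit>', got: " + text);
        }
        return Long.parseLong(parts[0]) * UNIT_TO_MILLIS.get(parts[1]);
    }
}
```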
[GitHub] [flink] wuchong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document
wuchong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document URL: https://github.com/apache/flink/pull/8937#discussion_r300215876 ## File path: flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java ## @@ -59,6 +59,27 @@ int position() default Integer.MAX_VALUE; } + /** +* Annotation used on table config to introduce additional table-related meta data. +* +* The {@link TableMeta#execMode()} argument indicates which exec mode the config works for, +* for batch, streaming or both. +* +*/ + @Target(ElementType.FIELD) + @Retention(RetentionPolicy.RUNTIME) + @Internal + public @interface TableMeta { + ExecMode execMode(); + } + + /** +* Which exec mode the config works for. +*/ + public enum ExecMode { + BATCH, STREAMING, BOTH Review comment: What about renaming `BOTH` to `BATCH_STREAMING`, to state explicitly that it works for both batch and streaming and to align better with the generated tags? Just `BOTH` doesn't tell the reader what "both" refers to.
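With the suggested rename applied, the annotation sketched in the diff could look like the following, and a doc generator can read the mode back via reflection (the `someOption` field and the class name are illustrative, not real Flink config fields):

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class ExecModeDemo {

    /** Which exec mode the config works for, with the proposed explicit name. */
    public enum ExecMode { BATCH, STREAMING, BATCH_STREAMING }

    @Target(ElementType.FIELD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface TableMeta {
        ExecMode execMode();
    }

    @TableMeta(execMode = ExecMode.BATCH_STREAMING)
    public static String someOption = "table.exec.example";

    // A doc generator can read the mode back at runtime to emit the right tag.
    public static ExecMode modeOf(String fieldName) {
        try {
            return ExecModeDemo.class.getField(fieldName)
                    .getAnnotation(TableMeta.class).execMode();
        } catch (NoSuchFieldException e) {
            throw new RuntimeException(e);
        }
    }
}
```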
[GitHub] [flink] wuchong commented on issue #8937: [FLINK-13040] promote blink table config and add to document
wuchong commented on issue #8937: [FLINK-13040] promote blink table config and add to document URL: https://github.com/apache/flink/pull/8937#issuecomment-508335301 Btw, I also suggest splitting this pull request into 2 commits: 1. add documentation annotation for table 2. improve options in blink
[GitHub] [flink] wuchong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document
wuchong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document URL: https://github.com/apache/flink/pull/8937#discussion_r300215596 ## File path: docs/ops/config.md ## @@ -209,6 +209,13 @@ You have to configure `jobmanager.archive.fs.dir` in order to archive terminated {% include generated/history_server_configuration.html %} + +### Blink Table Planner +{% include generated/planner_config_configuration.html %} + +### Blink Table Runtime Review comment: I don't think we should place the configuration on this page. I suggest creating a new page for SQL configurations under `/dev/table/config.md`.
[GitHub] [flink] beyond1920 commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.
beyond1920 commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations. URL: https://github.com/apache/flink/pull/8977#discussion_r300226150 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/AggregateVisitors.java ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.expressions; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.calcite.FlinkTypeFactory; +import org.apache.flink.table.functions.AggregateFunction; +import org.apache.flink.table.functions.AggregateFunctionDefinition; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.UserDefinedAggregateFunction; +import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable; +import org.apache.flink.table.functions.utils.AggSqlFunction; +import org.apache.flink.util.Preconditions; + +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.tools.RelBuilder; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.isFunctionOfKind; +import static org.apache.flink.table.functions.FunctionKind.AGGREGATE; +import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; + +/** + * The class contains all kinds of visitors to visit Aggregate. 
+ */ +public class AggregateVisitors { + + private static final Map AGG_DEF_SQL_OPERATOR_MAPPING = new HashMap<>(); + + static { + AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.AVG, FlinkSqlOperatorTable.AVG); + AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.COUNT, FlinkSqlOperatorTable.COUNT); + AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.MAX, FlinkSqlOperatorTable.MAX); + AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.MIN, FlinkSqlOperatorTable.MIN); + AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SUM, FlinkSqlOperatorTable.SUM); + AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SUM0, FlinkSqlOperatorTable.SUM0); + AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.STDDEV_POP, FlinkSqlOperatorTable.STDDEV_POP); + AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.STDDEV_SAMP, FlinkSqlOperatorTable.STDDEV_SAMP); + AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.VAR_POP, FlinkSqlOperatorTable.VAR_POP); + AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.VAR_SAMP, FlinkSqlOperatorTable.VAR_SAMP); + AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.COLLECT, FlinkSqlOperatorTable.COLLECT); + } + + static class AggFunctionVisitor extends ExpressionDefaultVisitor { + private final FlinkTypeFactory typeFactory; + + AggFunctionVisitor(FlinkTypeFactory typeFactory) { + this.typeFactory = typeFactory; + } + + @Override + public SqlAggFunction visit(CallExpression call) { + Preconditions.checkArgument(isFunctionOfKind(call, AGGREGATE)); + FunctionDefinition def = call.getFunctionDefinition(); + if (AGG_DEF_SQL_OPERATOR_MAPPING.containsKey(def)) { + return AGG_DEF_SQL_OPERATOR_MAPPING.get(def); + } + if (BuiltInFunctionDefinitions.DISTINCT.equals(def)) { + Expression innerAgg = call.getChildren().get(0); + return innerAgg.accept(this); + } + AggregateFunctionDefinition aggDef = (AggregateFunctionDefinition) def; + UserDefinedAggregateFunction 
userDefinedAggregateFunc
[GitHub] [flink] beyond1920 commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.
beyond1920 commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations. URL: https://github.com/apache/flink/pull/8977#discussion_r300226114 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/AggregateVisitors.java ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.expressions; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.calcite.FlinkTypeFactory; +import org.apache.flink.table.functions.AggregateFunction; +import org.apache.flink.table.functions.AggregateFunctionDefinition; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.UserDefinedAggregateFunction; +import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable; +import org.apache.flink.table.functions.utils.AggSqlFunction; +import org.apache.flink.util.Preconditions; + +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.tools.RelBuilder; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.isFunctionOfKind; +import static org.apache.flink.table.functions.FunctionKind.AGGREGATE; +import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; + +/** + * The class contains all kinds of visitors to visit Aggregate. + */ +public class AggregateVisitors { Review comment: AggregateVisitors.AggFunctionVisitor used by RexNodeConverter. AggregateVisitors.AggCallVisitor used by QueryOperationConverter. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
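The dispatch logic in the quoted `AggFunctionVisitor` — look up a built-in function definition in a static map, and unwrap `DISTINCT` by recursing into the inner aggregate — can be sketched in isolation. The following is a simplified, self-contained illustration with invented names, not the actual Flink types:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AggDispatchSketch {

    // stands in for a CallExpression: a function name plus child calls
    static final class Call {
        final String name;
        final List<Call> children;
        Call(String name, Call... children) {
            this.name = name;
            this.children = Arrays.asList(children);
        }
    }

    // stands in for AGG_DEF_SQL_OPERATOR_MAPPING (names are placeholders)
    private static final Map<String, String> MAPPING = new HashMap<>();
    static {
        MAPPING.put("AVG", "SqlAvgAggFunction");
        MAPPING.put("COUNT", "SqlCountAggFunction");
    }

    static String resolve(Call call) {
        if (MAPPING.containsKey(call.name)) {
            return MAPPING.get(call.name);
        }
        if ("DISTINCT".equals(call.name)) {
            // DISTINCT is not resolved to an operator of its own here;
            // recurse into the wrapped aggregate, mirroring innerAgg.accept(this)
            return resolve(call.children.get(0));
        }
        throw new IllegalArgumentException("unsupported aggregate: " + call.name);
    }
}
```

The same two-step shape (table lookup first, special-case unwrapping second) keeps the common built-in path a constant-time map access while still handling wrapper calls like `DISTINCT(COUNT(x))`.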
[GitHub] [flink] flinkbot commented on issue #8978: [FLINK-13089][table-planner-blink] Implement batch nested loop join and add join it cases in blink
flinkbot commented on issue #8978: [FLINK-13089][table-planner-blink] Implement batch nested loop join and add join it cases in blink URL: https://github.com/apache/flink/pull/8978#issuecomment-508334487 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-13089) Implement batch nested loop join in blink
[ https://issues.apache.org/jira/browse/FLINK-13089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-13089: --- Labels: pull-request-available (was: ) > Implement batch nested loop join in blink > - > > Key: FLINK-13089 > URL: https://issues.apache.org/jira/browse/FLINK-13089 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > > Nested loop join has two advantages: > 1.Nested loop join is quicker when build row size is small. > 2.Nested loop join support all kind of joins, include non-key join. > Plan: > Introduce NestedLoopJoinCodeGenerator. > Implement BatchExecNestedLoopJoin. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] xintongsong commented on issue #8704: [FLINK-12812][runtime] Set resource profiles for task slots
xintongsong commented on issue #8704: [FLINK-12812][runtime] Set resource profiles for task slots URL: https://github.com/apache/flink/pull/8704#issuecomment-508334308 Thanks for the review, @StephanEwen. I would like to explain regarding your concern about the assumption of RM/TM having the same configuration: The reason we need to calculate TM's slot resource profiles on RM side is that we need to set the resource profile for `PendingTaskManagerSlot` before the corresponding TM is started. Currently, Flink can assign a pending slot to a slot request before the TM is started and registered. In this way, the subsequent slot requests will first consume slots on the pending TM (for multi-slot TMs) before requesting and launching a new one. When the TM is registered, the SlotManager matches the registered new slot to a `PendingTaskManagerSlot` with the same resource profile, and assigns the registered slot to the same slot request that the pending slot is assigned to (if any). Before this PR, both the pending slot on RM side and the actual slot on TM side had the same resource profile `ANY`, which can be matched with the method `equals`. Since this PR sets the slot resource profile on TM side to the actual resource of the slot, we need to set the resource profile for the pending slots on RM side in the same way. This is why I introduced calculating TM's slot resource profiles on RM side, and the approximate matching. The assigning over pending slots and the RM side slot resource calculating only happen on Yarn/Mesos. In these scenarios, TMs do have the same configuration as RM does, which is transmitted from RM side. For a standalone cluster, there should be no pending slots because RM cannot actively start any TM. 
Except for the `PendingTaskManagerSlot`, RM does use the slot resource profile reported from TM for matching slot requests against registered slots, and for converting a requested `UNKNOWN` resource profile to a default value (as shown in the following PR #8846 for dynamic managed memory). Therefore, it should not cause problems on a standalone cluster with TMs having different configs. It's my bad for not making this clear in the code and comments. For the rest of your comments, I'll address them ASAP. I especially appreciate your suggestions on encapsulation and simplifying tests. It's a good lesson for me. Thank you again. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
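The rounding argument behind the approximate matching (each fraction-based calculation such as `Total * Fraction` can be off by at most 1 MB after truncation, and there are two such calculations, hence a tolerance of 2) can be sketched as follows. This is a standalone illustration with assumed names and constants taken from this discussion, not the actual `ResourceProfile` implementation:

```java
public class ApproximateMatchSketch {

    // CPU cores are floating-point values, so compare with a small epsilon
    static final double CPU_EPSILON = 1e-6;
    // two fraction-based roundings (managed + network memory), each off by at most 1 MB
    static final int MAX_MEMORY_ERROR_MB = 2;

    static boolean cpuApproximatelyEqual(double a, double b) {
        return Math.abs(a - b) <= CPU_EPSILON;
    }

    static boolean memoryApproximatelyEqual(int aMb, int bMb) {
        return Math.abs(aMb - bMb) <= MAX_MEMORY_ERROR_MB;
    }

    // Two mathematically equal ways of carving a fraction out of a total can
    // truncate to different integers; each truncation contributes at most 1 MB
    // of error, so the two results differ by at most 2 MB.
    static int[] splitBothWays(int totalMb, double fraction) {
        int direct = (int) (totalMb * fraction);                      // Total * Fraction
        int complement = totalMb - (int) (totalMb * (1 - fraction));  // Total - Total * (1 - Fraction)
        return new int[]{direct, complement};
    }
}
```

The point of the sketch is that `direct` and `complement` are the RM-side and TM-side views of the same slot memory: both are "the same" number mathematically, but independent truncation can push them apart, which is exactly what the tolerant comparison absorbs.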
[GitHub] [flink] JingsongLi opened a new pull request #8978: [FLINK-13089][table-planner-blink] Implement batch nested loop join in blink
JingsongLi opened a new pull request #8978: [FLINK-13089][table-planner-blink] Implement batch nested loop join in blink URL: https://github.com/apache/flink/pull/8978 ## What is the purpose of the change Nested loop join has two advantages: 1.Nested loop join is quicker when build row size is small. 2.Nested loop join support all kind of joins, include non-key join. ## Brief change log Introduce NestedLoopJoinCodeGenerator. Implement BatchExecNestedLoopJoin. Add join it cases. ## Verifying this change it cases ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (JavaDocs) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
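The second advantage named in the PR — supporting all kinds of joins, including non-key joins — follows directly from the shape of the algorithm: every build row is paired with every probe row, and an arbitrary predicate decides the match. A minimal, planner-free sketch (illustrative only, not the code generated by `NestedLoopJoinCodeGenerator`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

public class NestedLoopJoinSketch {

    /** Emits [buildRow, probeRow] pairs for every pair that passes the join condition. */
    static List<int[]> join(int[] buildSide, int[] probeSide, BiPredicate<Integer, Integer> condition) {
        List<int[]> result = new ArrayList<>();
        for (int b : buildSide) {       // the (small) build side drives the outer loop
            for (int p : probeSide) {   // every probe row is tested against every build row
                if (condition.test(b, p)) {
                    result.add(new int[]{b, p});
                }
            }
        }
        return result;
    }
}
```

Because the condition is an arbitrary predicate rather than a key lookup, a non-equi condition such as `b < p` works here, which a hash-based equi-join cannot express; the O(|build| × |probe|) cost is also why this strategy only pays off when the build side is small, matching the first advantage in the PR description.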
[jira] [Updated] (FLINK-13089) Implement batch nested loop join in blink
[ https://issues.apache.org/jira/browse/FLINK-13089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated FLINK-13089: - Description: Nested loop join has two advantages: 1.Nested loop join is quicker when build row size is small. 2.Nested loop join support all kind of joins, include non-key join. Plan: Introduce NestedLoopJoinCodeGenerator. Implement BatchExecNestedLoopJoin. was: Introduce NestedLoopJoinCodeGenerator. Implement BatchExecNestedLoopJoin. > Implement batch nested loop join in blink > - > > Key: FLINK-13089 > URL: https://issues.apache.org/jira/browse/FLINK-13089 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > > Nested loop join has two advantages: > 1.Nested loop join is quicker when build row size is small. > 2.Nested loop join support all kind of joins, include non-key join. > Plan: > Introduce NestedLoopJoinCodeGenerator. > Implement BatchExecNestedLoopJoin. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-13086) add Chinese documentation for catalogs
[ https://issues.apache.org/jira/browse/FLINK-13086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-13086: - Description: the ticket for corresponding English documentation is FLINK-12277 > add Chinese documentation for catalogs > -- > > Key: FLINK-13086 > URL: https://issues.apache.org/jira/browse/FLINK-13086 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Connectors / Hive, Documentation, > Table SQL / API >Reporter: Bowen Li >Assignee: frank wang >Priority: Major > Fix For: 1.9.0 > > > the ticket for corresponding English documentation is FLINK-12277 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13089) Implement batch nested loop join in blink
Jingsong Lee created FLINK-13089: Summary: Implement batch nested loop join in blink Key: FLINK-13089 URL: https://issues.apache.org/jira/browse/FLINK-13089 Project: Flink Issue Type: New Feature Components: Table SQL / Planner Reporter: Jingsong Lee Assignee: Jingsong Lee Introduce NestedLoopJoinCodeGenerator. Implement BatchExecNestedLoopJoin. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.
JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations. URL: https://github.com/apache/flink/pull/8977#discussion_r300223342 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/AggregateVisitors.java ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.expressions; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.calcite.FlinkTypeFactory; +import org.apache.flink.table.functions.AggregateFunction; +import org.apache.flink.table.functions.AggregateFunctionDefinition; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.UserDefinedAggregateFunction; +import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable; +import org.apache.flink.table.functions.utils.AggSqlFunction; +import org.apache.flink.util.Preconditions; + +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.tools.RelBuilder; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.isFunctionOfKind; +import static org.apache.flink.table.functions.FunctionKind.AGGREGATE; +import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; + +/** + * The class contains all kinds of visitors to visit Aggregate. 
+ */ +public class AggregateVisitors { + + private static final Map AGG_DEF_SQL_OPERATOR_MAPPING = new HashMap<>(); + + static { + AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.AVG, FlinkSqlOperatorTable.AVG); + AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.COUNT, FlinkSqlOperatorTable.COUNT); + AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.MAX, FlinkSqlOperatorTable.MAX); + AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.MIN, FlinkSqlOperatorTable.MIN); + AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SUM, FlinkSqlOperatorTable.SUM); + AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SUM0, FlinkSqlOperatorTable.SUM0); + AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.STDDEV_POP, FlinkSqlOperatorTable.STDDEV_POP); + AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.STDDEV_SAMP, FlinkSqlOperatorTable.STDDEV_SAMP); + AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.VAR_POP, FlinkSqlOperatorTable.VAR_POP); + AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.VAR_SAMP, FlinkSqlOperatorTable.VAR_SAMP); + AGG_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.COLLECT, FlinkSqlOperatorTable.COLLECT); + } + + static class AggFunctionVisitor extends ExpressionDefaultVisitor { + private final FlinkTypeFactory typeFactory; + + AggFunctionVisitor(FlinkTypeFactory typeFactory) { + this.typeFactory = typeFactory; + } + + @Override + public SqlAggFunction visit(CallExpression call) { + Preconditions.checkArgument(isFunctionOfKind(call, AGGREGATE)); + FunctionDefinition def = call.getFunctionDefinition(); + if (AGG_DEF_SQL_OPERATOR_MAPPING.containsKey(def)) { + return AGG_DEF_SQL_OPERATOR_MAPPING.get(def); + } + if (BuiltInFunctionDefinitions.DISTINCT.equals(def)) { + Expression innerAgg = call.getChildren().get(0); + return innerAgg.accept(this); + } + AggregateFunctionDefinition aggDef = (AggregateFunctionDefinition) def; + UserDefinedAggregateFunction 
userDefinedAggregateFunc
[GitHub] [flink] banmoy commented on issue #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable
banmoy commented on issue #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable URL: https://github.com/apache/flink/pull/8611#issuecomment-508331445 @StefanRRichter Thanks for the review. I have addressed the comments, and please help to review again. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Comment Edited] (FLINK-12926) Main thread checking in some tests fails
[ https://issues.apache.org/jira/browse/FLINK-12926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878302#comment-16878302 ] Zhu Zhu edited comment on FLINK-12926 at 7/4/19 4:12 AM: - Hi [~till.rohrmann], from my observation, the issue is happening though it does not break current tests. Below are some cases it happens or may happen: 1. Even though the tests do not trigger actions from other thread, the production logic might do it, e.g. *Execution#deploy()* as shown in the attached picture, which happens in most of the tests mentioned above. It does not break tests since it is not in the critical path and the failed main thread checking does not cause failovers. !Execution#deploy.jpg|width=568,height=328! 2. The *TestingComponentMainThreadExecutorServiceAdapter* uses *DirectScheduledExecutorService* as the underlying ScheduledExecutorService. However, DirectScheduledExecutorService will schedule tasks from another thread. So if any *mainThreadExecutor.schedule** action is invoked in tests or production process, it may also violate the main thread checking. No test breaks for it yet. But I think we just fortunately dodged(Or intentional?). e.g. - FixedDelayRestartStrategy. No test breaks because no test uses FixedDelayRestartStrategy to do failover yet. - HeartbeatMonitor. No test breaks because it does not check main thread, HeartbeatManagerTest#testHeartbeatTimeout actually does the timeout handling in another pool thread. I'd be OK to close this issue as no test breaks yet, as long as we are already aware of this. The manual executor way as we explored in FLINK-12876 can be a solution for this case. was (Author: zhuzh): Hi [~till.rohrmann], from my observation, the issue is happening though it does not break current tests. Below are some cases it happens or may happen: 1. Even though the tests do not trigger actions from other thread, the production logic might do it, e.g. 
*Execution#deploy()* as shown in the attached picture, which happens in most of the tests mentioned above. It does not break tests since it is not in the critical path and the failed main thread checking does not cause failovers. 2. Besides, the *TestingComponentMainThreadExecutorServiceAdapter* uses *DirectScheduledExecutorService* as the underlying ScheduledExecutorService. However, DirectScheduledExecutorService will schedule tasks from another thread. So if any mainThreadExecutor.schedule* action is invoked in tests or production process, it may also violate the main thread checking. No test breaks for it yet. But I think we just fortunately dodged(Or intentional?). e.g. - FixedDelayRestartStrategy. No test breaks because no test uses FixedDelayRestartStrategy to do failover yet. - HeartbeatMonitor. No test breaks because it does not check main thread, HeartbeatManagerTest#testHeartbeatTimeout actually does the timeout handling in another pool thread. I'd be OK to close this issue as no test breaks yet, as long as we are already aware of this. The manual executor way as we explored in [FLINK-12876 |https://issues.apache.org/jira/browse/FLINK-12876] can be a solution for this case. !Execution#deploy.jpg|width=568,height=328! > Main thread checking in some tests fails > > > Key: FLINK-12926 > URL: https://issues.apache.org/jira/browse/FLINK-12926 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Tests >Affects Versions: 1.9.0 >Reporter: Zhu Zhu >Priority: Major > Attachments: Execution#deploy.jpg, mainThreadCheckFailure.log > > > Currently all JM side job changing actions are expected to be taken in > JobMaster main thread. > In current Flink tests, many cases tend to use the test main thread as the JM > main thread. This can lead to 2 issues: > 1. 
TestingComponentMainThreadExecutorServiceAdapter is a direct executor, so > if it is invoked from any other thread, it will break the main thread > checking and fail the submitted action (as in the attached log > [^mainThreadCheckFailure.log]) > 2. The test main thread does not support other actions queued in its > executor, as the test will end once the current test thread action(the > current running test body) is done > > In my observation, most cases which starts > ExecutionGraph.scheduleForExecution() will encounter this issue. Cases > include ExecutionGraphRestartTest, FailoverRegionTest, > ConcurrentFailoverStrategyExecutionGraphTest, GlobalModVersionTest, > ExecutionGraphDeploymentTest, etc. > > One solution in my mind is to create a ScheduledExecutorService for those > tests, use it as the main thread and run the test body in this thread. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12926) Main thread checking in some tests fails
[ https://issues.apache.org/jira/browse/FLINK-12926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878302#comment-16878302 ] Zhu Zhu edited comment on FLINK-12926 at 7/4/19 4:10 AM: - Hi [~till.rohrmann], from my observation, the issue is happening though it does not break current tests. Below are some cases it happens or may happen: 1. Even though the tests do not trigger actions from other thread, the production logic might do it, e.g. *Execution#deploy()* as shown in the attached picture, which happens in most of the tests mentioned above. It does not break tests since it is not in the critical path and the failed main thread checking does not cause failovers. 2. Besides, the *TestingComponentMainThreadExecutorServiceAdapter* uses *DirectScheduledExecutorService* as the underlying ScheduledExecutorService. However, DirectScheduledExecutorService will schedule tasks from another thread. So if any mainThreadExecutor.schedule* action is invoked in tests or production process, it may also violate the main thread checking. No test breaks for it yet. But I think we just fortunately dodged(Or intentional?). e.g. - FixedDelayRestartStrategy. No test breaks because no test uses FixedDelayRestartStrategy to do failover yet. - HeartbeatMonitor. No test breaks because it does not check main thread, HeartbeatManagerTest#testHeartbeatTimeout actually does the timeout handling in another pool thread. I'd be OK to close this issue as no test breaks yet, as long as we are already aware of this. The manual executor way as we explored in [FLINK-12876 |https://issues.apache.org/jira/browse/FLINK-12876] can be a solution for this case. !Execution#deploy.jpg|width=568,height=328! was (Author: zhuzh): Hi [~till.rohrmann], from my observation, the issue is happening though it does not break current tests. Below are some cases it happens or may happen: 1. Even though the tests do not trigger actions from other thread, the production logic might do it, e.g. 
*Execution#deploy()* as in the attached picture, this happens but does not break tests since it is not in the critical path and the failed main thread checking does not cause failovers. 2. Besides, the *TestingComponentMainThreadExecutorServiceAdapter* uses *DirectScheduledExecutorService* as the underlying ScheduledExecutorService. However, DirectScheduledExecutorService will schedule tasks from another thread. So if any mainThreadExecutor.schedule* action is invoked in tests or production process, it may also violate the main thread checking. No test breaks for it yet. But I think we just fortunately dodged(Or intentional?). e.g. - FixedDelayRestartStrategy. No test breaks because no test uses FixedDelayRestartStrategy to do failover yet. - HeartbeatMonitor. No test breaks because it does not check main thread, HeartbeatManagerTest#testHeartbeatTimeout actually does the timeout handling in another pool thread. !Execution#deploy.jpg|width=568,height=328! > Main thread checking in some tests fails > > > Key: FLINK-12926 > URL: https://issues.apache.org/jira/browse/FLINK-12926 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Tests >Affects Versions: 1.9.0 >Reporter: Zhu Zhu >Priority: Major > Attachments: Execution#deploy.jpg, mainThreadCheckFailure.log > > > Currently all JM side job changing actions are expected to be taken in > JobMaster main thread. > In current Flink tests, many cases tend to use the test main thread as the JM > main thread. This can lead to 2 issues: > 1. TestingComponentMainThreadExecutorServiceAdapter is a direct executor, so > if it is invoked from any other thread, it will break the main thread > checking and fail the submitted action (as in the attached log > [^mainThreadCheckFailure.log]) > 2. 
The test main thread does not support other actions queued in its > executor, as the test will end once the current test thread action(the > current running test body) is done > > In my observation, most cases which starts > ExecutionGraph.scheduleForExecution() will encounter this issue. Cases > include ExecutionGraphRestartTest, FailoverRegionTest, > ConcurrentFailoverStrategyExecutionGraphTest, GlobalModVersionTest, > ExecutionGraphDeploymentTest, etc. > > One solution in my mind is to create a ScheduledExecutorService for those > tests, use it as the main thread and run the test body in this thread. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
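The failure mode described in this ticket — a direct executor runs each task on whichever thread happens to call it, so a submission coming from a pool thread violates the main-thread assertion — can be reproduced in a few lines. This is an illustrative stand-in, not Flink's `TestingComponentMainThreadExecutorServiceAdapter`:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class DirectExecutorCheckSketch {

    /** Mimics a main-thread-bound direct executor: tasks run inline on the caller. */
    static final class MainThreadExecutor {
        private final Thread mainThread = Thread.currentThread();

        void execute(Runnable task) {
            task.run(); // direct execution: no hand-off to the bound thread
        }

        boolean isInMainThread() {
            return Thread.currentThread() == mainThread;
        }
    }

    /** Returns whether the executed task observed itself on the "main" thread. */
    static boolean runCheck(MainThreadExecutor executor, boolean fromOtherThread) {
        AtomicBoolean sawMainThread = new AtomicBoolean();
        Runnable submission =
            () -> executor.execute(() -> sawMainThread.set(executor.isInMainThread()));
        if (fromOtherThread) {
            Thread t = new Thread(submission);
            t.start();
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        } else {
            submission.run();
        }
        return sawMainThread.get();
    }
}
```

A task submitted from the constructing thread passes the check, while the identical task submitted from another thread fails it — which is why any `mainThreadExecutor.schedule*` call that fires from a scheduler's pool thread can trip the assertion even though the test itself never spawns a thread.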
[GitHub] [flink] banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable
banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable URL: https://github.com/apache/flink/pull/8611#discussion_r300222482 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java ## @@ -39,7 +47,8 @@ * @param type of namespace * @param type of state */ -public abstract class StateTable implements StateSnapshotRestore { +public abstract class StateTable Review comment: I prefer to use the inheritance for the following reasons: 1. in the followup spill implementation, we need some memory statistics about the `StateTable` to decide whether to build a on-heap map or a on-disk map. I think it's not convenient for `MapAndSnapshotFactory ` to communicate with `StateTable` 2. implementation of `StateTable` may need it's own method, such as `getStateMapSnapshotArray` in `CopyOnWriteStateTable` and more custom methods in the spill implementation 3. if we introduce `MapAndSnapshotFactory`, we should also change the way to create `StateTable` using `SnapshotStrategySynchronicityBehavior` currently, but I think this is not in the scope of this refactor This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
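The first two reasons above amount to a classic argument for inheritance over a pluggable factory: the subclass both contributes implementation-specific state (memory statistics for the spill decision) and exposes extra methods the base contract does not have. A minimal sketch of that shape, using invented names and an assumed per-entry cost rather than the real `StateTable` hierarchy:

```java
public class StateTableShapeSketch {

    /** Base type: common contract, plus a hook each implementation specializes. */
    abstract static class StateTable {
        // statistics a spill decision could consult (on-heap vs on-disk map)
        abstract long estimatedMemoryBytes();
    }

    /** Subclass adds behavior of its own, beyond the base contract. */
    static final class CopyOnWriteStateTable extends StateTable {
        private final int entries;

        CopyOnWriteStateTable(int entries) {
            this.entries = entries;
        }

        @Override
        long estimatedMemoryBytes() {
            return entries * 64L; // 64 bytes/entry is an assumption for illustration
        }

        // an implementation-specific method, analogous to getStateMapSnapshotArray()
        int[] snapshotArray() {
            return new int[entries];
        }
    }
}
```

With a factory instead, callers holding only the base type could not reach `snapshotArray()` without casting, and the per-implementation statistics would have to be threaded through the factory interface — the "not convenient to communicate" point in the comment.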
[GitHub] [flink] beyond1920 commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.
beyond1920 commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations. URL: https://github.com/apache/flink/pull/8977#discussion_r300222311 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeExpression.java ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions; + +import org.apache.flink.table.types.DataType; + +import org.apache.calcite.rex.RexNode; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Dummy wrapper for expressions that were converted to RexNode in a different way. + */ +public class RexNodeExpression implements ResolvedExpression { Review comment: Of course. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] beyond1920 commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.
beyond1920 commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations. URL: https://github.com/apache/flink/pull/8977#discussion_r300222038 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java ## @@ -79,25 +112,196 @@ private final RelBuilder relBuilder; Review comment: Sure. Will update to use ResolvedExpressionVisitor after introducing Expression resolution in AggCodeGen later, which may be done in another PR.
[GitHub] [flink] beyond1920 commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.
beyond1920 commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations. URL: https://github.com/apache/flink/pull/8977#discussion_r300221925 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java ## @@ -79,25 +112,196 @@ private final RelBuilder relBuilder; private final FlinkTypeFactory typeFactory; + private static final int DECIMAL_PRECISION_NEEDED_FOR_LONG = 19; + + /** +* The mapping only keeps part of FunctionDefinitions, which could be converted to SqlOperator in a very simple +* way. +*/ + private static final Map SIMPLE_DEF_SQL_OPERATOR_MAPPING = new HashMap<>(); + + static { + // logic functions + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.AND, FlinkSqlOperatorTable.AND); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OR, FlinkSqlOperatorTable.OR); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT, FlinkSqlOperatorTable.NOT); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IF, FlinkSqlOperatorTable.CASE); + + // comparison functions + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.EQUALS, FlinkSqlOperatorTable.EQUALS); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.GREATER_THAN, FlinkSqlOperatorTable.GREATER_THAN); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LESS_THAN, FlinkSqlOperatorTable.LESS_THAN); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT_EQUALS, FlinkSqlOperatorTable.NOT_EQUALS); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NULL, 
FlinkSqlOperatorTable.IS_NULL); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_NULL, FlinkSqlOperatorTable.IS_NOT_NULL); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_TRUE, FlinkSqlOperatorTable.IS_TRUE); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_FALSE, FlinkSqlOperatorTable.IS_FALSE); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_TRUE, FlinkSqlOperatorTable.IS_NOT_TRUE); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.IS_NOT_FALSE, FlinkSqlOperatorTable.IS_NOT_FALSE); + + // string functions + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CHAR_LENGTH, FlinkSqlOperatorTable.CHAR_LENGTH); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.INIT_CAP, FlinkSqlOperatorTable.INITCAP); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LIKE, FlinkSqlOperatorTable.LIKE); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOWER, FlinkSqlOperatorTable.LOWER); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SIMILAR, FlinkSqlOperatorTable.SIMILAR_TO); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SUBSTRING, FlinkSqlOperatorTable.SUBSTRING); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.UPPER, FlinkSqlOperatorTable.UPPER); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.POSITION, FlinkSqlOperatorTable.POSITION); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OVERLAY, FlinkSqlOperatorTable.OVERLAY); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT, FlinkSqlOperatorTable.CONCAT_FUNCTION); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT_WS, FlinkSqlOperatorTable.CONCAT_WS); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LPAD, FlinkSqlOperatorTable.LPAD); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RPAD, FlinkSqlOperatorTable.RPAD); + 
SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.REGEXP_EXTRACT, FlinkSqlOperatorTable.REGEXP_EXTRACT); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.FROM_BASE64,
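The pattern in the quoted diff — a static map from built-in function definitions to SQL operators, so that the common one-to-one conversions become a plain lookup while everything else falls through to special-case logic — can be sketched in isolation as follows. The `FunctionDef` and `SqlOp` enums here are illustrative stand-ins, not Flink's actual `FunctionDefinition` or Calcite's `SqlOperator` types.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal sketch of a mapping-based operator converter: simple
 * definitions are registered once in a static map; conversion is a
 * single lookup. Names are hypothetical stand-ins for Flink classes.
 */
public class SimpleOperatorMapping {
    enum FunctionDef { AND, OR, NOT, EQUALS, LOWER, CONCAT }
    enum SqlOp { AND, OR, NOT, EQUALS, LOWER, CONCAT_FUNCTION }

    private static final Map<FunctionDef, SqlOp> SIMPLE_MAPPING = new HashMap<>();

    static {
        // logic functions
        SIMPLE_MAPPING.put(FunctionDef.AND, SqlOp.AND);
        SIMPLE_MAPPING.put(FunctionDef.OR, SqlOp.OR);
        SIMPLE_MAPPING.put(FunctionDef.NOT, SqlOp.NOT);
        // comparison functions
        SIMPLE_MAPPING.put(FunctionDef.EQUALS, SqlOp.EQUALS);
        // string functions (note: names need not match one-to-one)
        SIMPLE_MAPPING.put(FunctionDef.LOWER, SqlOp.LOWER);
        SIMPLE_MAPPING.put(FunctionDef.CONCAT, SqlOp.CONCAT_FUNCTION);
    }

    /** Returns the mapped operator, or null so callers can fall back
     *  to the non-trivial conversion path. */
    static SqlOp convert(FunctionDef def) {
        return SIMPLE_MAPPING.get(def);
    }

    public static void main(String[] args) {
        System.out.println(convert(FunctionDef.AND));
        System.out.println(convert(FunctionDef.CONCAT));
    }
}
```

The design keeps the converter's visit method short: only functions whose conversion needs extra arguments or type inference get dedicated branches.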
[jira] [Commented] (FLINK-11607) Translate the "DataStream API Tutorial" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878304#comment-16878304 ] LakeShen commented on FLINK-11607: -- Hi Zhang Ziqiang, have you been working on this? If not, could you assign it to me? I would like to translate this page, thanks. > Translate the "DataStream API Tutorial" page into Chinese > - > > Key: FLINK-11607 > URL: https://issues.apache.org/jira/browse/FLINK-11607 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: Zhang Ziqiang >Priority: Major > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/tutorials/datastream_api.html > The markdown file is located in flink/docs/tutorials/datastream_api.zh.md > The markdown file will be created once FLINK-11529 is merged. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12926) Main thread checking in some tests fails
[ https://issues.apache.org/jira/browse/FLINK-12926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878302#comment-16878302 ] Zhu Zhu edited comment on FLINK-12926 at 7/4/19 3:56 AM: - Hi [~till.rohrmann], from my observation, the issue is happening though it does not break current tests. Below are some cases where it happens or may happen: 1. Even though the tests do not trigger actions from other threads, the production logic might do it, e.g. *Execution#deploy()* as in the attached picture; this happens but does not break tests since it is not in the critical path and the failed main thread checking does not cause failovers. 2. Besides, the *TestingComponentMainThreadExecutorServiceAdapter* uses *DirectScheduledExecutorService* as the underlying ScheduledExecutorService. However, DirectScheduledExecutorService will schedule tasks from another thread. So if any mainThreadExecutor.schedule* action is invoked in tests or in the production process, it may also violate the main thread checking. No test breaks for it yet, but I think we just fortunately dodged it (or was it intentional?). E.g.: - FixedDelayRestartStrategy. No test breaks because no test uses FixedDelayRestartStrategy to do failover yet. - HeartbeatMonitor. No test breaks because it does not check the main thread; HeartbeatManagerTest#testHeartbeatTimeout actually does the timeout handling in another pool thread. !Execution#deploy.jpg|width=568,height=328! was (Author: zhuzh): >From my observation, the issue is happening though it does not break current >tests. Below are some cases it happens or may happen: 1. Even though the tests do not trigger actions from other thread, the production logic might do it, e.g. *Execution#deploy()* as in the attached picture, this happens but does not break tests since it is not in the critical path and the failed main thread checking does not cause failovers. 2.
Besides, the *TestingComponentMainThreadExecutorServiceAdapter* uses *DirectScheduledExecutorService* as the underlying ScheduledExecutorService. However, DirectScheduledExecutorService will schedule tasks from another thread. So if any mainThreadExecutor.schedule* action is invoked in tests or production process, it may also violate the main thread checking. No test breaks for it yet. But I think we just fortunately dodged(Or intentional?). e.g. - FixedDelayRestartStrategy. No test breaks because no test uses FixedDelayRestartStrategy to do failover yet. - HeartbeatMonitor. No test breaks because it does not check main thread, HeartbeatManagerTest#testHeartbeatTimeout actually does the timeout handling in another pool thread. !Execution#deploy.jpg! > Main thread checking in some tests fails > > > Key: FLINK-12926 > URL: https://issues.apache.org/jira/browse/FLINK-12926 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Tests >Affects Versions: 1.9.0 >Reporter: Zhu Zhu >Priority: Major > Attachments: Execution#deploy.jpg, mainThreadCheckFailure.log > > > Currently all JM side job changing actions are expected to be taken in > JobMaster main thread. > In current Flink tests, many cases tend to use the test main thread as the JM > main thread. This can lead to 2 issues: > 1. TestingComponentMainThreadExecutorServiceAdapter is a direct executor, so > if it is invoked from any other thread, it will break the main thread > checking and fail the submitted action (as in the attached log > [^mainThreadCheckFailure.log]) > 2. The test main thread does not support other actions queued in its > executor, as the test will end once the current test thread action(the > current running test body) is done > > In my observation, most cases which starts > ExecutionGraph.scheduleForExecution() will encounter this issue. 
Cases > include ExecutionGraphRestartTest, FailoverRegionTest, > ConcurrentFailoverStrategyExecutionGraphTest, GlobalModVersionTest, > ExecutionGraphDeploymentTest, etc. > > One solution in my mind is to create a ScheduledExecutorService for those > tests, use it as the main thread and run the test body in this thread.
[jira] [Commented] (FLINK-12926) Main thread checking in some tests fails
[ https://issues.apache.org/jira/browse/FLINK-12926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878302#comment-16878302 ] Zhu Zhu commented on FLINK-12926: - >From my observation, the issue is happening though it does not break current >tests. Below are some cases it happens or may happen: 1. Even though the tests do not trigger actions from other thread, the production logic might do it, e.g. *Execution#deploy()* as in the attached picture, this happens but does not break tests since it is not in the critical path and the failed main thread checking does not cause failovers. 2. Besides, the *TestingComponentMainThreadExecutorServiceAdapter* uses *DirectScheduledExecutorService* as the underlying ScheduledExecutorService. However, DirectScheduledExecutorService will schedule tasks from another thread. So if any mainThreadExecutor.schedule* action is invoked in tests or production process, it may also violate the main thread checking. No test breaks for it yet. But I think we just fortunately dodged(Or intentional?). e.g. - FixedDelayRestartStrategy. No test breaks because no test uses FixedDelayRestartStrategy to do failover yet. - HeartbeatMonitor. No test breaks because it does not check main thread, HeartbeatManagerTest#testHeartbeatTimeout actually does the timeout handling in another pool thread. !Execution#deploy.jpg! > Main thread checking in some tests fails > > > Key: FLINK-12926 > URL: https://issues.apache.org/jira/browse/FLINK-12926 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Tests >Affects Versions: 1.9.0 >Reporter: Zhu Zhu >Priority: Major > Attachments: Execution#deploy.jpg, mainThreadCheckFailure.log > > > Currently all JM side job changing actions are expected to be taken in > JobMaster main thread. > In current Flink tests, many cases tend to use the test main thread as the JM > main thread. This can lead to 2 issues: > 1. 
TestingComponentMainThreadExecutorServiceAdapter is a direct executor, so > if it is invoked from any other thread, it will break the main thread > checking and fail the submitted action (as in the attached log > [^mainThreadCheckFailure.log]) > 2. The test main thread does not support other actions queued in its > executor, as the test will end once the current test thread action(the > current running test body) is done > > In my observation, most cases which starts > ExecutionGraph.scheduleForExecution() will encounter this issue. Cases > include ExecutionGraphRestartTest, FailoverRegionTest, > ConcurrentFailoverStrategyExecutionGraphTest, GlobalModVersionTest, > ExecutionGraphDeploymentTest, etc. > > One solution in my mind is to create a ScheduledExecutorService for those > tests, use it as the main thread and run the test body in this thread.
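The failure mode Zhu Zhu describes — a direct executor runs the submitted task on whichever thread calls it, so a task that asserts it is on the "main" thread trips its check when submitted from any other thread — can be reproduced with a minimal, self-contained sketch. The class and method names below are hypothetical, not Flink's actual TestingComponentMainThreadExecutorServiceAdapter.

```java
import java.util.concurrent.Executor;

/**
 * Sketch of a main-thread check combined with a direct executor.
 * A direct executor runs tasks on the calling thread, so submitting
 * a main-thread-checked task from another thread fails the check.
 */
public class MainThreadCheckDemo {

    /** Submits a checked task via a direct executor, once from the
     *  calling ("main") thread and once from a new thread; returns
     *  whether the second submission tripped the check. */
    static boolean checkTrippedFromOtherThread() throws InterruptedException {
        final Thread mainThread = Thread.currentThread();
        // Direct executor: runs the task on whichever thread calls execute().
        Executor direct = Runnable::run;

        Runnable checkedTask = () -> {
            if (Thread.currentThread() != mainThread) {
                throw new IllegalStateException("task not run in the expected main thread");
            }
        };

        direct.execute(checkedTask); // fine: same thread, check passes

        final boolean[] tripped = {false};
        Thread other = new Thread(() -> {
            try {
                direct.execute(checkedTask); // runs on 'other', violating the check
            } catch (IllegalStateException e) {
                tripped[0] = true;
            }
        });
        other.start();
        other.join();
        return tripped[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("check tripped from other thread: " + checkTrippedFromOtherThread());
    }
}
```

This is also why a scheduled variant is problematic: a ScheduledExecutorService fires delayed tasks from its own worker thread, so even correctly written production code can fail the check under a direct test adapter.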
[GitHub] [flink] tianchen92 closed pull request #8624: [FLINK-10932]Initial flink-kubernetes module with empty implementation
tianchen92 closed pull request #8624: [FLINK-10932]Initial flink-kubernetes module with empty implementation URL: https://github.com/apache/flink/pull/8624
[GitHub] [flink] tianchen92 opened a new pull request #8624: [FLINK-10932]Initial flink-kubernetes module with empty implementation
tianchen92 opened a new pull request #8624: [FLINK-10932]Initial flink-kubernetes module with empty implementation URL: https://github.com/apache/flink/pull/8624 ## What is the purpose of the change *Initialize the skeleton module to start native k8s integration, related to [FLINK-10932](https://issues.apache.org/jira/browse/FLINK-10932)* ## Brief change log - *Add flink-kubernetes module* - *Add Interface of Kubernetes client* ## Verifying this change *This change is an initial empty implementation, verified by mvn clean install and Travis CI.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (the document will be introduced in later pull requests)
[GitHub] [flink] beyond1920 commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.
beyond1920 commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations. URL: https://github.com/apache/flink/pull/8977#discussion_r300220124 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java ## @@ -79,25 +112,196 @@ private final RelBuilder relBuilder; private final FlinkTypeFactory typeFactory; + private static final int DECIMAL_PRECISION_NEEDED_FOR_LONG = 19; + + /** +* The mapping only keeps part of FunctionDefinitions, which could be converted to SqlOperator in a very simple +* way. +*/ + private static final Map SIMPLE_DEF_SQL_OPERATOR_MAPPING = new HashMap<>(); + + static { + // logic functions + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.AND, FlinkSqlOperatorTable.AND); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OR, FlinkSqlOperatorTable.OR); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT, FlinkSqlOperatorTable.NOT); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IF, FlinkSqlOperatorTable.CASE); + + // comparison functions + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.EQUALS, FlinkSqlOperatorTable.EQUALS); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.GREATER_THAN, FlinkSqlOperatorTable.GREATER_THAN); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LESS_THAN, FlinkSqlOperatorTable.LESS_THAN); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT_EQUALS, FlinkSqlOperatorTable.NOT_EQUALS); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NULL, 
FlinkSqlOperatorTable.IS_NULL); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_NULL, FlinkSqlOperatorTable.IS_NOT_NULL); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_TRUE, FlinkSqlOperatorTable.IS_TRUE); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_FALSE, FlinkSqlOperatorTable.IS_FALSE); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_TRUE, FlinkSqlOperatorTable.IS_NOT_TRUE); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.IS_NOT_FALSE, FlinkSqlOperatorTable.IS_NOT_FALSE); + + // string functions + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CHAR_LENGTH, FlinkSqlOperatorTable.CHAR_LENGTH); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.INIT_CAP, FlinkSqlOperatorTable.INITCAP); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LIKE, FlinkSqlOperatorTable.LIKE); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOWER, FlinkSqlOperatorTable.LOWER); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SIMILAR, FlinkSqlOperatorTable.SIMILAR_TO); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SUBSTRING, FlinkSqlOperatorTable.SUBSTRING); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.UPPER, FlinkSqlOperatorTable.UPPER); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.POSITION, FlinkSqlOperatorTable.POSITION); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OVERLAY, FlinkSqlOperatorTable.OVERLAY); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT, FlinkSqlOperatorTable.CONCAT_FUNCTION); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT_WS, FlinkSqlOperatorTable.CONCAT_WS); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LPAD, FlinkSqlOperatorTable.LPAD); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RPAD, FlinkSqlOperatorTable.RPAD); + 
SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.REGEXP_EXTRACT, FlinkSqlOperatorTable.REGEXP_EXTRACT); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.FROM_BASE64,
[GitHub] [flink] beyond1920 commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.
beyond1920 commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations. URL: https://github.com/apache/flink/pull/8977#discussion_r300219868 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java ## @@ -79,25 +112,196 @@ private final RelBuilder relBuilder; private final FlinkTypeFactory typeFactory; + private static final int DECIMAL_PRECISION_NEEDED_FOR_LONG = 19; + + /** +* The mapping only keeps part of FunctionDefinitions, which could be converted to SqlOperator in a very simple +* way. +*/ + private static final Map SIMPLE_DEF_SQL_OPERATOR_MAPPING = new HashMap<>(); + + static { + // logic functions + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.AND, FlinkSqlOperatorTable.AND); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OR, FlinkSqlOperatorTable.OR); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT, FlinkSqlOperatorTable.NOT); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IF, FlinkSqlOperatorTable.CASE); + + // comparison functions + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.EQUALS, FlinkSqlOperatorTable.EQUALS); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.GREATER_THAN, FlinkSqlOperatorTable.GREATER_THAN); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LESS_THAN, FlinkSqlOperatorTable.LESS_THAN); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT_EQUALS, FlinkSqlOperatorTable.NOT_EQUALS); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NULL, 
FlinkSqlOperatorTable.IS_NULL); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_NULL, FlinkSqlOperatorTable.IS_NOT_NULL); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_TRUE, FlinkSqlOperatorTable.IS_TRUE); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_FALSE, FlinkSqlOperatorTable.IS_FALSE); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_TRUE, FlinkSqlOperatorTable.IS_NOT_TRUE); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.IS_NOT_FALSE, FlinkSqlOperatorTable.IS_NOT_FALSE); + + // string functions + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CHAR_LENGTH, FlinkSqlOperatorTable.CHAR_LENGTH); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.INIT_CAP, FlinkSqlOperatorTable.INITCAP); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LIKE, FlinkSqlOperatorTable.LIKE); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOWER, FlinkSqlOperatorTable.LOWER); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SIMILAR, FlinkSqlOperatorTable.SIMILAR_TO); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SUBSTRING, FlinkSqlOperatorTable.SUBSTRING); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.UPPER, FlinkSqlOperatorTable.UPPER); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.POSITION, FlinkSqlOperatorTable.POSITION); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OVERLAY, FlinkSqlOperatorTable.OVERLAY); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT, FlinkSqlOperatorTable.CONCAT_FUNCTION); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT_WS, FlinkSqlOperatorTable.CONCAT_WS); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LPAD, FlinkSqlOperatorTable.LPAD); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RPAD, FlinkSqlOperatorTable.RPAD); + 
SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.REGEXP_EXTRACT, FlinkSqlOperatorTable.REGEXP_EXTRACT); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.FROM_BASE64,
[GitHub] [flink] JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.
JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations. URL: https://github.com/apache/flink/pull/8977#discussion_r300213699 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java ## @@ -79,25 +112,196 @@ private final RelBuilder relBuilder; private final FlinkTypeFactory typeFactory; + private static final int DECIMAL_PRECISION_NEEDED_FOR_LONG = 19; + + /** +* The mapping only keeps part of FunctionDefinitions, which could be converted to SqlOperator in a very simple +* way. +*/ + private static final Map SIMPLE_DEF_SQL_OPERATOR_MAPPING = new HashMap<>(); + + static { + // logic functions + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.AND, FlinkSqlOperatorTable.AND); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OR, FlinkSqlOperatorTable.OR); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT, FlinkSqlOperatorTable.NOT); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IF, FlinkSqlOperatorTable.CASE); + + // comparison functions + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.EQUALS, FlinkSqlOperatorTable.EQUALS); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.GREATER_THAN, FlinkSqlOperatorTable.GREATER_THAN); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LESS_THAN, FlinkSqlOperatorTable.LESS_THAN); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT_EQUALS, FlinkSqlOperatorTable.NOT_EQUALS); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NULL, 
FlinkSqlOperatorTable.IS_NULL); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_NULL, FlinkSqlOperatorTable.IS_NOT_NULL); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_TRUE, FlinkSqlOperatorTable.IS_TRUE); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_FALSE, FlinkSqlOperatorTable.IS_FALSE); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_TRUE, FlinkSqlOperatorTable.IS_NOT_TRUE); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.IS_NOT_FALSE, FlinkSqlOperatorTable.IS_NOT_FALSE); + + // string functions + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CHAR_LENGTH, FlinkSqlOperatorTable.CHAR_LENGTH); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.INIT_CAP, FlinkSqlOperatorTable.INITCAP); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LIKE, FlinkSqlOperatorTable.LIKE); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOWER, FlinkSqlOperatorTable.LOWER); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SIMILAR, FlinkSqlOperatorTable.SIMILAR_TO); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SUBSTRING, FlinkSqlOperatorTable.SUBSTRING); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.UPPER, FlinkSqlOperatorTable.UPPER); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.POSITION, FlinkSqlOperatorTable.POSITION); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OVERLAY, FlinkSqlOperatorTable.OVERLAY); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT, FlinkSqlOperatorTable.CONCAT_FUNCTION); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT_WS, FlinkSqlOperatorTable.CONCAT_WS); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LPAD, FlinkSqlOperatorTable.LPAD); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RPAD, FlinkSqlOperatorTable.RPAD); + 
SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.REGEXP_EXTRACT, FlinkSqlOperatorTable.REGEXP_EXTRACT); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.FROM_BASE64,
[GitHub] [flink] JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.
JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations. URL: https://github.com/apache/flink/pull/8977#discussion_r300213763 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java ## @@ -79,25 +112,196 @@ private final RelBuilder relBuilder; private final FlinkTypeFactory typeFactory; + private static final int DECIMAL_PRECISION_NEEDED_FOR_LONG = 19; + + /** +* The mapping only keeps part of FunctionDefinitions, which could be converted to SqlOperator in a very simple +* way. +*/ + private static final Map SIMPLE_DEF_SQL_OPERATOR_MAPPING = new HashMap<>(); + + static { + // logic functions + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.AND, FlinkSqlOperatorTable.AND); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OR, FlinkSqlOperatorTable.OR); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT, FlinkSqlOperatorTable.NOT); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IF, FlinkSqlOperatorTable.CASE); + + // comparison functions + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.EQUALS, FlinkSqlOperatorTable.EQUALS); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.GREATER_THAN, FlinkSqlOperatorTable.GREATER_THAN); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LESS_THAN, FlinkSqlOperatorTable.LESS_THAN); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT_EQUALS, FlinkSqlOperatorTable.NOT_EQUALS); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NULL, 
[GitHub] [flink] JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.
JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations. URL: https://github.com/apache/flink/pull/8977#discussion_r300215701 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java ## @@ -150,7 +315,7 @@ public RelNode visit(QueryOperation other) { FlinkStatistic statistic; List names; if (tableSourceOperation instanceof RichTableSourceQueryOperation && - ((RichTableSourceQueryOperation) tableSourceOperation).getQualifiedName() != null) { + ((RichTableSourceQueryOperation) tableSourceOperation).getQualifiedName() != null) { Review comment: This PR has many erroneous indentation modifications. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.
JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations. URL: https://github.com/apache/flink/pull/8977#discussion_r300215015 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java ## @@ -79,25 +112,196 @@ private final RelBuilder relBuilder; Review comment: > TODO actually we should use {@link ResolvedExpressionVisitor} here as it is the output of the API Could you add a clear comment on why it keeps `ExpressionVisitor` instead of `ResolvedExpressionVisitor`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.
JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations. URL: https://github.com/apache/flink/pull/8977#discussion_r300215219 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java ## @@ -273,7 +720,7 @@ public RexNode visit(ValueLiteralExpression valueLiteral) { } return literal.getValueAs(clazz) - .orElseThrow(() -> new TableException("Unsupported literal class: " + clazz)); + .orElseThrow(() -> new TableException("Unsupported literal class: " + clazz)); Review comment: Why? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.
JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations. URL: https://github.com/apache/flink/pull/8977#discussion_r300215614 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeExpression.java ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions; + +import org.apache.flink.table.types.DataType; + +import org.apache.calcite.rex.RexNode; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Dummy wrapper for expressions that were converted to RexNode in a different way. + */ +public class RexNodeExpression implements ResolvedExpression { Review comment: Just use `RexPlannerExpression`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.
JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations. URL: https://github.com/apache/flink/pull/8977#discussion_r300213615 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java ## @@ -79,25 +112,196 @@ private final RelBuilder relBuilder; private final FlinkTypeFactory typeFactory; + private static final int DECIMAL_PRECISION_NEEDED_FOR_LONG = 19; + + /** +* The mapping only keeps part of FunctionDefinitions, which could be converted to SqlOperator in a very simple +* way. +*/ + private static final Map SIMPLE_DEF_SQL_OPERATOR_MAPPING = new HashMap<>(); + + static { + // logic functions + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.AND, FlinkSqlOperatorTable.AND); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OR, FlinkSqlOperatorTable.OR); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT, FlinkSqlOperatorTable.NOT); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IF, FlinkSqlOperatorTable.CASE); + + // comparison functions + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.EQUALS, FlinkSqlOperatorTable.EQUALS); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.GREATER_THAN, FlinkSqlOperatorTable.GREATER_THAN); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LESS_THAN, FlinkSqlOperatorTable.LESS_THAN); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT_EQUALS, FlinkSqlOperatorTable.NOT_EQUALS); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NULL, 
FlinkSqlOperatorTable.IS_NULL); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_NULL, FlinkSqlOperatorTable.IS_NOT_NULL); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_TRUE, FlinkSqlOperatorTable.IS_TRUE); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_FALSE, FlinkSqlOperatorTable.IS_FALSE); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_TRUE, FlinkSqlOperatorTable.IS_NOT_TRUE); + SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.IS_NOT_FALSE, FlinkSqlOperatorTable.IS_NOT_FALSE); + + // string functions + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CHAR_LENGTH, FlinkSqlOperatorTable.CHAR_LENGTH); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.INIT_CAP, FlinkSqlOperatorTable.INITCAP); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LIKE, FlinkSqlOperatorTable.LIKE); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOWER, FlinkSqlOperatorTable.LOWER); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SIMILAR, FlinkSqlOperatorTable.SIMILAR_TO); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SUBSTRING, FlinkSqlOperatorTable.SUBSTRING); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.UPPER, FlinkSqlOperatorTable.UPPER); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.POSITION, FlinkSqlOperatorTable.POSITION); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OVERLAY, FlinkSqlOperatorTable.OVERLAY); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT, FlinkSqlOperatorTable.CONCAT_FUNCTION); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT_WS, FlinkSqlOperatorTable.CONCAT_WS); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LPAD, FlinkSqlOperatorTable.LPAD); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RPAD, FlinkSqlOperatorTable.RPAD); + 
SIMPLE_DEF_SQL_OPERATOR_MAPPING + .put(BuiltInFunctionDefinitions.REGEXP_EXTRACT, FlinkSqlOperatorTable.REGEXP_EXTRACT); + SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.FROM_BASE64,
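The static lookup table in the diff above registers every function definition that maps one-to-one onto a Calcite SQL operator, so conversion becomes a plain map lookup rather than a long per-function switch. The following is a minimal, self-contained sketch of that pattern; `FunctionDef` and `SqlOp` are hypothetical stand-ins for Flink's `BuiltInFunctionDefinitions` and `FlinkSqlOperatorTable` entries, not the real API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Sketch of the "simple definition" mapping: definitions that convert to a SQL
 * operator in a trivial way live in a static map; everything else falls
 * through to custom conversion logic.
 */
public class SimpleOperatorMapping {
    enum FunctionDef { AND, OR, NOT, EQUALS, LOWER, UPPER, IF }
    enum SqlOp { SQL_AND, SQL_OR, SQL_NOT, SQL_EQUALS, SQL_LOWER, SQL_UPPER, SQL_CASE }

    private static final Map<FunctionDef, SqlOp> SIMPLE_MAPPING = new HashMap<>();
    static {
        // logic functions
        SIMPLE_MAPPING.put(FunctionDef.AND, SqlOp.SQL_AND);
        SIMPLE_MAPPING.put(FunctionDef.OR, SqlOp.SQL_OR);
        SIMPLE_MAPPING.put(FunctionDef.NOT, SqlOp.SQL_NOT);
        SIMPLE_MAPPING.put(FunctionDef.IF, SqlOp.SQL_CASE);
        // comparison / string functions
        SIMPLE_MAPPING.put(FunctionDef.EQUALS, SqlOp.SQL_EQUALS);
        SIMPLE_MAPPING.put(FunctionDef.LOWER, SqlOp.SQL_LOWER);
        SIMPLE_MAPPING.put(FunctionDef.UPPER, SqlOp.SQL_UPPER);
    }

    /** Returns the operator for "simple" definitions; empty means the caller needs custom conversion. */
    static Optional<SqlOp> lookup(FunctionDef def) {
        return Optional.ofNullable(SIMPLE_MAPPING.get(def));
    }

    public static void main(String[] args) {
        System.out.println(lookup(FunctionDef.LOWER).orElseThrow(IllegalStateException::new));
    }
}
```

The design choice is that the table keeps only definitions whose conversion needs no argument rewriting or type inference; anything more involved stays in explicit visitor code.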
[GitHub] [flink] JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.
JingsongLi commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations. URL: https://github.com/apache/flink/pull/8977#discussion_r300215120 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/AggregateVisitors.java ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.expressions; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.calcite.FlinkTypeFactory; +import org.apache.flink.table.functions.AggregateFunction; +import org.apache.flink.table.functions.AggregateFunctionDefinition; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.UserDefinedAggregateFunction; +import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable; +import org.apache.flink.table.functions.utils.AggSqlFunction; +import org.apache.flink.util.Preconditions; + +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.tools.RelBuilder; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.isFunctionOfKind; +import static org.apache.flink.table.functions.FunctionKind.AGGREGATE; +import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; + +/** + * The class contains all kinds of visitors to visit Aggregate. + */ +public class AggregateVisitors { Review comment: Change the name? I think this class is just for `RexNodeConverter`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
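The `AggregateVisitors` class under review bundles expression visitors that handle aggregate calls separately from ordinary calls. As a rough, hedged illustration of that shape — with hypothetical simplified types, not Flink's actual `ExpressionVisitor` API — a visitor-based converter could look like:

```java
/**
 * Sketch of a visitor-based expression converter: ordinary calls and
 * aggregate calls dispatch to different visit methods, so aggregate-specific
 * translation logic can live in its own visitor class.
 */
public class AggregateVisitorSketch {
    interface Expression { <R> R accept(Visitor<R> v); }
    interface Visitor<R> { R visitCall(Call c); R visitAggCall(AggCall c); }

    /** A plain scalar function call, e.g. UPPER(name). */
    static final class Call implements Expression {
        final String name;
        Call(String name) { this.name = name; }
        public <R> R accept(Visitor<R> v) { return v.visitCall(this); }
    }

    /** An aggregate function call, e.g. SUM(amount). */
    static final class AggCall implements Expression {
        final String name;
        AggCall(String name) { this.name = name; }
        public <R> R accept(Visitor<R> v) { return v.visitAggCall(this); }
    }

    /** Maps each expression kind to an operator name; aggregates take the dedicated branch. */
    static final class SqlFunctionVisitor implements Visitor<String> {
        public String visitCall(Call c) { return "SQL_" + c.name; }
        public String visitAggCall(AggCall c) { return "AGG_" + c.name; }
    }

    static String convert(Expression e) { return e.accept(new SqlFunctionVisitor()); }

    public static void main(String[] args) {
        System.out.println(convert(new AggCall("SUM")));
    }
}
```

The reviewer's point then amounts to naming: if only one converter ever uses these visitors, the class name should say so rather than suggest a general-purpose utility.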
[GitHub] [flink] ye-lun commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
ye-lun commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r300216656 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。 -Checkpoints make state in Flink fault tolerant by allowing state and the -corresponding stream positions to be recovered, thereby giving the application -the same semantics as a failure-free execution. +参考 [Checkpointing]({{ site.baseurl }}/zh/dev/stream/state/checkpointing.html) 查看如何在 Flink 程序中开启和配置 checkpoint。 -See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) for how to enable and -configure checkpoints for your program. +## 保留 Checkpoint -## Retained Checkpoints - -Checkpoints are by default not retained and are only used to resume a -job from failures. They are deleted when a program is cancelled. -You can, however, configure periodic checkpoints to be retained. -Depending on the configuration these *retained* checkpoints are *not* -automatically cleaned up when the job fails or is canceled. -This way, you will have a checkpoint around to resume from if your job fails. +checkpoint 在默认的情况下仅用于恢复失败的作业,并不保留,当程序取消时 checkpoint 就会被删除。当然,你可以通过配置来保留 checkpoint,这些被保留的 checkpoint 在作业失败或取消时不会被清除。这样,你就可以使用该 checkpoint 来恢复失败的作业。 {% highlight java %} CheckpointConfig config = env.getCheckpointConfig(); config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); {% endhighlight %} -The `ExternalizedCheckpointCleanup` mode configures what happens with checkpoints when you cancel the job: - -- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case. 
- -- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails. +`ExternalizedCheckpointCleanup` 配置项定义了当作业取消时,对作业 checkpoint 的操作: +- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的 checkpoint。注意,这种情况下,需要手动清除该作业的 checkpoint。 Review comment: @klion26 I agree with you on this point. I will modify these and force push again because there are many commits. Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] ye-lun commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
ye-lun commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r300218022 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。 -Checkpoints make state in Flink fault tolerant by allowing state and the -corresponding stream positions to be recovered, thereby giving the application Review comment: At this position, following your suggestion, the sentence "thereby giving the application the same semantics as a failure-free execution" is not translated, whereas in my first commit it was translated. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Comment Edited] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878289#comment-16878289 ] zhijiang edited comment on FLINK-12852 at 7/4/19 3:27 AM: -- Network resource matching in slots has many unsettled issues that need further discussion in the future, so we cannot make it effective in the short term. Lazily allocating buffers on the producer side seems a feasible way at the moment. It could still retain the current core-and-maximum mechanism in the local pool. But it brings two other effects: * Longer time to ramp up to full throughput, as Stephan mentioned, especially for very short jobs (finishing in several seconds); I remember such cases existed in Kurt's benchmark before. We change the previously concurrent production and consumption to a sequential pattern: for a short job, the whole data set might already be emitted and cached in the partition pool on the producer side before the consumer requests the partition. * We rely on another assumption, namely that produced buffers can eventually be recycled once the subpartition view is established. This assumption might limit new features/improvements in the future. At the moment we need to adjust when the partition request is triggered, meaning a RemoteInputChannel can only send its partition request if the corresponding task has no result partition or the partition's view has already been established. In the future, InputSelection might also break the above assumption: although the partition was requested, the operator could choose not to consume that partition for a long time. was (Author: zjwang): Network resource matching in slot has many unsetting issues which should be further discussed future, so we could not make it effect in short time. Lazy allocation buffers on producer side seems a feasible way atm. It could still retain the current core and maximum mechanism in local pool. 
But it brings another two effects: * Higher time to ramp up to full throughput as Stephan mentioned, especially for some very short-time jobs (several seconds finish) and I remembered there exists such cases in Kurt's benchmark before. We change the previous concurrent production and consumption to sequential way. For short-time job, before the consumer requests partition, all the data set might already be emitted and cached in partition pool on producer side before. * We rely on another assumption that produced buffers could be recycled finally once subpartition view is established. This assumption might limit our new features/improvements future. ATM we need to adjust the action to trigger partition request, that means `RemoteInputChannel` could only send partition request if this task has no result partition or the partition's view has already been established. In future the InputSelection might also destroy the above assumption. Although the partition was requested, but the OP could select not to consumer that partition long time. 
> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel > -- > > Key: FLINK-12852 > URL: https://issues.apache.org/jira/browse/FLINK-12852 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.7.2, 1.8.1, 1.9.0 >Reporter: Yun Gao >Assignee: Yun Gao >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > When running tests with an upstream vertex and downstream vertex, deadlock > occurs when submitting the job: > {code:java} > "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 > nid=0x38845 waiting on condition [0x7f2cbe9fe000] > java.lang.Thread.State: TIMED_WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00073ed6b6f0> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) > at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54) > at > org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312) > - locked <0x00073fbc81f0> (a java.lang.Object) > at >
[GitHub] [flink] XuQianJin-Stars edited a comment on issue #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
XuQianJin-Stars edited a comment on issue #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese URL: https://github.com/apache/flink/pull/8300#issuecomment-508323987 Hi @klion26 @wuchong, I have changed it already. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] ye-lun commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
ye-lun commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r300216919 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。 -Checkpoints make state in Flink fault tolerant by allowing state and the -corresponding stream positions to be recovered, thereby giving the application -the same semantics as a failure-free execution. +参考 [Checkpointing]({{ site.baseurl }}/zh/dev/stream/state/checkpointing.html) 查看如何在 Flink 程序中开启和配置 checkpoint。 -See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) for how to enable and -configure checkpoints for your program. +## 保留 Checkpoint -## Retained Checkpoints - -Checkpoints are by default not retained and are only used to resume a -job from failures. They are deleted when a program is cancelled. -You can, however, configure periodic checkpoints to be retained. -Depending on the configuration these *retained* checkpoints are *not* -automatically cleaned up when the job fails or is canceled. -This way, you will have a checkpoint around to resume from if your job fails. +checkpoint 在默认的情况下仅用于恢复失败的作业,并不保留,当程序取消时 checkpoint 就会被删除。当然,你可以通过配置来保留 checkpoint,这些被保留的 checkpoint 在作业失败或取消时不会被清除。这样,你就可以使用该 checkpoint 来恢复失败的作业。 {% highlight java %} CheckpointConfig config = env.getCheckpointConfig(); config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); {% endhighlight %} -The `ExternalizedCheckpointCleanup` mode configures what happens with checkpoints when you cancel the job: - -- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case. 
- -- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails. +`ExternalizedCheckpointCleanup` 配置项定义了当作业取消时,对作业 checkpoint 的操作: +- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的 checkpoint。注意,这种情况下,需要手动清除该作业的 checkpoint。 +- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**:当作业取消时,删除作业的 checkpoint。仅当作业失败时,作业的 checkpoint 才会被保留。 -### Directory Structure +### 目录结构 -Similarly to [savepoints](savepoints.html), a checkpoint consists -of a meta data file and, depending on the state backend, some additional data -files. The meta data file and data files are stored in the directory that is -configured via `state.checkpoints.dir` in the configuration files, -and also can be specified for per job in the code. +与 [savepoints](savepoints.html) 相似,checkpoint 由元数据文件、数据文件(与state backend 相关)组成。可通过配置文件中 “state.checkpoints.dir” 配置项来指定元数据文件和数据文件的存储路径,另外也可以在代码中针对单个作业特别指定该配置项。 Review comment: yes, I will change the quotation This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878289#comment-16878289 ] zhijiang commented on FLINK-12852: -- Network resource matching in slots has many unsettled issues that need further discussion in the future, so we cannot make it effective in the short term. Lazily allocating buffers on the producer side seems a feasible way at the moment. It could still retain the current core-and-maximum mechanism in the local pool. But it brings two other effects: * Longer time to ramp up to full throughput, as Stephan mentioned, especially for very short jobs (finishing in several seconds); I remember such cases existed in Kurt's benchmark before. We change the previously concurrent production and consumption to a sequential pattern: for a short job, the whole data set might already be emitted and cached in the partition pool on the producer side before the consumer requests the partition. * We rely on another assumption, namely that produced buffers can eventually be recycled once the subpartition view is established. This assumption might limit new features/improvements in the future. At the moment we need to adjust when the partition request is triggered, meaning a `RemoteInputChannel` can only send its partition request if the task has no result partition or the partition's view has already been established. In the future, InputSelection might also break the above assumption: although the partition was requested, the operator could choose not to consume that partition for a long time. 
> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel > -- > > Key: FLINK-12852 > URL: https://issues.apache.org/jira/browse/FLINK-12852 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.7.2, 1.8.1, 1.9.0 >Reporter: Yun Gao >Assignee: Yun Gao >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > When running tests with an upstream vertex and downstream vertex, deadlock > occurs when submitting the job: > {code:java} > "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 > nid=0x38845 waiting on condition [0x7f2cbe9fe000] > java.lang.Thread.State: TIMED_WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00073ed6b6f0> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) > at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54) > at > org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312) > - locked <0x00073fbc81f0> (a java.lang.Object) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220) > at > org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598) > at java.lang.Thread.run(Thread.java:834) 
> {code} > This is due to the required and max of local buffer pool is not the same and > there may be over-allocation, when assignExclusiveSegments there are no > available memory. > > The detail of the scenarios is as follows: The parallelism of both upstream > vertex and downstream vertex are 1000 and 500 respectively. There are 200 TM > and each TM has 10696 buffers( in total and has 10 slots. For a TM that runs > 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with > local buffer pool \{required = 500, max = 2 * 500 + 8 = 1008}, it produces > data quickly and each occupy about 990 buffers. Then the DownStream task > starts and try to assigning exclusive buffers for 1500 -9 = 1491 > InputChannels. It requires 2981 buffers but only 1786 left. Since not all > downstream tasks can start, the job will be blocked finally and no buffer can > be released, and the deadlock finally occurred. > > I think although increasing the network memory solves the problem, the > deadlock may not be acceptable. Fined grained resource management > Flink-12761 can solve this problem, but AFAIK in 1.9 it will not include the >
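The arithmetic behind the reported deadlock can be checked directly. The sketch below assumes 2 exclusive buffers per remote input channel (the report quotes 2981 required buffers, close to 2 × 1491 = 2982 — either way well above what remains); the numbers are taken from the scenario above, and the method name is illustrative, not Flink code:

```java
/**
 * Reproduces the buffer accounting from the FLINK-12852 report: once the
 * nine producer pools have ramped up past their required size, the
 * downstream task's exclusive-buffer request can never be satisfied, and
 * since no task releases buffers, the job deadlocks.
 */
public class BufferDeadlockArithmetic {

    /** Buffers still missing after producers ramp up; a positive value means deadlock. */
    static int deficit(int totalBuffers, int producerTasks, int buffersPerProducer,
                       int inputChannels, int exclusivePerChannel) {
        int remaining = totalBuffers - producerTasks * buffersPerProducer;
        return inputChannels * exclusivePerChannel - remaining;
    }

    public static void main(String[] args) {
        // From the report: 10696 network buffers per TM, 9 upstream tasks each
        // holding ~990 buffers, downstream task with 1491 remote input channels.
        // 2 exclusive buffers per channel is an assumption about the default.
        int d = deficit(10696, 9, 990, 1491, 2);
        System.out.println("buffer deficit: " + d);
    }
}
```

This also shows why the over-allocation matters: with local pools capped at their required size of 500 instead of the max of 1008, the nine producers would hold 4500 buffers, leaving 6196 — more than enough for the downstream request.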
[GitHub] [flink] XuQianJin-Stars commented on issue #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
XuQianJin-Stars commented on issue #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese URL: https://github.com/apache/flink/pull/8300#issuecomment-508323987 Hi @klion26 @wuchong, I have changed it already. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] ye-lun commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
ye-lun commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r300216656 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。 -Checkpoints make state in Flink fault tolerant by allowing state and the -corresponding stream positions to be recovered, thereby giving the application -the same semantics as a failure-free execution. +参考 [Checkpointing]({{ site.baseurl }}/zh/dev/stream/state/checkpointing.html) 查看如何在 Flink 程序中开启和配置 checkpoint。 -See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) for how to enable and -configure checkpoints for your program. +## 保留 Checkpoint -## Retained Checkpoints - -Checkpoints are by default not retained and are only used to resume a -job from failures. They are deleted when a program is cancelled. -You can, however, configure periodic checkpoints to be retained. -Depending on the configuration these *retained* checkpoints are *not* -automatically cleaned up when the job fails or is canceled. -This way, you will have a checkpoint around to resume from if your job fails. +checkpoint 在默认的情况下仅用于恢复失败的作业,并不保留,当程序取消时 checkpoint 就会被删除。当然,你可以通过配置来保留 checkpoint,这些被保留的 checkpoint 在作业失败或取消时不会被清除。这样,你就可以使用该 checkpoint 来恢复失败的作业。 {% highlight java %} CheckpointConfig config = env.getCheckpointConfig(); config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); {% endhighlight %} -The `ExternalizedCheckpointCleanup` mode configures what happens with checkpoints when you cancel the job: - -- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case. 
- -- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails. +`ExternalizedCheckpointCleanup` 配置项定义了当作业取消时,对作业 checkpoint 的操作: +- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的 checkpoint。注意,这种情况下,需要手动清除该作业的 checkpoint。 Review comment: @klion26 I agree with you on this point. I will modify these and force-push again because there are many commits. Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
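The two cleanup modes quoted in the diff above reduce to a small decision rule: a checkpoint survives a job failure under both modes, but survives cancellation only under `RETAIN_ON_CANCELLATION`. The following plain-Java sketch models that documented rule; it is an illustration only, not Flink's implementation, and the names `CheckpointCleanupModel`, `JobOutcome`, and `isCheckpointRetained` are invented here:

```java
// Illustrative model of the documented ExternalizedCheckpointCleanup semantics.
public class CheckpointCleanupModel {

    // Mirrors the two modes described in the docs (names reused for clarity).
    enum CleanupMode { RETAIN_ON_CANCELLATION, DELETE_ON_CANCELLATION }

    // How the job ended.
    enum JobOutcome { FAILED, CANCELLED }

    /**
     * A retained checkpoint survives a failure under both modes; on cancellation
     * it survives only under RETAIN_ON_CANCELLATION (and the user must then
     * clean up the checkpoint state manually).
     */
    static boolean isCheckpointRetained(CleanupMode mode, JobOutcome outcome) {
        if (outcome == JobOutcome.FAILED) {
            return true;
        }
        return mode == CleanupMode.RETAIN_ON_CANCELLATION;
    }

    public static void main(String[] args) {
        System.out.println(
            isCheckpointRetained(CleanupMode.DELETE_ON_CANCELLATION, JobOutcome.CANCELLED));
    }
}
```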
[GitHub] [flink] banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable
banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable URL: https://github.com/apache/flink/pull/8611#discussion_r300216523 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateMapSnapshot.java ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * Base class for snapshots of a {@link StateMap}. + * + * @param type of key + * @param type of namespace + * @param type of state + */ +public abstract class StateMapSnapshot> { + + /** +* The {@link StateMap} from which this snapshot was created. +*/ + protected final T owningStateMap; Review comment: Actually, `owningStateMap ` is also used in `NestedStateMapSnapshot#writeState`. This is an automated message from the Apache Git Service. 
[GitHub] [flink] KurtYoung closed pull request #8854: [FLINK-12959][table-planner-blink] Use BoundedInput and InputSelectable in blink and implement hash join
KurtYoung closed pull request #8854: [FLINK-12959][table-planner-blink] Use BoundedInput and InputSelectable in blink and implement hash join URL: https://github.com/apache/flink/pull/8854
[jira] [Closed] (FLINK-12959) Use BoundedInput and InputSelectable in blink
[ https://issues.apache.org/jira/browse/FLINK-12959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-12959. -- Resolution: Implemented Fix Version/s: 1.9.0 merged in 1.9.0: 960ae97a5ac137faa35b83675c2f087334d4d3b7 0e4d4b4869aea6ded9c3ad1255c426ca9e7dfd99 > Use BoundedInput and InputSelectable in blink > - > > Key: FLINK-12959 > URL: https://issues.apache.org/jira/browse/FLINK-12959 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Now BoundedInput and InputSelectable are ready in runtime. Blink planner > should use it instead of invoking endInput in close. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
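The idea behind using `BoundedInput` and `InputSelectable` for a hash join, as described in the issue above, is to consume the bounded build side to completion before switching to the probe side, instead of deferring all work to `close()`. A minimal plain-Java sketch of that two-phase pattern follows; it uses no Flink APIs, and the class `HashJoinSketch` and its method names are invented for illustration:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Two-phase hash join: drain the bounded build input first, then stream the probe input.
public class HashJoinSketch {

    /** Joins (key, value) pairs: the build side is fully hashed before probing starts. */
    static List<String> hashJoin(List<String[]> buildSide, List<String[]> probeSide) {
        // Phase 1: "input selection" stays on the build input until it reaches its end
        // (the analogue of endInput on the build side).
        Map<String, List<String>> buildTable = new HashMap<>();
        for (String[] kv : buildSide) {
            buildTable.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        // Phase 2: after the build side ends, switch selection to the probe input.
        List<String> joined = new ArrayList<>();
        for (String[] kv : probeSide) {
            for (String buildValue : buildTable.getOrDefault(kv[0], Collections.emptyList())) {
                joined.add(kv[0] + ":" + buildValue + "," + kv[1]);
            }
        }
        return joined;
    }
}
```

The point of the design is that the probe phase never starts until the build table is complete, which is exactly what input selection gives a two-input operator.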
[GitHub] [flink] ye-lun commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
ye-lun commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r300216052 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。 Review comment: @klion26 Do you mean that it should be "Checkpoint" at the beginning of each paragraph and "checkpoint" in other positions? Thanks.
[GitHub] [flink] banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable
banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable URL: https://github.com/apache/flink/pull/8611#discussion_r300216045 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java ## @@ -172,14 +229,110 @@ public boolean isEmpty() { * @return the state of the mapping with the specified key/namespace composite key, or {@code null} * if no mapping for the specified key is found. */ - public abstract S get(K key, N namespace); + public S get(K key, N namespace) { + int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, keyContext.getNumberOfKeyGroups()); + return get(key, keyGroup, namespace); + } + + public Stream getKeys(N namespace) { + return Arrays.stream(state) + .flatMap(stateMap -> StreamSupport.stream(Spliterators.spliteratorUnknownSize(stateMap.iterator(), 0), false)) + .filter(entry -> entry.getNamespace().equals(namespace)) + .map(StateEntry::getKey); + } + + public StateIncrementalVisitor getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) { + return new StateEntryIterator(recommendedMaxNumberOfReturnedRecords); + } - public abstract Stream getKeys(N namespace); + // - public abstract StateIncrementalVisitor getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords); + private S get(K key, int keyGroupIndex, N namespace) { + checkKeyNamespacePreconditions(key, namespace); + + StateMap stateMap = getMapForKeyGroup(keyGroupIndex); + + if (stateMap == null) { Review comment: This check is to pass the UT of StateBackendTestBase#testKeyGroupSnapshotRestore. The UT will get a key from the backend, but the key is in a key group which not belongs to this backend. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
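The diff above assigns each key to a key group (`KeyGroupRangeAssignment.assignToKeyGroup`) and then looks that group up in the backend's local key-group range, returning `null` for groups this backend does not own — which is exactly the case the UT mentioned in the comment exercises. A simplified plain-Java model of that lookup is sketched below; the real Flink code uses murmur hashing and `KeyGroupRange`, while `KeyGroupedTable`, `rangeStart`, and the other names here are invented:

```java
// Simplified model of per-key-group state storage with a local key-group range.
public class KeyGroupedTable {

    private final int numberOfKeyGroups;   // "max parallelism" in Flink terms
    private final int rangeStart;          // first key group owned by this backend
    private final Object[] stateMaps;      // one state map per owned key group

    KeyGroupedTable(int numberOfKeyGroups, int rangeStart, int rangeEnd) {
        this.numberOfKeyGroups = numberOfKeyGroups;
        this.rangeStart = rangeStart;
        this.stateMaps = new Object[rangeEnd - rangeStart + 1];
        for (int i = 0; i < stateMaps.length; i++) {
            stateMaps[i] = new java.util.HashMap<Object, Object>();
        }
    }

    /** Simplified stand-in for KeyGroupRangeAssignment.assignToKeyGroup (no murmur hash). */
    int assignToKeyGroup(Object key) {
        return Math.floorMod(key.hashCode(), numberOfKeyGroups);
    }

    /** Returns the state map for the key's group, or null if the group is not owned locally. */
    Object getMapForKey(Object key) {
        int keyGroup = assignToKeyGroup(key);
        int pos = keyGroup - rangeStart;
        if (pos < 0 || pos >= stateMaps.length) {
            return null;   // the key group belongs to another backend
        }
        return stateMaps[pos];
    }
}
```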
[GitHub] [flink] Aitozi commented on issue #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength
Aitozi commented on issue #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength URL: https://github.com/apache/flink/pull/8559#issuecomment-508323128 Hi @zhijiangW @pnowojski, please help review this PR again. I have addressed your previous comments and added a unit test case in `LocalInputChannelTest`.
[GitHub] [flink] lirui-apache commented on a change in pull request #8965: [FLINK-13068][hive] HiveTableSink should implement PartitionableTable…
lirui-apache commented on a change in pull request #8965: [FLINK-13068][hive] HiveTableSink should implement PartitionableTable… URL: https://github.com/apache/flink/pull/8965#discussion_r300214660 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java ## @@ -161,4 +164,37 @@ private String toStagingDir(String finalDir, Configuration conf) throws IOExcept fs.deleteOnExit(path); return res; } + + @Override + public List getPartitionFieldNames() { + return catalogTable.getPartitionKeys(); + } + + @Override + public void setStaticPartition(Map partitions) { + // make it a LinkedHashMap to maintain partition column order + staticPartitionSpec = new LinkedHashMap<>(); + for (String partitionCol : getPartitionFieldNames()) { + if (partitions.containsKey(partitionCol)) { + staticPartitionSpec.put(partitionCol, partitions.get(partitionCol)); + } + } + } + + private void validatePartitionSpec() { + List partitionCols = getPartitionFieldNames(); + Preconditions.checkArgument(new HashSet<>(partitionCols).containsAll( Review comment: Yeah, I'll print the specific columns and reformat the code. We don't need to check the order here. The partition column order is defined by `getPartitionFieldNames()`. We can always reorder a partition spec (which is a map) as long as it only contains valid partition columns.
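The quoted `setStaticPartition` builds a `LinkedHashMap` so that the static partition spec follows the declared partition-column order, which is why `validatePartitionSpec` only needs a membership check. The standalone sketch below mirrors that logic outside of Flink; `PartitionSpecUtil` and its method names are invented here, and the Flink `Preconditions` call is replaced by a plain `IllegalArgumentException`:

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionSpecUtil {

    /** Reorders an arbitrary partition spec to match the declared partition-column order. */
    static Map<String, String> orderedSpec(List<String> partitionCols, Map<String, String> spec) {
        Map<String, String> ordered = new LinkedHashMap<>();   // preserves insertion order
        for (String col : partitionCols) {
            if (spec.containsKey(col)) {
                ordered.put(col, spec.get(col));
            }
        }
        return ordered;
    }

    /** Membership check only: order is irrelevant because orderedSpec normalizes it. */
    static void validate(List<String> partitionCols, Map<String, String> spec) {
        if (!new HashSet<>(partitionCols).containsAll(spec.keySet())) {
            throw new IllegalArgumentException("Invalid partition columns: " + spec.keySet());
        }
    }
}
```

Choosing `LinkedHashMap` (rather than `HashMap`) is the whole trick: iteration order equals insertion order, so inserting in `getPartitionFieldNames()` order fixes the column order once and for all.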
[jira] [Updated] (FLINK-12926) Main thread checking in some tests fails
[ https://issues.apache.org/jira/browse/FLINK-12926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-12926: Attachment: Execution#deploy.jpg > Main thread checking in some tests fails > > > Key: FLINK-12926 > URL: https://issues.apache.org/jira/browse/FLINK-12926 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Tests >Affects Versions: 1.9.0 >Reporter: Zhu Zhu >Priority: Major > Attachments: Execution#deploy.jpg, mainThreadCheckFailure.log > > > Currently all JM side job changing actions are expected to be taken in > JobMaster main thread. > In current Flink tests, many cases tend to use the test main thread as the JM > main thread. This can lead to 2 issues: > 1. TestingComponentMainThreadExecutorServiceAdapter is a direct executor, so > if it is invoked from any other thread, it will break the main thread > checking and fail the submitted action (as in the attached log > [^mainThreadCheckFailure.log]) > 2. The test main thread does not support other actions queued in its > executor, as the test will end once the current test thread action(the > current running test body) is done > > In my observation, most cases which starts > ExecutionGraph.scheduleForExecution() will encounter this issue. Cases > include ExecutionGraphRestartTest, FailoverRegionTest, > ConcurrentFailoverStrategyExecutionGraphTest, GlobalModVersionTest, > ExecutionGraphDeploymentTest, etc. > > One solution in my mind is to create a ScheduledExecutorService for those > tests, use it as the main thread and run the test body in this thread. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
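The fix proposed in the issue above is to create a dedicated single-threaded executor, treat its thread as the JobMaster main thread, and run the test body inside it so that main-thread checks pass. A minimal plain-Java sketch of that pattern follows; `MainThreadChecker` and its methods are invented names, while Flink's real counterparts are `ComponentMainThreadExecutor` and the adapters mentioned in the issue:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// A single-threaded "main thread" executor plus the check that actions run on it.
public class MainThreadChecker {

    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    private final AtomicReference<Thread> mainThreadRef = new AtomicReference<>();

    public MainThreadChecker() throws Exception {
        // Record which thread the executor runs on.
        mainThread.submit(() -> mainThreadRef.set(Thread.currentThread())).get();
    }

    /** Fails, like a main-thread check, if called from a foreign thread. */
    void assertRunningInMainThread() {
        if (Thread.currentThread() != mainThreadRef.get()) {
            throw new IllegalStateException("Not running in the expected main thread");
        }
    }

    /** Runs a test body in the main thread, so main-thread checks inside it pass. */
    <T> T runInMainThread(Callable<T> body) throws Exception {
        return mainThread.submit(body).get();
    }

    void shutdown() {
        mainThread.shutdown();
    }
}
```

This also addresses the second issue from the report: actions queued after the test body still have a live thread to run on until `shutdown()` is called.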
[GitHub] [flink] XuQianJin-Stars commented on issue #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
XuQianJin-Stars commented on issue #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese URL: https://github.com/apache/flink/pull/8300#issuecomment-508320189 Well, let me change it.
[GitHub] [flink] wuchong commented on issue #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
wuchong commented on issue #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese URL: https://github.com/apache/flink/pull/8300#issuecomment-508319820 @XuQianJin-Stars, please do not replace every `Savepoint` with `savepoint`. I mean that only "Savepoint" in directory paths should be kept as `savepoint`.
[GitHub] [flink] XuQianJin-Stars commented on issue #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
XuQianJin-Stars commented on issue #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese URL: https://github.com/apache/flink/pull/8300#issuecomment-508319511 Hi @klion26 @wuchong, thank you very much. Comments addressed.
[GitHub] [flink] lirui-apache commented on a change in pull request #8976: [FLINK-12277][table/hive/doc] Add documentation for catalogs
lirui-apache commented on a change in pull request #8976: [FLINK-12277][table/hive/doc] Add documentation for catalogs URL: https://github.com/apache/flink/pull/8976#discussion_r300212301 ## File path: docs/dev/table/catalog.md ## @@ -0,0 +1,345 @@ +--- +title: "Catalog" +is_beta: true +nav-parent_id: tableapi +nav-pos: 100 +--- + + +A catalog can provide information about metadata, such as names, schemas, statistics of tables, and information about how to access data stored in a database or table. Once a catalog is registered to a `TableEnvironment`, all meta-objects defined in a catalog can be accessed from Table API or SQL queries. + + +* This will be replaced by the TOC +{:toc} + + +Catalog Interface +- + +APIs are defined in `Catalog` interface. The interface defines a set of APIs to read and write catalog meta-objects such as database, tables, partitions, views, and functions. + +Users can develop their own catalogs by implementing the interface. + + +Naming Structure in Catalog +--- + +Flink's catalogs use a strict two-level structure, that is, catalogs contain databases, and databases contain meta-objects. Thus, the full name of a meta-object is always structured as `catalogName`.`databaseName`.`objectName`. + +All registered catalogs are managed by a `CatalogManager` instance in `TableEnvironment`. In order to ease access to meta-objects, `CatalogManager` has a concept of current catalog and current database. Usually how users access meta-objects in a catalog is to specify its full name in the format of `catalogName`.`databaseName`.`objectName`. By setting current catalog and current database, users can use just the meta-object's name in their queries. This greatly simplifies user experience. 
For example, a previous query as + +```sql +select * from mycatalog.mydb.myTable; +``` + +can be shortened as + +```sql +select * from myTable; +``` + +Querying a table in a different database under the default catalog would be + +``` +select * from mydb2.myTable +``` + +`CatalogManager` always has a built-in `GenericInMemoryCatalog` with the name `default_catalog`, which has a built-in default database named `default_database`. They will be the current catalog and current database if no other catalog and database are explicitly set. All temp meta-objects will be registered to this catalog. Users can set the current catalog and database via `TableEnvironment.useCatalog(...)` and `TableEnvironment.useDatabase(...)` in Table API, or `USE CATALOG ...` and `USE DATABASE ...` in Flink SQL. + + +Catalog Types +- + +## GenericInMemoryCatalog + +All meta-objects in this catalog are stored in memory, and will be lost once the session shuts down. + +Its config entry value in the SQL CLI yaml file is "generic_in_memory". + +## HiveCatalog + +`HiveCatalog` can read and write both Flink and Hive meta-objects by using Hive Metastore as persistent storage. + +Its config entry value in the SQL CLI yaml file is "hive". + +### Persist Flink meta-objects + +Previously, Flink meta-objects were only stored in memory and were per-session based. That means users had to recreate all the meta-objects every time they started a new session. + +To solve this user pain point, users can choose the option to use `HiveCatalog` to persist all of their Flink streaming and batch meta-objects by using Hive Metastore as pure storage. Because Hive Metastore is only used for storage in this case, Hive itself may not understand Flink's meta-objects stored in the metastore. + +### Integrate Flink with Hive metadata + +The ultimate goal for integrating Flink with Hive metadata is that: + +1. 
existing meta-objects, like tables, views, and functions, created by Hive or other Hive-compatible applications can be used by Flink + +2. meta-objects created by `HiveCatalog` can be written back to Hive metastore such that Hive and other Hive-compatible applications can consume them. + +## User-configured Catalog + +Catalogs are pluggable, and users can use their own, customized catalog implementations. + + +HiveCatalog +--- + +## Supported Hive Versions + +`HiveCatalog` officially supports Hive 2.3.4 and 1.2.1, and depends on Hive's own compatibility for the other 2.x.x and 1.x.x versions. + +Users need to explicitly specify the Hive version as a string, by either passing it to the constructor when creating `HiveCatalog` instances directly in Table API or specifying it in the yaml config file in SQL CLI. The Hive version string will be either `2.3.4`, `1.2.1`, or your own 2.x.x/1.x.x versions. + +## Dependencies + +In order to use `HiveCatalog`, users need to either download the following dependency jars and add them to the `/lib` dir of the Flink distribution, or add their existing Hive jars to Flink's classpath in order for Flink to find them at runtime. + +Take Hive 2.3.4 for example: + +``` +// Hive dependencies + +- hive-metastore-2.3.4.jar
[GitHub] [flink] lirui-apache commented on a change in pull request #8976: [FLINK-12277][table/hive/doc] Add documentation for catalogs
lirui-apache commented on a change in pull request #8976: [FLINK-12277][table/hive/doc] Add documentation for catalogs URL: https://github.com/apache/flink/pull/8976#discussion_r300212111 ## File path: docs/dev/table/catalog.md ## @@ -0,0 +1,345 @@ +--- +title: "Catalog" +is_beta: true +nav-parent_id: tableapi +nav-pos: 100 +--- + + +A catalog can provide information about metadata, such as names, schemas, statistics of tables, and information about how to access data stored in a database or table. Once a catalog is registered to a `TableEnvironment`, all meta-objects defined in a catalog can be accessed from Table API or SQL queries. + + +* This will be replaced by the TOC +{:toc} + + +Catalog Interface +- + +APIs are defined in `Catalog` interface. The interface defines a set of APIs to read and write catalog meta-objects such as database, tables, partitions, views, and functions. + +Users can develop their own catalogs by implementing the interface. + + +Naming Structure in Catalog +--- + +Flink's catalogs use a strict two-level structure, that is, catalogs contain databases, and databases contain meta-objects. Thus, the full name of a meta-object is always structured as `catalogName`.`databaseName`.`objectName`. + +All registered catalogs are managed by a `CatalogManager` instance in `TableEnvironment`. In order to ease access to meta-objects, `CatalogManager` has a concept of current catalog and current database. Usually how users access meta-objects in a catalog is to specify its full name in the format of `catalogName`.`databaseName`.`objectName`. By setting current catalog and current database, users can use just the meta-object's name in their queries. This greatly simplifies user experience. 
For example, a previous query as + +```sql +select * from mycatalog.mydb.myTable; +``` + +can be shortened as + +```sql +select * from myTable; +``` + +Querying tables in a different databases under the default catalog would be + +``` +select * from mydb2.myTable +``` + +`CatalogManager` always has a built-in `GenericInMemoryCatalog` with name of `default_catalog`, which has a built-in default database named `default_database`. They will be the current catalog and current database if no other catalog and database are explicitly set. All temp meta-objects will be registered to this catalog. Users can set current catalog and database via `TableEnvironment.useCatalog(...)` and `TableEnvironment.useDatabase(...)` in Table API, or `USE CATALOG ...` and `USE DATABASE ...` in Flink SQL. + + +Catalog Types +- + +## GenericInMemoryCatalog + +All meta-objects in this catalog are stored in memory, and be will lost once the session shuts down. + +Its config entry value in SQL CLI yaml file is "generic_in_memory". + +## HiveCatalog + +`HiveCatalog` can read and write both Flink and Hive meta-objects by using Hive Metastore as a persistent storage. + +Its config entry value in SQL CLI yaml file is "hive". + +### Persist Flink meta-objects + +Previously, Flink meta-objects are only stored in memory and are per session based. That means users have to recreate all the meta-objects every time they start a new session. + +To solve this user pain point, users can choose the option to use `HiveCatalog` to persist all of users' Flink streaming and batch meta-objects by using Hive Metastore as a pure storage. Because Hive Metastore is only used for storage in this case, Hive itself may not understand Flink's meta-objects stored in the metastore. + +### Integrate Flink with Hive metadata + +The ultimate goal for integrating Flink with Hive metadata is that: + +1. 
existing meta-objects, like tables, views, and functions, created by Hive or other Hive-compatible applications can be used by Flink + +2. meta-objects created by `HiveCatalog` can be written back to Hive metastore such that Hive and other Hive-compatible applications can consume them. + +## User-configured Catalog + +Catalogs are pluggable, and users can use their own, customized catalog implementations. + + +HiveCatalog +--- + +## Supported Hive Versions + +`HiveCatalog` officially supports Hive 2.3.4 and 1.2.1, and depends on Hive's own compatibility for the other 2.x.x and 1.x.x versions. + +Users need to explicitly specify the Hive version as a string, by either passing it to the constructor when creating `HiveCatalog` instances directly in Table API or specifying it in the yaml config file in SQL CLI. The Hive version string will be either `2.3.4`, `1.2.1`, or your own 2.x.x/1.x.x versions. Review comment: Shall we mention that only 2.3.4 and 1.2.1 are supported at the moment? E.g. if a user is using 1.2.0, they still need to specify the version as 1.2.1; otherwise `HiveShimLoader` will error out, complaining about an unsupported Hive version.
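The two-level naming rule described in this document (an object name may omit the catalog and/or database, which then default to the current ones) can be sketched as a small helper. This is an illustrative model of the documented resolution behavior, not `CatalogManager`'s actual code; `CatalogPathResolver` and `resolveObjectPath` are invented names:

```java
public class CatalogPathResolver {

    // Defaults match the built-in catalog and database described in the docs.
    private String currentCatalog = "default_catalog";
    private String currentDatabase = "default_database";

    void useCatalog(String catalog) { this.currentCatalog = catalog; }

    void useDatabase(String database) { this.currentDatabase = database; }

    /**
     * Expands "obj", "db.obj", or "cat.db.obj" to the full
     * catalogName.databaseName.objectName form.
     */
    String resolveObjectPath(String name) {
        String[] parts = name.split("\\.");
        switch (parts.length) {
            case 1: return currentCatalog + "." + currentDatabase + "." + parts[0];
            case 2: return currentCatalog + "." + parts[0] + "." + parts[1];
            case 3: return name;
            default: throw new IllegalArgumentException("Too many name parts: " + name);
        }
    }
}
```

With the defaults in place, `select * from myTable` resolves against `default_catalog.default_database`, and `mydb2.myTable` resolves against `default_catalog` — matching the SQL examples quoted in the diff.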
[GitHub] [flink] lirui-apache commented on a change in pull request #8976: [FLINK-12277][table/hive/doc] Add documentation for catalogs
lirui-apache commented on a change in pull request #8976: [FLINK-12277][table/hive/doc] Add documentation for catalogs URL: https://github.com/apache/flink/pull/8976#discussion_r300212979 ## File path: docs/dev/table/catalog.md ## @@ -0,0 +1,345 @@ +--- +title: "Catalog" +is_beta: true +nav-parent_id: tableapi +nav-pos: 100 +--- + + +A catalog can provide information about metadata, such as names, schemas, statistics of tables, and information about how to access data stored in a database or table. Once a catalog is registered to a `TableEnvironment`, all meta-objects defined in a catalog can be accessed from Table API or SQL queries. + + +* This will be replaced by the TOC +{:toc} + + +Catalog Interface +- + +APIs are defined in `Catalog` interface. The interface defines a set of APIs to read and write catalog meta-objects such as database, tables, partitions, views, and functions. + +Users can develop their own catalogs by implementing the interface. + + +Naming Structure in Catalog +--- + +Flink's catalogs use a strict two-level structure, that is, catalogs contain databases, and databases contain meta-objects. Thus, the full name of a meta-object is always structured as `catalogName`.`databaseName`.`objectName`. + +All registered catalogs are managed by a `CatalogManager` instance in `TableEnvironment`. In order to ease access to meta-objects, `CatalogManager` has a concept of current catalog and current database. Usually how users access meta-objects in a catalog is to specify its full name in the format of `catalogName`.`databaseName`.`objectName`. By setting current catalog and current database, users can use just the meta-object's name in their queries. This greatly simplifies user experience. 
For example, a previous query as + +```sql +select * from mycatalog.mydb.myTable; +``` + +can be shortened as + +```sql +select * from myTable; +``` + +Querying tables in a different databases under the default catalog would be + +``` +select * from mydb2.myTable +``` + +`CatalogManager` always has a built-in `GenericInMemoryCatalog` with name of `default_catalog`, which has a built-in default database named `default_database`. They will be the current catalog and current database if no other catalog and database are explicitly set. All temp meta-objects will be registered to this catalog. Users can set current catalog and database via `TableEnvironment.useCatalog(...)` and `TableEnvironment.useDatabase(...)` in Table API, or `USE CATALOG ...` and `USE DATABASE ...` in Flink SQL. + + +Catalog Types +- + +## GenericInMemoryCatalog + +All meta-objects in this catalog are stored in memory, and be will lost once the session shuts down. + +Its config entry value in SQL CLI yaml file is "generic_in_memory". + +## HiveCatalog + +`HiveCatalog` can read and write both Flink and Hive meta-objects by using Hive Metastore as a persistent storage. + +Its config entry value in SQL CLI yaml file is "hive". + +### Persist Flink meta-objects + +Previously, Flink meta-objects are only stored in memory and are per session based. That means users have to recreate all the meta-objects every time they start a new session. + +To solve this user pain point, users can choose the option to use `HiveCatalog` to persist all of users' Flink streaming and batch meta-objects by using Hive Metastore as a pure storage. Because Hive Metastore is only used for storage in this case, Hive itself may not understand Flink's meta-objects stored in the metastore. + +### Integrate Flink with Hive metadata + +The ultimate goal for integrating Flink with Hive metadata is that: + +1. 
existing meta-objects, like tables, views, and functions, created by Hive or other Hive-compatible applications can be used by Flink + +2. meta-objects created by `HiveCatalog` can be written back to Hive metastore such that Hive and other Hive-compatible applications can consume them. + +## User-configured Catalog + +Catalogs are pluggable, and users can use their own, customized catalog implementations. + + +HiveCatalog +--- + +## Supported Hive Versions + +`HiveCatalog` officially supports Hive 2.3.4 and 1.2.1, and depends on Hive's own compatibility for the other 2.x.x and 1.x.x versions. + +Users need to explicitly specify the Hive version as a string, by either passing it to the constructor when creating `HiveCatalog` instances directly in Table API or specifying it in the yaml config file in SQL CLI. The Hive version string will be either `2.3.4`, `1.2.1`, or your own 2.x.x/1.x.x versions. + +## Dependencies + +In order to use `HiveCatalog`, users need to either download the following dependency jars and add them to the `/lib` dir of the Flink distribution, or add their existing Hive jars to Flink's classpath in order for Flink to find them at runtime. Review comment: If users want to use HiveCatalog, they must have a Hive installation to
[jira] [Closed] (FLINK-12833) Add Klaviyo to Chinese PoweredBy page
[ https://issues.apache.org/jira/browse/FLINK-12833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-12833. --- Resolution: Fixed Fix Version/s: 1.9.0 Merged in flink-web: a89ff4efa0e0b3e9be23376e206c75a253b3ae40 > Add Klaviyo to Chinese PoweredBy page > - > > Key: FLINK-12833 > URL: https://issues.apache.org/jira/browse/FLINK-12833 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Project Website >Reporter: Fabian Hueske >Assignee: yelun >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Commit b54ecfa930653bcfecd60df3414deca5291c6cb3 added Klaviyo to the English > PoweredBy page. > It should be added to the Chinese page as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r300210930 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。 -Checkpoints make state in Flink fault tolerant by allowing state and the -corresponding stream positions to be recovered, thereby giving the application -the same semantics as a failure-free execution. +参考 [Checkpointing]({{ site.baseurl }}/zh/dev/stream/state/checkpointing.html) 查看如何在 Flink 程序中开启和配置 checkpoint。 -See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) for how to enable and -configure checkpoints for your program. +## 保留 Checkpoint -## Retained Checkpoints - -Checkpoints are by default not retained and are only used to resume a -job from failures. They are deleted when a program is cancelled. -You can, however, configure periodic checkpoints to be retained. -Depending on the configuration these *retained* checkpoints are *not* -automatically cleaned up when the job fails or is canceled. -This way, you will have a checkpoint around to resume from if your job fails. +checkpoint 在默认的情况下仅用于恢复失败的作业,并不保留,当程序取消时 checkpoint 就会被删除。当然,你可以通过配置来保留 checkpoint,这些被保留的 checkpoint 在作业失败或取消时不会被清除。这样,你就可以使用该 checkpoint 来恢复失败的作业。 {% highlight java %} CheckpointConfig config = env.getCheckpointConfig(); config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); {% endhighlight %} -The `ExternalizedCheckpointCleanup` mode configures what happens with checkpoints when you cancel the job: - -- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case. 
- -- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails. +`ExternalizedCheckpointCleanup` 配置项定义了当作业取消时,对作业 checkpoint 的操作: +- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的 checkpoint。注意,这种情况下,需要手动清除该作业的 checkpoint。 Review comment: what do you think about changing `需要手动清除该作业的 checkpoint` to `需要手动清除该作业保留的的 checkpoint`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r300209240 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。 Review comment: maybe the first "checkpoint" should be "Checkpoint"?
[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r300211627 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。 -Checkpoints make state in Flink fault tolerant by allowing state and the -corresponding stream positions to be recovered, thereby giving the application -the same semantics as a failure-free execution. +参考 [Checkpointing]({{ site.baseurl }}/zh/dev/stream/state/checkpointing.html) 查看如何在 Flink 程序中开启和配置 checkpoint。 -See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) for how to enable and -configure checkpoints for your program. +## 保留 Checkpoint -## Retained Checkpoints - -Checkpoints are by default not retained and are only used to resume a -job from failures. They are deleted when a program is cancelled. -You can, however, configure periodic checkpoints to be retained. -Depending on the configuration these *retained* checkpoints are *not* -automatically cleaned up when the job fails or is canceled. -This way, you will have a checkpoint around to resume from if your job fails. +checkpoint 在默认的情况下仅用于恢复失败的作业,并不保留,当程序取消时 checkpoint 就会被删除。当然,你可以通过配置来保留 checkpoint,这些被保留的 checkpoint 在作业失败或取消时不会被清除。这样,你就可以使用该 checkpoint 来恢复失败的作业。 {% highlight java %} CheckpointConfig config = env.getCheckpointConfig(); config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); {% endhighlight %} -The `ExternalizedCheckpointCleanup` mode configures what happens with checkpoints when you cancel the job: - -- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case. 
- -- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails. +`ExternalizedCheckpointCleanup` 配置项定义了当作业取消时,对作业 checkpoint 的操作: +- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的 checkpoint。注意,这种情况下,需要手动清除该作业的 checkpoint。 +- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**:当作业取消时,删除作业的 checkpoint。仅当作业失败时,作业的 checkpoint 才会被保留。 -### Directory Structure +### 目录结构 -Similarly to [savepoints](savepoints.html), a checkpoint consists -of a meta data file and, depending on the state backend, some additional data -files. The meta data file and data files are stored in the directory that is -configured via `state.checkpoints.dir` in the configuration files, -and also can be specified for per job in the code. +与 [savepoints](savepoints.html) 相似,checkpoint 由元数据文件、数据文件(与state backend 相关)组成。可通过配置文件中 “state.checkpoints.dir” 配置项来指定元数据文件和数据文件的存储路径,另外也可以在代码中针对单个作业特别指定该配置项。 Review comment: do you think we need to change `“state.checkpoints.dir”` to `"state.checkpoints.dir"`?
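The `state.checkpoints.dir` option quoted in the diff above is typically set cluster-wide in `flink-conf.yaml`; a minimal sketch (the HDFS path is a placeholder, not taken from the original docs):

```yaml
# flink-conf.yaml — default location for checkpoint metadata and data files.
# Placeholder path; it must be accessible to the JobManager and all TaskManagers,
# e.g. a directory on a distributed file system.
state.checkpoints.dir: hdfs:///flink/checkpoints
```

As the diff notes, this default can also be overridden per job in code via the state backend configuration.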
[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r300209292 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。 -Checkpoints make state in Flink fault tolerant by allowing state and the -corresponding stream positions to be recovered, thereby giving the application -the same semantics as a failure-free execution. +参考 [Checkpointing]({{ site.baseurl }}/zh/dev/stream/state/checkpointing.html) 查看如何在 Flink 程序中开启和配置 checkpoint。 -See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) for how to enable and -configure checkpoints for your program. +## 保留 Checkpoint -## Retained Checkpoints - -Checkpoints are by default not retained and are only used to resume a -job from failures. They are deleted when a program is cancelled. -You can, however, configure periodic checkpoints to be retained. -Depending on the configuration these *retained* checkpoints are *not* -automatically cleaned up when the job fails or is canceled. -This way, you will have a checkpoint around to resume from if your job fails. +checkpoint 在默认的情况下仅用于恢复失败的作业,并不保留,当程序取消时 checkpoint 就会被删除。当然,你可以通过配置来保留 checkpoint,这些被保留的 checkpoint 在作业失败或取消时不会被清除。这样,你就可以使用该 checkpoint 来恢复失败的作业。 Review comment: same as above
[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r300209240 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。 Review comment: maybe the first "checkpoint" should be "Checkpoint"? and other places
[GitHub] [flink] wuchong commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
wuchong commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese URL: https://github.com/apache/flink/pull/8300#discussion_r300211203 ## File path: docs/ops/state/savepoints.zh.md ## @@ -78,160 +68,162 @@ source-id | State of StatefulSource mapper-id | State of StatefulMapper {% endhighlight %} -In the above example, the print sink is stateless and hence not part of the savepoint state. By default, we try to map each entry of the savepoint back to the new program. +在上面的示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分。默认情况下,我们尝试将 Savepoint 的每个条目映射回新程序。 -## Operations +## 算子 -You can use the [command line client]({{ site.baseurl }}/ops/cli.html#savepoints) to *trigger savepoints*, *cancel a job with a savepoint*, *resume from savepoints*, and *dispose savepoints*. +你可以使用[命令行客户端]({{site.baseurl}}/zh/ops/cli.html#Savepoint)来*触发 Savepoint*,*触发 Savepoint 并取消作业*,*从 Savepoint* 恢复,以及*删除 Savepoint*。 -With Flink >= 1.2.0 it is also possible to *resume from savepoints* using the webui. +从 Flink 1.2.0 开始,还可以使用 webui *从 Savepoint 恢复*。 -### Triggering Savepoints +### 触发 Savepoint -When triggering a savepoint, a new savepoint directory is created where the data as well as the meta data will be stored. The location of this directory can be controlled by [configuring a default target directory](#configuration) or by specifying a custom target directory with the trigger commands (see the [`:targetDirectory` argument](#trigger-a-savepoint)). +当触发 Savepoint 时,将创建一个新的 Savepoint 目录,其中存储数据和元数据。可以通过[配置默认目标目录](#configuration)或使用触发器命令指定自定义目标目录(参见[`:targetDirectory `参数](#trigger-a-savepoint)来控制该目录的位置。 -Attention: The target directory has to be a location accessible by both the JobManager(s) and TaskManager(s) e.g. a location on a distributed file-system. 
+注意:目标目录必须是 JobManager(s) 和 TaskManager(s) 都可以访问的位置,例如分布式文件系统上的位置。 -For example with a `FsStateBackend` or `RocksDBStateBackend`: +以 `FsStateBackend` 或 `RocksDBStateBackend` 为例: {% highlight shell %} -# Savepoint target directory -/savepoints/ +# Savepoint 目标目录 +/Savepoint/ -# Savepoint directory -/savepoints/savepoint-:shortjobid-:savepointid/ +# Savepoint 目录 Review comment: Shouldn't the directory names keep the original's lowercase form?
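For reference, the "default target directory" mentioned in this hunk corresponds to the `state.savepoints.dir` option; a sketch of how it could be set (placeholder path, not taken from the PR):

```yaml
# flink-conf.yaml — default target directory for triggered savepoints (placeholder).
# Like the checkpoint directory, it has to be reachable by both the JobManager(s)
# and TaskManager(s), e.g. a location on a distributed file system.
state.savepoints.dir: hdfs:///flink/savepoints
```

When no default is configured, the target directory has to be passed explicitly with the trigger command's `:targetDirectory` argument.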
[GitHub] [flink] wuchong commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
wuchong commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese URL: https://github.com/apache/flink/pull/8300#discussion_r300211060 ## File path: docs/ops/state/savepoints.zh.md ## @@ -78,160 +68,162 @@ source-id | State of StatefulSource mapper-id | State of StatefulMapper {% endhighlight %} -In the above example, the print sink is stateless and hence not part of the savepoint state. By default, we try to map each entry of the savepoint back to the new program. +在上面的示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分。默认情况下,我们尝试将 Savepoint 的每个条目映射回新程序。 -## Operations +## 算子 -You can use the [command line client]({{ site.baseurl }}/ops/cli.html#savepoints) to *trigger savepoints*, *cancel a job with a savepoint*, *resume from savepoints*, and *dispose savepoints*. +你可以使用[命令行客户端]({{site.baseurl}}/zh/ops/cli.html#Savepoint)来*触发 Savepoint*,*触发 Savepoint 并取消作业*,*从 Savepoint* 恢复,以及*删除 Savepoint*。 -With Flink >= 1.2.0 it is also possible to *resume from savepoints* using the webui. +从 Flink 1.2.0 开始,还可以使用 webui *从 Savepoint 恢复*。 -### Triggering Savepoints +### 触发 Savepoint -When triggering a savepoint, a new savepoint directory is created where the data as well as the meta data will be stored. The location of this directory can be controlled by [configuring a default target directory](#configuration) or by specifying a custom target directory with the trigger commands (see the [`:targetDirectory` argument](#trigger-a-savepoint)). +当触发 Savepoint 时,将创建一个新的 Savepoint 目录,其中存储数据和元数据。可以通过[配置默认目标目录](#configuration)或使用触发器命令指定自定义目标目录(参见[`:targetDirectory `参数](#trigger-a-savepoint)来控制该目录的位置。 Review comment: ```suggestion 当触发 Savepoint 时,将创建一个新的 Savepoint 目录,其中存储数据和元数据。可以通过[配置默认目标目录](#configuration)或使用触发器命令指定自定义目标目录(参见[`:targetDirectory`参数](#trigger-a-savepoint)来控制该目录的位置。 ```
[GitHub] [flink] wuchong commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
wuchong commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese URL: https://github.com/apache/flink/pull/8300#discussion_r300211127 ## File path: docs/ops/state/savepoints.zh.md ## @@ -78,160 +68,162 @@ source-id | State of StatefulSource mapper-id | State of StatefulMapper {% endhighlight %} -In the above example, the print sink is stateless and hence not part of the savepoint state. By default, we try to map each entry of the savepoint back to the new program. +在上面的示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分。默认情况下,我们尝试将 Savepoint 的每个条目映射回新程序。 -## Operations +## 算子 -You can use the [command line client]({{ site.baseurl }}/ops/cli.html#savepoints) to *trigger savepoints*, *cancel a job with a savepoint*, *resume from savepoints*, and *dispose savepoints*. +你可以使用[命令行客户端]({{site.baseurl}}/zh/ops/cli.html#Savepoint)来*触发 Savepoint*,*触发 Savepoint 并取消作业*,*从 Savepoint* 恢复,以及*删除 Savepoint*。 -With Flink >= 1.2.0 it is also possible to *resume from savepoints* using the webui. +从 Flink 1.2.0 开始,还可以使用 webui *从 Savepoint 恢复*。 -### Triggering Savepoints +### 触发 Savepoint -When triggering a savepoint, a new savepoint directory is created where the data as well as the meta data will be stored. The location of this directory can be controlled by [configuring a default target directory](#configuration) or by specifying a custom target directory with the trigger commands (see the [`:targetDirectory` argument](#trigger-a-savepoint)). +当触发 Savepoint 时,将创建一个新的 Savepoint 目录,其中存储数据和元数据。可以通过[配置默认目标目录](#configuration)或使用触发器命令指定自定义目标目录(参见[`:targetDirectory `参数](#trigger-a-savepoint)来控制该目录的位置。 -Attention: The target directory has to be a location accessible by both the JobManager(s) and TaskManager(s) e.g. a location on a distributed file-system. 
+注意:目标目录必须是 JobManager(s) 和 TaskManager(s) 都可以访问的位置,例如分布式文件系统上的位置。 -For example with a `FsStateBackend` or `RocksDBStateBackend`: +以 `FsStateBackend` 或 `RocksDBStateBackend` 为例: {% highlight shell %} -# Savepoint target directory -/savepoints/ +# Savepoint 目标目录 +/Savepoint/ -# Savepoint directory -/savepoints/savepoint-:shortjobid-:savepointid/ +# Savepoint 目录 +/Savepoint/savepoint-:shortjobid-:savepointid/ -# Savepoint file contains the checkpoint meta data -/savepoints/savepoint-:shortjobid-:savepointid/_metadata +# Savepoint 文件包含 Checkpoint元数据 +/Savepoint/savepoint-:shortjobid-:savepointid/_metadata Review comment: ```suggestion /savepoint/savepoint-:shortjobid-:savepointid/_metadata ```
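To make the `:shortjobid`/`:savepointid` placeholders above concrete, the CLI command shapes the page documents look roughly as follows (command templates with `:`-prefixed placeholders, assuming a standard Flink distribution; not runnable as-is):

```shell
# Trigger a savepoint for a running job, optionally into a custom target directory
bin/flink savepoint :jobId [:targetDirectory]

# Resume a job from a savepoint (the directory containing the _metadata file)
bin/flink run -s :savepointPath ...

# Dispose of a savepoint that is no longer needed
bin/flink savepoint -d :savepointPath
```

The savepoint path passed to `-s`/`-d` is the per-savepoint directory shown in the hunk above, i.e. the one that contains the `_metadata` file.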